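This article collects code examples for the Java method io.reactivex.Flowable.doFinally() and shows how Flowable.doFinally() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference. Details of Flowable.doFinally() are as follows: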
Package: io.reactivex
Class: Flowable
Method: doFinally
Calls the specified action after this Flowable signals onError or onComplete, or is canceled by the downstream.
In case of a race between a terminal event and a cancellation, the provided onFinally action is executed once per subscription.
Note that the onFinally action is shared between subscriptions and as such should be thread-safe.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doFinally does not operate by default on a particular Scheduler.
Operator-fusion: This operator supports normal and conditional Subscribers as well as boundary-limited synchronous or asynchronous queue-fusion.
History: 2.0.1 - experimental
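The three situations named in the description (completion, error, and downstream cancellation) can be sketched as follows. This is a minimal illustration, assuming RxJava 2.x (`io.reactivex`) is on the classpath; the class name `DoFinallyDemo`, the `run()` helper, and the log labels are hypothetical and not part of the library.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.List;

public class DoFinallyDemo {

    // Hypothetical helper: runs three synchronous pipelines and records
    // which doFinally actions were invoked, in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // 1) Normal completion: the action runs once the source completes.
        Flowable.just(1, 2, 3)
                .doFinally(() -> log.add("completed"))
                .subscribe(v -> { }, e -> { });

        // 2) Error: the action runs once the source signals onError.
        Flowable.<Integer>error(new IllegalStateException("boom"))
                .doFinally(() -> log.add("failed"))
                .subscribe(v -> { }, e -> { });

        // 3) Cancellation: take(2) cancels the upstream after two items,
        //    which also triggers the action (exactly once).
        Flowable.range(1, 10)
                .doFinally(() -> log.add("canceled"))
                .take(2)
                .subscribe(v -> { }, e -> { });

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [completed, failed, canceled]
    }
}
```

All three sources here emit synchronously on the calling thread, so the log order is deterministic; with asynchronous sources the action may run on whichever thread delivers the terminal event or the cancellation.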
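The "executed once per subscription" guarantee under a race between a terminal event and a cancellation can be illustrated with a small JDK-only sketch. It is an assumption-laden model of the idea (an atomic 0 -> 1 transition guarding the action), not RxJava's actual source; `OnceGuard`, `runFinally`, and `race` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class OnceGuardSketch {

    /** Hypothetical per-subscription guard: runs the action at most once. */
    static final class OnceGuard {
        private final AtomicInteger state = new AtomicInteger();
        private final Runnable action;

        OnceGuard(Runnable action) {
            this.action = action;
        }

        void runFinally() {
            // Only the caller that wins the 0 -> 1 transition runs the action.
            if (state.compareAndSet(0, 1)) {
                action.run();
            }
        }
    }

    /** Simulates a terminal event racing with a cancellation. */
    static int race() throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        OnceGuard guard = new OnceGuard(calls::incrementAndGet);

        Thread terminal = new Thread(guard::runFinally); // stands in for onComplete/onError
        Thread cancel = new Thread(guard::runFinally);   // stands in for cancel()
        terminal.start();
        cancel.start();
        terminal.join();
        cancel.join();

        return calls.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(race()); // prints 1
    }
}
```

Note that the guard protects only a single subscription. An action shared by several subscriptions can still be called concurrently, once per subscription, which is why the description asks for it to be thread-safe (here, `AtomicInteger` serves that purpose).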
Code example source: ReactiveX/RxJava
    @Override
    public Publisher<Object> apply(Flowable<Object> f) throws Exception {
        return f.doFinally(FlowableDoFinallyTest.this);
    }
});
Code example source: ReactiveX/RxJava
    @Override
    public Publisher<Integer> createPublisher(long elements) {
        return Flowable.range(0, (int) elements).doFinally(Functions.EMPTY_ACTION);
    }
}
Code example source: ReactiveX/RxJava
    @Override
    public Publisher<Object> apply(Flowable<Object> f) throws Exception {
        return f.doFinally(FlowableDoFinallyTest.this).filter(Functions.alwaysTrue());
    }
});
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void nullAction() {
    // A null action is rejected eagerly, when the operator is applied.
    Flowable.just(1).doFinally(null);
}
Code example source: ReactiveX/RxJava
@Test
public void normalEmpty() {
    // `this` is the enclosing test class, used as the Action; `calls` counts its invocations.
    Flowable.empty()
            .doFinally(this)
            .test()
            .assertResult();
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void normalJust() {
    Flowable.just(1)
            .doFinally(this)
            .test()
            .assertResult(1);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void normalTake() {
    // take(5) cancels the upstream early; the action must still run exactly once.
    Flowable.range(1, 10)
            .doFinally(this)
            .take(5)
            .test()
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void normalError() {
    Flowable.error(new TestException())
            .doFinally(this)
            .test()
            .assertFailure(TestException.class);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void normalEmptyConditional() {
    Flowable.empty()
            .doFinally(this)
            .filter(Functions.alwaysTrue())
            .test()
            .assertResult();
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void normalJustConditional() {
    Flowable.just(1)
            .doFinally(this)
            .filter(Functions.alwaysTrue())
            .test()
            .assertResult(1);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void syncFused() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC);
    Flowable.range(1, 5)
            .doFinally(this)
            .subscribe(ts);
    SubscriberFusion.assertFusion(ts, QueueFuseable.SYNC)
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void syncFusedBoundary() {
    // Requesting BOUNDARY makes the operator decline fusion, hence the NONE assertion.
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC | QueueFuseable.BOUNDARY);
    Flowable.range(1, 5)
            .doFinally(this)
            .subscribe(ts);
    SubscriberFusion.assertFusion(ts, QueueFuseable.NONE)
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void nonFused() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC);
    // hide() prevents fusion with the upstream, so no fusion mode is established.
    Flowable.range(1, 5).hide()
            .doFinally(this)
            .subscribe(ts);
    SubscriberFusion.assertFusion(ts, QueueFuseable.NONE)
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void normalErrorConditional() {
    Flowable.error(new TestException())
            .doFinally(this)
            .filter(Functions.alwaysTrue())
            .test()
            .assertFailure(TestException.class);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void normalTakeConditional() {
    Flowable.range(1, 10)
            .doFinally(this)
            .filter(Functions.alwaysTrue())
            .take(5)
            .test()
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void syncFusedBoundaryConditional() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC | QueueFuseable.BOUNDARY);
    Flowable.range(1, 5)
            .doFinally(this)
            .filter(Functions.alwaysTrue())
            .subscribe(ts);
    SubscriberFusion.assertFusion(ts, QueueFuseable.NONE)
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void syncFusedConditional() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC);
    Flowable.range(1, 5)
            .doFinally(this)
            .filter(Functions.alwaysTrue())
            .subscribe(ts);
    SubscriberFusion.assertFusion(ts, QueueFuseable.SYNC)
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void actionThrows() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Flowable.just(1)
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        throw new TestException();
                    }
                })
                .test()
                .assertResult(1)
                .cancel();
        // The subscriber still sees a normal result; the action's exception
        // is expected among the tracked plugin errors as undeliverable.
        TestHelper.assertUndeliverable(errors, 0, TestException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void nonFusedConditional() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC);
    Flowable.range(1, 5).hide()
            .doFinally(this)
            .filter(Functions.alwaysTrue())
            .subscribe(ts);
    SubscriberFusion.assertFusion(ts, QueueFuseable.NONE)
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void actionThrowsConditional() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Flowable.just(1)
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        throw new TestException();
                    }
                })
                .filter(Functions.alwaysTrue())
                .test()
                .assertResult(1)
                .cancel();
        TestHelper.assertUndeliverable(errors, 0, TestException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}