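This article collects code examples for the Java method io.reactivex.Flowable.doOnCancel() and shows how Flowable.doOnCancel() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Flowable.doOnCancel() are as follows: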
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnCancel
Calls the cancel Action if the downstream cancels the sequence.
The action is shared between subscriptions and thus may be called concurrently from multiple threads; the action must be thread-safe.
If the action throws a runtime exception, that exception is rethrown by the onCancel() call, sometimes as a CompositeException if there were multiple exceptions along the way.
Note that terminal events trigger the action unless the Publisher is subscribed to via unsafeSubscribe().
Backpressure: doOnCancel does not interact with backpressure requests or value delivery; backpressure behavior is preserved between its upstream and its downstream.
Scheduler: doOnCancel does not operate by default on a particular Scheduler.
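The core idea, running an action when the downstream cancels, can be sketched without RxJava on top of the JDK's java.util.concurrent.Flow interfaces. This is only an illustrative model, not RxJava's implementation: the names DoOnCancelSketch, range, and cancelCount are hypothetical, the source emits synchronously, and exception handling and thread safety are left out. In this model the action runs only on an explicit cancel() and not on normal completion, which is consistent with the tests further down in which the counter stays at 0 after the sequence completes.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

class DoOnCancelSketch {

    /** Wraps a publisher so that onCancel runs when the downstream calls cancel(). */
    static <T> Flow.Publisher<T> doOnCancel(Flow.Publisher<T> source, Runnable onCancel) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription upstream) {
                // Hand the downstream a subscription that intercepts cancel().
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { upstream.request(n); }
                    @Override public void cancel() {
                        onCancel.run();     // side effect first
                        upstream.cancel();  // then propagate the cancellation
                    }
                });
            }
            // Items and terminal events pass through untouched.
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    /** Hypothetical synchronous source emitting start, start + 1, ... (count items). */
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = start;
            final int end = start + count;
            boolean done;

            @Override public void request(long n) {
                for (long i = 0; i < n && !done && next < end; i++) {
                    subscriber.onNext(next++);
                }
                if (!done && next == end) {
                    done = true;
                    subscriber.onComplete();
                }
            }
            @Override public void cancel() { done = true; }
        });
    }

    /** Subscribes, cancels after `limit` items, and returns how often the action ran. */
    static int cancelCount(int count, int limit) {
        AtomicInteger cancels = new AtomicInteger();
        doOnCancel(range(1, count), cancels::incrementAndGet)
            .subscribe(new Flow.Subscriber<Integer>() {
                Flow.Subscription subscription;
                int seen;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(Long.MAX_VALUE);
                }
                @Override public void onNext(Integer item) {
                    if (++seen == limit) {
                        subscription.cancel();
                    }
                }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
        return cancels.get();
    }

    public static void main(String[] args) {
        System.out.println(cancelCount(5, 2));   // prints 1: cancelled after two items
        System.out.println(cancelCount(5, 10));  // prints 0: completed normally, no cancel
    }
}
```

Wrapping the Subscription rather than the Subscriber callbacks is what keeps the action tied to cancellation only: onComplete and onError travel downstream through a different path and never touch the action.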
Code example source: ReactiveX/RxJava
// Fragment: the enclosing call that receives this function is not shown.
    @Override
    public Flowable<Integer> apply(Integer v) throws Exception {
        return Flowable.just(v).doOnCancel(new Action() {
            @Override
            public void run() throws Exception {
                counter.getAndIncrement();
            }
        });
    }
})
Code example source: ReactiveX/RxJava
// Fragment: the enclosing call that receives this function is not shown.
    @Override
    public Publisher<Integer> apply(Integer v) throws Exception {
        return Flowable.<Integer>never()
            .doOnCancel(new Action() {
                @Override
                public void run() throws Exception {
                    disposed.set(true);
                }
            });
    }
})
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnCancelNull() {
    just1.doOnCancel(null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnDisposeNull() {
    just1.doOnCancel(null);
}
Code example source: ReactiveX/RxJava
@Test
public void testUnsubscribeSource() throws Exception {
    Action unsubscribe = mock(Action.class);
    Flowable<Integer> f = Flowable.just(1).doOnCancel(unsubscribe).cache();
    f.subscribe();
    f.subscribe();
    f.subscribe();
    // The test expects the cancel action never to run.
    verify(unsubscribe, never()).run();
}
Code example source: ReactiveX/RxJava
@Test
public void testUnsubscribesFromUpstream() {
    final AtomicBoolean unsub = new AtomicBoolean();
    Flowable.range(1, 10).concatWith(Flowable.<Integer>never())
        .doOnCancel(new Action() {
            @Override
            public void run() {
                unsub.set(true);
            }
        })
        .ignoreElements()
        .subscribe().dispose();
    // Disposing the subscription is expected to reach doOnCancel.
    assertTrue(unsub.get());
}
Code example source: ReactiveX/RxJava
@Test
public void exactSequence() {
    // `this` is passed as both the request consumer and the cancel action;
    // the enclosing test class is not shown.
    Flowable.range(1, 5)
        .doOnRequest(this)
        .doOnCancel(this)
        .limit(5)
        .test()
        .assertResult(1, 2, 3, 4, 5);
    assertEquals(2, requests.size());
    assertEquals(5, requests.get(0).intValue());
    assertEquals(CANCELLED, requests.get(1));
}
Code example source: ReactiveX/RxJava
@Test
public void testUnsubscribeSource() throws Exception {
    Action unsubscribe = mock(Action.class);
    Flowable<Integer> f = Flowable.just(1).doOnCancel(unsubscribe).replay().autoConnect();
    f.subscribe();
    f.subscribe();
    f.subscribe();
    // The test expects the cancel action never to run.
    verify(unsubscribe, never()).run();
}
Code example source: ReactiveX/RxJava
@Test
public void noCancelPreviousRepeat() {
    final AtomicInteger counter = new AtomicInteger();
    Flowable<Integer> source = Flowable.just(1).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            counter.getAndIncrement();
        }
    });
    source.repeat(5)
        .test()
        .assertResult(1, 1, 1, 1, 1);
    // Each repetition completes normally, so the counter is expected to stay at 0.
    assertEquals(0, counter.get());
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noCancelPreviousArray() {
    final AtomicInteger counter = new AtomicInteger();
    Flowable<Integer> source = Flowable.just(1).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            counter.getAndIncrement();
        }
    });
    Flowable.concatArray(source, source, source, source, source)
        .test()
        .assertResult(1, 1, 1, 1, 1);
    // Each source completes normally, so the counter is expected to stay at 0.
    assertEquals(0, counter.get());
}
Code example source: ReactiveX/RxJava
@Test
public void take() throws Exception {
    Action onCancel = mock(Action.class);
    Flowable.range(1, 5)
        .doOnCancel(onCancel)
        .throttleLatest(1, TimeUnit.MINUTES)
        .take(1)
        .test()
        .assertResult(1);
    // The test expects the cancel action to have run exactly once.
    verify(onCancel).run();
}
Code example source: ReactiveX/RxJava
@Test
public void missingBackpressureExceptionFirst() throws Exception {
    TestScheduler sch = new TestScheduler();
    Action onCancel = mock(Action.class);
    Flowable.just(1, 2)
        .doOnCancel(onCancel)
        .throttleLatest(1, TimeUnit.MINUTES, sch)
        .test(0)
        .assertFailure(MissingBackpressureException.class);
    verify(onCancel).run();
}
Code example source: ReactiveX/RxJava
// Fragment: the start of the enclosing anonymous subscriber is not shown.
    @Override
    public void onNext(Integer t) {
        if (valueCount() == 2) {
            source.doOnCancel(new Action() {
                @Override
                public void run() {
                    child2Unsubscribed.set(true);
                }
            }).take(5).subscribe(ts2);
        }
        super.onNext(t);
    }
};
Code example source: ReactiveX/RxJava
@Test
public void testUnsubscribesFromUpstreamFlowable() {
    final AtomicBoolean unsub = new AtomicBoolean();
    Flowable.range(1, 10).concatWith(Flowable.<Integer>never())
        .doOnCancel(new Action() {
            @Override
            public void run() {
                unsub.set(true);
            }
        })
        .ignoreElements()
        .toFlowable()
        .subscribe().dispose();
    assertTrue(unsub.get());
}
Code example source: ReactiveX/RxJava
@Test
public void limitZero() {
    Flowable.range(1, 5)
        .doOnCancel(this)
        .doOnRequest(this)
        .limit(0)
        .test()
        .assertResult();
    assertEquals(1, requests.size());
    assertEquals(CANCELLED, requests.get(0));
}
Code example source: ReactiveX/RxJava
@Test
public void testShouldUseUnsafeSubscribeInternallyNotSubscribe() {
    TestSubscriber<String> subscriber = TestSubscriber.create();
    final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    Single<String> single = Flowable.just("Hello World!").doOnCancel(new Action() {
        @Override
        public void run() {
            unsubscribed.set(true);
        }
    }).single("");
    single.toFlowable().subscribe(subscriber);
    subscriber.assertComplete();
    Assert.assertFalse(unsubscribed.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testShouldUseUnsafeSubscribeInternallyNotSubscribe() {
    TestSubscriber<String> subscriber = TestSubscriber.create();
    final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    Completable cmp = Flowable.just("Hello World!").doOnCancel(new Action() {
        @Override
        public void run() {
            unsubscribed.set(true);
        }
    }).ignoreElements();
    cmp.<String>toFlowable().subscribe(subscriber);
    subscriber.assertComplete();
    assertFalse(unsubscribed.get());
}
Code example source: ReactiveX/RxJava
@Test
public void limitAndTake() {
    Flowable.range(1, 5)
        .doOnCancel(this)
        .doOnRequest(this)
        .limit(6)
        .take(5)
        .test()
        .assertResult(1, 2, 3, 4, 5);
    assertEquals(Arrays.asList(6L, CANCELLED), requests);
}
Code example source: ReactiveX/RxJava
@Test
public void missingBackpressureExceptionLatest() throws Exception {
    TestScheduler sch = new TestScheduler();
    Action onCancel = mock(Action.class);
    TestSubscriber<Integer> ts = Flowable.just(1, 2)
        .concatWith(Flowable.<Integer>never())
        .doOnCancel(onCancel)
        .throttleLatest(1, TimeUnit.SECONDS, sch, true)
        .test(1);
    sch.advanceTimeBy(1, TimeUnit.SECONDS);
    ts.assertFailure(MissingBackpressureException.class, 1);
    verify(onCancel).run();
}
Code example source: ReactiveX/RxJava
@Test
public void error() {
    final int[] calls = { 0 };
    Flowable.error(new TestException())
        .doOnCancel(new Action() {
            @Override
            public void run() throws Exception {
                calls[0]++;
            }
        })
        .unsubscribeOn(Schedulers.single())
        .test()
        .assertFailure(TestException.class);
    // The source fails with an error; the test expects the cancel action not to run.
    assertEquals(0, calls[0]);
}