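This article collects code examples for the Java method io.reactivex.Flowable.doOnComplete() and shows how Flowable.doOnComplete() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as useful references. Details of Flowable.doOnComplete() are as follows: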
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnComplete
Description: Modifies the source Publisher so that it invokes an action when it calls onComplete.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doOnComplete does not operate by default on a particular Scheduler.
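The description above can be made concrete with a minimal sketch. It is not RxJava's implementation: it assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+) in place of RxJava's Flowable, and the names DoOnCompleteSketch, doOnComplete, range, and run are hypothetical helpers introduced only for illustration. It shows the three points from the description: the upstream Subscription is passed through untouched (so backpressure is left to the source), the action runs on whatever thread signals onComplete, and the action runs before the downstream sees onComplete. Routing a crashing action to onError mirrors what the onCompleteCrash tests further down expect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DoOnCompleteSketch {

    // Hypothetical helper: wraps a publisher so that `action` runs when the source completes.
    static <T> Flow.Publisher<T> doOnComplete(Flow.Publisher<T> source, Runnable action) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                // Hand the upstream Subscription through unchanged: request(n) and
                // cancel() go straight to the source, so backpressure is untouched.
                downstream.onSubscribe(s);
            }
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable t) {
                // The action is skipped when the source fails.
                downstream.onError(t);
            }
            @Override public void onComplete() {
                try {
                    action.run();            // runs on the thread that signalled onComplete
                } catch (Throwable ex) {
                    downstream.onError(ex);  // a crashing action turns completion into an error
                    return;
                }
                downstream.onComplete();
            }
        });
    }

    // Hypothetical synchronous source: emits `count` integers on request, then completes.
    // Reentrant request() calls are not handled in this sketch.
    static Flow.Publisher<Integer> range(int start, int count) {
        final int end = start + count;
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = start;
            boolean done;
            @Override public void request(long n) {
                for (long i = 0; i < n && next < end && !done; i++) {
                    subscriber.onNext(next++);
                }
                if (!done && next == end) {
                    done = true;
                    subscriber.onComplete();
                }
            }
            @Override public void cancel() { done = true; }
        });
    }

    // Subscribes to range(1, 3) through doOnComplete and records every event in order.
    static List<String> run(boolean crash) {
        List<String> events = new ArrayList<>();
        Runnable action = () -> {
            events.add("action");
            if (crash) {
                throw new IllegalStateException("boom");
            }
        };
        doOnComplete(range(1, 3), action).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer v) { events.add("next " + v); }
            @Override public void onError(Throwable t) { events.add("error " + t.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [next 1, next 2, next 3, action, complete]
        System.out.println(run(true));  // [next 1, next 2, next 3, action, error boom]
    }
}
```

With RxJava itself, the equivalent call is simply source.doOnComplete(action), as the examples below show. The sketch only illustrates the ordering and pass-through behaviour.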
Code example source: origin: ReactiveX/RxJava
    @Override
    public Flowable<Integer> apply(Flowable<Integer> w) {
        return w.startWith(indicator)
                .doOnComplete(new Action() {
                    @Override
                    public void run() {
                        System.out.println("inner done: " + wip.incrementAndGet());
                    }
                });
    }
})
Code example source: origin: ReactiveX/RxJava
// Passing a null action is rejected with a NullPointerException
@Test(expected = NullPointerException.class)
public void doOnCompleteNull() {
    just1.doOnComplete(null);
}
Code example source: origin: ReactiveX/RxJava
@Test
public void testDoOnCompleted() {
    final AtomicBoolean r = new AtomicBoolean();
    String output = Flowable.just("one").doOnComplete(new Action() {
        @Override
        public void run() {
            r.set(true);
        }
    }).blockingSingle();
    assertEquals("one", output);
    assertTrue(r.get());
}
Code example source: origin: ReactiveX/RxJava
private static <T> Flowable<T> composer(Flowable<T> source, final AtomicInteger subscriptionCount, final int m) {
    return source.doOnSubscribe(new Consumer<Subscription>() {
        @Override
        public void accept(Subscription s) {
            int n = subscriptionCount.getAndIncrement();
            if (n >= m) {
                Assert.fail("Too many subscriptions! " + (n + 1));
            }
        }
    }).doOnComplete(new Action() {
        @Override
        public void run() {
            int n = subscriptionCount.decrementAndGet();
            if (n < 0) {
                Assert.fail("Too many unsubscriptions! " + (n - 1));
            }
        }
    });
}
Code example source: origin: ReactiveX/RxJava
        .doOnNext(sourceNext)
        .doOnCancel(sourceUnsubscribed)
        .doOnComplete(sourceCompleted)
        .replay();
Code example source: origin: ReactiveX/RxJava
@Test
public void onCompleteCrash() {
    Flowable.fromPublisher(new Publisher<Object>() {
        @Override
        public void subscribe(Subscriber<? super Object> s) {
            s.onSubscribe(new BooleanSubscription());
            s.onComplete();
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            throw new IOException();
        }
    })
    .test()
    .assertFailure(IOException.class);
}
Code example source: origin: ReactiveX/RxJava
@Test
public void onCompleteCrashConditional() {
    Flowable.fromPublisher(new Publisher<Object>() {
        @Override
        public void subscribe(Subscriber<? super Object> s) {
            s.onSubscribe(new BooleanSubscription());
            s.onComplete();
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            throw new IOException();
        }
    })
    .filter(Functions.alwaysTrue())
    .test()
    .assertFailure(IOException.class);
}
Code example source: origin: ReactiveX/RxJava
.doOnComplete(new Action() {
    @Override
    public void run() {
Code example source: origin: ReactiveX/RxJava
.window(300, TimeUnit.MILLISECONDS)
.take(10)
.doOnComplete(new Action() {
    @Override
    public void run() {
Code example source: origin: ReactiveX/RxJava
@Test
public void testUsingDisposesEagerlyBeforeCompletion() {
    final List<String> events = new ArrayList<String>();
    Callable<Resource> resourceFactory = createResourceFactory(events);
    final Action completion = createOnCompletedAction(events);
    final Action unsub = createUnsubAction(events);
    Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
        @Override
        public Flowable<String> apply(Resource resource) {
            return Flowable.fromArray(resource.getTextFromWeb().split(" "));
        }
    };
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    // eager = true: the resource is disposed before onComplete is signalled downstream
    Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
            new DisposeAction(), true)
            .doOnCancel(unsub)
            .doOnComplete(completion);
    flowable.safeSubscribe(subscriber);
    assertEquals(Arrays.asList("disposed", "completed"), events);
}
Code example source: origin: ReactiveX/RxJava
@Test
public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
    final List<String> events = new ArrayList<String>();
    Callable<Resource> resourceFactory = createResourceFactory(events);
    final Action completion = createOnCompletedAction(events);
    final Action unsub = createUnsubAction(events);
    Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
        @Override
        public Flowable<String> apply(Resource resource) {
            return Flowable.fromArray(resource.getTextFromWeb().split(" "));
        }
    };
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    // eager = false: onComplete is signalled downstream before the resource is disposed
    Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
            new DisposeAction(), false)
            .doOnCancel(unsub)
            .doOnComplete(completion);
    flowable.safeSubscribe(subscriber);
    assertEquals(Arrays.asList("completed", "disposed"), events);
}
Code example source: origin: ReactiveX/RxJava
@Test
public void fusedOnErrorCrash() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
    final int[] call = { 0 };
    Flowable.range(1, 5)
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            throw new TestException();
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            call[0]++;
        }
    })
    .subscribe(ts);
    ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
    .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
    .assertFailure(TestException.class);
    // The flow failed, so the completion action never ran
    assertEquals(0, call[0]);
}
Code example source: origin: ReactiveX/RxJava
@Test
public void fused() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
    final int[] call = { 0, 0 };
    Flowable.range(1, 5)
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            call[0]++;
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            call[1]++;
        }
    })
    .subscribe(ts);
    ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
    .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
    .assertResult(1, 2, 3, 4, 5);
    assertEquals(5, call[0]);
    assertEquals(1, call[1]);
}
Code example source: origin: ReactiveX/RxJava
ConnectableFlowable<Integer> is = Flowable.range(1, Flowable.bufferSize() * 2).publish();
Flowable<Integer> fast = is.observeOn(Schedulers.computation())
        .doOnComplete(new Action() {
            @Override
            public void run() {
        }).doOnComplete(new Action() {
Code example source: origin: ReactiveX/RxJava
@Test
public void fusedAsync() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
    final int[] call = { 0, 0 };
    UnicastProcessor<Integer> up = UnicastProcessor.create();
    up
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            call[0]++;
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            call[1]++;
        }
    })
    .subscribe(ts);
    TestHelper.emit(up, 1, 2, 3, 4, 5);
    ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
    .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
    .assertResult(1, 2, 3, 4, 5);
    assertEquals(5, call[0]);
    assertEquals(1, call[1]);
}
Code example source: origin: ReactiveX/RxJava
@Override
public Flowable<String> apply(final GroupedFlowable<Integer, Integer> group) {
    if (group.getKey() < 3) {
        return group.map(new Function<Integer, String>() {
            @Override
            public String apply(Integer t1) {
                return "first groups: " + t1;
            }
        })
        // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
        .take(2).doOnComplete(new Action() {
            @Override
            public void run() {
                first.countDown();
            }
        });
    } else {
        return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer t1) {
                return "last group: " + t1;
            }
        }).doOnEach(new Consumer<Notification<String>>() {
            @Override
            public void accept(Notification<String> t1) {
                System.err.println("subscribeOn notification => " + t1);
            }
        });
    }
}
Code example source: origin: ReactiveX/RxJava
@Test
public void fusedOnErrorCrashConditional() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
    final int[] call = { 0 };
    Flowable.range(1, 5)
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            throw new TestException();
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            call[0]++;
        }
    })
    .filter(Functions.alwaysTrue())
    .subscribe(ts);
    ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
    .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
    .assertFailure(TestException.class);
    assertEquals(0, call[0]);
}
Code example source: origin: ReactiveX/RxJava
@Test
public void fusedConditional() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
    final int[] call = { 0, 0 };
    Flowable.range(1, 5)
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            call[0]++;
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            call[1]++;
        }
    })
    .filter(Functions.alwaysTrue())
    .subscribe(ts);
    ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
    .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
    .assertResult(1, 2, 3, 4, 5);
    assertEquals(5, call[0]);
    assertEquals(1, call[1]);
}
Code example source: origin: ReactiveX/RxJava
@Test
public void fusedAsyncConditional() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
    final int[] call = { 0, 0 };
    UnicastProcessor<Integer> up = UnicastProcessor.create();
    up
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            call[0]++;
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            call[1]++;
        }
    })
    .filter(Functions.alwaysTrue())
    .subscribe(ts);
    TestHelper.emit(up, 1, 2, 3, 4, 5);
    ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
    .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
    .assertResult(1, 2, 3, 4, 5);
    assertEquals(5, call[0]);
    assertEquals(1, call[1]);
}
Code example source: origin: ReactiveX/RxJava
@Test
public void fusedAsyncConditional2() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
    final int[] call = { 0, 0 };
    UnicastProcessor<Integer> up = UnicastProcessor.create();
    up.hide()
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            call[0]++;
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            call[1]++;
        }
    })
    .filter(Functions.alwaysTrue())
    .subscribe(ts);
    TestHelper.emit(up, 1, 2, 3, 4, 5);
    ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
    .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.NONE))
    .assertResult(1, 2, 3, 4, 5);
    assertEquals(5, call[0]);
    assertEquals(1, call[1]);
}