io.reactivex.Flowable.doOnComplete()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(10.3k)|赞(0)|评价(0)|浏览(390)

本文整理了Java中io.reactivex.Flowable.doOnComplete()方法的一些代码示例,展示了Flowable.doOnComplete()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.doOnComplete()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:doOnComplete

Flowable.doOnComplete介绍

[英]Modifies the source Publisher so that it invokes an action when it calls onComplete.

Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: doOnComplete does not operate by default on a particular Scheduler.
[中]修改源发布服务器,使其在调用onComplete时调用操作。
背压:操作员不会干扰由源发布者的背压行为确定的背压。调度程序:默认情况下,doOnComplete不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Flowable<Integer> apply(Flowable<Integer> w) {
  3. return w.startWith(indicator)
  4. .doOnComplete(new Action() {
  5. @Override
  6. public void run() {
  7. System.out.println("inner done: " + wip.incrementAndGet());
  8. }
  9. })
  10. ;
  11. }
  12. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void doOnCompleteNull() {
  3. just1.doOnComplete(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDoOnCompleted() {
  3. final AtomicBoolean r = new AtomicBoolean();
  4. String output = Flowable.just("one").doOnComplete(new Action() {
  5. @Override
  6. public void run() {
  7. r.set(true);
  8. }
  9. }).blockingSingle();
  10. assertEquals("one", output);
  11. assertTrue(r.get());
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. private static <T> Flowable<T> composer(Flowable<T> source, final AtomicInteger subscriptionCount, final int m) {
  2. return source.doOnSubscribe(new Consumer<Subscription>() {
  3. @Override
  4. public void accept(Subscription s) {
  5. int n = subscriptionCount.getAndIncrement();
  6. if (n >= m) {
  7. Assert.fail("Too many subscriptions! " + (n + 1));
  8. }
  9. }
  10. }).doOnComplete(new Action() {
  11. @Override
  12. public void run() {
  13. int n = subscriptionCount.decrementAndGet();
  14. if (n < 0) {
  15. Assert.fail("Too many unsubscriptions! " + (n - 1));
  16. }
  17. }
  18. });
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. .doOnNext(sourceNext)
  2. .doOnCancel(sourceUnsubscribed)
  3. .doOnComplete(sourceCompleted)
  4. .replay();

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onCompleteCrash() {
  3. Flowable.fromPublisher(new Publisher<Object>() {
  4. @Override
  5. public void subscribe(Subscriber<? super Object> s) {
  6. s.onSubscribe(new BooleanSubscription());
  7. s.onComplete();
  8. }
  9. })
  10. .doOnComplete(new Action() {
  11. @Override
  12. public void run() throws Exception {
  13. throw new IOException();
  14. }
  15. })
  16. .test()
  17. .assertFailure(IOException.class);
  18. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onCompleteCrashConditional() {
  3. Flowable.fromPublisher(new Publisher<Object>() {
  4. @Override
  5. public void subscribe(Subscriber<? super Object> s) {
  6. s.onSubscribe(new BooleanSubscription());
  7. s.onComplete();
  8. }
  9. })
  10. .doOnComplete(new Action() {
  11. @Override
  12. public void run() throws Exception {
  13. throw new IOException();
  14. }
  15. })
  16. .filter(Functions.alwaysTrue())
  17. .test()
  18. .assertFailure(IOException.class);
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. .doOnComplete(new Action() {
  2. @Override
  3. public void run() {

代码示例来源:origin: ReactiveX/RxJava

  1. .window(300, TimeUnit.MILLISECONDS)
  2. .take(10)
  3. .doOnComplete(new Action() {
  4. @Override
  5. public void run() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testUsingDisposesEagerlyBeforeCompletion() {
  3. final List<String> events = new ArrayList<String>();
  4. Callable<Resource> resourceFactory = createResourceFactory(events);
  5. final Action completion = createOnCompletedAction(events);
  6. final Action unsub = createUnsubAction(events);
  7. Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
  8. @Override
  9. public Flowable<String> apply(Resource resource) {
  10. return Flowable.fromArray(resource.getTextFromWeb().split(" "));
  11. }
  12. };
  13. Subscriber<String> subscriber = TestHelper.mockSubscriber();
  14. Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
  15. new DisposeAction(), true)
  16. .doOnCancel(unsub)
  17. .doOnComplete(completion);
  18. flowable.safeSubscribe(subscriber);
  19. assertEquals(Arrays.asList("disposed", "completed"), events);
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
  3. final List<String> events = new ArrayList<String>();
  4. Callable<Resource> resourceFactory = createResourceFactory(events);
  5. final Action completion = createOnCompletedAction(events);
  6. final Action unsub = createUnsubAction(events);
  7. Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
  8. @Override
  9. public Flowable<String> apply(Resource resource) {
  10. return Flowable.fromArray(resource.getTextFromWeb().split(" "));
  11. }
  12. };
  13. Subscriber<String> subscriber = TestHelper.mockSubscriber();
  14. Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
  15. new DisposeAction(), false)
  16. .doOnCancel(unsub)
  17. .doOnComplete(completion);
  18. flowable.safeSubscribe(subscriber);
  19. assertEquals(Arrays.asList("completed", "disposed"), events);
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusedOnErrorCrash() {
  3. TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  4. final int[] call = { 0 };
  5. Flowable.range(1, 5)
  6. .doOnNext(new Consumer<Integer>() {
  7. @Override
  8. public void accept(Integer v) throws Exception {
  9. throw new TestException();
  10. }
  11. })
  12. .doOnComplete(new Action() {
  13. @Override
  14. public void run() throws Exception {
  15. call[0]++;
  16. }
  17. })
  18. .subscribe(ts);
  19. ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  20. .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  21. .assertFailure(TestException.class);
  22. assertEquals(0, call[0]);
  23. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fused() {
  3. TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  4. final int[] call = { 0, 0 };
  5. Flowable.range(1, 5)
  6. .doOnNext(new Consumer<Integer>() {
  7. @Override
  8. public void accept(Integer v) throws Exception {
  9. call[0]++;
  10. }
  11. })
  12. .doOnComplete(new Action() {
  13. @Override
  14. public void run() throws Exception {
  15. call[1]++;
  16. }
  17. })
  18. .subscribe(ts);
  19. ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  20. .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  21. .assertResult(1, 2, 3, 4, 5);
  22. assertEquals(5, call[0]);
  23. assertEquals(1, call[1]);
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. ConnectableFlowable<Integer> is = Flowable.range(1, Flowable.bufferSize() * 2).publish();
  2. Flowable<Integer> fast = is.observeOn(Schedulers.computation())
  3. .doOnComplete(new Action() {
  4. @Override
  5. public void run() {
  6. }).doOnComplete(new Action() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusedAsync() {
  3. TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  4. final int[] call = { 0, 0 };
  5. UnicastProcessor<Integer> up = UnicastProcessor.create();
  6. up
  7. .doOnNext(new Consumer<Integer>() {
  8. @Override
  9. public void accept(Integer v) throws Exception {
  10. call[0]++;
  11. }
  12. })
  13. .doOnComplete(new Action() {
  14. @Override
  15. public void run() throws Exception {
  16. call[1]++;
  17. }
  18. })
  19. .subscribe(ts);
  20. TestHelper.emit(up, 1, 2, 3, 4, 5);
  21. ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  22. .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  23. .assertResult(1, 2, 3, 4, 5);
  24. assertEquals(5, call[0]);
  25. assertEquals(1, call[1]);
  26. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Flowable<String> apply(final GroupedFlowable<Integer, Integer> group) {
  3. if (group.getKey() < 3) {
  4. return group.map(new Function<Integer, String>() {
  5. @Override
  6. public String apply(Integer t1) {
  7. return "first groups: " + t1;
  8. }
  9. })
  10. // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
  11. .take(2).doOnComplete(new Action() {
  12. @Override
  13. public void run() {
  14. first.countDown();
  15. }
  16. });
  17. } else {
  18. return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
  19. @Override
  20. public String apply(Integer t1) {
  21. return "last group: " + t1;
  22. }
  23. }).doOnEach(new Consumer<Notification<String>>() {
  24. @Override
  25. public void accept(Notification<String> t1) {
  26. System.err.println("subscribeOn notification => " + t1);
  27. }
  28. });
  29. }
  30. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusedOnErrorCrashConditional() {
  3. TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  4. final int[] call = { 0 };
  5. Flowable.range(1, 5)
  6. .doOnNext(new Consumer<Integer>() {
  7. @Override
  8. public void accept(Integer v) throws Exception {
  9. throw new TestException();
  10. }
  11. })
  12. .doOnComplete(new Action() {
  13. @Override
  14. public void run() throws Exception {
  15. call[0]++;
  16. }
  17. })
  18. .filter(Functions.alwaysTrue())
  19. .subscribe(ts);
  20. ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  21. .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  22. .assertFailure(TestException.class);
  23. assertEquals(0, call[0]);
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusedConditional() {
  3. TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  4. final int[] call = { 0, 0 };
  5. Flowable.range(1, 5)
  6. .doOnNext(new Consumer<Integer>() {
  7. @Override
  8. public void accept(Integer v) throws Exception {
  9. call[0]++;
  10. }
  11. })
  12. .doOnComplete(new Action() {
  13. @Override
  14. public void run() throws Exception {
  15. call[1]++;
  16. }
  17. })
  18. .filter(Functions.alwaysTrue())
  19. .subscribe(ts);
  20. ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  21. .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  22. .assertResult(1, 2, 3, 4, 5);
  23. assertEquals(5, call[0]);
  24. assertEquals(1, call[1]);
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusedAsyncConditional() {
  3. TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  4. final int[] call = { 0, 0 };
  5. UnicastProcessor<Integer> up = UnicastProcessor.create();
  6. up
  7. .doOnNext(new Consumer<Integer>() {
  8. @Override
  9. public void accept(Integer v) throws Exception {
  10. call[0]++;
  11. }
  12. })
  13. .doOnComplete(new Action() {
  14. @Override
  15. public void run() throws Exception {
  16. call[1]++;
  17. }
  18. })
  19. .filter(Functions.alwaysTrue())
  20. .subscribe(ts);
  21. TestHelper.emit(up, 1, 2, 3, 4, 5);
  22. ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  23. .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  24. .assertResult(1, 2, 3, 4, 5);
  25. assertEquals(5, call[0]);
  26. assertEquals(1, call[1]);
  27. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusedAsyncConditional2() {
  3. TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  4. final int[] call = { 0, 0 };
  5. UnicastProcessor<Integer> up = UnicastProcessor.create();
  6. up.hide()
  7. .doOnNext(new Consumer<Integer>() {
  8. @Override
  9. public void accept(Integer v) throws Exception {
  10. call[0]++;
  11. }
  12. })
  13. .doOnComplete(new Action() {
  14. @Override
  15. public void run() throws Exception {
  16. call[1]++;
  17. }
  18. })
  19. .filter(Functions.alwaysTrue())
  20. .subscribe(ts);
  21. TestHelper.emit(up, 1, 2, 3, 4, 5);
  22. ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  23. .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.NONE))
  24. .assertResult(1, 2, 3, 4, 5);
  25. assertEquals(5, call[0]);
  26. assertEquals(1, call[1]);
  27. }

相关文章

Flowable类方法