Usage and code examples for the io.reactivex.Flowable.doOnNext() method

Reposted by x33g5p2x on 2022-01-19, filed under Other

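This article collects code examples for the Java method io.reactivex.Flowable.doOnNext() and shows how Flowable.doOnNext() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as reference material. Details of Flowable.doOnNext() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnNext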

Introduction to Flowable.doOnNext

Modifies the source Publisher so that it invokes an action when it calls onNext.

Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doOnNext does not operate by default on a particular Scheduler.
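To make the description more concrete, here is a minimal conceptual sketch written against the JDK's java.util.concurrent.Flow interfaces (Java 9 or later) so that it runs without any dependency. It is not RxJava's implementation: the class DoOnNextSketch and its helpers doOnNext, range, and demo are hypothetical names introduced only for illustration. The sketch shows the two properties described above: the action runs before each item is passed downstream, and the Subscription is forwarded unchanged, so the downstream request amount alone decides how many items flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Conceptual sketch only: NOT RxJava's implementation of doOnNext.
final class DoOnNextSketch {

    // Hypothetical helper: wraps a source so the action runs before each item goes downstream.
    static <T> Flow.Publisher<T> doOnNext(Flow.Publisher<T> source, Consumer<? super T> action) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;
            private boolean done;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                upstream = s;
                // The Subscription is forwarded unchanged, so request(n) reaches the source as-is.
                downstream.onSubscribe(s);
            }

            @Override
            public void onNext(T item) {
                if (done) {
                    return;
                }
                try {
                    action.accept(item);
                } catch (Throwable t) {
                    // A failing action cancels the source and is reported through onError.
                    upstream.cancel();
                    onError(t);
                    return;
                }
                downstream.onNext(item);
            }

            @Override
            public void onError(Throwable t) {
                if (!done) {
                    done = true;
                    downstream.onError(t);
                }
            }

            @Override
            public void onComplete() {
                if (!done) {
                    done = true;
                    downstream.onComplete();
                }
            }
        });
    }

    // Hypothetical synchronous source that emits only as many items as were requested.
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private boolean finished;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && !finished && next < start + count; i++) {
                    subscriber.onNext(next++);
                }
                if (!finished && next == start + count) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    // Subscribes with a demand of 2 and records the order of events.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Publisher<Integer> source = doOnNext(range(1, 5), v -> log.add("side effect " + v));
        source.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(2); }
            @Override public void onNext(Integer item) { log.add("received " + item); }
            @Override public void onError(Throwable t) { log.add("error " + t); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [side effect 1, received 1, side effect 2, received 2]
        System.out.println(demo());
    }
}
```

Because the subscriber in demo requests only two items, the action runs exactly twice even though the source could emit five. The wrapper neither adds nor removes demand.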

Code examples

Code example source: ReactiveX/RxJava

    @Test(expected = NullPointerException.class)
    public void doOnNextNull() {
        // A null consumer is expected to fail with NullPointerException
        just1.doOnNext(null);
    }

Code example source: ReactiveX/RxJava

    // Excerpt: the declaration of the enclosing class is not shown.
    @Override
    public Flowable<List<Integer>> apply(List<Integer> v)
            throws Exception {
        return Flowable.just(v)
                .subscribeOn(Schedulers.io())
                .doOnNext(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> v)
                            throws Exception {
                        // Sleep for a random 0-19 ms to vary the timing
                        Thread.sleep(new Random().nextInt(20));
                    }
                });
    }
    } // closes the enclosing class (not shown)

Code example source: ReactiveX/RxJava

    // Excerpt: the declaration of the enclosing class is not shown.
    @Override
    public Publisher<Integer> createPublisher(long elements) {
        return Flowable.range(0, (int) elements)
                .doOnNext(Functions.emptyConsumer());
    }
    } // closes the enclosing class (not shown)

Code example source: ReactiveX/RxJava

    // Excerpt: the opening of the enclosing operator call is not shown.
    @Override
    public Flowable<Integer> apply(Integer t) {
        return Flowable.range(1, Flowable.bufferSize() * 2)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer t) {
                        // Count every item emitted by the inner range
                        count.getAndIncrement();
                    }
                }).hide();
    }
    }).subscribe(ts);

Code example source: ReactiveX/RxJava

    @Test
    public void testDoOnEach() {
        final AtomicReference<String> r = new AtomicReference<String>();
        String output = Flowable.just("one").doOnNext(new Consumer<String>() {
            @Override
            public void accept(String v) {
                // Record the item as a side effect; the item itself passes through unchanged
                r.set(v);
            }
        }).blockingSingle();
        assertEquals("one", output);
        assertEquals("one", r.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeLastZeroProcessesAllItemsButIgnoresThem() {
        final AtomicInteger upstreamCount = new AtomicInteger();
        final int num = 10;
        long count = Flowable.range(1, num).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t) {
                upstreamCount.incrementAndGet();
            }
        })
                .takeLast(0).count().blockingGet();
        // All upstream items passed through doOnNext, but takeLast(0) emitted none of them
        assertEquals(num, upstreamCount.get());
        assertEquals(0L, count);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testUpstreamIsProcessedButIgnored() {
        final int num = 10;
        final AtomicInteger upstreamCount = new AtomicInteger();
        Object count = Flowable.range(1, num)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer t) {
                        upstreamCount.incrementAndGet();
                    }
                })
                .ignoreElements()
                .blockingGet();
        assertEquals(num, upstreamCount.get());
        assertNull(count);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void switchOnNextPrefetch() {
        final List<Integer> list = new ArrayList<Integer>();
        Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer v) throws Exception {
                // Record every item the inner source actually emits
                list.add(v);
            }
        });
        Flowable.switchOnNext(Flowable.just(source).hide(), 2)
                .test(1);
        assertEquals(Arrays.asList(1, 2, 3), list);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void switchOnNextDelayError() {
        final List<Integer> list = new ArrayList<Integer>();
        Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer v) throws Exception {
                list.add(v);
            }
        });
        Flowable.switchOnNextDelayError(Flowable.just(source).hide())
                .test(1);
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), list);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void switchOnNextDelayErrorPrefetch() {
        final List<Integer> list = new ArrayList<Integer>();
        Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer v) throws Exception {
                list.add(v);
            }
        });
        Flowable.switchOnNextDelayError(Flowable.just(source).hide(), 2)
                .test(1);
        assertEquals(Arrays.asList(1, 2, 3), list);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testUpstreamIsProcessedButIgnoredFlowable() {
        final int num = 10;
        final AtomicInteger upstreamCount = new AtomicInteger();
        long count = Flowable.range(1, num)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer t) {
                        upstreamCount.incrementAndGet();
                    }
                })
                .ignoreElements()
                .toFlowable()
                .count().blockingGet();
        assertEquals(num, upstreamCount.get());
        assertEquals(0, count);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testWithCompletionCausingError() {
        TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>();
        final RuntimeException ex = new RuntimeException("boo");
        Flowable.<Integer>empty().materialize().doOnNext(new Consumer<Object>() {
            @Override
            public void accept(Object t) {
                // An exception thrown by the callback is delivered downstream as an error
                throw ex;
            }
        }).subscribe(ts);
        ts.assertError(ex);
        ts.assertNoValues();
        ts.assertTerminated();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testWindowUnsubscribeOverlapping() {
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        final AtomicInteger count = new AtomicInteger();
        Flowable.merge(Flowable.range(1, 10000).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t1) {
                count.incrementAndGet();
            }
        }).window(5, 4).take(2)).subscribe(ts);
        ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
        ts.assertTerminated();
        // System.out.println(ts.getOnNextEvents());
        ts.assertValues(1, 2, 3, 4, 5, 5, 6, 7, 8, 9);
        assertEquals(9, count.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testWindowUnsubscribeNonOverlapping() {
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        final AtomicInteger count = new AtomicInteger();
        Flowable.merge(Flowable.range(1, 10000).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t1) {
                count.incrementAndGet();
            }
        }).window(5).take(2)).subscribe(ts);
        ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
        ts.assertTerminated();
        ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // System.out.println(ts.getOnNextEvents());
        assertEquals(10, count.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void dispose() {
        TestHelper.checkDisposed(Flowable.just(1).groupBy(Functions.justFunction(1)));
        Flowable.just(1)
                .groupBy(Functions.justFunction(1))
                .doOnNext(new Consumer<GroupedFlowable<Integer, Integer>>() {
                    @Override
                    public void accept(GroupedFlowable<Integer, Integer> g) throws Exception {
                        // Run the dispose check on each emitted group as well
                        TestHelper.checkDisposed(g);
                    }
                })
                .test();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testReentrantTake() {
        final PublishProcessor<Integer> source = PublishProcessor.create();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        source.take(1).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer v) {
                // Re-entrant emission from inside the callback; the test asserts it is not delivered
                source.onNext(2);
            }
        }).subscribe(ts);
        source.onNext(1);
        ts.assertValue(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void errorSkipInner() {
        @SuppressWarnings("rawtypes")
        final TestSubscriber[] to = { null };
        Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
                .window(2, 3)
                .doOnNext(new Consumer<Flowable<Integer>>() {
                    @Override
                    public void accept(Flowable<Integer> w) throws Exception {
                        // Subscribe to the inner window as soon as it is emitted
                        to[0] = w.test();
                    }
                })
                .test()
                .assertError(TestException.class);
        to[0].assertFailure(TestException.class, 1);
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void errorExactInner() {
        @SuppressWarnings("rawtypes")
        final TestSubscriber[] to = { null };
        Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
                .window(2)
                .doOnNext(new Consumer<Flowable<Integer>>() {
                    @Override
                    public void accept(Flowable<Integer> w) throws Exception {
                        to[0] = w.test();
                    }
                })
                .test()
                .assertError(TestException.class);
        to[0].assertFailure(TestException.class, 1);
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void errorOverlapInner() {
        @SuppressWarnings("rawtypes")
        final TestSubscriber[] to = { null };
        Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
                .window(3, 2)
                .doOnNext(new Consumer<Flowable<Integer>>() {
                    @Override
                    public void accept(Flowable<Integer> w) throws Exception {
                        to[0] = w.test();
                    }
                })
                .test()
                .assertError(TestException.class);
        to[0].assertFailure(TestException.class, 1);
    }
    } // closes the enclosing test class (not shown)

Code example source: ReactiveX/RxJava

    @Test
    public void delayError() {
        Flowable.range(1, 5).concatWith(Flowable.<Integer>error(new TestException()))
                .observeOn(Schedulers.computation(), true)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer v) throws Exception {
                        // Slow down the first item; all five items are still expected before the error
                        if (v == 1) {
                            Thread.sleep(100);
                        }
                    }
                })
                .test()
                .awaitDone(5, TimeUnit.SECONDS)
                .assertFailure(TestException.class, 1, 2, 3, 4, 5);
    }
