io.reactivex.Flowable.doOnNext()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.4k)|赞(0)|评价(0)|浏览(282)

本文整理了Java中io.reactivex.Flowable.doOnNext()方法的一些代码示例,展示了Flowable.doOnNext()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.doOnNext()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:doOnNext

Flowable.doOnNext介绍

[英]Modifies the source Publisher so that it invokes an action when it calls onNext.

Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: doOnNext does not operate by default on a particular Scheduler.
[中]修改源发布服务器,使其在调用onNext时调用操作。
背压:操作员不会干扰由源发布者的背压行为确定的背压。Scheduler:doOnNext默认情况下不会在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnNextNull() {
  just1.doOnNext(null);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<List<Integer>> apply(List<Integer> v)
      throws Exception {
    return Flowable.just(v)
        .subscribeOn(Schedulers.io())
        .doOnNext(new Consumer<List<Integer>>() {
          @Override
          public void accept(List<Integer> v)
              throws Exception {
            Thread.sleep(new Random().nextInt(20));
          }
        });
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> createPublisher(long elements) {
    return
        Flowable.range(0, (int)elements).doOnNext(Functions.emptyConsumer())
    ;
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Integer t) {
    return Flowable.range(1, Flowable.bufferSize() * 2)
        .doOnNext(new Consumer<Integer>() {
          @Override
          public void accept(Integer t) {
            count.getAndIncrement();
          }
        }).hide();
  }
}).subscribe(ts);

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDoOnEach() {
  final AtomicReference<String> r = new AtomicReference<String>();
  String output = Flowable.just("one").doOnNext(new Consumer<String>() {
    @Override
    public void accept(String v) {
      r.set(v);
    }
  }).blockingSingle();
  assertEquals("one", output);
  assertEquals("one", r.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeLastZeroProcessesAllItemsButIgnoresThem() {
  final AtomicInteger upstreamCount = new AtomicInteger();
  final int num = 10;
  long count = Flowable.range(1, num).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      upstreamCount.incrementAndGet();
    }})
    .takeLast(0).count().blockingGet();
  assertEquals(num, upstreamCount.get());
  assertEquals(0L, count);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUpstreamIsProcessedButIgnored() {
  final int num = 10;
  final AtomicInteger upstreamCount = new AtomicInteger();
  Object count = Flowable.range(1, num)
      .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer t) {
          upstreamCount.incrementAndGet();
        }
      })
      .ignoreElements()
      .blockingGet();
  assertEquals(num, upstreamCount.get());
  assertNull(count);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void switchOnNextPrefetch() {
  final List<Integer> list = new ArrayList<Integer>();
  Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      list.add(v);
    }
  });
  Flowable.switchOnNext(Flowable.just(source).hide(), 2)
  .test(1);
  assertEquals(Arrays.asList(1, 2, 3), list);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void switchOnNextDelayError() {
  final List<Integer> list = new ArrayList<Integer>();
  Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      list.add(v);
    }
  });
  Flowable.switchOnNextDelayError(Flowable.just(source).hide())
  .test(1);
  assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), list);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void switchOnNextDelayErrorPrefetch() {
  final List<Integer> list = new ArrayList<Integer>();
  Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      list.add(v);
    }
  });
  Flowable.switchOnNextDelayError(Flowable.just(source).hide(), 2)
  .test(1);
  assertEquals(Arrays.asList(1, 2, 3), list);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUpstreamIsProcessedButIgnoredFlowable() {
  final int num = 10;
  final AtomicInteger upstreamCount = new AtomicInteger();
  long count = Flowable.range(1, num)
      .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer t) {
          upstreamCount.incrementAndGet();
        }
      })
      .ignoreElements()
      .toFlowable()
      .count().blockingGet();
  assertEquals(num, upstreamCount.get());
  assertEquals(0, count);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWithCompletionCausingError() {
  TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>();
  final RuntimeException ex = new RuntimeException("boo");
  Flowable.<Integer>empty().materialize().doOnNext(new Consumer<Object>() {
    @Override
    public void accept(Object t) {
      throw ex;
    }
  }).subscribe(ts);
  ts.assertError(ex);
  ts.assertNoValues();
  ts.assertTerminated();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWindowUnsubscribeOverlapping() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  final AtomicInteger count = new AtomicInteger();
  Flowable.merge(Flowable.range(1, 10000).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t1) {
      count.incrementAndGet();
    }
  }).window(5, 4).take(2)).subscribe(ts);
  ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
  ts.assertTerminated();
  //        System.out.println(ts.getOnNextEvents());
  ts.assertValues(1, 2, 3, 4, 5, 5, 6, 7, 8, 9);
  assertEquals(9, count.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWindowUnsubscribeNonOverlapping() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  final AtomicInteger count = new AtomicInteger();
  Flowable.merge(Flowable.range(1, 10000).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t1) {
      count.incrementAndGet();
    }
  }).window(5).take(2)).subscribe(ts);
  ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
  ts.assertTerminated();
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  //        System.out.println(ts.getOnNextEvents());
  assertEquals(10, count.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Flowable.just(1).groupBy(Functions.justFunction(1)));
  Flowable.just(1)
  .groupBy(Functions.justFunction(1))
  .doOnNext(new Consumer<GroupedFlowable<Integer, Integer>>() {
    @Override
    public void accept(GroupedFlowable<Integer, Integer> g) throws Exception {
      TestHelper.checkDisposed(g);
    }
  })
  .test();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testReentrantTake() {
  final PublishProcessor<Integer> source = PublishProcessor.create();
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  source.take(1).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) {
      source.onNext(2);
    }
  }).subscribe(ts);
  source.onNext(1);
  ts.assertValue(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void errorSkipInner() {
  @SuppressWarnings("rawtypes")
  final TestSubscriber[] to = { null };
  Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
  .window(2, 3)
  .doOnNext(new Consumer<Flowable<Integer>>() {
    @Override
    public void accept(Flowable<Integer> w) throws Exception {
      to[0] = w.test();
    }
  })
  .test()
  .assertError(TestException.class);
  to[0].assertFailure(TestException.class, 1);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void errorExactInner() {
  @SuppressWarnings("rawtypes")
  final TestSubscriber[] to = { null };
  Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
  .window(2)
  .doOnNext(new Consumer<Flowable<Integer>>() {
    @Override
    public void accept(Flowable<Integer> w) throws Exception {
      to[0] = w.test();
    }
  })
  .test()
  .assertError(TestException.class);
  to[0].assertFailure(TestException.class, 1);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
  @Test
  public void errorOverlapInner() {
    @SuppressWarnings("rawtypes")
    final TestSubscriber[] to = { null };
    Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
    .window(3, 2)
    .doOnNext(new Consumer<Flowable<Integer>>() {
      @Override
      public void accept(Flowable<Integer> w) throws Exception {
        to[0] = w.test();
      }
    })
    .test()
    .assertError(TestException.class);

    to[0].assertFailure(TestException.class, 1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void delayError() {
  Flowable.range(1, 5).concatWith(Flowable.<Integer>error(new TestException()))
  .observeOn(Schedulers.computation(), true)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      if (v == 1) {
        Thread.sleep(100);
      }
    }
  })
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertFailure(TestException.class, 1, 2, 3, 4, 5);
}

相关文章

Flowable类方法