io.reactivex.Flowable.range()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(4.5k)|赞(0)|评价(0)|浏览(245)

本文整理了Java中io.reactivex.Flowable.range()方法的一些代码示例,展示了Flowable.range()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.range()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:range

Flowable.range介绍

[英]Returns a Flowable that emits a sequence of Integers within a specified range.

Backpressure: The operator honors backpressure from downstream and signals values on-demand (i.e., when requested). Scheduler: range does not operate by default on a particular Scheduler.
[中]返回在指定范围内发出整数序列的可流动项。
背压:操作员接受来自下游的背压,并按需(即,当要求时)发送信号。调度程序:默认情况下,范围不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Integer> apply(Integer t) {
  3. return Flowable.range(t, 2);
  4. }
  5. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Flowable<Integer> apply(Integer v) {
  3. return Flowable.range(v, 2);
  4. }
  5. }).subscribe(ts);

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Integer> createPublisher(long elements) {
  3. return
  4. Flowable.range(0, (int)elements).timeout(1, TimeUnit.DAYS)
  5. ;
  6. }
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Integer> createPublisher(long elements) {
  3. return
  4. Flowable.range(0, (int)elements).onBackpressureBuffer()
  5. ;
  6. }
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Integer> createPublisher(long elements) {
  3. return
  4. Flowable.range(0, (int)elements).publish().autoConnect()
  5. ;
  6. }
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Integer> createPublisher(final long elements) {
  3. return
  4. Flowable.range(1, 1000).reduce(new BiFunction<Integer, Integer, Integer>() {
  5. @Override
  6. public Integer apply(Integer a, Integer b) throws Exception {
  7. return a + b;
  8. }
  9. }).toFlowable()
  10. ;
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Boolean> createPublisher(final long elements) {
  3. return
  4. Flowable.range(1, 1000).any(new Predicate<Integer>() {
  5. @Override
  6. public boolean test(Integer e) throws Exception {
  7. return e == 500;
  8. }
  9. }).toFlowable()
  10. ;
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Integer> createPublisher(long elements) {
  3. return
  4. Flowable.range(0, (int)elements).doAfterNext(Functions.emptyConsumer())
  5. ;
  6. }
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void bufferTimeSkipDefault() {
  4. Flowable.range(1, 5).buffer(1, 1, TimeUnit.MINUTES)
  5. .test()
  6. .assertResult(Arrays.asList(1, 2, 3, 4, 5));
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Flowable<Integer> apply(Integer t) {
  3. return Flowable.range(1, Flowable.bufferSize() * 2)
  4. .doOnNext(new Consumer<Integer>() {
  5. @Override
  6. public void accept(Integer t) {
  7. count.getAndIncrement();
  8. }
  9. }).hide();
  10. }
  11. }).subscribe(ts);

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void range() {
  3. Flowable.range(1, 5)
  4. .throttleLatest(1, TimeUnit.MINUTES)
  5. .test()
  6. .assertResult(1);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void rangeEmitLatest() {
  3. Flowable.range(1, 5)
  4. .throttleLatest(1, TimeUnit.MINUTES, true)
  5. .test()
  6. .assertResult(1, 5);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void badParallelismStage() {
  4. TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  5. Flowable.range(1, 10)
  6. .parallel(2)
  7. .subscribe(new Subscriber[] { ts });
  8. ts.assertFailure(IllegalArgumentException.class);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void awaitCountLess3() {
  3. Flowable.range(1, 4).delay(50, TimeUnit.MILLISECONDS)
  4. .test()
  5. .awaitCount(5, TestWaitStrategy.SLEEP_1MS)
  6. .assertResult(1, 2, 3, 4);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void timedDefaultScheduler() {
  3. Flowable.range(1, 5).take(1, TimeUnit.MINUTES)
  4. .test()
  5. .awaitDone(5, TimeUnit.SECONDS)
  6. .assertResult(1, 2, 3, 4, 5);
  7. }
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void delayErrorCancelBackpressured() {
  3. TestSubscriber<Integer> ts = Flowable.range(1, 3)
  4. .parallel(1)
  5. .sequentialDelayError(1)
  6. .test(0);
  7. ts
  8. .cancel();
  9. ts.assertEmpty();
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void emptyInner() {
  3. Flowable.range(1, 5)
  4. .switchMap(Functions.justFunction(Flowable.empty()))
  5. .test()
  6. .assertResult();
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void simple() {
  3. Flowable.range(1, 5)
  4. .concatMapCompletable(Functions.justFunction(Completable.complete()))
  5. .test()
  6. .assertResult();
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void rangeSource() {
  3. TestSubscriber<Integer> ts = TestSubscriber.create(0);
  4. TestingDeferredScalarSubscriber ds = new TestingDeferredScalarSubscriber(ts);
  5. ds.subscribeTo(Flowable.range(1, 10));
  6. ts.assertNoValues();
  7. ts.request(1);
  8. ts.assertValue(10);
  9. ts.assertNoErrors();
  10. ts.assertComplete();
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testBackpressure1() {
  3. TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  4. Flowable.range(1, 100000).takeLast(1)
  5. .observeOn(Schedulers.newThread())
  6. .map(newSlowProcessor()).subscribe(ts);
  7. ts.awaitTerminalEvent();
  8. ts.assertNoErrors();
  9. ts.assertValue(100000);
  10. }

相关文章

Flowable类方法