Usage and code examples of the io.reactivex.Flowable.doOnRequest() method

Reposted by x33g5p2x on 2022-01-19 in Other

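This article collects code examples of the io.reactivex.Flowable.doOnRequest() method in Java and shows how Flowable.doOnRequest() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven; they were extracted from selected projects and should serve as a useful reference. Details of the Flowable.doOnRequest() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnRequest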

Introduction to Flowable.doOnRequest

Modifies the source Publisher so that it invokes the given action when it receives a request for more items.

Note: This operator is for tracing the internal behavior of back-pressure request patterns and is generally intended for debugging use.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doOnRequest does not operate by default on a particular Scheduler.
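To make the "invoke an action on each request, then pass the request through" idea concrete, here is a minimal sketch built only on the JDK's java.util.concurrent.Flow interfaces (Java 9+), so it runs without RxJava on the classpath. It is not RxJava's implementation: traceRequests, range, and demo are hypothetical names introduced for illustration, and the toy range source emits synchronously without handling reentrant or concurrent requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.LongConsumer;

public class RequestTracingSketch {

    // Hypothetical helper: reports each request(n) amount to onRequest,
    // then forwards the same amount upstream unchanged.
    static <T> Flow.Publisher<T> traceRequests(Flow.Publisher<T> source, LongConsumer onRequest) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription upstream) {
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) {
                        onRequest.accept(n);   // side effect only
                        upstream.request(n);   // backpressure is left untouched
                    }
                    @Override public void cancel() { upstream.cancel(); }
                });
            }
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable error) { downstream.onError(error); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    // Toy synchronous source (an assumption for this demo): emits count integers
    // starting at start, never more than the amount requested.
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private final int end = start + count;
            private boolean done;

            @Override public void request(long n) {
                for (long i = 0; i < n && !done && next < end; i++) {
                    subscriber.onNext(Integer.valueOf(next++));
                }
                if (!done && next == end) {
                    done = true;
                    subscriber.onComplete();
                }
            }
            @Override public void cancel() { done = true; }
        });
    }

    // Requests 2 items, then 3 more, and returns the amounts seen by the tracer.
    static List<Long> demo() {
        List<Long> requests = new ArrayList<>();
        Flow.Subscription[] subscription = new Flow.Subscription[1];

        traceRequests(range(1, 5), n -> requests.add(n)).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { subscription[0] = s; }
            @Override public void onNext(Integer item) { /* items are ignored in this demo */ }
            @Override public void onError(Throwable error) { throw new AssertionError(error); }
            @Override public void onComplete() { }
        });

        subscription[0].request(2);
        subscription[0].request(3);
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, 3]
    }
}
```

The tracer only observes the amounts; what the source does with them is unchanged, which mirrors the "does not interfere with backpressure" note above.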

Code examples

Code example source: ReactiveX/RxJava

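// Excerpt from a larger test; the enclosing call that the final line closes is not shown in the source.
@Override
public Flowable<Long> apply(Long t) {
  return Flowable.fromIterable(Arrays.asList(1L, 2L, 3L))
      // Record every request amount that reaches this inner source.
      .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long v) {
          requests.add(v);
        }
      });
}
}).take(3)).subscribe(ts);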

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnRequestNull() {
  just1.doOnRequest(null);
}

Code example source: ReactiveX/RxJava

@Test
public void testSingleDoesNotRequestMoreThanItNeedsToEmitItem() {
  final AtomicLong request = new AtomicLong();
  Flowable.just(1).doOnRequest(new LongConsumer() {
    @Override
    public void accept(long n) {
      request.addAndGet(n);
    }
  }).blockingSingle();
  // FIXME single now triggers fast-path
  assertEquals(Long.MAX_VALUE, request.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromEmpty() {
  final AtomicLong request = new AtomicLong();
  try {
    Flowable.empty().doOnRequest(new LongConsumer() {
      @Override
      public void accept(long n) {
        request.addAndGet(n);
      }
    }).blockingSingle();
  } catch (NoSuchElementException e) {
    // FIXME single now triggers fast-path
    assertEquals(Long.MAX_VALUE, request.get());
  }
}

Code example source: ReactiveX/RxJava

@Test
public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromMoreThanOne() {
  final AtomicLong request = new AtomicLong();
  try {
    Flowable.just(1, 2).doOnRequest(new LongConsumer() {
      @Override
      public void accept(long n) {
        request.addAndGet(n);
      }
    }).blockingSingle();
  } catch (IllegalArgumentException e) {
    // FIXME single now triggers fast-path
    assertEquals(Long.MAX_VALUE, request.get());
  }
}

Code example source: ReactiveX/RxJava

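@Test
public void longerSequence() {
  // `this` is the enclosing test class; the assertions imply that it implements
  // LongConsumer and stores each requested amount in `requests`.
  Flowable.range(1, 6)
  .doOnRequest(this)
  .limit(5)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, requests.get(0).intValue());
}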

Code example source: ReactiveX/RxJava

@Test
public void shorterSequence() {
  Flowable.range(1, 5)
  .doOnRequest(this)
  .limit(6)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(6, requests.get(0).intValue());
}

Code example source: ReactiveX/RxJava

@Test
public void exactSequence() {
  Flowable.range(1, 5)
  .doOnRequest(this)
  .doOnCancel(this)
  .limit(5)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(2, requests.size());
  assertEquals(5, requests.get(0).intValue());
  assertEquals(CANCELLED, requests.get(1));
}

Code example source: ReactiveX/RxJava

@Test(timeout = 5000)
public void mergeDelayErrorObservableMaxConcurrent() {
  final List<Long> requested = new ArrayList<Long>();
  Flowable<Completable> cs = Flowable
      .just(normal.completable)
      .repeat(10)
      .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long v) {
          requested.add(v);
        }
      });
  Completable c = Completable.mergeDelayError(cs, 5);
  c.blockingAwait();
  // FIXME this request pattern looks odd because all 10 completions trigger 1 requests
  Assert.assertEquals(Arrays.asList(5L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L), requested);
}

Code example source: ReactiveX/RxJava

@Test(timeout = 5000)
public void concatObservablePrefetch() {
  final List<Long> requested = new ArrayList<Long>();
  Flowable<Completable> cs = Flowable
      .just(normal.completable)
      .repeat(10)
      .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long v) {
          requested.add(v);
        }
      });
  Completable c = Completable.concat(cs, 5);
  c.blockingAwait();
  Assert.assertEquals(Arrays.asList(5L, 4L, 4L), requested);
}

Code example source: ReactiveX/RxJava

@Test(timeout = 5000)
public void mergeObservableMaxConcurrent() {
  final List<Long> requested = new ArrayList<Long>();
  Flowable<Completable> cs = Flowable
      .just(normal.completable)
      .repeat(10)
      .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long v) {
          requested.add(v);
        }
      });
  Completable c = Completable.merge(cs, 5);
  c.blockingAwait();
  // FIXME this request pattern looks odd because all 10 completions trigger 1 requests
  Assert.assertEquals(Arrays.asList(5L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L), requested);
}

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribeHappensAgainstParent() {
  final AtomicBoolean unsubscribed = new AtomicBoolean(false);
  Flowable.just(1).concatWith(Flowable.<Integer>never())
  //
      .doOnCancel(new Action() {
        @Override
        public void run() {
          unsubscribed.set(true);
        }
      })
      //
      .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) {
          // do nothing
        }
      })
      //
      .subscribe().dispose();
  assertTrue(unsubscribed.get());
}

Code example source: ReactiveX/RxJava

@Test
public void limitZero() {
  Flowable.range(1, 5)
  .doOnCancel(this)
  .doOnRequest(this)
  .limit(0)
  .test()
  .assertResult();
  assertEquals(1, requests.size());
  assertEquals(CANCELLED, requests.get(0));
}

Code example source: ReactiveX/RxJava

@Test
public void testMaxConcurrent5() {
  final List<Long> requests = new ArrayList<Long>();
  Flowable.range(1, 100).doOnRequest(new LongConsumer() {
    @Override
    public void accept(long reqCount) {
      requests.add(reqCount);
    }
  }).concatMapEager(toJust, 5, Flowable.bufferSize()).subscribe(ts);
  ts.assertNoErrors();
  ts.assertValueCount(100);
  ts.assertComplete();
  Assert.assertEquals(5, (long) requests.get(0));
  Assert.assertEquals(1, (long) requests.get(1));
  Assert.assertEquals(1, (long) requests.get(2));
  Assert.assertEquals(1, (long) requests.get(3));
  Assert.assertEquals(1, (long) requests.get(4));
  Assert.assertEquals(1, (long) requests.get(5));
}

Code example source: ReactiveX/RxJava

@Test
public void testBackpressureMultipleSmallAsyncRequests() throws InterruptedException {
  final AtomicLong requests = new AtomicLong(0);
  TestSubscriber<Long> ts = new TestSubscriber<Long>(0L);
  Flowable.interval(100, TimeUnit.MILLISECONDS)
      .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) {
          requests.addAndGet(n);
        }
      }).skip(4).subscribe(ts);
  Thread.sleep(100);
  ts.request(1);
  ts.request(1);
  Thread.sleep(100);
  ts.dispose();
  // FIXME not assertable anymore
  // ts.assertUnsubscribed();
  ts.assertNoErrors();
  assertEquals(6, requests.get());
}

Code example source: ReactiveX/RxJava

@Test
public void limitAndTake() {
  Flowable.range(1, 5)
  .doOnCancel(this)
  .doOnRequest(this)
  .limit(6)
  .take(5)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(Arrays.asList(6L, CANCELLED), requests);
}

Code example source: ReactiveX/RxJava

@Test
public void synchronousRebatching() {
  final List<Long> requests = new ArrayList<Long>();
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(1, 50)
  .doOnRequest(new LongConsumer() {
    @Override
    public void accept(long r) {
      requests.add(r);
    }
  })
  .rebatchRequests(20)
  .subscribe(ts);
  ts.assertValueCount(50);
  ts.assertNoErrors();
  ts.assertComplete();
  assertEquals(Arrays.asList(20L, 15L, 15L, 15L), requests);
}

Code example source: ReactiveX/RxJava

@Test
public void limitStep() {
  TestSubscriber<Integer> ts = Flowable.range(1, 6)
  .doOnRequest(this)
  .limit(5)
  .test(0L);
  assertEquals(0, requests.size());
  ts.request(1);
  ts.assertValue(1);
  ts.request(2);
  ts.assertValues(1, 2, 3);
  ts.request(3);
  ts.assertResult(1, 2, 3, 4, 5);
  assertEquals(Arrays.asList(1L, 2L, 2L), requests);
}
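
The request amounts asserted in the limit examples (5 for an unbounded downstream request, and 1, 2, 2 for downstream requests of 1, 2, 3) are consistent with the upstream request being capped at whatever remains of the limit. The following sketch models only that arithmetic in plain Java; it is not RxJava's implementation, and upstreamRequests is a hypothetical name introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class LimitRequestSketch {

    // Hypothetical model: each downstream request is forwarded upstream,
    // capped so that the running total never exceeds limit.
    static List<Long> upstreamRequests(long limit, long... downstreamRequests) {
        List<Long> forwarded = new ArrayList<>();
        long remaining = limit;
        for (long n : downstreamRequests) {
            long granted = Math.min(n, remaining);
            if (granted > 0) {
                forwarded.add(granted);
                remaining -= granted;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Same numbers as the limitStep test: limit(5) with requests of 1, 2 and 3.
        System.out.println(upstreamRequests(5, 1, 2, 3)); // prints [1, 2, 2]
    }
}
```

Under the same model, a single unbounded request against limit(5) yields one upstream request of 5, matching the longerSequence assertion.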

Code example source: ReactiveX/RxJava

@Test
public void testDelayErrorMaxConcurrent() {
  final List<Long> requests = new ArrayList<Long>();
  Flowable<Integer> source = Flowable.mergeDelayError(Flowable.just(
      Flowable.just(1).hide(),
      Flowable.<Integer>error(new TestException()))
      .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long t1) {
          requests.add(t1);
        }
      }), 1);
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  source.subscribe(ts);
  ts.assertValue(1);
  ts.assertTerminated();
  ts.assertError(TestException.class);
  assertEquals(Arrays.asList(1L, 1L, 1L), requests);
}

Code example source: ReactiveX/RxJava

@Test
public void testBackpressureBounded() {
  final AtomicLong requested = new AtomicLong();
  Flowable<Integer> source = Flowable.range(1, 1000)
      .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long t) {
          requested.addAndGet(t);
        }
      });
  ConnectableFlowable<Integer> cf = source.replay(50);
  TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>(10L);
  TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>(90L);
  cf.subscribe(ts1);
  cf.subscribe(ts2);
  ts2.request(10);
  cf.connect();
  ts1.assertValueCount(10);
  ts1.assertNotTerminated();
  ts2.assertValueCount(100);
  ts2.assertNotTerminated();
  Assert.assertEquals(100, requested.get());
}
