Usage and code examples for the io.reactivex.Flowable.debounce() method

x33g5p2x, reposted on 2022-01-19 under "Other"

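This article collects code examples for the Java method io.reactivex.Flowable.debounce() and shows how it is used in practice. The examples are extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are meant as a practical reference. Details of the Flowable.debounce() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: debounce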

Introduction to Flowable.debounce

Returns a Flowable that mirrors the source Publisher, except that it drops items emitted by the source Publisher that are followed by newer items before a timeout value expires. The timer resets on each emission.

Note: If items keep being emitted by the source Publisher faster than the timeout then no items will be emitted by the resulting Publisher.
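The rule above can be illustrated without RxJava at all. The following is a minimal sketch in plain Java, not the library's implementation: the class `DebounceSketch`, its `Event` type, and the `debounce` helper are hypothetical names introduced only for this illustration. It assumes the source completes normally and that the last pending item is emitted on completion, which is what the `debounceDefault` and `debounceDefaultScheduler` examples further down this page assert. Treating a gap exactly equal to the timeout as "timer fired" is also an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class: models debounce on a list of timestamped items.
final class DebounceSketch {

    /** One emission of a hypothetical source: a value and its emission time in ms. */
    static final class Event {
        final long timeMs;
        final String value;

        Event(long timeMs, String value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    /**
     * Returns the values that survive debouncing, given events sorted by time.
     * An item is dropped when a newer item follows it before the timeout expires.
     * Assumption: the source completes normally, so the last item is always emitted
     * (either by its timer or by being flushed on completion).
     */
    static List<String> debounce(List<Event> events, long timeoutMs) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Event current = events.get(i);
            boolean isLast = (i == events.size() - 1);
            // Assumption: a gap equal to the timeout counts as the timer having fired.
            if (isLast || events.get(i + 1).timeMs - current.timeMs >= timeoutMs) {
                emitted.add(current.value);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(100, "one"),    // followed by "two" after 300 ms: dropped
                new Event(400, "two"),    // followed by "three" after 500 ms: emitted
                new Event(900, "three")); // last item: emitted
        System.out.println(debounce(events, 400)); // prints [two, three]
    }
}
```

The timings mirror the `testDebounceWithCompleted` example below (items at 100, 400, and 900 ms with a 400 ms timeout). A burst in which every gap is shorter than the timeout yields only its final item in this model.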

Code examples

Code example source: ReactiveX/RxJava

  // Excerpt: the enclosing call that receives this anonymous Function is truncated in the source.
      @Override
      public Object apply(Flowable<Integer> f) throws Exception {
          // Debounce each item with its own 1-second timer Publisher.
          return f.debounce(new Function<Integer, Flowable<Long>>() {
              @Override
              public Flowable<Long> apply(Integer v) throws Exception {
                  return Flowable.timer(1, TimeUnit.SECONDS);
              }
          });
      }
  }, false, 1, 1, 1);

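Code example source: ReactiveX/RxJava

  // Excerpt: the enclosing call that receives this anonymous Function is truncated in the source.
      @Override
      public Publisher<Object> apply(Flowable<Object> f)
              throws Exception {
          // Timed overload: fixed 1-second timeout.
          return f.debounce(1, TimeUnit.SECONDS);
      }
  });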

Code example source: ReactiveX/RxJava

  // Excerpt: the enclosing call that receives this anonymous Function is truncated in the source.
      @Override
      public Object apply(final Flowable<Integer> f) throws Exception {
          // The Flowable under test (f) is used as the debounce indicator for the single item.
          return Flowable.just(1).debounce(new Function<Integer, Flowable<Integer>>() {
              @Override
              public Flowable<Integer> apply(Integer v) throws Exception {
                  return f;
              }
          });
      }
  }, false, 1, 1, 1);

Code example source: ReactiveX/RxJava

  // just1 is a field of the test class that is not shown in this excerpt.
  @Test(expected = NullPointerException.class)
  public void debounceTimedUnitNull() {
      just1.debounce(1, null);
  }

Code example source: ReactiveX/RxJava

  @Test(expected = NullPointerException.class)
  public void debounceTimedSchedulerNull() {
      just1.debounce(1, TimeUnit.SECONDS, null);
  }

Code example source: ReactiveX/RxJava

  @Test(expected = NullPointerException.class)
  public void debounceFunctionNull() {
      just1.debounce(null);
  }

Code example source: ReactiveX/RxJava

  @Test(expected = NullPointerException.class)
  public void debounceFunctionReturnsNull() {
      // A selector that returns null instead of a Publisher must fail with a NullPointerException.
      just1.debounce(new Function<Integer, Publisher<Object>>() {
          @Override
          public Publisher<Object> apply(Integer v) {
              return null;
          }
      }).blockingSubscribe();
  }

Code example source: ReactiveX/RxJava

  // Excerpt: the enclosing call that receives this anonymous Function is truncated in the source.
      @Override
      public Flowable<Object> apply(Flowable<Object> f) throws Exception {
          return f.debounce(Functions.justFunction(Flowable.never()));
      }
  });

Code example source: ReactiveX/RxJava

  @Test
  public void timedBadRequest() {
      TestHelper.assertBadRequestReported(Flowable.never().debounce(1, TimeUnit.SECONDS));
  }

Code example source: ReactiveX/RxJava

  // scheduler, Subscriber, publishNext and publishCompleted are members of the test class
  // that are not shown in this excerpt.
  @Test
  public void testDebounceWithCompleted() {
      Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
          @Override
          public void subscribe(Subscriber<? super String> subscriber) {
              subscriber.onSubscribe(new BooleanSubscription());
              publishNext(subscriber, 100, "one");    // Should be skipped since "two" will arrive before the timeout expires.
              publishNext(subscriber, 400, "two");    // Should be published since "three" will arrive after the timeout expires.
              publishNext(subscriber, 900, "three");  // Should be skipped since onComplete will arrive before the timeout expires.
              publishCompleted(subscriber, 1000);     // Should be published as soon as the timeout expires.
          }
      });
      Flowable<String> sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler);
      sampled.subscribe(Subscriber);
      scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
      InOrder inOrder = inOrder(Subscriber);
      // must go to 800 since it must be 400 after when two is sent, which is at 400
      scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
      inOrder.verify(Subscriber, times(1)).onNext("two");
      scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
      inOrder.verify(Subscriber, times(1)).onComplete();
      inOrder.verifyNoMoreInteractions();
  }

Code example source: ReactiveX/RxJava

  @Test
  public void emitLate() {
      final AtomicReference<Subscriber<? super Integer>> ref = new AtomicReference<Subscriber<? super Integer>>();
      TestSubscriber<Integer> ts = Flowable.range(1, 2)
      .debounce(new Function<Integer, Flowable<Integer>>() {
          @Override
          public Flowable<Integer> apply(Integer o) throws Exception {
              if (o != 1) {
                  return Flowable.never();
              }
              // For item 1, capture the indicator's subscriber so it can be signalled later.
              return new Flowable<Integer>() {
                  @Override
                  protected void subscribeActual(Subscriber<? super Integer> subscriber) {
                      subscriber.onSubscribe(new BooleanSubscription());
                      ref.set(subscriber);
                  }
              };
          }
      })
      .test();
      // Signal the indicator for item 1 after item 2 has already replaced it.
      ref.get().onNext(1);
      ts
      .assertResult(2);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void backpressureNoRequestTimed() {
      // No downstream demand (test(0L)), so the debounced item cannot be delivered.
      Flowable.just(1)
      .debounce(1, TimeUnit.MILLISECONDS)
      .test(0L)
      .awaitDone(5, TimeUnit.SECONDS)
      .assertFailure(MissingBackpressureException.class);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void timedError() {
      Flowable.error(new TestException())
      .debounce(1, TimeUnit.SECONDS)
      .test()
      .assertFailure(TestException.class);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void debounceDefault() throws Exception {
      Flowable.just(1).debounce(1, TimeUnit.SECONDS)
      .test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(1);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void badRequestReported() {
      TestHelper.assertBadRequestReported(Flowable.never().debounce(Functions.justFunction(Flowable.never())));
  }

Code example source: ReactiveX/RxJava

  @Test
  public void debounceWithEmpty() {
      Flowable.just(1).debounce(Functions.justFunction(Flowable.empty()))
      .test()
      .assertResult(1);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void backpressureNoRequest() {
      Flowable.just(1)
      .debounce(Functions.justFunction(Flowable.timer(1, TimeUnit.MILLISECONDS)))
      .test(0L)
      .awaitDone(5, TimeUnit.SECONDS)
      .assertFailure(MissingBackpressureException.class);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void debounceDefaultScheduler() throws Exception {
      TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
      Flowable.range(1, 1000).debounce(1, TimeUnit.SECONDS).subscribe(ts);
      ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
      // Only the last item of the range is expected to come through.
      ts.assertValue(1000);
      ts.assertNoErrors();
      ts.assertComplete();
  }

Code example source: ReactiveX/RxJava

  @Test
  public void debounceWithTimeBackpressure() throws InterruptedException {
      TestScheduler scheduler = new TestScheduler();
      TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
      Flowable.merge(
          Flowable.just(1),
          Flowable.just(2).delay(10, TimeUnit.MILLISECONDS, scheduler)
      ).debounce(20, TimeUnit.MILLISECONDS, scheduler).take(1).subscribe(subscriber);
      scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
      subscriber.assertValue(2);
      subscriber.assertTerminated();
      subscriber.assertNoErrors();
  }
