io.reactivex.Flowable.defer()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(12.9k)|赞(0)|评价(0)|浏览(239)

本文整理了Java中io.reactivex.Flowable.defer()方法的一些代码示例,展示了Flowable.defer()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.defer()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:defer

Flowable.defer介绍

[英]Returns a Flowable that calls a Publisher factory to create a Publisher for each new Subscriber that subscribes. That is, for each subscriber, the actual Publisher that subscriber observes is determined by the factory function.

The defer Subscriber allows you to defer or delay emitting items from a Publisher until such time as a Subscriber subscribes to the Publisher. This allows a Subscriber to easily obtain updates or a refreshed version of the sequence. Backpressure: The operator itself doesn't interfere with backpressure which is determined by the Publisherreturned by the supplier. Scheduler: defer does not operate by default on a particular Scheduler.
[中]返回一个FlowTable,它调用发布者工厂为订阅的每个新订阅服务器创建发布者。也就是说,对于每个订阅服务器,订阅服务器观察到的实际发布服务器由工厂函数确定。
“延迟订阅服务器”允许您延迟或延迟从发布服务器发送项目,直到订阅服务器订阅发布服务器为止。这允许订阅者轻松获得序列的更新或刷新版本。背压:操作员本身不会干扰背压,背压由供应商提供的压力决定。调度程序:默认情况下,延迟不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<String> apply(Long t1) {
  3. return Flowable.defer(new Callable<Publisher<String>>() {
  4. @Override
  5. public Publisher<String> call() {
  6. return Flowable.<String>error(new TestException("Some exception"));
  7. }
  8. });
  9. }
  10. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Long> createPublisher(final long elements) {
  3. return
  4. Flowable.defer(new Callable<Publisher<Long>>() {
  5. @Override
  6. public Publisher<Long> call() throws Exception {
  7. return Flowable.fromIterable(iterate(elements));
  8. }
  9. }
  10. )
  11. ;
  12. }
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Flowable<Long> apply(Integer t) {
  3. return Flowable.defer(new Callable<Flowable<Long>>() {
  4. @Override
  5. public Flowable<Long> call() {
  6. return Flowable.just(0l);
  7. }
  8. }).subscribeOn(sched);
  9. }
  10. });

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void deferFunctionNull() {
  3. Flowable.defer(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDeferFunctionThrows() throws Exception {
  3. Callable<Flowable<String>> factory = mock(Callable.class);
  4. when(factory.call()).thenThrow(new TestException());
  5. Flowable<String> result = Flowable.defer(factory);
  6. Subscriber<String> subscriber = TestHelper.mockSubscriber();
  7. result.subscribe(subscriber);
  8. verify(subscriber).onError(any(TestException.class));
  9. verify(subscriber, never()).onNext(any(String.class));
  10. verify(subscriber, never()).onComplete();
  11. }
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void deferFunctionReturnsNull() {
  3. Flowable.defer(new Callable<Publisher<Object>>() {
  4. @Override
  5. public Publisher<Object> call() {
  6. return null;
  7. }
  8. }).blockingLast();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void noCancelPreviousRetry() {
  3. final AtomicInteger counter = new AtomicInteger();
  4. final AtomicInteger times = new AtomicInteger();
  5. Flowable<Integer> source = Flowable.defer(new Callable<Flowable<Integer>>() {
  6. @Override
  7. public Flowable<Integer> call() throws Exception {
  8. if (times.getAndIncrement() < 4) {
  9. return Flowable.error(new TestException());
  10. }
  11. return Flowable.just(1);
  12. }
  13. })
  14. .doOnCancel(new Action() {
  15. @Override
  16. public void run() throws Exception {
  17. counter.getAndIncrement();
  18. }
  19. });
  20. source.retry(5)
  21. .test()
  22. .assertResult(1);
  23. assertEquals(0, counter.get());
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void noCancelPreviousRetryWhile() {
  3. final AtomicInteger counter = new AtomicInteger();
  4. final AtomicInteger times = new AtomicInteger();
  5. Flowable<Integer> source = Flowable.defer(new Callable<Flowable<Integer>>() {
  6. @Override
  7. public Flowable<Integer> call() throws Exception {
  8. if (times.getAndIncrement() < 4) {
  9. return Flowable.error(new TestException());
  10. }
  11. return Flowable.just(1);
  12. }
  13. })
  14. .doOnCancel(new Action() {
  15. @Override
  16. public void run() throws Exception {
  17. counter.getAndIncrement();
  18. }
  19. });
  20. source.retry(5, Functions.alwaysTrue())
  21. .test()
  22. .assertResult(1);
  23. assertEquals(0, counter.get());
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void noCancelPreviousRetryWhile2() {
  3. final AtomicInteger counter = new AtomicInteger();
  4. final AtomicInteger times = new AtomicInteger();
  5. Flowable<Integer> source = Flowable.defer(new Callable<Flowable<Integer>>() {
  6. @Override
  7. public Flowable<Integer> call() throws Exception {
  8. if (times.getAndIncrement() < 4) {
  9. return Flowable.error(new TestException());
  10. }
  11. return Flowable.just(1);
  12. }
  13. })
  14. .doOnCancel(new Action() {
  15. @Override
  16. public void run() throws Exception {
  17. counter.getAndIncrement();
  18. }
  19. });
  20. source.retry(new BiPredicate<Integer, Throwable>() {
  21. @Override
  22. public boolean test(Integer a, Throwable b) throws Exception {
  23. return a < 5;
  24. }
  25. })
  26. .test()
  27. .assertResult(1);
  28. assertEquals(0, counter.get());
  29. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void noCancelPreviousRetryUntil() {
  3. final AtomicInteger counter = new AtomicInteger();
  4. final AtomicInteger times = new AtomicInteger();
  5. Flowable<Integer> source = Flowable.defer(new Callable<Flowable<Integer>>() {
  6. @Override
  7. public Flowable<Integer> call() throws Exception {
  8. if (times.getAndIncrement() < 4) {
  9. return Flowable.error(new TestException());
  10. }
  11. return Flowable.just(1);
  12. }
  13. })
  14. .doOnCancel(new Action() {
  15. @Override
  16. public void run() throws Exception {
  17. counter.getAndIncrement();
  18. }
  19. });
  20. source.retryUntil(new BooleanSupplier() {
  21. @Override
  22. public boolean getAsBoolean() throws Exception {
  23. return false;
  24. }
  25. })
  26. .test()
  27. .assertResult(1);
  28. assertEquals(0, counter.get());
  29. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDefer() throws Throwable {
  3. Callable<Flowable<String>> factory = mock(Callable.class);
  4. Flowable<String> firstObservable = Flowable.just("one", "two");
  5. Flowable<String> secondObservable = Flowable.just("three", "four");
  6. when(factory.call()).thenReturn(firstObservable, secondObservable);
  7. Flowable<String> deferred = Flowable.defer(factory);
  8. verifyZeroInteractions(factory);
  9. Subscriber<String> firstSubscriber = TestHelper.mockSubscriber();
  10. deferred.subscribe(firstSubscriber);
  11. verify(factory, times(1)).call();
  12. verify(firstSubscriber, times(1)).onNext("one");
  13. verify(firstSubscriber, times(1)).onNext("two");
  14. verify(firstSubscriber, times(0)).onNext("three");
  15. verify(firstSubscriber, times(0)).onNext("four");
  16. verify(firstSubscriber, times(1)).onComplete();
  17. Subscriber<String> secondSubscriber = TestHelper.mockSubscriber();
  18. deferred.subscribe(secondSubscriber);
  19. verify(factory, times(2)).call();
  20. verify(secondSubscriber, times(0)).onNext("one");
  21. verify(secondSubscriber, times(0)).onNext("two");
  22. verify(secondSubscriber, times(1)).onNext("three");
  23. verify(secondSubscriber, times(1)).onNext("four");
  24. verify(secondSubscriber, times(1)).onComplete();
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testTimeoutSelectorFirstThrows() {
  3. Flowable<Integer> source = Flowable.<Integer>never();
  4. final PublishProcessor<Integer> timeout = PublishProcessor.create();
  5. Function<Integer, Flowable<Integer>> timeoutFunc = new Function<Integer, Flowable<Integer>>() {
  6. @Override
  7. public Flowable<Integer> apply(Integer t1) {
  8. return timeout;
  9. }
  10. };
  11. Callable<Flowable<Integer>> firstTimeoutFunc = new Callable<Flowable<Integer>>() {
  12. @Override
  13. public Flowable<Integer> call() {
  14. throw new TestException();
  15. }
  16. };
  17. Flowable<Integer> other = Flowable.fromIterable(Arrays.asList(100));
  18. Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  19. source.timeout(Flowable.defer(firstTimeoutFunc), timeoutFunc, other).subscribe(subscriber);
  20. verify(subscriber).onError(any(TestException.class));
  21. verify(subscriber, never()).onNext(any());
  22. verify(subscriber, never()).onComplete();
  23. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelayWithFlowableSubscriptionFunctionThrows() {
  3. PublishProcessor<Integer> source = PublishProcessor.create();
  4. final PublishProcessor<Integer> delay = PublishProcessor.create();
  5. Callable<Flowable<Integer>> subFunc = new Callable<Flowable<Integer>>() {
  6. @Override
  7. public Flowable<Integer> call() {
  8. throw new TestException();
  9. }
  10. };
  11. Function<Integer, Flowable<Integer>> delayFunc = new Function<Integer, Flowable<Integer>>() {
  12. @Override
  13. public Flowable<Integer> apply(Integer t1) {
  14. return delay;
  15. }
  16. };
  17. Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  18. InOrder inOrder = inOrder(subscriber);
  19. source.delay(Flowable.defer(subFunc), delayFunc).subscribe(subscriber);
  20. source.onNext(1);
  21. delay.onNext(1);
  22. source.onNext(2);
  23. inOrder.verify(subscriber).onError(any(TestException.class));
  24. inOrder.verifyNoMoreInteractions();
  25. verify(subscriber, never()).onNext(any());
  26. verify(subscriber, never()).onComplete();
  27. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelayWithFlowableSubscriptionThrows() {
  3. PublishProcessor<Integer> source = PublishProcessor.create();
  4. final PublishProcessor<Integer> delay = PublishProcessor.create();
  5. Callable<Flowable<Integer>> subFunc = new Callable<Flowable<Integer>>() {
  6. @Override
  7. public Flowable<Integer> call() {
  8. return delay;
  9. }
  10. };
  11. Function<Integer, Flowable<Integer>> delayFunc = new Function<Integer, Flowable<Integer>>() {
  12. @Override
  13. public Flowable<Integer> apply(Integer t1) {
  14. return delay;
  15. }
  16. };
  17. Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  18. InOrder inOrder = inOrder(subscriber);
  19. source.delay(Flowable.defer(subFunc), delayFunc).subscribe(subscriber);
  20. source.onNext(1);
  21. delay.onError(new TestException());
  22. source.onNext(2);
  23. inOrder.verify(subscriber).onError(any(TestException.class));
  24. inOrder.verifyNoMoreInteractions();
  25. verify(subscriber, never()).onNext(any());
  26. verify(subscriber, never()).onComplete();
  27. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testMergeCovariance4() {
  3. Flowable<Movie> f1 = Flowable.defer(new Callable<Publisher<Movie>>() {
  4. @Override
  5. public Publisher<Movie> call() {
  6. return Flowable.just(
  7. new HorrorMovie(),
  8. new Movie()
  9. );
  10. }
  11. });
  12. Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
  13. List<Media> values = Flowable.merge(f1, f2).toList().blockingGet();
  14. assertTrue(values.get(0) instanceof HorrorMovie);
  15. assertTrue(values.get(1) instanceof Movie);
  16. assertTrue(values.get(2) != null);
  17. assertTrue(values.get(3) instanceof HorrorMovie);
  18. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelayWithFlowableSubscriptionRunCompletion() {
  3. PublishProcessor<Integer> source = PublishProcessor.create();
  4. final PublishProcessor<Integer> sdelay = PublishProcessor.create();
  5. final PublishProcessor<Integer> delay = PublishProcessor.create();
  6. Callable<Flowable<Integer>> subFunc = new Callable<Flowable<Integer>>() {
  7. @Override
  8. public Flowable<Integer> call() {
  9. return sdelay;
  10. }
  11. };
  12. Function<Integer, Flowable<Integer>> delayFunc = new Function<Integer, Flowable<Integer>>() {
  13. @Override
  14. public Flowable<Integer> apply(Integer t1) {
  15. return delay;
  16. }
  17. };
  18. Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  19. InOrder inOrder = inOrder(subscriber);
  20. source.delay(Flowable.defer(subFunc), delayFunc).subscribe(subscriber);
  21. source.onNext(1);
  22. sdelay.onComplete();
  23. source.onNext(2);
  24. delay.onNext(2);
  25. inOrder.verify(subscriber).onNext(2);
  26. inOrder.verifyNoMoreInteractions();
  27. verify(subscriber, never()).onError(any(Throwable.class));
  28. verify(subscriber, never()).onComplete();
  29. }

代码示例来源:origin: ReactiveX/RxJava

  1. TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  2. Flowable.range(1, Flowable.bufferSize() * 2)
  3. .delay(Flowable.defer(new Callable<Flowable<Long>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelaySupplierSimple() {
  3. final PublishProcessor<Integer> pp = PublishProcessor.create();
  4. Flowable<Integer> source = Flowable.range(1, 5);
  5. TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  6. source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
  7. @Override
  8. public Publisher<Integer> call() {
  9. return pp;
  10. }
  11. })).subscribe(ts);
  12. ts.assertNoValues();
  13. ts.assertNoErrors();
  14. ts.assertNotComplete();
  15. pp.onNext(1);
  16. ts.assertValues(1, 2, 3, 4, 5);
  17. ts.assertComplete();
  18. ts.assertNoErrors();
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelaySupplierCompletes() {
  3. final PublishProcessor<Integer> pp = PublishProcessor.create();
  4. Flowable<Integer> source = Flowable.range(1, 5);
  5. TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  6. source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
  7. @Override
  8. public Publisher<Integer> call() {
  9. return pp;
  10. }
  11. })).subscribe(ts);
  12. ts.assertNoValues();
  13. ts.assertNoErrors();
  14. ts.assertNotComplete();
  15. // FIXME should this complete the source instead of consuming it?
  16. pp.onComplete();
  17. ts.assertValues(1, 2, 3, 4, 5);
  18. ts.assertComplete();
  19. ts.assertNoErrors();
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelaySupplierErrors() {
  3. final PublishProcessor<Integer> pp = PublishProcessor.create();
  4. Flowable<Integer> source = Flowable.range(1, 5);
  5. TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  6. source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
  7. @Override
  8. public Publisher<Integer> call() {
  9. return pp;
  10. }
  11. })).subscribe(ts);
  12. ts.assertNoValues();
  13. ts.assertNoErrors();
  14. ts.assertNotComplete();
  15. pp.onError(new TestException());
  16. ts.assertNoValues();
  17. ts.assertNotComplete();
  18. ts.assertError(TestException.class);
  19. }

相关文章

Flowable类方法