Usage and code examples of the io.reactivex.Flowable.doOnSubscribe() method

x33g5p2x, reposted on 2022-01-19 under "Other"

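This article collects code examples for the Java method io.reactivex.Flowable.doOnSubscribe() and shows how Flowable.doOnSubscribe() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of the Flowable.doOnSubscribe() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnSubscribe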

Introduction to Flowable.doOnSubscribe

Modifies the source Publisher so that it invokes the given action when it is subscribed to by its subscribers. Each subscription results in an invocation of the given action, except when the source Publisher is reference counted, in which case the action is invoked only for the first subscription.

Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doOnSubscribe does not operate by default on a particular Scheduler.
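To make the "one invocation per subscription" rule concrete, here is a minimal sketch that models the behavior with the JDK's java.util.concurrent.Flow interfaces (Java 9+) instead of RxJava itself. The class DoOnSubscribeSketch and its helpers just, doOnSubscribe, and countInvocations are hypothetical names introduced only for illustration. This is not how RxJava implements the operator, and it leaves out reference counting and error handling in the callback.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Illustrative model only: all names below are hypothetical, not RxJava API.
final class DoOnSubscribeSketch {

    // Stand-in for a cold single-value source, loosely comparable to Flowable.just(value).
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                // Emit the single value on the first positive request.
                if (!done && n > 0) {
                    done = true;
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Wraps a publisher so that the action runs whenever a subscriber is handed its Subscription.
    static <T> Flow.Publisher<T> doOnSubscribe(Flow.Publisher<T> source,
                                               Consumer<Flow.Subscription> action) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                action.accept(s);           // run the side effect first
                downstream.onSubscribe(s);  // then hand the subscription downstream
            }

            @Override
            public void onNext(T item) {
                downstream.onNext(item);
            }

            @Override
            public void onError(Throwable t) {
                downstream.onError(t);
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        });
    }

    // Subscribes the given number of times and reports how often the action ran.
    static int countInvocations(int subscribers) {
        AtomicInteger count = new AtomicInteger();
        Flow.Publisher<Integer> publisher = doOnSubscribe(just(1), s -> count.incrementAndGet());
        for (int i = 0; i < subscribers; i++) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override
                public void onSubscribe(Flow.Subscription s) {
                    s.request(1);
                }

                @Override
                public void onNext(Integer item) { }

                @Override
                public void onError(Throwable t) { }

                @Override
                public void onComplete() { }
            });
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(countInvocations(3)); // prints 3
    }
}
```

Because the wrapped publisher is cold, every call to subscribe creates a fresh chain, which is why the counter matches the number of subscribers. A reference-counted source would share one upstream subscription, and the action would then run only when that shared subscription is established.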

Code examples

Example source: ReactiveX/RxJava

    // A null callback is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void doOnSubscribeNull() {
        just1.doOnSubscribe(null);
    }

Example source: ReactiveX/RxJava

    // Both doOnSubscribe callbacks in the chain run for the single subscription.
    @Test
    public void testDoOnSubscribe2() throws Exception {
        final AtomicInteger count = new AtomicInteger();
        Flowable<Integer> f = Flowable.just(1).doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                count.incrementAndGet();
            }
        }).take(1).doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                count.incrementAndGet();
            }
        });
        f.subscribe();
        assertEquals(2, count.get());
    }

Example source: ReactiveX/RxJava

    // The callback runs once per subscription: three subscribers, three invocations.
    @Test
    public void testDoOnSubscribe() throws Exception {
        final AtomicInteger count = new AtomicInteger();
        Flowable<Integer> f = Flowable.just(1).doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                count.incrementAndGet();
            }
        });
        f.subscribe();
        f.subscribe();
        f.subscribe();
        assertEquals(3, count.get());
    }

Example source: ReactiveX/RxJava

    // The source is not empty, so the fallback is never subscribed to.
    @Test
    public void testSwitchWhenNotEmpty() throws Exception {
        final AtomicBoolean subscribed = new AtomicBoolean(false);
        final Flowable<Integer> flowable = Flowable.just(4)
                .switchIfEmpty(Flowable.just(2)
                        .doOnSubscribe(new Consumer<Subscription>() {
                            @Override
                            public void accept(Subscription s) {
                                subscribed.set(true);
                            }
                        }));
        assertEquals(4, flowable.blockingSingle().intValue());
        assertFalse(subscribed.get());
    }

Example source: ReactiveX/RxJava

    // Tracks active subscriptions and fails the test if more than m are open at once.
    private static <T> Flowable<T> composer(Flowable<T> source, final AtomicInteger subscriptionCount, final int m) {
        return source.doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                int n = subscriptionCount.getAndIncrement();
                if (n >= m) {
                    Assert.fail("Too many subscriptions! " + (n + 1));
                }
            }
        }).doOnComplete(new Action() {
            @Override
            public void run() {
                int n = subscriptionCount.decrementAndGet();
                if (n < 0) {
                    Assert.fail("Too many unsubscriptions! " + (n - 1));
                }
            }
        });
    }

Example source: ReactiveX/RxJava

    // Snippet is truncated in the original listing.
    final AtomicInteger nextCount = new AtomicInteger();
    Flowable<Integer> r = Flowable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .doOnSubscribe(new Consumer<Subscription>() {
                @Override
                public void accept(Subscription s) {

Example source: ReactiveX/RxJava

    // Snippet is truncated in the original listing.
    final AtomicInteger unsubscribeCount = new AtomicInteger();
    Flowable<Long> r = Flowable.interval(0, 1, TimeUnit.MILLISECONDS)
            .doOnSubscribe(new Consumer<Subscription>() {
                @Override
                public void accept(Subscription s) {

Example source: ReactiveX/RxJava

    // The subscriber is cancelled up front, so timeoutAndFallback is never subscribed to.
    @Test
    public void disposedUpfrontFallback() {
        PublishProcessor<Object> pp = PublishProcessor.create();
        final AtomicInteger counter = new AtomicInteger();
        Flowable<Object> timeoutAndFallback = Flowable.never().doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) throws Exception {
                counter.incrementAndGet();
            }
        });
        pp
        .timeout(timeoutAndFallback, Functions.justFunction(timeoutAndFallback), timeoutAndFallback)
        .test(1, true)
        .assertEmpty();
        assertEquals(0, counter.get());
    }

Example source: ReactiveX/RxJava

    // Same check as above, without a fallback source.
    @Test
    public void disposedUpfront() {
        PublishProcessor<Integer> pp = PublishProcessor.create();
        final AtomicInteger counter = new AtomicInteger();
        Flowable<Object> timeoutAndFallback = Flowable.never().doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) throws Exception {
                counter.incrementAndGet();
            }
        });
        pp
        .timeout(timeoutAndFallback, Functions.justFunction(timeoutAndFallback))
        .test(1, true)
        .assertEmpty();
        assertEquals(0, counter.get());
    }

Example source: ReactiveX/RxJava

    // takeUntil fires before delaySubscription is released, so the source is never subscribed to.
    @Test
    public void delayAndTakeUntilNeverSubscribeToSource() {
        PublishProcessor<Integer> delayUntil = PublishProcessor.create();
        PublishProcessor<Integer> interrupt = PublishProcessor.create();
        final AtomicBoolean subscribed = new AtomicBoolean(false);
        Flowable.just(1)
        .doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                subscribed.set(true);
            }
        })
        .delaySubscription(delayUntil)
        .takeUntil(interrupt)
        .subscribe();
        interrupt.onNext(9000);
        delayUntil.onNext(1);
        Assert.assertFalse(subscribed.get());
    }

Example source: ReactiveX/RxJava

    // Variant of the previous test that declares the callback as Consumer<Object>.
    @Test
    public void delayAndTakeUntilNeverSubscribeToSource() {
        PublishProcessor<Integer> delayUntil = PublishProcessor.create();
        PublishProcessor<Integer> interrupt = PublishProcessor.create();
        final AtomicBoolean subscribed = new AtomicBoolean(false);
        Flowable.just(1)
        .doOnSubscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                subscribed.set(true);
            }
        })
        .delaySubscription(delayUntil)
        .takeUntil(interrupt)
        .subscribe();
        interrupt.onNext(9000);
        delayUntil.onNext(1);
        Assert.assertFalse(subscribed.get());
    }

Example source: ReactiveX/RxJava

    // The first source completes empty, so the second source is never subscribed to.
    @SuppressWarnings("unchecked")
    @Test
    public void dontSubscribeIfDone2() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            final int[] count = { 0 };
            Flowable.combineLatestDelayError(
                    Arrays.asList(Flowable.empty(),
                        Flowable.error(new TestException())
                        .doOnSubscribe(new Consumer<Subscription>() {
                            @Override
                            public void accept(Subscription s) throws Exception {
                                count[0]++;
                            }
                        })
                    ),
                    new Function<Object[], Object>() {
                        @Override
                        public Object apply(Object[] a) throws Exception {
                            return 0;
                        }
                    })
            .test()
            .assertResult();
            assertEquals(0, count[0]);
            assertTrue(errors.toString(), errors.isEmpty());
        } finally {
            RxJavaPlugins.reset();
        }
    }

Example source: ReactiveX/RxJava

    // amb subscribes to each source exactly once: two sources, two invocations.
    @SuppressWarnings("unchecked")
    @Test
    public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
        final AtomicLong count = new AtomicLong();
        Consumer<Subscription> incrementer = new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                count.incrementAndGet();
            }
        };
        // this async stream should emit first
        Flowable<Integer> f1 = Flowable.just(1).doOnSubscribe(incrementer)
                .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
        // this stream emits second
        Flowable<Integer> f2 = Flowable.just(1).doOnSubscribe(incrementer)
                .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        Flowable.ambArray(f1, f2).subscribe(ts);
        ts.request(1);
        ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
        ts.assertNoErrors();
        assertEquals(2, count.get());
    }

Example source: ReactiveX/RxJava

    // The first source completes empty, so the second source is never subscribed to.
    @Test
    public void dontSubscribeIfDone() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            final int[] count = { 0 };
            Flowable.combineLatest(Flowable.empty(),
                    Flowable.error(new TestException())
                    .doOnSubscribe(new Consumer<Subscription>() {
                        @Override
                        public void accept(Subscription s) throws Exception {
                            count[0]++;
                        }
                    }),
                    new BiFunction<Object, Object, Object>() {
                        @Override
                        public Object apply(Object a, Object b) throws Exception {
                            return 0;
                        }
                    })
            .test()
            .assertResult();
            assertEquals(0, count[0]);
            assertTrue(errors.toString(), errors.isEmpty());
        } finally {
            RxJavaPlugins.reset();
        }
    }

Example source: ReactiveX/RxJava

    // The source is subscribed to only once the other publisher completes.
    @Test
    public void testCompleteTriggersSubscription() {
        PublishProcessor<Object> other = PublishProcessor.create();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        final AtomicInteger subscribed = new AtomicInteger();
        Flowable.just(1)
        .doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                subscribed.getAndIncrement();
            }
        })
        .delaySubscription(other)
        .subscribe(ts);
        ts.assertNotComplete();
        ts.assertNoErrors();
        ts.assertNoValues();
        Assert.assertEquals("Premature subscription", 0, subscribed.get());
        other.onComplete();
        Assert.assertEquals("No subscription", 1, subscribed.get());
        ts.assertValue(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Example source: ReactiveX/RxJava

    // The source is subscribed to only once the other publisher emits its first item.
    @Test
    public void testNoPrematureSubscription() {
        PublishProcessor<Object> other = PublishProcessor.create();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        final AtomicInteger subscribed = new AtomicInteger();
        Flowable.just(1)
        .doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                subscribed.getAndIncrement();
            }
        })
        .delaySubscription(other)
        .subscribe(ts);
        ts.assertNotComplete();
        ts.assertNoErrors();
        ts.assertNoValues();
        Assert.assertEquals("Premature subscription", 0, subscribed.get());
        other.onNext(1);
        Assert.assertEquals("No subscription", 1, subscribed.get());
        ts.assertValue(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Example source: ReactiveX/RxJava

    // A failing source is likewise subscribed to only after the other publisher completes.
    @Test
    public void testNoPrematureSubscriptionToError() {
        PublishProcessor<Object> other = PublishProcessor.create();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        final AtomicInteger subscribed = new AtomicInteger();
        Flowable.<Integer>error(new TestException())
        .doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                subscribed.getAndIncrement();
            }
        })
        .delaySubscription(other)
        .subscribe(ts);
        ts.assertNotComplete();
        ts.assertNoErrors();
        ts.assertNoValues();
        Assert.assertEquals("Premature subscription", 0, subscribed.get());
        other.onComplete();
        Assert.assertEquals("No subscription", 1, subscribed.get());
        ts.assertNoValues();
        ts.assertNotComplete();
        ts.assertError(TestException.class);
    }

Example source: ReactiveX/RxJava

    // If the other publisher fails, the source is never subscribed to and the error is forwarded.
    @Test
    public void testNoSubscriptionIfOtherErrors() {
        PublishProcessor<Object> other = PublishProcessor.create();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        final AtomicInteger subscribed = new AtomicInteger();
        Flowable.<Integer>error(new TestException())
        .doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                subscribed.getAndIncrement();
            }
        })
        .delaySubscription(other)
        .subscribe(ts);
        ts.assertNotComplete();
        ts.assertNoErrors();
        ts.assertNoValues();
        Assert.assertEquals("Premature subscription", 0, subscribed.get());
        other.onError(new TestException());
        Assert.assertEquals("Premature subscription", 0, subscribed.get());
        ts.assertNoValues();
        ts.assertNotComplete();
        ts.assertError(TestException.class);
    }

Example source: ReactiveX/RxJava

    // A second item from the other publisher does not trigger a second subscription.
    @Test
    public void testNoMultipleSubscriptions() {
        PublishProcessor<Object> other = PublishProcessor.create();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        final AtomicInteger subscribed = new AtomicInteger();
        Flowable.just(1)
        .doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                subscribed.getAndIncrement();
            }
        })
        .delaySubscription(other)
        .subscribe(ts);
        ts.assertNotComplete();
        ts.assertNoErrors();
        ts.assertNoValues();
        Assert.assertEquals("Premature subscription", 0, subscribed.get());
        other.onNext(1);
        other.onNext(2);
        Assert.assertEquals("No subscription", 1, subscribed.get());
        ts.assertValue(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Example source: ReactiveX/RxJava

    // With refCount(2) the source is subscribed to once per pair of subscribers:
    // three rounds, three subscriptions.
    @Test
    public void byCount() {
        final int[] subscriptions = { 0 };
        Flowable<Integer> source = Flowable.range(1, 5)
        .doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) throws Exception {
                subscriptions[0]++;
            }
        })
        .publish()
        .refCount(2);
        for (int i = 0; i < 3; i++) {
            TestSubscriber<Integer> ts1 = source.test();
            ts1.assertEmpty();
            TestSubscriber<Integer> ts2 = source.test();
            ts1.assertResult(1, 2, 3, 4, 5);
            ts2.assertResult(1, 2, 3, 4, 5);
        }
        assertEquals(3, subscriptions[0]);
    }
