Usage and code examples of the io.reactivex.Flowable.doOnCancel() method

x33g5p2x, reposted on 2022-01-19 under Other

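This article collects code examples of the Java method io.reactivex.Flowable.doOnCancel() and shows how Flowable.doOnCancel() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as reference material. Details of the Flowable.doOnCancel() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnCancel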

Introduction to Flowable.doOnCancel

Calls the cancel Action if the downstream cancels the sequence.
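To make "the downstream cancels the sequence" concrete, here is a minimal sketch assuming RxJava 2.x (package io.reactivex) is on the classpath. The class and method names are hypothetical and chosen only for illustration. It counts how often the action runs when take(2) cuts a range short and when a subscriber disposes a never-ending source.

```java
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
final class DoOnCancelBasics {

    // Counts cancel-action calls when take(2) stops a five-element range early.
    static int cancelsOnTake() {
        AtomicInteger cancels = new AtomicInteger();
        Flowable.range(1, 5)
                .doOnCancel(cancels::incrementAndGet)
                .take(2) // cancels the upstream once the second item has arrived
                .test()
                .assertResult(1, 2);
        return cancels.get();
    }

    // Counts cancel-action calls when the subscriber disposes a source that never emits.
    static int cancelsOnDispose() {
        AtomicInteger cancels = new AtomicInteger();
        Disposable d = Flowable.<Integer>never()
                .doOnCancel(cancels::incrementAndGet)
                .subscribe();
        d.dispose(); // the downstream gives up, so the cancel travels upstream
        return cancels.get();
    }

    public static void main(String[] args) {
        System.out.println("take(2): " + cancelsOnTake());
        System.out.println("dispose(): " + cancelsOnDispose());
    }
}
```

Under these assumptions both methods should report a single invocation, which matches the take() and testUnsubscribesFromUpstream() tests listed further down.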

The action is shared between subscriptions and thus may be called concurrently from multiple threads; the action must be thread-safe.
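Because one action instance serves every subscription, any state it touches should be safe under concurrent access. The sketch below assumes RxJava 2.x; the class name, method name, and thread count are hypothetical. Several threads subscribe to the same Flowable and each cancels it through take(1), while an AtomicInteger keeps the shared counter consistent.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
final class DoOnCancelSharedAction {

    // Subscribes to one shared Flowable from several threads and counts cancellations.
    static int cancelsAcrossThreads(int threads) {
        AtomicInteger cancels = new AtomicInteger(); // thread-safe shared state
        Flowable<Integer> shared = Flowable.range(1, 1000)
                .doOnCancel(cancels::incrementAndGet);

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            // Each subscription is cut short by take(1), so each one cancels once.
            Thread t = new Thread(() -> shared.take(1).test());
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return cancels.get();
    }

    public static void main(String[] args) {
        System.out.println(cancelsAcrossThreads(4));
    }
}
```

A plain int field in place of the AtomicInteger could lose updates here, which is the situation the thread-safety requirement guards against.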

If the action throws a runtime exception, that exception is rethrown by the onCancel() call, sometimes as a CompositeException if there were multiple exceptions along the way.

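Note that the Javadoc states that terminal events trigger the action unless the Publisher is subscribed to via unsafeSubscribe(). The RxJava tests collected below (for example noCancelPreviousRepeat and error) expect the opposite for ordinary subscriptions: the action does not run when the sequence completes or fails on its own.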
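How terminal events relate to the cancel action is easy to check directly. This is a minimal sketch assuming RxJava 2.x, with hypothetical class and method names. It mirrors the noCancelPreviousRepeat and error tests listed further down by counting action calls for a source that completes and for one that fails.

```java
import io.reactivex.Flowable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
final class DoOnCancelTerminalEvents {

    // Counts cancel-action calls for a source that completes normally.
    static int cancelsOnCompletion() {
        AtomicInteger cancels = new AtomicInteger();
        Flowable.range(1, 3)
                .doOnCancel(cancels::incrementAndGet)
                .test()
                .assertResult(1, 2, 3);
        return cancels.get();
    }

    // Counts cancel-action calls for a source that signals an error.
    static int cancelsOnError() {
        AtomicInteger cancels = new AtomicInteger();
        Flowable.<Integer>error(new IllegalStateException("boom"))
                .doOnCancel(cancels::incrementAndGet)
                .test()
                .assertFailure(IllegalStateException.class);
        return cancels.get();
    }

    public static void main(String[] args) {
        System.out.println("onComplete: " + cancelsOnCompletion());
        System.out.println("onError: " + cancelsOnError());
    }
}
```

In both cases the counter is expected to stay at zero, consistent with the assertions in those tests. For cleanup that must also run on completion or error, doFinally is the operator usually reached for.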

Backpressure: doOnCancel does not interact with backpressure requests or value delivery; backpressure behavior is preserved between its upstream and its downstream.
Scheduler: doOnCancel does not operate by default on a particular Scheduler.
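The backpressure statement can be observed by recording the request amounts that reach the upstream. The following sketch assumes RxJava 2.x; the class and method names are hypothetical. A doOnRequest probe sits above doOnCancel, and a TestSubscriber requests items in two batches.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
final class DoOnCancelBackpressure {

    // Returns the request amounts seen upstream of doOnCancel.
    static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();
        TestSubscriber<Integer> ts = Flowable.range(1, 10)
                .doOnRequest(n -> requests.add(n)) // probe above doOnCancel
                .doOnCancel(() -> { })             // no-op action, only here to sit in the chain
                .test(3);                          // initial request of 3 items

        ts.assertValues(1, 2, 3);
        ts.requestMore(2);                         // second batch of 2 items
        ts.assertValues(1, 2, 3, 4, 5);
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests());
    }
}
```

The recorded amounts should equal the downstream requests (3, then 2), since doOnCancel forwards them unchanged.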

Code examples

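Code example source: ReactiveX/RxJava

// Excerpt: apply() of an anonymous Function passed to an operator; the enclosing call is not shown in the source.
@Override
public Flowable<Integer> apply(Integer v) throws Exception {
  return Flowable.just(v).doOnCancel(new Action() {
    @Override
    public void run() throws Exception {
      // Count each downstream cancellation.
      counter.getAndIncrement();
    }
  });
}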

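Code example source: ReactiveX/RxJava

// Excerpt: apply() of an anonymous Function passed to an operator; the enclosing call is not shown in the source.
@Override
public Publisher<Integer> apply(Integer v) throws Exception {
  return Flowable.<Integer>never()
      .doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
          // Record that the inner source was cancelled.
          disposed.set(true);
        }
      });
}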

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnCancelNull() {
  just1.doOnCancel(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnDisposeNull() {
  just1.doOnCancel(null);
}

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribeSource() throws Exception {
  Action unsubscribe = mock(Action.class);
  Flowable<Integer> f = Flowable.just(1).doOnCancel(unsubscribe).cache();
  f.subscribe();
  f.subscribe();
  f.subscribe();
  verify(unsubscribe, never()).run();
}

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribesFromUpstream() {
  final AtomicBoolean unsub = new AtomicBoolean();
  Flowable.range(1, 10).concatWith(Flowable.<Integer>never())
      .doOnCancel(new Action() {
        @Override
        public void run() {
          unsub.set(true);
        }
      })
      .ignoreElements()
      .subscribe().dispose();
  assertTrue(unsub.get());
}

Code example source: ReactiveX/RxJava

@Test
public void exactSequence() {
  Flowable.range(1, 5)
  .doOnRequest(this)
  .doOnCancel(this)
  .limit(5)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(2, requests.size());
  assertEquals(5, requests.get(0).intValue());
  assertEquals(CANCELLED, requests.get(1));
}

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribeSource() throws Exception {
  Action unsubscribe = mock(Action.class);
  Flowable<Integer> f = Flowable.just(1).doOnCancel(unsubscribe).replay().autoConnect();
  f.subscribe();
  f.subscribe();
  f.subscribe();
  verify(unsubscribe, never()).run();
}

Code example source: ReactiveX/RxJava

@Test
public void noCancelPreviousRepeat() {
  final AtomicInteger counter = new AtomicInteger();
  Flowable<Integer> source = Flowable.just(1).doOnCancel(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  source.repeat(5)
  .test()
  .assertResult(1, 1, 1, 1, 1);
  assertEquals(0, counter.get());
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noCancelPreviousArray() {
  final AtomicInteger counter = new AtomicInteger();
  Flowable<Integer> source = Flowable.just(1).doOnCancel(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  Flowable.concatArray(source, source, source, source, source)
  .test()
  .assertResult(1, 1, 1, 1, 1);
  assertEquals(0, counter.get());
}

Code example source: ReactiveX/RxJava

@Test
public void take() throws Exception {
  Action onCancel = mock(Action.class);
  Flowable.range(1, 5)
  .doOnCancel(onCancel)
  .throttleLatest(1, TimeUnit.MINUTES)
  .take(1)
  .test()
  .assertResult(1);
  verify(onCancel).run();
}

Code example source: ReactiveX/RxJava

@Test
public void missingBackpressureExceptionFirst() throws Exception {
  TestScheduler sch = new TestScheduler();
  Action onCancel = mock(Action.class);
  Flowable.just(1, 2)
  .doOnCancel(onCancel)
  .throttleLatest(1, TimeUnit.MINUTES, sch)
  .test(0)
  .assertFailure(MissingBackpressureException.class);
  verify(onCancel).run();
}

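Code example source: ReactiveX/RxJava

// Excerpt: an onNext() override inside an anonymous subscriber class; the enclosing declaration is not shown in the source.
@Override
public void onNext(Integer t) {
  if (valueCount() == 2) {
    // Attach a second subscriber mid-stream and track whether it gets cancelled.
    source.doOnCancel(new Action() {
      @Override
      public void run() {
        child2Unsubscribed.set(true);
      }
    }).take(5).subscribe(ts2);
  }
  super.onNext(t);
}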

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribesFromUpstreamFlowable() {
  final AtomicBoolean unsub = new AtomicBoolean();
  Flowable.range(1, 10).concatWith(Flowable.<Integer>never())
      .doOnCancel(new Action() {
        @Override
        public void run() {
          unsub.set(true);
        }
      })
      .ignoreElements()
      .toFlowable()
      .subscribe().dispose();
  assertTrue(unsub.get());
}

Code example source: ReactiveX/RxJava

@Test
public void limitZero() {
  Flowable.range(1, 5)
  .doOnCancel(this)
  .doOnRequest(this)
  .limit(0)
  .test()
  .assertResult();
  assertEquals(1, requests.size());
  assertEquals(CANCELLED, requests.get(0));
}

Code example source: ReactiveX/RxJava

@Test
public void testShouldUseUnsafeSubscribeInternallyNotSubscribe() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  final AtomicBoolean unsubscribed = new AtomicBoolean(false);
  Single<String> single = Flowable.just("Hello World!").doOnCancel(new Action() {
    @Override
    public void run() {
      unsubscribed.set(true);
    }
  }).single("");
  single.toFlowable().subscribe(subscriber);
  subscriber.assertComplete();
  Assert.assertFalse(unsubscribed.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testShouldUseUnsafeSubscribeInternallyNotSubscribe() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  final AtomicBoolean unsubscribed = new AtomicBoolean(false);
  Completable cmp = Flowable.just("Hello World!").doOnCancel(new Action() {
    @Override
    public void run() {
      unsubscribed.set(true);
    }
  }).ignoreElements();

  cmp.<String>toFlowable().subscribe(subscriber);

  subscriber.assertComplete();

  assertFalse(unsubscribed.get());
}

Code example source: ReactiveX/RxJava

@Test
public void limitAndTake() {
  Flowable.range(1, 5)
  .doOnCancel(this)
  .doOnRequest(this)
  .limit(6)
  .take(5)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(Arrays.asList(6L, CANCELLED), requests);
}

Code example source: ReactiveX/RxJava

@Test
public void missingBackpressureExceptionLatest() throws Exception {
  TestScheduler sch = new TestScheduler();
  Action onCancel = mock(Action.class);
  TestSubscriber<Integer> ts = Flowable.just(1, 2)
  .concatWith(Flowable.<Integer>never())
  .doOnCancel(onCancel)
  .throttleLatest(1, TimeUnit.SECONDS, sch, true)
  .test(1);
  sch.advanceTimeBy(1, TimeUnit.SECONDS);
  ts.assertFailure(MissingBackpressureException.class, 1);
  verify(onCancel).run();
}

Code example source: ReactiveX/RxJava

@Test
public void error() {
  final int[] calls = { 0 };
  Flowable.error(new TestException())
  .doOnCancel(new Action() {
    @Override
    public void run() throws Exception {
      calls[0]++;
    }
  })
  .unsubscribeOn(Schedulers.single())
  .test()
  .assertFailure(TestException.class);
  assertEquals(0, calls[0]);
}
