io.reactivex.Flowable.doAfterTerminate()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.5k)|赞(0)|评价(0)|浏览(362)

本文整理了Java中io.reactivex.Flowable.doAfterTerminate()方法的一些代码示例,展示了Flowable.doAfterTerminate()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.doAfterTerminate()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:doAfterTerminate

Flowable.doAfterTerminate介绍

[英]Registers an Action to be called when this Publisher invokes either Subscriber#onComplete or Subscriber#onError.

Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: doAfterTerminate does not operate by default on a particular Scheduler.
[中]注册此发布服务器调用Subscriber#onComplete或Subscriber#onError时要调用的操作。
背压:操作员不会干扰由源发布者的背压行为确定的背压。Scheduler:doAfterTerminate默认情况下不在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void doAfterTerminateNull() {
  3. just1.doAfterTerminate(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. private void checkActionCalled(Flowable<String> input) {
  2. input.doAfterTerminate(aAction0).subscribe(subscriber);
  3. try {
  4. verify(aAction0, times(1)).run();
  5. } catch (Throwable ex) {
  6. throw ExceptionHelper.wrapOrThrow(ex);
  7. }
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void nullActionShouldBeCheckedInConstructor() {
  3. try {
  4. Flowable.empty().doAfterTerminate(null);
  5. fail("Should have thrown NullPointerException");
  6. } catch (NullPointerException expected) {
  7. assertEquals("onAfterTerminate is null", expected.getMessage());
  8. }
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void nullFinallyActionShouldBeCheckedASAP() {
  3. try {
  4. Flowable
  5. .just("value")
  6. .doAfterTerminate(null);
  7. fail();
  8. } catch (NullPointerException expected) {
  9. }
  10. }

代码示例来源:origin: micronaut-projects/micronaut-core

  1. Flowable buildFlowable(ReplaySubject subject, Integer dataKey, boolean controlsFlow) {
  2. Flowable flowable = FlowableReplay.createFrom(subject.toFlowable(BackpressureStrategy.BUFFER)).refCount();
  3. if (controlsFlow) {
  4. flowable = flowable.doOnRequest(onRequest);
  5. }
  6. return flowable
  7. .doAfterTerminate(() -> {
  8. if (controlsFlow) {
  9. HttpDataReference dataReference = dataReferences.get(dataKey);
  10. dataReference.destroy();
  11. }
  12. });
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. }).doAfterTerminate(new Action() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void ifFinallyActionThrowsExceptionShouldNotBeSwallowedAndActionShouldBeCalledOnce() throws Exception {
  3. List<Throwable> errors = TestHelper.trackPluginErrors();
  4. try {
  5. Action finallyAction = Mockito.mock(Action.class);
  6. doThrow(new IllegalStateException()).when(finallyAction).run();
  7. TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
  8. Flowable
  9. .just("value")
  10. .doAfterTerminate(finallyAction)
  11. .subscribe(testSubscriber);
  12. testSubscriber.assertValue("value");
  13. verify(finallyAction).run();
  14. TestHelper.assertError(errors, 0, IllegalStateException.class);
  15. // Actual result:
  16. // Not only IllegalStateException was swallowed
  17. // But finallyAction was called twice!
  18. } finally {
  19. RxJavaPlugins.reset();
  20. }
  21. }
  22. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onCompleteAfter() {
  3. final int[] call = { 0 };
  4. Flowable.just(1)
  5. .doAfterTerminate(new Action() {
  6. @Override
  7. public void run() throws Exception {
  8. call[0]++;
  9. }
  10. })
  11. .test()
  12. .assertResult(1);
  13. assertEquals(1, call[0]);
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onErrorAfterCrash() {
  3. List<Throwable> errors = TestHelper.trackPluginErrors();
  4. try {
  5. Flowable.fromPublisher(new Publisher<Object>() {
  6. @Override
  7. public void subscribe(Subscriber<? super Object> s) {
  8. s.onSubscribe(new BooleanSubscription());
  9. s.onError(new TestException());
  10. }
  11. })
  12. .doAfterTerminate(new Action() {
  13. @Override
  14. public void run() throws Exception {
  15. throw new IOException();
  16. }
  17. })
  18. .test()
  19. .assertFailure(TestException.class);
  20. TestHelper.assertUndeliverable(errors, 0, IOException.class);
  21. } finally {
  22. RxJavaPlugins.reset();
  23. }
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onCompleteAfterCrash() {
  3. List<Throwable> errors = TestHelper.trackPluginErrors();
  4. try {
  5. Flowable.fromPublisher(new Publisher<Object>() {
  6. @Override
  7. public void subscribe(Subscriber<? super Object> s) {
  8. s.onSubscribe(new BooleanSubscription());
  9. s.onComplete();
  10. }
  11. })
  12. .doAfterTerminate(new Action() {
  13. @Override
  14. public void run() throws Exception {
  15. throw new IOException();
  16. }
  17. })
  18. .test()
  19. .assertResult();
  20. TestHelper.assertUndeliverable(errors, 0, IOException.class);
  21. } finally {
  22. RxJavaPlugins.reset();
  23. }
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onErrorAfterCrashConditional() {
  3. List<Throwable> errors = TestHelper.trackPluginErrors();
  4. try {
  5. Flowable.fromPublisher(new Publisher<Object>() {
  6. @Override
  7. public void subscribe(Subscriber<? super Object> s) {
  8. s.onSubscribe(new BooleanSubscription());
  9. s.onError(new TestException());
  10. }
  11. })
  12. .doAfterTerminate(new Action() {
  13. @Override
  14. public void run() throws Exception {
  15. throw new IOException();
  16. }
  17. })
  18. .filter(Functions.alwaysTrue())
  19. .test()
  20. .assertFailure(TestException.class);
  21. TestHelper.assertUndeliverable(errors, 0, IOException.class);
  22. } finally {
  23. RxJavaPlugins.reset();
  24. }
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onCompleteAfterCrashConditional() {
  3. List<Throwable> errors = TestHelper.trackPluginErrors();
  4. try {
  5. Flowable.fromPublisher(new Publisher<Object>() {
  6. @Override
  7. public void subscribe(Subscriber<? super Object> s) {
  8. s.onSubscribe(new BooleanSubscription());
  9. s.onComplete();
  10. }
  11. })
  12. .doAfterTerminate(new Action() {
  13. @Override
  14. public void run() throws Exception {
  15. throw new IOException();
  16. }
  17. })
  18. .filter(Functions.alwaysTrue())
  19. .test()
  20. .assertResult();
  21. TestHelper.assertUndeliverable(errors, 0, IOException.class);
  22. } finally {
  23. RxJavaPlugins.reset();
  24. }
  25. }

代码示例来源:origin: akarnokd/RxJava2Extensions

  1. @Override
  2. public void run() throws Exception {
  3. Flowable.just(1)
  4. .subscribeOn(Schedulers.io())
  5. .observeOn(scheduler)
  6. .doAfterTerminate(new Action() {
  7. @Override
  8. public void run() throws Exception {
  9. scheduler.shutdown();
  10. }
  11. })
  12. .subscribe(new Consumer<Integer>() {
  13. @Override
  14. public void accept(Integer v) throws Exception {
  15. t1[0] = Thread.currentThread();
  16. }
  17. });
  18. }
  19. });

代码示例来源:origin: akarnokd/RxJava2Extensions

  1. @Override
  2. public void run() throws Exception {
  3. Flowable.range(1, 5)
  4. .subscribeOn(scheduler)
  5. .doAfterTerminate(new Action() {
  6. @Override
  7. public void run() throws Exception {
  8. scheduler.shutdown();
  9. }
  10. })
  11. .subscribe(ts);
  12. ts.assertEmpty();
  13. }
  14. });

代码示例来源:origin: akarnokd/RxJava2Extensions

  1. @Override
  2. public void run() throws Exception {
  3. Flowable.range(1, 5)
  4. .subscribeOn(scheduler)
  5. .delay(100, TimeUnit.MILLISECONDS, scheduler)
  6. .doAfterTerminate(new Action() {
  7. @Override
  8. public void run() throws Exception {
  9. scheduler.shutdown();
  10. }
  11. })
  12. .subscribe(ts);
  13. ts.assertEmpty();
  14. }
  15. });

代码示例来源:origin: reactiverse/reactive-pg-client

  1. private Flowable<Row> createFlowable(String sql) {
  2. return pool.rxBegin()
  3. .flatMapPublisher(tx -> tx.rxPrepare(sql)
  4. .flatMapPublisher(preparedQuery -> {
  5. // Fetch 50 rows at a time
  6. PgStream<io.reactiverse.reactivex.pgclient.Row> stream = preparedQuery.createStream(50, Tuple.tuple());
  7. return stream.toFlowable();
  8. })
  9. .doAfterTerminate(tx::commit));
  10. }

相关文章

Flowable类方法