io.reactivex.Flowable.doAfterTerminate()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.5k)|赞(0)|评价(0)|浏览(301)

本文整理了Java中io.reactivex.Flowable.doAfterTerminate()方法的一些代码示例,展示了Flowable.doAfterTerminate()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.doAfterTerminate()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:doAfterTerminate

Flowable.doAfterTerminate介绍

[英]Registers an Action to be called when this Publisher invokes either Subscriber#onComplete or Subscriber#onError.

Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: doAfterTerminate does not operate by default on a particular Scheduler.
[中]注册此发布服务器调用Subscriber#onComplete或Subscriber#onError时要调用的操作。
背压:操作员不会干扰由源发布者的背压行为确定的背压。Scheduler:doAfterTerminate默认情况下不在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doAfterTerminateNull() {
  just1.doAfterTerminate(null);
}

代码示例来源:origin: ReactiveX/RxJava

private void checkActionCalled(Flowable<String> input) {
  input.doAfterTerminate(aAction0).subscribe(subscriber);
  try {
    verify(aAction0, times(1)).run();
  } catch (Throwable ex) {
    throw ExceptionHelper.wrapOrThrow(ex);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void nullActionShouldBeCheckedInConstructor() {
  try {
    Flowable.empty().doAfterTerminate(null);
    fail("Should have thrown NullPointerException");
  } catch (NullPointerException expected) {
    assertEquals("onAfterTerminate is null", expected.getMessage());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void nullFinallyActionShouldBeCheckedASAP() {
  try {
    Flowable
        .just("value")
        .doAfterTerminate(null);
    fail();
  } catch (NullPointerException expected) {
  }
}

代码示例来源:origin: micronaut-projects/micronaut-core

Flowable buildFlowable(ReplaySubject subject, Integer dataKey, boolean controlsFlow) {
  Flowable flowable = FlowableReplay.createFrom(subject.toFlowable(BackpressureStrategy.BUFFER)).refCount();
  if (controlsFlow) {
    flowable = flowable.doOnRequest(onRequest);
  }
  return flowable
      .doAfterTerminate(() -> {
        if (controlsFlow) {
          HttpDataReference dataReference = dataReferences.get(dataKey);
          dataReference.destroy();
        }
      });
}

代码示例来源:origin: ReactiveX/RxJava

}).doAfterTerminate(new Action() {

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void ifFinallyActionThrowsExceptionShouldNotBeSwallowedAndActionShouldBeCalledOnce() throws Exception {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
      Action finallyAction = Mockito.mock(Action.class);
      doThrow(new IllegalStateException()).when(finallyAction).run();

      TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

      Flowable
          .just("value")
          .doAfterTerminate(finallyAction)
          .subscribe(testSubscriber);

      testSubscriber.assertValue("value");

      verify(finallyAction).run();

      TestHelper.assertError(errors, 0, IllegalStateException.class);
      // Actual result:
      // Not only IllegalStateException was swallowed
      // But finallyAction was called twice!
    } finally {
      RxJavaPlugins.reset();
    }
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onCompleteAfter() {
  final int[] call = { 0 };
  Flowable.just(1)
  .doAfterTerminate(new Action() {
    @Override
    public void run() throws Exception {
      call[0]++;
    }
  })
  .test()
  .assertResult(1);
  assertEquals(1, call[0]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onErrorAfterCrash() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Flowable.fromPublisher(new Publisher<Object>() {
      @Override
      public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new BooleanSubscription());
        s.onError(new TestException());
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .test()
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onCompleteAfterCrash() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Flowable.fromPublisher(new Publisher<Object>() {
      @Override
      public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new BooleanSubscription());
        s.onComplete();
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .test()
    .assertResult();
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onErrorAfterCrashConditional() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Flowable.fromPublisher(new Publisher<Object>() {
      @Override
      public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new BooleanSubscription());
        s.onError(new TestException());
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .filter(Functions.alwaysTrue())
    .test()
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onCompleteAfterCrashConditional() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Flowable.fromPublisher(new Publisher<Object>() {
      @Override
      public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new BooleanSubscription());
        s.onComplete();
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .filter(Functions.alwaysTrue())
    .test()
    .assertResult();
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: akarnokd/RxJava2Extensions

@Override
  public void run() throws Exception {
    Flowable.just(1)
    .subscribeOn(Schedulers.io())
    .observeOn(scheduler)
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        scheduler.shutdown();
      }
    })
    .subscribe(new Consumer<Integer>() {
      @Override
      public void accept(Integer v) throws Exception {
        t1[0] = Thread.currentThread();
      }
    });
  }
});

代码示例来源:origin: akarnokd/RxJava2Extensions

@Override
  public void run() throws Exception {
    Flowable.range(1, 5)
    .subscribeOn(scheduler)
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        scheduler.shutdown();
      }
    })
    .subscribe(ts);
    ts.assertEmpty();
  }
});

代码示例来源:origin: akarnokd/RxJava2Extensions

@Override
  public void run() throws Exception {
    Flowable.range(1, 5)
    .subscribeOn(scheduler)
    .delay(100, TimeUnit.MILLISECONDS, scheduler)
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        scheduler.shutdown();
      }
    })
    .subscribe(ts);
    ts.assertEmpty();
  }
});

代码示例来源:origin: reactiverse/reactive-pg-client

private Flowable<Row> createFlowable(String sql) {
 return pool.rxBegin()
  .flatMapPublisher(tx -> tx.rxPrepare(sql)
   .flatMapPublisher(preparedQuery -> {
    // Fetch 50 rows at a time
    PgStream<io.reactiverse.reactivex.pgclient.Row> stream = preparedQuery.createStream(50, Tuple.tuple());
    return stream.toFlowable();
   })
   .doAfterTerminate(tx::commit));
}

相关文章

Flowable类方法