rx.Observable.doOnTerminate()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.0k)|赞(0)|评价(0)|浏览(204)

本文整理了Java中rx.Observable.doOnTerminate()方法的一些代码示例,展示了Observable.doOnTerminate()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnTerminate()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:doOnTerminate

Observable.doOnTerminate介绍

[英]Modifies the source Observable so that it invokes an action when it calls onCompleted or onError.

This differs from finallyDo in that this happens before the onCompleted or onError notification. Scheduler: doOnTerminate does not operate by default on a particular Scheduler.
[中]修改源Observable,以便在调用onCompleted或onError时调用操作。
这与finallyDo的不同之处在于,这发生在未完成通知或错误通知之前。调度器:默认情况下,DooInterminate不会在特定的调度器上运行。

代码示例

代码示例来源:origin: PipelineAI/pipeline

  1. private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) {
  2. try {
  3. executionHook.onCacheHit(this);
  4. } catch (Throwable hookEx) {
  5. logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
  6. }
  7. return fromCache.toObservableWithStateCopiedInto(this)
  8. .doOnTerminate(new Action0() {
  9. @Override
  10. public void call() {
  11. if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
  12. cleanUpAfterResponseFromCache(false); //user code never ran
  13. } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
  14. cleanUpAfterResponseFromCache(true); //user code did run
  15. }
  16. }
  17. })
  18. .doOnUnsubscribe(new Action0() {
  19. @Override
  20. public void call() {
  21. if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
  22. cleanUpAfterResponseFromCache(false); //user code never ran
  23. } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
  24. cleanUpAfterResponseFromCache(true); //user code did run
  25. }
  26. }
  27. });
  28. }

代码示例来源:origin: vert-x3/vertx-examples

  1. @Override
  2. public void start() throws Exception {
  3. JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
  4. .put("driver_class", "org.hsqldb.jdbcDriver");
  5. JDBCClient jdbc = JDBCClient.createShared(vertx, config);
  6. jdbc
  7. .rxGetConnection() // Connect to the database
  8. .flatMapObservable(conn -> { // With the connection...
  9. return conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))") // ...create test table
  10. .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')")) // ...insert a row
  11. .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')")) // ...another one
  12. .flatMap(result -> conn.rxQueryStream("SELECT * FROM test")) // ...get values stream
  13. .flatMapObservable(sqlRowStream -> {
  14. return sqlRowStream.toObservable() // Transform the stream into an Observable...
  15. .doOnTerminate(conn::close); // ...and close the connection when the stream is fully read or an error occurs
  16. });
  17. }).subscribe(row -> System.out.println("Row : " + row.encode()));
  18. }
  19. }

代码示例来源:origin: PipelineAI/pipeline

  1. }).doOnTerminate(new Action0() {
  2. @Override
  3. public void call() {

代码示例来源:origin: PipelineAI/pipeline

  1. return executeCommandAndObserve(_cmd)
  2. .doOnError(markExceptionThrown)
  3. .doOnTerminate(singleSemaphoreRelease)
  4. .doOnUnsubscribe(singleSemaphoreRelease);
  5. } catch (RuntimeException e) {

代码示例来源:origin: PipelineAI/pipeline

  1. .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))

代码示例来源:origin: ReactiveX/RxNetty

  1. .doOnTerminate(new Action0() {
  2. @Override
  3. public void call() {

代码示例来源:origin: PipelineAI/pipeline

  1. .doOnCompleted(markFallbackCompleted)
  2. .onErrorResumeNext(handleFallbackError)
  3. .doOnTerminate(singleSemaphoreRelease)
  4. .doOnUnsubscribe(singleSemaphoreRelease);
  5. } else {

代码示例来源:origin: alaisi/postgres-async-driver

  1. @Override
  2. public Observable<Void> rollback() {
  3. return transaction.rollback()
  4. .doOnTerminate(this::releaseConnection);
  5. }

代码示例来源:origin: alaisi/postgres-async-driver

  1. @Override
  2. public Observable<Void> commit() {
  3. return transaction.commit()
  4. .doOnTerminate(this::releaseConnection);
  5. }

代码示例来源:origin: com.github.alaisi.pgasync/postgres-async-driver

  1. @Override
  2. public Observable<Void> commit() {
  3. return transaction.commit()
  4. .doOnTerminate(this::releaseConnection);
  5. }

代码示例来源:origin: com.github.alaisi.pgasync/postgres-async-driver

  1. @Override
  2. public Observable<Void> rollback() {
  3. return transaction.rollback()
  4. .doOnTerminate(this::releaseConnection);
  5. }

代码示例来源:origin: alaisi/postgres-async-driver

  1. @Override
  2. public Observable<Row> queryRows(String sql, Object... params) {
  3. return getConnection()
  4. .doOnNext(this::releaseIfPipelining)
  5. .flatMap(connection -> connection.queryRows(sql, params)
  6. .doOnTerminate(() -> releaseIfNotPipelining(connection)));
  7. }

代码示例来源:origin: alaisi/postgres-async-driver

  1. @Override
  2. public Observable<ResultSet> querySet(String sql, Object... params) {
  3. return getConnection()
  4. .doOnNext(this::releaseIfPipelining)
  5. .flatMap(connection -> connection.querySet(sql, params)
  6. .doOnTerminate(() -> releaseIfNotPipelining(connection)));
  7. }

代码示例来源:origin: com.github.alaisi.pgasync/postgres-async-driver

  1. @Override
  2. public Observable<ResultSet> querySet(String sql, Object... params) {
  3. return getConnection()
  4. .doOnNext(this::releaseIfPipelining)
  5. .flatMap(connection -> connection.querySet(sql, params)
  6. .doOnTerminate(() -> releaseIfNotPipelining(connection)));
  7. }

代码示例来源:origin: ArturVasilov/AndroidSchool

  1. public void init() {
  2. mRepository.repositories()
  3. .doOnSubscribe(mView::showLoading)
  4. .doOnTerminate(mView::hideLoading)
  5. .compose(mLifecycleHandler.load(R.id.repositories_request))
  6. .subscribe(mView::showRepositories, throwable -> mView.showError());
  7. }

代码示例来源:origin: sczyh30/vertx-blueprint-microservice

  1. @Override
  2. public Observable<CartEvent> streamByUser(String userId) {
  3. JsonArray params = new JsonArray().add(userId).add(userId);
  4. return client.rxGetConnection()
  5. .flatMapObservable(conn ->
  6. conn.rxQueryWithParams(STREAM_STATEMENT, params)
  7. .map(ResultSet::getRows)
  8. .flatMapObservable(Observable::from)
  9. .map(this::wrapCartEvent)
  10. .doOnTerminate(conn::close)
  11. );
  12. }

代码示例来源:origin: ArturVasilov/AndroidSchool

  1. public void init() {
  2. RepositoryProvider.provideGithubRepository()
  3. .repositories()
  4. .doOnSubscribe(mView::showLoading)
  5. .doOnTerminate(mView::hideLoading)
  6. .compose(mLifecycleHandler.load(R.id.repositories_request))
  7. .subscribe(mView::showRepositories, throwable -> mView.showError());
  8. }

代码示例来源:origin: ArturVasilov/AndroidSchool

  1. public void init() {
  2. RepositoryProvider.provideGithubRepository()
  3. .repositories()
  4. .doOnSubscribe(mView::showLoading)
  5. .doOnTerminate(mView::hideLoading)
  6. .compose(mLifecycleHandler.load(R.id.repositories_request))
  7. .subscribe(mView::showRepositories, throwable -> mView.showError());
  8. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_55() throws Exception {
  3. final Observable<Integer> observable = Observable.range(1, 100);
  4. Counter counter = metricRegistry.counter("counter");
  5. observable
  6. .flatMap(x ->
  7. makeNetworkCall(x)
  8. .doOnSubscribe(counter::inc)
  9. .doOnTerminate(counter::dec)
  10. )
  11. .subscribe(/* ... */);
  12. }

代码示例来源:origin: io.wcm.caravan/io.wcm.caravan.io.http

  1. private Observable<CaravanHttpResponse> addMetrics(Context ctx, Observable<CaravanHttpResponse> response) {
  2. PerformanceMetrics metrics = ctx.request.getPerformanceMetrics();
  3. return response.doOnSubscribe(metrics.getStartAction())
  4. .doOnNext(metrics.getOnNextAction())
  5. .doOnTerminate(metrics.getEndAction());
  6. }

相关文章

Observable类方法