rx.Observable.doOnTerminate()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.0k)|赞(0)|评价(0)|浏览(178)

本文整理了Java中rx.Observable.doOnTerminate()方法的一些代码示例,展示了Observable.doOnTerminate()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnTerminate()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:doOnTerminate

Observable.doOnTerminate介绍

[英]Modifies the source Observable so that it invokes an action when it calls onCompleted or onError.

This differs from finallyDo in that this happens before the onCompleted or onError notification. Scheduler: doOnTerminate does not operate by default on a particular Scheduler.
[中]修改源Observable,以便在调用onCompleted或onError时调用操作。
这与finallyDo的不同之处在于,这发生在未完成通知或错误通知之前。调度器:默认情况下,DooInterminate不会在特定的调度器上运行。

代码示例

代码示例来源:origin: PipelineAI/pipeline

private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) {
  try {
    executionHook.onCacheHit(this);
  } catch (Throwable hookEx) {
    logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
  }
  return fromCache.toObservableWithStateCopiedInto(this)
      .doOnTerminate(new Action0() {
        @Override
        public void call() {
          if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
            cleanUpAfterResponseFromCache(false); //user code never ran
          } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
            cleanUpAfterResponseFromCache(true); //user code did run
          }
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
            cleanUpAfterResponseFromCache(false); //user code never ran
          } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
            cleanUpAfterResponseFromCache(true); //user code did run
          }
        }
      });
}

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {

  JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
   .put("driver_class", "org.hsqldb.jdbcDriver");

  JDBCClient jdbc = JDBCClient.createShared(vertx, config);

  jdbc
   .rxGetConnection() // Connect to the database
   .flatMapObservable(conn -> { // With the connection...
    return conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))") // ...create test table
     .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')")) // ...insert a row
     .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')")) // ...another one
     .flatMap(result -> conn.rxQueryStream("SELECT * FROM test")) // ...get values stream
     .flatMapObservable(sqlRowStream -> {
      return sqlRowStream.toObservable() // Transform the stream into an Observable...
       .doOnTerminate(conn::close); // ...and close the connection when the stream is fully read or an error occurs
     });
   }).subscribe(row -> System.out.println("Row : " + row.encode()));
 }
}

代码示例来源:origin: PipelineAI/pipeline

}).doOnTerminate(new Action0() {
  @Override
  public void call() {

代码示例来源:origin: PipelineAI/pipeline

return executeCommandAndObserve(_cmd)
      .doOnError(markExceptionThrown)
      .doOnTerminate(singleSemaphoreRelease)
      .doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {

代码示例来源:origin: PipelineAI/pipeline

.doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))

代码示例来源:origin: ReactiveX/RxNetty

.doOnTerminate(new Action0() {
  @Override
  public void call() {

代码示例来源:origin: PipelineAI/pipeline

.doOnCompleted(markFallbackCompleted)
      .onErrorResumeNext(handleFallbackError)
      .doOnTerminate(singleSemaphoreRelease)
      .doOnUnsubscribe(singleSemaphoreRelease);
} else {

代码示例来源:origin: alaisi/postgres-async-driver

@Override
public Observable<Void> rollback() {
  return transaction.rollback()
      .doOnTerminate(this::releaseConnection);
}

代码示例来源:origin: alaisi/postgres-async-driver

@Override
public Observable<Void> commit() {
  return transaction.commit()
      .doOnTerminate(this::releaseConnection);
}

代码示例来源:origin: com.github.alaisi.pgasync/postgres-async-driver

@Override
public Observable<Void> commit() {
  return transaction.commit()
      .doOnTerminate(this::releaseConnection);
}

代码示例来源:origin: com.github.alaisi.pgasync/postgres-async-driver

@Override
public Observable<Void> rollback() {
  return transaction.rollback()
      .doOnTerminate(this::releaseConnection);
}

代码示例来源:origin: alaisi/postgres-async-driver

@Override
public Observable<Row> queryRows(String sql, Object... params) {
  return getConnection()
      .doOnNext(this::releaseIfPipelining)
      .flatMap(connection -> connection.queryRows(sql, params)
                  .doOnTerminate(() -> releaseIfNotPipelining(connection)));
}

代码示例来源:origin: alaisi/postgres-async-driver

@Override
public Observable<ResultSet> querySet(String sql, Object... params) {
  return getConnection()
      .doOnNext(this::releaseIfPipelining)
      .flatMap(connection -> connection.querySet(sql, params)
          .doOnTerminate(() -> releaseIfNotPipelining(connection)));
}

代码示例来源:origin: com.github.alaisi.pgasync/postgres-async-driver

@Override
public Observable<ResultSet> querySet(String sql, Object... params) {
  return getConnection()
      .doOnNext(this::releaseIfPipelining)
      .flatMap(connection -> connection.querySet(sql, params)
          .doOnTerminate(() -> releaseIfNotPipelining(connection)));
}

代码示例来源:origin: ArturVasilov/AndroidSchool

public void init() {
  mRepository.repositories()
      .doOnSubscribe(mView::showLoading)
      .doOnTerminate(mView::hideLoading)
      .compose(mLifecycleHandler.load(R.id.repositories_request))
      .subscribe(mView::showRepositories, throwable -> mView.showError());
}

代码示例来源:origin: sczyh30/vertx-blueprint-microservice

@Override
public Observable<CartEvent> streamByUser(String userId) {
 JsonArray params = new JsonArray().add(userId).add(userId);
 return client.rxGetConnection()
  .flatMapObservable(conn ->
   conn.rxQueryWithParams(STREAM_STATEMENT, params)
    .map(ResultSet::getRows)
    .flatMapObservable(Observable::from)
    .map(this::wrapCartEvent)
    .doOnTerminate(conn::close)
  );
}

代码示例来源:origin: ArturVasilov/AndroidSchool

public void init() {
  RepositoryProvider.provideGithubRepository()
      .repositories()
      .doOnSubscribe(mView::showLoading)
      .doOnTerminate(mView::hideLoading)
      .compose(mLifecycleHandler.load(R.id.repositories_request))
      .subscribe(mView::showRepositories, throwable -> mView.showError());
}

代码示例来源:origin: ArturVasilov/AndroidSchool

public void init() {
  RepositoryProvider.provideGithubRepository()
      .repositories()
      .doOnSubscribe(mView::showLoading)
      .doOnTerminate(mView::hideLoading)
      .compose(mLifecycleHandler.load(R.id.repositories_request))
      .subscribe(mView::showRepositories, throwable -> mView.showError());
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_55() throws Exception {
  final Observable<Integer> observable = Observable.range(1, 100);
  Counter counter = metricRegistry.counter("counter");
  observable
      .flatMap(x ->
          makeNetworkCall(x)
              .doOnSubscribe(counter::inc)
              .doOnTerminate(counter::dec)
      )
      .subscribe(/* ... */);
}

代码示例来源:origin: io.wcm.caravan/io.wcm.caravan.io.http

private Observable<CaravanHttpResponse> addMetrics(Context ctx, Observable<CaravanHttpResponse> response) {
 PerformanceMetrics metrics = ctx.request.getPerformanceMetrics();
 return response.doOnSubscribe(metrics.getStartAction())
   .doOnNext(metrics.getOnNextAction())
   .doOnTerminate(metrics.getEndAction());
}

相关文章

Observable类方法