本文整理了Java中rx.Observable.doOnTerminate()
方法的一些代码示例,展示了Observable.doOnTerminate()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnTerminate()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:doOnTerminate
[英]Modifies the source Observable so that it invokes an action when it calls onCompleted or onError.
This differs from finallyDo in that this happens before the onCompleted or onError notification. Scheduler: doOnTerminate does not operate by default on a particular Scheduler.
[中]修改源Observable,以便在调用onCompleted或onError时调用操作。
这与finallyDo的不同之处在于,这发生在未完成通知或错误通知之前。调度器:默认情况下,DooInterminate不会在特定的调度器上运行。
代码示例来源:origin: PipelineAI/pipeline
private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) {
try {
executionHook.onCacheHit(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
}
return fromCache.toObservableWithStateCopiedInto(this)
.doOnTerminate(new Action0() {
@Override
public void call() {
if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
cleanUpAfterResponseFromCache(false); //user code never ran
} else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
cleanUpAfterResponseFromCache(true); //user code did run
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
cleanUpAfterResponseFromCache(false); //user code never ran
} else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
cleanUpAfterResponseFromCache(true); //user code did run
}
}
});
}
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start() throws Exception {
JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
.put("driver_class", "org.hsqldb.jdbcDriver");
JDBCClient jdbc = JDBCClient.createShared(vertx, config);
jdbc
.rxGetConnection() // Connect to the database
.flatMapObservable(conn -> { // With the connection...
return conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))") // ...create test table
.flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')")) // ...insert a row
.flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')")) // ...another one
.flatMap(result -> conn.rxQueryStream("SELECT * FROM test")) // ...get values stream
.flatMapObservable(sqlRowStream -> {
return sqlRowStream.toObservable() // Transform the stream into an Observable...
.doOnTerminate(conn::close); // ...and close the connection when the stream is fully read or an error occurs
});
}).subscribe(row -> System.out.println("Row : " + row.encode()));
}
}
代码示例来源:origin: PipelineAI/pipeline
}).doOnTerminate(new Action0() {
@Override
public void call() {
代码示例来源:origin: PipelineAI/pipeline
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
代码示例来源:origin: PipelineAI/pipeline
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
代码示例来源:origin: ReactiveX/RxNetty
.doOnTerminate(new Action0() {
@Override
public void call() {
代码示例来源:origin: PipelineAI/pipeline
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} else {
代码示例来源:origin: alaisi/postgres-async-driver
@Override
public Observable<Void> rollback() {
return transaction.rollback()
.doOnTerminate(this::releaseConnection);
}
代码示例来源:origin: alaisi/postgres-async-driver
@Override
public Observable<Void> commit() {
return transaction.commit()
.doOnTerminate(this::releaseConnection);
}
代码示例来源:origin: com.github.alaisi.pgasync/postgres-async-driver
@Override
public Observable<Void> commit() {
return transaction.commit()
.doOnTerminate(this::releaseConnection);
}
代码示例来源:origin: com.github.alaisi.pgasync/postgres-async-driver
@Override
public Observable<Void> rollback() {
return transaction.rollback()
.doOnTerminate(this::releaseConnection);
}
代码示例来源:origin: alaisi/postgres-async-driver
@Override
public Observable<Row> queryRows(String sql, Object... params) {
return getConnection()
.doOnNext(this::releaseIfPipelining)
.flatMap(connection -> connection.queryRows(sql, params)
.doOnTerminate(() -> releaseIfNotPipelining(connection)));
}
代码示例来源:origin: alaisi/postgres-async-driver
@Override
public Observable<ResultSet> querySet(String sql, Object... params) {
return getConnection()
.doOnNext(this::releaseIfPipelining)
.flatMap(connection -> connection.querySet(sql, params)
.doOnTerminate(() -> releaseIfNotPipelining(connection)));
}
代码示例来源:origin: com.github.alaisi.pgasync/postgres-async-driver
@Override
public Observable<ResultSet> querySet(String sql, Object... params) {
return getConnection()
.doOnNext(this::releaseIfPipelining)
.flatMap(connection -> connection.querySet(sql, params)
.doOnTerminate(() -> releaseIfNotPipelining(connection)));
}
代码示例来源:origin: ArturVasilov/AndroidSchool
public void init() {
mRepository.repositories()
.doOnSubscribe(mView::showLoading)
.doOnTerminate(mView::hideLoading)
.compose(mLifecycleHandler.load(R.id.repositories_request))
.subscribe(mView::showRepositories, throwable -> mView.showError());
}
代码示例来源:origin: sczyh30/vertx-blueprint-microservice
@Override
public Observable<CartEvent> streamByUser(String userId) {
JsonArray params = new JsonArray().add(userId).add(userId);
return client.rxGetConnection()
.flatMapObservable(conn ->
conn.rxQueryWithParams(STREAM_STATEMENT, params)
.map(ResultSet::getRows)
.flatMapObservable(Observable::from)
.map(this::wrapCartEvent)
.doOnTerminate(conn::close)
);
}
代码示例来源:origin: ArturVasilov/AndroidSchool
public void init() {
RepositoryProvider.provideGithubRepository()
.repositories()
.doOnSubscribe(mView::showLoading)
.doOnTerminate(mView::hideLoading)
.compose(mLifecycleHandler.load(R.id.repositories_request))
.subscribe(mView::showRepositories, throwable -> mView.showError());
}
代码示例来源:origin: ArturVasilov/AndroidSchool
public void init() {
RepositoryProvider.provideGithubRepository()
.repositories()
.doOnSubscribe(mView::showLoading)
.doOnTerminate(mView::hideLoading)
.compose(mLifecycleHandler.load(R.id.repositories_request))
.subscribe(mView::showRepositories, throwable -> mView.showError());
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_55() throws Exception {
final Observable<Integer> observable = Observable.range(1, 100);
Counter counter = metricRegistry.counter("counter");
observable
.flatMap(x ->
makeNetworkCall(x)
.doOnSubscribe(counter::inc)
.doOnTerminate(counter::dec)
)
.subscribe(/* ... */);
}
代码示例来源:origin: io.wcm.caravan/io.wcm.caravan.io.http
private Observable<CaravanHttpResponse> addMetrics(Context ctx, Observable<CaravanHttpResponse> response) {
PerformanceMetrics metrics = ctx.request.getPerformanceMetrics();
return response.doOnSubscribe(metrics.getStartAction())
.doOnNext(metrics.getOnNextAction())
.doOnTerminate(metrics.getEndAction());
}
内容来源于网络,如有侵权,请联系作者删除!