本文整理了Java中rx.Observable.doAfterTerminate()
方法的一些代码示例,展示了Observable.doAfterTerminate()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doAfterTerminate()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:doAfterTerminate
暂无
代码示例来源:origin: PipelineAI/pipeline
}).doAfterTerminate(new Action0() {
代码示例来源:origin: PipelineAI/pipeline
}).doAfterTerminate(new Action0() {
代码示例来源:origin: PipelineAI/pipeline
}).doAfterTerminate(new Action0() {
代码示例来源:origin: rchodava/datamill
@Override
public Observable<byte[]> asChunks() {
return StringObservable.from(inputStream)
.doAfterTerminate(completionHandler != null ? completionHandler : () -> {});
}
代码示例来源:origin: akarnokd/RxAgera
public RxObservableAsAgera(rx.Observable<?> source) {
if (source instanceof Subject) {
// no need to publish/autoConnect a subject
this.source = source.doAfterTerminate(this);
} else {
this.source = source.doAfterTerminate(this).publish().autoConnect();
}
this.updatables = new HashMap<>();
}
代码示例来源:origin: io.vertx/vertx-rx-java
/**
* Generates a {@link Observable} from {@link SQLConnection} operations.
*
* @param client the {@link SQLClient}
* @param sourceSupplier a user-provided function returning a {@link Observable} generated by interacting with the given {@link SQLConnection}
* @param <T> the type of the items emitted by the {@link Observable}
* @return an {@link Observable} generated from {@link SQLConnection} operations
*/
public static <T> Observable<T> usingConnectionObservable(SQLClient client, Function<SQLConnection, Observable<T>> sourceSupplier) {
return client.rxGetConnection().flatMapObservable(conn -> {
return sourceSupplier.apply(conn).doAfterTerminate(conn::close);
});
}
代码示例来源:origin: vert-x3/vertx-rx
/**
* Generates a {@link Observable} from {@link SQLConnection} operations.
*
* @param client the {@link SQLClient}
* @param sourceSupplier a user-provided function returning a {@link Observable} generated by interacting with the given {@link SQLConnection}
* @param <T> the type of the items emitted by the {@link Observable}
* @return an {@link Observable} generated from {@link SQLConnection} operations
*/
public static <T> Observable<T> usingConnectionObservable(SQLClient client, Function<SQLConnection, Observable<T>> sourceSupplier) {
return client.rxGetConnection().flatMapObservable(conn -> {
return sourceSupplier.apply(conn).doAfterTerminate(conn::close);
});
}
代码示例来源:origin: ArturVasilov/AndroidSchool
@Test
public void testTask2() throws Exception {
final Observable<BigInteger> observable = RxJavaTask6.task6Observable();
final long firstStart = System.nanoTime();
observable
.doAfterTerminate(() -> {
final long firstTime = System.nanoTime() - firstStart;
final long secondStart = System.nanoTime();
observable.subscribe(bigInteger -> {
final long secondTime = System.nanoTime() - secondStart;
assertTrue("Second request to observable is too slow, probably you haven't cached it",
secondTime < firstTime / 2);
});
})
.subscribe(value -> {
assertEquals(EXPECTED_RESULT, value);
}, throwable -> fail());
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_189() throws Exception {
Connection connection = null;
PreparedStatement statement =
connection.prepareStatement("SELECT ...");
statement.setFetchSize(1000);
ResultSet rs = statement.executeQuery();
Observable<Object[]> result =
Observable
.from(ResultSetIterator.iterable(rs))
.doAfterTerminate(() -> {
try {
rs.close();
statement.close();
connection.close();
} catch (SQLException e) {
log.warn("Unable to close", e);
}
});
}
代码示例来源:origin: ArturVasilov/AndroidSchool
private void load(boolean restart) {
Observable.Transformer<List<Movie>, List<Movie>> lifecycleTransformer =
restart
? mLifecycleHandler.reload(R.id.movies_loader_request)
: mLifecycleHandler.load(R.id.movies_loader_request);
RepositoryProvider.getRepository().loadMovies(mType)
.doOnSubscribe(() -> setRefreshing(true))
.doAfterTerminate(() -> setRefreshing(false))
.compose(lifecycleTransformer)
.subscribe(this::handleMovies, this::handleError);
}
代码示例来源:origin: w0080626/GankIO
@Override
public void onRefresh() {
setCurPage(1);
ListModel.getInstance().getResult(title, 20, getCurPage()).doAfterTerminate(new Action0() {
@Override
public void call() {
setCurPage(2);
}
}).unsafeSubscribe(getRefreshSubscriber());
}
代码示例来源:origin: vert-x3/vertx-rx
protected void assertTableContainsInitDataOnly() throws Exception {
client.rxGetConnection().flatMapObservable(conn -> {
return uniqueNames(conn).doAfterTerminate(conn::close);
}).test()
.awaitTerminalEvent()
.assertCompleted()
.assertValues(NAMES.stream().sorted().distinct().toArray(String[]::new));
}
代码示例来源:origin: hawkular/hawkular-metrics
.flatMap(mId -> metricsService.findMetric(mId)
.doOnNext(m -> count.incrementAndGet())
.doAfterTerminate(() ->
logger.infof("Fetched %d metric definitions for tenant %s", count.get(),
tenantId))),
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
.flatMap(mId -> metricsService.findMetric(mId)
.doOnNext(m -> count.incrementAndGet())
.doAfterTerminate(() ->
logger.infof("Fetched %d metric definitions for tenant %s", count.get(),
tenantId))),
代码示例来源:origin: georocket/georocket
.doAfterTerminate(chunk::close)
代码示例来源:origin: rchodava/datamill
.doAfterTerminate(() -> {
if (first[0]) {
sendFullResponse(context, originalRequest,
代码示例来源:origin: vert-x3/vertx-rx
private Observable<String> inTransaction(Exception e) throws Exception {
return client.rxGetConnection().flatMapObservable(conn -> {
return rxInsertExtraFolks(conn)
.andThen(uniqueNames(conn))
.compose(upstream -> e == null ? upstream : upstream.concatWith(Observable.error(e)))
.compose(SQLClientHelper.txObservableTransformer(conn))
.concatWith(rxAssertAutoCommit(conn).toObservable())
.doAfterTerminate(conn::close);
});
}
}
内容来源于网络,如有侵权,请联系作者删除!