本文整理了Java中rx.Observable.doOnCompleted()
方法的一些代码示例,展示了Observable.doOnCompleted()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnCompleted()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:doOnCompleted
[英]Modifies the source Observable so that it invokes an action when it calls onCompleted.
Scheduler: doOnCompleted does not operate by default on a particular Scheduler.
[中]修改源Observable,以便在调用onCompleted时调用操作。
调度器:默认情况下,doOnCompleted不会在特定的调度器上运行。
代码示例来源:origin: PipelineAI/pipeline
public Observable<R> toObservableWithStateCopiedInto(final AbstractCommand<R> commandToCopyStateInto) {
final AtomicBoolean completionLogicRun = new AtomicBoolean(false);
return cachedObservable
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (completionLogicRun.compareAndSet(false, true)) {
commandCompleted(commandToCopyStateInto);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
if (completionLogicRun.compareAndSet(false, true)) {
commandCompleted(commandToCopyStateInto);
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (completionLogicRun.compareAndSet(false, true)) {
commandUnsubscribed(commandToCopyStateInto);
}
}
});
}
代码示例来源:origin: apache/usergrid
doDeletes( node, scope, maxVersion.get(), timestamp ).doOnCompleted( () -> {
try {
nodeSerialization.delete( scope, node, maxVersion.get()).execute();
代码示例来源:origin: PipelineAI/pipeline
}).doOnCompleted(new Action0() {
代码示例来源:origin: apache/usergrid
/**
* Time the obserable with the specified timer
*/
public static <T> Observable<T> time( final Observable<T> observable, final Timer timer ) {
final ObservableTimer proxy = new ObservableTimer( timer );
//attach to the observable
return observable.doOnSubscribe( () -> proxy.start() ).doOnCompleted( () -> proxy.stop() );
}
}
代码示例来源:origin: ReactiveX/RxNetty
private void _writeStreamToChannel(final Subscriber<? super Void> subscriber, final long startTimeNanos) {
final ChannelFuture writeFuture = nettyChannel.write(msgs.doOnCompleted(new Action0() {
@Override
public void call() {
代码示例来源:origin: apache/usergrid
.doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
.subscribeOn( Schedulers.io() ).subscribe();
代码示例来源:origin: ReactiveX/RxNetty
@Override
public void call(Subscriber<? super Void> subscriber) {
if (!isUsable()) {
PooledConnection.this.owner.discard(PooledConnection.this)
.unsafeSubscribe(subscriber);
} else {
Long keepAliveTimeout = unsafeNettyChannel().attr(DYNAMIC_CONN_KEEP_ALIVE_TIMEOUT_MS).get();
if (null != keepAliveTimeout) {
PooledConnection.this.maxIdleTimeMillis = keepAliveTimeout;
}
markAwarePipeline.reset(); // Reset pipeline state, if changed, on release.
PooledConnection.this.owner.release(PooledConnection.this)
.doOnCompleted(new Action0() {
@Override
public void call() {
releasedAtLeastOnce = true;
lastReturnToPoolTimeMillis = System.currentTimeMillis();
}
})
.unsafeSubscribe(subscriber);
}
}
}).onErrorResumeNext(discard());
代码示例来源:origin: PipelineAI/pipeline
.doOnCompleted(new Action0() {
@Override
public void call() {
代码示例来源:origin: apache/usergrid
.doOnCompleted(() -> {
代码示例来源:origin: apache/usergrid
.doOnCompleted(() ->{
if( isForCollection ){
writeStateMetaForCollection(
代码示例来源:origin: apache/usergrid
@Test
@Category(ExperimentalTest.class )
public void testPublish() throws InterruptedException {
final int count = 10;
final CountDownLatch latch = new CountDownLatch( count+1 );
final Subscription connectedObservable =
Observable.range( 0, count )
.doOnNext( integer -> latch.countDown() )
.doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() )
.subscribe();
final boolean completed = latch.await( 3, TimeUnit.SECONDS );
assertTrue( "publish1 behaves as expected", completed );
final boolean completedSubscription = connectedObservable.isUnsubscribed();
assertTrue( "Subscription complete", completedSubscription );
}
代码示例来源:origin: apache/usergrid
@Override
public void run( ProgressObserver observer ) {
final int version = migrationInfoSerialization.getVersion( getName() );
if ( version == getMaxVersion() ) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping Migration Plugin: {}", getName());
}
return;
}
observer.start();
AtomicInteger count = new AtomicInteger();
//get old app infos to migrate
final Observable<Entity> oldAppInfos = getOldAppInfos();
oldAppInfos.doOnNext( oldAppInfoEntity -> {
migrateAppInfo( oldAppInfoEntity, observer );
count.incrementAndGet();
} )
//we want a doOnError to catch something going wrong, otherwise we'll mark as complete
.doOnError( error -> {
logger.error( "Unable to migrate applications, an error occurred. Please try again", error );
observer.failed( getMaxVersion(), "Unable to migrate applications", error );
} )
//if we complete successfully, set the version and notify the observer
.doOnCompleted( () -> {
migrationInfoSerialization.setVersion( getName(), getMaxVersion() );
observer.complete();
} ).subscribe();//let this run through since it handles errors
}
代码示例来源:origin: PipelineAI/pipeline
.doOnCompleted(fireOnCompletedHook);
代码示例来源:origin: apache/usergrid
observer.failed(getMaxVersion(),"failed to update",t);
})
.doOnCompleted(() -> {
migrationInfoSerialization.setVersion(getName(), getMaxVersion());
observer.complete();
代码示例来源:origin: PipelineAI/pipeline
.doOnCompleted(new Action0() {
@Override
public void call() {
代码示例来源:origin: PipelineAI/pipeline
.doOnCompleted(new Action0() {
@Override
public void call() {
代码示例来源:origin: apache/usergrid
} ).doOnCompleted( () -> {
代码示例来源:origin: PipelineAI/pipeline
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
代码示例来源:origin: apache/usergrid
.doOnCompleted( () -> {
try {
if ( oldAppEntity != null ) {
代码示例来源:origin: PipelineAI/pipeline
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
内容来源于网络,如有侵权,请联系作者删除!