Usage and Code Examples of the rx.Observable.doOnCompleted() Method

x33g5p2x, reposted on 2022-01-25 in: Other

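This article collects code examples of the rx.Observable.doOnCompleted() method in Java and shows how Observable.doOnCompleted() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.doOnCompleted() method are as follows:
Package path: rx.Observable
Class name: Observable
Method name: doOnCompleted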

Introduction to Observable.doOnCompleted

Modifies the source Observable so that it invokes an action when it calls onCompleted.

Scheduler: doOnCompleted does not operate by default on a particular Scheduler.
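
The behaviour described above can be sketched without the RxJava dependency. The following is a toy model, not the library's implementation: MiniObserver, MiniSource, and run are hypothetical names introduced here for illustration only. It shows the action running just before the completion signal is forwarded downstream, and not running at all when the source ends with an error.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: MiniObserver and MiniSource are hypothetical stand-ins,
// not the rx.Observer / rx.Observable classes.
final class DoOnCompletedSketch {

    /** Minimal observer with the three RxJava 1.x style callbacks. */
    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Minimal source: pushes events to whoever subscribes. */
    interface MiniSource<T> {
        void subscribe(MiniObserver<T> observer);
    }

    /** Wraps a source so that the action runs when the source calls onCompleted. */
    static <T> MiniSource<T> doOnCompleted(MiniSource<T> source, Runnable action) {
        return downstream -> source.subscribe(new MiniObserver<T>() {
            @Override public void onNext(T value) { downstream.onNext(value); }
            @Override public void onError(Throwable error) { downstream.onError(error); }
            @Override public void onCompleted() {
                action.run();              // side effect first
                downstream.onCompleted();  // then forward the signal
            }
        });
    }

    /** Emits 1 and 2, then either fails or completes; returns the observed event log. */
    static List<String> run(boolean fail) {
        List<String> log = new ArrayList<>();
        MiniSource<Integer> source = observer -> {
            observer.onNext(1);
            observer.onNext(2);
            if (fail) {
                observer.onError(new IllegalStateException("boom"));
            } else {
                observer.onCompleted();
            }
        };
        doOnCompleted(source, () -> log.add("action")).subscribe(new MiniObserver<Integer>() {
            @Override public void onNext(Integer value) { log.add("next " + value); }
            @Override public void onError(Throwable error) { log.add("error " + error.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [next 1, next 2, action, completed]
        System.out.println(run(true));  // [next 1, next 2, error boom]
    }
}
```

In real RxJava 1.x code the same wiring is written as source.doOnCompleted(action), with an Action0 or a lambda as the action.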

Code examples

Code example source: PipelineAI/pipeline

public Observable<R> toObservableWithStateCopiedInto(final AbstractCommand<R> commandToCopyStateInto) {
  final AtomicBoolean completionLogicRun = new AtomicBoolean(false);
  return cachedObservable
      .doOnError(new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
          if (completionLogicRun.compareAndSet(false, true)) {
            commandCompleted(commandToCopyStateInto);
          }
        }
      })
      .doOnCompleted(new Action0() {
        @Override
        public void call() {
          if (completionLogicRun.compareAndSet(false, true)) {
            commandCompleted(commandToCopyStateInto);
          }
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          if (completionLogicRun.compareAndSet(false, true)) {
            commandUnsubscribed(commandToCopyStateInto);
          }
        }
      });
}

Code example source: apache/usergrid

doDeletes( node, scope, maxVersion.get(), timestamp ).doOnCompleted( () -> {
  try {
    nodeSerialization.delete( scope, node, maxVersion.get()).execute();

Code example source: PipelineAI/pipeline

}).doOnCompleted(new Action0() {

Code example source: apache/usergrid

/**
   * Time the observable with the specified timer
   */
  public static <T> Observable<T> time( final Observable<T> observable, final Timer timer ) {
    final ObservableTimer proxy = new ObservableTimer( timer );

    //attach to the observable
    return observable.doOnSubscribe( () -> proxy.start() ).doOnCompleted( () -> proxy.stop() );
  }
}

Code example source: ReactiveX/RxNetty

private void _writeStreamToChannel(final Subscriber<? super Void> subscriber, final long startTimeNanos) {
  final ChannelFuture writeFuture = nettyChannel.write(msgs.doOnCompleted(new Action0() {
    @Override
    public void call() {

Code example source: apache/usergrid

.doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
.subscribeOn( Schedulers.io() ).subscribe();

Code example source: ReactiveX/RxNetty

@Override
  public void call(Subscriber<? super Void> subscriber) {
    if (!isUsable()) {
      PooledConnection.this.owner.discard(PooledConnection.this)
                    .unsafeSubscribe(subscriber);
    } else {
      Long keepAliveTimeout = unsafeNettyChannel().attr(DYNAMIC_CONN_KEEP_ALIVE_TIMEOUT_MS).get();
      if (null != keepAliveTimeout) {
        PooledConnection.this.maxIdleTimeMillis = keepAliveTimeout;
      }
      markAwarePipeline.reset(); // Reset pipeline state, if changed, on release.
      PooledConnection.this.owner.release(PooledConnection.this)
         .doOnCompleted(new Action0() {
           @Override
           public void call() {
             releasedAtLeastOnce = true;
             lastReturnToPoolTimeMillis = System.currentTimeMillis();
           }
         })
         .unsafeSubscribe(subscriber);
    }
  }
}).onErrorResumeNext(discard());

Code example source: PipelineAI/pipeline

.doOnCompleted(new Action0() {
  @Override
  public void call() {

Code example source: apache/usergrid

.doOnCompleted(() -> {

Code example source: apache/usergrid

.doOnCompleted(() ->{
  if( isForCollection ){
    writeStateMetaForCollection(

Code example source: apache/usergrid

@Test
@Category(ExperimentalTest.class )
public void testPublish() throws InterruptedException {
  final int count = 10;
  final CountDownLatch latch = new CountDownLatch( count+1 );
  final Subscription connectedObservable =
    Observable.range( 0, count )
      .doOnNext( integer -> latch.countDown() )
      .doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() )
      .subscribe();
  final boolean completed = latch.await( 3, TimeUnit.SECONDS );
  assertTrue( "publish1 behaves as expected", completed );
  final boolean completedSubscription = connectedObservable.isUnsubscribed();
  assertTrue( "Subscription complete", completedSubscription );
}

Code example source: apache/usergrid

@Override
public void run( ProgressObserver observer ) {
  final int version = migrationInfoSerialization.getVersion( getName() );
  if ( version == getMaxVersion() ) {
    if (logger.isDebugEnabled()) {
      logger.debug("Skipping Migration Plugin: {}", getName());
    }
    return;
  }
  observer.start();
  AtomicInteger count = new AtomicInteger();
  //get old app infos to migrate
  final Observable<Entity> oldAppInfos = getOldAppInfos();
  oldAppInfos.doOnNext( oldAppInfoEntity -> {
    migrateAppInfo( oldAppInfoEntity, observer );
    count.incrementAndGet();
  } )
    //we want a doOnError to catch something going wrong, otherwise we'll mark as complete
    .doOnError( error -> {
      logger.error( "Unable to migrate applications, an error occurred.  Please try again", error );
      observer.failed( getMaxVersion(), "Unable to migrate applications", error );
    } )
      //if we complete successfully, set the version and notify the observer
    .doOnCompleted( () -> {
      migrationInfoSerialization.setVersion( getName(), getMaxVersion() );
      observer.complete();
    } ).subscribe();//let this run through since it handles errors
}

Code example source: PipelineAI/pipeline

.doOnCompleted(fireOnCompletedHook);

Code example source: apache/usergrid

observer.failed(getMaxVersion(),"failed to update",t);
})
.doOnCompleted(() -> {
  migrationInfoSerialization.setVersion(getName(), getMaxVersion());
  observer.complete();

Code example source: apache/usergrid

} ).doOnCompleted( () -> {

Code example source: PipelineAI/pipeline

.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)

Code example source: apache/usergrid

.doOnCompleted( () -> {
  try {
    if ( oldAppEntity != null ) {

Code example source: PipelineAI/pipeline

.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
