Usage and code examples of the rx.Observable.doOnCompleted() method

x33g5p2x, reposted on 2022-01-25 in Other

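This article collects code examples of the Java method rx.Observable.doOnCompleted() and shows how Observable.doOnCompleted() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Observable.doOnCompleted() method are as follows:
Fully qualified class name: rx.Observable
Class name: Observable
Method name: doOnCompleted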

Introduction to Observable.doOnCompleted

Modifies the source Observable so that it invokes an action when it calls onCompleted.

Scheduler: doOnCompleted does not operate by default on a particular Scheduler.
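
As a minimal sketch of the behavior described above, the following records the order in which callbacks fire, once for a source that completes and once for a source that fails. It assumes RxJava 1.x (package `rx`) is on the classpath; the class name `DoOnCompletedSketch` and its helper methods are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;

// Hypothetical demo class; assumes RxJava 1.x on the classpath.
class DoOnCompletedSketch {

    // Source completes normally: the doOnCompleted action runs after the
    // last onNext and before the subscriber's own onCompleted callback.
    static List<String> completingSource() {
        final List<String> events = new ArrayList<>();
        Observable.just(1, 2, 3)
                .doOnCompleted(() -> events.add("doOnCompleted"))
                .subscribe(
                        value -> events.add("next:" + value),
                        error -> events.add("subscriber onError"),
                        () -> events.add("subscriber onCompleted"));
        return events;
    }

    // Source fails: the doOnCompleted action is skipped, while the
    // doOnError action and the subscriber's onError callback both run.
    static List<String> failingSource() {
        final List<String> events = new ArrayList<>();
        Observable.<Integer>error(new IllegalStateException("boom"))
                .doOnError(error -> events.add("doOnError"))
                .doOnCompleted(() -> events.add("doOnCompleted"))
                .subscribe(
                        value -> events.add("next:" + value),
                        error -> events.add("subscriber onError"),
                        () -> events.add("subscriber onCompleted"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(completingSource());
        System.out.println(failingSource());
    }
}
```

Because nothing in this sketch switches Schedulers, every callback runs synchronously on the calling thread, which is why a plain `ArrayList` is enough to collect the events.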

Code examples

Code example source: PipelineAI/pipeline

    public Observable<R> toObservableWithStateCopiedInto(final AbstractCommand<R> commandToCopyStateInto) {
        // Guard so that the completion logic runs at most once,
        // whichever of the three callbacks below fires first.
        final AtomicBoolean completionLogicRun = new AtomicBoolean(false);
        return cachedObservable
                .doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        if (completionLogicRun.compareAndSet(false, true)) {
                            commandCompleted(commandToCopyStateInto);
                        }
                    }
                })
                .doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        if (completionLogicRun.compareAndSet(false, true)) {
                            commandCompleted(commandToCopyStateInto);
                        }
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        if (completionLogicRun.compareAndSet(false, true)) {
                            commandUnsubscribed(commandToCopyStateInto);
                        }
                    }
                });
    }
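
The snippet above guards its cleanup with an `AtomicBoolean` so that only the first terminal callback to fire does any work. A stripped-down sketch of that guard, without RxJava, might look like the following; `RunOnceGuardSketch` and its method names are hypothetical and are not taken from the quoted project.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone illustration of the run-once guard; no RxJava needed.
class RunOnceGuardSketch {
    private final AtomicBoolean completionLogicRun = new AtomicBoolean(false);
    private final AtomicInteger cleanupCalls = new AtomicInteger();

    // Stands in for any of the doOnError / doOnCompleted / doOnUnsubscribe callbacks.
    void onTerminalSignal() {
        // compareAndSet(false, true) succeeds for exactly one caller,
        // even if several threads race to get here.
        if (completionLogicRun.compareAndSet(false, true)) {
            cleanupCalls.incrementAndGet();
        }
    }

    int cleanupCalls() {
        return cleanupCalls.get();
    }

    public static void main(String[] args) {
        RunOnceGuardSketch guard = new RunOnceGuardSketch();
        guard.onTerminalSignal(); // e.g. the stream completes
        guard.onTerminalSignal(); // e.g. an unsubscribe follows the completion
        System.out.println(guard.cleanupCalls()); // prints 1
    }
}
```

An atomic compare-and-set is used rather than a plain boolean because the terminal callbacks may be invoked from different threads.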

Code example source: apache/usergrid

    doDeletes( node, scope, maxVersion.get(), timestamp ).doOnCompleted( () -> {
        try {
            nodeSerialization.delete( scope, node, maxVersion.get()).execute();

Code example source: PipelineAI/pipeline

    }).doOnCompleted(new Action0() {

Code example source: apache/usergrid

    /**
     * Time the observable with the specified timer
     */
    public static <T> Observable<T> time( final Observable<T> observable, final Timer timer ) {
        final ObservableTimer proxy = new ObservableTimer( timer );
        // attach to the observable: start on subscribe, stop on completion
        return observable.doOnSubscribe( () -> proxy.start() ).doOnCompleted( () -> proxy.stop() );
    }

Code example source: ReactiveX/RxNetty

    private void _writeStreamToChannel(final Subscriber<? super Void> subscriber, final long startTimeNanos) {
        final ChannelFuture writeFuture = nettyChannel.write(msgs.doOnCompleted(new Action0() {
            @Override
            public void call() {

Code example source: apache/usergrid

    .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
    .subscribeOn( Schedulers.io() ).subscribe();

Code example source: ReactiveX/RxNetty

    @Override
    public void call(Subscriber<? super Void> subscriber) {
        if (!isUsable()) {
            PooledConnection.this.owner.discard(PooledConnection.this)
                    .unsafeSubscribe(subscriber);
        } else {
            Long keepAliveTimeout = unsafeNettyChannel().attr(DYNAMIC_CONN_KEEP_ALIVE_TIMEOUT_MS).get();
            if (null != keepAliveTimeout) {
                PooledConnection.this.maxIdleTimeMillis = keepAliveTimeout;
            }
            markAwarePipeline.reset(); // Reset pipeline state, if changed, on release.
            PooledConnection.this.owner.release(PooledConnection.this)
                    .doOnCompleted(new Action0() {
                        @Override
                        public void call() {
                            releasedAtLeastOnce = true;
                            lastReturnToPoolTimeMillis = System.currentTimeMillis();
                        }
                    })
                    .unsafeSubscribe(subscriber);
        }
    }
    }).onErrorResumeNext(discard());

Code example source: PipelineAI/pipeline

    .doOnCompleted(new Action0() {
        @Override
        public void call() {

Code example source: apache/usergrid

    .doOnCompleted(() -> {

Code example source: apache/usergrid

    .doOnCompleted(() -> {
        if ( isForCollection ) {
            writeStateMetaForCollection(

Code example source: apache/usergrid

    @Test
    @Category(ExperimentalTest.class )
    public void testPublish() throws InterruptedException {
        final int count = 10;
        // one countdown per emitted item, plus one for the completion signal
        final CountDownLatch latch = new CountDownLatch( count+1 );
        final Subscription connectedObservable =
            Observable.range( 0, count )
                .doOnNext( integer -> latch.countDown() )
                .doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() )
                .subscribe();
        final boolean completed = latch.await( 3, TimeUnit.SECONDS );
        assertTrue( "publish1 behaves as expected", completed );
        final boolean completedSubscription = connectedObservable.isUnsubscribed();
        assertTrue( "Subscription complete", completedSubscription );
    }

Code example source: apache/usergrid

    @Override
    public void run( ProgressObserver observer ) {
        final int version = migrationInfoSerialization.getVersion( getName() );
        if ( version == getMaxVersion() ) {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping Migration Plugin: {}", getName());
            }
            return;
        }
        observer.start();
        AtomicInteger count = new AtomicInteger();
        //get old app infos to migrate
        final Observable<Entity> oldAppInfos = getOldAppInfos();
        oldAppInfos.doOnNext( oldAppInfoEntity -> {
            migrateAppInfo( oldAppInfoEntity, observer );
            count.incrementAndGet();
        } )
            //we want a doOnError to catch something going wrong, otherwise we'll mark as complete
            .doOnError( error -> {
                logger.error( "Unable to migrate applications, an error occurred. Please try again", error );
                observer.failed( getMaxVersion(), "Unable to migrate applications", error );
            } )
            //if we complete successfully, set the version and notify the observer
            .doOnCompleted( () -> {
                migrationInfoSerialization.setVersion( getName(), getMaxVersion() );
                observer.complete();
            } ).subscribe();//let this run through since it handles errors
    }

Code example source: PipelineAI/pipeline

    .doOnCompleted(fireOnCompletedHook);

Code example source: apache/usergrid

        observer.failed(getMaxVersion(),"failed to update",t);
    })
    .doOnCompleted(() -> {
        migrationInfoSerialization.setVersion(getName(), getMaxVersion());
        observer.complete();

Code example source: PipelineAI/pipeline

    .doOnCompleted(new Action0() {
        @Override
        public void call() {

Code example source: apache/usergrid

    } ).doOnCompleted( () -> {

Code example source: PipelineAI/pipeline

    .lift(new DeprecatedOnFallbackHookApplication(_cmd))
    .doOnNext(markFallbackEmit)
    .doOnCompleted(markFallbackCompleted)
    .onErrorResumeNext(handleFallbackError)
    .doOnTerminate(singleSemaphoreRelease)

Code example source: apache/usergrid

    .doOnCompleted( () -> {
        try {
            if ( oldAppEntity != null ) {

Code example source: PipelineAI/pipeline

    .doOnCompleted(markOnCompleted)
    .onErrorResumeNext(handleFallback)
    .doOnEach(setRequestContext);
