Usage and code examples for the rx.Observable.doOnError() method

x33g5p2x, reposted on 2022-01-25 in Other

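This article collects code examples for the rx.Observable.doOnError() method in Java and shows how Observable.doOnError() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of Observable.doOnError() are as follows:
Package: rx
Class name: Observable
Method name: doOnError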

About Observable.doOnError

Modifies the source Observable so that it invokes an action if it calls onError. The action is a side effect only: the error is still delivered to downstream subscribers after the action runs.

Scheduler: doOnError does not operate by default on a particular Scheduler.
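The description above can be made concrete with a small sketch. The following is a toy model in plain Java, not RxJava code: `MiniSource`, `failingSource`, and `run` are hypothetical names invented for illustration, and the real operator handles more cases (for example, an action that itself throws). It only shows the core idea, assuming a synchronous source: the action runs when the source signals an error, and the error is then forwarded unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Toy model of the doOnError idea in plain Java. This is NOT RxJava code:
 * MiniSource and its methods are hypothetical names made up for illustration.
 */
final class DoOnErrorSketch {

    /** A minimal push-based source: it emits items and may signal one error. */
    interface MiniSource<T> {
        void subscribe(Consumer<T> onNext, Consumer<Throwable> onError);

        /** Returns a new source that runs the action on error, then forwards the error. */
        default MiniSource<T> doOnError(Consumer<Throwable> action) {
            MiniSource<T> upstream = this;
            return (onNext, onError) -> upstream.subscribe(onNext, error -> {
                action.accept(error);   // side effect first (log, release a permit, ...)
                onError.accept(error);  // the error still reaches the subscriber
            });
        }
    }

    /** Emits 1 and 2, then fails with an IllegalStateException("boom"). */
    static MiniSource<Integer> failingSource() {
        return (onNext, onError) -> {
            onNext.accept(1);
            onNext.accept(2);
            onError.accept(new IllegalStateException("boom"));
        };
    }

    /** Runs the pipeline and records, in order, everything that happened. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        failingSource()
            .doOnError(e -> events.add("side effect: " + e.getMessage()))
            .subscribe(
                item -> events.add("next: " + item),
                e -> events.add("subscriber error: " + e.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In RxJava 1.x itself the corresponding call is `doOnError` with an `Action1<Throwable>` (or a lambda), as the project examples in this article show.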

Code examples

Code example source: PipelineAI/pipeline

public Observable<R> toObservableWithStateCopiedInto(final AbstractCommand<R> commandToCopyStateInto) {
  final AtomicBoolean completionLogicRun = new AtomicBoolean(false);
  return cachedObservable
      .doOnError(new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
          if (completionLogicRun.compareAndSet(false, true)) {
            commandCompleted(commandToCopyStateInto);
          }
        }
      })
      .doOnCompleted(new Action0() {
        @Override
        public void call() {
          if (completionLogicRun.compareAndSet(false, true)) {
            commandCompleted(commandToCopyStateInto);
          }
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          if (completionLogicRun.compareAndSet(false, true)) {
            commandUnsubscribed(commandToCopyStateInto);
          }
        }
      });
}

Code example source: PipelineAI/pipeline

commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {

Code example source: apache/usergrid

@Test
public void testSubscribe() {
  List<Integer> expected = Arrays.asList(10, 9, 9, 8, 7, 6, 6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
  final AtomicInteger i = new AtomicInteger();
  // No error occurs in this stream, so the doOnError action is never invoked.
  Observable.from(expected).doOnNext(x -> {
    logger.info("print " + x);
    i.set(x);
  }).doOnError(e -> logger.error(e.getMessage())).subscribe();
  logger.info("last");
  // subscribe() ran synchronously, so the last emitted value (0) is already stored.
  assertTrue(i.get() == 0);
}

Code example source: apache/usergrid

@Test
public void testSubscribeException() {
  try {
    List<Integer> expected = Arrays.asList(10, 9, 9, 8, 7, 6, 6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
    Observable.from(expected).doOnNext(x -> {
      logger.info("print " + x);
      throw new RuntimeException();
    }).doOnError(e -> logger.error(e.getMessage())).subscribe();
    logger.info("last");
    fail();
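  } catch (Exception e) {
    // Expected: doOnError only observes the error. subscribe() has no onError handler,
    // so RxJava 1.x rethrows the error (as OnErrorNotImplementedException).
  }
}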

Code example source: apache/usergrid

/**
 * Completely delete an index.
 */
public Observable deleteApplication() {
  String idString = applicationId(applicationScope.getApplication());
  final TermQueryBuilder tqb = QueryBuilders.termQuery(APPLICATION_ID_FIELDNAME, idString);
  final String[] indexes = getIndexes();
  //Added For Graphite Metrics
  return Observable.from( indexes ).flatMap( index -> {
    final ListenableActionFuture<DeleteByQueryResponse> response =
      esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
    response.addListener( new ActionListener<DeleteByQueryResponse>() {
      @Override
      public void onResponse( DeleteByQueryResponse response ) {
        checkDeleteByQueryResponse( tqb, response );
      }
      @Override
      public void onFailure( Throwable e ) {
        logger.error( "Failed on delete index", e );
      }
    } );
    return Observable.from( response );
  } ).doOnError( t -> logger.error( "Failed on delete application", t ) );
}

Code example source: PipelineAI/pipeline

@Test
public void testThreadContextOnTimeout() {
  final AtomicBoolean isInitialized = new AtomicBoolean();
  new TimeoutCommand().toObservable()
      .doOnError(new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
          isInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
        }
      })
      .materialize()
      .toBlocking().single();
  System.out.println("initialized = " + HystrixRequestContext.isCurrentThreadInitialized());
  System.out.println("initialized inside onError = " + isInitialized.get());
  assertEquals(true, isInitialized.get());
}

Code example source: PipelineAI/pipeline

.doOnError(new Action1<Throwable>() {
  @Override
  public void call(Throwable t) {

Code example source: PipelineAI/pipeline

.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);

Code example source: apache/usergrid

@Override
public void run( ProgressObserver observer ) {
  final int version = migrationInfoSerialization.getVersion( getName() );
  if ( version == getMaxVersion() ) {
    if (logger.isDebugEnabled()) {
      logger.debug("Skipping Migration Plugin: {}", getName());
    }
    return;
  }
  observer.start();
  AtomicInteger count = new AtomicInteger();
  //get old app infos to migrate
  final Observable<Entity> oldAppInfos = getOldAppInfos();
  oldAppInfos.doOnNext( oldAppInfoEntity -> {
    migrateAppInfo( oldAppInfoEntity, observer );
    count.incrementAndGet();
  } )
    //we want a doOnError to catch something going wrong, otherwise we'll mark as complete
    .doOnError( error -> {
      logger.error( "Unable to migrate applications, an error occurred.  Please try again", error );
      observer.failed( getMaxVersion(), "Unable to migrate applications", error );
    } )
    //if we complete successfully, set the version and notify the observer
    .doOnCompleted( () -> {
      migrationInfoSerialization.setVersion( getName(), getMaxVersion() );
      observer.complete();
    } ).subscribe();//let this run through since it handles errors
}

Code example source: apache/usergrid

}).doOnError(throwable -> logger.error("Failed while sending", throwable));
}, 10);

Code example source: ReactiveX/RxNetty

@Override
  public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
      throws Exception {
    if (msg instanceof Observable) {
      Observable msgO = (Observable) msg;
      final TestWriteSubscriber testSubscriber = new TestWriteSubscriber(promise);
      msgO.doOnNext(new Action1() {
        @Override
        public void call(Object o) {
          final ChannelPromise channelPromise = ctx.newPromise();
          testSubscriber.allPromises.add(channelPromise);
          if (o instanceof String) {
            o = Unpooled.buffer().writeBytes(((String) o).getBytes());
          } else if (o instanceof byte[]) {
            o = Unpooled.buffer().writeBytes((byte[]) o);
          }
          ctx.write(o, channelPromise);
        }
      }).doOnError(new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
          ctx.fireExceptionCaught(throwable);
        }
      }).subscribe(testSubscriber);
      writeObservableSubscribers.add(testSubscriber);
    } else {
      super.write(ctx, msg, promise);
    }
  }
}

Code example source: ReactiveX/RxNetty

@Override
  public void call(Subscriber<? super PooledConnection<R, W>> subscriber) {
    final long startTimeNanos = Clock.newStartTimeNanos();
    if (limitDeterminationStrategy.acquireCreationPermit(startTimeNanos, NANOSECONDS)) {
      Observable<Connection<R, W>> newConnObsv = hostConnector.getConnectionProvider()
                                  .newConnectionRequest();
      newConnObsv.map(new Func1<Connection<R, W>, PooledConnection<R, W>>() {
        @Override
        public PooledConnection<R, W> call(Connection<R, W> connection) {
          return PooledConnection.create(PooledConnectionProviderImpl.this,
                          maxIdleTimeMillis, connection);
        }
      }).doOnError(new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
          limitDeterminationStrategy.releasePermit(); /*Before connect we acquired.*/
        }
      }).unsafeSubscribe(subscriber);
    } else {
      idleConnectionsHolder.poll()
                 .switchIfEmpty(Observable.<PooledConnection<R, W>>error(
                     new PoolExhaustedException("Client connection pool exhausted.")))
                 .unsafeSubscribe(subscriber);
    }
  }
});

Code example source: apache/usergrid

.doOnError(throwable -> {

Code example source: apache/usergrid

observer.update(getMaxVersion(), "running update for " + index);
})
.doOnError(t -> {
  observer.failed(getMaxVersion(),"failed to update",t);
})

Code example source: PipelineAI/pipeline

.doOnError(new Action1<Throwable>() {
  @Override
  public void call(Throwable throwable) {

Code example source: PipelineAI/pipeline

.doOnError(new Action1<Throwable>() {
  @Override
  public void call(Throwable ex) {

Code example source: apache/usergrid

/**
 *  Tests working with observers
 */
@Test( expected = TestException.class )
public void throwOnSubscribeObservableNewThread() throws Exception {
  final ReThrowObserver exceptionObserver = new ReThrowObserver();
  Observable.range( 0, 1 ).map(integer -> {
    throw new TestException("I throw an exception");
  })
    .doOnError(t -> exceptionObserver.onError(t))
    .subscribeOn(Schedulers.newThread())
    .subscribe(exceptionObserver);
  for(int i =0; i<5; i++) {
    exceptionObserver.checkResult();
    Thread.sleep(200);
  }
}

Code example source: ReactiveX/RxNetty

.doOnError(LogErrorAction.INSTANCE)

Code example source: PipelineAI/pipeline

command.toObservable().doOnError(new Action1<Throwable>() {
