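This article collects code examples for the Java method rx.Observable.doOnError() and shows how Observable.doOnError() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as a useful reference. Details of the Observable.doOnError() method are as follows: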
Package path: rx.Observable
Class name: Observable
Method name: doOnError
Modifies the source Observable so that it invokes an action if it calls onError.
Scheduler: doOnError does not operate by default on a particular Scheduler.
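To make the behavior described above concrete, here is a minimal sketch in plain Java. It does not use RxJava: `DoOnErrorSketch`, `MiniSource`, and `MiniObserver` are hypothetical names invented for this illustration, and the code only mimics the documented contract, assuming that the action runs as a side effect when the source signals an error and that the error is still delivered to the downstream observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DoOnErrorSketch {

    /** Hypothetical stand-in for an observer: receives items or one terminal error. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
    }

    /** Hypothetical stand-in for an Observable; this is NOT the RxJava type. */
    interface MiniSource<T> {
        void subscribe(MiniObserver<T> observer);

        /** Returns a new source that runs the action on error, then forwards the error unchanged. */
        default MiniSource<T> doOnError(Consumer<Throwable> action) {
            MiniSource<T> upstream = this;
            return downstream -> upstream.subscribe(new MiniObserver<T>() {
                @Override
                public void onNext(T item) {
                    downstream.onNext(item);
                }

                @Override
                public void onError(Throwable error) {
                    action.accept(error);      // side effect runs first
                    downstream.onError(error); // the error is not swallowed
                }
            });
        }
    }

    /** Emits 1 and 2, then fails; records what the side effect and the observer each saw. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSource<Integer> failing = observer -> {
            observer.onNext(1);
            observer.onNext(2);
            observer.onError(new IllegalStateException("boom"));
        };
        failing.doOnError(e -> log.add("side effect: " + e.getMessage()))
               .subscribe(new MiniObserver<Integer>() {
                   @Override
                   public void onNext(Integer item) {
                       log.add("next: " + item);
                   }

                   @Override
                   public void onError(Throwable error) {
                       log.add("observer: " + error.getMessage());
                   }
               });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the side effect is recorded before the observer's own error callback, which is why doOnError suits logging or releasing resources rather than error recovery.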
Code example source: PipelineAI/pipeline
public Observable<R> toObservableWithStateCopiedInto(final AbstractCommand<R> commandToCopyStateInto) {
    final AtomicBoolean completionLogicRun = new AtomicBoolean(false);
    return cachedObservable
            .doOnError(new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    if (completionLogicRun.compareAndSet(false, true)) {
                        commandCompleted(commandToCopyStateInto);
                    }
                }
            })
            .doOnCompleted(new Action0() {
                @Override
                public void call() {
                    if (completionLogicRun.compareAndSet(false, true)) {
                        commandCompleted(commandToCopyStateInto);
                    }
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    if (completionLogicRun.compareAndSet(false, true)) {
                        commandUnsubscribed(commandToCopyStateInto);
                    }
                }
            });
}
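The snippet above guards its callbacks with AtomicBoolean.compareAndSet so that only the first of the error, completed, or unsubscribe callbacks takes effect; this matters because a terminal event is typically followed by an unsubscribe. A minimal plain-Java sketch of that guard, with the hypothetical names `RunOnceGuard`, `fire`, and `winner` introduced only for illustration, might look like this:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RunOnceGuard {
    private final AtomicBoolean completionLogicRun = new AtomicBoolean(false);
    private volatile String winner; // name of the callback that ran, or null if none yet

    /** Runs the guarded logic only for the first caller; later callers are ignored. */
    public boolean fire(String callbackName) {
        // compareAndSet flips false -> true atomically, so exactly one caller succeeds
        if (completionLogicRun.compareAndSet(false, true)) {
            winner = callbackName;
            return true;
        }
        return false;
    }

    public String winner() {
        return winner;
    }

    public static void main(String[] args) {
        RunOnceGuard guard = new RunOnceGuard();
        System.out.println(guard.fire("onError"));       // first callback wins
        System.out.println(guard.fire("onUnsubscribe")); // later callback is skipped
        System.out.println(guard.winner());
    }
}
```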
Code example source: PipelineAI/pipeline
commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
Code example source: apache/usergrid
@Test
public void testSubscribe() {
    List<Integer> expected = Arrays.asList( 10, 9, 9, 8, 7, 6, 6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
    final AtomicInteger i = new AtomicInteger();
    Observable.from(expected).doOnNext(x -> {
        logger.info("print " + x);
        i.set(x);
    }).doOnError(e -> logger.error(e.getMessage())).subscribe();
    logger.info("last");
    assertTrue(i.get()==0);
}
Code example source: apache/usergrid
@Test
public void testSubscribeException() {
    try {
        List<Integer> expected = Arrays.asList(10, 9, 9, 8, 7, 6, 6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
        Observable.from(expected).doOnNext(x -> {
            logger.info("print " + x);
            throw new RuntimeException();
        }).doOnError(e -> logger.error(e.getMessage())).subscribe();
        logger.info("last");
        fail();
    } catch (Exception e) {
        // expected: doOnError only observes the error, and subscribe() without
        // an error handler rethrows it
    }
}
Code example source: apache/usergrid
/**
 * Completely delete an index.
 */
public Observable deleteApplication() {
    String idString = applicationId(applicationScope.getApplication());
    final TermQueryBuilder tqb = QueryBuilders.termQuery(APPLICATION_ID_FIELDNAME, idString);
    final String[] indexes = getIndexes();
    //Added For Graphite Metrics
    return Observable.from( indexes ).flatMap( index -> {
        final ListenableActionFuture<DeleteByQueryResponse> response =
            esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
        response.addListener( new ActionListener<DeleteByQueryResponse>() {
            @Override
            public void onResponse( DeleteByQueryResponse response ) {
                checkDeleteByQueryResponse( tqb, response );
            }
            @Override
            public void onFailure( Throwable e ) {
                // pass the Throwable itself: a message argument without a {} placeholder is dropped
                logger.error( "Failed on delete index", e );
            }
        } );
        return Observable.from( response );
    } ).doOnError( t -> logger.error( "Failed on delete application", t ) );
}
Code example source: PipelineAI/pipeline
@Test
public void testThreadContextOnTimeout() {
    final AtomicBoolean isInitialized = new AtomicBoolean();
    new TimeoutCommand().toObservable()
            .doOnError(new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    isInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
                }
            })
            .materialize()
            .toBlocking().single();
    System.out.println("initialized = " + HystrixRequestContext.isCurrentThreadInitialized());
    System.out.println("initialized inside onError = " + isInitialized.get());
    assertEquals(true, isInitialized.get());
}
Code example source: PipelineAI/pipeline
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable t) {
Code example source: PipelineAI/pipeline
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
Code example source: apache/usergrid
@Override
public void run( ProgressObserver observer ) {
    final int version = migrationInfoSerialization.getVersion( getName() );
    if ( version == getMaxVersion() ) {
        if (logger.isDebugEnabled()) {
            logger.debug("Skipping Migration Plugin: {}", getName());
        }
        return;
    }
    observer.start();
    AtomicInteger count = new AtomicInteger();
    //get old app infos to migrate
    final Observable<Entity> oldAppInfos = getOldAppInfos();
    oldAppInfos.doOnNext( oldAppInfoEntity -> {
        migrateAppInfo( oldAppInfoEntity, observer );
        count.incrementAndGet();
    } )
        //we want a doOnError to catch something going wrong, otherwise we'll mark as complete
        .doOnError( error -> {
            logger.error( "Unable to migrate applications, an error occurred. Please try again", error );
            observer.failed( getMaxVersion(), "Unable to migrate applications", error );
        } )
        //if we complete successfully, set the version and notify the observer
        .doOnCompleted( () -> {
            migrationInfoSerialization.setVersion( getName(), getMaxVersion() );
            observer.complete();
        } ).subscribe();//let this run through since it handles errors
}
Code example source: apache/usergrid
}).doOnError(throwable -> logger.error("Failed while sending", throwable));
}, 10);
Code example source: ReactiveX/RxNetty
    @Override
    public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
            throws Exception {
        if (msg instanceof Observable) {
            Observable msgO = (Observable) msg;
            final TestWriteSubscriber testSubscriber = new TestWriteSubscriber(promise);
            msgO.doOnNext(new Action1() {
                @Override
                public void call(Object o) {
                    final ChannelPromise channelPromise = ctx.newPromise();
                    testSubscriber.allPromises.add(channelPromise);
                    if (o instanceof String) {
                        o = Unpooled.buffer().writeBytes(((String) o).getBytes());
                    } else if (o instanceof byte[]) {
                        o = Unpooled.buffer().writeBytes((byte[]) o);
                    }
                    ctx.write(o, channelPromise);
                }
            }).doOnError(new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    ctx.fireExceptionCaught(throwable);
                }
            }).subscribe(testSubscriber);
            writeObservableSubscribers.add(testSubscriber);
        } else {
            super.write(ctx, msg, promise);
        }
    }
}
Code example source: ReactiveX/RxNetty
    @Override
    public void call(Subscriber<? super PooledConnection<R, W>> subscriber) {
        final long startTimeNanos = Clock.newStartTimeNanos();
        if (limitDeterminationStrategy.acquireCreationPermit(startTimeNanos, NANOSECONDS)) {
            Observable<Connection<R, W>> newConnObsv = hostConnector.getConnectionProvider()
                                                                    .newConnectionRequest();
            newConnObsv.map(new Func1<Connection<R, W>, PooledConnection<R, W>>() {
                @Override
                public PooledConnection<R, W> call(Connection<R, W> connection) {
                    return PooledConnection.create(PooledConnectionProviderImpl.this,
                                                   maxIdleTimeMillis, connection);
                }
            }).doOnError(new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    limitDeterminationStrategy.releasePermit(); /*Before connect we acquired.*/
                }
            }).unsafeSubscribe(subscriber);
        } else {
            idleConnectionsHolder.poll()
                                 .switchIfEmpty(Observable.<PooledConnection<R, W>>error(
                                         new PoolExhaustedException("Client connection pool exhausted.")))
                                 .unsafeSubscribe(subscriber);
        }
    }
});
Code example source: apache/usergrid
.doOnError(throwable -> {
Code example source: apache/usergrid
observer.update(getMaxVersion(), "running update for " + index);
})
.doOnError(t -> {
observer.failed(getMaxVersion(),"failed to update",t);
})
Code example source: PipelineAI/pipeline
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Code example source: PipelineAI/pipeline
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable ex) {
Code example source: apache/usergrid
/**
 * Tests working with observers
 */
@Test( expected = TestException.class )
public void throwOnSubscribeObservableNewThread() throws Exception {
    final ReThrowObserver exceptionObserver = new ReThrowObserver();
    Observable.range( 0, 1 ).map(integer -> {
        throw new TestException("I throw an exception");
    })
        .doOnError(t -> exceptionObserver.onError(t))
        .subscribeOn(Schedulers.newThread())
        .subscribe(exceptionObserver);
    for(int i =0; i<5; i++) {
        exceptionObserver.checkResult();
        Thread.sleep(200);
    }
}
Code example source: ReactiveX/RxNetty
.doOnError(LogErrorAction.INSTANCE)
Code example source: PipelineAI/pipeline
command.toObservable().doOnError(new Action1<Throwable>() {
Code example source: PipelineAI/pipeline
command.toObservable().doOnError(new Action1<Throwable>() {