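This article collects code examples of the rx.Observable.doOnNext() method in Java and shows how Observable.doOnNext() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as useful references. Details of the Observable.doOnNext() method are as follows: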
Package path: rx.Observable
Class name: Observable
Method name: doOnNext
Modifies the source Observable so that it invokes an action when it calls onNext.
Scheduler: doOnNext does not operate by default on a particular Scheduler.
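The description above can be pictured with a small, self-contained model. The sketch below does not use RxJava: `DoOnNextSketch` and `MiniObservable` are hypothetical names invented for illustration, and the class is a synchronous stand-in, not the library's implementation. It only mimics the documented behavior: the action passed to doOnNext runs once per item, the same item is then forwarded unchanged to the downstream, and everything happens on whichever thread emits the item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model for illustration only; this is NOT rx.Observable.
class DoOnNextSketch {

    /** A tiny synchronous "observable": subscribing pushes every item to a consumer. */
    static final class MiniObservable<T> {
        private final Consumer<Consumer<T>> source;

        private MiniObservable(Consumer<Consumer<T>> source) {
            this.source = source;
        }

        @SafeVarargs
        static <T> MiniObservable<T> just(T... items) {
            return new MiniObservable<>(downstream -> {
                for (T item : items) {
                    downstream.accept(item);
                }
            });
        }

        /** Runs the action for each item, then forwards the same item unchanged. */
        MiniObservable<T> doOnNext(Consumer<T> action) {
            return new MiniObservable<>(downstream -> source.accept(item -> {
                action.accept(item);     // side effect first
                downstream.accept(item); // then pass the item on untouched
            }));
        }

        /** Transforms each item; included to show that doOnNext itself changes nothing. */
        <R> MiniObservable<R> map(Function<T, R> mapper) {
            return new MiniObservable<>(downstream ->
                    source.accept(item -> downstream.accept(mapper.apply(item))));
        }

        void subscribe(Consumer<T> onNext) {
            source.accept(onNext);
        }
    }

    /** Records the order in which the side effect and the subscriber see each item. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable.just(1, 2)
                .doOnNext(x -> log.add("doOnNext " + x))
                .map(x -> x * 10)
                .subscribe(x -> log.add("onNext " + x));
        return log;
    }

    /** In this model the action runs on the thread that emits the item. */
    static boolean actionRunsOnEmittingThread() {
        Thread caller = Thread.currentThread();
        boolean[] same = {false};
        MiniObservable.just("a")
                .doOnNext(x -> same[0] = Thread.currentThread() == caller)
                .subscribe(x -> { });
        return same[0];
    }

    public static void main(String[] args) {
        // Prints: doOnNext 1, onNext 10, doOnNext 2, onNext 20 (one per line)
        run().forEach(System.out::println);
        System.out.println(actionRunsOnEmittingThread());
    }
}
```

In this model the side effect for an item always precedes that item's delivery to the subscriber, which is why the real-world examples below use doOnNext for logging, caching, and UI updates placed just before subscribe.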
Code example source: apache/usergrid
public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                              WriteStart writeState ) {
    return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
        Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
            Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
                      .doOnNext( writeVerifyUnique );
        // optimistic verification
        Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
            Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
                      .doOnNext( writeOptimisticVerify );
        final Observable<CollectionIoEvent<MvccEntity>> zip =
            Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
        return zip;
    } );
}
Code example source: spring-projects/spring-framework
@PostMapping("/observable")
public Observable<Void> createWithObservable(@RequestBody Observable<Person> observable) {
    return observable.toList().doOnNext(persons::addAll).flatMap(document -> Observable.empty());
}
Code example source: hidroh/materialistic
@NonNull
private Observable<String> fromNetwork(String itemId, String url) {
    return mMercuryService.parse(url)
            .onErrorReturn(throwable -> null)
            .map(readable -> readable == null ? null : readable.content)
            .doOnNext(content -> mCache.putReadability(itemId, content));
}
Code example source: PipelineAI/pipeline
@Override
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
    return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
        @Override
        public void call(BatchReturnType batchReturnType) {
            // this is a blocking call in HystrixCollapser
            self.mapResponseToRequests(batchReturnType, requests);
        }
    }).ignoreElements().cast(Void.class);
}
Code example source: vert-x3/vertx-examples
private void insertAndFind() {
    // Documents to insert
    Observable<JsonObject> documents = Observable.just(
        new JsonObject().put("username", "temporalfox").put("firstname", "Julien").put("password", "bilto"),
        new JsonObject().put("username", "purplefox").put("firstname", "Tim").put("password", "wibble")
    );
    mongo.rxCreateCollection("users").flatMapObservable(v -> {
        // After collection is created we insert each document
        return documents.flatMap(doc -> mongo.rxInsert("users", doc).toObservable());
    }).doOnNext(id -> {
        System.out.println("Inserted document " + id);
    }).last().toSingle().flatMap(id -> {
        // Everything has been inserted now we can query mongo
        System.out.println("Insertions done");
        return mongo.rxFind("users", new JsonObject());
    }).subscribe(results -> {
        System.out.println("Results " + results);
    }, error -> {
        System.out.println("Err");
        error.printStackTrace();
    });
}
Code example source: apache/usergrid
@Override
public Observable<Id> mark(final Id entityId, String region) {
    Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
    Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
    Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
    Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId, region ) )
        .map( markStart ).doOnNext( markCommit ).compose( uniqueCleanup ).map(
            entityEvent -> entityEvent.getEvent().getId() );
    return ObservableTimer.time( o, deleteTimer );
}
Code example source: HotBitmapGG/bilibili-android-client
@Override
protected void loadData() {
    RetrofitHelper.getIm9API()
            .getUserInterestQuanData(mid, pageNum, pageSize)
            .compose(bindToLifecycle())
            .map(userInterestQuanInfo -> userInterestQuanInfo.getData().getResult())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(resultBeans -> {
                if (resultBeans.size() < pageSize) {
                    loadMoreView.setVisibility(View.GONE);
                    mHeaderViewRecyclerAdapter.removeFootView();
                }
            })
            .subscribe(resultBeans -> {
                userInterestQuans.addAll(resultBeans);
                finishTask();
            }, throwable -> loadMoreView.setVisibility(View.GONE));
}
Code example source: HotBitmapGG/bilibili-android-client
@Override
protected void loadData() {
    RetrofitHelper.getUserAPI()
            .getUserContributeVideos(mid, pageNum, pageSize)
            .compose(this.bindToLifecycle())
            .map(userContributeInfo -> userContributeInfo.getData().getVlist())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(vlistBeans -> {
                if (vlistBeans.size() < pageSize) {
                    loadMoreView.setVisibility(View.GONE);
                    mHeaderViewRecyclerAdapter.removeFootView();
                }
            })
            .subscribe(vlistBeans -> {
                userContributes.addAll(vlistBeans);
                finishTask();
            }, throwable -> loadMoreView.setVisibility(View.GONE));
}
Code example source: ReactiveX/RxNetty
static <X> DisposableContentSource<X> createNew(Observable<X> source) {
    final ArrayList<X> chunks = new ArrayList<>();
    ConnectableObservable<X> replay = source.doOnNext(new Action1<X>() {
        @Override
        public void call(X x) {
            chunks.add(x);
        }
    }).replay();
    return new DisposableContentSource<>(new OnSubscribeImpl<X>(replay, chunks));
}
Code example source: PipelineAI/pipeline
@Override
protected Observable<String> construct() {
    executed = true;
    return Observable.just(value).delay(duration, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation())
            .doOnNext(new Action1<String>() {
                @Override
                public void call(String t1) {
                    System.out.println("successfully executed");
                }
            });
}
Code example source: PipelineAI/pipeline
/* package */ HystrixThreadEventStream(Thread thread) {
    this.threadId = thread.getId();
    this.threadName = thread.getName();
    writeOnlyCommandStartSubject = PublishSubject.create();
    writeOnlyCommandCompletionSubject = PublishSubject.create();
    writeOnlyCollapserSubject = PublishSubject.create();
    writeOnlyCommandStartSubject
            .onBackpressureBuffer()
            .doOnNext(writeCommandStartsToShardedStreams)
            .unsafeSubscribe(Subscribers.empty());
    writeOnlyCommandCompletionSubject
            .onBackpressureBuffer()
            .doOnNext(writeCommandCompletionsToShardedStreams)
            .unsafeSubscribe(Subscribers.empty());
    writeOnlyCollapserSubject
            .onBackpressureBuffer()
            .doOnNext(writeCollapserExecutionsToShardedStreams)
            .unsafeSubscribe(Subscribers.empty());
}
Code example source: HotBitmapGG/bilibili-android-client
@Override
protected void loadData() {
    RetrofitHelper.getBiliAppAPI()
            .searchArchive(content, pageNum, pageSize)
            .compose(this.bindToLifecycle())
            .map(SearchArchiveInfo::getData)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(dataBean -> {
                if (dataBean.getItems().getArchive().size() < pageSize) {
                    loadMoreView.setVisibility(View.GONE);
                    mHeaderViewRecyclerAdapter.removeFootView();
                }
            })
            .subscribe(dataBean -> {
                archives.addAll(dataBean.getItems().getArchive());
                seasons.addAll(dataBean.getItems().getSeason());
                finishTask();
            }, throwable -> {
                showEmptyView();
                loadMoreView.setVisibility(View.GONE);
            });
}
Code example source: apache/usergrid
@Test
public void testSubscribe(){
    List<Integer> expected = Arrays.asList( 10, 9, 9, 8, 7, 6, 6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
    final AtomicInteger i = new AtomicInteger();
    Observable.from(expected).doOnNext(x -> {
        logger.info("print " + x);
        i.set(x);
    }).doOnError(e -> logger.error(e.getMessage())).subscribe();
    logger.info("last");
    assertTrue(i.get()==0);
}
Code example source: apache/usergrid
private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
    return observable -> observable
        .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
        .filter(msg -> !msg.isEmpty())
        .doOnNext(indexOperation -> {
            asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
        });
}
Code example source: apache/usergrid
@Test
public void testSubscribeException() {
    try {
        List<Integer> expected = Arrays.asList(10, 9, 9, 8, 7, 6, 6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
        Observable.from(expected).doOnNext(x -> {
            logger.info("print " + x);
            throw new RuntimeException();
        }).doOnError(e -> logger.error(e.getMessage())).subscribe();
        logger.info("last");
        fail();
    } catch (Exception e) {
        // expected: subscribe() has no error handler, so the failure is rethrown here
    }
}
Code example source: apache/usergrid
@Test
@Category(ExperimentalTest.class )
public void testPublish() throws InterruptedException {
    final int count = 10;
    final CountDownLatch latch = new CountDownLatch( count+1 );
    final Subscription connectedObservable =
        Observable.range( 0, count )
            .doOnNext( integer -> latch.countDown() )
            .doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() )
            .subscribe();
    final boolean completed = latch.await( 3, TimeUnit.SECONDS );
    assertTrue( "publish1 behaves as expected", completed );
    final boolean completedSubscription = connectedObservable.isUnsubscribed();
    assertTrue( "Subscription complete", completedSubscription );
}
Code example source: ReactiveX/RxNetty
@Test
public void testUnsubscribeFromUpstream() throws Exception {
    final List<String> emittedBufs = new ArrayList<>();
    toByteBufObservable("first", "second", "third")
        .doOnNext(new Action1<ByteBuf>() {
            @Override
            public void call(ByteBuf byteBuf) {
                emittedBufs.add(byteBuf.toString(Charset.defaultCharset()));
            }
        })
        .compose(CollectBytes.upTo(7))
        .subscribe(new TestSubscriber<>());
    Assert.assertEquals(Arrays.asList("first", "second"), emittedBufs);
}
Code example source: PipelineAI/pipeline
@Test
public void testSemaphoreIsolatedResponseFromCache() throws Exception {
    CountDownLatch commandLatch = new CountDownLatch(1);
    CountDownLatch threadPoolLatch = new CountDownLatch(1);
    Subscriber<List<HystrixCommandCompletion>> commandListSubscriber = getLatchedSubscriber(commandLatch);
    readCommandStream.observe().buffer(500, TimeUnit.MILLISECONDS).take(1)
            .doOnNext(new Action1<List<HystrixCommandCompletion>>() {
                @Override
                public void call(List<HystrixCommandCompletion> hystrixCommandCompletions) {
                    System.out.println("LIST : " + hystrixCommandCompletions);
                    assertEquals(3, hystrixCommandCompletions.size());
                }
            })
            .subscribe(commandListSubscriber);
    Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
    readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
    ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS);
    ExecutionResult cache1 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
    ExecutionResult cache2 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
    writeToStream.executionDone(result, commandKey, threadPoolKey);
    writeToStream.executionDone(cache1, commandKey, threadPoolKey);
    writeToStream.executionDone(cache2, commandKey, threadPoolKey);
    assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
    assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
Code example source: PipelineAI/pipeline
@Test
public void testThreadIsolatedResponseFromCache() throws Exception {
    CountDownLatch commandLatch = new CountDownLatch(1);
    CountDownLatch threadPoolLatch = new CountDownLatch(1);
    Subscriber<List<HystrixCommandCompletion>> commandListSubscriber = getLatchedSubscriber(commandLatch);
    readCommandStream.observe().buffer(500, TimeUnit.MILLISECONDS).take(1)
            .doOnNext(new Action1<List<HystrixCommandCompletion>>() {
                @Override
                public void call(List<HystrixCommandCompletion> hystrixCommandCompletions) {
                    System.out.println("LIST : " + hystrixCommandCompletions);
                    assertEquals(3, hystrixCommandCompletions.size());
                }
            })
            .subscribe(commandListSubscriber);
    Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
    readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
    ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
    ExecutionResult cache1 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
    ExecutionResult cache2 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
    writeToStream.executionDone(result, commandKey, threadPoolKey);
    writeToStream.executionDone(cache1, commandKey, threadPoolKey);
    writeToStream.executionDone(cache2, commandKey, threadPoolKey);
    assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
    assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}