Usage and code examples of the rx.Observable.doOnNext() method

Reposted by x33g5p2x on 2022-01-25, category: Other

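This article collects code examples of the Java method rx.Observable.doOnNext() and shows how Observable.doOnNext() is used in practice. The examples were extracted from selected projects found on GitHub, Stack Overflow, Maven, and similar platforms. Details of the method:
Fully qualified class name: rx.Observable
Class name: Observable
Method name: doOnNext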

About Observable.doOnNext

Modifies the source Observable so that it invokes an action when it calls onNext.

Scheduler: doOnNext does not operate by default on a particular Scheduler.
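
To make the pass-through behaviour concrete, here is a minimal sketch in plain Java. It does not use RxJava: `Source`, `from`, and `DoOnNextSketch` are hypothetical stand-ins invented for illustration, and the real operator also handles errors, completion, and unsubscription, which are left out here. The sketch only shows the core idea, namely that the action sees each item first and the same item then continues downstream unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class DoOnNextSketch {

    /** Hypothetical stand-in for an observable source; this is NOT rx.Observable. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);

        /** Emits the given items synchronously, in list order. */
        static <T> Source<T> from(List<T> items) {
            return onNext -> {
                for (T item : items) {
                    onNext.accept(item);
                }
            };
        }

        /** Returns a new source that runs the action on each item, then forwards the item as-is. */
        default Source<T> doOnNext(Consumer<T> action) {
            return onNext -> this.subscribe(item -> {
                action.accept(item);  // side effect runs first
                onNext.accept(item);  // the item itself is not modified
            });
        }
    }

    /** Builds a small chain and records the order in which callbacks fire. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Source.from(Arrays.asList(1, 2, 3))
              .doOnNext(x -> log.add("peek " + x))
              .subscribe(x -> log.add("got " + x));
        return log;
    }

    public static void main(String[] args) {
        // prints [peek 1, got 1, peek 2, got 2, peek 3, got 3]
        System.out.println(run());
    }
}
```

With RxJava 1 on the classpath, the same chain would be written with the real API as `Observable.from(list).doOnNext(...).subscribe(...)`. Because the sketch emits synchronously, the action runs on the calling thread, which loosely mirrors the note above that doOnNext is not tied to a particular Scheduler.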

Code examples

Example source: apache/usergrid

  public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                                WriteStart writeState ) {
      return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
          // unique-value verification
          Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
              Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
                        .doOnNext( writeVerifyUnique );
          // optimistic verification
          Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
              Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
                        .doOnNext( writeOptimisticVerify );
          // wait for both verifications, then emit the optimistic result
          final Observable<CollectionIoEvent<MvccEntity>> zip =
              Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
          return zip;
      } );
  }

Example source: spring-projects/spring-framework

  @PostMapping("/observable")
  public Observable<Void> createWithObservable(@RequestBody Observable<Person> observable) {
      return observable.toList().doOnNext(persons::addAll).flatMap(document -> Observable.empty());
  }

Example source: hidroh/materialistic

  @NonNull
  private Observable<String> fromNetwork(String itemId, String url) {
      return mMercuryService.parse(url)
              .onErrorReturn(throwable -> null)
              .map(readable -> readable == null ? null : readable.content)
              .doOnNext(content -> mCache.putReadability(itemId, content));
  }

Example source: PipelineAI/pipeline

  @Override
  public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
      return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
          @Override
          public void call(BatchReturnType batchReturnType) {
              // this is a blocking call in HystrixCollapser
              self.mapResponseToRequests(batchReturnType, requests);
          }
      }).ignoreElements().cast(Void.class);
  }

Example source: vert-x3/vertx-examples

  private void insertAndFind() {
      // Documents to insert
      Observable<JsonObject> documents = Observable.just(
          new JsonObject().put("username", "temporalfox").put("firstname", "Julien").put("password", "bilto"),
          new JsonObject().put("username", "purplefox").put("firstname", "Tim").put("password", "wibble")
      );
      mongo.rxCreateCollection("users").flatMapObservable(v -> {
          // After collection is created we insert each document
          return documents.flatMap(doc -> mongo.rxInsert("users", doc).toObservable());
      }).doOnNext(id -> {
          System.out.println("Inserted document " + id);
      }).last().toSingle().flatMap(id -> {
          // Everything has been inserted now we can query mongo
          System.out.println("Insertions done");
          return mongo.rxFind("users", new JsonObject());
      }).subscribe(results -> {
          System.out.println("Results " + results);
      }, error -> {
          System.out.println("Err");
          error.printStackTrace();
      });
  }

Example source: apache/usergrid

  @Override
  public Observable<Id> mark(final Id entityId, String region) {
      Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
      Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
      Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
      Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId, region ) )
          .map( markStart ).doOnNext( markCommit ).compose( uniqueCleanup ).map(
              entityEvent -> entityEvent.getEvent().getId() );
      return ObservableTimer.time( o, deleteTimer );
  }

Example source: HotBitmapGG/bilibili-android-client

  @Override
  protected void loadData() {
      RetrofitHelper.getIm9API()
              .getUserInterestQuanData(mid, pageNum, pageSize)
              .compose(bindToLifecycle())
              .map(userInterestQuanInfo -> userInterestQuanInfo.getData().getResult())
              .subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
              .doOnNext(resultBeans -> {
                  // a short page means there is nothing more to load
                  if (resultBeans.size() < pageSize) {
                      loadMoreView.setVisibility(View.GONE);
                      mHeaderViewRecyclerAdapter.removeFootView();
                  }
              })
              .subscribe(resultBeans -> {
                  userInterestQuans.addAll(resultBeans);
                  finishTask();
              }, throwable -> loadMoreView.setVisibility(View.GONE));
  }

Example source: HotBitmapGG/bilibili-android-client

  @Override
  protected void loadData() {
      RetrofitHelper.getUserAPI()
              .getUserContributeVideos(mid, pageNum, pageSize)
              .compose(this.bindToLifecycle())
              .map(userContributeInfo -> userContributeInfo.getData().getVlist())
              .subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
              .doOnNext(vlistBeans -> {
                  if (vlistBeans.size() < pageSize) {
                      loadMoreView.setVisibility(View.GONE);
                      mHeaderViewRecyclerAdapter.removeFootView();
                  }
              })
              .subscribe(vlistBeans -> {
                  userContributes.addAll(vlistBeans);
                  finishTask();
              }, throwable -> loadMoreView.setVisibility(View.GONE));
  }

Example source: ReactiveX/RxNetty

  static <X> DisposableContentSource<X> createNew(Observable<X> source) {
      final ArrayList<X> chunks = new ArrayList<>();
      // record every emitted chunk before it is replayed
      ConnectableObservable<X> replay = source.doOnNext(new Action1<X>() {
          @Override
          public void call(X x) {
              chunks.add(x);
          }
      }).replay();
      return new DisposableContentSource<>(new OnSubscribeImpl<X>(replay, chunks));
  }

Example source: PipelineAI/pipeline

  @Override
  protected Observable<String> construct() {
      executed = true;
      return Observable.just(value).delay(duration, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation())
              .doOnNext(new Action1<String>() {
                  @Override
                  public void call(String t1) {
                      System.out.println("successfully executed");
                  }
              });
  }

Example source: PipelineAI/pipeline

  /* package */ HystrixThreadEventStream(Thread thread) {
      this.threadId = thread.getId();
      this.threadName = thread.getName();
      writeOnlyCommandStartSubject = PublishSubject.create();
      writeOnlyCommandCompletionSubject = PublishSubject.create();
      writeOnlyCollapserSubject = PublishSubject.create();
      writeOnlyCommandStartSubject
              .onBackpressureBuffer()
              .doOnNext(writeCommandStartsToShardedStreams)
              .unsafeSubscribe(Subscribers.empty());
      writeOnlyCommandCompletionSubject
              .onBackpressureBuffer()
              .doOnNext(writeCommandCompletionsToShardedStreams)
              .unsafeSubscribe(Subscribers.empty());
      writeOnlyCollapserSubject
              .onBackpressureBuffer()
              .doOnNext(writeCollapserExecutionsToShardedStreams)
              .unsafeSubscribe(Subscribers.empty());
  }

Example source: HotBitmapGG/bilibili-android-client

  @Override
  protected void loadData() {
      RetrofitHelper.getBiliAppAPI()
              .searchArchive(content, pageNum, pageSize)
              .compose(this.bindToLifecycle())
              .map(SearchArchiveInfo::getData)
              .subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
              .doOnNext(dataBean -> {
                  if (dataBean.getItems().getArchive().size() < pageSize) {
                      loadMoreView.setVisibility(View.GONE);
                      mHeaderViewRecyclerAdapter.removeFootView();
                  }
              })
              .subscribe(dataBean -> {
                  archives.addAll(dataBean.getItems().getArchive());
                  seasons.addAll(dataBean.getItems().getSeason());
                  finishTask();
              }, throwable -> {
                  showEmptyView();
                  loadMoreView.setVisibility(View.GONE);
              });
  }

Example source: apache/usergrid

  @Test
  public void testSubscribe(){
      List<Integer> expected = Arrays.asList( 10, 9, 9, 8, 7, 6, 6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
      final AtomicInteger i = new AtomicInteger();
      Observable.from(expected).doOnNext(x -> {
          logger.info("print " + x);
          i.set(x);
      }).doOnError(e -> logger.error(e.getMessage())).subscribe();
      logger.info("last");
      // the last emitted value is 0
      assertTrue(i.get()==0);
  }

Example source: apache/usergrid

  private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
      return observable -> observable
          .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
          .filter(msg -> !msg.isEmpty())
          .doOnNext(indexOperation -> {
              asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
          });
  }

Example source: apache/usergrid

  @Test
  public void testSubscribeException() {
      try {
          List<Integer> expected = Arrays.asList(10, 9, 9, 8, 7, 6, 6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
          Observable.from(expected).doOnNext(x -> {
              logger.info("print " + x);
              throw new RuntimeException();
          }).doOnError(e -> logger.error(e.getMessage())).subscribe();
          logger.info("last");
          fail();
      } catch (Exception e) {
          // expected path: the exception raised inside doOnNext surfaces here
      }
  }

Example source: apache/usergrid

  @Test
  @Category(ExperimentalTest.class )
  public void testPublish() throws InterruptedException {
      final int count = 10;
      // one count per emitted item plus one for completion
      final CountDownLatch latch = new CountDownLatch( count+1 );
      final Subscription connectedObservable =
          Observable.range( 0, count )
              .doOnNext( integer -> latch.countDown() )
              .doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() )
              .subscribe();
      final boolean completed = latch.await( 3, TimeUnit.SECONDS );
      assertTrue( "publish1 behaves as expected", completed );
      final boolean completedSubscription = connectedObservable.isUnsubscribed();
      assertTrue( "Subscription complete", completedSubscription );
  }

Example source: ReactiveX/RxNetty

  @Test
  public void testUnsubscribeFromUpstream() throws Exception {
      final List<String> emittedBufs = new ArrayList<>();
      toByteBufObservable("first", "second", "third")
          .doOnNext(new Action1<ByteBuf>() {
              @Override
              public void call(ByteBuf byteBuf) {
                  emittedBufs.add(byteBuf.toString(Charset.defaultCharset()));
              }
          })
          .compose(CollectBytes.upTo(7))
          .subscribe(new TestSubscriber<>());
      Assert.assertEquals(Arrays.asList("first", "second"), emittedBufs);
  }

Example source: PipelineAI/pipeline

  @Test
  public void testSemaphoreIsolatedResponseFromCache() throws Exception {
      CountDownLatch commandLatch = new CountDownLatch(1);
      CountDownLatch threadPoolLatch = new CountDownLatch(1);
      Subscriber<List<HystrixCommandCompletion>> commandListSubscriber = getLatchedSubscriber(commandLatch);
      readCommandStream.observe().buffer(500, TimeUnit.MILLISECONDS).take(1)
              .doOnNext(new Action1<List<HystrixCommandCompletion>>() {
                  @Override
                  public void call(List<HystrixCommandCompletion> hystrixCommandCompletions) {
                      System.out.println("LIST : " + hystrixCommandCompletions);
                      assertEquals(3, hystrixCommandCompletions.size());
                  }
              })
              .subscribe(commandListSubscriber);
      Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
      readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
      ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS);
      ExecutionResult cache1 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
      ExecutionResult cache2 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
      writeToStream.executionDone(result, commandKey, threadPoolKey);
      writeToStream.executionDone(cache1, commandKey, threadPoolKey);
      writeToStream.executionDone(cache2, commandKey, threadPoolKey);
      assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
      assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
  }

Example source: PipelineAI/pipeline

  @Test
  public void testThreadIsolatedResponseFromCache() throws Exception {
      CountDownLatch commandLatch = new CountDownLatch(1);
      CountDownLatch threadPoolLatch = new CountDownLatch(1);
      Subscriber<List<HystrixCommandCompletion>> commandListSubscriber = getLatchedSubscriber(commandLatch);
      readCommandStream.observe().buffer(500, TimeUnit.MILLISECONDS).take(1)
              .doOnNext(new Action1<List<HystrixCommandCompletion>>() {
                  @Override
                  public void call(List<HystrixCommandCompletion> hystrixCommandCompletions) {
                      System.out.println("LIST : " + hystrixCommandCompletions);
                      assertEquals(3, hystrixCommandCompletions.size());
                  }
              })
              .subscribe(commandListSubscriber);
      Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
      readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
      ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
      ExecutionResult cache1 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
      ExecutionResult cache2 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
      writeToStream.executionDone(result, commandKey, threadPoolKey);
      writeToStream.executionDone(cache1, commandKey, threadPoolKey);
      writeToStream.executionDone(cache2, commandKey, threadPoolKey);
      assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
      assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
  }
