rx.Observable.doOnNext()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(12.9k)|赞(0)|评价(0)|浏览(202)

本文整理了Java中rx.Observable.doOnNext()方法的一些代码示例,展示了Observable.doOnNext()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnNext()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:doOnNext

Observable.doOnNext介绍

[英]Modifies the source Observable so that it invokes an action when it calls onNext.

Scheduler: doOnNext does not operate by default on a particular Scheduler.
[中]修改源Observable,以便在调用onNext时调用操作。
调度器:默认情况下,doOnNext不会在特定的调度器上运行。

代码示例

代码示例来源:origin: apache/usergrid

public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                               WriteStart writeState ) {
  return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
    Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
      Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
        .doOnNext( writeVerifyUnique );
    // optimistic verification
    Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
      Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
        .doOnNext( writeOptimisticVerify );
    final Observable<CollectionIoEvent<MvccEntity>> zip =
      Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
    return zip;
  } );
}

代码示例来源:origin: spring-projects/spring-framework

@PostMapping("/observable")
public Observable<Void> createWithObservable(@RequestBody Observable<Person> observable) {
  return observable.toList().doOnNext(persons::addAll).flatMap(document -> Observable.empty());
}

代码示例来源:origin: hidroh/materialistic

@NonNull
private Observable<String> fromNetwork(String itemId, String url) {
  return mMercuryService.parse(url)
      .onErrorReturn(throwable -> null)
      .map(readable -> readable == null ? null : readable.content)
      .doOnNext(content -> mCache.putReadability(itemId, content));
}

代码示例来源:origin: PipelineAI/pipeline

@Override
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
  return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
    @Override
    public void call(BatchReturnType batchReturnType) {
      // this is a blocking call in HystrixCollapser
      self.mapResponseToRequests(batchReturnType, requests);
    }
  }).ignoreElements().cast(Void.class);
}

代码示例来源:origin: vert-x3/vertx-examples

private void insertAndFind() {
  // Documents to insert
  Observable<JsonObject> documents = Observable.just(
   new JsonObject().put("username", "temporalfox").put("firstname", "Julien").put("password", "bilto"),
   new JsonObject().put("username", "purplefox").put("firstname", "Tim").put("password", "wibble")
  );

  mongo.rxCreateCollection("users").flatMapObservable(v -> {
   // After collection is created we insert each document
   return documents.flatMap(doc -> mongo.rxInsert("users", doc).toObservable());
  }).doOnNext(id -> {
   System.out.println("Inserted document " + id);
  }).last().toSingle().flatMap(id -> {
   // Everything has been inserted now we can query mongo
   System.out.println("Insertions done");
   return mongo.rxFind("users", new JsonObject());
  }).subscribe(results -> {
   System.out.println("Results " + results);
  }, error -> {
   System.out.println("Err");
   error.printStackTrace();
  });
 }
}

代码示例来源:origin: apache/usergrid

@Override
public Observable<Id> mark(final Id entityId, String region) {
  Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
  Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
  Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
  Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId, region ) )
    .map( markStart ).doOnNext( markCommit ).compose( uniqueCleanup ).map(
      entityEvent -> entityEvent.getEvent().getId() );
  return ObservableTimer.time( o, deleteTimer );
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
protected void loadData() {
  RetrofitHelper.getIm9API()
      .getUserInterestQuanData(mid, pageNum, pageSize)
      .compose(bindToLifecycle())
      .map(userInterestQuanInfo -> userInterestQuanInfo.getData().getResult())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .doOnNext(resultBeans -> {
        if (resultBeans.size() < pageSize) {
          loadMoreView.setVisibility(View.GONE);
          mHeaderViewRecyclerAdapter.removeFootView();
        }
      })
      .subscribe(resultBeans -> {
        userInterestQuans.addAll(resultBeans);
        finishTask();
      }, throwable -> loadMoreView.setVisibility(View.GONE));
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
protected void loadData() {
  RetrofitHelper.getUserAPI()
      .getUserContributeVideos(mid, pageNum, pageSize)
      .compose(this.bindToLifecycle())
      .map(userContributeInfo -> userContributeInfo.getData().getVlist())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .doOnNext(vlistBeans -> {
        if (vlistBeans.size() < pageSize) {
          loadMoreView.setVisibility(View.GONE);
          mHeaderViewRecyclerAdapter.removeFootView();
        }
      })
      .subscribe(vlistBeans -> {
        userContributes.addAll(vlistBeans);
        finishTask();
      }, throwable -> loadMoreView.setVisibility(View.GONE));
}

代码示例来源:origin: ReactiveX/RxNetty

static <X> DisposableContentSource<X> createNew(Observable<X> source) {
  final ArrayList<X> chunks = new ArrayList<>();
  ConnectableObservable<X> replay = source.doOnNext(new Action1<X>() {
    @Override
    public void call(X x) {
      chunks.add(x);
    }
  }).replay();
  return new DisposableContentSource<>(new OnSubscribeImpl<X>(replay, chunks));
}

代码示例来源:origin: PipelineAI/pipeline

@Override
protected Observable<String> construct() {
  executed = true;
  return Observable.just(value).delay(duration, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation())
      .doOnNext(new Action1<String>() {
        @Override
        public void call(String t1) {
          System.out.println("successfully executed");
        }
      });
}

代码示例来源:origin: PipelineAI/pipeline

/* package */ HystrixThreadEventStream(Thread thread) {
  this.threadId = thread.getId();
  this.threadName = thread.getName();
  writeOnlyCommandStartSubject = PublishSubject.create();
  writeOnlyCommandCompletionSubject = PublishSubject.create();
  writeOnlyCollapserSubject = PublishSubject.create();
  writeOnlyCommandStartSubject
      .onBackpressureBuffer()
      .doOnNext(writeCommandStartsToShardedStreams)
      .unsafeSubscribe(Subscribers.empty());
  writeOnlyCommandCompletionSubject
      .onBackpressureBuffer()
      .doOnNext(writeCommandCompletionsToShardedStreams)
      .unsafeSubscribe(Subscribers.empty());
  writeOnlyCollapserSubject
      .onBackpressureBuffer()
      .doOnNext(writeCollapserExecutionsToShardedStreams)
      .unsafeSubscribe(Subscribers.empty());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
protected void loadData() {
  RetrofitHelper.getBiliAppAPI()
      .searchArchive(content, pageNum, pageSize)
      .compose(this.bindToLifecycle())
      .map(SearchArchiveInfo::getData)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .doOnNext(dataBean -> {
        if (dataBean.getItems().getArchive().size() < pageSize) {
          loadMoreView.setVisibility(View.GONE);
          mHeaderViewRecyclerAdapter.removeFootView();
        }
      })
      .subscribe(dataBean -> {
        archives.addAll(dataBean.getItems().getArchive());
        seasons.addAll(dataBean.getItems().getSeason());
        finishTask();
      }, throwable -> {
        showEmptyView();
        loadMoreView.setVisibility(View.GONE);
      });
}

代码示例来源:origin: apache/usergrid

@Test
public void testSubscribe(){
  List<Integer> expected = Arrays.asList( 10, 9, 9,  8, 7, 6,  6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
  final AtomicInteger i = new AtomicInteger();
  Observable.from(expected).doOnNext(x -> {
    logger.info("print " + x);
    i.set(x);
  }).doOnError(e -> logger.error(e.getMessage())).subscribe();
  logger.info("last");
  assertTrue(i.get()==0);
}

代码示例来源:origin: apache/usergrid

private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
  return observable -> observable
    .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
    .filter(msg -> !msg.isEmpty())
    .doOnNext(indexOperation -> {
      asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
    });
}

代码示例来源:origin: apache/usergrid

private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
  return observable -> observable
    .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
    .filter(msg -> !msg.isEmpty())
    .doOnNext(indexOperation -> {
      asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
    });
}

代码示例来源:origin: apache/usergrid

@Test
public void testSubscribeException() {
  try {
    List<Integer> expected = Arrays.asList(10, 9, 9, 8, 7, 6, 6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
    Observable.from(expected).doOnNext(x -> {
      logger.info("print " + x);
      throw new RuntimeException();
    }).doOnError(e -> logger.error(e.getMessage())).subscribe();
    logger.info("last");
    fail();
  } catch (Exception e) {
  }
}
/**

代码示例来源:origin: apache/usergrid

@Test
@Category(ExperimentalTest.class )
public void testPublish() throws InterruptedException {
  final int count = 10;
  final CountDownLatch latch = new CountDownLatch( count+1 );
  final Subscription connectedObservable =
    Observable.range( 0, count )
      .doOnNext( integer -> latch.countDown() )
      .doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() )
      .subscribe();
  final boolean completed = latch.await( 3, TimeUnit.SECONDS );
  assertTrue( "publish1 behaves as expected", completed );
  final boolean completedSubscription = connectedObservable.isUnsubscribed();
  assertTrue( "Subscription complete", completedSubscription );
}

代码示例来源:origin: ReactiveX/RxNetty

@Test
public void testUnsubscribeFromUpstream() throws Exception {
  final List<String> emittedBufs = new ArrayList<>();
  toByteBufObservable("first", "second", "third")
      .doOnNext(new Action1<ByteBuf>() {
        @Override
        public void call(ByteBuf byteBuf) {
          emittedBufs.add(byteBuf.toString(Charset.defaultCharset()));
        }
      })
      .compose(CollectBytes.upTo(7))
      .subscribe(new TestSubscriber<>());
  Assert.assertEquals(Arrays.asList("first", "second"), emittedBufs);
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testSemaphoreIsolatedResponseFromCache() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<List<HystrixCommandCompletion>> commandListSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().buffer(500, TimeUnit.MILLISECONDS).take(1)
      .doOnNext(new Action1<List<HystrixCommandCompletion>>() {
        @Override
        public void call(List<HystrixCommandCompletion> hystrixCommandCompletions) {
          System.out.println("LIST : " + hystrixCommandCompletions);
          assertEquals(3, hystrixCommandCompletions.size());
        }
      })
      .subscribe(commandListSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS);
  ExecutionResult cache1 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
  ExecutionResult cache2 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  writeToStream.executionDone(cache1, commandKey, threadPoolKey);
  writeToStream.executionDone(cache2, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testThreadIsolatedResponseFromCache() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<List<HystrixCommandCompletion>> commandListSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().buffer(500, TimeUnit.MILLISECONDS).take(1)
      .doOnNext(new Action1<List<HystrixCommandCompletion>>() {
        @Override
        public void call(List<HystrixCommandCompletion> hystrixCommandCompletions) {
          System.out.println("LIST : " + hystrixCommandCompletions);
          assertEquals(3, hystrixCommandCompletions.size());
        }
      })
      .subscribe(commandListSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
  ExecutionResult cache1 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
  ExecutionResult cache2 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  writeToStream.executionDone(cache1, commandKey, threadPoolKey);
  writeToStream.executionDone(cache2, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

相关文章

Observable类方法