Usage and code examples of the rx.Observable.takeWhile() method

x33g5p2x, reposted on 2022-01-25 under Other

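This article collects code examples of the rx.Observable.takeWhile() method in Java and shows how Observable.takeWhile() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should serve as useful references. Details of the Observable.takeWhile() method are as follows:
Package path: rx.Observable
Class name: Observable
Method name: takeWhile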

About Observable.takeWhile

Returns an Observable that emits the items emitted by the source Observable as long as each item satisfies a specified condition, and then completes as soon as the condition is no longer satisfied.

Scheduler: takeWhile does not operate by default on a particular Scheduler.
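The contract described above can be modeled without RxJava. The sketch below is a plain-Java illustration over an Iterable, not the library's implementation; the class and method names (TakeWhileModel, takeWhile) are hypothetical. It shows that items arriving after the first failing item are never emitted, even if they would satisfy the condition again. In RxJava 1.x the corresponding call would be roughly `Observable.from(source).takeWhile(i -> i < 3)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the takeWhile contract; NOT RxJava's implementation.
final class TakeWhileModel {

    // Copies items until the predicate first returns false, then stops for good.
    static <T> List<T> takeWhile(Iterable<T> source, Predicate<? super T> predicate) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!predicate.test(item)) {
                break; // corresponds to completing the downstream sequence
            }
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 5, 1, 2);
        // The trailing 1 and 2 satisfy the predicate but come after the first failure (5).
        System.out.println(takeWhile(source, i -> i < 3)); // prints [1, 2]
    }
}
```

This is what distinguishes takeWhile from a filter: a filter would also let the trailing 1 and 2 through.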

Code examples

Code example source: apache/usergrid

    @Override
    public Observable<MarkedEdge> compactNode( final Id inputNode ) {
        final UUID startTime = UUIDGenerator.newTimeUUID();

        final Observable<MarkedEdge> nodeObservable =
            Observable.just( inputNode )
                .map( node -> nodeSerialization.getMaxVersion( scope, node ) )
                //.doOnNext(maxTimestamp -> logger.info("compactNode maxTimestamp={}", maxTimestamp.toString()))
                .takeWhile( maxTimestamp -> maxTimestamp.isPresent() )
                // map our delete listener
                .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) );

        return ObservableTimer.time( nodeObservable, this.deleteNodeTimer );
    }

Code example source: bumptech/glide

    .takeWhile(new Func1<List<Image>, Boolean>() {
        @Override
        public Boolean call(List<Image> images) {
            // ... (excerpt truncated in the source)

Code example source: apache/usergrid

    entityEventObservable.takeWhile(writeEvent -> !tracker.shouldStopProcessingEntities()).skip(entityNumSkip)
        .flatMap(writeEvent -> {
            return Observable.just(writeEvent).doOnNext(doWork);
    // ... (lines omitted in the source excerpt)
    final int connectionCount = otherEventObservable.takeWhile(
        writeEvent -> !tracker.shouldStopProcessingConnections()).skip(connectionNumSkip).flatMap(entityWrapper -> {
            return Observable.just(entityWrapper).doOnNext(doWork).subscribeOn(Schedulers.io());
    // ... (excerpt truncated in the source)

Code example source: leeowenowen/rxjava-examples

    @Override
    public void run() {
        // Emits 1 and 2, then completes when 3 fails the predicate.
        Observable.range(1, 10).takeWhile(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer i) {
                return i < 3;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                log(integer);
            }
        });
    }
    });

Code example source: davidmoten/rxjava-extras

    @Override
    public Observable<Out> call() {
        Mutable<State> state = new Mutable<State>(initialState.call());
        return source.materialize()
            // do state transitions and emit notifications
            // use flatMap to emit notification values
            .flatMap(execute(transition, completion, state, backpressureStrategy),
                initialRequest)
            // complete if we encounter an unsubscribed sentinel
            .takeWhile(NOT_UNSUBSCRIBED)
            // flatten notifications to a stream which will enable
            // early termination from the state machine if desired
            .dematerialize();
    }
    });

Code example source: com.couchbase.client/core-io

    Observable
        .interval(1, TimeUnit.SECONDS)
        .takeWhile(new Func1<Long, Boolean>() {
            @Override
            public Boolean call(Long aLong) {
                // ... (excerpt truncated in the source)

Code example source: nurkiewicz/rxjava-book-examples

    void allPeople() {
        // Request page after page and stop at the first empty page.
        Observable<Person> allPages = Observable
            .range(0, Integer.MAX_VALUE)
            .map(this::listPeople)
            .takeWhile(list -> !list.isEmpty())
            .concatMap(Observable::from);
    }
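The paging pattern in this example (keep requesting pages until one comes back empty) can be tried without RxJava. The following is a minimal sketch using java.util.stream.Stream.takeWhile from Java 9+, which is a pull-based analog and a different API from rx.Observable. The class name PagedFetchSketch, the String element type, and the canned page data are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Pull-based analog of the paging example; uses java.util.stream, not RxJava.
final class PagedFetchSketch {

    // Hypothetical stand-in for listPeople(page): two pages of data, then empty pages.
    static List<String> listPeople(int page) {
        switch (page) {
            case 0: return Arrays.asList("Ann", "Bob");
            case 1: return Arrays.asList("Cid");
            default: return Collections.emptyList();
        }
    }

    static List<String> allPeople() {
        return IntStream.iterate(0, page -> page + 1)   // unbounded page numbers
                .mapToObj(PagedFetchSketch::listPeople) // one list per page
                .takeWhile(list -> !list.isEmpty())     // stop at the first empty page
                .flatMap(List::stream)                  // flatten pages into people
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(allPeople()); // prints [Ann, Bob, Cid]
    }
}
```

Without takeWhile the pipeline would never terminate, because the source of page numbers is unbounded.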

Code example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_537() throws Exception {
        Observable.range(1, 5).takeUntil(x -> x == 3); // [1, 2, 3]
        Observable.range(1, 5).takeWhile(x -> x != 3); // [1, 2]
    }
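The two results differ because takeUntil with a stop predicate checks the predicate after emitting an item, while takeWhile checks it before. The sketch below models that difference in plain Java over an integer range. It is an illustration under that assumption and not RxJava's code; the class name and method signatures are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Plain-Java model of the inclusive/exclusive difference; NOT RxJava's implementation.
final class TakeUntilVsTakeWhile {

    // Model of takeWhile(predicate): test first, so the failing item is dropped.
    static List<Integer> takeWhile(int start, int count, IntPredicate keepGoing) {
        List<Integer> out = new ArrayList<>();
        for (int x = start; x < start + count; x++) {
            if (!keepGoing.test(x)) {
                break;
            }
            out.add(x);
        }
        return out;
    }

    // Model of takeUntil(stopPredicate): emit first, so the stopping item is kept.
    static List<Integer> takeUntil(int start, int count, IntPredicate stop) {
        List<Integer> out = new ArrayList<>();
        for (int x = start; x < start + count; x++) {
            out.add(x);
            if (stop.test(x)) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(takeUntil(1, 5, x -> x == 3)); // prints [1, 2, 3]
        System.out.println(takeWhile(1, 5, x -> x != 3)); // prints [1, 2]
    }
}
```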

Code example source: sczyh30/vertx-blueprint-microservice

    /**
     * Get the shopping cart for a certain user.
     *
     * @param userId user id
     * @return async result
     */
    private Future<ShoppingCart> aggregateCartEvents(String userId) {
        Future<ShoppingCart> future = Future.future();
        // aggregate cart events into raw shopping cart
        repository.streamByUser(userId)
            .takeWhile(cartEvent -> !CartEvent.isTerminal(cartEvent.getCartEventType()))
            .reduce(new ShoppingCart(), ShoppingCart::incorporate)
            .toSingle()
            .subscribe(future::complete, future::fail);

        return future.compose(cart ->
            getProductService()
                .compose(service -> prepareProduct(service, cart)) // prepare product data
                .compose(productList -> generateCurrentCartFromStream(cart, productList)) // prepare product items
        );
    }
