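This article collects code examples for the rx.Observable.takeWhile() method in Java and shows how Observable.takeWhile() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.takeWhile() are as follows: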
Package path: rx.Observable
Class name: Observable
Method name: takeWhile
Returns an Observable that emits items emitted by the source Observable so long as each item satisfies a specified condition, and then completes as soon as this condition is not satisfied.
Scheduler: takeWhile does not operate by default on a particular Scheduler.
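The description above means takeWhile keeps only the leading run of matching items: once one item fails the condition, nothing after it is emitted, even if later items would pass. The sketch below models that behaviour with java.util.stream (Java 9+) rather than RxJava itself, on the assumption that Stream.takeWhile on an ordered stream is a fair stand-in for the operator's semantics. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model of the semantics only; this uses java.util.stream, not RxJava.
class TakeWhileVersusFilter {

    // Keeps the leading run of items below the limit and stops at the first failure.
    static List<Integer> takeWhileBelow(List<Integer> items, int limit) {
        return items.stream()
                .takeWhile(i -> i < limit) // requires Java 9 or later
                .collect(Collectors.toList());
    }

    // Keeps every item below the limit, wherever it appears in the sequence.
    static List<Integer> filterBelow(List<Integer> items, int limit) {
        return items.stream()
                .filter(i -> i < limit)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 5, 1);
        System.out.println(takeWhileBelow(items, 3)); // [1, 2]
        System.out.println(filterBelow(items, 3));    // [1, 2, 1]
    }
}
```

The trailing 1 is what separates the two: filter keeps it, while takeWhile has already completed by the time it arrives.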
Code example source: apache/usergrid
@Override
public Observable<MarkedEdge> compactNode( final Id inputNode ) {
    final UUID startTime = UUIDGenerator.newTimeUUID();

    final Observable<MarkedEdge> nodeObservable =
        Observable.just( inputNode )
            .map( node -> nodeSerialization.getMaxVersion( scope, node ) )
            //.doOnNext(maxTimestamp -> logger.info("compactNode maxTimestamp={}", maxTimestamp.toString()))
            // continue only if a max version is present; otherwise complete without emitting
            .takeWhile( maxTimestamp -> maxTimestamp.isPresent() )
            //map our delete listener
            .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) );

    return ObservableTimer.time( nodeObservable, this.deleteNodeTimer );
}
Code example source: bumptech/glide
.takeWhile(new Func1<List<Image>, Boolean>() {
@Override
public Boolean call(List<Image> images) {
Code example source: apache/usergrid
entityEventObservable.takeWhile(writeEvent -> !tracker.shouldStopProcessingEntities()).skip(entityNumSkip)
.flatMap(writeEvent -> {
return Observable.just(writeEvent).doOnNext(doWork);
final int connectionCount = otherEventObservable.takeWhile(
writeEvent -> !tracker.shouldStopProcessingConnections()).skip(connectionNumSkip).flatMap(entityWrapper -> {
return Observable.just(entityWrapper).doOnNext(doWork).subscribeOn(Schedulers.io());
Code example source: leeowenowen/rxjava-examples
@Override
public void run() {
    // range(1, 10) emits 1..10; takeWhile lets only 1 and 2 through
    Observable.range(1, 10).takeWhile(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer i) {
            return i < 3;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            log(integer);
        }
    });
}
Code example source: davidmoten/rxjava-extras (also listed under com.github.davidmoten/rxjava-extras)
@Override
public Observable<Out> call() {
    Mutable<State> state = new Mutable<State>(initialState.call());
    return source.materialize()
        // do state transitions and emit notifications
        // use flatMap to emit notification values
        .flatMap(execute(transition, completion, state, backpressureStrategy),
                initialRequest)
        // complete if we encounter an unsubscribed sentinel
        .takeWhile(NOT_UNSUBSCRIBED)
        // flatten notifications to a stream which will enable
        // early termination from the state machine if desired
        .dematerialize();
}
Code example source: com.couchbase.client/core-io (also listed under couchbase/couchbase-jvm-core)
Observable
.interval(1, TimeUnit.SECONDS)
.takeWhile(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
Code example source: nurkiewicz/rxjava-book-examples
void allPeople() {
    // request pages 0, 1, 2, ... and stop at the first empty page
    Observable<Person> allPages = Observable
        .range(0, Integer.MAX_VALUE)
        .map(this::listPeople)
        .takeWhile(list -> !list.isEmpty())
        .concatMap(Observable::from);
}
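The allPeople() snippet uses takeWhile as a stop condition for paging: pages are requested in order and the stream ends at the first empty page. Below is a minimal sketch of the same idea using java.util.stream (Java 9+) instead of RxJava, so it runs without extra dependencies. The listPeople stand-in, its sample data, and the call counter are assumptions made for illustration and are not part of the book's code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java model of "fetch pages until one comes back empty"; not RxJava.
class PagedFetchSketch {
    // Counts how many pages were actually requested.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for a paged lookup: two non-empty pages, then empty ones.
    static List<String> listPeople(int page) {
        calls.incrementAndGet();
        switch (page) {
            case 0: return List.of("Ann", "Bob");
            case 1: return List.of("Cy");
            default: return List.of();
        }
    }

    static List<String> allPeople() {
        return IntStream.range(0, Integer.MAX_VALUE)
                .mapToObj(PagedFetchSketch::listPeople)
                .takeWhile(page -> !page.isEmpty()) // stop at the first empty page
                .flatMap(List::stream)              // flatten pages into people
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(allPeople());  // [Ann, Bob, Cy]
        System.out.println(calls.get());  // 3 (pages 0, 1 and the empty page 2)
    }
}
```

Because the pipeline is lazy, the huge upper bound is harmless: only the pages up to and including the first empty one are requested.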
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_537() throws Exception {
    Observable.range(1, 5).takeUntil(x -> x == 3); // [1, 2, 3]
    Observable.range(1, 5).takeWhile(x -> x != 3); // [1, 2]
}
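The two comments in sample_537 differ by one element because the predicates are checked at different moments: takeWhile tests an item before emitting it, while takeUntil with a stop predicate emits the item first and then tests it. The following hand-written loops are a sketch of that ordering over a plain List, not RxJava's implementation. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the check ordering; not RxJava's actual operators.
class TakeWhileVersusTakeUntil {

    // Tests each item first; the first failing item is dropped and the loop ends.
    static <T> List<T> takeWhile(List<T> source, Predicate<T> keepGoing) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!keepGoing.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    // Emits each item first; the item that triggers the stop is still included.
    static <T> List<T> takeUntil(List<T> source, Predicate<T> stop) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item);
            if (stop.test(item)) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);
        System.out.println(takeUntil(source, x -> x == 3)); // [1, 2, 3]
        System.out.println(takeWhile(source, x -> x != 3)); // [1, 2]
    }
}
```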
Code example source: sczyh30/vertx-blueprint-microservice
/**
 * Get the shopping cart for a certain user.
 *
 * @param userId user id
 * @return async result
 */
private Future<ShoppingCart> aggregateCartEvents(String userId) {
    Future<ShoppingCart> future = Future.future();
    // aggregate cart events into raw shopping cart,
    // stopping at the first terminal event
    repository.streamByUser(userId)
        .takeWhile(cartEvent -> !CartEvent.isTerminal(cartEvent.getCartEventType()))
        .reduce(new ShoppingCart(), ShoppingCart::incorporate)
        .toSingle()
        .subscribe(future::complete, future::fail);

    return future.compose(cart ->
        getProductService()
            .compose(service -> prepareProduct(service, cart)) // prepare product data
            .compose(productList -> generateCurrentCartFromStream(cart, productList)) // prepare product items
    );
}
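The aggregateCartEvents snippet replays a user's events into a cart and uses takeWhile to cut the replay off at the first terminal event. The sketch below models that pattern with java.util.stream (Java 9+) and a map of quantities. The event types, the Event class, the assumption that CHECKOUT is the only terminal type, and the sample data are all hypothetical and do not reflect the blueprint project's actual CartEvent or ShoppingCart classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model; every name below is hypothetical, not the blueprint project's API.
class CartReplaySketch {
    enum Type { ADD, REMOVE, CHECKOUT }

    static final class Event {
        final Type type;
        final String productId;
        final int amount;

        Event(Type type, String productId, int amount) {
            this.type = type;
            this.productId = productId;
            this.amount = amount;
        }
    }

    // Assumption: CHECKOUT is the only terminal event type in this sketch.
    static boolean isTerminal(Type type) {
        return type == Type.CHECKOUT;
    }

    // Folds events into product quantities, ignoring everything from the
    // first terminal event onwards.
    static Map<String, Integer> replay(List<Event> events) {
        Map<String, Integer> cart = new LinkedHashMap<>();
        events.stream()
                .takeWhile(e -> !isTerminal(e.type))
                .forEach(e -> cart.merge(e.productId,
                        e.type == Type.ADD ? e.amount : -e.amount, Integer::sum));
        return cart;
    }

    static List<Event> sampleEvents() {
        return List.of(
                new Event(Type.ADD, "apple", 2),
                new Event(Type.ADD, "pear", 1),
                new Event(Type.REMOVE, "apple", 1),
                new Event(Type.CHECKOUT, "", 0),
                new Event(Type.ADD, "plum", 5)); // after checkout, so never replayed
    }

    public static void main(String[] args) {
        System.out.println(replay(sampleEvents())); // {apple=1, pear=1}
    }
}
```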