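This article collects code examples for the Java method io.reactivex.Observable.takeWhile() and shows how Observable.takeWhile() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.takeWhile() method are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: takeWhile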
Returns an Observable that emits items emitted by the source ObservableSource so long as each item satisfies a specified condition, and then completes as soon as this condition is not satisfied.
Scheduler: takeWhile does not operate by default on a particular Scheduler.
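The description above can be illustrated without any reactive machinery. The following is a minimal plain-Java sketch of the same rule, not RxJava's implementation: the class TakeWhileSketch and its takeWhile helper are hypothetical names introduced only for this illustration, and a List stands in for the emitted sequence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class TakeWhileSketch {
    // Hypothetical helper (not part of RxJava): copies items until the predicate first fails.
    static <T> List<T> takeWhile(Iterable<T> source, Predicate<? super T> predicate) {
        List<T> emitted = new ArrayList<>();
        for (T item : source) {
            if (!predicate.test(item)) {
                break; // models completion: the failing item and everything after it are dropped
            }
            emitted.add(item);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The trailing 1 satisfies the predicate but is never reached.
        System.out.println(takeWhile(Arrays.asList(1, 2, 3, 1), x -> x < 3)); // prints [1, 2]
    }
}
```

Note that the sequence does not resume once the condition has failed, even if later items would satisfy it.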
Code example source: ReactiveX/RxJava
@Override
public ObservableSource<?> apply(Observable<Object> e) throws Exception {
    return e.takeWhile(new Predicate<Object>() {
        @Override
        public boolean test(Object v) throws Exception {
            return times.getAndIncrement() < 4;
        }
    });
}
})
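The predicate in the excerpt above ignores the item and consults a shared counter instead. As a rough illustration of how many signals such a predicate lets through, here is a plain-Java sketch; CountingPredicateSketch and countPassing are hypothetical names, and the loop only stands in for a stream of signals rather than reproducing RxJava's behavior.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

final class CountingPredicateSketch {
    // Hypothetical helper: counts how many signals pass before the counter-based predicate fails.
    static int countPassing(int limit, int signals) {
        AtomicInteger times = new AtomicInteger();
        Predicate<Object> predicate = v -> times.getAndIncrement() < limit;
        int passed = 0;
        for (int i = 0; i < signals; i++) {
            if (!predicate.test(i)) {
                break; // first failing call: the sequence would complete here
            }
            passed++;
        }
        return passed;
    }

    public static void main(String[] args) {
        System.out.println(countPassing(4, 10)); // prints 4
    }
}
```

With a limit of 4, the first four calls see counter values 0 to 3 and pass; the fifth call sees 4 and fails.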
Code example source: ReactiveX/RxJava
@Override
public ObservableSource<?> apply(Observable<Throwable> e) throws Exception {
    return e.takeWhile(new Predicate<Object>() {
        @Override
        public boolean test(Object v) throws Exception {
            return times.getAndIncrement() < 4;
        }
    });
}
})
Code example source: ReactiveX/RxJava
@Override
public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
    return o.takeWhile(Functions.alwaysTrue());
}
});
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void takeWhileNull() {
    just1.takeWhile(null);
}
Code example source: Polidea/RxAndroidBle
@Override
public ObservableSource<?> apply(Observable<?> emittingOnBatchWriteFinished) {
    return emittingOnBatchWriteFinished
            .takeWhile(notUnsubscribed(emitterWrapper))
            .map(bufferIsNotEmpty(byteBuffer))
            .compose(writeOperationAckStrategy)
            .takeWhile(new Predicate<Boolean>() {
                @Override
                public boolean test(Boolean hasRemaining) {
                    return hasRemaining;
                }
            });
}
Code example source: ReactiveX/RxJava
@Test
public void testIssue1451Case1() {
    // https://github.com/Netflix/RxJava/issues/1451
    final int expectedCount = 3;
    final AtomicInteger count = new AtomicInteger();
    for (int i = 0; i < expectedCount; i++) {
        Observable
                .just(Boolean.TRUE, Boolean.FALSE)
                .takeWhile(new Predicate<Boolean>() {
                    @Override
                    public boolean test(Boolean value) {
                        return value;
                    }
                })
                .toList()
                .doOnSuccess(new Consumer<List<Boolean>>() {
                    @Override
                    public void accept(List<Boolean> booleans) {
                        count.incrementAndGet();
                    }
                })
                .subscribe();
    }
    assertEquals(expectedCount, count.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testTakeWhile1() {
    Observable<Integer> w = Observable.just(1, 2, 3);
    Observable<Integer> take = w.takeWhile(new Predicate<Integer>() {
        @Override
        public boolean test(Integer input) {
            return input < 3;
        }
    });
    Observer<Integer> observer = TestHelper.mockObserver();
    take.subscribe(observer);
    verify(observer, times(1)).onNext(1);
    verify(observer, times(1)).onNext(2);
    verify(observer, never()).onNext(3);
    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testTakeWhile2() {
    Observable<String> w = Observable.just("one", "two", "three");
    Observable<String> take = w.takeWhile(new Predicate<String>() {
        int index;
        @Override
        public boolean test(String input) {
            return index++ < 2;
        }
    });
    Observer<String> observer = TestHelper.mockObserver();
    take.subscribe(observer);
    verify(observer, times(1)).onNext("one");
    verify(observer, times(1)).onNext("two");
    verify(observer, never()).onNext("three");
    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testTakeWhileToList() {
    final int expectedCount = 3;
    final AtomicInteger count = new AtomicInteger();
    for (int i = 0; i < expectedCount; i++) {
        Observable
                .just(Boolean.TRUE, Boolean.FALSE)
                .takeWhile(new Predicate<Boolean>() {
                    @Override
                    public boolean test(Boolean v) {
                        return v;
                    }
                })
                .toList()
                .doOnSuccess(new Consumer<List<Boolean>>() {
                    @Override
                    public void accept(List<Boolean> booleans) {
                        count.incrementAndGet();
                    }
                })
                .subscribe();
    }
    assertEquals(expectedCount, count.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testIssue1451Case2() {
    // https://github.com/Netflix/RxJava/issues/1451
    final int expectedCount = 3;
    final AtomicInteger count = new AtomicInteger();
    for (int i = 0; i < expectedCount; i++) {
        Observable
                .just(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
                .takeWhile(new Predicate<Boolean>() {
                    @Override
                    public boolean test(Boolean value) {
                        return value;
                    }
                })
                .toList()
                .doOnSuccess(new Consumer<List<Boolean>>() {
                    @Override
                    public void accept(List<Boolean> booleans) {
                        count.incrementAndGet();
                    }
                })
                .subscribe();
    }
    assertEquals(expectedCount, count.get());
}
Code example source: amitshekhariitbhu/RxJava2-Android-Samples
    @Override
    protected void doSomeWork() {
        getStringObservable()
                // Delay item emission by one second
                .zipWith(Observable.interval(0, 1, TimeUnit.SECONDS), new BiFunction<String, Long, String>() {
                    @Override
                    public String apply(String s, Long aLong) throws Exception {
                        return s;
                    }
                })
                // Take items while the condition holds, i.e. until an item contains "honey".
                .takeWhile(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return !s.toLowerCase().contains("honey");
                    }
                })
                // Observe on the main thread, because the interval runs on a background thread to avoid blocking the UI.
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }
}
Code example source: ReactiveX/RxJava
@Test
public void testTakeWhileProtectsPredicateCall() {
    TestObservable source = new TestObservable(mock(Disposable.class), "one");
    final RuntimeException testException = new RuntimeException("test exception");
    Observer<String> observer = TestHelper.mockObserver();
    Observable<String> take = Observable.unsafeCreate(source)
            .takeWhile(new Predicate<String>() {
                @Override
                public boolean test(String s) {
                    throw testException;
                }
            });
    take.subscribe(observer);
    // wait for the Observable to complete
    try {
        source.t.join();
    } catch (Throwable e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
    verify(observer, never()).onNext(any(String.class));
    verify(observer, times(1)).onError(testException);
}
Code example source: ReactiveX/RxJava
@Test
public void testUnsubscribeAfterTake() {
    Disposable upstream = mock(Disposable.class);
    TestObservable w = new TestObservable(upstream, "one", "two", "three");
    Observer<String> observer = TestHelper.mockObserver();
    Observable<String> take = Observable.unsafeCreate(w)
            .takeWhile(new Predicate<String>() {
                int index;
                @Override
                public boolean test(String s) {
                    return index++ < 1;
                }
            });
    take.subscribe(observer);
    // wait for the Observable to complete
    try {
        w.t.join();
    } catch (Throwable e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
    System.out.println("TestObservable thread finished");
    verify(observer, times(1)).onNext("one");
    verify(observer, never()).onNext("two");
    verify(observer, never()).onNext("three");
    verify(upstream, times(1)).dispose();
}
Code example source: ReactiveX/RxJava
@Test
public void testErrorCauseIncludesLastValue() {
    TestObserver<String> to = new TestObserver<String>();
    Observable.just("abc").takeWhile(new Predicate<String>() {
        @Override
        public boolean test(String t1) {
            throw new TestException();
        }
    }).subscribe(to);
    to.assertTerminated();
    to.assertNoValues();
    to.assertError(TestException.class);
    // FIXME last cause value not recorded
    // assertTrue(ts.getOnErrorEvents().get(0).getCause().getMessage().contains("abc"));
}
Code example source: ReactiveX/RxJava
@Test
public void testTakeWhileDoesntLeakErrors() {
    Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
        @Override
        public void subscribe(Observer<? super String> observer) {
            observer.onSubscribe(Disposables.empty());
            observer.onNext("one");
            observer.onError(new Throwable("test failed"));
        }
    });
    source.takeWhile(new Predicate<String>() {
        @Override
        public boolean test(String s) {
            return false;
        }
    }).blockingLast("");
}
Code example source: Polidea/RxAndroidBle
/**
 * Observable that emits `true` if the permission was granted at the time of subscription
 * @param locationServicesStatus the LocationServicesStatus
 * @param timerScheduler the Scheduler
 * @return the observable
 */
@NonNull
private static Single<Boolean> checkPermissionUntilGranted(
        final LocationServicesStatus locationServicesStatus,
        Scheduler timerScheduler
) {
    return Observable.interval(0, 1L, TimeUnit.SECONDS, timerScheduler)
            .takeWhile(new Predicate<Long>() {
                @Override
                public boolean test(Long timer) {
                    return !locationServicesStatus.isLocationPermissionOk();
                }
            })
            .count()
            .map(new Function<Long, Boolean>() {
                @Override
                public Boolean apply(Long count) throws Exception {
                    // if no elements were emitted then the permission was granted from the beginning
                    return count == 0;
                }
            });
}
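The count-then-compare logic in the snippet above may be easier to follow as a plain loop. This is only a sketch under stated assumptions: PermissionPollSketch, grantedFromStart, and the maxTicks bound are hypothetical and exist solely so the example terminates, whereas the original interval keeps ticking until the permission check succeeds.

```java
import java.util.function.BooleanSupplier;

final class PermissionPollSketch {
    // Hypothetical helper: returns true only if the very first check already succeeds.
    static boolean grantedFromStart(BooleanSupplier isPermissionOk, int maxTicks) {
        long count = 0;
        for (int tick = 0; tick < maxTicks; tick++) {
            if (isPermissionOk.getAsBoolean()) {
                break; // the takeWhile predicate would return false here and the sequence completes
            }
            count++; // one tick passed through while the permission was still missing
        }
        return count == 0; // mirrors the count() followed by map(count == 0) step
    }

    public static void main(String[] args) {
        System.out.println(grantedFromStart(() -> true, 5)); // prints true
        int[] calls = {0};
        System.out.println(grantedFromStart(() -> ++calls[0] >= 3, 5)); // prints false
    }
}
```

In the second call the check succeeds only on the third attempt, so two ticks are counted and the result is false.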
Code example source: ReactiveX/RxJava
@Test
public void testNoUnsubscribeDownstream() {
    Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Predicate<Integer>() {
        @Override
        public boolean test(Integer t1) {
            return t1 < 2;
        }
    });
    TestObserver<Integer> to = new TestObserver<Integer>();
    source.subscribe(to);
    to.assertNoErrors();
    to.assertValue(1);
    // 2.0.2 - not anymore
    // Assert.assertTrue("Not cancelled!", ts.isCancelled());
}
Code example source: L4Digital/RxLoader
private Observable<String> getObservable() {
    return Observable.interval(500, TimeUnit.MILLISECONDS)
            .takeWhile(new Predicate<Long>() {
                @Override
                public boolean test(Long tick) throws Exception {
                    return tick < sVersionNames.length;
                }
            })
            .map(new Function<Long, String>() {
                @Override
                public String apply(Long tick) throws Exception {
                    return sVersionNames[tick.intValue()];
                }
            });
}
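In the snippet above, takeWhile acts as a bounds guard placed before the index lookup in map. A plain-Java sketch of that ordering follows; TickToNameSketch, namesForTicks, and the NAMES array are hypothetical stand-ins (the real sVersionNames contents are not shown in the source), and a loop replaces the timed interval.

```java
import java.util.ArrayList;
import java.util.List;

final class TickToNameSketch {
    // Hypothetical stand-in for the sVersionNames array referenced in the snippet above.
    static final String[] NAMES = {"Cupcake", "Donut", "Eclair"};

    // Hypothetical helper: maps ticks 0, 1, 2, ... to names, stopping at the first out-of-range tick.
    static List<String> namesForTicks(long maxTicks) {
        List<String> out = new ArrayList<>();
        for (long tick = 0; tick < maxTicks; tick++) {
            if (!(tick < NAMES.length)) {
                break; // the guard fails first, so no out-of-range index is ever looked up
            }
            out.add(NAMES[(int) tick]);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(namesForTicks(10)); // prints [Cupcake, Donut, Eclair]
    }
}
```

Because the guard runs before the lookup, the sequence ends cleanly after the last name instead of failing with an index error.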
Code example source: AppStoreFoundation/asf-sdk
private ObservableSource<?> handleWsError(Observable<Throwable> throwableObservable) {
    AtomicInteger counter = new AtomicInteger();
    return throwableObservable.flatMap(throwable -> {
        if (throwable instanceof HttpException) {
            return Observable.just(throwable)
                    .takeWhile(__ -> counter.getAndIncrement() != 5)
                    .flatMap(__ -> Observable.timer(5, TimeUnit.SECONDS));
        } else {
            return Observable.just(throwable);
        }
    });
}