Usage and code examples for io.reactivex.Observable.takeWhile()

Reposted by x33g5p2x on 2022-01-25, filed under Other

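This article collects code examples for the Java method io.reactivex.Observable.takeWhile() and shows how Observable.takeWhile() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as reference material. Details of the method:
Package: io.reactivex
Class name: Observable
Method name: takeWhile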

About Observable.takeWhile

Returns an Observable that emits items emitted by the source ObservableSource so long as each item satisfies a specified condition, and then completes as soon as this condition is not satisfied. The first item that fails the condition is not emitted.

Scheduler: takeWhile does not operate by default on a particular Scheduler.
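The contract described above can be modeled without RxJava at all. The following is a minimal plain-JDK sketch, not RxJava's implementation: the `TakeWhileSketch` class and its `takeWhile` helper are hypothetical names, the source is a `List` rather than an `ObservableSource`, and `java.util.function.Predicate` stands in for `io.reactivex.functions.Predicate`. It records the signals a downstream observer would be expected to receive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class TakeWhileSketch {

    // Hypothetical helper that models the takeWhile contract on a List.
    // Returns the sequence of signals a downstream observer would see.
    static <T> List<String> takeWhile(List<T> source, Predicate<? super T> predicate) {
        List<String> signals = new ArrayList<>();
        for (T item : source) {
            if (!predicate.test(item)) {
                // First failing item: it is dropped and no later item is examined.
                break;
            }
            signals.add("onNext(" + item + ")");
        }
        // Completion follows the first failing item, or the end of the source.
        signals.add("onComplete");
        return signals;
    }

    public static void main(String[] args) {
        // prints [onNext(1), onNext(2), onComplete]
        System.out.println(takeWhile(Arrays.asList(1, 2, 3, 1), x -> x < 3));
    }
}
```

The trailing `1` in the input is never emitted even though it satisfies the condition: once an item fails the test, the sequence is over.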

Code examples

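Code example source: ReactiveX/RxJava

    // Fragment of an anonymous Function. `times` is a counter with getAndIncrement()
    // (for example an AtomicInteger) declared in the enclosing scope, which is not shown.
    @Override
    public ObservableSource<?> apply(Observable<Object> e) throws Exception {
        return e.takeWhile(new Predicate<Object>() {
            @Override
            public boolean test(Object v) throws Exception {
                // True for the first four calls if `times` starts at 0.
                return times.getAndIncrement() < 4;
            }
        });
    }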

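Code example source: ReactiveX/RxJava

    // Same pattern applied to a stream of errors. `times` is again a counter
    // from the enclosing scope, which is not shown.
    @Override
    public ObservableSource<?> apply(Observable<Throwable> e) throws Exception {
        return e.takeWhile(new Predicate<Object>() {
            @Override
            public boolean test(Object v) throws Exception {
                // True for the first four calls if `times` starts at 0.
                return times.getAndIncrement() < 4;
            }
        });
    }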

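Code example source: ReactiveX/RxJava

    // Fragment of an anonymous Function; the predicate accepts every item,
    // so takeWhile passes the whole sequence through.
    @Override
    public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
        return o.takeWhile(Functions.alwaysTrue());
    }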

Code example source: ReactiveX/RxJava

    // A null predicate is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void takeWhileNull() {
        just1.takeWhile(null);
    }

Code example source: Polidea/RxAndroidBle

    @Override
    public ObservableSource<?> apply(Observable<?> emittingOnBatchWriteFinished) {
        return emittingOnBatchWriteFinished
                .takeWhile(notUnsubscribed(emitterWrapper))
                .map(bufferIsNotEmpty(byteBuffer))
                .compose(writeOperationAckStrategy)
                // Stop once the mapped value reports that nothing remains.
                .takeWhile(new Predicate<Boolean>() {
                    @Override
                    public boolean test(Boolean hasRemaining) {
                        return hasRemaining;
                    }
                });
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testIssue1451Case1() {
        // https://github.com/Netflix/RxJava/issues/1451
        final int expectedCount = 3;
        final AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < expectedCount; i++) {
            Observable
                    .just(Boolean.TRUE, Boolean.FALSE)
                    .takeWhile(new Predicate<Boolean>() {
                        @Override
                        public boolean test(Boolean value) {
                            return value;
                        }
                    })
                    .toList()
                    .doOnSuccess(new Consumer<List<Boolean>>() {
                        @Override
                        public void accept(List<Boolean> booleans) {
                            count.incrementAndGet();
                        }
                    })
                    .subscribe();
        }
        // One success per subscription.
        assertEquals(expectedCount, count.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeWhile1() {
        Observable<Integer> w = Observable.just(1, 2, 3);
        Observable<Integer> take = w.takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer input) {
                return input < 3;
            }
        });
        Observer<Integer> observer = TestHelper.mockObserver();
        take.subscribe(observer);
        verify(observer, times(1)).onNext(1);
        verify(observer, times(1)).onNext(2);
        // 3 fails the predicate, so it is dropped and the sequence completes.
        verify(observer, never()).onNext(3);
        verify(observer, never()).onError(any(Throwable.class));
        verify(observer, times(1)).onComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeWhile2() {
        Observable<String> w = Observable.just("one", "two", "three");
        Observable<String> take = w.takeWhile(new Predicate<String>() {
            // Stateful predicate: accepts only the first two items.
            int index;
            @Override
            public boolean test(String input) {
                return index++ < 2;
            }
        });
        Observer<String> observer = TestHelper.mockObserver();
        take.subscribe(observer);
        verify(observer, times(1)).onNext("one");
        verify(observer, times(1)).onNext("two");
        verify(observer, never()).onNext("three");
        verify(observer, never()).onError(any(Throwable.class));
        verify(observer, times(1)).onComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeWhileToList() {
        final int expectedCount = 3;
        final AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < expectedCount; i++) {
            Observable
                    .just(Boolean.TRUE, Boolean.FALSE)
                    .takeWhile(new Predicate<Boolean>() {
                        @Override
                        public boolean test(Boolean v) {
                            return v;
                        }
                    })
                    .toList()
                    .doOnSuccess(new Consumer<List<Boolean>>() {
                        @Override
                        public void accept(List<Boolean> booleans) {
                            count.incrementAndGet();
                        }
                    })
                    .subscribe();
        }
        assertEquals(expectedCount, count.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testIssue1451Case2() {
        // https://github.com/Netflix/RxJava/issues/1451
        final int expectedCount = 3;
        final AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < expectedCount; i++) {
            Observable
                    // Two items fail the predicate here, instead of one as in Case1.
                    .just(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
                    .takeWhile(new Predicate<Boolean>() {
                        @Override
                        public boolean test(Boolean value) {
                            return value;
                        }
                    })
                    .toList()
                    .doOnSuccess(new Consumer<List<Boolean>>() {
                        @Override
                        public void accept(List<Boolean> booleans) {
                            count.incrementAndGet();
                        }
                    })
                    .subscribe();
        }
        assertEquals(expectedCount, count.get());
    }
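As far as the code shows, the two Issue1451 tests assert that each subscription produces exactly one success, even when more than one item fails the predicate. One way to get that guarantee in a push-style operator is a guard flag that ignores every signal after the first terminal event. The sketch below is a plain-JDK model under that assumption; `SingleCompletionSketch` and its methods are hypothetical names and this is not RxJava's source code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class SingleCompletionSketch<T> {
    private final Predicate<? super T> predicate;
    private final List<String> signals = new ArrayList<>();
    private boolean done; // set once a terminal signal has been recorded

    SingleCompletionSketch(Predicate<? super T> predicate) {
        this.predicate = predicate;
    }

    // Called by the (modeled) upstream for every item.
    void onNext(T item) {
        if (done) {
            return; // late items are ignored after completion
        }
        if (predicate.test(item)) {
            signals.add("onNext(" + item + ")");
        } else {
            done = true;
            signals.add("onComplete");
        }
    }

    // Called by the (modeled) upstream when it finishes on its own.
    void onComplete() {
        if (done) {
            return; // completion was already signalled
        }
        done = true;
        signals.add("onComplete");
    }

    List<String> signals() {
        return signals;
    }

    public static void main(String[] args) {
        SingleCompletionSketch<Boolean> op = new SingleCompletionSketch<>(v -> v);
        op.onNext(Boolean.TRUE);
        op.onNext(Boolean.FALSE);
        op.onNext(Boolean.FALSE); // second failing item, ignored
        op.onComplete();          // upstream completion, ignored
        // prints [onNext(true), onComplete]
        System.out.println(op.signals());
    }
}
```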

Code example source: amitshekhariitbhu/RxJava2-Android-Samples

    @Override
    protected void doSomeWork() {
        getStringObservable()
                // Pace the emissions: pair each item with an interval tick, one per second.
                .zipWith(Observable.interval(0, 1, TimeUnit.SECONDS), new BiFunction<String, Long, String>() {
                    @Override
                    public String apply(String s, Long aLong) throws Exception {
                        return s;
                    }
                })
                // Keep taking items until one contains "honey"; that item is dropped
                // and the sequence completes.
                .takeWhile(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return !s.toLowerCase().contains("honey");
                    }
                })
                // interval ticks on a background scheduler, so switch to the main
                // thread before delivering items to the observer.
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeWhileProtectsPredicateCall() {
        TestObservable source = new TestObservable(mock(Disposable.class), "one");
        final RuntimeException testException = new RuntimeException("test exception");
        Observer<String> observer = TestHelper.mockObserver();
        Observable<String> take = Observable.unsafeCreate(source)
                .takeWhile(new Predicate<String>() {
                    @Override
                    public boolean test(String s) {
                        throw testException;
                    }
                });
        take.subscribe(observer);
        // wait for the Observable to complete
        try {
            source.t.join();
        } catch (Throwable e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
        // The exception thrown by the predicate reaches the observer as onError.
        verify(observer, never()).onNext(any(String.class));
        verify(observer, times(1)).onError(testException);
    }
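The behavior this test verifies, a throwing predicate surfacing as a terminal error rather than escaping to the caller, can be illustrated with a small plain-JDK model. This is a sketch only: `PredicateErrorSketch` and its `takeWhile` helper are hypothetical names, the source is a `List`, and only unchecked exceptions are modeled, whereas RxJava's own `Predicate` may also throw checked ones.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class PredicateErrorSketch {

    // Hypothetical helper: returns the signals a downstream observer would see.
    static <T> List<String> takeWhile(List<T> source, Predicate<? super T> predicate) {
        List<String> signals = new ArrayList<>();
        for (T item : source) {
            boolean keep;
            try {
                keep = predicate.test(item);
            } catch (RuntimeException e) {
                // The failure becomes a terminal error signal; nothing else follows.
                signals.add("onError(" + e.getMessage() + ")");
                return signals;
            }
            if (!keep) {
                break;
            }
            signals.add("onNext(" + item + ")");
        }
        signals.add("onComplete");
        return signals;
    }

    public static void main(String[] args) {
        // prints [onError(test exception)]
        System.out.println(takeWhile(Arrays.asList("one"), s -> {
            throw new RuntimeException("test exception");
        }));
    }
}
```

No `onNext` and no `onComplete` appear in the result, which matches what the test above checks on the mock observer.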

Code example source: ReactiveX/RxJava

    @Test
    public void testUnsubscribeAfterTake() {
        Disposable upstream = mock(Disposable.class);
        TestObservable w = new TestObservable(upstream, "one", "two", "three");
        Observer<String> observer = TestHelper.mockObserver();
        Observable<String> take = Observable.unsafeCreate(w)
                .takeWhile(new Predicate<String>() {
                    // Stateful predicate: accepts only the first item.
                    int index;
                    @Override
                    public boolean test(String s) {
                        return index++ < 1;
                    }
                });
        take.subscribe(observer);
        // wait for the Observable to complete
        try {
            w.t.join();
        } catch (Throwable e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
        System.out.println("TestObservable thread finished");
        verify(observer, times(1)).onNext("one");
        verify(observer, never()).onNext("two");
        verify(observer, never()).onNext("three");
        // The upstream is disposed once the predicate fails.
        verify(upstream, times(1)).dispose();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testErrorCauseIncludesLastValue() {
        TestObserver<String> to = new TestObserver<String>();
        Observable.just("abc").takeWhile(new Predicate<String>() {
            @Override
            public boolean test(String t1) {
                throw new TestException();
            }
        }).subscribe(to);
        to.assertTerminated();
        to.assertNoValues();
        to.assertError(TestException.class);
        // FIXME last cause value not recorded
        // assertTrue(ts.getOnErrorEvents().get(0).getCause().getMessage().contains("abc"));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeWhileDoesntLeakErrors() {
        Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
            @Override
            public void subscribe(Observer<? super String> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onNext("one");
                // Emitted after takeWhile has already completed.
                observer.onError(new Throwable("test failed"));
            }
        });
        source.takeWhile(new Predicate<String>() {
            @Override
            public boolean test(String s) {
                return false;
            }
        }).blockingLast("");
    }

Code example source: Polidea/RxAndroidBle

    /**
     * Single that emits `true` if the permission was already granted at the time of subscription
     * @param locationServicesStatus the LocationServicesStatus
     * @param timerScheduler the Scheduler
     * @return the Single
     */
    @NonNull
    private static Single<Boolean> checkPermissionUntilGranted(
            final LocationServicesStatus locationServicesStatus,
            Scheduler timerScheduler
    ) {
        return Observable.interval(0, 1L, TimeUnit.SECONDS, timerScheduler)
                // Keep polling once per second while the permission is missing.
                .takeWhile(new Predicate<Long>() {
                    @Override
                    public boolean test(Long timer) {
                        return !locationServicesStatus.isLocationPermissionOk();
                    }
                })
                .count()
                .map(new Function<Long, Boolean>() {
                    @Override
                    public Boolean apply(Long count) throws Exception {
                        // if no elements were emitted then the permission was granted from the beginning
                        return count == 0;
                    }
                });
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testNoUnsubscribeDownstream() {
        Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer t1) {
                return t1 < 2;
            }
        });
        TestObserver<Integer> to = new TestObserver<Integer>();
        source.subscribe(to);
        to.assertNoErrors();
        to.assertValue(1);
        // 2.0.2 - not anymore
        // Assert.assertTrue("Not cancelled!", ts.isCancelled());
    }

Code example source: L4Digital/RxLoader

    private Observable<String> getObservable() {
        return Observable.interval(500, TimeUnit.MILLISECONDS)
                // Stop ticking once every index of sVersionNames has been visited.
                .takeWhile(new Predicate<Long>() {
                    @Override
                    public boolean test(Long tick) throws Exception {
                        return tick < sVersionNames.length;
                    }
                })
                .map(new Function<Long, String>() {
                    @Override
                    public String apply(Long tick) throws Exception {
                        return sVersionNames[tick.intValue()];
                    }
                });
    }

Code example source: AppStoreFoundation/asf-sdk

    private ObservableSource<?> handleWsError(Observable<Throwable> throwableObservable) {
        // Shared across all errors seen by this handler.
        AtomicInteger counter = new AtomicInteger();
        return throwableObservable.flatMap(throwable -> {
            if (throwable instanceof HttpException) {
                return Observable.just(throwable)
                        .takeWhile(__ -> counter.getAndIncrement() != 5)
                        .flatMap(__ -> Observable.timer(5, TimeUnit.SECONDS));
            } else {
                return Observable.just(throwable);
            }
        });
    }
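In the handleWsError example, a new takeWhile is created for every HttpException while the counter is shared, so the predicate sees the counter values 0, 1, 2, and so on across calls. The plain-JDK sketch below, with the hypothetical names `CounterPredicateSketch` and `evaluate`, compares the `!= 5` test used there with a `< 5` test. Whether the difference matters in that project depends on what the surrounding retry logic does after the sixth error, which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;

class CounterPredicateSketch {

    // Applies the predicate to successive counter values (0, 1, 2, ...)
    // and records the result of each call.
    static List<Boolean> evaluate(IntPredicate predicate, int calls) {
        AtomicInteger counter = new AtomicInteger();
        List<Boolean> results = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            results.add(predicate.test(counter.getAndIncrement()));
        }
        return results;
    }

    public static void main(String[] args) {
        // "!= 5" rejects only the sixth call and accepts later ones again.
        // prints [true, true, true, true, true, false, true, true]
        System.out.println(evaluate(n -> n != 5, 8));

        // "< 5" keeps rejecting from the sixth call onward.
        // prints [true, true, true, true, true, false, false, false]
        System.out.println(evaluate(n -> n < 5, 8));
    }
}
```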
