io.reactivex.Observable.takeWhile()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.0k)|赞(0)|评价(0)|浏览(202)

本文整理了Java中io.reactivex.Observable.takeWhile()方法的一些代码示例,展示了Observable.takeWhile()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.takeWhile()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:takeWhile

Observable.takeWhile介绍

[英]Returns an Observable that emits items emitted by the source ObservableSource so long as each item satisfied a specified condition, and then completes as soon as this condition is not satisfied.

Scheduler: takeWhile does not operate by default on a particular Scheduler.
[中]返回一个Observable,只要每个项目满足指定的条件,它就会发出源ObservableSource发出的项目,然后在不满足该条件时立即完成。
调度程序:默认情况下,takeWhile不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<?> apply(Observable<Object> e) throws Exception {
    return e.takeWhile(new Predicate<Object>() {
      @Override
      public boolean test(Object v) throws Exception {
        return times.getAndIncrement() < 4;
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<?> apply(Observable<Throwable> e) throws Exception {
    return e.takeWhile(new Predicate<Object>() {
      @Override
      public boolean test(Object v) throws Exception {
        return times.getAndIncrement() < 4;
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<?> apply(Observable<Throwable> e) throws Exception {
    return e.takeWhile(new Predicate<Object>() {
      @Override
      public boolean test(Object v) throws Exception {
        return times.getAndIncrement() < 4;
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
    return o.takeWhile(Functions.alwaysTrue());
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void takeWhileNull() {
  just1.takeWhile(null);
}

代码示例来源:origin: Polidea/RxAndroidBle

@Override
public ObservableSource<?> apply(Observable<?> emittingOnBatchWriteFinished) {
  return emittingOnBatchWriteFinished
      .takeWhile(notUnsubscribed(emitterWrapper))
      .map(bufferIsNotEmpty(byteBuffer))
      .compose(writeOperationAckStrategy)
      .takeWhile(new Predicate<Boolean>() {
        @Override
        public boolean test(Boolean hasRemaining) {
          return hasRemaining;
        }
      });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testIssue1451Case1() {
  // https://github.com/Netflix/RxJava/issues/1451
  final int expectedCount = 3;
  final AtomicInteger count = new AtomicInteger();
  for (int i = 0; i < expectedCount; i++) {
    Observable
        .just(Boolean.TRUE, Boolean.FALSE)
        .takeWhile(new Predicate<Boolean>() {
          @Override
          public boolean test(Boolean value) {
            return value;
          }
        })
        .toList()
        .doOnSuccess(new Consumer<List<Boolean>>() {
          @Override
          public void accept(List<Boolean> booleans) {
            count.incrementAndGet();
          }
        })
        .subscribe();
  }
  assertEquals(expectedCount, count.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeWhile1() {
  Observable<Integer> w = Observable.just(1, 2, 3);
  Observable<Integer> take = w.takeWhile(new Predicate<Integer>() {
    @Override
    public boolean test(Integer input) {
      return input < 3;
    }
  });
  Observer<Integer> observer = TestHelper.mockObserver();
  take.subscribe(observer);
  verify(observer, times(1)).onNext(1);
  verify(observer, times(1)).onNext(2);
  verify(observer, never()).onNext(3);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeWhile2() {
  Observable<String> w = Observable.just("one", "two", "three");
  Observable<String> take = w.takeWhile(new Predicate<String>() {
    int index;
    @Override
    public boolean test(String input) {
      return index++ < 2;
    }
  });
  Observer<String> observer = TestHelper.mockObserver();
  take.subscribe(observer);
  verify(observer, times(1)).onNext("one");
  verify(observer, times(1)).onNext("two");
  verify(observer, never()).onNext("three");
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeWhileToList() {
  final int expectedCount = 3;
  final AtomicInteger count = new AtomicInteger();
  for (int i = 0; i < expectedCount; i++) {
    Observable
        .just(Boolean.TRUE, Boolean.FALSE)
        .takeWhile(new Predicate<Boolean>() {
          @Override
          public boolean test(Boolean v) {
            return v;
          }
        })
        .toList()
        .doOnSuccess(new Consumer<List<Boolean>>() {
          @Override
          public void accept(List<Boolean> booleans) {
            count.incrementAndGet();
          }
        })
        .subscribe();
  }
  assertEquals(expectedCount, count.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testIssue1451Case2() {
  // https://github.com/Netflix/RxJava/issues/1451
  final int expectedCount = 3;
  final AtomicInteger count = new AtomicInteger();
  for (int i = 0; i < expectedCount; i++) {
    Observable
        .just(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
        .takeWhile(new Predicate<Boolean>() {
          @Override
          public boolean test(Boolean value) {
            return value;
          }
        })
        .toList()
        .doOnSuccess(new Consumer<List<Boolean>>() {
          @Override
          public void accept(List<Boolean> booleans) {
            count.incrementAndGet();
          }
        })
        .subscribe();
  }
  assertEquals(expectedCount, count.get());
}

代码示例来源:origin: amitshekhariitbhu/RxJava2-Android-Samples

@Override
  protected void doSomeWork() {
    getStringObservable()
        //Delay item emission by one second
        .zipWith(Observable.interval(0, 1, TimeUnit.SECONDS), new BiFunction<String, Long, String>() {
          @Override
          public String apply(String s, Long aLong) throws Exception {
            return s;
          }
        })
        //Take the items until the condition is met.
        .takeWhile(new Predicate<String>() {
          @Override
          public boolean test(String s) throws Exception {
            return !s.toLowerCase().contains("honey");
          }
        })
        //We need to observe on MainThread because delay works on background thread to avoid UI blocking.
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(getObserver());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeWhileProtectsPredicateCall() {
  TestObservable source = new TestObservable(mock(Disposable.class), "one");
  final RuntimeException testException = new RuntimeException("test exception");
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> take = Observable.unsafeCreate(source)
      .takeWhile(new Predicate<String>() {
    @Override
    public boolean test(String s) {
      throw testException;
    }
  });
  take.subscribe(observer);
  // wait for the Observable to complete
  try {
    source.t.join();
  } catch (Throwable e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
  verify(observer, never()).onNext(any(String.class));
  verify(observer, times(1)).onError(testException);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUnsubscribeAfterTake() {
  Disposable upstream = mock(Disposable.class);
  TestObservable w = new TestObservable(upstream, "one", "two", "three");
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> take = Observable.unsafeCreate(w)
      .takeWhile(new Predicate<String>() {
    int index;
    @Override
    public boolean test(String s) {
      return index++ < 1;
    }
  });
  take.subscribe(observer);
  // wait for the Observable to complete
  try {
    w.t.join();
  } catch (Throwable e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
  System.out.println("TestObservable thread finished");
  verify(observer, times(1)).onNext("one");
  verify(observer, never()).onNext("two");
  verify(observer, never()).onNext("three");
  verify(upstream, times(1)).dispose();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void testErrorCauseIncludesLastValue() {
    TestObserver<String> to = new TestObserver<String>();
    Observable.just("abc").takeWhile(new Predicate<String>() {
      @Override
      public boolean test(String t1) {
        throw new TestException();
      }
    }).subscribe(to);

    to.assertTerminated();
    to.assertNoValues();
    to.assertError(TestException.class);
    // FIXME last cause value not recorded
//        assertTrue(ts.getOnErrorEvents().get(0).getCause().getMessage().contains("abc"));
  }

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeWhileDoesntLeakErrors() {
  Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      observer.onSubscribe(Disposables.empty());
      observer.onNext("one");
      observer.onError(new Throwable("test failed"));
    }
  });
  source.takeWhile(new Predicate<String>() {
    @Override
    public boolean test(String s) {
      return false;
    }
  }).blockingLast("");
}

代码示例来源:origin: Polidea/RxAndroidBle

/**
 * Observable that emits `true` if the permission was granted on the time of subscription
 * @param locationServicesStatus the LocationServicesStatus
 * @param timerScheduler the Scheduler
 * @return the observable
 */
@NonNull
private static Single<Boolean> checkPermissionUntilGranted(
    final LocationServicesStatus locationServicesStatus,
    Scheduler timerScheduler
) {
  return Observable.interval(0, 1L, TimeUnit.SECONDS, timerScheduler)
      .takeWhile(new Predicate<Long>() {
        @Override
        public boolean test(Long timer) {
          return !locationServicesStatus.isLocationPermissionOk();
        }
      })
      .count()
      .map(new Function<Long, Boolean>() {
        @Override
        public Boolean apply(Long count) throws Exception {
          // if no elements were emitted then the permission was granted from the beginning
          return count == 0;
        }
      });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void testNoUnsubscribeDownstream() {
    Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Predicate<Integer>() {
      @Override
      public boolean test(Integer t1) {
        return t1 < 2;
      }
    });
    TestObserver<Integer> to = new TestObserver<Integer>();

    source.subscribe(to);

    to.assertNoErrors();
    to.assertValue(1);

    // 2.0.2 - not anymore
//        Assert.assertTrue("Not cancelled!", ts.isCancelled());
  }

代码示例来源:origin: L4Digital/RxLoader

private Observable<String> getObservable() {
  return Observable.interval(500, TimeUnit.MILLISECONDS)
      .takeWhile(new Predicate<Long>() {
        @Override
        public boolean test(Long tick) throws Exception {
          return tick < sVersionNames.length;
        }
      })
      .map(new Function<Long, String>() {
        @Override
        public String apply(Long tick) throws Exception {
          return sVersionNames[tick.intValue()];
        }
      });
}

代码示例来源:origin: AppStoreFoundation/asf-sdk

private ObservableSource<?> handleWsError(Observable<Throwable> throwableObservable) {
 AtomicInteger counter = new AtomicInteger();
 return throwableObservable.flatMap(throwable -> {
  if (throwable instanceof HttpException) {
   return Observable.just(throwable)
     .takeWhile(__ -> counter.getAndIncrement() != 5)
     .flatMap(__ -> Observable.timer(5, TimeUnit.SECONDS));
  } else {
   return Observable.just(throwable);
  }
 });
}

相关文章

Observable类方法