Usage and code examples of the io.reactivex.Flowable.takeWhile() method

x33g5p2x, reposted on 2022-01-19 under "Other"

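This article collects code examples for the Java method io.reactivex.Flowable.takeWhile() and shows how Flowable.takeWhile() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.takeWhile() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: takeWhile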

Introduction to Flowable.takeWhile

Returns a Flowable that emits the items emitted by the source Publisher as long as each item satisfies a specified condition, and completes as soon as an item fails that condition.

Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: takeWhile does not operate by default on a particular Scheduler.
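The "prefix" rule in the description is easy to misread as a filter. The JDK's own `Stream.takeWhile` (Java 9+) applies the same rule to an ordered sequence, so it can serve as a dependency-free stand-in here. It is a different, synchronous API and not RxJava's Flowable; the class and method names below are hypothetical and chosen only for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: illustrates the prefix rule with java.util.stream,
// not with io.reactivex.Flowable.
final class PrefixSemanticsDemo {

    /** Keeps the leading run of values that are below the limit. */
    static List<Integer> below(int limit, Integer... values) {
        return Stream.of(values)
                // Stops at the first value that fails the test; nothing after
                // that value is emitted, even if it would pass again.
                .takeWhile(v -> v < limit)
                .collect(Collectors.toList());
    }
}
```

Calling `PrefixSemanticsDemo.below(3, 1, 2, 3, 1)` returns `[1, 2]`: the value 3 fails the condition, so the trailing 1 is dropped even though it would pass on its own.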

Code examples

Code example source: ReactiveX/RxJava

    @Override
    public Flowable<?> apply(Flowable<Throwable> e) throws Exception {
        return e.takeWhile(new Predicate<Object>() {
            @Override
            public boolean test(Object v) throws Exception {
                // times is a counter declared outside this excerpt
                return times.getAndIncrement() < 4;
            }
        });
    }
    }) // closes the enclosing anonymous class and call, which the excerpt omits

Code example source: ReactiveX/RxJava

    @Override
    public Flowable<?> apply(Flowable<Object> e) throws Exception {
        return e.takeWhile(new Predicate<Object>() {
            @Override
            public boolean test(Object v) throws Exception {
                // times is a counter declared outside this excerpt
                return times.getAndIncrement() < 4;
            }
        });
    }
    }) // closes the enclosing anonymous class and call, which the excerpt omits
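The predicate in the excerpts above is stateful: it ignores the item and consults a shared counter instead. A minimal plain-Java sketch of what that implies follows. It assumes `times` is an `AtomicInteger` starting at 0 (the excerpts do not show its declaration), and it models takeWhile with a simple loop rather than calling RxJava; `CountingPredicateDemo` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical demo class: a loop-based model of takeWhile, not RxJava code.
final class CountingPredicateDemo {

    /** Keeps items until the predicate first returns false, then stops. */
    static <T> List<T> takeWhile(Iterable<T> source, Predicate<? super T> predicate) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!predicate.test(item)) {
                break; // first failure ends the sequence; later items are never tested
            }
            out.add(item);
        }
        return out;
    }

    /** Returns {number of items passed, final counter value} for a source of the given size. */
    static int[] run(int sourceSize) {
        AtomicInteger times = new AtomicInteger(); // assumption: the counter starts at 0
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < sourceSize; i++) {
            source.add(i);
        }
        List<Integer> passed = takeWhile(source, v -> times.getAndIncrement() < 4);
        return new int[] { passed.size(), times.get() };
    }
}
```

With ten source items, `run(10)` yields 4 passed items and a counter of 5, because the failing fifth call still increments the counter. With only two items, `run(2)` yields 2 and 2, since the predicate never gets a chance to fail.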

Code example source: ReactiveX/RxJava

    @Override
    public Flowable<Object> apply(Flowable<Object> f) throws Exception {
        return f.takeWhile(Functions.alwaysTrue());
    }
    }); // closes the enclosing anonymous class and call, which the excerpt omits

Code example source: ReactiveX/RxJava

    @Test(expected = NullPointerException.class)
    public void takeWhileNull() {
        just1.takeWhile(null);
    }

Code example source: ReactiveX/RxJava

    @Override
    public Publisher<Integer> createPublisher(long elements) {
        return Flowable.range(0, (int) elements).takeWhile(Functions.alwaysTrue());
    }
    } // closes the enclosing class, which the excerpt omits

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeWhile2() {
        Flowable<String> w = Flowable.just("one", "two", "three");
        Flowable<String> take = w.takeWhile(new Predicate<String>() {
            int index;
            @Override
            public boolean test(String input) {
                return index++ < 2;
            }
        });
        Subscriber<String> subscriber = TestHelper.mockSubscriber();
        take.subscribe(subscriber);
        verify(subscriber, times(1)).onNext("one");
        verify(subscriber, times(1)).onNext("two");
        verify(subscriber, never()).onNext("three");
        verify(subscriber, never()).onError(any(Throwable.class));
        verify(subscriber, times(1)).onComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeWhile1() {
        Flowable<Integer> w = Flowable.just(1, 2, 3);
        Flowable<Integer> take = w.takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer input) {
                return input < 3;
            }
        });
        Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
        take.subscribe(subscriber);
        verify(subscriber, times(1)).onNext(1);
        verify(subscriber, times(1)).onNext(2);
        verify(subscriber, never()).onNext(3);
        verify(subscriber, never()).onError(any(Throwable.class));
        verify(subscriber, times(1)).onComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testIssue1451Case2() {
        // https://github.com/Netflix/RxJava/issues/1451
        final int expectedCount = 3;
        final AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < expectedCount; i++) {
            Flowable
                .just(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
                .takeWhile(new Predicate<Boolean>() {
                    @Override
                    public boolean test(Boolean value) {
                        return value;
                    }
                })
                .toList()
                .doOnSuccess(new Consumer<List<Boolean>>() {
                    @Override
                    public void accept(List<Boolean> booleans) {
                        count.incrementAndGet();
                    }
                })
                .subscribe();
        }
        assertEquals(expectedCount, count.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeWhileToList() {
        final int expectedCount = 3;
        final AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < expectedCount; i++) {
            Flowable
                .just(Boolean.TRUE, Boolean.FALSE)
                .takeWhile(new Predicate<Boolean>() {
                    @Override
                    public boolean test(Boolean v) {
                        return v;
                    }
                })
                .toList()
                .doOnSuccess(new Consumer<List<Boolean>>() {
                    @Override
                    public void accept(List<Boolean> booleans) {
                        count.incrementAndGet();
                    }
                })
                .subscribe();
        }
        assertEquals(expectedCount, count.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeWhileProtectsPredicateCall() {
        TestFlowable source = new TestFlowable(mock(Subscription.class), "one");
        final RuntimeException testException = new RuntimeException("test exception");
        Subscriber<String> subscriber = TestHelper.mockSubscriber();
        Flowable<String> take = Flowable.unsafeCreate(source)
            .takeWhile(new Predicate<String>() {
                @Override
                public boolean test(String s) {
                    throw testException;
                }
            });
        take.subscribe(subscriber);
        // wait for the Flowable to complete
        try {
            source.t.join();
        } catch (Throwable e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
        verify(subscriber, never()).onNext(any(String.class));
        verify(subscriber, times(1)).onError(testException);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testUnsubscribeAfterTake() {
        Subscription s = mock(Subscription.class);
        TestFlowable w = new TestFlowable(s, "one", "two", "three");
        Subscriber<String> subscriber = TestHelper.mockSubscriber();
        Flowable<String> take = Flowable.unsafeCreate(w)
            .takeWhile(new Predicate<String>() {
                int index;
                @Override
                public boolean test(String s) {
                    return index++ < 1;
                }
            });
        take.subscribe(subscriber);
        // wait for the Flowable to complete
        try {
            w.t.join();
        } catch (Throwable e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
        System.out.println("TestFlowable thread finished");
        verify(subscriber, times(1)).onNext("one");
        verify(subscriber, never()).onNext("two");
        verify(subscriber, never()).onNext("three");
        verify(s, times(1)).cancel();
    }
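The two tests above check that a failing predicate cancels the upstream Subscription and that a throwing predicate is routed to onError. A rough sketch of a subscriber with that behavior is given below, written against the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) so that it needs no external dependency. It is an illustration under those assumptions, not RxJava's actual implementation; `TakeWhileSketch`, `TakeWhileSketchDemo`, and the event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical operator: relays items while the predicate holds.
final class TakeWhileSketch<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> downstream;
    private final Predicate<? super T> predicate;
    private Flow.Subscription upstream;
    private boolean done;

    TakeWhileSketch(Flow.Subscriber<? super T> downstream, Predicate<? super T> predicate) {
        this.downstream = downstream;
        this.predicate = predicate;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(s); // demand goes straight to the source, untouched
    }

    @Override public void onNext(T item) {
        if (done) {
            return;
        }
        boolean pass;
        try {
            pass = predicate.test(item);
        } catch (Throwable t) {
            done = true;
            upstream.cancel();     // a throwing predicate stops the source...
            downstream.onError(t); // ...and surfaces as a terminal error
            return;
        }
        if (!pass) {
            done = true;
            upstream.cancel();       // first failing item: stop the source
            downstream.onComplete(); // and complete without emitting that item
            return;
        }
        downstream.onNext(item);
    }

    // Signals arriving after termination are simply dropped in this sketch.
    @Override public void onError(Throwable t) { if (!done) { done = true; downstream.onError(t); } }
    @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
}

final class TakeWhileSketchDemo {
    /** Pushes the items synchronously through the operator and returns the recorded events. */
    static List<String> run(List<String> items, Predicate<String> predicate) {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<String> recorder = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        TakeWhileSketch<String> op = new TakeWhileSketch<>(recorder, predicate);
        boolean[] cancelled = { false };
        op.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { /* this toy source ignores demand */ }
            @Override public void cancel() { cancelled[0] = true; events.add("cancel"); }
        });
        for (String item : items) {
            if (cancelled[0]) {
                break;
            }
            op.onNext(item);
        }
        if (!cancelled[0]) {
            op.onComplete();
        }
        return events;
    }
}
```

For the items "one", "two", "three" and a predicate that rejects "two", `run` records `next:one`, `cancel`, `complete`. For a predicate that always throws, it records `cancel` followed by the error, with no `next` events.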

Code example source: ReactiveX/RxJava

    @Test
    public void testIssue1451Case1() {
        // https://github.com/Netflix/RxJava/issues/1451
        final int expectedCount = 3;
        final AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < expectedCount; i++) {
            Flowable
                .just(Boolean.TRUE, Boolean.FALSE)
                .takeWhile(new Predicate<Boolean>() {
                    @Override
                    public boolean test(Boolean value) {
                        return value;
                    }
                })
                .toList()
                .doOnSuccess(new Consumer<List<Boolean>>() {
                    @Override
                    public void accept(List<Boolean> booleans) {
                        count.incrementAndGet();
                    }
                })
                .subscribe();
        }
        assertEquals(expectedCount, count.get());
    }

Code example source: ReactiveX/RxJava

    .takeWhile(new Predicate<List<Integer>>() {
        @Override
        public boolean test(List<Integer> v) throws Exception {

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testOnBackpressureBuffer() {
        int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
        AtomicInteger c = new AtomicInteger();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        firehose(c).takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer t1) {
                return t1 < 100000;
            }
        })
        .onBackpressureBuffer()
        .observeOn(Schedulers.computation())
        .map(SLOW_PASS_THRU).take(num).subscribe(ts);
        ts.awaitTerminalEvent();
        ts.assertNoErrors();
        System.out.println("testOnBackpressureBuffer => Received: " + ts.valueCount() + " Emitted: " + c.get());
        assertEquals(num, ts.valueCount());
        // it buffers, so we should get the right value sequentially
        assertEquals(num - 1, ts.values().get(num - 1).intValue());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testErrorCauseIncludesLastValue() {
        TestSubscriber<String> ts = new TestSubscriber<String>();
        Flowable.just("abc").takeWhile(new Predicate<String>() {
            @Override
            public boolean test(String t1) {
                throw new TestException();
            }
        }).subscribe(ts);
        ts.assertTerminated();
        ts.assertNoValues();
        ts.assertError(TestException.class);
        // FIXME last cause value not recorded
        // assertTrue(ts.getOnErrorEvents().get(0).getCause().getMessage().contains("abc"));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeWhileDoesntLeakErrors() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
                @Override
                public void subscribe(Subscriber<? super String> subscriber) {
                    subscriber.onSubscribe(new BooleanSubscription());
                    subscriber.onNext("one");
                    subscriber.onError(new TestException("test failed"));
                }
            });
            source.takeWhile(new Predicate<String>() {
                @Override
                public boolean test(String s) {
                    return false;
                }
            }).blockingLast("");
            TestHelper.assertUndeliverable(errors, 0, TestException.class, "test failed");
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testNoUnsubscribeDownstream() {
        Flowable<Integer> source = Flowable.range(1, 1000).takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer t1) {
                return t1 < 2;
            }
        });
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        source.subscribe(ts);
        ts.assertNoErrors();
        ts.assertValue(1);
        Assert.assertFalse("Unsubscribed!", ts.isCancelled());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testBackpressure() {
        Flowable<Integer> source = Flowable.range(1, 1000).takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer t1) {
                return t1 < 100;
            }
        });
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>(5L);
        source.subscribe(ts);
        ts.assertNoErrors();
        ts.assertValues(1, 2, 3, 4, 5);
        ts.request(5);
        ts.assertNoErrors();
        ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    }
