Usage and code examples for the io.reactivex.Flowable.takeWhile() method

Posted by x33g5p2x on 2022-01-19, reposted under: Other

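This article collects code examples for the Java method io.reactivex.Flowable.takeWhile() and shows how Flowable.takeWhile() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they should be useful as references. Details of the Flowable.takeWhile() method:
Package: io.reactivex
Class name: Flowable
Method name: takeWhile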

About Flowable.takeWhile

Returns a Flowable that emits items emitted by the source Publisher so long as each item satisfies a specified condition, and then completes as soon as this condition is not satisfied.

Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: takeWhile does not operate by default on a particular Scheduler.
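The described behavior can be modeled without RxJava on the classpath. The following is a minimal plain-JDK sketch, not RxJava's implementation: the class `TakeWhileSemanticsSketch` and the method `signals` are hypothetical names, and the source is assumed to be a finite, error-free list. It records the signals a downstream subscriber would see: items are relayed until the first one fails the predicate, that item is dropped, and the sequence completes. An exception thrown by the predicate is reported as an error instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper: models takeWhile's downstream signals for a finite list.
class TakeWhileSemanticsSketch {

    /** Returns the signals a subscriber would observe, written as strings. */
    static <T> List<String> signals(List<T> source, Predicate<? super T> predicate) {
        List<String> out = new ArrayList<>();
        for (T item : source) {
            boolean keep;
            try {
                keep = predicate.test(item);
            } catch (RuntimeException ex) {
                // A failing predicate ends the sequence with an error signal.
                out.add("onError(" + ex.getMessage() + ")");
                return out;
            }
            if (!keep) {
                // The first rejected item is not emitted; the sequence completes
                // and no later item is examined.
                out.add("onComplete");
                return out;
            }
            out.add("onNext(" + item + ")");
        }
        // The source ran out while every item satisfied the predicate.
        out.add("onComplete");
        return out;
    }

    public static void main(String[] args) {
        // The trailing 1 would pass the predicate but arrives after the cut-off.
        System.out.println(signals(List.of(1, 2, 3, 1), v -> v < 3));
        // prints [onNext(1), onNext(2), onComplete]
    }
}
```

Unlike a filter, the trailing `1` in the example is never emitted, because takeWhile stops at the first item that fails the condition.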

Code examples

Code example source: ReactiveX/RxJava

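// Excerpt from a larger expression: the closing "})" belongs to an enclosing call not shown here,
// and `times` is a counter defined outside the excerpt.
@Override
public Flowable<?> apply(Flowable<Throwable> e) throws Exception {
  // Relay items only while fewer than four have been tested so far.
  return e.takeWhile(new Predicate<Object>() {
    @Override
    public boolean test(Object v) throws Exception {
      return times.getAndIncrement() < 4;
    }
  });
}
})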

Code example source: ReactiveX/RxJava

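// Excerpt from a larger expression: the closing "})" belongs to an enclosing call not shown here,
// and `times` is a counter defined outside the excerpt.
@Override
public Flowable<?> apply(Flowable<Object> e) throws Exception {
  // Relay items only while fewer than four have been tested so far.
  return e.takeWhile(new Predicate<Object>() {
    @Override
    public boolean test(Object v) throws Exception {
      return times.getAndIncrement() < 4;
    }
  });
}
})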

Code example source: ReactiveX/RxJava

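// Excerpt: the closing "});" belongs to an enclosing call not shown here.
@Override
public Flowable<Object> apply(Flowable<Object> f) throws Exception {
  // The predicate always returns true, so every item passes through.
  return f.takeWhile(Functions.alwaysTrue());
}
});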

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void takeWhileNull() {
  just1.takeWhile(null);
}

Code example source: ReactiveX/RxJava

// Excerpt: the final "}" closes an enclosing class not shown here.
@Override
public Publisher<Integer> createPublisher(long elements) {
  return Flowable.range(0, (int) elements).takeWhile(Functions.alwaysTrue());
}
}

Code example source: ReactiveX/RxJava

@Test
public void testTakeWhile2() {
  Flowable<String> w = Flowable.just("one", "two", "three");
  Flowable<String> take = w.takeWhile(new Predicate<String>() {
    int index;
    @Override
    public boolean test(String input) {
      return index++ < 2;
    }
  });
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  take.subscribe(subscriber);
  verify(subscriber, times(1)).onNext("one");
  verify(subscriber, times(1)).onNext("two");
  verify(subscriber, never()).onNext("three");
  verify(subscriber, never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testTakeWhile1() {
  Flowable<Integer> w = Flowable.just(1, 2, 3);
  Flowable<Integer> take = w.takeWhile(new Predicate<Integer>() {
    @Override
    public boolean test(Integer input) {
      return input < 3;
    }
  });
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  take.subscribe(subscriber);
  verify(subscriber, times(1)).onNext(1);
  verify(subscriber, times(1)).onNext(2);
  verify(subscriber, never()).onNext(3);
  verify(subscriber, never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testIssue1451Case2() {
  // https://github.com/Netflix/RxJava/issues/1451
  final int expectedCount = 3;
  final AtomicInteger count = new AtomicInteger();
  for (int i = 0; i < expectedCount; i++) {
    Flowable
        .just(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
        .takeWhile(new Predicate<Boolean>() {
          @Override
          public boolean test(Boolean value) {
            return value;
          }
        })
        .toList()
        .doOnSuccess(new Consumer<List<Boolean>>() {
          @Override
          public void accept(List<Boolean> booleans) {
            count.incrementAndGet();
          }
        })
        .subscribe();
  }
  assertEquals(expectedCount, count.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testTakeWhileToList() {
  final int expectedCount = 3;
  final AtomicInteger count = new AtomicInteger();
  for (int i = 0; i < expectedCount; i++) {
    Flowable
        .just(Boolean.TRUE, Boolean.FALSE)
        .takeWhile(new Predicate<Boolean>() {
          @Override
          public boolean test(Boolean v) {
            return v;
          }
        })
        .toList()
        .doOnSuccess(new Consumer<List<Boolean>>() {
          @Override
          public void accept(List<Boolean> booleans) {
            count.incrementAndGet();
          }
        })
        .subscribe();
  }
  assertEquals(expectedCount, count.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testTakeWhileProtectsPredicateCall() {
  TestFlowable source = new TestFlowable(mock(Subscription.class), "one");
  final RuntimeException testException = new RuntimeException("test exception");
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> take = Flowable.unsafeCreate(source)
      .takeWhile(new Predicate<String>() {
        @Override
        public boolean test(String s) {
          throw testException;
        }
      });
  take.subscribe(subscriber);
  // wait for the Flowable to complete
  try {
    source.t.join();
  } catch (Throwable e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
  verify(subscriber, never()).onNext(any(String.class));
  verify(subscriber, times(1)).onError(testException);
}

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribeAfterTake() {
  Subscription s = mock(Subscription.class);
  TestFlowable w = new TestFlowable(s, "one", "two", "three");
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> take = Flowable.unsafeCreate(w)
      .takeWhile(new Predicate<String>() {
        int index;
        @Override
        public boolean test(String s) {
          return index++ < 1;
        }
      });
  take.subscribe(subscriber);
  // wait for the Flowable to complete
  try {
    w.t.join();
  } catch (Throwable e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
  System.out.println("TestFlowable thread finished");
  verify(subscriber, times(1)).onNext("one");
  verify(subscriber, never()).onNext("two");
  verify(subscriber, never()).onNext("three");
  verify(s, times(1)).cancel();
}

Code example source: ReactiveX/RxJava

@Test
public void testIssue1451Case1() {
  // https://github.com/Netflix/RxJava/issues/1451
  final int expectedCount = 3;
  final AtomicInteger count = new AtomicInteger();
  for (int i = 0; i < expectedCount; i++) {
    Flowable
        .just(Boolean.TRUE, Boolean.FALSE)
        .takeWhile(new Predicate<Boolean>() {
          @Override
          public boolean test(Boolean value) {
            return value;
          }
        })
        .toList()
        .doOnSuccess(new Consumer<List<Boolean>>() {
          @Override
          public void accept(List<Boolean> booleans) {
            count.incrementAndGet();
          }
        })
        .subscribe();
  }
  assertEquals(expectedCount, count.get());
}

Code example source: ReactiveX/RxJava

.takeWhile(new Predicate<List<Integer>>() {
  @Override
  public boolean test(List<Integer> v) throws Exception {

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void testOnBackpressureBuffer() {
  int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
  AtomicInteger c = new AtomicInteger();
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  firehose(c).takeWhile(new Predicate<Integer>() {
    @Override
    public boolean test(Integer t1) {
      return t1 < 100000;
    }
  })
  .onBackpressureBuffer()
  .observeOn(Schedulers.computation())
  .map(SLOW_PASS_THRU).take(num).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  System.out.println("testOnBackpressureBuffer => Received: " + ts.valueCount() + "  Emitted: " + c.get());
  assertEquals(num, ts.valueCount());
  // it buffers, so we should get the right value sequentially
  assertEquals(num - 1, ts.values().get(num - 1).intValue());
}

Code example source: ReactiveX/RxJava

@Test
public void testErrorCauseIncludesLastValue() {
  TestSubscriber<String> ts = new TestSubscriber<String>();
  Flowable.just("abc").takeWhile(new Predicate<String>() {
    @Override
    public boolean test(String t1) {
      throw new TestException();
    }
  }).subscribe(ts);

  ts.assertTerminated();
  ts.assertNoValues();
  ts.assertError(TestException.class);
  // FIXME last cause value not recorded
  // assertTrue(ts.getOnErrorEvents().get(0).getCause().getMessage().contains("abc"));
}

Code example source: ReactiveX/RxJava

@Test
public void testTakeWhileDoesntLeakErrors() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
      @Override
      public void subscribe(Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(new BooleanSubscription());
        subscriber.onNext("one");
        subscriber.onError(new TestException("test failed"));
      }
    });
    source.takeWhile(new Predicate<String>() {
      @Override
      public boolean test(String s) {
        return false;
      }
    }).blockingLast("");
    TestHelper.assertUndeliverable(errors, 0, TestException.class, "test failed");
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void testNoUnsubscribeDownstream() {
  Flowable<Integer> source = Flowable.range(1, 1000).takeWhile(new Predicate<Integer>() {
    @Override
    public boolean test(Integer t1) {
      return t1 < 2;
    }
  });
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  source.subscribe(ts);
  ts.assertNoErrors();
  ts.assertValue(1);
  Assert.assertFalse("Unsubscribed!", ts.isCancelled());
}

Code example source: ReactiveX/RxJava

@Test
public void testBackpressure() {
  Flowable<Integer> source = Flowable.range(1, 1000).takeWhile(new Predicate<Integer>() {
    @Override
    public boolean test(Integer t1) {
      return t1 < 100;
    }
  });
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>(5L);
  source.subscribe(ts);
  ts.assertNoErrors();
  ts.assertValues(1, 2, 3, 4, 5);
  ts.request(5);
  ts.assertNoErrors();
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
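
The backpressure test above depends on takeWhile leaving downstream demand untouched: five items are requested, five arrive. A rough sketch of that pass-through, written against the JDK's java.util.concurrent.Flow interfaces so it runs without RxJava, might look like the following. Everything here is an assumption made for illustration: `TakeWhileBackpressureSketch`, `RangePublisher`, `Recorder`, and this `takeWhile` helper are hypothetical names, the code is single-threaded only, and it skips the Reactive Streams rule that non-positive requests must be signalled as errors. It is not how RxJava implements the operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

class TakeWhileBackpressureSketch {

    /** Synchronous range source that emits only as many items as were requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        final int start;
        final int end; // exclusive
        RangePublisher(int start, int count) { this.start = start; this.end = start + count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> downstream) {
            downstream.onSubscribe(new Flow.Subscription() {
                int next = start;
                long pending;
                boolean emitting;
                boolean done;

                @Override
                public void request(long n) {
                    if (n <= 0 || done) return;   // simplification: invalid requests are ignored
                    pending += n;
                    if (emitting) return;         // re-entrant call: the running loop drains it
                    emitting = true;
                    while (pending > 0 && !done && next < end) {
                        pending--;
                        downstream.onNext(next++);
                    }
                    if (!done && next == end) { done = true; downstream.onComplete(); }
                    emitting = false;
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Hypothetical takeWhile: demand goes upstream unchanged, items are cut at the first failure. */
    static <T> Flow.Publisher<T> takeWhile(Flow.Publisher<T> source, Predicate<? super T> predicate) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            Flow.Subscription upstream;
            boolean done;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                upstream = s;
                downstream.onSubscribe(s);        // downstream requests reach the source directly
            }

            @Override
            public void onNext(T item) {
                if (done) return;
                boolean keep;
                try {
                    keep = predicate.test(item);
                } catch (Throwable t) {
                    done = true; upstream.cancel(); downstream.onError(t);
                    return;
                }
                if (keep) {
                    downstream.onNext(item);
                } else {
                    done = true; upstream.cancel(); downstream.onComplete();
                }
            }

            @Override public void onError(Throwable t) { if (!done) { done = true; downstream.onError(t); } }
            @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
        });
    }

    /** Records received values and requests an initial amount on subscription. */
    static final class Recorder implements Flow.Subscriber<Integer> {
        final List<Integer> values = new ArrayList<>();
        final long initialRequest;
        Flow.Subscription subscription;
        boolean completed;
        Recorder(long initialRequest) { this.initialRequest = initialRequest; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(initialRequest); }
        @Override public void onNext(Integer item) { values.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        Recorder recorder = new Recorder(5);
        takeWhile(new RangePublisher(1, 1000), v -> v < 100).subscribe(recorder);
        System.out.println(recorder.values);        // [1, 2, 3, 4, 5]
        recorder.subscription.request(5);
        System.out.println(recorder.values.size()); // 10
    }
}
```

Because the sketch hands the upstream subscription straight to the downstream subscriber, the operator never requests on its own; once the predicate fails it cancels upstream, so later requests are ignored.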
