Usage and code examples of the io.reactivex.Flowable.repeat() method

Reposted by x33g5p2x on 2022-01-19, category: Other

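This article collects code examples for the Java method io.reactivex.Flowable.repeat() and shows how the method is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as references. The details of Flowable.repeat() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: repeat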

Flowable.repeat overview

Returns a Flowable that repeats the sequence of items emitted by the source Publisher indefinitely. Several examples below use the overload repeat(long times), which repeats the sequence at most `times` times.

Backpressure: The operator honors downstream backpressure and expects the source Publisher to honor backpressure as well. If this expectation is violated, the operator may throw an IllegalStateException.
Scheduler: repeat does not operate by default on a particular Scheduler.
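As a quick orientation before the collected tests, here is a minimal sketch, assuming RxJava 2.x (the `io.reactivex` package) is on the classpath. The class name `RepeatSketch` and its two helper methods are made up for illustration. It contrasts the bounded overload `repeat(long)` with the unbounded `repeat()`, which needs a downstream limit such as `take` to terminate.

```java
import io.reactivex.Flowable;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class RepeatSketch {

    // repeat(2): the source sequence is emitted twice, then the Flowable completes.
    static List<Integer> repeatTwice() {
        return Flowable.just(1, 2)
                .repeat(2)
                .toList()
                .blockingGet();
    }

    // repeat() alone never completes, so take(5) is used to cancel it after five items.
    static List<Integer> repeatForeverThenTake() {
        return Flowable.just(1, 2)
                .repeat()
                .take(5)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(repeatTwice());           // [1, 2, 1, 2]
        System.out.println(repeatForeverThenTake()); // [1, 2, 1, 2, 1]
    }
}
```

Both pipelines run synchronously on the calling thread here, which matches the note that repeat does not use a particular Scheduler by default.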

Code examples

Code example source: ReactiveX/RxJava

    @Override
    public Publisher<Integer> createPublisher(long elements) {
        // Emits the value 1 `elements` times, then completes.
        return Flowable.just(1).repeat(elements);
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatOne() {
        Subscriber<Object> subscriber = TestHelper.mockSubscriber();
        // repeat(1) plays the source once, so exactly one item arrives.
        Flowable.just(1).repeat(1).subscribe(subscriber);
        verify(subscriber).onComplete();
        verify(subscriber, times(1)).onNext(any());
        verify(subscriber, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatError() {
        Subscriber<Object> subscriber = TestHelper.mockSubscriber();
        // An error is not repeated: it is forwarded and ends the sequence.
        Flowable.error(new TestException()).repeat(10).subscribe(subscriber);
        verify(subscriber).onError(any(TestException.class));
        verify(subscriber, never()).onNext(any());
        verify(subscriber, never()).onComplete();
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatZero() {
        Subscriber<Object> subscriber = TestHelper.mockSubscriber();
        // repeat(0) completes without emitting any item.
        Flowable.just(1).repeat(0).subscribe(subscriber);
        verify(subscriber).onComplete();
        verify(subscriber, never()).onNext(any());
        verify(subscriber, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatLimited() {
        Subscriber<Object> subscriber = TestHelper.mockSubscriber();
        // Ten repetitions of a one-item source yield ten items.
        Flowable.just(1).repeat(10).subscribe(subscriber);
        verify(subscriber, times(10)).onNext(1);
        verify(subscriber).onComplete();
        verify(subscriber, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatAndTake() {
        Subscriber<Object> subscriber = TestHelper.mockSubscriber();
        // repeat() is unbounded; take(10) ends the sequence after ten items.
        Flowable.just(1).repeat().take(10).subscribe(subscriber);
        verify(subscriber, times(10)).onNext(1);
        verify(subscriber).onComplete();
        verify(subscriber, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void repeatLongPredicateInvalid() {
        try {
            // A negative count is rejected when repeat(...) is called, before any subscription.
            Flowable.just(1).repeat(-99);
            fail("Should have thrown");
        } catch (IllegalArgumentException ex) {
            assertEquals("times >= 0 required but it was -99", ex.getMessage());
        }
    }
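The tests above pin down a few rules for repeat(long): a negative count is rejected with the message shown, a count of zero yields an empty sequence, each repetition asks the source again, and an error from the source stops further repetitions. The following plain-Java model restates those rules in one place. It is only an illustration and is not how RxJava implements the operator: the class `RepeatModel` is hypothetical, and a `Supplier<List<T>>` stands in for "subscribe once and collect the items", with a thrown exception standing in for onError.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical pull-based model of repeat(times); RxJava itself is push-based and asynchronous-capable.
final class RepeatModel {

    static <T> List<T> repeat(Supplier<List<T>> source, long times) {
        if (times < 0) {
            // Same message the RxJava test above expects.
            throw new IllegalArgumentException("times >= 0 required but it was " + times);
        }
        List<T> out = new ArrayList<>();
        for (long i = 0; i < times; i++) {
            // One call models one subscription; an exception here ends the whole sequence.
            out.addAll(source.get());
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger passes = new AtomicInteger();
        // The source is asked again on every pass, so the counter advances each time.
        List<Integer> counted = repeat(() -> Collections.singletonList(passes.incrementAndGet()), 3);
        List<Integer> none = RepeatModel.<Integer>repeat(() -> Collections.singletonList(1), 0);
        System.out.println(counted); // [1, 2, 3]
        System.out.println(none);    // []
    }
}
```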

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void bufferWithSizeSkipTake1() {
        Flowable<Integer> source = Flowable.just(1).repeat();
        // buffer(2, 3): buffers of 2 items, a new buffer starting every 3 items.
        // take(1) stops the infinite source after the first buffer.
        Flowable<List<Integer>> result = source.buffer(2, 3).take(1);
        Subscriber<Object> subscriber = TestHelper.mockSubscriber();
        result.subscribe(subscriber);
        verify(subscriber).onNext(Arrays.asList(1, 1));
        verify(subscriber).onComplete();
        verify(subscriber, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void bufferWithSizeTake1() {
        Flowable<Integer> source = Flowable.just(1).repeat();
        // take(1) stops the infinite source after the first buffer of two items.
        Flowable<List<Integer>> result = source.buffer(2).take(1);
        Subscriber<Object> subscriber = TestHelper.mockSubscriber();
        result.subscribe(subscriber);
        verify(subscriber).onNext(Arrays.asList(1, 1));
        verify(subscriber).onComplete();
        verify(subscriber, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void noCancelPreviousRepeat() {
        final AtomicInteger counter = new AtomicInteger();
        // Count how often the source sees a cancellation.
        Flowable<Integer> source = Flowable.just(1).doOnCancel(new Action() {
            @Override
            public void run() throws Exception {
                counter.getAndIncrement();
            }
        });
        source.repeat(5)
            .test()
            .assertResult(1, 1, 1, 1, 1);
        // Resubscribing must not cancel the previous, already completed pass.
        assertEquals(0, counter.get());
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 5000)
    public void concatObservableMany() {
        // `normal` is a fixture of the enclosing test class (not shown here).
        Completable c = Completable.concat(Flowable.just(normal.completable).repeat(3));
        c.blockingAwait();
        normal.assertSubscriptions(3);
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 5000)
    public void mergeObservableMany() {
        // `normal` is a fixture of the enclosing test class (not shown here).
        Completable c = Completable.merge(Flowable.just(normal.completable).repeat(3));
        c.blockingAwait();
        normal.assertSubscriptions(3);
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 5000)
    public void mergeDelayErrorObservableMany() {
        // `normal` is a fixture of the enclosing test class (not shown here).
        Completable c = Completable.mergeDelayError(Flowable.just(normal.completable).repeat(3));
        c.blockingAwait();
        normal.assertSubscriptions(3);
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatTake() {
        Flowable<Integer> xs = Flowable.just(1, 2);
        // The whole sequence 1, 2 is repeated; take(4) keeps the first two passes.
        Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
        assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepetition() {
        int num = 10;
        final AtomicInteger count = new AtomicInteger();
        // Each repetition subscribes again, so the counter grows by one per pass.
        int value = Flowable.unsafeCreate(new Publisher<Integer>() {
            @Override
            public void subscribe(final Subscriber<? super Integer> subscriber) {
                subscriber.onNext(count.incrementAndGet());
                subscriber.onComplete();
            }
        }).repeat().subscribeOn(Schedulers.computation())
            .take(num).blockingLast();
        assertEquals(num, value);
    }
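The test above builds its source with Flowable.unsafeCreate and a hand-written Publisher. The same resubscription behavior can also be observed with Flowable.fromCallable, which runs its Callable once per subscription. This is a minimal sketch, assuming RxJava 2.x is on the classpath; the class `ResubscribeSketch` and the method `countSubscriptions` are made up for illustration.

```java
import io.reactivex.Flowable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
final class ResubscribeSketch {

    // Returns the values seen downstream when a counting source is repeated `times` times.
    static List<Integer> countSubscriptions(int times) {
        AtomicInteger count = new AtomicInteger();
        // fromCallable invokes the callable for every new subscription,
        // so each repetition observes the next counter value.
        return Flowable.fromCallable(count::incrementAndGet)
                .repeat(times)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(countSubscriptions(3)); // [1, 2, 3]
    }
}
```

If the source were replayed from a cache instead of being subscribed again, the list would contain the same value three times.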

Code example source: ReactiveX/RxJava

    @Test(timeout = 20000)
    public void testNoStackOverFlow() {
        // 100,000 repetitions must not overflow the stack.
        Flowable.just(1).repeat().subscribeOn(Schedulers.newThread()).take(100000).blockingLast();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testRepeatObservableThrowsError() {
        TestSubscriber<String> subscriber = TestSubscriber.create();
        // single("") allows at most one item; the repeated source emits more, so it fails.
        Single<String> single = Flowable.just("First", "Second").repeat().single("");
        single.toFlowable().subscribe(subscriber);
        subscriber.assertError(IllegalArgumentException.class);
    }

Code example source: ReactiveX/RxJava

    /** Issue #2587. */
    @Test
    public void testRepeatAndDistinctUnbounded() {
        // Three passes over 1, 2, 3; distinct() drops the repeated values.
        Flowable<Integer> src = Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
            .take(3)
            .repeat(3)
            .distinct();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        src.subscribe(ts);
        ts.assertNoErrors();
        ts.assertTerminated();
        ts.assertValues(1, 2, 3);
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 1000)
    public void testRaceConditions() {
        Scheduler comp = Schedulers.computation();
        // Wrap the computation scheduler so that at most 10 workers are merged concurrently.
        Scheduler limited = comp.when(new Function<Flowable<Flowable<Completable>>, Completable>() {
            @Override
            public Completable apply(Flowable<Flowable<Completable>> t) {
                return Completable.merge(Flowable.merge(t, 10));
            }
        });
        // merge(...) and just(...) are static imports in the original test class.
        merge(just(just(1).subscribeOn(limited).observeOn(comp)).repeat(1000)).blockingSubscribe();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void repeatScheduled() {
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.just(1).subscribeOn(Schedulers.computation()).repeat(5).subscribe(ts);
        // The source runs on another thread, so wait for the terminal event before asserting.
        ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
        ts.assertValues(1, 1, 1, 1, 1);
        ts.assertNoErrors();
        ts.assertComplete();
    }
