io.reactivex.Flowable.interval()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(421)

本文整理了Java中io.reactivex.Flowable.interval()方法的一些代码示例,展示了Flowable.interval()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.interval()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:interval

Flowable.interval介绍

[英]Returns a Flowable that emits a 0L after the initialDelay and ever-increasing numbers after each period of time thereafter.

Backpressure: The operator generates values based on time and ignores downstream backpressure which may lead to MissingBackpressureException at some point in the chain. Consumers should consider applying one of the onBackpressureXXX operators as well. Scheduler: interval operates by default on the computation Scheduler.
[中]返回在initialDelay之后发出0L的可流动数据,并在此后的每个时间段后不断增加数字。
背压:操作员根据时间生成值,并忽略下游背压,这可能导致链中某个点缺少背压异常。消费者也应该考虑应用一个OnPress Prxuxxx运营商。调度程序:默认情况下,interval在计算调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Long> apply(Long t1) {
    return Flowable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
  }
};

代码示例来源:origin: ReactiveX/RxJava

public void intervalSchedulerNull() {
  Flowable.interval(1, TimeUnit.SECONDS, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void intervalPeriodUnitNull() {
  Flowable.interval(1, 1, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void intervalPeriodSchedulerNull() {
  Flowable.interval(1, 1, TimeUnit.SECONDS, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void intervalUnitNull() {
  Flowable.interval(1, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Long> createPublisher(long elements) {
    return
      Flowable.interval(0, 1, TimeUnit.MILLISECONDS).take(elements)
      .onBackpressureBuffer()
    ;
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Disposable> apply(Integer count) {
    return Flowable
        .interval(1, TimeUnit.MICROSECONDS)
        .map(new Function<Long, Disposable>() {
          @Override
          public Disposable apply(Long ount1) {
            return trampolineWorker.schedule(Functions.EMPTY_RUNNABLE);
          }
        }).take(100);
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithBoundaryTake2() {
  Flowable<Long> boundary = Flowable.interval(60, 60, TimeUnit.MILLISECONDS, scheduler);
  Flowable<Long> source = Flowable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
  Flowable<List<Long>> result = source.buffer(boundary).take(2);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  InOrder inOrder = inOrder(subscriber);
  result.subscribe(subscriber);
  scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
  inOrder.verify(subscriber).onNext(Arrays.asList(0L));
  inOrder.verify(subscriber).onNext(Arrays.asList(1L));
  inOrder.verify(subscriber).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithTimeSkipTake2() {
  Flowable<Long> source = Flowable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
  Flowable<List<Long>> result = source.buffer(100, 60, TimeUnit.MILLISECONDS, scheduler).take(2);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  InOrder inOrder = inOrder(subscriber);
  result.subscribe(subscriber);
  scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
  inOrder.verify(subscriber).onNext(Arrays.asList(0L, 1L));
  inOrder.verify(subscriber).onNext(Arrays.asList(1L, 2L));
  inOrder.verify(subscriber).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void bufferWithTimeAndSize() {
  Flowable<Long> source = Flowable.interval(30, 30, TimeUnit.MILLISECONDS, scheduler);
  Flowable<List<Long>> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler, 2).take(3);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  InOrder inOrder = inOrder(subscriber);
  result.subscribe(subscriber);
  scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
  inOrder.verify(subscriber).onNext(Arrays.asList(0L, 1L));
  inOrder.verify(subscriber).onNext(Arrays.asList(2L));
  inOrder.verify(subscriber).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithTimeTake1() {
  Flowable<Long> source = Flowable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
  Flowable<List<Long>> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler).take(1);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  result.subscribe(subscriber);
  scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
  verify(subscriber).onNext(Arrays.asList(0L, 1L));
  verify(subscriber).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void badRequest() {
  TestHelper.assertBadRequestReported(Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline()));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 1000, expected = NoSuchElementException.class)
public void testSimpleJustNext() {
  TestScheduler scheduler = new TestScheduler();
  Flowable<Long> source = Flowable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  // only 9 because take(10) will immediately call onComplete when receiving the 10th item
  // which onComplete will overwrite the previous value
  for (int i = 0; i < 10; i++) {
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(Long.valueOf(i), it.next());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testGroupByOnAsynchronousSourceAcceptsMultipleSubscriptions() throws InterruptedException {
  // choose an asynchronous source
  Flowable<Long> source = Flowable.interval(10, TimeUnit.MILLISECONDS).take(1);
  // apply groupBy to the source
  Flowable<GroupedFlowable<Boolean, Long>> stream = source.groupBy(IS_EVEN);
  // create two observers
  Subscriber<GroupedFlowable<Boolean, Long>> f1 = TestHelper.mockSubscriber();
  Subscriber<GroupedFlowable<Boolean, Long>> f2 = TestHelper.mockSubscriber();
  // subscribe with the observers
  stream.subscribe(f1);
  stream.subscribe(f2);
  // check that subscriptions were successful
  verify(f1, never()).onError(Mockito.<Throwable> any());
  verify(f2, never()).onError(Mockito.<Throwable> any());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 1000)
public void testSameSourceMultipleIterators() {
  TestScheduler scheduler = new TestScheduler();
  Flowable<Long> source = Flowable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
  Iterable<Long> iter = source.blockingLatest();
  for (int j = 0; j < 3; j++) {
    Iterator<Long> it = iter.iterator();
    // only 9 because take(10) will immediately call onComplete when receiving the 10th item
    // which onComplete will overwrite the previous value
    for (int i = 0; i < 9; i++) {
      scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
      Assert.assertEquals(true, it.hasNext());
      Assert.assertEquals(Long.valueOf(i), it.next());
    }
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(false, it.hasNext());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<String> apply(final String s) {
    return Flowable.just(s)
        .mergeWith(Flowable.interval(10, TimeUnit.MILLISECONDS)
        .map(new Function<Long, String>() {
          @Override
          public String apply(Long i) {
            return s + " " + i;
          }
        })).take(250);
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void cancel() {
  Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
  .take(10)
  .test()
  .assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
}

代码示例来源:origin: ReactiveX/RxJava

@Test /* (timeout = 8000) */
public void testSingleSourceManyIterators() throws InterruptedException {
  Flowable<Long> f = Flowable.interval(250, TimeUnit.MILLISECONDS);
  PublishProcessor<Integer> terminal = PublishProcessor.create();
  Flowable<Long> source = f.takeUntil(terminal);
  Iterable<Long> iter = source.blockingNext();
  for (int j = 0; j < 3; j++) {
    BlockingFlowableNext.NextIterator<Long> it = (BlockingFlowableNext.NextIterator<Long>)iter.iterator();
    for (long i = 0; i < 10; i++) {
      Assert.assertEquals(true, it.hasNext());
      Assert.assertEquals(j + "th iteration next", Long.valueOf(i), it.next());
    }
    terminal.onNext(1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 10000)
public void testInitialRequestsDontOverflow() {
  TestSubscriber<Long> ts = new TestSubscriber<Long>(0L);
  Flowable.switchOnNext(
      Flowable.interval(100, TimeUnit.MILLISECONDS)
          .map(new Function<Long, Flowable<Long>>() {
            @Override
            public Flowable<Long> apply(Long t) {
              return Flowable.fromIterable(Arrays.asList(1L, 2L, 3L)).hide();
            }
          }).take(3)).subscribe(ts);
  ts.request(Long.MAX_VALUE - 1);
  ts.request(2);
  ts.awaitTerminalEvent();
  assertTrue(ts.valueCount() > 0);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testConnectWithNoSubscriber() {
  TestScheduler scheduler = new TestScheduler();
  ConnectableFlowable<Long> cf = Flowable.interval(10, 10, TimeUnit.MILLISECONDS, scheduler).take(3).publish();
  cf.connect();
  // Emit 0
  scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
  TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
  cf.subscribe(subscriber);
  // Emit 1 and 2
  scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
  subscriber.assertValues(1L, 2L);
  subscriber.assertNoErrors();
  subscriber.assertTerminated();
}

相关文章

Flowable类方法