本文整理了Java中io.reactivex.Flowable.interval()
方法的一些代码示例,展示了Flowable.interval()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.interval()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:interval
[英]Returns a Flowable that emits a 0L after the initialDelay and ever-increasing numbers after each period of time thereafter.
Backpressure: The operator generates values based on time and ignores downstream backpressure which may lead to MissingBackpressureException at some point in the chain. Consumers should consider applying one of the onBackpressureXXX operators as well. Scheduler: interval operates by default on the computation Scheduler.
[中]返回在initialDelay之后发出0L的可流动数据,并在此后的每个时间段后不断增加数字。
背压:操作员根据时间生成值,并忽略下游背压,这可能导致链中某个点缺少背压异常。消费者也应该考虑应用一个OnPress Prxuxxx运营商。调度程序:默认情况下,interval在计算调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Long> apply(Long t1) {
return Flowable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
}
};
代码示例来源:origin: ReactiveX/RxJava
public void intervalSchedulerNull() {
Flowable.interval(1, TimeUnit.SECONDS, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void intervalPeriodUnitNull() {
Flowable.interval(1, 1, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void intervalPeriodSchedulerNull() {
Flowable.interval(1, 1, TimeUnit.SECONDS, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void intervalUnitNull() {
Flowable.interval(1, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Long> createPublisher(long elements) {
return
Flowable.interval(0, 1, TimeUnit.MILLISECONDS).take(elements)
.onBackpressureBuffer()
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Disposable> apply(Integer count) {
return Flowable
.interval(1, TimeUnit.MICROSECONDS)
.map(new Function<Long, Disposable>() {
@Override
public Disposable apply(Long ount1) {
return trampolineWorker.schedule(Functions.EMPTY_RUNNABLE);
}
}).take(100);
}
})
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 2000)
public void bufferWithBoundaryTake2() {
Flowable<Long> boundary = Flowable.interval(60, 60, TimeUnit.MILLISECONDS, scheduler);
Flowable<Long> source = Flowable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
Flowable<List<Long>> result = source.buffer(boundary).take(2);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
InOrder inOrder = inOrder(subscriber);
result.subscribe(subscriber);
scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
inOrder.verify(subscriber).onNext(Arrays.asList(0L));
inOrder.verify(subscriber).onNext(Arrays.asList(1L));
inOrder.verify(subscriber).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 2000)
public void bufferWithTimeSkipTake2() {
Flowable<Long> source = Flowable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
Flowable<List<Long>> result = source.buffer(100, 60, TimeUnit.MILLISECONDS, scheduler).take(2);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
InOrder inOrder = inOrder(subscriber);
result.subscribe(subscriber);
scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
inOrder.verify(subscriber).onNext(Arrays.asList(0L, 1L));
inOrder.verify(subscriber).onNext(Arrays.asList(1L, 2L));
inOrder.verify(subscriber).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void bufferWithTimeAndSize() {
Flowable<Long> source = Flowable.interval(30, 30, TimeUnit.MILLISECONDS, scheduler);
Flowable<List<Long>> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler, 2).take(3);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
InOrder inOrder = inOrder(subscriber);
result.subscribe(subscriber);
scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
inOrder.verify(subscriber).onNext(Arrays.asList(0L, 1L));
inOrder.verify(subscriber).onNext(Arrays.asList(2L));
inOrder.verify(subscriber).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 2000)
public void bufferWithTimeTake1() {
Flowable<Long> source = Flowable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
Flowable<List<Long>> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler).take(1);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
result.subscribe(subscriber);
scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
verify(subscriber).onNext(Arrays.asList(0L, 1L));
verify(subscriber).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void badRequest() {
TestHelper.assertBadRequestReported(Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline()));
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 1000, expected = NoSuchElementException.class)
public void testSimpleJustNext() {
TestScheduler scheduler = new TestScheduler();
Flowable<Long> source = Flowable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
Iterable<Long> iter = source.blockingLatest();
Iterator<Long> it = iter.iterator();
// only 9 because take(10) will immediately call onComplete when receiving the 10th item
// which onComplete will overwrite the previous value
for (int i = 0; i < 10; i++) {
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Assert.assertEquals(Long.valueOf(i), it.next());
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testGroupByOnAsynchronousSourceAcceptsMultipleSubscriptions() throws InterruptedException {
// choose an asynchronous source
Flowable<Long> source = Flowable.interval(10, TimeUnit.MILLISECONDS).take(1);
// apply groupBy to the source
Flowable<GroupedFlowable<Boolean, Long>> stream = source.groupBy(IS_EVEN);
// create two observers
Subscriber<GroupedFlowable<Boolean, Long>> f1 = TestHelper.mockSubscriber();
Subscriber<GroupedFlowable<Boolean, Long>> f2 = TestHelper.mockSubscriber();
// subscribe with the observers
stream.subscribe(f1);
stream.subscribe(f2);
// check that subscriptions were successful
verify(f1, never()).onError(Mockito.<Throwable> any());
verify(f2, never()).onError(Mockito.<Throwable> any());
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 1000)
public void testSameSourceMultipleIterators() {
TestScheduler scheduler = new TestScheduler();
Flowable<Long> source = Flowable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
Iterable<Long> iter = source.blockingLatest();
for (int j = 0; j < 3; j++) {
Iterator<Long> it = iter.iterator();
// only 9 because take(10) will immediately call onComplete when receiving the 10th item
// which onComplete will overwrite the previous value
for (int i = 0; i < 9; i++) {
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Assert.assertEquals(true, it.hasNext());
Assert.assertEquals(Long.valueOf(i), it.next());
}
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Assert.assertEquals(false, it.hasNext());
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<String> apply(final String s) {
return Flowable.just(s)
.mergeWith(Flowable.interval(10, TimeUnit.MILLISECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long i) {
return s + " " + i;
}
})).take(250);
}
})
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 2000)
public void cancel() {
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
.take(10)
.test()
.assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
}
代码示例来源:origin: ReactiveX/RxJava
@Test /* (timeout = 8000) */
public void testSingleSourceManyIterators() throws InterruptedException {
Flowable<Long> f = Flowable.interval(250, TimeUnit.MILLISECONDS);
PublishProcessor<Integer> terminal = PublishProcessor.create();
Flowable<Long> source = f.takeUntil(terminal);
Iterable<Long> iter = source.blockingNext();
for (int j = 0; j < 3; j++) {
BlockingFlowableNext.NextIterator<Long> it = (BlockingFlowableNext.NextIterator<Long>)iter.iterator();
for (long i = 0; i < 10; i++) {
Assert.assertEquals(true, it.hasNext());
Assert.assertEquals(j + "th iteration next", Long.valueOf(i), it.next());
}
terminal.onNext(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 10000)
public void testInitialRequestsDontOverflow() {
TestSubscriber<Long> ts = new TestSubscriber<Long>(0L);
Flowable.switchOnNext(
Flowable.interval(100, TimeUnit.MILLISECONDS)
.map(new Function<Long, Flowable<Long>>() {
@Override
public Flowable<Long> apply(Long t) {
return Flowable.fromIterable(Arrays.asList(1L, 2L, 3L)).hide();
}
}).take(3)).subscribe(ts);
ts.request(Long.MAX_VALUE - 1);
ts.request(2);
ts.awaitTerminalEvent();
assertTrue(ts.valueCount() > 0);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testConnectWithNoSubscriber() {
TestScheduler scheduler = new TestScheduler();
ConnectableFlowable<Long> cf = Flowable.interval(10, 10, TimeUnit.MILLISECONDS, scheduler).take(3).publish();
cf.connect();
// Emit 0
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
cf.subscribe(subscriber);
// Emit 1 and 2
scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
subscriber.assertValues(1L, 2L);
subscriber.assertNoErrors();
subscriber.assertTerminated();
}
内容来源于网络,如有侵权,请联系作者删除!