本文整理了Java中io.reactivex.Flowable.delaySubscription()
方法的一些代码示例,展示了Flowable.delaySubscription()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.delaySubscription()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:delaySubscription
[英]Returns a Flowable that delays the subscription to the source Publisher by a given amount of time.
Backpressure: The operator doesn't interfere with the backpressure behavior which is determined by the source Publisher. Scheduler: This version of delaySubscription operates by default on the computation Scheduler.
[中]返回将对源发布服务器的订阅延迟给定时间的Flowable。
背压:操作员不会干扰由源发布者确定的背压行为。调度程序:默认情况下,此版本的delaySubscription在计算调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Object apply(Flowable<Integer> f) throws Exception {
return Flowable.just(1).delaySubscription(f);
}
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void delaySubscriptionSupplierNull() {
just1.delaySubscription((Publisher<Object>)null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void delaySubscriptionFunctionNull() {
just1.delaySubscription((Publisher<Object>)null);
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Integer> createPublisher(long elements) {
return
Flowable.range(0, (int)elements).delaySubscription(1, TimeUnit.MILLISECONDS)
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void delaySubscriptionOtherNull() {
just1.delaySubscription((Flowable<Object>)null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void delaySubscriptionTimedUnitNull() {
just1.delaySubscription(1, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void delaySubscriptionTimedSchedulerNull() {
just1.delaySubscription(1, TimeUnit.SECONDS, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void otherNull() {
Flowable.just(1).delaySubscription((Flowable<Integer>)null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySubscription() {
Flowable<Integer> result = Flowable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
InOrder inOrder = inOrder(subscriber);
result.subscribe(subscriber);
inOrder.verify(subscriber, never()).onNext(any());
inOrder.verify(subscriber, never()).onComplete();
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext(1);
inOrder.verify(subscriber, times(1)).onNext(2);
inOrder.verify(subscriber, times(1)).onNext(3);
inOrder.verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySubscriptionCancelBeforeTime() {
Flowable<Integer> result = Flowable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
result.subscribe(ts);
ts.dispose();
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
verify(subscriber, never()).onNext(any());
verify(subscriber, never()).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySubscriptionDisposeBeforeTime() {
Flowable<Integer> result = Flowable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
result.subscribe(ts);
ts.dispose();
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
verify(subscriber, never()).onNext(any());
verify(subscriber, never()).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void afterDelayNoInterrupt() {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
final TestSubscriber<Boolean> ts = TestSubscriber.create();
ts.withTag(s.getClass().getSimpleName());
Flowable.<Boolean>create(new FlowableOnSubscribe<Boolean>() {
@Override
public void subscribe(FlowableEmitter<Boolean> emitter) throws Exception {
emitter.onNext(Thread.interrupted());
emitter.onComplete();
}
}, BackpressureStrategy.MISSING)
.delaySubscription(100, TimeUnit.MILLISECONDS, s)
.subscribe(ts);
ts.awaitTerminalEvent();
ts.assertValue(false);
}
} finally {
exec.shutdown();
}
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void delayAndTakeUntilNeverSubscribeToSource() {
PublishProcessor<Integer> delayUntil = PublishProcessor.create();
PublishProcessor<Integer> interrupt = PublishProcessor.create();
final AtomicBoolean subscribed = new AtomicBoolean(false);
Flowable.just(1)
.doOnSubscribe(new Consumer<Object>() {
@Override
public void accept(Object o) {
subscribed.set(true);
}
})
.delaySubscription(delayUntil)
.takeUntil(interrupt)
.subscribe();
interrupt.onNext(9000);
delayUntil.onNext(1);
Assert.assertFalse(subscribed.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void delayAndTakeUntilNeverSubscribeToSource() {
PublishProcessor<Integer> delayUntil = PublishProcessor.create();
PublishProcessor<Integer> interrupt = PublishProcessor.create();
final AtomicBoolean subscribed = new AtomicBoolean(false);
Flowable.just(1)
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) {
subscribed.set(true);
}
})
.delaySubscription(delayUntil)
.takeUntil(interrupt)
.subscribe();
interrupt.onNext(9000);
delayUntil.onNext(1);
Assert.assertFalse(subscribed.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCompleteTriggersSubscription() {
PublishProcessor<Object> other = PublishProcessor.create();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
final AtomicInteger subscribed = new AtomicInteger();
Flowable.just(1)
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);
ts.assertNotComplete();
ts.assertNoErrors();
ts.assertNoValues();
Assert.assertEquals("Premature subscription", 0, subscribed.get());
other.onComplete();
Assert.assertEquals("No subscription", 1, subscribed.get());
ts.assertValue(1);
ts.assertNoErrors();
ts.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testNoPrematureSubscriptionToError() {
PublishProcessor<Object> other = PublishProcessor.create();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
final AtomicInteger subscribed = new AtomicInteger();
Flowable.<Integer>error(new TestException())
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);
ts.assertNotComplete();
ts.assertNoErrors();
ts.assertNoValues();
Assert.assertEquals("Premature subscription", 0, subscribed.get());
other.onComplete();
Assert.assertEquals("No subscription", 1, subscribed.get());
ts.assertNoValues();
ts.assertNotComplete();
ts.assertError(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testNoMultipleSubscriptions() {
PublishProcessor<Object> other = PublishProcessor.create();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
final AtomicInteger subscribed = new AtomicInteger();
Flowable.just(1)
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);
ts.assertNotComplete();
ts.assertNoErrors();
ts.assertNoValues();
Assert.assertEquals("Premature subscription", 0, subscribed.get());
other.onNext(1);
other.onNext(2);
Assert.assertEquals("No subscription", 1, subscribed.get());
ts.assertValue(1);
ts.assertNoErrors();
ts.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySupplierCompletes() {
final PublishProcessor<Integer> pp = PublishProcessor.create();
Flowable<Integer> source = Flowable.range(1, 5);
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
@Override
public Publisher<Integer> call() {
return pp;
}
})).subscribe(ts);
ts.assertNoValues();
ts.assertNoErrors();
ts.assertNotComplete();
// FIXME should this complete the source instead of consuming it?
pp.onComplete();
ts.assertValues(1, 2, 3, 4, 5);
ts.assertComplete();
ts.assertNoErrors();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySupplierSimple() {
final PublishProcessor<Integer> pp = PublishProcessor.create();
Flowable<Integer> source = Flowable.range(1, 5);
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
@Override
public Publisher<Integer> call() {
return pp;
}
})).subscribe(ts);
ts.assertNoValues();
ts.assertNoErrors();
ts.assertNotComplete();
pp.onNext(1);
ts.assertValues(1, 2, 3, 4, 5);
ts.assertComplete();
ts.assertNoErrors();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySupplierErrors() {
final PublishProcessor<Integer> pp = PublishProcessor.create();
Flowable<Integer> source = Flowable.range(1, 5);
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
@Override
public Publisher<Integer> call() {
return pp;
}
})).subscribe(ts);
ts.assertNoValues();
ts.assertNoErrors();
ts.assertNotComplete();
pp.onError(new TestException());
ts.assertNoValues();
ts.assertNotComplete();
ts.assertError(TestException.class);
}
内容来源于网络,如有侵权,请联系作者删除!