本文整理了Java中io.reactivex.Observable.delaySubscription()
方法的一些代码示例,展示了Observable.delaySubscription()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.delaySubscription()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:delaySubscription
[英]Returns an Observable that delays the subscription to the source ObservableSource by a given amount of time.
Scheduler: This version of delaySubscription operates by default on the computation Scheduler.
[中]返回一个Observable,它将源ObservableSource的订阅延迟给定的时间。
调度程序:默认情况下,此版本的delaySubscription在计算调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Object apply(Observable<Integer> o) throws Exception {
return Observable.just(1).delaySubscription(o);
}
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void delaySubscriptionTimedUnitNull() {
just1.delaySubscription(1, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void delaySubscriptionFunctionNull() {
just1.delaySubscription((Observable<Object>)null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void delaySubscriptionTimedSchedulerNull() {
just1.delaySubscription(1, TimeUnit.SECONDS, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void delaySubscriptionOtherNull() {
just1.delaySubscription((Observable<Object>)null);
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Returns an Observable that delays the subscription to the source ObservableSource by a given amount of time.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code delaySubscription} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param delay
* the time to delay the subscription
* @param unit
* the time unit of {@code delay}
* @return an Observable that delays the subscription to the source ObservableSource by the given amount
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> delaySubscription(long delay, TimeUnit unit) {
return delaySubscription(delay, unit, Schedulers.computation());
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Returns an Observable that delays the subscription to the source ObservableSource by a given amount of time,
* both waiting and subscribing on a given Scheduler.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param delay
* the time to delay the subscription
* @param unit
* the time unit of {@code delay}
* @param scheduler
* the Scheduler on which the waiting and subscription will happen
* @return an Observable that delays the subscription to the source ObservableSource by a given
* amount, waiting and subscribing on the given Scheduler
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
return delaySubscription(timer(delay, unit, scheduler));
}
代码示例来源:origin: ReactiveX/RxJava
public final <U, V> Observable<T> delay(ObservableSource<U> subscriptionDelay,
Function<? super T, ? extends ObservableSource<V>> itemDelay) {
return delaySubscription(subscriptionDelay).delay(itemDelay);
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySubscription() {
Observable<Integer> result = Observable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
Observer<Object> o = TestHelper.mockObserver();
InOrder inOrder = inOrder(o);
result.subscribe(o);
inOrder.verify(o, never()).onNext(any());
inOrder.verify(o, never()).onComplete();
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
inOrder.verify(o, times(1)).onNext(1);
inOrder.verify(o, times(1)).onNext(2);
inOrder.verify(o, times(1)).onNext(3);
inOrder.verify(o, times(1)).onComplete();
verify(o, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySubscriptionDisposeBeforeTime() {
Observable<Integer> result = Observable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
Observer<Object> o = TestHelper.mockObserver();
TestObserver<Object> to = new TestObserver<Object>(o);
result.subscribe(to);
to.dispose();
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
verify(o, never()).onNext(any());
verify(o, never()).onComplete();
verify(o, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void afterDelayNoInterrupt() {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
final TestObserver<Boolean> observer = TestObserver.create();
observer.withTag(s.getClass().getSimpleName());
Observable.<Boolean>create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
emitter.onNext(Thread.interrupted());
emitter.onComplete();
}
})
.delaySubscription(100, TimeUnit.MILLISECONDS, s)
.subscribe(observer);
observer.awaitTerminalEvent();
observer.assertValue(false);
}
} finally {
exec.shutdown();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCompleteTriggersSubscription() {
PublishSubject<Object> other = PublishSubject.create();
TestObserver<Integer> to = new TestObserver<Integer>();
final AtomicInteger subscribed = new AtomicInteger();
Observable.just(1)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(to);
to.assertNotComplete();
to.assertNoErrors();
to.assertNoValues();
Assert.assertEquals("Premature subscription", 0, subscribed.get());
other.onComplete();
Assert.assertEquals("No subscription", 1, subscribed.get());
to.assertValue(1);
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testNoPrematureSubscription() {
PublishSubject<Object> other = PublishSubject.create();
TestObserver<Integer> to = new TestObserver<Integer>();
final AtomicInteger subscribed = new AtomicInteger();
Observable.just(1)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(to);
to.assertNotComplete();
to.assertNoErrors();
to.assertNoValues();
Assert.assertEquals("Premature subscription", 0, subscribed.get());
other.onNext(1);
Assert.assertEquals("No subscription", 1, subscribed.get());
to.assertValue(1);
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testNoMultipleSubscriptions() {
PublishSubject<Object> other = PublishSubject.create();
TestObserver<Integer> to = new TestObserver<Integer>();
final AtomicInteger subscribed = new AtomicInteger();
Observable.just(1)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(to);
to.assertNotComplete();
to.assertNoErrors();
to.assertNoValues();
Assert.assertEquals("Premature subscription", 0, subscribed.get());
other.onNext(1);
other.onNext(2);
Assert.assertEquals("No subscription", 1, subscribed.get());
to.assertValue(1);
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressureWithSubscriptionTimedDelay() {
TestObserver<Integer> to = new TestObserver<Integer>();
Observable.range(1, Flowable.bufferSize() * 2)
.delaySubscription(100, TimeUnit.MILLISECONDS)
.delay(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation())
.map(new Function<Integer, Integer>() {
int c;
@Override
public Integer apply(Integer t) {
if (c++ <= 0) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
return t;
}
}).subscribe(to);
to.awaitTerminalEvent();
to.assertNoErrors();
assertEquals(Flowable.bufferSize() * 2, to.valueCount());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testNoPrematureSubscriptionToError() {
PublishSubject<Object> other = PublishSubject.create();
TestObserver<Integer> to = new TestObserver<Integer>();
final AtomicInteger subscribed = new AtomicInteger();
Observable.<Integer>error(new TestException())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(to);
to.assertNotComplete();
to.assertNoErrors();
to.assertNoValues();
Assert.assertEquals("Premature subscription", 0, subscribed.get());
other.onComplete();
Assert.assertEquals("No subscription", 1, subscribed.get());
to.assertNoValues();
to.assertNotComplete();
to.assertError(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySupplierSimple() {
final PublishSubject<Integer> ps = PublishSubject.create();
Observable<Integer> source = Observable.range(1, 5);
TestObserver<Integer> to = new TestObserver<Integer>();
source.delaySubscription(ps).subscribe(to);
to.assertNoValues();
to.assertNoErrors();
to.assertNotComplete();
ps.onNext(1);
to.assertValues(1, 2, 3, 4, 5);
to.assertComplete();
to.assertNoErrors();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testNoSubscriptionIfOtherErrors() {
PublishSubject<Object> other = PublishSubject.create();
TestObserver<Integer> to = new TestObserver<Integer>();
final AtomicInteger subscribed = new AtomicInteger();
Observable.<Integer>error(new TestException())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(to);
to.assertNotComplete();
to.assertNoErrors();
to.assertNoValues();
Assert.assertEquals("Premature subscription", 0, subscribed.get());
other.onError(new TestException());
Assert.assertEquals("Premature subscription", 0, subscribed.get());
to.assertNoValues();
to.assertNotComplete();
to.assertError(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySupplierCompletes() {
final PublishSubject<Integer> ps = PublishSubject.create();
Observable<Integer> source = Observable.range(1, 5);
TestObserver<Integer> to = new TestObserver<Integer>();
source.delaySubscription(ps).subscribe(to);
to.assertNoValues();
to.assertNoErrors();
to.assertNotComplete();
// FIXME should this complete the source instead of consuming it?
ps.onComplete();
to.assertValues(1, 2, 3, 4, 5);
to.assertComplete();
to.assertNoErrors();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDelaySupplierErrors() {
final PublishSubject<Integer> ps = PublishSubject.create();
Observable<Integer> source = Observable.range(1, 5);
TestObserver<Integer> to = new TestObserver<Integer>();
source.delaySubscription(ps).subscribe(to);
to.assertNoValues();
to.assertNoErrors();
to.assertNotComplete();
ps.onError(new TestException());
to.assertNoValues();
to.assertNotComplete();
to.assertError(TestException.class);
}
内容来源于网络,如有侵权,请联系作者删除!