io.reactivex.Observable.delaySubscription()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.3k)|赞(0)|评价(0)|浏览(175)

本文整理了Java中io.reactivex.Observable.delaySubscription()方法的一些代码示例,展示了Observable.delaySubscription()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.delaySubscription()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:delaySubscription

Observable.delaySubscription介绍

[英]Returns an Observable that delays the subscription to the source ObservableSource by a given amount of time.

Scheduler: This version of delaySubscription operates by default on the computation Scheduler.
[中]返回一个Observable,它将源ObservableSource的订阅延迟给定的时间。
调度程序:默认情况下,此版本的delaySubscription在计算调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Object apply(Observable<Integer> o) throws Exception {
  3. return Observable.just(1).delaySubscription(o);
  4. }
  5. }, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void delaySubscriptionTimedUnitNull() {
  3. just1.delaySubscription(1, null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void delaySubscriptionFunctionNull() {
  3. just1.delaySubscription((Observable<Object>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void delaySubscriptionTimedSchedulerNull() {
  3. just1.delaySubscription(1, TimeUnit.SECONDS, null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void delaySubscriptionOtherNull() {
  3. just1.delaySubscription((Observable<Object>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Returns an Observable that delays the subscription to the source ObservableSource by a given amount of time.
  3. * <p>
  4. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.png" alt="">
  5. * <dl>
  6. * <dt><b>Scheduler:</b></dt>
  7. * <dd>This version of {@code delaySubscription} operates by default on the {@code computation} {@link Scheduler}.</dd>
  8. * </dl>
  9. *
  10. * @param delay
  11. * the time to delay the subscription
  12. * @param unit
  13. * the time unit of {@code delay}
  14. * @return an Observable that delays the subscription to the source ObservableSource by the given amount
  15. * @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
  16. */
  17. @CheckReturnValue
  18. @SchedulerSupport(SchedulerSupport.COMPUTATION)
  19. public final Observable<T> delaySubscription(long delay, TimeUnit unit) {
  20. return delaySubscription(delay, unit, Schedulers.computation());
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Returns an Observable that delays the subscription to the source ObservableSource by a given amount of time,
  3. * both waiting and subscribing on a given Scheduler.
  4. * <p>
  5. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.s.png" alt="">
  6. * <dl>
  7. * <dt><b>Scheduler:</b></dt>
  8. * <dd>You specify which {@link Scheduler} this operator will use.</dd>
  9. * </dl>
  10. *
  11. * @param delay
  12. * the time to delay the subscription
  13. * @param unit
  14. * the time unit of {@code delay}
  15. * @param scheduler
  16. * the Scheduler on which the waiting and subscription will happen
  17. * @return an Observable that delays the subscription to the source ObservableSource by a given
  18. * amount, waiting and subscribing on the given Scheduler
  19. * @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
  20. */
  21. @CheckReturnValue
  22. @SchedulerSupport(SchedulerSupport.CUSTOM)
  23. public final Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
  24. return delaySubscription(timer(delay, unit, scheduler));
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. public final <U, V> Observable<T> delay(ObservableSource<U> subscriptionDelay,
  2. Function<? super T, ? extends ObservableSource<V>> itemDelay) {
  3. return delaySubscription(subscriptionDelay).delay(itemDelay);

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelaySubscription() {
  3. Observable<Integer> result = Observable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
  4. Observer<Object> o = TestHelper.mockObserver();
  5. InOrder inOrder = inOrder(o);
  6. result.subscribe(o);
  7. inOrder.verify(o, never()).onNext(any());
  8. inOrder.verify(o, never()).onComplete();
  9. scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  10. inOrder.verify(o, times(1)).onNext(1);
  11. inOrder.verify(o, times(1)).onNext(2);
  12. inOrder.verify(o, times(1)).onNext(3);
  13. inOrder.verify(o, times(1)).onComplete();
  14. verify(o, never()).onError(any(Throwable.class));
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelaySubscriptionDisposeBeforeTime() {
  3. Observable<Integer> result = Observable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
  4. Observer<Object> o = TestHelper.mockObserver();
  5. TestObserver<Object> to = new TestObserver<Object>(o);
  6. result.subscribe(to);
  7. to.dispose();
  8. scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  9. verify(o, never()).onNext(any());
  10. verify(o, never()).onComplete();
  11. verify(o, never()).onError(any(Throwable.class));
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void afterDelayNoInterrupt() {
  3. ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  4. try {
  5. for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
  6. final TestObserver<Boolean> observer = TestObserver.create();
  7. observer.withTag(s.getClass().getSimpleName());
  8. Observable.<Boolean>create(new ObservableOnSubscribe<Boolean>() {
  9. @Override
  10. public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
  11. emitter.onNext(Thread.interrupted());
  12. emitter.onComplete();
  13. }
  14. })
  15. .delaySubscription(100, TimeUnit.MILLISECONDS, s)
  16. .subscribe(observer);
  17. observer.awaitTerminalEvent();
  18. observer.assertValue(false);
  19. }
  20. } finally {
  21. exec.shutdown();
  22. }
  23. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testCompleteTriggersSubscription() {
  3. PublishSubject<Object> other = PublishSubject.create();
  4. TestObserver<Integer> to = new TestObserver<Integer>();
  5. final AtomicInteger subscribed = new AtomicInteger();
  6. Observable.just(1)
  7. .doOnSubscribe(new Consumer<Disposable>() {
  8. @Override
  9. public void accept(Disposable d) {
  10. subscribed.getAndIncrement();
  11. }
  12. })
  13. .delaySubscription(other)
  14. .subscribe(to);
  15. to.assertNotComplete();
  16. to.assertNoErrors();
  17. to.assertNoValues();
  18. Assert.assertEquals("Premature subscription", 0, subscribed.get());
  19. other.onComplete();
  20. Assert.assertEquals("No subscription", 1, subscribed.get());
  21. to.assertValue(1);
  22. to.assertNoErrors();
  23. to.assertComplete();
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testNoPrematureSubscription() {
  3. PublishSubject<Object> other = PublishSubject.create();
  4. TestObserver<Integer> to = new TestObserver<Integer>();
  5. final AtomicInteger subscribed = new AtomicInteger();
  6. Observable.just(1)
  7. .doOnSubscribe(new Consumer<Disposable>() {
  8. @Override
  9. public void accept(Disposable d) {
  10. subscribed.getAndIncrement();
  11. }
  12. })
  13. .delaySubscription(other)
  14. .subscribe(to);
  15. to.assertNotComplete();
  16. to.assertNoErrors();
  17. to.assertNoValues();
  18. Assert.assertEquals("Premature subscription", 0, subscribed.get());
  19. other.onNext(1);
  20. Assert.assertEquals("No subscription", 1, subscribed.get());
  21. to.assertValue(1);
  22. to.assertNoErrors();
  23. to.assertComplete();
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testNoMultipleSubscriptions() {
  3. PublishSubject<Object> other = PublishSubject.create();
  4. TestObserver<Integer> to = new TestObserver<Integer>();
  5. final AtomicInteger subscribed = new AtomicInteger();
  6. Observable.just(1)
  7. .doOnSubscribe(new Consumer<Disposable>() {
  8. @Override
  9. public void accept(Disposable d) {
  10. subscribed.getAndIncrement();
  11. }
  12. })
  13. .delaySubscription(other)
  14. .subscribe(to);
  15. to.assertNotComplete();
  16. to.assertNoErrors();
  17. to.assertNoValues();
  18. Assert.assertEquals("Premature subscription", 0, subscribed.get());
  19. other.onNext(1);
  20. other.onNext(2);
  21. Assert.assertEquals("No subscription", 1, subscribed.get());
  22. to.assertValue(1);
  23. to.assertNoErrors();
  24. to.assertComplete();
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testBackpressureWithSubscriptionTimedDelay() {
  3. TestObserver<Integer> to = new TestObserver<Integer>();
  4. Observable.range(1, Flowable.bufferSize() * 2)
  5. .delaySubscription(100, TimeUnit.MILLISECONDS)
  6. .delay(100, TimeUnit.MILLISECONDS)
  7. .observeOn(Schedulers.computation())
  8. .map(new Function<Integer, Integer>() {
  9. int c;
  10. @Override
  11. public Integer apply(Integer t) {
  12. if (c++ <= 0) {
  13. try {
  14. Thread.sleep(500);
  15. } catch (InterruptedException e) {
  16. }
  17. }
  18. return t;
  19. }
  20. }).subscribe(to);
  21. to.awaitTerminalEvent();
  22. to.assertNoErrors();
  23. assertEquals(Flowable.bufferSize() * 2, to.valueCount());
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testNoPrematureSubscriptionToError() {
  3. PublishSubject<Object> other = PublishSubject.create();
  4. TestObserver<Integer> to = new TestObserver<Integer>();
  5. final AtomicInteger subscribed = new AtomicInteger();
  6. Observable.<Integer>error(new TestException())
  7. .doOnSubscribe(new Consumer<Disposable>() {
  8. @Override
  9. public void accept(Disposable d) {
  10. subscribed.getAndIncrement();
  11. }
  12. })
  13. .delaySubscription(other)
  14. .subscribe(to);
  15. to.assertNotComplete();
  16. to.assertNoErrors();
  17. to.assertNoValues();
  18. Assert.assertEquals("Premature subscription", 0, subscribed.get());
  19. other.onComplete();
  20. Assert.assertEquals("No subscription", 1, subscribed.get());
  21. to.assertNoValues();
  22. to.assertNotComplete();
  23. to.assertError(TestException.class);
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelaySupplierSimple() {
  3. final PublishSubject<Integer> ps = PublishSubject.create();
  4. Observable<Integer> source = Observable.range(1, 5);
  5. TestObserver<Integer> to = new TestObserver<Integer>();
  6. source.delaySubscription(ps).subscribe(to);
  7. to.assertNoValues();
  8. to.assertNoErrors();
  9. to.assertNotComplete();
  10. ps.onNext(1);
  11. to.assertValues(1, 2, 3, 4, 5);
  12. to.assertComplete();
  13. to.assertNoErrors();
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testNoSubscriptionIfOtherErrors() {
  3. PublishSubject<Object> other = PublishSubject.create();
  4. TestObserver<Integer> to = new TestObserver<Integer>();
  5. final AtomicInteger subscribed = new AtomicInteger();
  6. Observable.<Integer>error(new TestException())
  7. .doOnSubscribe(new Consumer<Disposable>() {
  8. @Override
  9. public void accept(Disposable d) {
  10. subscribed.getAndIncrement();
  11. }
  12. })
  13. .delaySubscription(other)
  14. .subscribe(to);
  15. to.assertNotComplete();
  16. to.assertNoErrors();
  17. to.assertNoValues();
  18. Assert.assertEquals("Premature subscription", 0, subscribed.get());
  19. other.onError(new TestException());
  20. Assert.assertEquals("Premature subscription", 0, subscribed.get());
  21. to.assertNoValues();
  22. to.assertNotComplete();
  23. to.assertError(TestException.class);
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelaySupplierCompletes() {
  3. final PublishSubject<Integer> ps = PublishSubject.create();
  4. Observable<Integer> source = Observable.range(1, 5);
  5. TestObserver<Integer> to = new TestObserver<Integer>();
  6. source.delaySubscription(ps).subscribe(to);
  7. to.assertNoValues();
  8. to.assertNoErrors();
  9. to.assertNotComplete();
  10. // FIXME should this complete the source instead of consuming it?
  11. ps.onComplete();
  12. to.assertValues(1, 2, 3, 4, 5);
  13. to.assertComplete();
  14. to.assertNoErrors();
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelaySupplierErrors() {
  3. final PublishSubject<Integer> ps = PublishSubject.create();
  4. Observable<Integer> source = Observable.range(1, 5);
  5. TestObserver<Integer> to = new TestObserver<Integer>();
  6. source.delaySubscription(ps).subscribe(to);
  7. to.assertNoValues();
  8. to.assertNoErrors();
  9. to.assertNotComplete();
  10. ps.onError(new TestException());
  11. to.assertNoValues();
  12. to.assertNotComplete();
  13. to.assertError(TestException.class);
  14. }

相关文章

Observable类方法