io.reactivex.Flowable.delaySubscription()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.6k)|赞(0)|评价(0)|浏览(148)

本文整理了Java中io.reactivex.Flowable.delaySubscription()方法的一些代码示例,展示了Flowable.delaySubscription()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.delaySubscription()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:delaySubscription

Flowable.delaySubscription介绍

[英]Returns a Flowable that delays the subscription to the source Publisher by a given amount of time.

Backpressure: The operator doesn't interfere with the backpressure behavior which is determined by the source Publisher. Scheduler: This version of delaySubscription operates by default on the computation Scheduler.
[中]返回将对源发布服务器的订阅延迟给定时间的Flowable。
背压:操作员不会干扰由源发布者确定的背压行为。调度程序:默认情况下,此版本的delaySubscription在计算调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Object apply(Flowable<Integer> f) throws Exception {
    return Flowable.just(1).delaySubscription(f);
  }
}, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void delaySubscriptionSupplierNull() {
  just1.delaySubscription((Publisher<Object>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void delaySubscriptionFunctionNull() {
  just1.delaySubscription((Publisher<Object>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> createPublisher(long elements) {
    return
        Flowable.range(0, (int)elements).delaySubscription(1, TimeUnit.MILLISECONDS)
    ;
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void delaySubscriptionOtherNull() {
  just1.delaySubscription((Flowable<Object>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void delaySubscriptionTimedUnitNull() {
  just1.delaySubscription(1, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void delaySubscriptionTimedSchedulerNull() {
  just1.delaySubscription(1, TimeUnit.SECONDS, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void otherNull() {
  Flowable.just(1).delaySubscription((Flowable<Integer>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDelaySubscription() {
  Flowable<Integer> result = Flowable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  InOrder inOrder = inOrder(subscriber);
  result.subscribe(subscriber);
  inOrder.verify(subscriber, never()).onNext(any());
  inOrder.verify(subscriber, never()).onComplete();
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  inOrder.verify(subscriber, times(1)).onNext(1);
  inOrder.verify(subscriber, times(1)).onNext(2);
  inOrder.verify(subscriber, times(1)).onNext(3);
  inOrder.verify(subscriber, times(1)).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDelaySubscriptionCancelBeforeTime() {
  Flowable<Integer> result = Flowable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
  result.subscribe(ts);
  ts.dispose();
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  verify(subscriber, never()).onNext(any());
  verify(subscriber, never()).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDelaySubscriptionDisposeBeforeTime() {
  Flowable<Integer> result = Flowable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
  result.subscribe(ts);
  ts.dispose();
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  verify(subscriber, never()).onNext(any());
  verify(subscriber, never()).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void afterDelayNoInterrupt() {
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    try {
      for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
        final TestSubscriber<Boolean> ts = TestSubscriber.create();
        ts.withTag(s.getClass().getSimpleName());

        Flowable.<Boolean>create(new FlowableOnSubscribe<Boolean>() {
          @Override
          public void subscribe(FlowableEmitter<Boolean> emitter) throws Exception {
           emitter.onNext(Thread.interrupted());
           emitter.onComplete();
          }
        }, BackpressureStrategy.MISSING)
        .delaySubscription(100, TimeUnit.MILLISECONDS, s)
        .subscribe(ts);

        ts.awaitTerminalEvent();
        ts.assertValue(false);
      }
    } finally {
      exec.shutdown();
    }
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void delayAndTakeUntilNeverSubscribeToSource() {
  PublishProcessor<Integer> delayUntil = PublishProcessor.create();
  PublishProcessor<Integer> interrupt = PublishProcessor.create();
  final AtomicBoolean subscribed = new AtomicBoolean(false);
  Flowable.just(1)
  .doOnSubscribe(new Consumer<Object>() {
    @Override
    public void accept(Object o) {
      subscribed.set(true);
    }
  })
  .delaySubscription(delayUntil)
  .takeUntil(interrupt)
  .subscribe();
  interrupt.onNext(9000);
  delayUntil.onNext(1);
  Assert.assertFalse(subscribed.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void delayAndTakeUntilNeverSubscribeToSource() {
  PublishProcessor<Integer> delayUntil = PublishProcessor.create();
  PublishProcessor<Integer> interrupt = PublishProcessor.create();
  final AtomicBoolean subscribed = new AtomicBoolean(false);
  Flowable.just(1)
  .doOnSubscribe(new Consumer<Subscription>() {
    @Override
    public void accept(Subscription s) {
      subscribed.set(true);
    }
  })
  .delaySubscription(delayUntil)
  .takeUntil(interrupt)
  .subscribe();
  interrupt.onNext(9000);
  delayUntil.onNext(1);
  Assert.assertFalse(subscribed.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testCompleteTriggersSubscription() {
  PublishProcessor<Object> other = PublishProcessor.create();
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  final AtomicInteger subscribed = new AtomicInteger();
  Flowable.just(1)
  .doOnSubscribe(new Consumer<Subscription>() {
    @Override
    public void accept(Subscription s) {
      subscribed.getAndIncrement();
    }
  })
  .delaySubscription(other)
  .subscribe(ts);
  ts.assertNotComplete();
  ts.assertNoErrors();
  ts.assertNoValues();
  Assert.assertEquals("Premature subscription", 0, subscribed.get());
  other.onComplete();
  Assert.assertEquals("No subscription", 1, subscribed.get());
  ts.assertValue(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testNoPrematureSubscriptionToError() {
  PublishProcessor<Object> other = PublishProcessor.create();
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  final AtomicInteger subscribed = new AtomicInteger();
  Flowable.<Integer>error(new TestException())
  .doOnSubscribe(new Consumer<Subscription>() {
    @Override
    public void accept(Subscription s) {
      subscribed.getAndIncrement();
    }
  })
  .delaySubscription(other)
  .subscribe(ts);
  ts.assertNotComplete();
  ts.assertNoErrors();
  ts.assertNoValues();
  Assert.assertEquals("Premature subscription", 0, subscribed.get());
  other.onComplete();
  Assert.assertEquals("No subscription", 1, subscribed.get());
  ts.assertNoValues();
  ts.assertNotComplete();
  ts.assertError(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testNoMultipleSubscriptions() {
  PublishProcessor<Object> other = PublishProcessor.create();
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  final AtomicInteger subscribed = new AtomicInteger();
  Flowable.just(1)
  .doOnSubscribe(new Consumer<Subscription>() {
    @Override
    public void accept(Subscription s) {
      subscribed.getAndIncrement();
    }
  })
  .delaySubscription(other)
  .subscribe(ts);
  ts.assertNotComplete();
  ts.assertNoErrors();
  ts.assertNoValues();
  Assert.assertEquals("Premature subscription", 0, subscribed.get());
  other.onNext(1);
  other.onNext(2);
  Assert.assertEquals("No subscription", 1, subscribed.get());
  ts.assertValue(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDelaySupplierCompletes() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  Flowable<Integer> source = Flowable.range(1, 5);
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
    @Override
    public Publisher<Integer> call() {
      return pp;
    }
  })).subscribe(ts);
  ts.assertNoValues();
  ts.assertNoErrors();
  ts.assertNotComplete();
  // FIXME should this complete the source instead of consuming it?
  pp.onComplete();
  ts.assertValues(1, 2, 3, 4, 5);
  ts.assertComplete();
  ts.assertNoErrors();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDelaySupplierSimple() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  Flowable<Integer> source = Flowable.range(1, 5);
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
    @Override
    public Publisher<Integer> call() {
      return pp;
    }
  })).subscribe(ts);
  ts.assertNoValues();
  ts.assertNoErrors();
  ts.assertNotComplete();
  pp.onNext(1);
  ts.assertValues(1, 2, 3, 4, 5);
  ts.assertComplete();
  ts.assertNoErrors();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDelaySupplierErrors() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  Flowable<Integer> source = Flowable.range(1, 5);
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
    @Override
    public Publisher<Integer> call() {
      return pp;
    }
  })).subscribe(ts);
  ts.assertNoValues();
  ts.assertNoErrors();
  ts.assertNotComplete();
  pp.onError(new TestException());
  ts.assertNoValues();
  ts.assertNotComplete();
  ts.assertError(TestException.class);
}

相关文章

Flowable类方法