Usage and code examples of the io.reactivex.Flowable.subscribe() method

x33g5p2x, reposted on 2022-01-19 under Other

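This article collects code examples of the io.reactivex.Flowable.subscribe() method in Java and shows how Flowable.subscribe() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.subscribe() method are as follows:
Fully qualified name: io.reactivex.Flowable
Class name: Flowable
Method name: subscribe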

Introduction to Flowable.subscribe

Subscribes to a Publisher and ignores onNext and onComplete emissions.

If the Flowable emits an error, it is wrapped into an io.reactivex.exceptions.OnErrorNotImplementedException and routed to the RxJavaPlugins.onError handler.
Backpressure: The operator consumes the source Publisher in an unbounded manner (i.e., no backpressure is applied to it).
Scheduler: subscribe does not operate by default on a particular Scheduler.
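
The behaviour described above can be observed with a small experiment. The following is a minimal sketch, not code from the RxJava project: it assumes an RxJava 2.x jar on the classpath whose no-argument subscribe() behaves as the description states, and the class SubscribeSketch and its method names are hypothetical. It shows the one-argument subscribe(Consumer) overload, the amount the no-argument subscribe() requests from upstream, and where an unhandled error ends up.

```java
import io.reactivex.Flowable;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; only the io.reactivex calls belong to RxJava 2.x.
final class SubscribeSketch {

    // subscribe(Consumer): handle onNext only and collect the emitted values.
    static List<Integer> valuesFromOnNext() {
        List<Integer> seen = new ArrayList<>();
        Flowable.range(1, 3).subscribe(v -> seen.add(v));
        return seen;
    }

    // No-argument subscribe(): record the first amount it requests from upstream.
    static long requestedAmount() {
        List<Long> requests = new ArrayList<>();
        Flowable.range(1, 3)
                .doOnRequest(n -> requests.add(n))
                .subscribe();
        return requests.get(0);
    }

    // No-argument subscribe() on a failing source: capture what reaches
    // the global RxJavaPlugins error handler.
    static Throwable errorRoutedToPlugin() {
        AtomicReference<Throwable> routed = new AtomicReference<>();
        RxJavaPlugins.setErrorHandler(routed::set);
        try {
            Flowable.error(new IllegalStateException("boom")).subscribe();
        } finally {
            // Remove the custom handler so other code sees the default behaviour.
            RxJavaPlugins.setErrorHandler(null);
        }
        return routed.get();
    }

    public static void main(String[] args) {
        System.out.println(valuesFromOnNext());
        System.out.println(requestedAmount() == Long.MAX_VALUE);
        System.out.println(errorRoutedToPlugin());
    }
}
```

The sources here are synchronous, so every callback has already run by the time subscribe() returns. With a source that emits on another Scheduler, the same checks would have to wait for termination first.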

Code examples

Code example source: ReactiveX/RxJava

@Override
protected void subscribeActual(Subscriber<? super T> child) {
  TakeUntilMainSubscriber<T> parent = new TakeUntilMainSubscriber<T>(child);
  child.onSubscribe(parent);
  other.subscribe(parent.other);
  source.subscribe(parent);
}

Code example source: ReactiveX/RxJava

@Override
public void onNext(Flowable<Integer> args) {
  final Subscriber<Object> mo = TestHelper.mockSubscriber();
  values.add(mo);
  args.subscribe(mo);
}

Code example source: ReactiveX/RxJava

@Override
protected void subscribeActual(Subscriber<? super T> s) {
  source.subscribe(new DebounceTimedSubscriber<T>(
      new SerializedSubscriber<T>(s),
      timeout, unit, scheduler.createWorker()));
}

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribeSource() throws Exception {
  Action unsubscribe = mock(Action.class);
  Flowable<Integer> f = Flowable.just(1).doOnCancel(unsubscribe).replay().autoConnect();
  f.subscribe();
  f.subscribe();
  f.subscribe();
  verify(unsubscribe, never()).run();
}

Code example source: ReactiveX/RxJava

@Test
public void testSkipError() {
  Flowable<Integer> src = Flowable.just(1, 2, 42, 5, 3, 1);
  src.skipWhile(LESS_THAN_FIVE).subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, never()).onNext(anyInt());
  inOrder.verify(w, never()).onComplete();
  inOrder.verify(w, times(1)).onError(any(RuntimeException.class));
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void testUnsubscribeFromSynchronousInfiniteFlowable() {
  final AtomicLong count = new AtomicLong();
  INFINITE_OBSERVABLE.take(10).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long l) {
      count.set(l);
    }
  });
  assertEquals(10, count.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testSkipEverything() {
  Flowable<Integer> src = Flowable.just(1, 2, 3, 4, 3, 2, 1);
  src.skipWhile(LESS_THAN_FIVE).subscribe(w);
  verify(w, never()).onNext(anyInt());
  verify(w, never()).onError(any(Throwable.class));
  verify(w, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testTimerOnce() {
  Flowable.timer(100, TimeUnit.MILLISECONDS, scheduler).subscribe(subscriber);
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  verify(subscriber, times(1)).onNext(0L);
  verify(subscriber, times(1)).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testTake1() {
  Flowable<String> w = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
  Flowable<String> take = w.take(2);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  take.subscribe(subscriber);
  verify(subscriber, times(1)).onNext("one");
  verify(subscriber, times(1)).onNext("two");
  verify(subscriber, never()).onNext("three");
  verify(subscriber, never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testSkipEmptyStream() {
  Flowable<String> w = Flowable.empty();
  Flowable<String> skip = w.skip(1);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  skip.subscribe(subscriber);
  verify(subscriber, never()).onNext(any(String.class));
  verify(subscriber, never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testSkipTwoElements() {
  Flowable<String> skip = Flowable.just("one", "two", "three").skip(2);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  skip.subscribe(subscriber);
  verify(subscriber, never()).onNext("one");
  verify(subscriber, never()).onNext("two");
  verify(subscriber, times(1)).onNext("three");
  verify(subscriber, never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
@Ignore("Null values not allowed")
public void testSkipLastWithNull() {
  Flowable<String> flowable = Flowable.fromIterable(Arrays.asList("one", null, "two")).skipLast(1);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  verify(subscriber, times(1)).onNext("one");
  verify(subscriber, times(1)).onNext(null);
  verify(subscriber, never()).onNext("two");
  verify(subscriber, never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatError() {
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  Flowable.error(new TestException()).repeat(10).subscribe(subscriber);
  verify(subscriber).onError(any(TestException.class));
  verify(subscriber, never()).onNext(any());
  verify(subscriber, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testSkipError() {
  Exception e = new Exception();
  Flowable<String> ok = Flowable.just("one");
  Flowable<String> error = Flowable.error(e);
  Flowable<String> skip = Flowable.concat(ok, error).skip(100);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  skip.subscribe(subscriber);
  verify(subscriber, never()).onNext(any(String.class));
  verify(subscriber, times(1)).onError(e);
  verify(subscriber, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithTooManyElementsFlowable() {
  Flowable<Integer> flowable = Flowable.just(1, 2).single(3).toFlowable();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  InOrder inOrder = inOrder(subscriber);
  inOrder.verify(subscriber, times(1)).onError(
      isA(IllegalArgumentException.class));
  inOrder.verifyNoMoreInteractions();
}

Code example source: ReactiveX/RxJava

@Test
public void testJustTwoEmissionsObservableThrowsError() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  Single<String> single = Flowable.just("First", "Second").single("");
  single.toFlowable().subscribe(subscriber);
  subscriber.assertError(IllegalArgumentException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void testEmptyObservable() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  Single<String> single = Flowable.<String>empty().single("");
  single.toFlowable().subscribe(subscriber);
  subscriber.assertResult("");
}

Code example source: ReactiveX/RxJava

@Test
public void testRequestOverflowDoesNotOccur() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>(Long.MAX_VALUE - 1);
  Flowable.range(1, 10).skip(5).subscribe(ts);
  ts.assertTerminated();
  ts.assertComplete();
  ts.assertNoErrors();
  assertEquals(Arrays.asList(6, 7, 8, 9, 10), ts.values());
}

Code example source: ReactiveX/RxJava

@Test
public void testJustTwoEmissionsObservableThrowsError() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  Completable cmp = Flowable.just("First", "Second").ignoreElements();
  cmp.<String>toFlowable().subscribe(subscriber);
  subscriber.assertNoErrors();
  subscriber.assertNoValues();
}

Code example source: ReactiveX/RxJava

@Test
public void testBackpressure2() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
      .observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}
