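This article collects code examples for the Java method io.reactivex.Flowable.subscribe() and shows how Flowable.subscribe() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. The details of Flowable.subscribe() are as follows: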
Package: io.reactivex (fully qualified class name: io.reactivex.Flowable)
Class name: Flowable
Method name: subscribe
Description (Javadoc of the no-argument overload): Subscribes to a Publisher and ignores onNext and onComplete emissions.
If the Flowable emits an error, it is wrapped into an io.reactivex.exceptions.OnErrorNotImplementedException and routed to the RxJavaPlugins.onError handler.
Backpressure: The operator consumes the source Publisher in an unbounded manner (i.e., no backpressure is applied to it).
Scheduler: subscribe does not operate by default on a particular Scheduler.
The examples that follow also use the overloads that accept a Subscriber or a Consumer.
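The behaviour described above can be observed directly. The following is a minimal sketch, assuming RxJava 2.x (the io.reactivex package) is on the classpath; the class name SubscribeSketch and its method names are hypothetical and exist only for illustration. It checks how much the no-argument subscribe() requests from the source, where an unhandled error ends up, and how the three-lambda overload differs.

```java
import io.reactivex.Flowable;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of RxJava.
public class SubscribeSketch {

    /** Returns the request amount the source sees when subscribe() is called with no arguments. */
    public static long requestedByNoArgSubscribe() {
        AtomicLong requested = new AtomicLong();
        Flowable.range(1, 3)
                .doOnRequest(requested::set)   // records the amount requested from upstream
                .subscribe();                  // emissions are ignored
        return requested.get();
    }

    /** Returns whatever the global error handler receives when no onError callback is supplied. */
    public static Throwable errorSeenByPluginHandler() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        RxJavaPlugins.setErrorHandler(seen::set);
        try {
            Flowable.error(new IllegalStateException("boom")).subscribe();
        } finally {
            RxJavaPlugins.reset();             // restore the default handler
        }
        return seen.get();
    }

    /** Uses the overload with explicit onNext, onError and onComplete callbacks. */
    public static List<String> collectWithLambdas() {
        List<String> events = new ArrayList<>();
        Flowable.just(1, 2, 3)
                .subscribe(
                        v -> events.add("next:" + v),
                        e -> events.add("error:" + e),
                        () -> events.add("complete"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("unbounded request: "
                + (requestedByNoArgSubscribe() == Long.MAX_VALUE));
        Throwable t = errorSeenByPluginHandler();
        System.out.println("handler received: " + t.getClass().getSimpleName()
                + ", cause: " + t.getCause());
        System.out.println(collectWithLambdas());
    }
}
```

Because the sources here are synchronous, every callback has already run by the time each subscribe call returns. With an asynchronous source (for example after observeOn), the results would have to be awaited instead, as the TestSubscriber-based examples in this article do.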
Code example source: ReactiveX/RxJava
@Override
protected void subscribeActual(Subscriber<? super T> child) {
    TakeUntilMainSubscriber<T> parent = new TakeUntilMainSubscriber<T>(child);
    child.onSubscribe(parent);
    // Subscribe to the "until" Publisher before the main source.
    other.subscribe(parent.other);
    source.subscribe(parent);
}
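Code example source: ReactiveX/RxJava
@Override
public void onNext(Flowable<Integer> args) {
    // Each inner Flowable gets its own mock Subscriber, kept in values (declared outside this snippet).
    final Subscriber<Object> mo = TestHelper.mockSubscriber();
    values.add(mo);
    args.subscribe(mo);
}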
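Code example source: ReactiveX/RxJava
@Override
protected void subscribeActual(Subscriber<? super T> s) {
    // The downstream Subscriber is wrapped in a SerializedSubscriber before being handed to the debounce logic.
    source.subscribe(new DebounceTimedSubscriber<T>(
            new SerializedSubscriber<T>(s),
            timeout, unit, scheduler.createWorker()));
}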
Code example source: ReactiveX/RxJava
@Test
public void testUnsubscribeSource() throws Exception {
    Action unsubscribe = mock(Action.class);
    Flowable<Integer> f = Flowable.just(1).doOnCancel(unsubscribe).replay().autoConnect();
    // No-argument subscribe(): emissions are ignored.
    f.subscribe();
    f.subscribe();
    f.subscribe();
    verify(unsubscribe, never()).run();
}
Code example source: ReactiveX/RxJava
@Test
public void testSkipError() {
    // w (a mock Subscriber) and LESS_THAN_FIVE (a predicate) are members of the enclosing test class, not shown here.
    Flowable<Integer> src = Flowable.just(1, 2, 42, 5, 3, 1);
    src.skipWhile(LESS_THAN_FIVE).subscribe(w);
    InOrder inOrder = inOrder(w);
    inOrder.verify(w, never()).onNext(anyInt());
    inOrder.verify(w, never()).onComplete();
    inOrder.verify(w, times(1)).onError(any(RuntimeException.class));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testUnsubscribeFromSynchronousInfiniteFlowable() {
    final AtomicLong count = new AtomicLong();
    // INFINITE_OBSERVABLE is a field of the enclosing test class, not shown here.
    // subscribe(Consumer) handles onNext only.
    INFINITE_OBSERVABLE.take(10).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long l) {
            count.set(l);
        }
    });
    assertEquals(10, count.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testSkipEverything() {
    // w and LESS_THAN_FIVE are members of the enclosing test class, not shown here.
    Flowable<Integer> src = Flowable.just(1, 2, 3, 4, 3, 2, 1);
    src.skipWhile(LESS_THAN_FIVE).subscribe(w);
    verify(w, never()).onNext(anyInt());
    verify(w, never()).onError(any(Throwable.class));
    verify(w, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testTimerOnce() {
    // scheduler and subscriber are fields of the enclosing test class, not shown here.
    Flowable.timer(100, TimeUnit.MILLISECONDS, scheduler).subscribe(subscriber);
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    verify(subscriber, times(1)).onNext(0L);
    verify(subscriber, times(1)).onComplete();
    verify(subscriber, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testTake1() {
    Flowable<String> w = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
    Flowable<String> take = w.take(2);
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    take.subscribe(subscriber);
    verify(subscriber, times(1)).onNext("one");
    verify(subscriber, times(1)).onNext("two");
    verify(subscriber, never()).onNext("three");
    verify(subscriber, never()).onError(any(Throwable.class));
    verify(subscriber, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testSkipEmptyStream() {
    Flowable<String> w = Flowable.empty();
    Flowable<String> skip = w.skip(1);
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    skip.subscribe(subscriber);
    verify(subscriber, never()).onNext(any(String.class));
    verify(subscriber, never()).onError(any(Throwable.class));
    verify(subscriber, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testSkipTwoElements() {
    Flowable<String> skip = Flowable.just("one", "two", "three").skip(2);
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    skip.subscribe(subscriber);
    verify(subscriber, never()).onNext("one");
    verify(subscriber, never()).onNext("two");
    verify(subscriber, times(1)).onNext("three");
    verify(subscriber, never()).onError(any(Throwable.class));
    verify(subscriber, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
@Ignore("Null values not allowed")
public void testSkipLastWithNull() {
    Flowable<String> flowable = Flowable.fromIterable(Arrays.asList("one", null, "two")).skipLast(1);
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    flowable.subscribe(subscriber);
    verify(subscriber, times(1)).onNext("one");
    verify(subscriber, times(1)).onNext(null);
    verify(subscriber, never()).onNext("two");
    verify(subscriber, never()).onError(any(Throwable.class));
    verify(subscriber, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepeatError() {
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    Flowable.error(new TestException()).repeat(10).subscribe(subscriber);
    verify(subscriber).onError(any(TestException.class));
    verify(subscriber, never()).onNext(any());
    verify(subscriber, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testSkipError() {
    Exception e = new Exception();
    Flowable<String> ok = Flowable.just("one");
    Flowable<String> error = Flowable.error(e);
    Flowable<String> skip = Flowable.concat(ok, error).skip(100);
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    skip.subscribe(subscriber);
    verify(subscriber, never()).onNext(any(String.class));
    verify(subscriber, times(1)).onError(e);
    verify(subscriber, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testSingleOrDefaultWithTooManyElementsFlowable() {
    Flowable<Integer> flowable = Flowable.just(1, 2).single(3).toFlowable();
    Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
    flowable.subscribe(subscriber);
    InOrder inOrder = inOrder(subscriber);
    inOrder.verify(subscriber, times(1)).onError(
            isA(IllegalArgumentException.class));
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@Test
public void testJustTwoEmissionsObservableThrowsError() {
    TestSubscriber<String> subscriber = TestSubscriber.create();
    Single<String> single = Flowable.just("First", "Second").single("");
    single.toFlowable().subscribe(subscriber);
    subscriber.assertError(IllegalArgumentException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void testEmptyObservable() {
    TestSubscriber<String> subscriber = TestSubscriber.create();
    Single<String> single = Flowable.<String>empty().single("");
    single.toFlowable().subscribe(subscriber);
    subscriber.assertResult("");
}
Code example source: ReactiveX/RxJava
@Test
public void testRequestOverflowDoesNotOccur() {
    // The TestSubscriber starts with an initial request of Long.MAX_VALUE - 1.
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>(Long.MAX_VALUE - 1);
    Flowable.range(1, 10).skip(5).subscribe(ts);
    ts.assertTerminated();
    ts.assertComplete();
    ts.assertNoErrors();
    assertEquals(Arrays.asList(6, 7, 8, 9, 10), ts.values());
}
Code example source: ReactiveX/RxJava
@Test
public void testJustTwoEmissionsObservableThrowsError() {
    TestSubscriber<String> subscriber = TestSubscriber.create();
    Completable cmp = Flowable.just("First", "Second").ignoreElements();
    cmp.<String>toFlowable().subscribe(subscriber);
    subscriber.assertNoErrors();
    subscriber.assertNoValues();
}
Code example source: ReactiveX/RxJava
@Test
public void testBackpressure2() {
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    // newSlowProcessor() is a helper of the enclosing test class, not shown here.
    Flowable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
            .observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(ts);
    ts.awaitTerminalEvent();
    ts.assertNoErrors();
    assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}