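This article collects code examples for the Java method io.reactivex.Flowable.safeSubscribe() and shows how Flowable.safeSubscribe() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they serve as a practical reference. Details of the Flowable.safeSubscribe() method are as follows: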
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: safeSubscribe
Description: Subscribes to the current Flowable and wraps the given Subscriber into a SafeSubscriber (if not already a SafeSubscriber) that deals with exceptions thrown by a misbehaving Subscriber (that doesn't follow the Reactive-Streams specification).
Backpressure: This operator leaves the reactive world and the backpressure behavior depends on the Subscriber's behavior.
Scheduler: safeSubscribe does not operate by default on a particular Scheduler.
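The wrapping behavior described above can be illustrated without RxJava. The following is only a rough sketch built on the JDK's java.util.concurrent.Flow interfaces, not RxJava's actual SafeSubscriber: the names SafeWrapper, emit, and run are hypothetical, the source treats any request as unbounded, and the wrapper catches every Throwable (RxJava rethrows fatal errors first). It shows the core idea: when the wrapped subscriber's onNext throws, the wrapper cancels the upstream and delivers the exception to onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class SafeSubscribeSketch {

    /** Hypothetical wrapper: forwards signals and reroutes exceptions thrown by onNext to onError. */
    static final class SafeWrapper<T> implements Flow.Subscriber<T>, Flow.Subscription {
        private final Flow.Subscriber<? super T> downstream;
        private Flow.Subscription upstream;
        private boolean done;

        SafeWrapper(Flow.Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(this);
        }

        @Override public void onNext(T item) {
            if (done) {
                return;
            }
            try {
                downstream.onNext(item);
            } catch (Throwable e) {
                // Simplification: a real implementation would rethrow fatal errors.
                upstream.cancel();
                onError(e);
            }
        }

        @Override public void onError(Throwable e) {
            if (done) {
                return;
            }
            done = true;
            downstream.onError(e);
        }

        @Override public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            downstream.onComplete();
        }

        @Override public void request(long n) { upstream.request(n); }

        @Override public void cancel() { upstream.cancel(); }
    }

    /** Toy synchronous source: the first request emits every item (demand is ignored) until cancelled. */
    static <T> void emit(List<T> items, Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;
            private boolean cancelled;

            @Override public void request(long n) {
                if (started) {
                    return;
                }
                started = true;
                for (T item : items) {
                    if (cancelled) {
                        return;
                    }
                    subscriber.onNext(item);
                }
                if (!cancelled) {
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { cancelled = true; }
        });
    }

    /** Subscribes a subscriber whose onNext throws on the second item and records what it receives. */
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<Integer> misbehaving = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer v) {
                if (v > 1) {
                    throw new IllegalStateException("boom");
                }
                events.add("next " + v);
            }
            @Override public void onError(Throwable e) { events.add("error " + e.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        emit(List.of(1, 2, 3), new SafeWrapper<Integer>(misbehaving));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next 1, error boom]
    }
}
```

Without the wrapper, the exception thrown from onNext would propagate back into the source instead of reaching the subscriber's own onError, which is the situation the RxJava tests below exercise with safeSubscribe.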
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void safeSubscribeNull() {
    just1.safeSubscribe(null);
}
Code example source: ReactiveX/RxJava
f.safeSubscribe(subscriber);
Code example source: ReactiveX/RxJava
@Test
public void testPeriodicObserverThrows() {
    Flowable<Long> source = Flowable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
    InOrder inOrder = inOrder(subscriber);
    source.safeSubscribe(new DefaultSubscriber<Long>() {
        @Override
        public void onNext(Long t) {
            // Throw on the second tick; the test expects this to arrive via onError.
            if (t > 0) {
                throw new TestException();
            }
            subscriber.onNext(t);
        }
        @Override
        public void onError(Throwable e) {
            subscriber.onError(e);
        }
        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    });
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    inOrder.verify(subscriber).onNext(0L);
    inOrder.verify(subscriber).onError(any(TestException.class));
    inOrder.verifyNoMoreInteractions();
    verify(subscriber, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testOnceObserverThrows() {
    Flowable<Long> source = Flowable.timer(100, TimeUnit.MILLISECONDS, scheduler);
    source.safeSubscribe(new DefaultSubscriber<Long>() {
        @Override
        public void onNext(Long t) {
            throw new TestException();
        }
        @Override
        public void onError(Throwable e) {
            subscriber.onError(e);
        }
        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    });
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    verify(subscriber).onError(any(TestException.class));
    verify(subscriber, never()).onNext(anyLong());
    verify(subscriber, never()).onComplete();
}
Code example source: ReactiveX/RxJava
.safeSubscribe(subscriber);
Code example source: ReactiveX/RxJava
@Test
public void testTakeWithErrorInObserver() {
    final AtomicInteger count = new AtomicInteger();
    final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    Flowable.just("1", "2", "three", "4").take(3)
            .safeSubscribe(new DefaultSubscriber<String>() {
                @Override
                public void onComplete() {
                    System.out.println("completed");
                }
                @Override
                public void onError(Throwable e) {
                    error.set(e);
                    System.out.println("error");
                    e.printStackTrace();
                }
                @Override
                public void onNext(String v) {
                    // "three" is not a number, so parseInt throws on the third item.
                    int num = Integer.parseInt(v);
                    System.out.println(num);
                    // doSomething(num);
                    count.incrementAndGet();
                }
            });
    assertEquals(2, count.get());
    assertNotNull(error.get());
    if (!(error.get() instanceof NumberFormatException)) {
        fail("It should be a NumberFormatException");
    }
}
Code example source: ReactiveX/RxJava
@Test
public void safeSubscriberAlreadySafe() {
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    Flowable.just(1).safeSubscribe(new SafeSubscriber<Integer>(ts));
    ts.assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void testUsingDisposesEagerlyBeforeError() {
    final List<String> events = new ArrayList<String>();
    Callable<Resource> resourceFactory = createResourceFactory(events);
    final Consumer<Throwable> onError = createOnErrorAction(events);
    final Action unsub = createUnsubAction(events);
    Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
        @Override
        public Flowable<String> apply(Resource resource) {
            return Flowable.fromArray(resource.getTextFromWeb().split(" "))
                    .concatWith(Flowable.<String>error(new RuntimeException()));
        }
    };
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    // eager = true: the resource is disposed before the terminal event is delivered.
    Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
            new DisposeAction(), true)
            .doOnCancel(unsub)
            .doOnError(onError);
    flowable.safeSubscribe(subscriber);
    assertEquals(Arrays.asList("disposed", "error"), events);
}
Code example source: ReactiveX/RxJava
@Test
public void takeFinalValueThrows() {
    Flowable<Integer> source = Flowable.just(1).take(1);
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
        @Override
        public void onNext(Integer t) {
            throw new TestException();
        }
    };
    source.safeSubscribe(ts);
    ts.assertNoValues();
    ts.assertError(TestException.class);
    ts.assertNotComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testUsingDisposesEagerlyBeforeCompletion() {
    final List<String> events = new ArrayList<String>();
    Callable<Resource> resourceFactory = createResourceFactory(events);
    final Action completion = createOnCompletedAction(events);
    final Action unsub = createUnsubAction(events);
    Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
        @Override
        public Flowable<String> apply(Resource resource) {
            return Flowable.fromArray(resource.getTextFromWeb().split(" "));
        }
    };
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    // eager = true: the resource is disposed before the terminal event is delivered.
    Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
            new DisposeAction(), true)
            .doOnCancel(unsub)
            .doOnComplete(completion);
    flowable.safeSubscribe(subscriber);
    assertEquals(Arrays.asList("disposed", "completed"), events);
}
Code example source: ReactiveX/RxJava
@Test
public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
    final List<String> events = new ArrayList<String>();
    Callable<Resource> resourceFactory = createResourceFactory(events);
    final Action completion = createOnCompletedAction(events);
    final Action unsub = createUnsubAction(events);
    Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
        @Override
        public Flowable<String> apply(Resource resource) {
            return Flowable.fromArray(resource.getTextFromWeb().split(" "));
        }
    };
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    // eager = false: the resource is disposed after the terminal event is delivered.
    Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
            new DisposeAction(), false)
            .doOnCancel(unsub)
            .doOnComplete(completion);
    flowable.safeSubscribe(subscriber);
    assertEquals(Arrays.asList("completed", "disposed"), events);
}
Code example source: ReactiveX/RxJava
@Test
public void testUsingDoesNotDisposesEagerlyBeforeError() {
    final List<String> events = new ArrayList<String>();
    final Callable<Resource> resourceFactory = createResourceFactory(events);
    final Consumer<Throwable> onError = createOnErrorAction(events);
    final Action unsub = createUnsubAction(events);
    Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
        @Override
        public Flowable<String> apply(Resource resource) {
            return Flowable.fromArray(resource.getTextFromWeb().split(" "))
                    .concatWith(Flowable.<String>error(new RuntimeException()));
        }
    };
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    // eager = false: the resource is disposed after the terminal event is delivered.
    Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
            new DisposeAction(), false)
            .doOnCancel(unsub)
            .doOnError(onError);
    flowable.safeSubscribe(subscriber);
    assertEquals(Arrays.asList("error", "disposed"), events);
}
Code example source: io.smallrye.reactive/smallrye-reactive-streams-operators
@Override
public CompletionStage<Void> apply(Flowable<I> source) {
    WrappedSubscriber<I> wrapped = new WrappedSubscriber<>(subscriber);
    source.safeSubscribe(wrapped);
    return wrapped.future();
}
Code example source: io.smallrye.reactive/smallrye-reactive-streams-operators
@Override
public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.ProcessorStage stage) {
    Processor<I, O> processor = Casts.cast(Objects.requireNonNull(
            Objects.requireNonNull(stage).getRsProcessor())
    );
    return source -> Flowable.defer(() -> {
        Flowable<O> flowable = Flowable.fromPublisher(processor);
        source.safeSubscribe(processor);
        return flowable;
    });
}
Code example source: iagocanalejas/retrocache
@Test
public void bodyThrowingInOnNextDeliveredToError() {
    server.enqueue(new MockResponse());
    RecordingSubscriber<String> subscriber = subscriberRule.create();
    final RuntimeException e = new RuntimeException();
    service.body().safeSubscribe(new ForwardingSubscriber<String>(subscriber) {
        @Override
        public void onNext(String value) {
            throw e;
        }
    });
    subscriber.assertError(e);
}
Code example source: iagocanalejas/retrocache
@Test
public void responseThrowingInOnNextDeliveredToError() {
    server.enqueue(new MockResponse());
    RecordingSubscriber<Response<String>> subscriber = subscriberRule.create();
    final RuntimeException e = new RuntimeException();
    service.response().safeSubscribe(new ForwardingSubscriber<Response<String>>(subscriber) {
        @Override
        public void onNext(Response<String> value) {
            throw e;
        }
    });
    subscriber.assertError(e);
}
Code example source: iagocanalejas/retrocache
@Test
public void resultThrowingInOnNextDeliveredToError() {
    server.enqueue(new MockResponse());
    RecordingSubscriber<Result<String>> subscriber = subscriberRule.create();
    final RuntimeException e = new RuntimeException();
    service.result().safeSubscribe(new ForwardingSubscriber<Result<String>>(subscriber) {
        @Override
        public void onNext(Result<String> value) {
            throw e;
        }
    });
    subscriber.assertError(e);
}
Code example source: iagocanalejas/retrocache
@Test
public void resultThrowingInOnErrorDeliveredToPlugin() {
    server.enqueue(new MockResponse());
    final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
    RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            if (!throwableRef.compareAndSet(null, throwable)) {
                throw Exceptions.propagate(throwable);
            }
        }
    });
    RecordingSubscriber<Result<String>> subscriber = subscriberRule.create();
    final RuntimeException first = new RuntimeException();
    final RuntimeException second = new RuntimeException();
    service.result().safeSubscribe(new ForwardingSubscriber<Result<String>>(subscriber) {
        @Override
        public void onNext(Result<String> value) {
            // The only way to trigger onError for a result is if onNext throws.
            throw first;
        }
        @Override
        public void onError(Throwable throwable) {
            throw second;
        }
    });
    //noinspection ThrowableResultOfMethodCallIgnored
    CompositeException composite = (CompositeException) throwableRef.get();
    assertThat(composite.getExceptions()).containsExactly(first, second);
}