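This article collects code examples for the Java method io.reactivex.Flowable.doAfterTerminate() and shows how Flowable.doAfterTerminate() is used in practice. The examples were extracted from selected projects published on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of Flowable.doAfterTerminate() are as follows: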
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doAfterTerminate
Description: Registers an Action to be called when this Publisher invokes either Subscriber#onComplete or Subscriber#onError.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doAfterTerminate does not operate by default on a particular Scheduler.
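To make the description above concrete, here is a minimal sketch of the "after terminate" idea using only the JDK's java.util.concurrent.Flow interfaces. It is not RxJava's implementation: the class AfterTerminateSketch and the helpers afterTerminate and runDemo are hypothetical names chosen for illustration, and the sketch assumes a synchronous, well-behaved source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration; not part of RxJava.
final class AfterTerminateSketch {

    /** Wraps a subscriber so that the action runs after the terminal signal has been forwarded. */
    static <T> Flow.Subscriber<T> afterTerminate(Flow.Subscriber<? super T> downstream, Runnable action) {
        return new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                // The subscription is passed through unchanged, so requests are not altered.
                downstream.onSubscribe(s);
            }

            @Override
            public void onNext(T item) {
                downstream.onNext(item);
            }

            @Override
            public void onError(Throwable t) {
                downstream.onError(t);
                action.run(); // runs after the error was delivered
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
                action.run(); // runs after completion was delivered
            }
        };
    }

    /** Drives the wrapped subscriber by hand and returns the order of observed events. */
    static List<String> runDemo(boolean fail) {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<Integer> downstream = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { log.add("onNext:" + item); }
            @Override public void onError(Throwable t) { log.add("onError:" + t.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        Flow.Subscriber<Integer> wrapped = afterTerminate(downstream, () -> log.add("afterTerminate"));
        wrapped.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        wrapped.onNext(1);
        if (fail) {
            wrapped.onError(new IllegalStateException("boom"));
        } else {
            wrapped.onComplete();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(false));
        System.out.println(runDemo(true));
    }
}
```

In both runs the "afterTerminate" entry is recorded last, after the downstream subscriber has seen its terminal event.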
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doAfterTerminateNull() {
    just1.doAfterTerminate(null);
}
Code example source: ReactiveX/RxJava
private void checkActionCalled(Flowable<String> input) {
    input.doAfterTerminate(aAction0).subscribe(subscriber);
    try {
        verify(aAction0, times(1)).run();
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}
Code example source: ReactiveX/RxJava
@Test
public void nullActionShouldBeCheckedInConstructor() {
    try {
        Flowable.empty().doAfterTerminate(null);
        fail("Should have thrown NullPointerException");
    } catch (NullPointerException expected) {
        assertEquals("onAfterTerminate is null", expected.getMessage());
    }
}
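The test above expects the null check to fail as soon as the operator is applied, before anything subscribes. A minimal sketch of that kind of eager validation follows, assuming a hypothetical Pipeline class that stands in for an operator chain. It is not an RxJava type, and only the exception message is taken from the test.

```java
import java.util.Objects;

// Hypothetical illustration; Pipeline is not an RxJava class.
final class EagerNullCheckSketch {

    static final class Pipeline {
        private final Runnable afterTerminate;

        private Pipeline(Runnable afterTerminate) {
            this.afterTerminate = afterTerminate;
        }

        static Pipeline create() {
            return new Pipeline(() -> { });
        }

        /** Validates the argument when the chain is assembled, not when it is run. */
        Pipeline doAfterTerminate(Runnable onAfterTerminate) {
            Objects.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
            return new Pipeline(onAfterTerminate);
        }
    }

    /** Returns the NullPointerException message, or "no exception" if none was thrown. */
    static String messageFor(Runnable action) {
        try {
            Pipeline.create().doAfterTerminate(action);
            return "no exception";
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageFor(null));
        System.out.println(messageFor(() -> { }));
    }
}
```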
Code example source: ReactiveX/RxJava
@Test
public void nullFinallyActionShouldBeCheckedASAP() {
    try {
        Flowable
        .just("value")
        .doAfterTerminate(null);
        fail();
    } catch (NullPointerException expected) {
    }
}
Code example source: micronaut-projects/micronaut-core
Flowable buildFlowable(ReplaySubject subject, Integer dataKey, boolean controlsFlow) {
    Flowable flowable = FlowableReplay.createFrom(subject.toFlowable(BackpressureStrategy.BUFFER)).refCount();
    if (controlsFlow) {
        flowable = flowable.doOnRequest(onRequest);
    }
    return flowable
        .doAfterTerminate(() -> {
            // Release the data reference once the stream has terminated
            if (controlsFlow) {
                HttpDataReference dataReference = dataReferences.get(dataKey);
                dataReference.destroy();
            }
        });
}
Code example source: ReactiveX/RxJava
}).doAfterTerminate(new Action() {
Code example source: ReactiveX/RxJava
@Test
public void ifFinallyActionThrowsExceptionShouldNotBeSwallowedAndActionShouldBeCalledOnce() throws Exception {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Action finallyAction = Mockito.mock(Action.class);
        doThrow(new IllegalStateException()).when(finallyAction).run();
        TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
        Flowable
        .just("value")
        .doAfterTerminate(finallyAction)
        .subscribe(testSubscriber);
        testSubscriber.assertValue("value");
        verify(finallyAction).run();
        TestHelper.assertError(errors, 0, IllegalStateException.class);
        // Actual result:
        // Not only IllegalStateException was swallowed
        // But finallyAction was called twice!
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void onCompleteAfter() {
    final int[] call = { 0 };
    Flowable.just(1)
    .doAfterTerminate(new Action() {
        @Override
        public void run() throws Exception {
            call[0]++;
        }
    })
    .test()
    .assertResult(1);
    assertEquals(1, call[0]);
}
Code example source: ReactiveX/RxJava
@Test
public void onErrorAfterCrash() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Flowable.fromPublisher(new Publisher<Object>() {
            @Override
            public void subscribe(Subscriber<? super Object> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onError(new TestException());
            }
        })
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                throw new IOException();
            }
        })
        .test()
        .assertFailure(TestException.class);
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void onCompleteAfterCrash() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Flowable.fromPublisher(new Publisher<Object>() {
            @Override
            public void subscribe(Subscriber<? super Object> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onComplete();
            }
        })
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                throw new IOException();
            }
        })
        .test()
        .assertResult();
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
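The crash tests above check that an exception thrown by the action is reported separately, while the downstream still sees its normal terminal event. One way to picture this, as a rough sketch rather than RxJava's code: once the terminal signal has been delivered, a later failure can no longer be sent downstream, so it goes to a separate error hook. The names UndeliverableSketch, ThrowingAction, completeThenRun, and errorHook are hypothetical.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration; not part of RxJava.
final class UndeliverableSketch {

    /** Hypothetical stand-in for an action that may throw a checked exception. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    static void completeThenRun(List<String> downstreamLog, ThrowingAction action, Consumer<Throwable> errorHook) {
        downstreamLog.add("onComplete"); // the terminal signal is delivered first
        try {
            action.run();
        } catch (Throwable ex) {
            // The stream has already terminated, so the failure goes to the hook instead.
            errorHook.accept(ex);
        }
    }

    static List<String> runDemo() {
        List<String> downstreamLog = new ArrayList<>();
        List<Throwable> undeliverable = new ArrayList<>();
        completeThenRun(
            downstreamLog,
            () -> { throw new IOException("cleanup failed"); },
            undeliverable::add);
        List<String> result = new ArrayList<>(downstreamLog);
        result.add("undeliverable:" + undeliverable.get(0).getClass().getSimpleName());
        return result;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```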
Code example source: ReactiveX/RxJava
@Test
public void onErrorAfterCrashConditional() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Flowable.fromPublisher(new Publisher<Object>() {
            @Override
            public void subscribe(Subscriber<? super Object> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onError(new TestException());
            }
        })
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                throw new IOException();
            }
        })
        .filter(Functions.alwaysTrue())
        .test()
        .assertFailure(TestException.class);
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void onCompleteAfterCrashConditional() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Flowable.fromPublisher(new Publisher<Object>() {
            @Override
            public void subscribe(Subscriber<? super Object> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onComplete();
            }
        })
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                throw new IOException();
            }
        })
        .filter(Functions.alwaysTrue())
        .test()
        .assertResult();
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: akarnokd/RxJava2Extensions
@Override
public void run() throws Exception {
    Flowable.just(1)
    .subscribeOn(Schedulers.io())
    .observeOn(scheduler)
    .doAfterTerminate(new Action() {
        @Override
        public void run() throws Exception {
            scheduler.shutdown();
        }
    })
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            t1[0] = Thread.currentThread();
        }
    });
}
});
Code example source: akarnokd/RxJava2Extensions
@Override
public void run() throws Exception {
    Flowable.range(1, 5)
    .subscribeOn(scheduler)
    .doAfterTerminate(new Action() {
        @Override
        public void run() throws Exception {
            scheduler.shutdown();
        }
    })
    .subscribe(ts);
    ts.assertEmpty();
}
});
Code example source: akarnokd/RxJava2Extensions
@Override
public void run() throws Exception {
    Flowable.range(1, 5)
    .subscribeOn(scheduler)
    .delay(100, TimeUnit.MILLISECONDS, scheduler)
    .doAfterTerminate(new Action() {
        @Override
        public void run() throws Exception {
            scheduler.shutdown();
        }
    })
    .subscribe(ts);
    ts.assertEmpty();
}
});
Code example source: reactiverse/reactive-pg-client
private Flowable<Row> createFlowable(String sql) {
    return pool.rxBegin()
        .flatMapPublisher(tx -> tx.rxPrepare(sql)
            .flatMapPublisher(preparedQuery -> {
                // Fetch 50 rows at a time
                PgStream<io.reactiverse.reactivex.pgclient.Row> stream = preparedQuery.createStream(50, Tuple.tuple());
                return stream.toFlowable();
            })
            .doAfterTerminate(tx::commit));
}