本文整理了Java中io.reactivex.Flowable.doOnLifecycle()
方法的一些代码示例,展示了Flowable.doOnLifecycle()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.doOnLifecycle()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:doOnLifecycle
[英]Calls the appropriate onXXX method (shared between all Subscribers) for the lifecycle events of the sequence (subscription, cancellation, requesting).
Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: doOnLifecycle does not operate by default on a particular Scheduler.
[中]为序列的生命周期事件(订阅、取消、请求)调用适当的onXXX方法(在所有订阅服务器之间共享)。
背压:操作员不会干扰由源发布者的背压行为确定的背压。调度器:默认情况下,doOnLifecycle不会在特定调度器上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Object> apply(Flowable<Object> f) throws Exception {
return f
.doOnLifecycle(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
calls[0]++;
}
}, Functions.EMPTY_LONG_CONSUMER, new Action() {
@Override
public void run() throws Exception {
calls[1]++;
}
});
}
});
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnLifecycleOnSubscribeNull() {
just1.doOnLifecycle(null, new LongConsumer() {
@Override
public void accept(long v) { }
}, new Action() {
@Override
public void run() { }
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnLifecycleOnCancelNull() {
just1.doOnLifecycle(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) { }
}, new LongConsumer() {
@Override
public void accept(long v) { }
}, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnLifecycleOnRequestNull() {
just1.doOnLifecycle(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) { }
}, null, new Action() {
@Override
public void run() { }
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnLifecycleOnDisposeNull() {
just1.doOnLifecycle(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) { }
},
new LongConsumer() {
@Override
public void accept(long v) throws Exception { }
},
null);
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Modifies the source {@code Publisher} so that it invokes the given action when it is subscribed from
* its subscribers. Each subscription will result in an invocation of the given action except when the
* source {@code Publisher} is reference counted, in which case the source {@code Publisher} will invoke
* the given action for the first subscription.
* <p>
* <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnSubscribe.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSubscribe
* the Consumer that gets called when a Subscriber subscribes to the current {@code Flowable}
* @return the source {@code Publisher} modified so as to call this Consumer when appropriate
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {
return doOnLifecycle(onSubscribe, Functions.EMPTY_LONG_CONSUMER, Functions.EMPTY_ACTION);
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Modifies the source {@code Publisher} so that it invokes the given action when it receives a
* request for more items.
* <p>
* <b>Note:</b> This operator is for tracing the internal behavior of back-pressure request
* patterns and generally intended for debugging use.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnRequest} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onRequest
* the action that gets called when a Subscriber requests items from this
* {@code Publisher}
* @return the source {@code Publisher} modified so as to call this Action when appropriate
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators
* documentation: Do</a>
* @since 2.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnRequest(LongConsumer onRequest) {
return doOnLifecycle(Functions.emptyConsumer(), onRequest, Functions.EMPTY_ACTION);
}
代码示例来源:origin: redisson/redisson
/**
* Modifies the source {@code Publisher} so that it invokes the given action when it is subscribed from
* its subscribers. Each subscription will result in an invocation of the given action except when the
* source {@code Publisher} is reference counted, in which case the source {@code Publisher} will invoke
* the given action for the first subscription.
* <p>
* <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnSubscribe.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSubscribe
* the Consumer that gets called when a Subscriber subscribes to the current {@code Flowable}
* @return the source {@code Publisher} modified so as to call this Consumer when appropriate
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {
return doOnLifecycle(onSubscribe, Functions.EMPTY_LONG_CONSUMER, Functions.EMPTY_ACTION);
}
代码示例来源:origin: ReactiveX/RxJava
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnCancel(Action onCancel) {
return doOnLifecycle(Functions.emptyConsumer(), Functions.EMPTY_LONG_CONSUMER, onCancel);
代码示例来源:origin: ReactiveX/RxJava
@Test
public void onSubscribeCrashed() {
Flowable.just(1)
.doOnLifecycle(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
throw new TestException();
}
}, Functions.EMPTY_LONG_CONSUMER, Functions.EMPTY_ACTION)
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: redisson/redisson
/**
* Modifies the source {@code Publisher} so that it invokes the given action when it receives a
* request for more items.
* <p>
* <b>Note:</b> This operator is for tracing the internal behavior of back-pressure request
* patterns and generally intended for debugging use.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnRequest} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onRequest
* the action that gets called when a Subscriber requests items from this
* {@code Publisher}
* @return the source {@code Publisher} modified so as to call this Action when appropriate
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators
* documentation: Do</a>
* @since 2.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnRequest(LongConsumer onRequest) {
return doOnLifecycle(Functions.emptyConsumer(), onRequest, Functions.EMPTY_ACTION);
}
代码示例来源:origin: redisson/redisson
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnCancel(Action onCancel) {
return doOnLifecycle(Functions.emptyConsumer(), Functions.EMPTY_LONG_CONSUMER, onCancel);
代码示例来源:origin: ReactiveX/RxJava
@Test
public void dispose() {
final int[] calls = { 0, 0 };
TestHelper.checkDisposed(Flowable.just(1)
.doOnLifecycle(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
calls[0]++;
}
}, Functions.EMPTY_LONG_CONSUMER, new Action() {
@Override
public void run() throws Exception {
calls[1]++;
}
})
);
assertEquals(1, calls[0]);
assertEquals(1, calls[1]);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void requestCrashed() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.just(1)
.doOnLifecycle(Functions.emptyConsumer(),
new LongConsumer() {
@Override
public void accept(long v) throws Exception {
throw new TestException();
}
},
Functions.EMPTY_ACTION)
.test()
.assertResult(1);
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void cancelCrashed() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.just(1)
.doOnLifecycle(Functions.emptyConsumer(),
Functions.EMPTY_LONG_CONSUMER,
new Action() {
@Override
public void run() throws Exception {
throw new TestException();
}
})
.take(1)
.test()
.assertResult(1);
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
内容来源于网络,如有侵权,请联系作者删除!