本文整理了Java中io.reactivex.Flowable.lift()
方法的一些代码示例,展示了Flowable.lift()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.lift()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:lift
[英]This method requires advanced knowledge about building operators, please consider other standard composition methods first; Returns a Flowable which, when subscribed to, invokes the FlowableOperator#apply(Subscriber) method of the provided FlowableOperator for each individual downstream Subscriber and allows the insertion of a custom operator by accessing the downstream's Subscriber during this subscription phase and providing a new Subscriber, containing the custom operator's intended business logic, that will be used in the subscription process going further upstream.
Generally, such a new Subscriber will wrap the downstream's Subscriber and forwards the onNext, onError and onComplete events from the upstream directly or according to the emission pattern the custom operator's business logic requires. In addition, such operator can intercept the flow control calls of cancel and request that would have traveled upstream and perform additional actions depending on the same business logic requirements.
Example:
// Step 1: Create the consumer type that will be returned by the FlowableOperator.apply():
public final class CustomSubscriber<T> implements FlowableSubscriber<T>, Subscription {
// The downstream's Subscriber that will receive the onXXX events
final Subscriber<? super String> downstream;
// The connection to the upstream source that will call this class' onXXX methods
Subscription upstream;
// The constructor takes the downstream subscriber and usually any other parameters
public CustomSubscriber(Subscriber<? super String> downstream) {
this.downstream = downstream;
}
// In the subscription phase, the upstream sends a Subscription to this class
// and subsequently this class has to send a Subscription to the downstream.
// Note that relaying the upstream's Subscription directly is not allowed in RxJava
@Override
public void onSubscribe(Subscription s) {
if (upstream != null) {
s.cancel();
} else {
upstream = s;
downstream.onSubscribe(this);
}
}
// The upstream calls this with the next item and the implementation's
// responsibility is to emit an item to the downstream based on the intended
// business logic, or if it can't do so for the particular item,
// request more from the upstream
@Override
public void onNext(T item) {
String str = item.toString();
if (str.length() < 2) {
downstream.onNext(str);
} else {
upstream.request(1);
}
}
// Some operators may handle the upstream's error while others
// could just forward it to the downstream.
@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}
// When the upstream completes, usually the downstream should complete as well.
@Override
public void onComplete() {
downstream.onComplete();
}
// Some operators have to intercept the downstream's request calls to trigger
// the emission of queued items while others can simply forward the request
// amount as is.
@Override
public void request(long n) {
upstream.request(n);
}
// Some operators may use their own resources which should be cleaned up if
// the downstream cancels the flow before it completed. Operators without
// resources can simply forward the cancellation to the upstream.
// In some cases, a canceled flag may be set by this method so that other parts
// of this class may detect the cancellation and stop sending events
// to the downstream.
@Override
public void cancel() {
upstream.cancel();
}
}
// Step 2: Create a class that implements the FlowableOperator interface and
// returns the custom consumer type from above in its apply() method.
// Such class may define additional parameters to be submitted to
// the custom consumer type.
final class CustomOperator<T> implements FlowableOperator<String> {
@Override
public Subscriber<? super String> apply(Subscriber<? super T> upstream) {
return new CustomSubscriber<T>(upstream);
}
}
// Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
// or reusing an existing one.
Flowable.range(5, 10)
.lift(new CustomOperator<Integer>())
.test()
.assertResult("5", "6", "7", "8", "9");
Creating custom operators can be complicated and it is recommended one consults the RxJava wiki: Writing operators page about the tools, requirements, rules, considerations and pitfalls of implementing them.
Note that implementing custom operators via this lift() method adds slightly more overhead by requiring an additional allocation and indirection per assembled flows. Instead, extending the abstract Flowableclass and creating a FlowableTransformer with it is recommended.
Note also that it is not possible to stop the subscription phase in lift() as the apply() method requires a non-null Subscriber instance to be returned, which is then unconditionally subscribed to the upstream Flowable. For example, if the operator decided there is no reason to subscribe to the upstream source because of some optimization possibility or a failure to prepare the operator, it still has to return a Subscriber that should immediately cancel the upstream's Subscription in its onSubscribe method. Again, using a FlowableTransformer and extending the Flowable is a better option as #subscribeActual can decide to not subscribe to its upstream after all. Backpressure: The Subscriber instance returned by the FlowableOperator is responsible to be backpressure-aware or document the fact that the consumer of the returned Publisher has to apply one of the onBackpressureXXX operators. Scheduler: lift does not operate by default on a particular Scheduler, however, the FlowableOperator may use a Scheduler to support its own asynchronous behavior.
[中]这种方法需要先进的建筑操作员的知识,请先考虑其他标准的作曲方法;返回一个FlowTable,在订阅时,为每个下游订户调用提供的FlowTableOperator的FlowTableOperator#apply(Subscriber)方法,并允许通过在此订阅阶段访问下游订户并提供新订户来插入自定义运算符,包含自定义操作员的预期业务逻辑,将在进一步向上游的订阅过程中使用。
通常,这样的新订户将包装下游的订户,并直接或根据定制运营商的业务逻辑要求的发射模式,从上游转发onNext、onError和onComplete事件。此外,这样的操作员可以拦截本应向上游移动的cancel和request的流控制调用,并根据相同的业务逻辑要求执行其他操作。
例子:
// Step 1: Create the consumer type that will be returned by the FlowableOperator.apply():
public final class CustomSubscriber<T> implements FlowableSubscriber<T>, Subscription {
// The downstream's Subscriber that will receive the onXXX events
final Subscriber<? super String> downstream;
// The connection to the upstream source that will call this class' onXXX methods
Subscription upstream;
// The constructor takes the downstream subscriber and usually any other parameters
public CustomSubscriber(Subscriber<? super String> downstream) {
this.downstream = downstream;
}
// In the subscription phase, the upstream sends a Subscription to this class
// and subsequently this class has to send a Subscription to the downstream.
// Note that relaying the upstream's Subscription directly is not allowed in RxJava
@Override
public void onSubscribe(Subscription s) {
if (upstream != null) {
s.cancel();
} else {
upstream = s;
downstream.onSubscribe(this);
}
}
// The upstream calls this with the next item and the implementation's
// responsibility is to emit an item to the downstream based on the intended
// business logic, or if it can't do so for the particular item,
// request more from the upstream
@Override
public void onNext(T item) {
String str = item.toString();
if (str.length() < 2) {
downstream.onNext(str);
} else {
upstream.request(1);
}
}
// Some operators may handle the upstream's error while others
// could just forward it to the downstream.
@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}
// When the upstream completes, usually the downstream should complete as well.
@Override
public void onComplete() {
downstream.onComplete();
}
// Some operators have to intercept the downstream's request calls to trigger
// the emission of queued items while others can simply forward the request
// amount as is.
@Override
public void request(long n) {
upstream.request(n);
}
// Some operators may use their own resources which should be cleaned up if
// the downstream cancels the flow before it completed. Operators without
// resources can simply forward the cancellation to the upstream.
// In some cases, a canceled flag may be set by this method so that other parts
// of this class may detect the cancellation and stop sending events
// to the downstream.
@Override
public void cancel() {
upstream.cancel();
}
}
// Step 2: Create a class that implements the FlowableOperator interface and
// returns the custom consumer type from above in its apply() method.
// Such class may define additional parameters to be submitted to
// the custom consumer type.
final class CustomOperator<T> implements FlowableOperator<String> {
@Override
public Subscriber<? super String> apply(Subscriber<? super T> upstream) {
return new CustomSubscriber<T>(upstream);
}
}
// Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
// or reusing an existing one.
Flowable.range(5, 10)
.lift(new CustomOperator<Integer>())
.test()
.assertResult("5", "6", "7", "8", "9");
创建自定义运算符可能会很复杂,建议您参考{$0$}页面,了解实现自定义运算符的工具、要求、规则、注意事项和陷阱。
请注意,通过此lift()方法实现自定义运算符会稍微增加开销,因为每个组装的流需要额外的分配和间接寻址。相反,建议扩展抽象Flowableclass并使用它创建FlowableTransformer。
还请注意,在lift()中不可能停止订阅阶段,因为apply()方法要求返回一个非空的订阅服务器实例,然后该实例将无条件地订阅给上游的可流动订阅服务器。例如,如果运营商由于某种优化可能性或运营商准备失败而决定没有理由订阅上游源,那么它仍然必须返回一个订户,该订户应在其onSubscribe方法中立即取消上游的订阅。同样,使用FlowableTransformer并扩展FlowableTransformer是一个更好的选择,因为#subscribeActual最终可以决定不订阅其上游。Backpressure:FlowableOperator返回的订阅服务器实例负责识别或记录返回的发布服务器的使用者必须应用onBackpressureXXX运算符之一的事实。调度器:默认情况下,lift不会在特定的调度器上运行,但是,FlowableOperator可以使用调度器来支持其自身的异步行为。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Object> apply(Flowable<Object> f) throws Exception {
return f.lift(new FlowableOperator<Object, Object>() {
@Override
public Subscriber<? super Object> apply(
Subscriber<? super Object> s) throws Exception {
return new SubscriberResourceWrapper<Object>(s);
}
});
}
});
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void liftNull() {
just1.lift(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void liftReturnsNull() {
just1.lift(new FlowableOperator<Object, Integer>() {
@Override
public Subscriber<? super Integer> apply(Subscriber<? super Object> s) {
return null;
}
}).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void badRequest() {
TestHelper.assertBadRequestReported(Flowable.never().lift(new FlowableOperator<Object, Object>() {
@Override
public Subscriber<? super Object> apply(
Subscriber<? super Object> s) throws Exception {
return new SubscriberResourceWrapper<Object>(s);
}
}));
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testOnStartCalledOnceViaLift() {
final AtomicInteger c = new AtomicInteger();
Flowable.just(1, 2, 3, 4).lift(new FlowableOperator<Integer, Integer>() {
@Override
public Subscriber<? super Integer> apply(final Subscriber<? super Integer> child) {
return new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
c.incrementAndGet();
request(1);
}
@Override
public void onComplete() {
child.onComplete();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(Integer t) {
child.onNext(t);
request(1);
}
};
}
}).subscribe();
assertEquals(1, c.get());
}
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldEmitAllEvents() {
Flowable.fromArray("Event 1", "Event 2")
.lift(BulkheadOperator.of(bulkhead))
.test()
.assertResult("Event 1", "Event 2");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldPropagateError() {
Flowable.error(new IOException("BAM!"))
.lift(BulkheadOperator.of(bulkhead))
.test()
.assertSubscribed()
.assertError(IOException.class)
.assertNotComplete();
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Test that we receive the onError if an exception is thrown from an operator that
* does not have manual try/catch handling like map does.
*/
@Test
@Ignore("Failed operator may leave the child subscriber in an inconsistent state which prevents further error delivery.")
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperator() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Flowable.just(1).lift(new FlowableOperator<String, Integer>() {
@Override
public Subscriber<? super Integer> apply(Subscriber<? super String> t1) {
throw new RuntimeException("failed");
}
}).onErrorResumeNext(new Function<Throwable, Flowable<String>>() {
@Override
public Flowable<String> apply(Throwable t1) {
if (t1.getMessage().equals("failed")) {
return Flowable.just("success");
} else {
return Flowable.error(t1);
}
}
}).subscribe(ts);
ts.assertTerminated();
System.out.println(ts.values());
ts.assertValue("success");
}
代码示例来源:origin: ReactiveX/RxJava
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperatorOnNext() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Flowable.just(1).lift(new FlowableOperator<String, Integer>() {
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithBulkheadFullException() {
bulkhead.isCallPermitted();
Flowable.fromArray("Event 1", "Event 2")
.lift(BulkheadOperator.of(bulkhead))
.test()
.assertSubscribed()
.assertError(BulkheadFullException.class)
.assertNotComplete();
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldPropagateError() {
Flowable.error(new IOException("BAM!"))
.lift(CircuitBreakerOperator.of(circuitBreaker))
.test()
.assertSubscribed()
.assertError(IOException.class)
.assertNotComplete();
assertSingleFailedCall();
}
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldPropagateError() {
Flowable.error(new IOException("BAM!"))
.lift(RateLimiterOperator.of(rateLimiter))
.test()
.assertSubscribed()
.assertError(IOException.class)
.assertNotComplete();
assertSinglePermitUsed();
}
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldEmitSingleEventWithSinglePermit() {
Flowable.just(1)
.lift(RateLimiterOperator.of(rateLimiter))
.test()
.assertResult(1);
assertSinglePermitUsed();
}
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldEmitAllEvents() {
Flowable.fromArray("Event 1", "Event 2")
.lift(CircuitBreakerOperator.of(circuitBreaker))
.test()
.assertResult("Event 1", "Event 2");
assertSingleSuccessfulCall();
}
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldEmitAllEvents() {
Flowable.fromArray(1, 2)
.lift(RateLimiterOperator.of(rateLimiter))
.test()
.assertResult(1, 2);
assertUsedPermits(2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testSetProducerSynchronousRequest() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.just(1, 2, 3).lift(new FlowableOperator<Integer, Integer>() {
代码示例来源:origin: ReactiveX/RxJava
@Test
public void callbackCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.just(1)
.lift(new FlowableOperator<Object, Integer>() {
@Override
public Subscriber<? super Integer> apply(Subscriber<? super Object> subscriber) throws Exception {
throw new TestException();
}
})
.test();
fail("Should have thrown");
} catch (NullPointerException ex) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
}
代码示例来源:origin: ReactiveX/RxJava
.lift(new FlowableOperator<Long, Long>() {
@Override
public Subscriber<? super Long> apply(final Subscriber<? super Long> child) {
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithRequestNotPermittedException() {
saturateRateLimiter();
Flowable.just(1)
.lift(RateLimiterOperator.of(rateLimiter))
.test()
.assertSubscribed()
.assertError(RequestNotPermitted.class)
.assertNotComplete();
assertNoPermitLeft();
}
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithCircuitBreakerOpenException() {
circuitBreaker.transitionToOpenState();
Flowable.fromArray("Event 1", "Event 2")
.lift(CircuitBreakerOperator.of(circuitBreaker))
.test()
.assertSubscribed()
.assertError(CircuitBreakerOpenException.class)
.assertNotComplete();
assertNoRegisteredCall();
}
内容来源于网络,如有侵权,请联系作者删除!