本文整理了Java中io.reactivex.Flowable.compose()
方法的一些代码示例,展示了Flowable.compose()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.compose()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:compose
[英]Transform a Publisher by applying a particular Transformer function to it.
This method operates on the Publisher itself whereas #lift operates on the Publisher's Subscribers or Subscribers.
If the operator you are creating is designed to act on the individual items emitted by a source Publisher, use #lift. If your operator is designed to transform the source Publisher as a whole (for instance, by applying a particular set of existing RxJava operators to it) use compose. Backpressure: The operator itself doesn't interfere with the backpressure behavior which only depends on what kind of Publisher the transformer returns. Scheduler: compose does not operate by default on a particular Scheduler.
[中]通过对发布服务器应用特定的转换器函数来转换发布服务器。
此方法对发布服务器本身进行操作,而#lift对发布服务器的订阅者或订阅者进行操作。
如果要创建的操作符旨在对源发布者发出的单个项目执行操作,请使用#lift。如果您的操作符旨在将源发布服务器作为一个整体进行转换(例如,通过对其应用一组特定的现有RxJava操作符),请使用compose。背压:操作员本身不会干扰背压行为,而背压行为只取决于转换器返回的发布器类型。调度程序:默认情况下,compose不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void composeNull() {
just1.compose(null);
}
代码示例来源:origin: amitshekhariitbhu/RxJava2-Android-Samples
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_compose_operator_example);
/*
Compose for reusable code.
*/
Observable.just(1, 2, 3, 4, 5)
.compose(schedulers.<Integer>applyObservableAsync())
.subscribe(/* */);
Flowable.just(1, 2, 3, 4, 5)
.compose(schedulers.<Integer>applyFlowableAsysnc())
.subscribe(/* */);
}
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose() {
Flowable<HorrorMovie> movie = Flowable.just(new HorrorMovie());
Flowable<Movie> movie2 = movie.compose(new FlowableTransformer<HorrorMovie, Movie>() {
@Override
public Publisher<Movie> apply(Flowable<HorrorMovie> t) {
return Flowable.just(new Movie());
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void flowableGenericsSignatureTest() {
A<String, Integer> a = new A<String, Integer>() { };
Flowable.just(a).compose(TransformerTest.<String>testFlowableTransformerCreator());
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose4() {
Flowable<HorrorMovie> movie = Flowable.just(new HorrorMovie());
Flowable<HorrorMovie> movie2 = movie.compose(new FlowableTransformer<HorrorMovie, HorrorMovie>() {
@Override
public Publisher<HorrorMovie> apply(Flowable<HorrorMovie> t1) {
return t1.map(new Function<HorrorMovie, HorrorMovie>() {
@Override
public HorrorMovie apply(HorrorMovie v) {
return v;
}
});
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose2() {
Flowable<Movie> movie = Flowable.<Movie> just(new HorrorMovie());
Flowable<HorrorMovie> movie2 = movie.compose(new FlowableTransformer<Movie, HorrorMovie>() {
@Override
public Publisher<HorrorMovie> apply(Flowable<Movie> t) {
return Flowable.just(new HorrorMovie());
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose3() {
Flowable<Movie> movie = Flowable.<Movie>just(new HorrorMovie());
Flowable<HorrorMovie> movie2 = movie.compose(new FlowableTransformer<Movie, HorrorMovie>() {
@Override
public Publisher<HorrorMovie> apply(Flowable<Movie> t) {
return Flowable.just(new HorrorMovie()).map(new Function<HorrorMovie, HorrorMovie>() {
@Override
public HorrorMovie apply(HorrorMovie v) {
return v;
}
});
}
}
);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testComposeWithDeltaLogic() {
List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
Flowable<List<Movie>> movies = Flowable.just(list1, list2);
movies.compose(deltaTransformer);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void flowableTransformerThrows() {
try {
Flowable.just(1).compose(new FlowableTransformer<Integer, Integer>() {
@Override
public Publisher<Integer> apply(Flowable<Integer> v) {
throw new TestException("Forced failure");
}
});
fail("Should have thrown!");
} catch (TestException ex) {
assertEquals("Forced failure", ex.getMessage());
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCompose() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Flowable.just(1, 2, 3).compose(new FlowableTransformer<Integer, String>() {
@Override
public Publisher<String> apply(Flowable<Integer> t1) {
return t1.map(new Function<Integer, String>() {
@Override
public String apply(Integer v) {
return String.valueOf(v);
}
});
}
})
.subscribe(ts);
ts.assertTerminated();
ts.assertNoErrors();
ts.assertValues("1", "2", "3");
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void pollThrowsDelayError() {
Flowable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Integer>flowableStripBoundary())
.concatMapDelayError(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v)
throws Exception {
return Flowable.just(v);
}
})
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void syncFusedMapCrash() {
Flowable.just(1)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(new StripBoundary<Object>(null))
.parallel()
.sequential()
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void pollThrows() {
Flowable.just(1)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.flowableStripBoundary())
.publish()
.autoConnect()
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void pollThrows() {
Flowable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Integer>flowableStripBoundary())
.concatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v)
throws Exception {
return Flowable.just(v);
}
})
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void pollThrowsNoSubscribers() {
ConnectableFlowable<Integer> cf = Flowable.just(1, 2)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) throws Exception {
if (v == 2) {
throw new TestException();
}
return v;
}
})
.compose(TestHelper.<Integer>flowableStripBoundary())
.publish();
TestSubscriber<Integer> ts = cf.take(1)
.test();
cf.connect();
ts.assertResult(1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void fusedInnerCrash() {
Flowable.just(1).hide()
.switchMap(Functions.justFunction(Flowable.just(1)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Object>flowableStripBoundary())
)
)
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void doOnNextDoOnErrorCombinedFused() {
ConnectableFlowable<Integer> cf = Flowable.just(1)
.compose(new FlowableTransformer<Integer, Integer>() {
@Override
public Publisher<Integer> apply(Flowable<Integer> v) {
代码示例来源:origin: ReactiveX/RxJava
@Test
public void asyncFusedMapCrash() {
UnicastProcessor<Integer> up = UnicastProcessor.create();
up.onNext(1);
up
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(new StripBoundary<Object>(null))
.parallel()
.sequential()
.test()
.assertFailure(TestException.class);
assertFalse(up.hasSubscribers());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void doOnNextDoOnErrorCombinedFusedConditional() {
ConnectableFlowable<Integer> cf = Flowable.just(1)
.compose(new FlowableTransformer<Integer, Integer>() {
@Override
public Publisher<Integer> apply(Flowable<Integer> v) {
代码示例来源:origin: resilience4j/resilience4j
@Test
public void shouldNotRetryFromPredicateUsingFlowable() {
//Given
RetryConfig config = RetryConfig.custom()
.retryOnException(t -> t instanceof IOException)
.maxAttempts(3).build();
Retry retry = Retry.of("testName", config);
given(helloWorldService.returnHelloWorld())
.willThrow(new WebServiceException("BAM!"));
//When
Flowable.fromCallable(helloWorldService::returnHelloWorld)
.compose(RetryTransformer.of(retry))
.test()
.assertError(WebServiceException.class)
.assertNotComplete()
.assertSubscribed();
//Then
BDDMockito.then(helloWorldService).should(Mockito.times(1)).returnHelloWorld();
Retry.Metrics metrics = retry.getMetrics();
assertThat(metrics.getNumberOfFailedCallsWithoutRetryAttempt()).isEqualTo(1);
assertThat(metrics.getNumberOfFailedCallsWithRetryAttempt()).isEqualTo(0);
}
内容来源于网络,如有侵权,请联系作者删除!