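This article collects code examples for the Java method io.reactivex.Observable.compose() and shows how Observable.compose() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.compose() are as follows: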
Fully qualified name: io.reactivex.Observable
Class name: Observable
Method name: compose
Transform an ObservableSource by applying a particular Transformer function to it.
This method operates on the ObservableSource itself, whereas lift operates on the ObservableSource's Observers.
If the operator you are creating is designed to act on the individual items emitted by a source ObservableSource, use lift. If your operator is designed to transform the source ObservableSource as a whole (for instance, by applying a particular set of existing RxJava operators to it), use compose.
Scheduler: compose does not operate by default on a particular Scheduler.
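The difference between a per-item operator and a whole-source transformation can be sketched without RxJava on the classpath. The block below is only an illustration under stated assumptions, not RxJava's implementation: ComposeSketch, MiniObservable, and incrementThenStringify are hypothetical names invented for this sketch, and java.util.function.Function stands in for ObservableTransformer. It shows a bundle of existing operators being packaged once and applied to a source as a whole through compose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

final class ComposeSketch {

    // Hypothetical stand-in for Observable; this is NOT the RxJava class.
    static final class MiniObservable<T> {
        // Pushes every item of this source into the consumer it is given.
        private final Consumer<Consumer<? super T>> source;

        private MiniObservable(Consumer<Consumer<? super T>> source) {
            this.source = source;
        }

        @SafeVarargs
        static <T> MiniObservable<T> just(T... items) {
            return new MiniObservable<>(downstream -> {
                for (T item : items) {
                    downstream.accept(item);
                }
            });
        }

        // Per-item operator: wraps the downstream consumer and maps each item.
        <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            return new MiniObservable<>(downstream ->
                    source.accept(item -> downstream.accept(mapper.apply(item))));
        }

        // Whole-source operator: hands this source to the transformer once,
        // when the chain is assembled, and returns whatever it builds.
        <R> MiniObservable<R> compose(
                Function<? super MiniObservable<T>, ? extends MiniObservable<R>> transformer) {
            Objects.requireNonNull(transformer, "transformer is null");
            return transformer.apply(this);
        }

        // Runs the source and collects everything it emits.
        List<T> toList() {
            List<T> out = new ArrayList<>();
            source.accept(out::add);
            return out;
        }
    }

    // A reusable bundle of existing operators (assumed example logic).
    static Function<MiniObservable<Integer>, MiniObservable<String>> incrementThenStringify() {
        return upstream -> upstream.map(v -> v + 1).map(String::valueOf);
    }

    static List<String> run() {
        return MiniObservable.just(1, 2, 3)
                .compose(incrementThenStringify())
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 3, 4]
    }
}
```

Because the transformer receives the source itself, the same bundle can be reused on any chain with matching types without breaking the fluent call style.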
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void composeNull() {
    // a null transformer is expected to be rejected with a NullPointerException
    just1.compose(null);
}
Code example source: ReactiveX/RxJava
@Test
public void observableGenericsSignatureTest() {
    A<String, Integer> a = new A<String, Integer>() { };
    Observable.just(a).compose(TransformerTest.<String>testObservableTransformerCreator());
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose() {
    Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
    Observable<Movie> movie2 = movie.compose(new ObservableTransformer<HorrorMovie, Movie>() {
        @Override
        public Observable<Movie> apply(Observable<HorrorMovie> t) {
            return Observable.just(new Movie());
        }
    });
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose4() {
    Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
    Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<HorrorMovie, HorrorMovie>() {
        @Override
        public Observable<HorrorMovie> apply(Observable<HorrorMovie> t1) {
            return t1.map(new Function<HorrorMovie, HorrorMovie>() {
                @Override
                public HorrorMovie apply(HorrorMovie v) {
                    return v;
                }
            });
        }
    });
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose2() {
    Observable<Movie> movie = Observable.<Movie>just(new HorrorMovie());
    Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<Movie, HorrorMovie>() {
        @Override
        public Observable<HorrorMovie> apply(Observable<Movie> t) {
            return Observable.just(new HorrorMovie());
        }
    });
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose3() {
    Observable<Movie> movie = Observable.<Movie>just(new HorrorMovie());
    Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<Movie, HorrorMovie>() {
        @Override
        public Observable<HorrorMovie> apply(Observable<Movie> t) {
            return Observable.just(new HorrorMovie()).map(new Function<HorrorMovie, HorrorMovie>() {
                @Override
                public HorrorMovie apply(HorrorMovie v) {
                    return v;
                }
            });
        }
    });
}
Code example source: ReactiveX/RxJava
@Test
public void observableTransformerThrows() {
    try {
        // no subscribe() is involved: the exception surfaces from the compose() call itself
        Observable.just(1).compose(new ObservableTransformer<Integer, Integer>() {
            @Override
            public Observable<Integer> apply(Observable<Integer> v) {
                throw new TestException("Forced failure");
            }
        });
        fail("Should have thrown!");
    } catch (TestException ex) {
        assertEquals("Forced failure", ex.getMessage());
    }
}
Code example source: ReactiveX/RxJava
@Test
public void testComposeWithDeltaLogic() {
    List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
    List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
    Observable<List<Movie>> movies = Observable.just(list1, list2);
    movies.compose(deltaTransformer);
}
Code example source: ReactiveX/RxJava
@Test
public void testCompose() {
    TestObserver<String> to = new TestObserver<String>();
    // the transformer maps every Integer of the source to its String form
    Observable.just(1, 2, 3).compose(new ObservableTransformer<Integer, String>() {
        @Override
        public Observable<String> apply(Observable<Integer> t1) {
            return t1.map(new Function<Integer, String>() {
                @Override
                public String apply(Integer v) {
                    return String.valueOf(v);
                }
            });
        }
    })
    .subscribe(to);
    to.assertTerminated();
    to.assertNoErrors();
    to.assertValues("1", "2", "3");
}
Code example source: Polidea/RxAndroidBle
private static Observable<PresenterEvent> setupReadingBehaviour(Observable<Boolean> readClicks,
                                                                BluetoothGattCharacteristic characteristic,
                                                                RxBleConnection connection) {
    return !hasProperty(characteristic, BluetoothGattCharacteristic.PROPERTY_READ)
            // if the characteristic is not readable return an empty (dummy) observable
            ? Observable.empty()
            : readClicks // else use the readClicks observable from the activity
                    // every click is requesting a read operation from the peripheral
                    .flatMapSingle(ignoredClick -> connection.readCharacteristic(characteristic))
                    .compose(transformToPresenterEvent(Type.READ)); // convenience method to wrap reads
}
Code example source: trello/RxLifecycle
@Test
public void testBindLifecycle() {
    BehaviorSubject<Object> lifecycle = BehaviorSubject.create();
    TestObserver<Object> testObserver = observable.compose(RxLifecycle.bind(lifecycle)).test();
    testObserver.assertNotComplete();
    lifecycle.onNext(new Object());
    testObserver.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void asyncFusedPollCrash() {
    PublishSubject<Integer> ps = PublishSubject.create();
    TestObserver<Integer> to = ps
            .switchMap(Functions.justFunction(
                    Observable.range(1, 5)
                            .observeOn(ImmediateThinScheduler.INSTANCE)
                            .map(new Function<Integer, Integer>() {
                                @Override
                                public Integer apply(Integer v) throws Exception {
                                    throw new TestException();
                                }
                            })
                            .compose(TestHelper.<Integer>observableStripBoundary())
            ))
            .test();
    to.assertEmpty();
    ps.onNext(1);
    to.assertFailure(TestException.class);
    assertFalse(ps.hasObservers());
}
Code example source: trello/RxLifecycle
@Test
public void testEndsImmediatelyOutsideActivityLifecycle() {
    BehaviorSubject<ActivityEvent> lifecycle = BehaviorSubject.create();
    lifecycle.onNext(ActivityEvent.DESTROY);
    TestObserver<Object> testObserver = observable.compose(RxLifecycleAndroid.bindActivity(lifecycle)).test();
    testObserver.assertComplete();
}
Code example source: trello/RxLifecycle
@Test
public void testBindLifecycleOtherObject() {
    // Ensures it works with other types as well, and not just "Object"
    BehaviorSubject<String> lifecycle = BehaviorSubject.create();
    TestObserver<Object> testObserver = observable.compose(RxLifecycle.bind(lifecycle)).test();
    testObserver.assertNotComplete();
    lifecycle.onNext("");
    testObserver.assertComplete();
}
Code example source: trello/RxLifecycle
@Test
public void testEndsImmediatelyOutsideLifecycle() {
    BehaviorSubject<Lifecycle.Event> lifecycle = BehaviorSubject.create();
    lifecycle.onNext(Lifecycle.Event.ON_DESTROY);
    TestObserver<Object> testObserver = observable.compose(RxLifecycleAndroidLifecycle.bindLifecycle(lifecycle)).test();
    testObserver.assertComplete();
}
Code example source: trello/RxLifecycle
@Test
public void testEndsImmediatelyOutsideFragmentLifecycle() {
    BehaviorSubject<FragmentEvent> lifecycle = BehaviorSubject.create();
    lifecycle.onNext(FragmentEvent.DETACH);
    TestObserver<Object> testObserver = observable.compose(RxLifecycleAndroid.bindFragment(lifecycle)).test();
    testObserver.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void asyncFusedPollCrashDelayError() {
    PublishSubject<Integer> ps = PublishSubject.create();
    TestObserver<Integer> to = ps
            .switchMapDelayError(Functions.justFunction(
                    Observable.range(1, 5)
                            .observeOn(ImmediateThinScheduler.INSTANCE)
                            .map(new Function<Integer, Integer>() {
                                @Override
                                public Integer apply(Integer v) throws Exception {
                                    throw new TestException();
                                }
                            })
                            .compose(TestHelper.<Integer>observableStripBoundary())
            ))
            .test();
    to.assertEmpty();
    ps.onNext(1);
    assertTrue(ps.hasObservers());
    to.assertEmpty();
    ps.onComplete();
    to.assertFailure(TestException.class);
    assertFalse(ps.hasObservers());
}
Code example source: trello/RxLifecycle
private void testBindUntilEvent(LifecycleOwner owner) {
    Fragment fragment = (Fragment) owner;
    ActivityController<?> controller = startFragment(fragment);
    TestObserver<Object> testObserver = observable.compose(AndroidLifecycle.createLifecycleProvider(owner).bindUntilEvent(Lifecycle.Event.ON_STOP)).test();
    testObserver.assertNotComplete();
    controller.start();
    testObserver.assertNotComplete();
    controller.resume();
    testObserver.assertNotComplete();
    controller.pause();
    testObserver.assertNotComplete();
    controller.stop();
    testObserver.assertComplete();
}
Code example source: trello/RxLifecycle
private void testBindUntilEvent(ActivityController<? extends LifecycleProvider<ActivityEvent>> controller) {
    LifecycleProvider<ActivityEvent> activity = controller.get();
    TestObserver<Object> testObserver = observable.compose(activity.bindUntilEvent(STOP)).test();
    controller.create();
    testObserver.assertNotComplete();
    controller.start();
    testObserver.assertNotComplete();
    controller.resume();
    testObserver.assertNotComplete();
    controller.pause();
    testObserver.assertNotComplete();
    controller.stop();
    testObserver.assertComplete();
}
Code example source: trello/RxLifecycle
private void testBindUntilEvent(ActivityController<? extends LifecycleOwner> controller) {
    LifecycleProvider<Lifecycle.Event> activity = AndroidLifecycle.createLifecycleProvider(controller.get());
    TestObserver<Object> testObserver = observable.compose(activity.bindUntilEvent(Lifecycle.Event.ON_STOP)).test();
    controller.create();
    testObserver.assertNotComplete();
    controller.start();
    testObserver.assertNotComplete();
    controller.resume();
    testObserver.assertNotComplete();
    controller.pause();
    testObserver.assertNotComplete();
    controller.stop();
    testObserver.assertComplete();
}