Using io.reactivex.Observable.compose(), with code examples

x33g5p2x, reposted 2022-01-25 in Other

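This article collects code examples for the Java method io.reactivex.Observable.compose() and shows how Observable.compose() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they serve well as a reference. Details of Observable.compose():
Package: io.reactivex.Observable
Class: Observable
Method: compose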

About Observable.compose

Transforms an ObservableSource by applying a particular Transformer function to it.

This method operates on the ObservableSource itself, whereas lift operates on the ObservableSource's Observers.

If the operator you are creating is designed to act on the individual items emitted by a source ObservableSource, use lift. If your operator is designed to transform the source ObservableSource as a whole (for instance, by applying a particular set of existing RxJava operators to it), use compose.

Scheduler: compose does not operate by default on a particular Scheduler.
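As a rough illustration of "transforming the source as a whole", the sketch below packages two existing operators (filter and map) into one reusable ObservableTransformer and applies it with compose. It assumes RxJava 2.x on the classpath; the class name ComposeSketch and the helper evenLabels() are hypothetical names made up for this example.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import java.util.List;

public class ComposeSketch {

    // Hypothetical reusable transformer: keeps even numbers and turns them into labels.
    // It receives the whole upstream Observable and returns a new one.
    static ObservableTransformer<Integer, String> evenLabels() {
        return upstream -> upstream
                .filter(v -> v % 2 == 0)
                .map(v -> "even:" + v);
    }

    // Applies the transformer to a simple source and collects the results.
    static List<String> run() {
        return Observable.range(1, 6)   // emits 1, 2, 3, 4, 5, 6
                .compose(evenLabels())  // same effect as chaining filter(...).map(...) inline
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [even:2, even:4, even:6]
    }
}
```

The benefit is mainly reuse: the same evenLabels() transformer can be applied to any Observable<Integer> without breaking the fluent chain.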

Code examples

Example source: ReactiveX/RxJava

    @Test(expected = NullPointerException.class)
    public void composeNull() {
        // A null transformer is rejected with a NullPointerException
        just1.compose(null);
    }

Example source: ReactiveX/RxJava

    @Test
    public void observableGenericsSignatureTest() {
        A<String, Integer> a = new A<String, Integer>() { };

        // Checks that compose() accepts this transformer's generic signature
        Observable.just(a).compose(TransformerTest.<String>testObservableTransformerCreator());
    }

Example source: ReactiveX/RxJava

    @SuppressWarnings("unused")
    @Test
    public void testCovarianceOfCompose() {
        Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
        // The transformer maps a stream of the subtype to a stream of the supertype
        Observable<Movie> movie2 = movie.compose(new ObservableTransformer<HorrorMovie, Movie>() {
            @Override
            public Observable<Movie> apply(Observable<HorrorMovie> t) {
                return Observable.just(new Movie());
            }
        });
    }

Example source: ReactiveX/RxJava

    @SuppressWarnings("unused")
    @Test
    public void testCovarianceOfCompose4() {
        Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
        Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<HorrorMovie, HorrorMovie>() {
            @Override
            public Observable<HorrorMovie> apply(Observable<HorrorMovie> t1) {
                // Identity mapping: input and output element types are the same
                return t1.map(new Function<HorrorMovie, HorrorMovie>() {
                    @Override
                    public HorrorMovie apply(HorrorMovie v) {
                        return v;
                    }
                });
            }
        });
    }

Example source: ReactiveX/RxJava

    @SuppressWarnings("unused")
    @Test
    public void testCovarianceOfCompose2() {
        Observable<Movie> movie = Observable.<Movie>just(new HorrorMovie());
        // The transformer narrows the element type from Movie to HorrorMovie
        Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<Movie, HorrorMovie>() {
            @Override
            public Observable<HorrorMovie> apply(Observable<Movie> t) {
                return Observable.just(new HorrorMovie());
            }
        });
    }

Example source: ReactiveX/RxJava

    @SuppressWarnings("unused")
    @Test
    public void testCovarianceOfCompose3() {
        Observable<Movie> movie = Observable.<Movie>just(new HorrorMovie());
        Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<Movie, HorrorMovie>() {
            @Override
            public Observable<HorrorMovie> apply(Observable<Movie> t) {
                return Observable.just(new HorrorMovie()).map(new Function<HorrorMovie, HorrorMovie>() {
                    @Override
                    public HorrorMovie apply(HorrorMovie v) {
                        return v;
                    }
                });
            }
        });
    }

Example source: ReactiveX/RxJava

    @Test
    public void observableTransformerThrows() {
        try {
            Observable.just(1).compose(new ObservableTransformer<Integer, Integer>() {
                @Override
                public Observable<Integer> apply(Observable<Integer> v) {
                    throw new TestException("Forced failure");
                }
            });
            fail("Should have thrown!");
        } catch (TestException ex) {
            // The exception escapes from compose() itself, not through an Observer
            assertEquals("Forced failure", ex.getMessage());
        }
    }
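The test above relies on the transformer function running as soon as compose() is called, before anyone subscribes. The following sketch, assuming RxJava 2.x, contrasts that with wrapping the call in Observable.defer, which postpones the transformer until subscription so that the failure reaches the Observer's onError instead. The class ComposeEagerDemo and its methods are hypothetical names used only for illustration, and IllegalStateException stands in for the test suite's TestException.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;

public class ComposeEagerDemo {

    // A transformer that always fails when it is applied (illustrative only).
    static final ObservableTransformer<Integer, Integer> FAILING =
            upstream -> { throw new IllegalStateException("Forced failure"); };

    // compose() invokes the transformer immediately, so the exception
    // is thrown at the call site, before any subscription happens.
    static String callDirectly() {
        try {
            Observable.just(1).compose(FAILING);
            return "no exception";
        } catch (IllegalStateException ex) {
            return "thrown from compose(): " + ex.getMessage();
        }
    }

    // defer() runs the lambda only on subscribe and routes anything it
    // throws to the subscriber's error callback.
    static String callDeferred() {
        Observable<Integer> source = Observable.just(1);
        Observable<Integer> deferred = Observable.defer(() -> source.compose(FAILING));

        final String[] result = { "no error" };
        deferred.subscribe(
                v -> { },
                e -> result[0] = "delivered to onError: " + e.getMessage());
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(callDirectly());
        System.out.println(callDeferred());
    }
}
```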

Example source: ReactiveX/RxJava

    @Test
    public void testComposeWithDeltaLogic() {
        List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
        List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
        Observable<List<Movie>> movies = Observable.just(list1, list2);
        movies.compose(deltaTransformer);
    }

Example source: ReactiveX/RxJava

    @Test
    public void testCompose() {
        TestObserver<String> to = new TestObserver<String>();
        Observable.just(1, 2, 3).compose(new ObservableTransformer<Integer, String>() {
            @Override
            public Observable<String> apply(Observable<Integer> t1) {
                // Convert each Integer to its String form
                return t1.map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer v) {
                        return String.valueOf(v);
                    }
                });
            }
        })
        .subscribe(to);
        to.assertTerminated();
        to.assertNoErrors();
        to.assertValues("1", "2", "3");
    }

Example source: Polidea/RxAndroidBle

    private static Observable<PresenterEvent> setupReadingBehaviour(Observable<Boolean> readClicks,
                                                                    BluetoothGattCharacteristic characteristic,
                                                                    RxBleConnection connection) {
        return !hasProperty(characteristic, BluetoothGattCharacteristic.PROPERTY_READ)
                // if the characteristic is not readable return an empty (dummy) observable
                ? Observable.empty()
                : readClicks // else use the readClicks observable from the activity
                // every click is requesting a read operation from the peripheral
                .flatMapSingle(ignoredClick -> connection.readCharacteristic(characteristic))
                .compose(transformToPresenterEvent(Type.READ)); // convenience method to wrap reads
    }

Example source: trello/RxLifecycle

    @Test
    public void testBindLifecycle() {
        BehaviorSubject<Object> lifecycle = BehaviorSubject.create();
        TestObserver<Object> testObserver = observable.compose(RxLifecycle.bind(lifecycle)).test();
        testObserver.assertNotComplete();
        // Any lifecycle emission completes the bound stream
        lifecycle.onNext(new Object());
        testObserver.assertComplete();
    }
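The behaviour exercised by this test (the bound stream completes once the lifecycle emits) can be approximated with a transformer built on takeUntil. The sketch below is a simplified stand-in written for this article, assuming RxJava 2.x; it is not RxLifecycle's actual implementation, and bindUntil and BindUntilSketch are hypothetical names.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import java.util.concurrent.atomic.AtomicBoolean;

public class BindUntilSketch {

    // Hypothetical helper: completes the upstream as soon as `lifecycle` emits anything.
    static <T> ObservableTransformer<T, T> bindUntil(Observable<?> lifecycle) {
        return upstream -> upstream.takeUntil(lifecycle);
    }

    // Returns { completedBeforeSignal, completedAfterSignal }.
    static boolean[] run() {
        PublishSubject<Object> source = PublishSubject.create();
        BehaviorSubject<Object> lifecycle = BehaviorSubject.create();
        AtomicBoolean completed = new AtomicBoolean(false);

        source.compose(BindUntilSketch.<Object>bindUntil(lifecycle))
              .subscribe(v -> { }, e -> { }, () -> completed.set(true));

        boolean before = completed.get();  // no lifecycle signal yet
        lifecycle.onNext(new Object());    // lifecycle emits, so takeUntil completes downstream
        boolean after = completed.get();
        return new boolean[] { before, after };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " -> " + r[1]); // prints false -> true
    }
}
```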

Example source: ReactiveX/RxJava

    @Test
    public void asyncFusedPollCrash() {
        PublishSubject<Integer> ps = PublishSubject.create();

        TestObserver<Integer> to = ps
        .switchMap(Functions.justFunction(
                Observable.range(1, 5)
                .observeOn(ImmediateThinScheduler.INSTANCE)
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer v) throws Exception {
                        throw new TestException();
                    }
                })
                .compose(TestHelper.<Integer>observableStripBoundary())
        ))
        .test();

        to.assertEmpty();

        ps.onNext(1);

        to.assertFailure(TestException.class);

        assertFalse(ps.hasObservers());
    }

Example source: trello/RxLifecycle

    @Test
    public void testEndsImmediatelyOutsideActivityLifecycle() {
        BehaviorSubject<ActivityEvent> lifecycle = BehaviorSubject.create();
        lifecycle.onNext(ActivityEvent.DESTROY);

        TestObserver<Object> testObserver = observable.compose(RxLifecycleAndroid.bindActivity(lifecycle)).test();
        testObserver.assertComplete();
    }

Example source: trello/RxLifecycle

    @Test
    public void testBindLifecycleOtherObject() {
        // Ensures it works with other types as well, and not just "Object"
        BehaviorSubject<String> lifecycle = BehaviorSubject.create();
        TestObserver<Object> testObserver = observable.compose(RxLifecycle.bind(lifecycle)).test();
        testObserver.assertNotComplete();
        lifecycle.onNext("");
        testObserver.assertComplete();
    }

Example source: trello/RxLifecycle

    @Test
    public void testEndsImmediatelyOutsideLifecycle() {
        BehaviorSubject<Lifecycle.Event> lifecycle = BehaviorSubject.create();
        lifecycle.onNext(Lifecycle.Event.ON_DESTROY);

        TestObserver<Object> testObserver = observable.compose(RxLifecycleAndroidLifecycle.bindLifecycle(lifecycle)).test();
        testObserver.assertComplete();
    }

Example source: trello/RxLifecycle

    @Test
    public void testEndsImmediatelyOutsideFragmentLifecycle() {
        BehaviorSubject<FragmentEvent> lifecycle = BehaviorSubject.create();
        lifecycle.onNext(FragmentEvent.DETACH);

        TestObserver<Object> testObserver = observable.compose(RxLifecycleAndroid.bindFragment(lifecycle)).test();
        testObserver.assertComplete();
    }

Example source: ReactiveX/RxJava

    @Test
    public void asyncFusedPollCrashDelayError() {
        PublishSubject<Integer> ps = PublishSubject.create();

        TestObserver<Integer> to = ps
        .switchMapDelayError(Functions.justFunction(
                Observable.range(1, 5)
                .observeOn(ImmediateThinScheduler.INSTANCE)
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer v) throws Exception {
                        throw new TestException();
                    }
                })
                .compose(TestHelper.<Integer>observableStripBoundary())
        ))
        .test();

        to.assertEmpty();

        ps.onNext(1);

        // With delayError, the failure is held back until the outer source completes
        assertTrue(ps.hasObservers());
        to.assertEmpty();

        ps.onComplete();

        to.assertFailure(TestException.class);

        assertFalse(ps.hasObservers());
    }

Example source: trello/RxLifecycle

    private void testBindUntilEvent(LifecycleOwner owner) {
        Fragment fragment = (Fragment) owner;
        ActivityController<?> controller = startFragment(fragment);

        TestObserver<Object> testObserver = observable.compose(AndroidLifecycle.createLifecycleProvider(owner).bindUntilEvent(Lifecycle.Event.ON_STOP)).test();

        testObserver.assertNotComplete();
        controller.start();
        testObserver.assertNotComplete();
        controller.resume();
        testObserver.assertNotComplete();
        controller.pause();
        testObserver.assertNotComplete();
        controller.stop();
        testObserver.assertComplete();
    }

Example source: trello/RxLifecycle

    private void testBindUntilEvent(ActivityController<? extends LifecycleProvider<ActivityEvent>> controller) {
        LifecycleProvider<ActivityEvent> activity = controller.get();

        TestObserver<Object> testObserver = observable.compose(activity.bindUntilEvent(STOP)).test();

        controller.create();
        testObserver.assertNotComplete();
        controller.start();
        testObserver.assertNotComplete();
        controller.resume();
        testObserver.assertNotComplete();
        controller.pause();
        testObserver.assertNotComplete();
        controller.stop();
        testObserver.assertComplete();
    }

Example source: trello/RxLifecycle

    private void testBindUntilEvent(ActivityController<? extends LifecycleOwner> controller) {
        LifecycleProvider<Lifecycle.Event> activity = AndroidLifecycle.createLifecycleProvider(controller.get());

        TestObserver<Object> testObserver = observable.compose(activity.bindUntilEvent(Lifecycle.Event.ON_STOP)).test();

        controller.create();
        testObserver.assertNotComplete();
        controller.start();
        testObserver.assertNotComplete();
        controller.resume();
        testObserver.assertNotComplete();
        controller.pause();
        testObserver.assertNotComplete();
        controller.stop();
        testObserver.assertComplete();
    }
