Usage and code examples for the io.reactivex.Flowable.compose() method

Reposted by x33g5p2x on 2022-01-19 in category: Other

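This article collects code examples for the Java method io.reactivex.Flowable.compose() and shows how Flowable.compose() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as useful references. Details of the Flowable.compose() method: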
Package: io.reactivex
Class name: Flowable
Method name: compose

About Flowable.compose

Transforms a Publisher by applying a particular Transformer function to it.

This method operates on the Publisher itself, whereas #lift operates on the Publisher's Subscribers.

If the operator you are creating is designed to act on the individual items emitted by a source Publisher, use #lift. If your operator is designed to transform the source Publisher as a whole (for instance, by applying a particular set of existing RxJava operators to it), use compose.

Backpressure: The operator itself doesn't interfere with backpressure behavior, which depends only on the kind of Publisher the transformer returns.

Scheduler: compose does not operate by default on a particular Scheduler.
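
To make the "transform the source as a whole" case concrete, here is a minimal sketch, assuming RxJava 2 is on the classpath. The class ComposeSketch and the helper evensAsStrings() are hypothetical names chosen for illustration; they are not part of RxJava. The helper bundles two existing operators (filter and map) into one reusable FlowableTransformer that can be passed to compose.

```java
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import java.util.List;

public class ComposeSketch {

    // Hypothetical reusable transformer (illustrative only):
    // keeps even numbers and converts them to strings.
    static FlowableTransformer<Integer, String> evensAsStrings() {
        return upstream -> upstream
                .filter(v -> v % 2 == 0)
                .map(v -> String.valueOf(v));
    }

    // Applies the transformer to the whole source via compose()
    // and collects the emitted items into a list.
    static List<String> run() {
        return Flowable.range(1, 6)
                .compose(evensAsStrings())
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 4, 6]
    }
}
```

Because the transformer only chains standard operators, the resulting Flowable should inherit their backpressure behavior, in line with the note above.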

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void composeNull() {
  just1.compose(null);
}

Code example source: amitshekhariitbhu/RxJava2-Android-Samples

@Override
protected void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  setContentView(R.layout.activity_compose_operator_example);

  /*
    Compose for reusable code.
   */
  Observable.just(1, 2, 3, 4, 5)
      .compose(schedulers.<Integer>applyObservableAsync())
      .subscribe(/* */);

  Flowable.just(1, 2, 3, 4, 5)
      .compose(schedulers.<Integer>applyFlowableAsysnc())
      .subscribe(/* */);
}
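
The snippet above does not show what the schedulers helper returns. As a rough sketch only, assuming RxJava 2 without Android dependencies, such a helper might look like the following. The class AsyncTransformerSketch and the method applyFlowableAsync() are hypothetical and are not the sample project's implementation; an Android project would more likely observe on its main-thread scheduler, whereas this sketch uses Schedulers.computation() so it can run on a plain JVM.

```java
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

public class AsyncTransformerSketch {

    // Hypothetical helper (assumption, not the sample project's code):
    // subscribes upstream on the IO scheduler and delivers items
    // downstream on the computation scheduler.
    static <T> FlowableTransformer<T, T> applyFlowableAsync() {
        return upstream -> upstream
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.computation());
    }

    // Reuses the same threading setup through compose() and blocks
    // until all items have been collected.
    static List<Integer> run() {
        return Flowable.just(1, 2, 3, 4, 5)
                .compose(AsyncTransformerSketch.<Integer>applyFlowableAsync())
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3, 4, 5]
    }
}
```

Wrapping subscribeOn and observeOn in one transformer keeps the threading policy in a single place, so every stream that needs it can apply it with one compose call.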

Code example source: ReactiveX/RxJava

@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose() {
  Flowable<HorrorMovie> movie = Flowable.just(new HorrorMovie());
  Flowable<Movie> movie2 = movie.compose(new FlowableTransformer<HorrorMovie, Movie>() {
    @Override
    public Publisher<Movie> apply(Flowable<HorrorMovie> t) {
      return Flowable.just(new Movie());
    }
  });
}

Code example source: ReactiveX/RxJava

@Test
public void flowableGenericsSignatureTest() {
  A<String, Integer> a = new A<String, Integer>() { };
  Flowable.just(a).compose(TransformerTest.<String>testFlowableTransformerCreator());
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose4() {
  Flowable<HorrorMovie> movie = Flowable.just(new HorrorMovie());
  Flowable<HorrorMovie> movie2 = movie.compose(new FlowableTransformer<HorrorMovie, HorrorMovie>() {
    @Override
    public Publisher<HorrorMovie> apply(Flowable<HorrorMovie> t1) {
      return t1.map(new Function<HorrorMovie, HorrorMovie>() {
        @Override
        public HorrorMovie apply(HorrorMovie v) {
          return v;
        }
      });
    }
  });
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose2() {
  Flowable<Movie> movie = Flowable.<Movie>just(new HorrorMovie());
  Flowable<HorrorMovie> movie2 = movie.compose(new FlowableTransformer<Movie, HorrorMovie>() {
    @Override
    public Publisher<HorrorMovie> apply(Flowable<Movie> t) {
      return Flowable.just(new HorrorMovie());
    }
  });
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose3() {
  Flowable<Movie> movie = Flowable.<Movie>just(new HorrorMovie());
  Flowable<HorrorMovie> movie2 = movie.compose(new FlowableTransformer<Movie, HorrorMovie>() {
    @Override
    public Publisher<HorrorMovie> apply(Flowable<Movie> t) {
      return Flowable.just(new HorrorMovie()).map(new Function<HorrorMovie, HorrorMovie>() {
        @Override
        public HorrorMovie apply(HorrorMovie v) {
          return v;
        }
      });
    }
  });
}

Code example source: ReactiveX/RxJava

@Test
public void testComposeWithDeltaLogic() {
  List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
  List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
  Flowable<List<Movie>> movies = Flowable.just(list1, list2);
  movies.compose(deltaTransformer);
}

Code example source: ReactiveX/RxJava

@Test
public void flowableTransformerThrows() {
  try {
    Flowable.just(1).compose(new FlowableTransformer<Integer, Integer>() {
      @Override
      public Publisher<Integer> apply(Flowable<Integer> v) {
        throw new TestException("Forced failure");
      }
    });
    fail("Should have thrown!");
  } catch (TestException ex) {
    assertEquals("Forced failure", ex.getMessage());
  }
}

Code example source: ReactiveX/RxJava

@Test
public void testCompose() {
  TestSubscriber<String> ts = new TestSubscriber<String>();
  Flowable.just(1, 2, 3).compose(new FlowableTransformer<Integer, String>() {
    @Override
    public Publisher<String> apply(Flowable<Integer> t1) {
      return t1.map(new Function<Integer, String>() {
        @Override
        public String apply(Integer v) {
          return String.valueOf(v);
        }
      });
    }
  })
  .subscribe(ts);
  ts.assertTerminated();
  ts.assertNoErrors();
  ts.assertValues("1", "2", "3");
}

Code example source: ReactiveX/RxJava

@Test
public void pollThrowsDelayError() {
  Flowable.just(1)
  .map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .compose(TestHelper.<Integer>flowableStripBoundary())
  .concatMapDelayError(new Function<Integer, Publisher<Integer>>() {
    @Override
    public Publisher<Integer> apply(Integer v)
        throws Exception {
      return Flowable.just(v);
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void syncFusedMapCrash() {
  Flowable.just(1)
  .map(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .compose(new StripBoundary<Object>(null))
  .parallel()
  .sequential()
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void pollThrows() {
  Flowable.just(1)
  .map(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .compose(TestHelper.flowableStripBoundary())
  .publish()
  .autoConnect()
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void pollThrows() {
  Flowable.just(1)
  .map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .compose(TestHelper.<Integer>flowableStripBoundary())
  .concatMap(new Function<Integer, Publisher<Integer>>() {
    @Override
    public Publisher<Integer> apply(Integer v)
        throws Exception {
      return Flowable.just(v);
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void pollThrowsNoSubscribers() {
  ConnectableFlowable<Integer> cf = Flowable.just(1, 2)
  .map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer v) throws Exception {
      if (v == 2) {
        throw new TestException();
      }
      return v;
    }
  })
  .compose(TestHelper.<Integer>flowableStripBoundary())
  .publish();
  TestSubscriber<Integer> ts = cf.take(1)
  .test();
  cf.connect();
  ts.assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void fusedInnerCrash() {
  Flowable.just(1).hide()
  .switchMap(Functions.justFunction(Flowable.just(1)
      .map(new Function<Integer, Object>() {
        @Override
        public Object apply(Integer v) throws Exception {
          throw new TestException();
        }
      })
      .compose(TestHelper.<Object>flowableStripBoundary())
    )
  )
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void doOnNextDoOnErrorCombinedFused() {
  ConnectableFlowable<Integer> cf = Flowable.just(1)
      .compose(new FlowableTransformer<Integer, Integer>() {
        @Override
        public Publisher<Integer> apply(Flowable<Integer> v) {

Code example source: ReactiveX/RxJava

@Test
public void asyncFusedMapCrash() {
  UnicastProcessor<Integer> up = UnicastProcessor.create();
  up.onNext(1);
  up
  .map(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .compose(new StripBoundary<Object>(null))
  .parallel()
  .sequential()
  .test()
  .assertFailure(TestException.class);
  assertFalse(up.hasSubscribers());
}

Code example source: ReactiveX/RxJava

@Test
public void doOnNextDoOnErrorCombinedFusedConditional() {
  ConnectableFlowable<Integer> cf = Flowable.just(1)
      .compose(new FlowableTransformer<Integer, Integer>() {
        @Override
        public Publisher<Integer> apply(Flowable<Integer> v) {

Code example source: resilience4j/resilience4j

@Test
public void shouldNotRetryFromPredicateUsingFlowable() {
  //Given
  RetryConfig config = RetryConfig.custom()
      .retryOnException(t -> t instanceof IOException)
      .maxAttempts(3).build();
  Retry retry = Retry.of("testName", config);
  given(helloWorldService.returnHelloWorld())
      .willThrow(new WebServiceException("BAM!"));
  //When
  Flowable.fromCallable(helloWorldService::returnHelloWorld)
      .compose(RetryTransformer.of(retry))
      .test()
      .assertError(WebServiceException.class)
      .assertNotComplete()
      .assertSubscribed();
  //Then
  BDDMockito.then(helloWorldService).should(Mockito.times(1)).returnHelloWorld();
  Retry.Metrics metrics = retry.getMetrics();
  assertThat(metrics.getNumberOfFailedCallsWithoutRetryAttempt()).isEqualTo(1);
  assertThat(metrics.getNumberOfFailedCallsWithRetryAttempt()).isEqualTo(0);
}
