rx.Observable.zipWith()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(227)

本文整理了Java中rx.Observable.zipWith()方法的一些代码示例,展示了Observable.zipWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.zipWith()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:zipWith

Observable.zipWith介绍

[英]Returns an Observable that emits items that are the result of applying a specified function to pairs of values, one each from the source Observable and a specified Iterable sequence.

Note that the other Iterable is evaluated as items are observed from the source Observable; it is not pre-consumed. This allows you to zip infinite streams on either side. Scheduler: zipWith does not operate by default on a particular Scheduler.
[中]返回一个Observable,它发出的项是将指定函数应用于成对值的结果,每个值来自源Observable和指定的Iterable序列。
请注意,另一个Iterable是从源可观察项中观察到的;它不是预先消费的。这允许你在两边压缩无限的流。调度器:zipWith默认情况下不会在特定的调度器上运行。

代码示例

代码示例来源:origin: vert-x3/vertx-examples

  1. @Override
  2. public void start() throws Exception {
  3. HttpClient client = vertx.createHttpClient();
  4. // Create two requests
  5. HttpClientRequest req1 = client.request(HttpMethod.GET, 8080, "localhost", "/");
  6. HttpClientRequest req2 = client.request(HttpMethod.GET, 8080, "localhost", "/");
  7. // Turn the requests responses into Observable<JsonObject>
  8. Observable<JsonObject> obs1 = req1.toObservable().flatMap(HttpClientResponse::toObservable).
  9. map(buf -> new JsonObject(buf.toString("UTF-8")));
  10. Observable<JsonObject> obs2 = req2.toObservable().flatMap(HttpClientResponse::toObservable).
  11. map(buf -> new JsonObject(buf.toString("UTF-8")));
  12. // Combine the responses with the zip into a single response
  13. obs1.zipWith(obs2, (b1, b2) -> new JsonObject().put("req1", b1).put("req2", b2)).
  14. subscribe(json -> {
  15. System.out.println("Got combined result " + json);
  16. },
  17. err -> {
  18. err.printStackTrace();
  19. });
  20. req1.end();
  21. req2.end();
  22. }
  23. }

代码示例来源:origin: jhusain/learnrxjava

  1. public static void main(String... args) {
  2. /*
  3. * retry(n) can be used to immediately retry n times
  4. */
  5. Observable.create(s -> {
  6. System.out.println("1) subscribing");
  7. s.onError(new RuntimeException("1) always fails"));
  8. }).retry(3).subscribe(System.out::println, t -> System.out.println("1) Error: " + t));
  9. System.out.println("");
  10. /*
  11. * retryWhen allows custom behavior on when and if a retry should be done
  12. */
  13. Observable.create(s -> {
  14. System.out.println("2) subscribing");
  15. s.onError(new RuntimeException("2) always fails"));
  16. }).retryWhen(attempts -> {
  17. return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
  18. System.out.println("2) delay retry by " + i + " second(s)");
  19. return Observable.timer(i, TimeUnit.SECONDS);
  20. }).concatWith(Observable.error(new RuntimeException("Failed after 3 retries")));
  21. }).toBlocking().forEach(System.out::println);
  22. }
  23. }

代码示例来源:origin: jhusain/learnrxjava

  1. throw new RuntimeException("failed!");
  2. }).retryWhen(attempts -> {
  3. return attempts.zipWith(Observable.range(1, 3), (throwable, i) -> i)
  4. .flatMap(i -> {
  5. System.out.println("delay retry by " + i + " second(s)");

代码示例来源:origin: davidmoten/rxjava-extras

  1. @Override
  2. public Observable<Indexed<T>> call(Observable<T> source) {
  3. return source.zipWith(NaturalNumbers.instance(), new Func2<T, Long, Indexed<T>>() {
  4. @Override
  5. public Indexed<T> call(T t, Long n) {
  6. return new Indexed<T>(t, n);
  7. }
  8. });
  9. }

代码示例来源:origin: com.github.davidmoten/rxjava-extras

  1. @Override
  2. public Observable<Indexed<T>> call(Observable<T> source) {
  3. return source.zipWith(NaturalNumbers.instance(), new Func2<T, Long, Indexed<T>>() {
  4. @Override
  5. public Indexed<T> call(T t, Long n) {
  6. return new Indexed<T>(t, n);
  7. }
  8. });
  9. }

代码示例来源:origin: com.microsoft.azure/azure-documentdb-rx

  1. @Override
  2. public Observable<Long> call(final Observable<? extends Throwable> failures) {
  3. return failures
  4. .zipWith(Observable.range(1, MAX_RETRIES_LIMIT),
  5. (err, attempt) ->
  6. attempt < MAX_RETRIES_LIMIT ?
  7. handleRetryAttempt(err, attempt, retryPolicy) :
  8. Observable.<Long>error(extractDocumentClientCause(err, attempt)) )
  9. .flatMap(x -> x);
  10. }
  11. };

代码示例来源:origin: NielsMasdorp/Speculum-Android

  1. public static Func1<Observable<? extends Throwable>, Observable<?>> exponentialBackoff(
  2. int maxRetryCount, long delay, TimeUnit unit) {
  3. return errors -> errors
  4. .zipWith(Observable.range(1, maxRetryCount), (error, retryCount) -> retryCount)
  5. .flatMap(retryCount -> Observable.timer((long) Math.pow(delay, retryCount), unit));
  6. }
  7. }

代码示例来源:origin: com.intendia.gwt.rxgwt/rxgwt

  1. public static <T> Observable.Transformer<T, T> retryDelay(Action1<Attempt> onAttempt, int maxRetry) {
  2. return o -> o.retryWhen(attempts -> attempts
  3. .zipWith(Observable.range(1, maxRetry), (err, i) -> new Attempt(i, err))
  4. .flatMap((Attempt x) -> {
  5. if (x.idx > maxRetry) return error(x.err);
  6. onAttempt.call(x);
  7. return timer(min(x.idx * x.idx, MAX_RETRY_TIME), SECONDS);
  8. }));
  9. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. static <T> Observable<T> odd(Observable<T> upstream) {
  2. Observable<Boolean> trueFalse = just(true, false).repeat();
  3. return upstream
  4. .zipWith(trueFalse, Pair::of)
  5. .filter(Pair::getRight)
  6. .map(Pair::getLeft);
  7. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. private <T> Observable.Transformer<T, T> odd() {
  2. Observable<Boolean> trueFalse = just(true, false).repeat();
  3. return upstream -> upstream
  4. .zipWith(trueFalse, Pair::of)
  5. .filter(Pair::getRight)
  6. .map(Pair::getLeft);
  7. }

代码示例来源:origin: hawkular/hawkular-metrics

  1. @Override
  2. public Observable<Metric<T>> call(Observable<Metric<T>> metricObservable) {
  3. return metricObservable.flatMap(metric -> {
  4. long now = System.currentTimeMillis();
  5. MetricId<T> metricId = metric.getMetricId();
  6. return metricsService.findDataPoints(metricId, 0, now, 1, Order.ASC)
  7. .zipWith(metricsService.findDataPoints(metricId, 0, now, 1, Order.DESC), (p1, p2)
  8. -> new Metric<>(metric, p1.getTimestamp(), p2.getTimestamp()))
  9. .switchIfEmpty(Observable.just(metric));
  10. });
  11. }
  12. }

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

  1. @Override
  2. public Observable<Metric<T>> call(Observable<Metric<T>> metricObservable) {
  3. return metricObservable.flatMap(metric -> {
  4. long now = System.currentTimeMillis();
  5. MetricId<T> metricId = metric.getMetricId();
  6. return metricsService.findDataPoints(metricId, 0, now, 1, Order.ASC)
  7. .zipWith(metricsService.findDataPoints(metricId, 0, now, 1, Order.DESC), (p1, p2)
  8. -> new Metric<>(metric, p1.getTimestamp(), p2.getTimestamp()))
  9. .switchIfEmpty(Observable.just(metric));
  10. });
  11. }
  12. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_85() throws Exception {
  3. Observable<Flight> flight =
  4. rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
  5. Observable<Passenger> passenger =
  6. rxFindPassenger(42).subscribeOn(Schedulers.io());
  7. Observable<Ticket> ticket = flight
  8. .zipWith(passenger, this::rxBookTicket)
  9. .flatMap(obs -> obs);
  10. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_76() throws Exception {
  3. Observable<Flight> flight =
  4. rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
  5. Observable<Passenger> passenger =
  6. rxFindPassenger(42).subscribeOn(Schedulers.io());
  7. Observable<Ticket> ticket = flight
  8. .zipWith(passenger, (Flight f, Passenger p) -> Pair.of(f, p))
  9. .flatMap(pair -> rxBookTicket(pair.getLeft(), pair.getRight()));
  10. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_589() throws Exception {
  3. Observable<Boolean> trueFalse = Observable.just(true, false).repeat();
  4. Observable<Integer> upstream = Observable.range(30, 8);
  5. Observable<Integer> downstream = upstream
  6. .zipWith(trueFalse, Pair::of)
  7. .filter(Pair::getRight)
  8. .map(Pair::getLeft);
  9. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_271() throws Exception {
  3. Observable<Instant> timestamps = Observable
  4. .fromCallable(() -> dbQuery())
  5. .doOnSubscribe(() -> log.info("subscribe()"))
  6. .doOnRequest(c -> log.info("Requested {}", c))
  7. .doOnNext(instant -> log.info("Got: {}", instant));
  8. timestamps
  9. .zipWith(timestamps.skip(1), Duration::between)
  10. .map(Object::toString)
  11. .subscribe(log::info);
  12. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_49() throws Exception {
  3. Observable<Flight> flight = rxLookupFlight("LOT 783");
  4. Observable<Passenger> passenger = rxFindPassenger(42);
  5. Observable<Ticket> ticket =
  6. flight.zipWith(passenger, (f, p) -> bookTicket(f, p));
  7. ticket.subscribe(this::sendEmail);
  8. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_74() throws Exception {
  3. risky()
  4. .timeout(1, SECONDS)
  5. .retryWhen(failures -> failures
  6. .zipWith(Observable.range(1, ATTEMPTS), (err, attempt) ->
  7. attempt < ATTEMPTS ?
  8. Observable.timer(1, SECONDS) :
  9. Observable.error(err))
  10. .flatMap(x -> x)
  11. );
  12. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_89() throws Exception {
  3. risky()
  4. .timeout(1, SECONDS)
  5. .retryWhen(failures -> failures
  6. .zipWith(Observable.range(1, ATTEMPTS),
  7. this::handleRetryAttempt)
  8. .flatMap(x -> x)
  9. );
  10. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_286() throws Exception {
  3. final WeatherStation station = new BasicWeatherStation();
  4. Observable<Temperature> temperatureMeasurements = station.temperature();
  5. Observable<Wind> windMeasurements = station.wind();
  6. temperatureMeasurements
  7. .zipWith(windMeasurements,
  8. (temperature, wind) -> new Weather(temperature, wind));
  9. }

相关文章

Observable类方法