Usage and code examples of the rx.Observable.zipWith() method

Reposted by x33g5p2x on 2022-01-25 under Other

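This article collects code examples for the Java method rx.Observable.zipWith() and shows how Observable.zipWith() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of Observable.zipWith():
Package: rx
Class name: Observable
Method name: zipWith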

Introduction to Observable.zipWith

Returns an Observable that emits the results of applying a specified function to pairs of values, one from the source Observable and one from a specified Iterable sequence.

Note that the other Iterable is evaluated as items are observed from the source Observable; it is not pre-consumed. This allows you to zip infinite streams on either side.

Scheduler: zipWith does not operate by default on a particular Scheduler.

This description applies to the overload that takes an Iterable. Most of the examples below use the overload that takes a second Observable, which pairs items from the two Observables in the same way.
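The pairing rule described above can be illustrated without RxJava. The following is a minimal plain-Java sketch, not the library's implementation: the class `ZipWithSketch` and its helpers `zipWith` and `naturals` are hypothetical names introduced only for this illustration. It shows that the other sequence is advanced only when a source item arrives, which is why an unbounded Iterable is safe to zip against.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

class ZipWithSketch {

    // Hypothetical helper, not part of RxJava: pairs each source item with the
    // next item pulled from `other`, stopping when either side runs out.
    static <T, U, R> List<R> zipWith(List<T> source, Iterable<U> other,
                                     BiFunction<? super T, ? super U, ? extends R> zipper) {
        List<R> out = new ArrayList<>();
        Iterator<U> it = other.iterator();
        for (T item : source) {
            // `other` is advanced only when a source item arrives; it is never pre-consumed.
            if (!it.hasNext()) {
                break;
            }
            out.add(zipper.apply(item, it.next()));
        }
        return out;
    }

    // An unbounded Iterable of 1, 2, 3, ... (assumption: used here only to show
    // that lazy consumption makes an infinite input safe).
    static Iterable<Long> naturals() {
        return () -> new Iterator<Long>() {
            private long next = 1;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public Long next() {
                return next++;
            }
        };
    }

    public static void main(String[] args) {
        List<String> indexed =
                zipWith(Arrays.asList("a", "b", "c"), naturals(), (s, n) -> n + ":" + s);
        System.out.println(indexed); // [1:a, 2:b, 3:c]
    }
}
```

The result has as many elements as the shorter input, so zipping a three-item source against the unbounded sequence still terminates.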

Code examples

Code example source: vert-x3/vertx-examples

@Override
 public void start() throws Exception {
  HttpClient client = vertx.createHttpClient();

  // Create two requests
  HttpClientRequest req1 = client.request(HttpMethod.GET, 8080, "localhost", "/");
  HttpClientRequest req2 = client.request(HttpMethod.GET, 8080, "localhost", "/");

  // Turn the requests responses into Observable<JsonObject>
  Observable<JsonObject> obs1 = req1.toObservable().flatMap(HttpClientResponse::toObservable).
    map(buf -> new JsonObject(buf.toString("UTF-8")));
  Observable<JsonObject> obs2 = req2.toObservable().flatMap(HttpClientResponse::toObservable).
    map(buf -> new JsonObject(buf.toString("UTF-8")));

  // Combine the responses with the zip into a single response
  obs1.zipWith(obs2, (b1, b2) -> new JsonObject().put("req1", b1).put("req2", b2)).
    subscribe(json -> {
       System.out.println("Got combined result " + json);
      },
      err -> {
       err.printStackTrace();
      });

  req1.end();
  req2.end();
 }

Code example source: jhusain/learnrxjava

public static void main(String... args) {

    /*
     * retry(n) can be used to immediately retry n times
     */
    Observable.create(s -> {
      System.out.println("1) subscribing");
      s.onError(new RuntimeException("1) always fails"));
    }).retry(3).subscribe(System.out::println, t -> System.out.println("1) Error: " + t));

    System.out.println("");
    
    /*
     * retryWhen allows custom behavior on when and if a retry should be done
     */
    Observable.create(s -> {
      System.out.println("2) subscribing");
      s.onError(new RuntimeException("2) always fails"));
    }).retryWhen(attempts -> {
      return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
        System.out.println("2) delay retry by " + i + " second(s)");
        return Observable.timer(i, TimeUnit.SECONDS);
      }).concatWith(Observable.error(new RuntimeException("Failed after 3 retries")));
    }).toBlocking().forEach(System.out::println);

  }
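The retryWhen example above zips the stream of failures with Observable.range(1, 3), so each failure is paired with an attempt number and at most three pairs are produced. A minimal plain-Java sketch of that counting, without RxJava, might look like this. The class `RetryZipSketch` and the helper `retryDelaysSeconds` are hypothetical names, and the sketch models only which delays are produced, not the timing or the final error.

```java
import java.util.ArrayList;
import java.util.List;

class RetryZipSketch {

    // Hypothetical helper, not an RxJava API: pairs the k-th failure with attempt
    // number k, as zipping the failure stream with the numbers 1..maxRetries would.
    // Failures beyond maxRetries find no partner, so no further delay is produced.
    static List<Integer> retryDelaysSeconds(int failures, int maxRetries) {
        List<Integer> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= Math.min(failures, maxRetries); attempt++) {
            delays.add(attempt); // delay this retry by `attempt` second(s)
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(retryDelaysSeconds(5, 3)); // [1, 2, 3]
        System.out.println(retryDelaysSeconds(2, 3)); // [1, 2]
    }
}
```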

Code example source: jhusain/learnrxjava

// ... (excerpt begins mid-method in the source)
throw new RuntimeException("failed!");
}).retryWhen(attempts -> {
  return attempts.zipWith(Observable.range(1, 3), (throwable, i) -> i)
      .flatMap(i -> {
        System.out.println("delay retry by " + i + " second(s)");
        // ... (excerpt ends here in the source)

Code example source: davidmoten/rxjava-extras (Maven: com.github.davidmoten/rxjava-extras)

@Override
public Observable<Indexed<T>> call(Observable<T> source) {
  return source.zipWith(NaturalNumbers.instance(), new Func2<T, Long, Indexed<T>>() {
    @Override
    public Indexed<T> call(T t, Long n) {
      return new Indexed<T>(t, n);
    }
  });
}

Code example source: com.microsoft.azure/azure-documentdb-rx

@Override
  public Observable<Long> call(final Observable<? extends Throwable> failures) {
    return failures
        .zipWith(Observable.range(1, MAX_RETRIES_LIMIT),
            (err, attempt) ->
        attempt < MAX_RETRIES_LIMIT ?
            handleRetryAttempt(err, attempt, retryPolicy) :
              Observable.<Long>error(extractDocumentClientCause(err, attempt)) )
        .flatMap(x -> x);
  }

Code example source: NielsMasdorp/Speculum-Android

public static Func1<Observable<? extends Throwable>, Observable<?>> exponentialBackoff(
      int maxRetryCount, long delay, TimeUnit unit) {
    return errors -> errors
        .zipWith(Observable.range(1, maxRetryCount), (error, retryCount) -> retryCount)
        .flatMap(retryCount -> Observable.timer((long) Math.pow(delay, retryCount), unit));
  }

Code example source: com.intendia.gwt.rxgwt/rxgwt

public static <T> Observable.Transformer<T, T> retryDelay(Action1<Attempt> onAttempt, int maxRetry) {
  return o -> o.retryWhen(attempts -> attempts
      .zipWith(Observable.range(1, maxRetry), (err, i) -> new Attempt(i, err))
      .flatMap((Attempt x) -> {
        if (x.idx > maxRetry) return error(x.err);
        onAttempt.call(x);
        return timer(min(x.idx * x.idx, MAX_RETRY_TIME), SECONDS);
      }));
}

Code example source: nurkiewicz/rxjava-book-examples

static <T> Observable<T> odd(Observable<T> upstream) {
  Observable<Boolean> trueFalse = just(true, false).repeat();
  return upstream
      .zipWith(trueFalse, Pair::of)
      .filter(Pair::getRight)
      .map(Pair::getLeft);
}

Code example source: nurkiewicz/rxjava-book-examples

private <T> Observable.Transformer<T, T> odd() {
  Observable<Boolean> trueFalse = just(true, false).repeat();
  return upstream -> upstream
      .zipWith(trueFalse, Pair::of)
      .filter(Pair::getRight)
      .map(Pair::getLeft);
}

Code example source: hawkular/hawkular-metrics (Maven: org.hawkular.metrics/hawkular-metrics-core-service)

@Override
  public Observable<Metric<T>> call(Observable<Metric<T>> metricObservable) {
    return metricObservable.flatMap(metric -> {
      long now = System.currentTimeMillis();
      MetricId<T> metricId = metric.getMetricId();
      return metricsService.findDataPoints(metricId, 0, now, 1, Order.ASC)
          .zipWith(metricsService.findDataPoints(metricId, 0, now, 1, Order.DESC), (p1, p2)
              -> new Metric<>(metric, p1.getTimestamp(), p2.getTimestamp()))
          .switchIfEmpty(Observable.just(metric));
    });
  }

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_85() throws Exception {
  Observable<Flight> flight =
      rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
  Observable<Passenger> passenger =
      rxFindPassenger(42).subscribeOn(Schedulers.io());
  Observable<Ticket> ticket = flight
      .zipWith(passenger, this::rxBookTicket)
      .flatMap(obs -> obs);
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_76() throws Exception {
  Observable<Flight> flight =
      rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
  Observable<Passenger> passenger =
      rxFindPassenger(42).subscribeOn(Schedulers.io());
  Observable<Ticket> ticket = flight
      .zipWith(passenger, (Flight f, Passenger p) -> Pair.of(f, p))
      .flatMap(pair -> rxBookTicket(pair.getLeft(), pair.getRight()));
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_589() throws Exception {
  Observable<Boolean> trueFalse = Observable.just(true, false).repeat();
  Observable<Integer> upstream = Observable.range(30, 8);
  Observable<Integer> downstream = upstream
      .zipWith(trueFalse, Pair::of)
      .filter(Pair::getRight)
      .map(Pair::getLeft);
}
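Zipping with a repeating true, false sequence and then filtering on the flag keeps every other item, starting with the first. A minimal plain-Java sketch of the same selection, without RxJava, could be written as follows. The class `EveryOtherSketch` and the helper `keepFlagged` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class EveryOtherSketch {

    // Hypothetical helper, not an RxJava API: pairs each item with a flag from the
    // repeating sequence true, false, true, ... and keeps the items whose flag is true.
    static List<Integer> keepFlagged(List<Integer> upstream) {
        List<Integer> kept = new ArrayList<>();
        boolean flag = true;
        for (Integer item : upstream) {
            if (flag) {
                kept.add(item);
            }
            flag = !flag; // advance to the next flag in the repeating sequence
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> upstream = new ArrayList<>();
        for (int i = 30; i < 38; i++) {
            upstream.add(i); // same values as Observable.range(30, 8)
        }
        System.out.println(keepFlagged(upstream)); // [30, 32, 34, 36]
    }
}
```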

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_271() throws Exception {
  Observable<Instant> timestamps = Observable
      .fromCallable(() -> dbQuery())
      .doOnSubscribe(() -> log.info("subscribe()"))
      .doOnRequest(c -> log.info("Requested {}", c))
      .doOnNext(instant -> log.info("Got: {}", instant));
  timestamps
      .zipWith(timestamps.skip(1), Duration::between)
      .map(Object::toString)
      .subscribe(log::info);
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_49() throws Exception {
  Observable<Flight> flight = rxLookupFlight("LOT 783");
  Observable<Passenger> passenger = rxFindPassenger(42);
  Observable<Ticket> ticket =
      flight.zipWith(passenger, (f, p) -> bookTicket(f, p));
  ticket.subscribe(this::sendEmail);
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_74() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .retryWhen(failures -> failures
          .zipWith(Observable.range(1, ATTEMPTS), (err, attempt) ->
              attempt < ATTEMPTS ?
                  Observable.timer(1, SECONDS) :
                  Observable.error(err))
          .flatMap(x -> x)
      );
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_89() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .retryWhen(failures -> failures
          .zipWith(Observable.range(1, ATTEMPTS),
              this::handleRetryAttempt)
          .flatMap(x -> x)
      );
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_286() throws Exception {
  final WeatherStation station = new BasicWeatherStation();
  Observable<Temperature> temperatureMeasurements = station.temperature();
  Observable<Wind> windMeasurements = station.wind();
  temperatureMeasurements
      .zipWith(windMeasurements,
          (temperature, wind) -> new Weather(temperature, wind));
}
