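This article collects code examples for the Java method rx.Observable.zipWith() and shows how Observable.zipWith() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as a useful reference. Details of Observable.zipWith() are as follows: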
Package path: rx.Observable
Class name: Observable
Method name: zipWith
Returns an Observable that emits items that are the result of applying a specified function to pairs of values, one each from the source Observable and a specified Iterable sequence.
Note that the other Iterable is evaluated as items are observed from the source Observable; it is not pre-consumed. This allows you to zip infinite streams on either side.
Scheduler: zipWith does not operate by default on a particular Scheduler.
This description covers the overload that takes an Iterable; most of the examples below use the overload that takes a second Observable instead.
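To make the pairing behaviour described above concrete, here is a minimal plain-JDK sketch. It is not RxJava's implementation and uses no RxJava types: the class `ZipWithSketch` and its helpers `zipWith` and `naturalNumbers` are hypothetical names introduced only for illustration. It shows the two properties the description mentions: the other Iterable is pulled only as source items arrive (so it may be infinite), and the zipped sequence ends when either side runs out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Stream;

// Plain-JDK illustration of zipWith-style pairing; NOT RxJava's implementation.
class ZipWithSketch {

    // Hypothetical helper: pairs each source item with the next item pulled from `other`.
    static <A, B, R> List<R> zipWith(Iterable<A> source, Iterable<B> other,
                                     BiFunction<? super A, ? super B, ? extends R> zipper) {
        List<R> out = new ArrayList<>();
        Iterator<B> otherIt = other.iterator();
        for (A item : source) {
            // Pull from `other` only when a source item arrives, so `other` may be infinite.
            if (!otherIt.hasNext()) {
                break; // the shorter side ends the zipped sequence
            }
            out.add(zipper.apply(item, otherIt.next()));
        }
        return out;
    }

    // Hypothetical endless Iterable of 1, 2, 3, ... to show `other` is not pre-consumed.
    static Iterable<Long> naturalNumbers() {
        return () -> Stream.iterate(1L, n -> n + 1).iterator();
    }

    public static void main(String[] args) {
        List<String> indexed = zipWith(Arrays.asList("a", "b", "c"), naturalNumbers(),
                (s, n) -> n + ":" + s);
        System.out.println(indexed); // prints [1:a, 2:b, 3:c]
    }
}
```

With RxJava itself the same idea is expressed as `source.zipWith(iterable, func)`, where items are delivered to a subscriber as they are emitted rather than collected into a list.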
Code example source: vert-x3/vertx-examples
@Override
public void start() throws Exception {
    HttpClient client = vertx.createHttpClient();
    // Create two requests
    HttpClientRequest req1 = client.request(HttpMethod.GET, 8080, "localhost", "/");
    HttpClientRequest req2 = client.request(HttpMethod.GET, 8080, "localhost", "/");
    // Turn the request responses into Observable<JsonObject>
    Observable<JsonObject> obs1 = req1.toObservable()
        .flatMap(HttpClientResponse::toObservable)
        .map(buf -> new JsonObject(buf.toString("UTF-8")));
    Observable<JsonObject> obs2 = req2.toObservable()
        .flatMap(HttpClientResponse::toObservable)
        .map(buf -> new JsonObject(buf.toString("UTF-8")));
    // Combine the two responses into a single JsonObject with zipWith
    obs1.zipWith(obs2, (b1, b2) -> new JsonObject().put("req1", b1).put("req2", b2))
        .subscribe(
            json -> System.out.println("Got combined result " + json),
            err -> err.printStackTrace());
    // Send both requests
    req1.end();
    req2.end();
}
Code example source: jhusain/learnrxjava
public static void main(String... args) {
    /*
     * retry(n) can be used to immediately retry n times
     */
    Observable.create(s -> {
        System.out.println("1) subscribing");
        s.onError(new RuntimeException("1) always fails"));
    }).retry(3).subscribe(System.out::println, t -> System.out.println("1) Error: " + t));
    System.out.println("");
    /*
     * retryWhen allows custom behavior on when and if a retry should be done
     */
    Observable.create(s -> {
        System.out.println("2) subscribing");
        s.onError(new RuntimeException("2) always fails"));
    }).retryWhen(attempts -> {
        // Pair each failure with an attempt number 1..3 and delay the retry by that many seconds
        return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
            System.out.println("2) delay retry by " + i + " second(s)");
            return Observable.timer(i, TimeUnit.SECONDS);
        }).concatWith(Observable.error(new RuntimeException("Failed after 3 retries")));
    }).toBlocking().forEach(System.out::println);
}
Code example source: jhusain/learnrxjava
// (excerpt starts mid-method in the source)
    throw new RuntimeException("failed!");
}).retryWhen(attempts -> {
    return attempts.zipWith(Observable.range(1, 3), (throwable, i) -> i)
        .flatMap(i -> {
            System.out.println("delay retry by " + i + " second(s)");
// (excerpt is truncated here in the source)
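Code example source: davidmoten/rxjava-extras (also listed under com.github.davidmoten/rxjava-extras)
@Override
public Observable<Indexed<T>> call(Observable<T> source) {
    // Pair every source item with its position taken from an endless sequence of numbers
    return source.zipWith(NaturalNumbers.instance(), new Func2<T, Long, Indexed<T>>() {
        @Override
        public Indexed<T> call(T t, Long n) {
            return new Indexed<T>(t, n);
        }
    });
}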
Code example source: com.microsoft.azure/azure-documentdb-rx
@Override
public Observable<Long> call(final Observable<? extends Throwable> failures) {
    return failures
        .zipWith(Observable.range(1, MAX_RETRIES_LIMIT),
            (err, attempt) ->
                attempt < MAX_RETRIES_LIMIT
                    ? handleRetryAttempt(err, attempt, retryPolicy)
                    : Observable.<Long>error(extractDocumentClientCause(err, attempt)))
        .flatMap(x -> x);
}
Code example source: NielsMasdorp/Speculum-Android
public static Func1<Observable<? extends Throwable>, Observable<?>> exponentialBackoff(
        int maxRetryCount, long delay, TimeUnit unit) {
    return errors -> errors
        .zipWith(Observable.range(1, maxRetryCount), (error, retryCount) -> retryCount)
        .flatMap(retryCount -> Observable.timer((long) Math.pow(delay, retryCount), unit));
}
Code example source: com.intendia.gwt.rxgwt/rxgwt
public static <T> Observable.Transformer<T, T> retryDelay(Action1<Attempt> onAttempt, int maxRetry) {
    return o -> o.retryWhen(attempts -> attempts
        .zipWith(Observable.range(1, maxRetry), (err, i) -> new Attempt(i, err))
        .flatMap((Attempt x) -> {
            // Note: range(1, maxRetry) only yields 1..maxRetry, so as written this branch is never taken
            if (x.idx > maxRetry) return error(x.err);
            onAttempt.call(x);
            return timer(min(x.idx * x.idx, MAX_RETRY_TIME), SECONDS);
        }));
}
Code example source: nurkiewicz/rxjava-book-examples
static <T> Observable<T> odd(Observable<T> upstream) {
    // Endless true, false, true, false, ... used as a mask over the upstream items
    Observable<Boolean> trueFalse = just(true, false).repeat();
    return upstream
        .zipWith(trueFalse, Pair::of)
        .filter(Pair::getRight)
        .map(Pair::getLeft);
}
Code example source: nurkiewicz/rxjava-book-examples
private <T> Observable.Transformer<T, T> odd() {
    Observable<Boolean> trueFalse = just(true, false).repeat();
    return upstream -> upstream
        .zipWith(trueFalse, Pair::of)
        .filter(Pair::getRight)
        .map(Pair::getLeft);
}
Code example source: hawkular/hawkular-metrics (also listed under org.hawkular.metrics/hawkular-metrics-core-service)
@Override
public Observable<Metric<T>> call(Observable<Metric<T>> metricObservable) {
    return metricObservable.flatMap(metric -> {
        long now = System.currentTimeMillis();
        MetricId<T> metricId = metric.getMetricId();
        // Zip the first data point in ascending order with the first in descending order
        return metricsService.findDataPoints(metricId, 0, now, 1, Order.ASC)
            .zipWith(metricsService.findDataPoints(metricId, 0, now, 1, Order.DESC),
                (p1, p2) -> new Metric<>(metric, p1.getTimestamp(), p2.getTimestamp()))
            .switchIfEmpty(Observable.just(metric));
    });
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_85() throws Exception {
    Observable<Flight> flight =
        rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
    Observable<Passenger> passenger =
        rxFindPassenger(42).subscribeOn(Schedulers.io());
    // rxBookTicket returns an Observable, so flatten the zipped result
    Observable<Ticket> ticket = flight
        .zipWith(passenger, this::rxBookTicket)
        .flatMap(obs -> obs);
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_76() throws Exception {
    Observable<Flight> flight =
        rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
    Observable<Passenger> passenger =
        rxFindPassenger(42).subscribeOn(Schedulers.io());
    Observable<Ticket> ticket = flight
        .zipWith(passenger, (Flight f, Passenger p) -> Pair.of(f, p))
        .flatMap(pair -> rxBookTicket(pair.getLeft(), pair.getRight()));
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_589() throws Exception {
    Observable<Boolean> trueFalse = Observable.just(true, false).repeat();
    Observable<Integer> upstream = Observable.range(30, 8);
    Observable<Integer> downstream = upstream
        .zipWith(trueFalse, Pair::of)
        .filter(Pair::getRight)
        .map(Pair::getLeft);
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_271() throws Exception {
    Observable<Instant> timestamps = Observable
        .fromCallable(() -> dbQuery())
        .doOnSubscribe(() -> log.info("subscribe()"))
        .doOnRequest(c -> log.info("Requested {}", c))
        .doOnNext(instant -> log.info("Got: {}", instant));
    // Zip the stream with itself shifted by one item to pair each item with its successor
    timestamps
        .zipWith(timestamps.skip(1), Duration::between)
        .map(Object::toString)
        .subscribe(log::info);
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_49() throws Exception {
    Observable<Flight> flight = rxLookupFlight("LOT 783");
    Observable<Passenger> passenger = rxFindPassenger(42);
    Observable<Ticket> ticket =
        flight.zipWith(passenger, (f, p) -> bookTicket(f, p));
    ticket.subscribe(this::sendEmail);
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_74() throws Exception {
    risky()
        .timeout(1, SECONDS)
        .retryWhen(failures -> failures
            // Retry after one second until the last attempt, then propagate the error
            .zipWith(Observable.range(1, ATTEMPTS), (err, attempt) ->
                attempt < ATTEMPTS
                    ? Observable.timer(1, SECONDS)
                    : Observable.error(err))
            .flatMap(x -> x)
        );
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_89() throws Exception {
    risky()
        .timeout(1, SECONDS)
        .retryWhen(failures -> failures
            .zipWith(Observable.range(1, ATTEMPTS),
                this::handleRetryAttempt)
            .flatMap(x -> x)
        );
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_286() throws Exception {
    final WeatherStation station = new BasicWeatherStation();
    Observable<Temperature> temperatureMeasurements = station.temperature();
    Observable<Wind> windMeasurements = station.wind();
    temperatureMeasurements
        .zipWith(windMeasurements,
            (temperature, wind) -> new Weather(temperature, wind));
}