本文整理了Java中rx.Observable.lift()
方法的一些代码示例,展示了Observable.lift()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.lift()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:lift
[英]Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass the values of the current Observable through the Operator function.
In other words, this allows chaining Observers together on an Observable for acting on the values within the Observable.
observable.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe()
If the operator you are creating is designed to act on the individual items emitted by a source Observable, use lift. If your operator is designed to transform the source Observable as a whole (for instance, by applying a particular set of existing RxJava operators to it) use #compose. Scheduler: lift does not operate by default on a particular Scheduler.
[中]将一个函数提升到当前可观察值,并返回一个新的可观察值,订阅后,该值将通过运算符函数传递当前可观察值。
换句话说,这允许将观察者链接到一个可观察对象上,以便对可观察对象内的值进行操作。
看得见。地图(…)。过滤器(…)。以(5)为例。提升(新操作器()。升降机(新操作器(…))。订阅
如果您正在创建的操作符被设计为作用于可观察到的源发出的单个项目,请使用lift。如果您的操作符被设计为将源可观测作为一个整体进行转换(例如,通过对其应用一组特定的现有RxJava操作符),请使用#compose。调度程序:默认情况下,lift不会在特定调度程序上运行。
代码示例来源:origin: PipelineAI/pipeline
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
userObservable = getExecutionObservable();
} catch (Throwable ex) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
userObservable = Observable.error(ex);
}
return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
代码示例来源:origin: ReactiveX/RxNetty
private <X> Observable<Void> _write(final Observable<X> msgs, Func1<X, Boolean> flushSelector) {
return _write(msgs.lift(new FlushSelectorOperator<>(flushSelector, this)));
}
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start() throws Exception {
HttpClient client = vertx.createHttpClient();
HttpClientRequest req = client.request(HttpMethod.GET, 8080, "localhost", "/");
req.toObservable().
flatMap(HttpClientResponse::toObservable).
// Unmarshall the response to the Data object via Jackon
lift(io.vertx.rxjava.core.RxHelper.unmarshaller(Data.class)).
subscribe(
data -> {
System.out.println("Got response " + data.message);
});
// End request
req.end();
}
}
代码示例来源:origin: ReactiveX/RxNetty
@Override
public void call(Subscriber<? super Connection<R, W>> subscriber) {
if (isShutdown) {
subscriber.onError(new IllegalStateException("Connection provider is shutdown."));
}
idleConnectionsHolder.pollThisEventLoopConnections()
.concatWith(connectIfAllowed())
.filter(new Func1<PooledConnection<R, W>, Boolean>() {
@Override
public Boolean call(PooledConnection<R, W> c) {
boolean isUsable = c.isUsable();
if (!isUsable) {
discardNow(c);
}
return isUsable;
}
})
.take(1)
.lift(new ReuseSubscriberLinker())
.lift(new ConnectMetricsOperator())
.unsafeSubscribe(subscriber);
}
});
代码示例来源:origin: ReactiveX/RxNetty
return Observable.<Channel>just(embeddedChannel);
}).lift(new Operator<Channel, Channel>() {
@Override
public Subscriber<? super Channel> call(final Subscriber<? super Channel> subscriber) {
代码示例来源:origin: PipelineAI/pipeline
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
代码示例来源:origin: PipelineAI/pipeline
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
代码示例来源:origin: davidmoten/rtree
/**
* Returns the nearest k entries (k=maxCount) to the given rectangle where the
* entries are strictly less than a given maximum distance from the rectangle.
*
* @param r
* rectangle
* @param maxDistance
* max distance of returned entries from the rectangle
* @param maxCount
* max number of entries to return
* @return nearest entries to maxCount, in ascending order of distance
*/
public Observable<Entry<T, S>> nearest(final Rectangle r, final double maxDistance,
int maxCount) {
return search(r, maxDistance).lift(new OperatorBoundedPriorityQueue<Entry<T, S>>(maxCount,
Comparators.<T, S>ascendingDistance(r)));
}
代码示例来源:origin: akarnokd/RxJava2Interop
@Test
public void fo2ToFo1Crash() {
FlowableOperator<Integer, Integer> transformer = new FlowableOperator<Integer, Integer>() {
@Override
public org.reactivestreams.Subscriber<? super Integer> apply(final org.reactivestreams.Subscriber<? super Integer> o) {
throw new IllegalArgumentException();
}
};
Observable.just(1)
.lift(toV1Operator(transformer))
.test()
.assertFailure(IllegalArgumentException.class);
}
代码示例来源:origin: akarnokd/RxJava2Interop
@Test
public void fo2ToFo1() {
FlowableOperator<Integer, Integer> transformer = new FlowableOperator<Integer, Integer>() {
@Override
public org.reactivestreams.Subscriber<? super Integer> apply(final org.reactivestreams.Subscriber<? super Integer> o) {
return new org.reactivestreams.Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
o.onSubscribe(s);
}
@Override
public void onNext(Integer t) {
o.onNext(t + 1);
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onComplete() {
o.onComplete();
}
};
}
};
Observable.just(1)
.lift(toV1Operator(transformer))
.test()
.assertResult(2);
}
代码示例来源:origin: io.reactivex/rxnetty
@SuppressWarnings("unchecked")
@Override
public Observable<ObservableConnection<O, I>> connect() {
return super.connect().lift(HANDSHAKE_OPERATOR);
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
@Override
public Observable<T> call(Observable<T> o) {
return o.lift(OperatorDoOnNth.create(action, n));
}
};
代码示例来源:origin: com.github.davidmoten/rxjava-extras
@Override
public Observable<T> call(Observable<T> o) {
return o.lift(new OperatorBufferToFile<T>(serializer, scheduler, options));
}
};
代码示例来源:origin: davidmoten/rxjava-extras
@Override
public Observable<T> call(Observable<T> source) {
return source.lift(new OperatorSampleFirst<T>(duration, unit, scheduler));
}
};
代码示例来源:origin: alaisi/postgres-async-driver
@Override
public Observable<ResultSet> querySet(String sql, Object... params) {
return sendQuery(sql, params)
.lift(toResultSet(dataConverter));
}
代码示例来源:origin: alaisi/postgres-async-driver
@Override
public Observable<Row> queryRows(String sql, Object... params) {
return sendQuery(sql, params)
.lift(toRow(dataConverter));
}
代码示例来源:origin: io.wcm.caravan/io.wcm.caravan.pipeline.impl
@Override
public JsonPipeline handleException(JsonPipelineExceptionHandler handler) {
Observable<JsonPipelineOutput> exceptionHandlingObservable = observable.lift(new HandleExceptionOperator(requests, handler));
return cloneWith(exceptionHandlingObservable, null, "HANDLE_EXCEPTION");
}
代码示例来源:origin: com.netflix.eureka2/eureka-server
private void subscribeToInterest(Interest<InstanceInfo> newInterest) {
BreakerSwitchOperator breaker = new BreakerSwitchOperator();
upgrades.onNext(eurekaRegistry.forInterest(newInterest).lift(breaker));
subscriptionBreakers.put(newInterest, breaker);
}
代码示例来源:origin: davidmoten/rxjava-extras
@Override
public Observable<T> call() {
final OperatorPassThroughAdjustedRequest<T> op = new OperatorPassThroughAdjustedRequest<T>();
return o.lift(op).onBackpressureBuffer().doOnRequest(new Action1<Long>() {
@Override
public void call(Long n) {
op.requestMore(n);
}
});
}
});
代码示例来源:origin: io.wcm.caravan/io.wcm.caravan.io.http
private Observable<CaravanHttpResponse> createServletClientResponse(Context ctx, Observable<CaravanHttpResponse> ribbonResponse) {
Observable<CaravanHttpResponse> localhostResponse = servletClient.execute(ctx.request)
.lift(new ErrorDisassembleroperator(ctx, ribbonResponse));
return addHystrixAndErrorMapperAndMetrics(ctx, localhostResponse);
}
内容来源于网络,如有侵权,请联系作者删除!