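This article collects code examples for the Java method rx.Observable.ignoreElements() and shows how Observable.ignoreElements() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from a selection of projects and should serve as a useful reference. Details of Observable.ignoreElements() are as follows: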
Package path: rx.Observable
Class name: Observable
Method name: ignoreElements
Ignores all items emitted by the source Observable and only calls onCompleted or onError.
Scheduler: ignoreElements does not operate by default on a particular Scheduler.
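The contract described above can be modelled in plain Java without the RxJava jar. The following is only a minimal sketch under stated assumptions: `MiniObserver`, `MiniSource`, and the static `ignoreElements` helper are hypothetical stand-ins invented for illustration, not the real rx.Observer or rx.Observable types, and the sketch ignores unsubscription and backpressure. It shows that items are dropped while the terminal event (onCompleted or onError) is still delivered.

```java
import java.util.ArrayList;
import java.util.List;

public class IgnoreElementsSketch {

    /** Hypothetical stand-in for an RxJava 1.x observer; NOT the real rx.Observer. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical stand-in for a cold source that can be subscribed to. */
    interface MiniSource<T> {
        void subscribe(MiniObserver<? super T> observer);
    }

    /** Wraps a source so that items are dropped and only terminal events pass through. */
    static <T> MiniSource<T> ignoreElements(MiniSource<T> upstream) {
        return downstream -> upstream.subscribe(new MiniObserver<T>() {
            @Override public void onNext(T item) { /* dropped on purpose */ }
            @Override public void onError(Throwable error) { downstream.onError(error); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        });
    }

    /** Subscribes to the source and records every event it receives, in order. */
    static <T> List<String> record(MiniSource<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new MiniObserver<T>() {
            @Override public void onNext(T item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable error) { events.add("onError(" + error.getMessage() + ")"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        MiniSource<Integer> numbers = o -> { o.onNext(1); o.onNext(2); o.onCompleted(); };
        MiniSource<Integer> failing = o -> { o.onNext(1); o.onError(new IllegalStateException("boom")); };

        System.out.println(record(numbers));                  // [onNext(1), onNext(2), onCompleted]
        System.out.println(record(ignoreElements(numbers)));  // [onCompleted]
        System.out.println(record(ignoreElements(failing)));  // [onError(boom)]
    }
}
```

Because nothing is ever emitted downstream, any operator placed after ignoreElements that works on items, such as map, has nothing to act on. Several of the real-world examples below rely on this when they use cast or map purely to change the element type.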
Code example source: origin: PipelineAI/pipeline
@Override
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
    return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
        @Override
        public void call(BatchReturnType batchReturnType) {
            // this is a blocking call in HystrixCollapser
            self.mapResponseToRequests(batchReturnType, requests);
        }
    }).ignoreElements().cast(Void.class);
}
Code example source: origin: ReactiveX/RxNetty
/**
 * Ignores all input on this connection.
 *
 * Unless {@link ChannelOption#AUTO_READ} is set to {@code true}, the content will only be read from the
 * underlying channel if there is a subscriber to the input. So, upon receiving this connection, one should either
 * call this method or eventually subscribe to the stream returned by {@link #getInput()}.
 *
 * @return An {@link Observable}, subscription to which will discard the input. This {@code Observable} will
 * error/complete when the input errors/completes and unsubscription from here will unsubscribe from the content.
 */
public Observable<Void> ignoreInput() {
    return getInput().map(new Func1<R, Void>() {
        @Override
        public Void call(R r) {
            ReferenceCountUtil.release(r);
            return null;
        }
    }).ignoreElements();
}
Code example source: origin: davidmoten/rxjava-jdbc
/**
 * Returns the concatenation of two {@link Observable}s but the first
 * sequence will be emitted in its entirety and ignored before o2 starts
 * emitting.
 *
 * @param <T>
 *            the generic type of the second observable
 * @param o1
 *            the sequence to ignore
 * @param o2
 *            the sequence to emit after o1 ignored
 * @return observable result of concatenating two observables, ignoring the
 *         first
 */
@SuppressWarnings("unchecked")
public static <T> Observable<T> concatButIgnoreFirstSequence(Observable<?> o1,
        Observable<T> o2) {
    return Observable.concat((Observable<T>) o1.ignoreElements(), o2);
}
Code example source: origin: davidmoten/rxjava-jdbc
private static <T> Observable<Boolean> commitOrRollbackOnCompleteTransformerIfAtLeastOneValue(
        final boolean isCommit, final Database db, Observable<T> source) {
    CountingAction<T> counter = RxUtil.counter();
    Observable<Boolean> commit = counter
            // get count
            .count()
            // greater than zero or empty
            .filter(greaterThanZero())
            // commit if at least one value
            .compose(db.commitOrRollback_(isCommit));
    return Observable
            // concatenate
            .concat(source
                    // count emissions
                    .doOnNext(counter)
                    // ignore emissions
                    .ignoreElements()
                    // cast the empty sequence to type Boolean
                    .cast(Boolean.class),
                    // concat with commit
                    commit);
}
Code example source: origin: henrymorgen/android-advanced-light
private void ignoreElements() {
    Observable.just(1, 2, 3, 4).ignoreElements().subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            Log.i("wangshu", "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            Log.i("wangshu", "onError");
        }

        @Override
        public void onNext(Integer integer) {
            Log.i("wangshu", "onNext");
        }
    });
}
Code example source: origin: dswarm/dswarm
    @Override
    public Observable<T> call(final Observable<T> thisOne) {
        return thisOne.concatWith(other.ignoreElements().map(ignored -> emptyResultValue.get()));
    }
}
Code example source: origin: davidmoten/rxjava-extras
    @SuppressWarnings("unchecked")
    @Override
    public Observable<R> call(Observable<T> source) {
        return ((Observable<R>) (Observable<?>) source.ignoreElements()).concatWith(next);
    }
};
Code example source: origin: leeowenowen/rxjava-examples
    @Override
    public void run() {
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onCompleted();
            }
        }).ignoreElements().subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                log(integer);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                log(throwable);
            }
        }, new Action0() {
            @Override
            public void call() {
                log("onComplete");
            }
        });
    }
}
Code example source: origin: com.hotels.styx/styx-api
/**
 * Sets an empty body on this response.
 *
 * @return {@code this}
 */
public Builder removeBody() {
    return body(body.content()
            .doOnNext(ReferenceCountUtil::release)
            .ignoreElements()
    );
}
Code example source: origin: com.netflix.hystrix/hystrix-core
@Override
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
    return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
        @Override
        public void call(BatchReturnType batchReturnType) {
            // this is a blocking call in HystrixCollapser
            self.mapResponseToRequests(batchReturnType, requests);
        }
    }).ignoreElements().cast(Void.class);
}
Code example source: origin: spotify/mobius
    @Override
    public Observable<R> call(final T value) {
        Completable completable = func.call(value);
        if (scheduler != null) {
            completable = completable.subscribeOn(scheduler);
        }
        return completable
            .toObservable()
            .ignoreElements()
            .map(
                new Func1<Object, R>() {
                    @Override
                    public R call(Object ignored) {
                        // Since our upstream has ignoreElements on it, values will never ever be emitted, and
                        // therefore this function call won't actually be executed. This map is really only present
                        // in order to cast the stream to type R. Throwing an exception in this never-to-be-executed
                        // function allows us say that the return type is T without actually needing to be able
                        // to produce values of type T.
                        throw new IllegalStateException(
                            "Impossible state! ignoreElements() mustn't allow values to be emitted!");
                    }
                });
    }
});
Code example source: origin: akarnokd/akarnokd-misc
    static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
        return f ->
            f.publish(g ->
                g.take(1)
                .concatWith(
                    g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
                    .take(1)
                    .ignoreElements()
                )
                .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
            )
        ;
    }
}
Code example source: origin: com.netflix.eureka2/eureka-write-server
@Override
public Observable<Void> register(final InstanceInfo instanceInfo) {
    logger.debug("Replicated registry entry: {}", instanceInfo);
    if (STATES.Opened != state.get()) {
        return Observable.error(state.get() == STATES.Closed ? CHANNEL_CLOSED_EXCEPTION : IDLE_STATE_EXCEPTION);
    }
    if (replicationLoop) {
        return Observable.error(REPLICATION_LOOP_EXCEPTION);
    }
    if (instanceInfoById.containsKey(instanceInfo.getId())) {
        logger.info("Overwriting existing registration entry for instance {}", instanceInfo.getId());
    }
    InstanceInfo tempNewInfo = new InstanceInfo.Builder()
            .withInstanceInfo(instanceInfo).withVersion(currentVersion++).build();
    return registry.register(tempNewInfo, replicationSource)
            .ignoreElements()
            .cast(Void.class)
            .doOnCompleted(new Action0() {
                @Override
                public void call() {
                    instanceInfoById.put(instanceInfo.getId(), instanceInfo);
                }
            });
}
Code example source: origin: akarnokd/akarnokd-misc
    public static void main(String[] args) throws Exception {
        PublishSubject<Integer> ps = PublishSubject.create();
        ps.publish(o ->
            o.mergeWith(
                o.switchMap(e ->
                    Observable.just(1).delay(200, TimeUnit.MILLISECONDS)
                    .ignoreElements()
                    .doOnCompleted(() -> System.out.println("Timeout action: " + e))
                )
            )
        ).subscribe(System.out::println);
        ps.onNext(1);
        ps.onNext(2);
        Thread.sleep(100);
        ps.onNext(3);
        Thread.sleep(250);
        ps.onNext(4);
        Thread.sleep(250);
    }
}
Code example source: origin: akarnokd/akarnokd-misc
    public static void main(String[] args) {
        Observable<String> names = Observable.just(
                "John", "Steve", "Ruth",
                "Sam", "Jane", "James");
        names.groupBy(s -> s.charAt(0))
            .flatMap(grp -> grp.publish(o -> o.first().concatWith(o.ignoreElements())))
            .subscribe(s -> System.out.println(s));
    }
}
Code example source: origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_191() throws Exception {
    List<Ticket> tickets = Arrays.asList(new Ticket(), new Ticket(), new Ticket());
    Observable
            .from(tickets)
            .flatMap(ticket ->
                    rxSendEmail(ticket)
                            .ignoreElements()
                            .doOnError(e -> log.warn("Failed to send {}", ticket, e))
                            .subscribeOn(Schedulers.io()));
}
Code example source: origin: com.netflix.zuul/zuul-netty
@Override
public Observable<ZuulMessage> write(ZuulMessage msg, HttpServerResponse nativeResponse)
{
    HttpResponseMessage zuulResp = (HttpResponseMessage) msg;
    // Set the response status code.
    nativeResponse.setStatus(HttpResponseStatus.valueOf(zuulResp.getStatus()));
    // Now set all of the response headers - note this is a multi-set in keeping with HTTP semantics
    for (Map.Entry<String, String> entry : zuulResp.getHeaders().entries()) {
        nativeResponse.getHeaders().add(entry.getKey(), entry.getValue());
    }
    // Write response body stream as received.
    Observable<ZuulMessage> chain;
    Observable<ByteBuf> bodyStream = zuulResp.getBodyStream();
    if (bodyStream != null) {
        chain = bodyStream
                .doOnNext(bb -> nativeResponse.writeBytesAndFlush(bb))
                .ignoreElements()
                .doOnCompleted(() -> nativeResponse.close())
                .map(bb -> msg);
    }
    else {
        chain = Observable.just(msg);
    }
    return chain;
}
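Code example source: origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_138() throws Exception {
    Single<Integer> ignored = Single
            .just(1)
            .toObservable()
            // PROBLEM: after ignoreElements() the Observable is empty, so the
            // Single returned by toSingle() fails with NoSuchElementException when subscribed
            .ignoreElements()
            .toSingle();
}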