Usage and code examples for the rx.Observable.ignoreElements() method

x33g5p2x, reposted on 2022-01-25 in Other

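This article collects code examples for the Java method rx.Observable.ignoreElements() and shows how it is used in practice. The examples were extracted from selected projects found mainly on GitHub, Stack Overflow, and Maven, so they are useful as reference material. Details of Observable.ignoreElements():
Package path: rx.Observable
Class name: Observable
Method name: ignoreElements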

About Observable.ignoreElements

Ignores all items emitted by the source Observable and only calls onCompleted or onError.

Scheduler: ignoreElements does not operate by default on a particular Scheduler.
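
As a rough illustration of the contract described above (items are dropped, and only onCompleted or onError reaches the subscriber), here is a minimal self-contained sketch. It deliberately does not use RxJava: MiniObserver, MiniSource, and IgnoreElementsSketch are hypothetical stand-ins invented for this example, and the real operator also handles unsubscription and backpressure, which this model leaves out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for rx.Observer (not the RxJava type). */
interface MiniObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

/** Hypothetical, simplified stand-in for a cold rx.Observable. */
interface MiniSource<T> {
    void subscribe(MiniObserver<? super T> observer);
}

final class IgnoreElementsSketch {

    /** Emits every item of the list, then completes. */
    static <T> MiniSource<T> from(List<T> items) {
        return observer -> {
            for (T item : items) {
                observer.onNext(item);
            }
            observer.onCompleted();
        };
    }

    /** Fails immediately with the given error. */
    static <T> MiniSource<T> error(Throwable error) {
        return observer -> observer.onError(error);
    }

    /** Models ignoreElements(): drop every item, forward only terminal events. */
    static <T> MiniSource<T> ignoreElements(MiniSource<T> source) {
        return downstream -> source.subscribe(new MiniObserver<T>() {
            @Override public void onNext(T item) { /* intentionally dropped */ }
            @Override public void onError(Throwable e) { downstream.onError(e); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        });
    }

    /** Subscribes to the source and records every event the subscriber sees. */
    static <T> List<String> record(MiniSource<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new MiniObserver<T>() {
            @Override public void onNext(T item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable e) { events.add("onError(" + e.getMessage() + ")"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3);
        // Without the operator: [onNext(1), onNext(2), onNext(3), onCompleted]
        System.out.println(record(from(items)));
        // With the operator: [onCompleted]
        System.out.println(record(ignoreElements(from(items))));
        // Errors still pass through: [onError(boom)]
        System.out.println(record(ignoreElements(
                IgnoreElementsSketch.<Integer>error(new IllegalStateException("boom")))));
    }
}
```

With RxJava 1.x on the classpath, the corresponding call on a real stream is simply source.ignoreElements(), as the project excerpts in this article show.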

Code examples

Code example source: PipelineAI/pipeline

    @Override
    public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
        return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
            @Override
            public void call(BatchReturnType batchReturnType) {
                // this is a blocking call in HystrixCollapser
                self.mapResponseToRequests(batchReturnType, requests);
            }
        }).ignoreElements().cast(Void.class);
    }

Code example source: ReactiveX/RxNetty

    /**
     * Ignores all input on this connection.
     *
     * Unless {@link ChannelOption#AUTO_READ} is set to {@code true}, the content will only be read from the
     * underlying channel if there is a subscriber to the input. So, upon receiving this connection, one should
     * either call this method or eventually subscribe to the stream returned by {@link #getInput()}.
     *
     * @return An {@link Observable}, subscription to which will discard the input. This {@code Observable} will
     * error/complete when the input errors/completes and unsubscription from here will unsubscribe from the content.
     */
    public Observable<Void> ignoreInput() {
        return getInput().map(new Func1<R, Void>() {
            @Override
            public Void call(R r) {
                ReferenceCountUtil.release(r);
                return null;
            }
        }).ignoreElements();
    }

Code example source: davidmoten/rxjava-jdbc

    /**
     * Returns the concatenation of two {@link Observable}s but the first
     * sequence will be emitted in its entirety and ignored before o2 starts
     * emitting.
     *
     * @param <T>
     *            the generic type of the second observable
     * @param o1
     *            the sequence to ignore
     * @param o2
     *            the sequence to emit after o1 ignored
     * @return observable result of concatenating two observables, ignoring the
     *         first
     */
    @SuppressWarnings("unchecked")
    public static <T> Observable<T> concatButIgnoreFirstSequence(Observable<?> o1,
            Observable<T> o2) {
        return Observable.concat((Observable<T>) o1.ignoreElements(), o2);
    }

Code example source: davidmoten/rxjava-jdbc

    private static <T> Observable<Boolean> commitOrRollbackOnCompleteTransformerIfAtLeastOneValue(
            final boolean isCommit, final Database db, Observable<T> source) {
        CountingAction<T> counter = RxUtil.counter();
        Observable<Boolean> commit = counter
                // get count
                .count()
                // greater than zero or empty
                .filter(greaterThanZero())
                // commit if at least one value
                .compose(db.commitOrRollback_(isCommit));
        return Observable
                // concatenate
                .concat(source
                        // count emissions
                        .doOnNext(counter)
                        // ignore emissions
                        .ignoreElements()
                        // cast the empty sequence to type Boolean
                        .cast(Boolean.class),
                        // concat with commit
                        commit);
    }

Code example source: henrymorgen/android-advanced-light

    private void ignoreElements() {
        Observable.just(1, 2, 3, 4).ignoreElements().subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.i("wangshu", "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i("wangshu", "onError");
            }

            @Override
            public void onNext(Integer integer) {
                // never called: ignoreElements() drops every item
                Log.i("wangshu", "onNext");
            }
        });
    }

Code example source: dswarm/dswarm

    @Override
    public Observable<T> call(final Observable<T> thisOne) {
        // ignoreElements() emits no items, so the map() lambda is never invoked
        return thisOne.concatWith(other.ignoreElements().map(ignored -> emptyResultValue.get()));
    }

Code example source: davidmoten/rxjava-extras

    @SuppressWarnings("unchecked")
    @Override
    public Observable<R> call(Observable<T> source) {
        return ((Observable<R>) (Observable<?>) source.ignoreElements()).concatWith(next);
    }

Code example source: leeowenowen/rxjava-examples

    @Override
    public void run() {
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onCompleted();
            }
        }).ignoreElements().subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                // never called: ignoreElements() drops every item
                log(integer);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                log(throwable);
            }
        }, new Action0() {
            @Override
            public void call() {
                log("onComplete");
            }
        });
    }

Code example source: com.hotels.styx/styx-api

    /**
     * Sets an empty body on this response.
     *
     * @return {@code this}
     */
    public Builder removeBody() {
        return body(body.content()
                .doOnNext(ReferenceCountUtil::release)
                .ignoreElements()
        );
    }

Code example source: spotify/mobius

    @Override
    public Observable<R> call(final T value) {
        Completable completable = func.call(value);
        if (scheduler != null) {
            completable = completable.subscribeOn(scheduler);
        }
        return completable
                .toObservable()
                .ignoreElements()
                .map(
                        new Func1<Object, R>() {
                            @Override
                            public R call(Object ignored) {
                                // Since our upstream has ignoreElements on it, values will never be emitted, and
                                // therefore this function call won't actually be executed. This map is really only
                                // present in order to cast the stream to type R. Throwing an exception in this
                                // never-to-be-executed function allows us to say that the return type is R without
                                // actually needing to be able to produce values of type R.
                                throw new IllegalStateException(
                                        "Impossible state! ignoreElements() mustn't allow values to be emitted!");
                            }
                        });
    }

Code example source: akarnokd/akarnokd-misc

    static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
        return f ->
            f.publish(g ->
                g.take(1)
                .concatWith(
                    g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
                    .take(1)
                    .ignoreElements()
                )
                .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
            );
    }

Code example source: com.netflix.eureka2/eureka-write-server

    @Override
    public Observable<Void> register(final InstanceInfo instanceInfo) {
        logger.debug("Replicated registry entry: {}", instanceInfo);
        if (STATES.Opened != state.get()) {
            return Observable.error(state.get() == STATES.Closed ? CHANNEL_CLOSED_EXCEPTION : IDLE_STATE_EXCEPTION);
        }
        if (replicationLoop) {
            return Observable.error(REPLICATION_LOOP_EXCEPTION);
        }
        if (instanceInfoById.containsKey(instanceInfo.getId())) {
            logger.info("Overwriting existing registration entry for instance {}", instanceInfo.getId());
        }
        InstanceInfo tempNewInfo = new InstanceInfo.Builder()
                .withInstanceInfo(instanceInfo).withVersion(currentVersion++).build();
        return registry.register(tempNewInfo, replicationSource)
                .ignoreElements()
                .cast(Void.class)
                .doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        instanceInfoById.put(instanceInfo.getId(), instanceInfo);
                    }
                });
    }

Code example source: akarnokd/akarnokd-misc

    public static void main(String[] args) throws Exception {
        PublishSubject<Integer> ps = PublishSubject.create();
        ps.publish(o ->
            o.mergeWith(
                o.switchMap(e ->
                    Observable.just(1).delay(200, TimeUnit.MILLISECONDS)
                    .ignoreElements()
                    .doOnCompleted(() -> System.out.println("Timeout action: " + e))
                )
            )
        ).subscribe(System.out::println);
        ps.onNext(1);
        ps.onNext(2);
        Thread.sleep(100);
        ps.onNext(3);
        Thread.sleep(250);
        ps.onNext(4);
        Thread.sleep(250);
    }

Code example source: akarnokd/akarnokd-misc

    public static void main(String[] args) {
        Observable<String> names = Observable.just(
                "John", "Steve", "Ruth",
                "Sam", "Jane", "James");
        names.groupBy(s -> s.charAt(0))
                .flatMap(grp -> grp.publish(o -> o.first().concatWith(o.ignoreElements())))
                .subscribe(s -> System.out.println(s));
    }

Code example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_191() throws Exception {
        List<Ticket> tickets = Arrays.asList(new Ticket(), new Ticket(), new Ticket());
        Observable
                .from(tickets)
                .flatMap(ticket ->
                        rxSendEmail(ticket)
                                .ignoreElements()
                                .doOnError(e -> log.warn("Failed to send {}", ticket, e))
                                .subscribeOn(Schedulers.io()));
    }

Code example source: com.netflix.zuul/zuul-netty

    @Override
    public Observable<ZuulMessage> write(ZuulMessage msg, HttpServerResponse nativeResponse)
    {
        HttpResponseMessage zuulResp = (HttpResponseMessage) msg;
        // Set the response status code.
        nativeResponse.setStatus(HttpResponseStatus.valueOf(zuulResp.getStatus()));
        // Now set all of the response headers - note this is a multi-set in keeping with HTTP semantics
        for (Map.Entry<String, String> entry : zuulResp.getHeaders().entries()) {
            nativeResponse.getHeaders().add(entry.getKey(), entry.getValue());
        }
        // Write response body stream as received.
        Observable<ZuulMessage> chain;
        Observable<ByteBuf> bodyStream = zuulResp.getBodyStream();
        if (bodyStream != null) {
            chain = bodyStream
                    .doOnNext(bb -> nativeResponse.writeBytesAndFlush(bb))
                    .ignoreElements()
                    .doOnCompleted(() -> nativeResponse.close())
                    // No item reaches map() after ignoreElements(); the lambda only retypes the stream.
                    .map(bb -> msg);
        }
        else {
            chain = Observable.just(msg);
        }
        return chain;
    }

Code example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_138() throws Exception {
        Single<Integer> ignored = Single
                .just(1)
                .toObservable()
                // PROBLEM: the stream is now empty, so toSingle() signals NoSuchElementException when subscribed
                .ignoreElements()
                .toSingle();
    }
