rx.Observable.ignoreElements()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.5k)|赞(0)|评价(0)|浏览(120)

本文整理了Java中rx.Observable.ignoreElements()方法的一些代码示例,展示了Observable.ignoreElements()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.ignoreElements()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:ignoreElements

Observable.ignoreElements介绍

[英]Ignores all items emitted by the source Observable and only calls onCompleted or onError.

Scheduler: ignoreElements does not operate by default on a particular Scheduler.
[中]忽略源Observable发出的所有项,仅调用onCompleted或onError。
调度器:默认情况下,ignoreElements不会在特定的调度器上运行。

代码示例

代码示例来源:origin: PipelineAI/pipeline

@Override
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
  return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
    @Override
    public void call(BatchReturnType batchReturnType) {
      // this is a blocking call in HystrixCollapser
      self.mapResponseToRequests(batchReturnType, requests);
    }
  }).ignoreElements().cast(Void.class);
}

代码示例来源:origin: ReactiveX/RxNetty

/**
 * Ignores all input on this connection.
 *
 * Unless, {@link ChannelOption#AUTO_READ} is set to {@code true}, the content will only be read from the
 * underneath channel, if there is a subscriber to the input. So, upon recieving this connection, either one should
 * call this method or eventually subscribe to the stream returned by {@link #getInput()}
 *
 * @return An {@link Observable}, subscription to which will discard the input. This {@code Observable} will
 * error/complete when the input errors/completes and unsubscription from here will unsubscribe from the content.
 */
public Observable<Void> ignoreInput() {
  return getInput().map(new Func1<R, Void>() {
    @Override
    public Void call(R r) {
      ReferenceCountUtil.release(r);
      return null;
    }
  }).ignoreElements();
}

代码示例来源:origin: PipelineAI/pipeline

}).ignoreElements().cast(Void.class);

代码示例来源:origin: davidmoten/rxjava-jdbc

/**
 * Returns the concatenation of two {@link Observable}s but the first
 * sequence will be emitted in its entirety and ignored before o2 starts
 * emitting.
 * 
 * @param <T>
 *            the generic type of the second observable
 * @param o1
 *            the sequence to ignore
 * @param o2
 *            the sequence to emit after o1 ignored
 * @return observable result of concatenating two observables, ignoring the
 *         first
 */
@SuppressWarnings("unchecked")
public static <T> Observable<T> concatButIgnoreFirstSequence(Observable<?> o1,
    Observable<T> o2) {
  return Observable.concat((Observable<T>) o1.ignoreElements(), o2);
}

代码示例来源:origin: davidmoten/rxjava-jdbc

private static <T> Observable<Boolean> commitOrRollbackOnCompleteTransformerIfAtLeastOneValue(
    final boolean isCommit, final Database db, Observable<T> source) {
  CountingAction<T> counter = RxUtil.counter();
  Observable<Boolean> commit = counter
      // get count
      .count()
      // greater than zero or empty
      .filter(greaterThanZero())
      // commit if at least one value
      .compose(db.commitOrRollback_(isCommit));
  return Observable
      // concatenate
      .concat(source
          // count emissions
          .doOnNext(counter)
          // ignore emissions
          .ignoreElements()
          // cast the empty sequence to type Boolean
          .cast(Boolean.class),
          // concat with commit
          commit);
}

代码示例来源:origin: henrymorgen/android-advanced-light

private void ignoreElements() {
  Observable.just(1, 2, 3, 4).ignoreElements().subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
      Log.i("wangshu", "onCompleted");
    }
    @Override
    public void onError(Throwable e) {
      Log.i("wangshu", "onError");
    }
    @Override
    public void onNext(Integer integer) {
      Log.i("wangshu", "onNext");
    }
  });
}

代码示例来源:origin: dswarm/dswarm

@Override
  public Observable<T> call(final Observable<T> thisOne) {
    return thisOne.concatWith(other.ignoreElements().map(ignored -> emptyResultValue.get()));
  }
}

代码示例来源:origin: davidmoten/rxjava-extras

@SuppressWarnings("unchecked")
  @Override
  public Observable<R> call(Observable<T> source) {
    return ((Observable<R>) (Observable<?>) source.ignoreElements()).concatWith(next);
  }
};

代码示例来源:origin: com.github.davidmoten/rxjava-extras

@SuppressWarnings("unchecked")
  @Override
  public Observable<R> call(Observable<T> source) {
    return ((Observable<R>) (Observable<?>) source.ignoreElements()).concatWith(next);
  }
};

代码示例来源:origin: leeowenowen/rxjava-examples

@Override
 public void run() {
  Observable.create(new Observable.OnSubscribe<Integer>() {
   @Override
   public void call(Subscriber<? super Integer> subscriber) {
    subscriber.onNext(1);
    subscriber.onNext(2);
    subscriber.onCompleted();
   }
  }).ignoreElements().subscribe(new Action1<Integer>() {
   @Override
   public void call(Integer integer) {
    log(integer);
   }
  }, new Action1<Throwable>() {
   @Override
   public void call(Throwable throwable) {
    log(throwable);
   }
  }, new Action0() {
   @Override
   public void call() {
    log("onComplete");
   }
  });
 }
}

代码示例来源:origin: com.hotels.styx/styx-api

/**
 * Sets an empty body on this response.
 *
 * @return {@code this}
 */
public Builder removeBody() {
  return body(body.content()
      .doOnNext(ReferenceCountUtil::release)
      .ignoreElements()
  );
}

代码示例来源:origin: com.netflix.hystrix/hystrix-core

@Override
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
  return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
    @Override
    public void call(BatchReturnType batchReturnType) {
      // this is a blocking call in HystrixCollapser
      self.mapResponseToRequests(batchReturnType, requests);
    }
  }).ignoreElements().cast(Void.class);
}

代码示例来源:origin: spotify/mobius

@Override
 public Observable<R> call(final T value) {
  Completable completable = func.call(value);
  if (scheduler != null) {
   completable = completable.subscribeOn(scheduler);
  }
  return completable
    .toObservable()
    .ignoreElements()
    .map(
      new Func1<Object, R>() {
       @Override
       public R call(Object ignored) {
        // Since our upstream has ignoreElements on it, values will never ever be emitted, and
        // therefore this function call won't actually be executed. This map is really only present
        // in order to cast the stream to type R. Throwing an exception in this never-to-be-executed
        // function allows us say that the return type is T without actually needing to be able
        // to produce values of type T.
        throw new IllegalStateException(
          "Impossible state! ignoreElements() mustn't allow values to be emitted!");
       }
      });
 }
});

代码示例来源:origin: akarnokd/akarnokd-misc

static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
    return f ->
      f.publish(g ->
        g.take(1)
        .concatWith(
          g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
          .take(1)
          .ignoreElements()
        )
        .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
      )
      ;
  }
}

代码示例来源:origin: com.netflix.eureka2/eureka-write-server

@Override
public Observable<Void> register(final InstanceInfo instanceInfo) {
  logger.debug("Replicated registry entry: {}", instanceInfo);
  if (STATES.Opened != state.get()) {
    return Observable.error(state.get() == STATES.Closed ? CHANNEL_CLOSED_EXCEPTION : IDLE_STATE_EXCEPTION);
  }
  if (replicationLoop) {
    return Observable.error(REPLICATION_LOOP_EXCEPTION);
  }
  if (instanceInfoById.containsKey(instanceInfo.getId())) {
    logger.info("Overwriting existing registration entry for instance {}", instanceInfo.getId());
  }
  InstanceInfo tempNewInfo = new InstanceInfo.Builder()
      .withInstanceInfo(instanceInfo).withVersion(currentVersion++).build();
  return registry.register(tempNewInfo, replicationSource)
      .ignoreElements()
      .cast(Void.class)
      .doOnCompleted(new Action0() {
        @Override
        public void call() {
          instanceInfoById.put(instanceInfo.getId(), instanceInfo);
        }
      });
}

代码示例来源:origin: akarnokd/akarnokd-misc

public static void main(String[] args) throws Exception {
  PublishSubject<Integer> ps = PublishSubject.create();
  
  ps.publish(o -> 
    o.mergeWith(
      o.switchMap(e -> 
         Observable.just(1).delay(200, TimeUnit.MILLISECONDS)
        .ignoreElements()
        .doOnCompleted(() -> System.out.println("Timeout action: " + e))
      )
    )
  ).subscribe(System.out::println);
  
  ps.onNext(1);
  ps.onNext(2);

  Thread.sleep(100);

  ps.onNext(3);

  Thread.sleep(250);

  ps.onNext(4);

  Thread.sleep(250);
  }
}

代码示例来源:origin: akarnokd/akarnokd-misc

public static void main(String[] args) {
  Observable<String> names = Observable.just(
      "John", "Steve", "Ruth",
      "Sam", "Jane", "James");

  names.groupBy(s -> s.charAt(0))
  .flatMap(grp -> grp.publish(o -> o.first().concatWith(o.ignoreElements())))
  .subscribe(s -> System.out.println(s));
  }
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_191() throws Exception {
  List<Ticket> tickets = Arrays.asList(new Ticket(), new Ticket(), new Ticket());
  Observable
      .from(tickets)
      .flatMap(ticket ->
          rxSendEmail(ticket)
              .ignoreElements()
              .doOnError(e -> log.warn("Failed to send {}", ticket, e))
              .subscribeOn(Schedulers.io()));
}

代码示例来源:origin: com.netflix.zuul/zuul-netty

@Override
public Observable<ZuulMessage> write(ZuulMessage msg, HttpServerResponse nativeResponse)
{
  HttpResponseMessage zuulResp = (HttpResponseMessage) msg;
  // Set the response status code.
  nativeResponse.setStatus(HttpResponseStatus.valueOf(zuulResp.getStatus()));
  // Now set all of the response headers - note this is a multi-set in keeping with HTTP semantics
  for (Map.Entry<String, String> entry : zuulResp.getHeaders().entries()) {
    nativeResponse.getHeaders().add(entry.getKey(), entry.getValue());
  }
  // Write response body stream as received.
  Observable<ZuulMessage> chain;
  Observable<ByteBuf> bodyStream = zuulResp.getBodyStream();
  if (bodyStream != null) {
    chain = bodyStream
        .doOnNext(bb -> nativeResponse.writeBytesAndFlush(bb))
        .ignoreElements()
        .doOnCompleted(() -> nativeResponse.close())
        .map(bb -> msg);
  }
  else {
    chain = Observable.just(msg);
  }
  return chain;
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_138() throws Exception {
  Single<Integer> ignored = Single
      .just(1)
      .toObservable()
      .ignoreElements()   //PROBLEM
      .toSingle();
}

相关文章

Observable类方法