Usage and code examples of the rx.Observable.doOnEach() method

x33g5p2x, reposted on 2022-01-25 under Other

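This article collects Java code examples of the rx.Observable.doOnEach() method and shows how Observable.doOnEach() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms, were extracted from selected projects, and are intended as reference material. The details of Observable.doOnEach() are as follows:
Package path: rx.Observable
Class name: Observable
Method name: doOnEach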

Introduction to Observable.doOnEach

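Modifies the source Observable so that it notifies an Observer for each item it emits.

Scheduler: doOnEach does not operate by default on a particular Scheduler.

Most of the examples on this page use the overload that takes an Action1<Notification<? super T>>. That callback receives OnError and OnCompleted notifications as well as OnNext, so it can branch on notification.getKind().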
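To make the callback order concrete, here is a minimal plain-Java sketch that models it without RxJava on the classpath. It is an illustration under stated assumptions, not RxJava's implementation: the names DoOnEachSketch, Event, Kind, emitWithDoOnEach and trace are hypothetical, and the model is synchronous and ignores errors, backpressure and unsubscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, dependency-free model of doOnEach semantics (not RxJava code).
public class DoOnEachSketch {

    // Mirrors the notification kinds used in the RxJava 1.x examples.
    enum Kind { OnNext, OnError, OnCompleted }

    // Simplified stand-in for rx.Notification: a kind plus an optional value.
    static final class Event<T> {
        final Kind kind;
        final T value;

        Event(Kind kind, T value) {
            this.kind = kind;
            this.value = value;
        }
    }

    // Walks the source list; the side-effect callback sees each item before it
    // is delivered downstream, then sees one terminal OnCompleted event.
    static <T> List<T> emitWithDoOnEach(List<T> source, Consumer<Event<T>> onEach) {
        List<T> delivered = new ArrayList<>();
        for (T item : source) {
            onEach.accept(new Event<>(Kind.OnNext, item));
            delivered.add(item); // downstream delivery happens after the callback
        }
        onEach.accept(new Event<>(Kind.OnCompleted, null)); // terminal event carries no value
        return delivered;
    }

    // Records what the callback observes for the items 1, 2, 3.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        emitWithDoOnEach(Arrays.asList(1, 2, 3), e -> log.add(e.kind + ":" + e.value));
        return log;
    }

    public static void main(String[] args) {
        // prints [OnNext:1, OnNext:2, OnNext:3, OnCompleted:null]
        System.out.println(trace());
    }
}
```

The point of the model is that the callback does not change what downstream receives; it only observes each notification, including the terminal one, which carries a null value.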

Code examples

Code example source: PipelineAI/pipeline

command.toObservable().doOnEach(new Action1<Notification<? super Boolean>>() {

Code example source: PipelineAI/pipeline

.doOnEach(setRequestContext)
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))

Code example source: PipelineAI/pipeline

.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);

Code example source: leeowenowen/rxjava-examples

@Override
 public void run() {
  Observable.range(1, 10).doOnEach(new Action1<Notification<? super Integer>>() {
   @Override
   public void call(Notification<? super Integer> notification) {
    log("doOnEach:" + notification.getKind() + " " + notification.getValue());
   }
  }).subscribe(new Action1<Integer>() {
   @Override
   public void call(Integer integer) {
    log(integer);
   }
  });
 }
});

Code example source: com.netflix.eureka2/eureka-read-server

.doOnEach(new Action1<Notification<? super InstanceInfo>>() {
  @Override
  public void call(Notification<? super InstanceInfo> notification) {

Code example source: com.netflix.eureka2/eureka-bridge

public Observable<InstanceInfo> connect() {
    return new InstanceInfoFromConfig(config)
        .get()
        .map(new Func1<InstanceInfo.Builder, InstanceInfo>() {
          @Override
          public InstanceInfo call(InstanceInfo.Builder builder) {
            HashSet<ServicePort> ports = new HashSet<>();
            ports.add(new ServicePort(Names.DISCOVERY, config.getDiscoveryPort(), false));

            return builder.withPorts(ports).build();
          }
        })
        .doOnEach(new Action1<Notification<? super InstanceInfo>>() {
          @Override
          public void call(Notification<? super InstanceInfo> notification) {
            switch (notification.getKind()) {
              case OnNext:
                replaySubject.onNext((InstanceInfo) notification.getValue());
                replaySubject.onCompleted();
                logger.info("Own instance info resolved to {}", notification.getValue());
                break;
              case OnError:
                replaySubject.onError(notification.getThrowable());
                logger.error("Could not resolve own instance info", notification.getThrowable());
                break;
            }
          }
        });
  }
}

Code example source: com.netflix.eureka2/eureka-write-server

.doOnEach(new Action1<Notification<? super InstanceInfo>>() {
  @Override
  public void call(Notification<? super InstanceInfo> notification) {

Code example source: patrick-doyle/android-rxmvp-tutorial

private Subscription observeLookupButton() {
  return view.observeButton()
    .doOnNext(__ -> view.showLoading(true))
    .map(__ -> view.getUsernameEdit())
    .observeOn(Schedulers.io())
    .switchMap(username -> model.getUserReops(username))
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(gitHubRepoList -> model.saveRepoListState(gitHubRepoList))
    .doOnEach(__ -> view.showLoading(false))
    .retry()
    .subscribe(gitHubRepoList -> {
     model.startRepoActivity(gitHubRepoList);
    });
 }
}

Code example source: com.novoda/rxmocks

private void initialiseMockedObservable(Method method, Object[] args) {
  ClearableBehaviorSubject<Notification> subject = ClearableBehaviorSubject.create();
  PublishSubject<Notification> notificationSubject = PublishSubject.create();
  final String keyForArgs = getKeyFor(method, args);
  final Observable observable = subject
      .dematerialize()
      .doOnEach(new NotifyDataEvent(notificationSubject))
      .lift(new SwallowUnsubscribe());
  observableHashMap.put(keyForArgs, observable);
  mapSubject.put(observable, new Pair<>(subject, notificationSubject));
}

Code example source: com.sonymobile/lumbermill-core

private  Func1<Observable<? extends Throwable>, Observable<?>> doCreate(int attempts, Timer.Factory timerFactory) {
  return errorNotification -> {
    Timer timer = timerFactory.create();
    return errorNotification
        .doOnEach(throwable -> printException(throwable))
        .zipWith(Observable.range(1, attempts), RetryStrategyImpl::create)
        .flatMap(attempt ->
            attempt.getSecond() == attempts || !exceptionMatch(attempt.getFirst()) ?
                Observable.error(attempt.getFirst()) :
                timer.next()
        );
  };
}

Code example source: com.netflix.turbine/turbine-core

return attempts.flatMap(e -> {
    return Observable.timer(1, TimeUnit.SECONDS)
        .doOnEach(n -> logger.info("Turbine => Retrying connection to: " + uri));
  });
});

Code example source: org.jboss.hal/hal-dmr

/**
 * Executes the composite operation until the operation successfully returns and the precondition is met.
 * The precondition receives the composite result of the operation.
 */
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatCompositeUntil(Dispatcher dispatcher, int timeout, Composite composite,
    @Nullable Predicate<CompositeResult> until) {
  logger.debug("Repeat {} using {} seconds as timeout", composite, timeout);
  Single<CompositeResult> execution = Single.fromEmitter(em -> dispatcher.execute(composite, em::onSuccess,
      (op, fail) -> em.onSuccess(compositeFailure("Dispatcher failure: " + fail)),
      (op, ex) -> em.onSuccess(compositeFailure("Dispatcher exception: " + ex.getMessage()))));
  if (until == null) {
    until = r -> r.stream().noneMatch(ModelNode::isFailure); // default: until success
  }
  return Observable
      .interval(INTERVAL, MILLISECONDS) // execute the operation every INTERVAL millis
      .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), composite))
      .flatMapSingle(n -> execution, false, 1)
      .takeUntil(until::test) // until succeeded
      .toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
}

Code example source: org.jboss.hal/hal-dmr

/**
 * Executes the operation until the operation successfully returns and the precondition is met. The precondition
 * receives the result of the operation.
 */
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatOperationUntil(Dispatcher dispatcher, int timeout, Operation operation,
    @Nullable Predicate<ModelNode> until) {
  logger.debug("Repeat {} using {} seconds timeout", operation.asCli(), timeout);
  Single<ModelNode> execution = Single.fromEmitter(em -> dispatcher.execute(operation, em::onSuccess,
      (op, fail) -> em.onSuccess(operationFailure("Dispatcher failure: " + fail)),
      (op, ex) -> em.onSuccess(operationFailure("Dispatcher exception: " + ex.getMessage()))));
  if (until == null) {
    until = r -> !r.isFailure(); // default: until success
  }
  return Observable
      .interval(INTERVAL, MILLISECONDS) // execute the operation every INTERVAL millis
      .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), operation.asCli()))
      .flatMapSingle(n -> execution, false, 1)
      .takeUntil(until::test) // until succeeded
      .toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
}
