rx.Observable.doOnEach()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.5k)|赞(0)|评价(0)|浏览(226)

本文整理了Java中rx.Observable.doOnEach()方法的一些代码示例,展示了Observable.doOnEach()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnEach()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:doOnEach

Observable.doOnEach介绍

[英]Modifies the source Observable so that it notifies an Observer for each item it emits.

Scheduler: doOnEach does not operate by default on a particular Scheduler.
[中]修改可观察的源,以便它为其发出的每个项目通知观察者。
调度器:默认情况下,doOnEach不会在特定的调度器上运行。

代码示例

代码示例来源:origin: PipelineAI/pipeline

  1. command.toObservable().doOnEach(new Action1<Notification<? super Boolean>>() {

代码示例来源:origin: PipelineAI/pipeline

  1. .doOnEach(setRequestContext)
  2. .lift(new FallbackHookApplication(_cmd))
  3. .lift(new DeprecatedOnFallbackHookApplication(_cmd))

代码示例来源:origin: PipelineAI/pipeline

  1. command.toObservable().doOnEach(new Action1<Notification<? super Boolean>>() {

代码示例来源:origin: PipelineAI/pipeline

  1. command.toObservable().doOnEach(new Action1<Notification<? super Boolean>>() {

代码示例来源:origin: PipelineAI/pipeline

  1. command.toObservable().doOnEach(new Action1<Notification<? super Boolean>>() {

代码示例来源:origin: PipelineAI/pipeline

  1. command.toObservable().doOnEach(new Action1<Notification<? super Boolean>>() {

代码示例来源:origin: PipelineAI/pipeline

  1. .doOnCompleted(markOnCompleted)
  2. .onErrorResumeNext(handleFallback)
  3. .doOnEach(setRequestContext);

代码示例来源:origin: PipelineAI/pipeline

  1. command.toObservable().doOnEach(new Action1<Notification<? super Boolean>>() {

代码示例来源:origin: PipelineAI/pipeline

  1. command.toObservable().doOnEach(new Action1<Notification<? super Boolean>>() {

代码示例来源:origin: PipelineAI/pipeline

  1. command.toObservable().doOnEach(new Action1<Notification<? super Boolean>>() {

代码示例来源:origin: leeowenowen/rxjava-examples

  1. @Override
  2. public void run() {
  3. Observable.range(1, 10).doOnEach(new Action1<Notification<? super Integer>>() {
  4. @Override
  5. public void call(Notification<? super Integer> notification) {
  6. log("doOnEach:" + notification.getKind() + " " + notification.getValue());
  7. }
  8. }).subscribe(new Action1<Integer>() {
  9. @Override
  10. public void call(Integer integer) {
  11. log(integer);
  12. }
  13. });
  14. }
  15. });

代码示例来源:origin: com.netflix.eureka2/eureka-read-server

  1. .doOnEach(new Action1<Notification<? super InstanceInfo>>() {
  2. @Override
  3. public void call(Notification<? super InstanceInfo> notification) {

代码示例来源:origin: com.netflix.eureka2/eureka-bridge

  1. public Observable<InstanceInfo> connect() {
  2. return new InstanceInfoFromConfig(config)
  3. .get()
  4. .map(new Func1<InstanceInfo.Builder, InstanceInfo>() {
  5. @Override
  6. public InstanceInfo call(InstanceInfo.Builder builder) {
  7. HashSet<ServicePort> ports = new HashSet<>();
  8. ports.add(new ServicePort(Names.DISCOVERY, config.getDiscoveryPort(), false));
  9. return builder.withPorts(ports).build();
  10. }
  11. })
  12. .doOnEach(new Action1<Notification<? super InstanceInfo>>() {
  13. @Override
  14. public void call(Notification<? super InstanceInfo> notification) {
  15. switch (notification.getKind()) {
  16. case OnNext:
  17. replaySubject.onNext((InstanceInfo) notification.getValue());
  18. replaySubject.onCompleted();
  19. logger.info("Own instance info resolved to {}", notification.getValue());
  20. break;
  21. case OnError:
  22. replaySubject.onError(notification.getThrowable());
  23. logger.error("Could not resolve own instance info", notification.getThrowable());
  24. break;
  25. }
  26. }
  27. });
  28. }
  29. }

代码示例来源:origin: com.netflix.eureka2/eureka-write-server

  1. .doOnEach(new Action1<Notification<? super InstanceInfo>>() {
  2. @Override
  3. public void call(Notification<? super InstanceInfo> notification) {

代码示例来源:origin: patrick-doyle/android-rxmvp-tutorial

  1. private Subscription observeLookupButton() {
  2. return view.observeButton()
  3. .doOnNext(__ -> view.showLoading(true))
  4. .map(__ -> view.getUsernameEdit())
  5. .observeOn(Schedulers.io())
  6. .switchMap(username -> model.getUserReops(username))
  7. .observeOn(AndroidSchedulers.mainThread())
  8. .doOnNext(gitHubRepoList -> model.saveRepoListState(gitHubRepoList))
  9. .doOnEach(__ -> view.showLoading(false))
  10. .retry()
  11. .subscribe(gitHubRepoList -> {
  12. model.startRepoActivity(gitHubRepoList);
  13. });
  14. }
  15. }

代码示例来源:origin: com.novoda/rxmocks

  1. private void initialiseMockedObservable(Method method, Object[] args) {
  2. ClearableBehaviorSubject<Notification> subject = ClearableBehaviorSubject.create();
  3. PublishSubject<Notification> notificationSubject = PublishSubject.create();
  4. final String keyForArgs = getKeyFor(method, args);
  5. final Observable observable = subject
  6. .dematerialize()
  7. .doOnEach(new NotifyDataEvent(notificationSubject))
  8. .lift(new SwallowUnsubscribe());
  9. observableHashMap.put(keyForArgs, observable);
  10. mapSubject.put(observable, new Pair<>(subject, notificationSubject));
  11. }

代码示例来源:origin: com.sonymobile/lumbermill-core

  1. private Func1<Observable<? extends Throwable>, Observable<?>> doCreate(int attempts, Timer.Factory timerFactory) {
  2. return errorNotification -> {
  3. Timer timer = timerFactory.create();
  4. return errorNotification
  5. .doOnEach(throwable -> printException(throwable))
  6. .zipWith(Observable.range(1, attempts), RetryStrategyImpl::create)
  7. .flatMap(attempt ->
  8. attempt.getSecond() == attempts || !exceptionMatch(attempt.getFirst()) ?
  9. Observable.error(attempt.getFirst()) :
  10. timer.next()
  11. );
  12. };
  13. }

代码示例来源:origin: com.netflix.turbine/turbine-core

  1. return attempts.flatMap(e -> {
  2. return Observable.timer(1, TimeUnit.SECONDS)
  3. .doOnEach(n -> logger.info("Turbine => Retrying connection to: " + uri));
  4. });
  5. });

代码示例来源:origin: org.jboss.hal/hal-dmr

  1. /**
  2. * Executes the composite operation until the operation successfully returns and the precondition is met.
  3. * The precondition receives the composite result of the operation.
  4. */
  5. @SuppressWarnings("HardCodedStringLiteral")
  6. public static Completable repeatCompositeUntil(Dispatcher dispatcher, int timeout, Composite composite,
  7. @Nullable Predicate<CompositeResult> until) {
  8. logger.debug("Repeat {} using {} seconds as timeout", composite, timeout);
  9. Single<CompositeResult> execution = Single.fromEmitter(em -> dispatcher.execute(composite, em::onSuccess,
  10. (op, fail) -> em.onSuccess(compositeFailure("Dispatcher failure: " + fail)),
  11. (op, ex) -> em.onSuccess(compositeFailure("Dispatcher exception: " + ex.getMessage()))));
  12. if (until == null) {
  13. until = r -> r.stream().noneMatch(ModelNode::isFailure); // default: until success
  14. }
  15. return Observable
  16. .interval(INTERVAL, MILLISECONDS) // execute a operation each INTERVAL millis
  17. .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), composite))
  18. .flatMapSingle(n -> execution, false, 1)
  19. .takeUntil(until::test) // until succeeded
  20. .toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
  21. }

代码示例来源:origin: org.jboss.hal/hal-dmr

  1. /**
  2. * Executes the operation until the operation successfully returns and the precondition is met. The precondition
  3. * receives the result of the operation.
  4. */
  5. @SuppressWarnings("HardCodedStringLiteral")
  6. public static Completable repeatOperationUntil(Dispatcher dispatcher, int timeout, Operation operation,
  7. @Nullable Predicate<ModelNode> until) {
  8. logger.debug("Repeat {} using {} seconds timeout", operation.asCli(), timeout);
  9. Single<ModelNode> execution = Single.fromEmitter(em -> dispatcher.execute(operation, em::onSuccess,
  10. (op, fail) -> em.onSuccess(operationFailure("Dispatcher failure: " + fail)),
  11. (op, ex) -> em.onSuccess(operationFailure("Dispatcher exception: " + ex.getMessage()))));
  12. if (until == null) {
  13. until = r -> !r.isFailure(); // default: until success
  14. }
  15. return Observable
  16. .interval(INTERVAL, MILLISECONDS) // execute a operation each INTERVAL millis
  17. .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), operation.asCli()))
  18. .flatMapSingle(n -> execution, false, 1)
  19. .takeUntil(until::test) // until succeeded
  20. .toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
  21. }

相关文章

Observable类方法