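This article collects code examples for the rx.Observable.doOnEach() method in Java and shows how Observable.doOnEach() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.doOnEach() method are as follows: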
Package path: rx.Observable
Class name: Observable
Method name: doOnEach
Modifies the source Observable so that it notifies an Observer for each item it emits.
Scheduler: doOnEach does not operate by default on a particular Scheduler.
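The description above can be made concrete with a small model of what a doOnEach callback observes. The sketch below deliberately does not use RxJava: `DoOnEachSketch`, `Event`, `emitWithDoOnEach`, and `trace` are hypothetical names invented for this illustration, and `Event` is only a stand-in for rx.Notification. It assumes the behaviour seen in the examples on this page, where the callback receives one OnNext notification per item followed by a single terminal OnError or OnCompleted notification, and it runs the callback synchronously on the calling thread, in line with the note that no particular Scheduler is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class DoOnEachSketch {

    /** Mirrors the three kinds reported by notification.getKind(). */
    enum Kind { OnNext, OnError, OnCompleted }

    /** Hypothetical stand-in for rx.Notification<T>; not the RxJava class. */
    static final class Event<T> {
        final Kind kind;
        final T value;          // non-null only for OnNext
        final Throwable error;  // non-null only for OnError

        Event(Kind kind, T value, Throwable error) {
            this.kind = kind;
            this.value = value;
            this.error = error;
        }
    }

    /**
     * Models a source that emits every item and then terminates.
     * The action is called once per item and once for the terminal event,
     * synchronously on the caller's thread (no scheduler is involved).
     */
    static <T> void emitWithDoOnEach(List<T> items, Throwable failure,
                                     Consumer<Event<T>> action) {
        for (T item : items) {
            action.accept(new Event<>(Kind.OnNext, item, null));
        }
        if (failure != null) {
            action.accept(new Event<>(Kind.OnError, null, failure));
        } else {
            action.accept(new Event<>(Kind.OnCompleted, null, null));
        }
    }

    /** Records what the callback saw, one line per notification. */
    static List<String> trace(List<Integer> items, Throwable failure) {
        List<String> log = new ArrayList<>();
        emitWithDoOnEach(items, failure, e -> {
            switch (e.kind) {
                case OnNext:
                    log.add("OnNext " + e.value);
                    break;
                case OnError:
                    log.add("OnError " + e.error.getMessage());
                    break;
                default:
                    log.add("OnCompleted");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [OnNext 1, OnNext 2, OnNext 3, OnCompleted]
        System.out.println(trace(Arrays.asList(1, 2, 3), null));
        // prints [OnNext 1, OnError boom]
        System.out.println(trace(Arrays.asList(1), new IllegalStateException("boom")));
    }
}
```

The practical consequence is that a doOnEach callback which ignores the notification kind runs one more time than the number of items emitted, because the terminal event also triggers it.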
Code example source: PipelineAI/pipeline
command.toObservable().doOnEach(new Action1<Notification<? super Boolean>>() {
Code example source: PipelineAI/pipeline
.doOnEach(setRequestContext)
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
Code example source: PipelineAI/pipeline
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
Code example source: leeowenowen/rxjava-examples
@Override
public void run() {
  // Logs every notification: OnNext for each of 1..10, then OnCompleted (whose value is null).
  Observable.range(1, 10).doOnEach(new Action1<Notification<? super Integer>>() {
    @Override
    public void call(Notification<? super Integer> notification) {
      log("doOnEach:" + notification.getKind() + " " + notification.getValue());
    }
  }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
      log(integer);
    }
  });
}
});
Code example source: com.netflix.eureka2/eureka-read-server
.doOnEach(new Action1<Notification<? super InstanceInfo>>() {
@Override
public void call(Notification<? super InstanceInfo> notification) {
Code example source: com.netflix.eureka2/eureka-bridge
public Observable<InstanceInfo> connect() {
  return new InstanceInfoFromConfig(config)
      .get()
      .map(new Func1<InstanceInfo.Builder, InstanceInfo>() {
        @Override
        public InstanceInfo call(InstanceInfo.Builder builder) {
          HashSet<ServicePort> ports = new HashSet<>();
          ports.add(new ServicePort(Names.DISCOVERY, config.getDiscoveryPort(), false));
          return builder.withPorts(ports).build();
        }
      })
      .doOnEach(new Action1<Notification<? super InstanceInfo>>() {
        @Override
        public void call(Notification<? super InstanceInfo> notification) {
          // Forward the outcome to replaySubject; OnCompleted is intentionally not handled here.
          switch (notification.getKind()) {
            case OnNext:
              replaySubject.onNext((InstanceInfo) notification.getValue());
              replaySubject.onCompleted();
              logger.info("Own instance info resolved to {}", notification.getValue());
              break;
            case OnError:
              replaySubject.onError(notification.getThrowable());
              logger.error("Could not resolve own instance info", notification.getThrowable());
              break;
          }
        }
      });
}
}
Code example source: com.netflix.eureka2/eureka-write-server
.doOnEach(new Action1<Notification<? super InstanceInfo>>() {
@Override
public void call(Notification<? super InstanceInfo> notification) {
Code example source: patrick-doyle/android-rxmvp-tutorial
private Subscription observeLookupButton() {
  return view.observeButton()
      .doOnNext(__ -> view.showLoading(true))
      .map(__ -> view.getUsernameEdit())
      .observeOn(Schedulers.io())
      .switchMap(username -> model.getUserReops(username))
      .observeOn(AndroidSchedulers.mainThread())
      .doOnNext(gitHubRepoList -> model.saveRepoListState(gitHubRepoList))
      // Runs for onNext, onError, and onCompleted alike, so loading is hidden in every case.
      .doOnEach(__ -> view.showLoading(false))
      .retry()
      .subscribe(gitHubRepoList -> {
        model.startRepoActivity(gitHubRepoList);
      });
}
}
Code example source: com.novoda/rxmocks
private void initialiseMockedObservable(Method method, Object[] args) {
  ClearableBehaviorSubject<Notification> subject = ClearableBehaviorSubject.create();
  PublishSubject<Notification> notificationSubject = PublishSubject.create();
  final String keyForArgs = getKeyFor(method, args);
  final Observable observable = subject
      .dematerialize()
      .doOnEach(new NotifyDataEvent(notificationSubject))
      .lift(new SwallowUnsubscribe());
  observableHashMap.put(keyForArgs, observable);
  mapSubject.put(observable, new Pair<>(subject, notificationSubject));
}
Code example source: com.sonymobile/lumbermill-core
private Func1<Observable<? extends Throwable>, Observable<?>> doCreate(int attempts, Timer.Factory timerFactory) {
  return errorNotification -> {
    Timer timer = timerFactory.create();
    return errorNotification
        .doOnEach(throwable -> printException(throwable))
        .zipWith(Observable.range(1, attempts), RetryStrategyImpl::create)
        .flatMap(attempt ->
            attempt.getSecond() == attempts || !exceptionMatch(attempt.getFirst()) ?
                Observable.error(attempt.getFirst()) :
                timer.next()
        );
  };
}
Code example source: com.netflix.turbine/turbine-core
return attempts.flatMap(e -> {
  return Observable.timer(1, TimeUnit.SECONDS)
      .doOnEach(n -> logger.info("Turbine => Retrying connection to: " + uri));
});
});
Code example source: org.jboss.hal/hal-dmr
/**
 * Executes the composite operation until the operation successfully returns and the precondition is met.
 * The precondition receives the composite result of the operation.
 */
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatCompositeUntil(Dispatcher dispatcher, int timeout, Composite composite,
    @Nullable Predicate<CompositeResult> until) {
  logger.debug("Repeat {} using {} seconds as timeout", composite, timeout);
  Single<CompositeResult> execution = Single.fromEmitter(em -> dispatcher.execute(composite, em::onSuccess,
      (op, fail) -> em.onSuccess(compositeFailure("Dispatcher failure: " + fail)),
      (op, ex) -> em.onSuccess(compositeFailure("Dispatcher exception: " + ex.getMessage()))));
  if (until == null) {
    until = r -> r.stream().noneMatch(ModelNode::isFailure); // default: until success
  }
  return Observable
      .interval(INTERVAL, MILLISECONDS) // execute an operation every INTERVAL millis
      .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), composite))
      .flatMapSingle(n -> execution, false, 1)
      .takeUntil(until::test) // until succeeded
      .toCompletable().timeout(timeout, SECONDS); // wait for success or stop after timeout seconds
}
Code example source: org.jboss.hal/hal-dmr
/**
 * Executes the operation until the operation successfully returns and the precondition is met. The precondition
 * receives the result of the operation.
 */
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatOperationUntil(Dispatcher dispatcher, int timeout, Operation operation,
    @Nullable Predicate<ModelNode> until) {
  logger.debug("Repeat {} using {} seconds timeout", operation.asCli(), timeout);
  Single<ModelNode> execution = Single.fromEmitter(em -> dispatcher.execute(operation, em::onSuccess,
      (op, fail) -> em.onSuccess(operationFailure("Dispatcher failure: " + fail)),
      (op, ex) -> em.onSuccess(operationFailure("Dispatcher exception: " + ex.getMessage()))));
  if (until == null) {
    until = r -> !r.isFailure(); // default: until success
  }
  return Observable
      .interval(INTERVAL, MILLISECONDS) // execute an operation every INTERVAL millis
      .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), operation.asCli()))
      .flatMapSingle(n -> execution, false, 1)
      .takeUntil(until::test) // until succeeded
      .toCompletable().timeout(timeout, SECONDS); // wait for success or stop after timeout seconds
}