本文整理了Java中rx.Observable.materialize()
方法的一些代码示例,展示了Observable.materialize()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.materialize()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:materialize
[英]Returns an Observable that represents all of the emissions and notifications from the source Observable into emissions marked with their original types within Notification objects.
Scheduler: materialize does not operate by default on a particular Scheduler.
[中]返回一个可观测值,该值表示源可观测到的所有排放和通知,并在通知对象中标记为其原始类型。
调度程序:默认情况下,materialize不会在特定调度程序上运行。
代码示例来源:origin: konmik/nucleus
@Override
public Observable<Delivery<View, T>> call(Observable<T> observable) {
return Observable
.combineLatest(
view,
observable
.materialize()
.filter(new Func1<Notification<T>, Boolean>() {
@Override
public Boolean call(Notification<T> notification) {
return !notification.isOnCompleted();
}
}),
new Func2<View, Notification<T>, Delivery<View, T>>() {
@Override
public Delivery<View, T> call(View view, Notification<T> notification) {
return view == null ? null : new Delivery<>(view, notification);
}
})
.filter(new Func1<Delivery<View, T>, Boolean>() {
@Override
public Boolean call(Delivery<View, T> delivery) {
return delivery != null;
}
});
}
}
代码示例来源:origin: konmik/nucleus
final ReplaySubject<Notification<T>> subject = ReplaySubject.create();
final Subscription subscription = observable
.materialize()
.filter(new Func1<Notification<T>, Boolean>() {
@Override
代码示例来源:origin: konmik/nucleus
@Override
public Observable<Delivery<View, T>> call(Observable<T> observable) {
return observable.materialize()
.take(1)
.switchMap(new Func1<Notification<T>, Observable<? extends Delivery<View, T>>>() {
@Override
public Observable<? extends Delivery<View, T>> call(final Notification<T> notification) {
return view.map(new Func1<View, Delivery<View, T>>() {
@Override
public Delivery<View, T> call(View view) {
return view == null ? null : new Delivery<>(view, notification);
}
});
}
})
.filter(new Func1<Delivery<View, T>, Boolean>() {
@Override
public Boolean call(Delivery<View, T> delivery) {
return delivery != null;
}
})
.take(1);
}
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testThreadContextOnTimeout() {
final AtomicBoolean isInitialized = new AtomicBoolean();
new TimeoutCommand().toObservable()
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
isInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
}
})
.materialize()
.toBlocking().single();
System.out.println("initialized = " + HystrixRequestContext.isCurrentThreadInitialized());
System.out.println("initialized inside onError = " + isInitialized.get());
assertEquals(true, isInitialized.get());
}
代码示例来源:origin: jhusain/learnrxjava
public static void subscribe(Observable<String> o) {
o = o.materialize().flatMap(n -> {
if (n.isOnError()) {
if (n.getThrowable() instanceof IllegalStateException) {
return Observable.just(n);
} else {
return Observable.error(n.getThrowable());
}
} else {
return Observable.just(n);
}
}).retry().dematerialize();
o.subscribe(System.out::println, t -> t.printStackTrace());
}
}
代码示例来源:origin: com.netflix.rxjava/rxjava-core
@Override
public Iterator<T> iterator() {
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
source.materialize().subscribe(lio);
return lio;
}
};
代码示例来源:origin: com.novoda/rxmocks
/**
* Asserts that a given {@code observable} emits an element matching a given {@code matcher}
*
* @param matcher The matcher to use for the assertion
* @param observable The observable to assert against
* @param <T> The type of the observable
*/
public static <T> void expect(RxMatcher<Notification<T>> matcher, Observable<T> observable) {
observable.materialize()
.subscribe(expect(matcher));
}
代码示例来源:origin: com.novoda/rxmocks
/**
* Asserts that a given {@code observable} emits only elements matching a given {@code matcher}
*
* @param matcher The matcher to use for the assertion
* @param observable The observable to assert against
* @param <T> The type of the observable
*/
public static <T> void expectOnly(RxMatcher<Notification<T>> matcher, Observable<T> observable) {
observable.materialize()
.subscribe(expectOnly(matcher));
}
代码示例来源:origin: com.novoda/rxmocks
/**
* Asserts that a given {@code observable} emits an element matching a given {@code matcher} *
*
* @param matcher The matcher to use for the assertion
* @param observable The observable to assert against
* @param matched A callback for when the assertion is matched
* @param <T> The type of the observable
*/
public static <T> void expect(final RxMatcher<Notification<T>> matcher, final Observable<T> observable, final Action1<Notification<T>> matched) {
observable.materialize()
.subscribe(expect(matcher, matched));
}
代码示例来源:origin: com.novoda/rxmocks
/**
* Asserts that a given {@code observable} emits only elements matching a given {@code matcher} *
*
* @param matcher The matcher to use for the assertion
* @param observable The observable to assert against
* @param matched A callback for when the assertion is matched
* @param <T> The type of the observable
*/
public static <T> void expectOnly(final RxMatcher<Notification<T>> matcher, final Observable<T> observable, final Action1<Notification<T>> matched) {
observable.materialize()
.subscribe(expectOnly(matcher, matched));
}
代码示例来源:origin: au.gov.amsa.risky/ais
public static <T> void print(Observable<T> o) {
o.materialize().toBlocking().forEach(System.out::println);
}
代码示例来源:origin: davidmoten/rxjava-extras
@Override
public Observable<T> call(Observable<T> o) {
return o.materialize().buffer(2, 1)
.flatMap(new Func1<List<Notification<T>>, Observable<T>>() {
@Override
public Observable<T> call(List<Notification<T>> list) {
Notification<T> a = list.get(0);
if (list.size() == 2 && list.get(1).isOnCompleted()) {
return Observable.just(a.getValue()).repeat();
} else if (a.isOnError()) {
return Observable.error(list.get(0).getThrowable());
} else if (a.isOnCompleted()) {
return Observable.empty();
} else {
return Observable.just(a.getValue());
}
}
});
}
};
代码示例来源:origin: com.github.davidmoten/rxjava-extras
@Override
public Observable<T> call(Observable<T> o) {
return o.materialize().buffer(2, 1)
.flatMap(new Func1<List<Notification<T>>, Observable<T>>() {
@Override
public Observable<T> call(List<Notification<T>> list) {
Notification<T> a = list.get(0);
if (list.size() == 2 && list.get(1).isOnCompleted()) {
return Observable.just(a.getValue()).repeat();
} else if (a.isOnError()) {
return Observable.error(list.get(0).getThrowable());
} else if (a.isOnCompleted()) {
return Observable.empty();
} else {
return Observable.just(a.getValue());
}
}
});
}
};
代码示例来源:origin: com.netflix.ribbon/ribbon
@Override
public void call(
final Subscriber<? super RibbonResponse<Observable<T>>> t1) {
final Subject<T, T> subject = ReplaySubject.create();
hystrixNotificationObservable.materialize().subscribe(new Action1<Notification<ResultCommandPair<T>>>() {
AtomicBoolean first = new AtomicBoolean(true);
@Override
public void call(Notification<ResultCommandPair<T>> notification) {
if (first.compareAndSet(true, false)) {
HystrixObservableCommand<T> command = notification.isOnError() ? commandChain.getLastCommand() : notification.getValue().getCommand();
t1.onNext(new ResponseWithSubject<T>(subject, command));
t1.onCompleted();
}
if (notification.isOnNext()) {
subject.onNext(notification.getValue().getResult());
} else if (notification.isOnCompleted()) {
subject.onCompleted();
} else { // onError
subject.onError(notification.getThrowable());
}
}
});
}
});
代码示例来源:origin: leeowenowen/rxjava-examples
@Override
public void run() {
Observable o1 = Observable.range(1, 3).materialize();
o1.subscribe(new Action1<Notification<Integer>>() {
@Override
public void call(Notification<Integer> integerNotification) {
log("******");
log("kind:" + integerNotification.getKind());
log("value:" + integerNotification.getValue());
}
});
o1.dematerialize().subscribe(new Action1() {
@Override
public void call(Object o) {
log(o.toString());
}
});
}
});
代码示例来源:origin: com.netflix.eureka/eureka2-client
.materialize()
代码示例来源:origin: com.netflix.eureka/eureka2-eureka1-rest-api
private Observable<Void> instanceGetByAppAndInstanceId(final String appName, final String instanceId, final EncodingFormat format,
final boolean gzip, final HttpServerResponse<ByteBuf> response) {
return registryViewCache.findInstance(instanceId).materialize().toList().flatMap(
new Func1<List<Notification<InstanceInfo>>, Observable<Void>>() {
@Override
public Observable<Void> call(List<Notification<InstanceInfo>> notifications) {
// If completed with error propagate it
Notification<InstanceInfo> lastNotification = notifications.get(notifications.size() - 1);
if (lastNotification.getKind() == Kind.OnError) {
return Observable.error(lastNotification.getThrowable());
}
// If onComplete only => instance info not found
if (notifications.size() == 1) {
logger.info("Instance info with id {} not found", instanceId);
response.setStatus(HttpResponseStatus.NOT_FOUND);
return Observable.empty();
}
// InstanceInfo object found
InstanceInfo v1InstanceInfo = notifications.get(0).getValue();
if (appName != null && !appName.equalsIgnoreCase(v1InstanceInfo.getAppName())) {
logger.info("Instance info with id {} is associated with application {}, not {}",
instanceId, v1InstanceInfo.getAppName(), appName);
response.setStatus(HttpResponseStatus.NOT_FOUND);
return Observable.empty();
}
return encodeResponse(format, gzip, response, v1InstanceInfo);
}
}
);
}
代码示例来源:origin: davidmoten/rxjava-extras
@Override
public Observable<Out> call() {
Mutable<State> state = new Mutable<State>(initialState.call());
return source.materialize()
// do state transitions and emit notifications
// use flatMap to emit notification values
.flatMap(execute(transition, completion, state, backpressureStrategy),
initialRequest)
// complete if we encounter an unsubscribed sentinel
.takeWhile(NOT_UNSUBSCRIBED)
// flatten notifications to a stream which will enable
// early termination from the state machine if desired
.dematerialize();
}
});
代码示例来源:origin: com.github.davidmoten/rxjava-extras
@Override
public Observable<Out> call() {
Mutable<State> state = new Mutable<State>(initialState.call());
return source.materialize()
// do state transitions and emit notifications
// use flatMap to emit notification values
.flatMap(execute(transition, completion, state, backpressureStrategy),
initialRequest)
// complete if we encounter an unsubscribed sentinel
.takeWhile(NOT_UNSUBSCRIBED)
// flatten notifications to a stream which will enable
// early termination from the state machine if desired
.dematerialize();
}
});
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_87() throws Exception {
Observable<Notification<Integer>> notifications = Observable
.just(3, 0, 2, 0, 1, 0)
.concatMapDelayError(x -> fromCallable(() -> 100 / x))
.materialize();
List<Notification.Kind> kinds = notifications
.map(Notification::getKind)
.toList()
.toBlocking()
.single();
assertThat(kinds).containsExactly(OnNext, OnNext, OnNext, OnError);
}
内容来源于网络,如有侵权,请联系作者删除!