Usage and code examples of the rx.Observable.materialize() method

x33g5p2x, reposted on 2022-01-25 in Other

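This article collects code examples of the rx.Observable.materialize() method in Java and shows how Observable.materialize() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as a useful reference. The details of Observable.materialize() are as follows:
Fully qualified class: rx.Observable
Class name: Observable
Method name: materialize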

About Observable.materialize

Returns an Observable that converts every emission and terminal notification of the source Observable into an emission, each wrapped in a Notification object that records its original kind (onNext, onError, or onCompleted).

Scheduler: materialize does not operate by default on a particular Scheduler.
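
To make the description above concrete, here is a minimal plain-Java sketch of the idea. It does not use RxJava and is not how the library implements the operator: `MaterializeSketch`, `Note`, and the `materialize` helper are hypothetical names introduced only for illustration, and an `Iterator` stands in for the source Observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MaterializeSketch {

    /** Hypothetical stand-in for rx.Notification: one event plus its kind. */
    public static final class Note<T> {
        public enum Kind { ON_NEXT, ON_ERROR, ON_COMPLETED }

        public final Kind kind;
        public final T value;         // set only for ON_NEXT
        public final Throwable error; // set only for ON_ERROR

        Note(Kind kind, T value, Throwable error) {
            this.kind = kind;
            this.value = value;
            this.error = error;
        }

        @Override
        public String toString() {
            switch (kind) {
                case ON_NEXT:  return "OnNext(" + value + ")";
                case ON_ERROR: return "OnError(" + error + ")";
                default:       return "OnCompleted";
            }
        }
    }

    /**
     * Drains the source and records every item, plus the terminal event
     * (normal completion or a runtime failure), as an ordinary list element.
     */
    public static <T> List<Note<T>> materialize(Iterator<T> source) {
        List<Note<T>> notes = new ArrayList<>();
        try {
            while (source.hasNext()) {
                notes.add(new Note<>(Note.Kind.ON_NEXT, source.next(), null));
            }
            notes.add(new Note<>(Note.Kind.ON_COMPLETED, null, null));
        } catch (RuntimeException e) {
            // The failure becomes data instead of propagating to the caller.
            notes.add(new Note<>(Note.Kind.ON_ERROR, null, e));
        }
        return notes;
    }

    public static void main(String[] args) {
        for (Note<Integer> n : materialize(Arrays.asList(1, 2, 3).iterator())) {
            System.out.println(n);
        }
    }
}
```

Running `main` prints `OnNext(1)`, `OnNext(2)`, `OnNext(3)`, and `OnCompleted`, one per line. The point of the sketch is that the terminal event turns into a regular element, which is why the real operator is often followed by `filter`, `toList`, or `dematerialize` in the examples below.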

Code examples

Example source: konmik/nucleus

  @Override
  public Observable<Delivery<View, T>> call(Observable<T> observable) {
      return Observable
          .combineLatest(
              view,
              observable
                  .materialize()
                  // drop the onCompleted notification
                  .filter(new Func1<Notification<T>, Boolean>() {
                      @Override
                      public Boolean call(Notification<T> notification) {
                          return !notification.isOnCompleted();
                      }
                  }),
              new Func2<View, Notification<T>, Delivery<View, T>>() {
                  @Override
                  public Delivery<View, T> call(View view, Notification<T> notification) {
                      return view == null ? null : new Delivery<>(view, notification);
                  }
              })
          // skip the nulls produced while no view is attached
          .filter(new Func1<Delivery<View, T>, Boolean>() {
              @Override
              public Boolean call(Delivery<View, T> delivery) {
                  return delivery != null;
              }
          });
  }

Example source: konmik/nucleus

  final ReplaySubject<Notification<T>> subject = ReplaySubject.create();
  final Subscription subscription = observable
      .materialize()
      .filter(new Func1<Notification<T>, Boolean>() {
          @Override
          // ... (the snippet is cut off here in the source)

Example source: konmik/nucleus

  @Override
  public Observable<Delivery<View, T>> call(Observable<T> observable) {
      return observable.materialize()
          // only the first notification is delivered
          .take(1)
          .switchMap(new Func1<Notification<T>, Observable<? extends Delivery<View, T>>>() {
              @Override
              public Observable<? extends Delivery<View, T>> call(final Notification<T> notification) {
                  return view.map(new Func1<View, Delivery<View, T>>() {
                      @Override
                      public Delivery<View, T> call(View view) {
                          return view == null ? null : new Delivery<>(view, notification);
                      }
                  });
              }
          })
          .filter(new Func1<Delivery<View, T>, Boolean>() {
              @Override
              public Boolean call(Delivery<View, T> delivery) {
                  return delivery != null;
              }
          })
          .take(1);
  }

Example source: PipelineAI/pipeline

  @Test
  public void testThreadContextOnTimeout() {
      final AtomicBoolean isInitialized = new AtomicBoolean();
      new TimeoutCommand().toObservable()
          .doOnError(new Action1<Throwable>() {
              @Override
              public void call(Throwable throwable) {
                  isInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
              }
          })
          // wrap the terminal event in a Notification so the blocking call
          // does not rethrow the source error
          .materialize()
          .toBlocking().single();
      System.out.println("initialized = " + HystrixRequestContext.isCurrentThreadInitialized());
      System.out.println("initialized inside onError = " + isInitialized.get());
      assertEquals(true, isInitialized.get());
  }

Example source: jhusain/learnrxjava

  public static void subscribe(Observable<String> o) {
      o = o.materialize().flatMap(n -> {
          if (n.isOnError()) {
              if (n.getThrowable() instanceof IllegalStateException) {
                  // keep this error as a notification so retry() does not see it
                  return Observable.just(n);
              } else {
                  // re-raise any other error so retry() resubscribes
                  return Observable.error(n.getThrowable());
              }
          } else {
              return Observable.just(n);
          }
      }).retry().dematerialize();
      o.subscribe(System.out::println, t -> t.printStackTrace());
  }

Example source: com.netflix.rxjava/rxjava-core

  @Override
  public Iterator<T> iterator() {
      LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
      // the iterator consumes Notification objects rather than raw items
      source.materialize().subscribe(lio);
      return lio;
  }

Example source: com.novoda/rxmocks

  /**
   * Asserts that a given {@code observable} emits an element matching a given {@code matcher}
   *
   * @param matcher The matcher to use for the assertion
   * @param observable The observable to assert against
   * @param <T> The type of the observable
   */
  public static <T> void expect(RxMatcher<Notification<T>> matcher, Observable<T> observable) {
      observable.materialize()
          .subscribe(expect(matcher));
  }

Example source: com.novoda/rxmocks

  /**
   * Asserts that a given {@code observable} emits only elements matching a given {@code matcher}
   *
   * @param matcher The matcher to use for the assertion
   * @param observable The observable to assert against
   * @param <T> The type of the observable
   */
  public static <T> void expectOnly(RxMatcher<Notification<T>> matcher, Observable<T> observable) {
      observable.materialize()
          .subscribe(expectOnly(matcher));
  }

Example source: com.novoda/rxmocks

  /**
   * Asserts that a given {@code observable} emits an element matching a given {@code matcher}
   *
   * @param matcher The matcher to use for the assertion
   * @param observable The observable to assert against
   * @param matched A callback for when the assertion is matched
   * @param <T> The type of the observable
   */
  public static <T> void expect(final RxMatcher<Notification<T>> matcher, final Observable<T> observable, final Action1<Notification<T>> matched) {
      observable.materialize()
          .subscribe(expect(matcher, matched));
  }

Example source: com.novoda/rxmocks

  /**
   * Asserts that a given {@code observable} emits only elements matching a given {@code matcher}
   *
   * @param matcher The matcher to use for the assertion
   * @param observable The observable to assert against
   * @param matched A callback for when the assertion is matched
   * @param <T> The type of the observable
   */
  public static <T> void expectOnly(final RxMatcher<Notification<T>> matcher, final Observable<T> observable, final Action1<Notification<T>> matched) {
      observable.materialize()
          .subscribe(expectOnly(matcher, matched));
  }

Example source: au.gov.amsa.risky/ais

  public static <T> void print(Observable<T> o) {
      // prints every notification, including the terminal one
      o.materialize().toBlocking().forEach(System.out::println);
  }

Example source: davidmoten/rxjava-extras

  @Override
  public Observable<T> call(Observable<T> o) {
      // buffer(2, 1) pairs each notification with its successor
      return o.materialize().buffer(2, 1)
          .flatMap(new Func1<List<Notification<T>>, Observable<T>>() {
              @Override
              public Observable<T> call(List<Notification<T>> list) {
                  Notification<T> a = list.get(0);
                  if (list.size() == 2 && list.get(1).isOnCompleted()) {
                      // a is the last value before completion: repeat it
                      return Observable.just(a.getValue()).repeat();
                  } else if (a.isOnError()) {
                      return Observable.error(list.get(0).getThrowable());
                  } else if (a.isOnCompleted()) {
                      return Observable.empty();
                  } else {
                      return Observable.just(a.getValue());
                  }
              }
          });
  }

Example source: com.netflix.ribbon/ribbon

  @Override
  public void call(
          final Subscriber<? super RibbonResponse<Observable<T>>> t1) {
      final Subject<T, T> subject = ReplaySubject.create();
      hystrixNotificationObservable.materialize().subscribe(new Action1<Notification<ResultCommandPair<T>>>() {
          AtomicBoolean first = new AtomicBoolean(true);
          @Override
          public void call(Notification<ResultCommandPair<T>> notification) {
              // on the first notification, hand the subject to the subscriber
              if (first.compareAndSet(true, false)) {
                  HystrixObservableCommand<T> command = notification.isOnError() ? commandChain.getLastCommand() : notification.getValue().getCommand();
                  t1.onNext(new ResponseWithSubject<T>(subject, command));
                  t1.onCompleted();
              }
              // forward every notification to the subject
              if (notification.isOnNext()) {
                  subject.onNext(notification.getValue().getResult());
              } else if (notification.isOnCompleted()) {
                  subject.onCompleted();
              } else { // onError
                  subject.onError(notification.getThrowable());
              }
          }
      });
  }

Example source: leeowenowen/rxjava-examples

  @Override
  public void run() {
      Observable<Notification<Integer>> o1 = Observable.range(1, 3).materialize();
      o1.subscribe(new Action1<Notification<Integer>>() {
          @Override
          public void call(Notification<Integer> integerNotification) {
              log("******");
              log("kind:" + integerNotification.getKind());
              log("value:" + integerNotification.getValue());
          }
      });
      // dematerialize() unwraps the notifications back into plain items
      o1.dematerialize().subscribe(new Action1<Object>() {
          @Override
          public void call(Object o) {
              log(o.toString());
          }
      });
  }

Example source: com.netflix.eureka/eureka2-client

  .materialize()

Example source: com.netflix.eureka/eureka2-eureka1-rest-api

  private Observable<Void> instanceGetByAppAndInstanceId(final String appName, final String instanceId, final EncodingFormat format,
                                                         final boolean gzip, final HttpServerResponse<ByteBuf> response) {
      return registryViewCache.findInstance(instanceId).materialize().toList().flatMap(
          new Func1<List<Notification<InstanceInfo>>, Observable<Void>>() {
              @Override
              public Observable<Void> call(List<Notification<InstanceInfo>> notifications) {
                  // If completed with error propagate it
                  Notification<InstanceInfo> lastNotification = notifications.get(notifications.size() - 1);
                  if (lastNotification.getKind() == Kind.OnError) {
                      return Observable.error(lastNotification.getThrowable());
                  }
                  // If onComplete only => instance info not found
                  if (notifications.size() == 1) {
                      logger.info("Instance info with id {} not found", instanceId);
                      response.setStatus(HttpResponseStatus.NOT_FOUND);
                      return Observable.empty();
                  }
                  // InstanceInfo object found
                  InstanceInfo v1InstanceInfo = notifications.get(0).getValue();
                  if (appName != null && !appName.equalsIgnoreCase(v1InstanceInfo.getAppName())) {
                      logger.info("Instance info with id {} is associated with application {}, not {}",
                              instanceId, v1InstanceInfo.getAppName(), appName);
                      response.setStatus(HttpResponseStatus.NOT_FOUND);
                      return Observable.empty();
                  }
                  return encodeResponse(format, gzip, response, v1InstanceInfo);
              }
          }
      );
  }

Example source: davidmoten/rxjava-extras

  @Override
  public Observable<Out> call() {
      Mutable<State> state = new Mutable<State>(initialState.call());
      return source.materialize()
          // do state transitions and emit notifications
          // use flatMap to emit notification values
          .flatMap(execute(transition, completion, state, backpressureStrategy),
                  initialRequest)
          // complete if we encounter an unsubscribed sentinel
          .takeWhile(NOT_UNSUBSCRIBED)
          // flatten notifications to a stream which will enable
          // early termination from the state machine if desired
          .dematerialize();
  }

Example source: nurkiewicz/rxjava-book-examples

  @Test
  public void sample_87() throws Exception {
      Observable<Notification<Integer>> notifications = Observable
          .just(3, 0, 2, 0, 1, 0)
          // division by zero fails for the 0 inputs; the error is delayed to the end
          .concatMapDelayError(x -> fromCallable(() -> 100 / x))
          .materialize();
      List<Notification.Kind> kinds = notifications
          .map(Notification::getKind)
          .toList()
          .toBlocking()
          .single();
      assertThat(kinds).containsExactly(OnNext, OnNext, OnNext, OnError);
  }
