rx.Observable.materialize()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.9k)|赞(0)|评价(0)|浏览(137)

本文整理了Java中rx.Observable.materialize()方法的一些代码示例,展示了Observable.materialize()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.materialize()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:materialize

Observable.materialize介绍

[英]Returns an Observable that represents all of the emissions and notifications from the source Observable into emissions marked with their original types within Notification objects.

Scheduler: materialize does not operate by default on a particular Scheduler.
[中]返回一个可观测值,该值表示源可观测到的所有排放通知,并在通知对象中标记为其原始类型。
调度程序:默认情况下,materialize不会在特定调度程序上运行。

代码示例

代码示例来源:origin: konmik/nucleus

@Override
  public Observable<Delivery<View, T>> call(Observable<T> observable) {
    return Observable
      .combineLatest(
        view,
        observable
          .materialize()
          .filter(new Func1<Notification<T>, Boolean>() {
            @Override
            public Boolean call(Notification<T> notification) {
              return !notification.isOnCompleted();
            }
          }),
        new Func2<View, Notification<T>, Delivery<View, T>>() {
          @Override
          public Delivery<View, T> call(View view, Notification<T> notification) {
            return view == null ? null : new Delivery<>(view, notification);
          }
        })
      .filter(new Func1<Delivery<View, T>, Boolean>() {
        @Override
        public Boolean call(Delivery<View, T> delivery) {
          return delivery != null;
        }
      });
  }
}

代码示例来源:origin: konmik/nucleus

final ReplaySubject<Notification<T>> subject = ReplaySubject.create();
final Subscription subscription = observable
  .materialize()
  .filter(new Func1<Notification<T>, Boolean>() {
    @Override

代码示例来源:origin: konmik/nucleus

@Override
  public Observable<Delivery<View, T>> call(Observable<T> observable) {
    return observable.materialize()
      .take(1)
      .switchMap(new Func1<Notification<T>, Observable<? extends Delivery<View, T>>>() {
        @Override
        public Observable<? extends Delivery<View, T>> call(final Notification<T> notification) {
          return view.map(new Func1<View, Delivery<View, T>>() {
            @Override
            public Delivery<View, T> call(View view) {
              return view == null ? null : new Delivery<>(view, notification);
            }
          });
        }
      })
      .filter(new Func1<Delivery<View, T>, Boolean>() {
        @Override
        public Boolean call(Delivery<View, T> delivery) {
          return delivery != null;
        }
      })
      .take(1);
  }
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testThreadContextOnTimeout() {
  final AtomicBoolean isInitialized = new AtomicBoolean();
  new TimeoutCommand().toObservable()
      .doOnError(new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
          isInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
        }
      })
      .materialize()
      .toBlocking().single();
  System.out.println("initialized = " + HystrixRequestContext.isCurrentThreadInitialized());
  System.out.println("initialized inside onError = " + isInitialized.get());
  assertEquals(true, isInitialized.get());
}

代码示例来源:origin: jhusain/learnrxjava

public static void subscribe(Observable<String> o) {
    o = o.materialize().flatMap(n -> {
      if (n.isOnError()) {
        if (n.getThrowable() instanceof IllegalStateException) {
          return Observable.just(n);
        } else {
          return Observable.error(n.getThrowable());
        }
      } else {
        return Observable.just(n);
      }
    }).retry().dematerialize();

    o.subscribe(System.out::println, t -> t.printStackTrace());
  }
}

代码示例来源:origin: com.netflix.rxjava/rxjava-core

@Override
  public Iterator<T> iterator() {
    LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
    source.materialize().subscribe(lio);
    return lio;
  }
};

代码示例来源:origin: com.novoda/rxmocks

/**
 * Asserts that a given {@code observable} emits an element matching a given {@code matcher}
 *
 * @param matcher    The matcher to use for the assertion
 * @param observable The observable to assert against
 * @param <T>        The type of the observable
 */
public static <T> void expect(RxMatcher<Notification<T>> matcher, Observable<T> observable) {
  observable.materialize()
      .subscribe(expect(matcher));
}

代码示例来源:origin: com.novoda/rxmocks

/**
 * Asserts that a given {@code observable} emits only elements matching a given {@code matcher}
 *
 * @param matcher    The matcher to use for the assertion
 * @param observable The observable to assert against
 * @param <T>        The type of the observable
 */
public static <T> void expectOnly(RxMatcher<Notification<T>> matcher, Observable<T> observable) {
  observable.materialize()
      .subscribe(expectOnly(matcher));
}

代码示例来源:origin: com.novoda/rxmocks

/**
 * Asserts that a given {@code observable} emits an element matching a given {@code matcher}     *
 *
 * @param matcher    The matcher to use for the assertion
 * @param observable The observable to assert against
 * @param matched    A callback for when the assertion is matched
 * @param <T>        The type of the observable
 */
public static <T> void expect(final RxMatcher<Notification<T>> matcher, final Observable<T> observable, final Action1<Notification<T>> matched) {
  observable.materialize()
      .subscribe(expect(matcher, matched));
}

代码示例来源:origin: com.novoda/rxmocks

/**
 * Asserts that a given {@code observable} emits only elements matching a given {@code matcher}     *
 *
 * @param matcher    The matcher to use for the assertion
 * @param observable The observable to assert against
 * @param matched    A callback for when the assertion is matched
 * @param <T>        The type of the observable
 */
public static <T> void expectOnly(final RxMatcher<Notification<T>> matcher, final Observable<T> observable, final Action1<Notification<T>> matched) {
  observable.materialize()
      .subscribe(expectOnly(matcher, matched));
}

代码示例来源:origin: au.gov.amsa.risky/ais

public static <T> void print(Observable<T> o) {
  o.materialize().toBlocking().forEach(System.out::println);
}

代码示例来源:origin: davidmoten/rxjava-extras

@Override
  public Observable<T> call(Observable<T> o) {
    return o.materialize().buffer(2, 1)
        .flatMap(new Func1<List<Notification<T>>, Observable<T>>() {
          @Override
          public Observable<T> call(List<Notification<T>> list) {
            Notification<T> a = list.get(0);
            if (list.size() == 2 && list.get(1).isOnCompleted()) {
              return Observable.just(a.getValue()).repeat();
            } else if (a.isOnError()) {
              return Observable.error(list.get(0).getThrowable());
            } else if (a.isOnCompleted()) {
              return Observable.empty();
            } else {
              return Observable.just(a.getValue());
            }
          }
        });
  }
};

代码示例来源:origin: com.github.davidmoten/rxjava-extras

@Override
  public Observable<T> call(Observable<T> o) {
    return o.materialize().buffer(2, 1)
        .flatMap(new Func1<List<Notification<T>>, Observable<T>>() {
          @Override
          public Observable<T> call(List<Notification<T>> list) {
            Notification<T> a = list.get(0);
            if (list.size() == 2 && list.get(1).isOnCompleted()) {
              return Observable.just(a.getValue()).repeat();
            } else if (a.isOnError()) {
              return Observable.error(list.get(0).getThrowable());
            } else if (a.isOnCompleted()) {
              return Observable.empty();
            } else {
              return Observable.just(a.getValue());
            }
          }
        });
  }
};

代码示例来源:origin: com.netflix.ribbon/ribbon

@Override
  public void call(
      final Subscriber<? super RibbonResponse<Observable<T>>> t1) {
    final Subject<T, T> subject = ReplaySubject.create();
    hystrixNotificationObservable.materialize().subscribe(new Action1<Notification<ResultCommandPair<T>>>() {
      AtomicBoolean first = new AtomicBoolean(true);
      @Override
      public void call(Notification<ResultCommandPair<T>> notification) {
        if (first.compareAndSet(true, false)) {
          HystrixObservableCommand<T> command = notification.isOnError() ? commandChain.getLastCommand() : notification.getValue().getCommand();
          t1.onNext(new ResponseWithSubject<T>(subject, command));
          t1.onCompleted();
        }
        if (notification.isOnNext()) {
          subject.onNext(notification.getValue().getResult());
        } else if (notification.isOnCompleted()) {
          subject.onCompleted();
        } else { // onError
          subject.onError(notification.getThrowable());
        }
      }
    });
  }
});

代码示例来源:origin: leeowenowen/rxjava-examples

@Override
 public void run() {
  Observable o1 = Observable.range(1, 3).materialize();
  o1.subscribe(new Action1<Notification<Integer>>() {
   @Override
   public void call(Notification<Integer> integerNotification) {
    log("******");
    log("kind:" + integerNotification.getKind());
    log("value:" + integerNotification.getValue());
   }
  });
  o1.dematerialize().subscribe(new Action1() {
   @Override
   public void call(Object o) {
    log(o.toString());
   }
  });
 }
});

代码示例来源:origin: com.netflix.eureka/eureka2-client

.materialize()

代码示例来源:origin: com.netflix.eureka/eureka2-eureka1-rest-api

private Observable<Void> instanceGetByAppAndInstanceId(final String appName, final String instanceId, final EncodingFormat format,
                            final boolean gzip, final HttpServerResponse<ByteBuf> response) {
  return registryViewCache.findInstance(instanceId).materialize().toList().flatMap(
      new Func1<List<Notification<InstanceInfo>>, Observable<Void>>() {
        @Override
        public Observable<Void> call(List<Notification<InstanceInfo>> notifications) {
          // If completed with error propagate it
          Notification<InstanceInfo> lastNotification = notifications.get(notifications.size() - 1);
          if (lastNotification.getKind() == Kind.OnError) {
            return Observable.error(lastNotification.getThrowable());
          }
          // If onComplete only => instance info not found
          if (notifications.size() == 1) {
            logger.info("Instance info with id {} not found", instanceId);
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return Observable.empty();
          }
          // InstanceInfo object found
          InstanceInfo v1InstanceInfo = notifications.get(0).getValue();
          if (appName != null && !appName.equalsIgnoreCase(v1InstanceInfo.getAppName())) {
            logger.info("Instance info with id {} is associated with application {}, not {}",
                instanceId, v1InstanceInfo.getAppName(), appName);
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return Observable.empty();
          }
          return encodeResponse(format, gzip, response, v1InstanceInfo);
        }
      }
  );
}

代码示例来源:origin: davidmoten/rxjava-extras

@Override
  public Observable<Out> call() {
    Mutable<State> state = new Mutable<State>(initialState.call());
    return source.materialize()
        // do state transitions and emit notifications
        // use flatMap to emit notification values
        .flatMap(execute(transition, completion, state, backpressureStrategy),
            initialRequest)
        // complete if we encounter an unsubscribed sentinel
        .takeWhile(NOT_UNSUBSCRIBED)
        // flatten notifications to a stream which will enable
        // early termination from the state machine if desired
        .dematerialize();
  }
});

代码示例来源:origin: com.github.davidmoten/rxjava-extras

@Override
  public Observable<Out> call() {
    Mutable<State> state = new Mutable<State>(initialState.call());
    return source.materialize()
        // do state transitions and emit notifications
        // use flatMap to emit notification values
        .flatMap(execute(transition, completion, state, backpressureStrategy),
            initialRequest)
        // complete if we encounter an unsubscribed sentinel
        .takeWhile(NOT_UNSUBSCRIBED)
        // flatten notifications to a stream which will enable
        // early termination from the state machine if desired
        .dematerialize();
  }
});

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_87() throws Exception {
  Observable<Notification<Integer>> notifications = Observable
      .just(3, 0, 2, 0, 1, 0)
      .concatMapDelayError(x -> fromCallable(() -> 100 / x))
      .materialize();
  List<Notification.Kind> kinds = notifications
      .map(Notification::getKind)
      .toList()
      .toBlocking()
      .single();
  assertThat(kinds).containsExactly(OnNext, OnNext, OnNext, OnError);
}

相关文章

Observable类方法