Usage and code examples for io.reactivex.Observable.cast()

Reposted by x33g5p2x on 2022-01-25, filed under Other

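This article collects code examples for the Java method io.reactivex.Observable.cast() and shows how Observable.cast() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are meant as reference material. Details of the method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: cast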

About Observable.cast

Returns an Observable that emits the items emitted by the source ObservableSource, converted to the specified type.

Scheduler: cast does not operate by default on a particular Scheduler.
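
The behavior described above can be pictured as a checked cast applied to each item. The following is a minimal plain-Java sketch of that idea, not RxJava code: CastSketch and castAll are hypothetical names introduced for illustration, and a List stands in for the sequence of emitted items. It assumes only what the examples on this page show, namely that a null class is rejected and that an item of the wrong type results in a ClassCastException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Plain-Java analogy only; CastSketch and castAll are hypothetical names, not RxJava API.
final class CastSketch {

    // Casts every item to the target type and fails on the first mismatch.
    static <U> List<U> castAll(List<?> items, Class<U> clazz) {
        Objects.requireNonNull(clazz, "clazz is null");
        List<U> result = new ArrayList<>();
        for (Object item : items) {
            // Class.cast throws ClassCastException if the item is not an instance of clazz.
            result.add(clazz.cast(item));
        }
        return result;
    }

    public static void main(String[] args) {
        List<?> source = Arrays.asList(1, 2);

        // Matching type: every item passes through unchanged.
        List<Integer> ints = castAll(source, Integer.class);
        System.out.println(ints); // prints [1, 2]

        // Wrong type: the first item already fails the cast.
        try {
            castAll(source, Boolean.class);
        } catch (ClassCastException e) {
            System.out.println("wrong type: " + e.getClass().getSimpleName());
        }
    }
}
```

In the real operator the failure is delivered to the observer's onError rather than thrown to the caller, as the wrong-type test on this page verifies.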

Code examples

Code example source: ReactiveX/RxJava

    // Excerpt: the opening of the enclosing anonymous class and call is not shown.
        @Override
        public Observable<Object> apply(Observable<? extends Throwable> t1) {
            return t1.map(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable t1) {
                    return 0;
                }
            // Widen Observable<Integer> to the declared return type Observable<Object>.
            }).startWith(0).cast(Object.class);
        }
    }).subscribe(observer);

Code example source: ReactiveX/RxJava

    // A null class argument is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void castNull() {
        just1.cast(null);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testCastWithWrongType() {
        Observable<?> source = Observable.just(1, 2);
        Observable<Boolean> observable = source.cast(Boolean.class);
        Observer<Boolean> observer = TestHelper.mockObserver();
        observable.subscribe(observer);
        // The Integer items cannot be cast to Boolean, so the failure arrives through onError.
        verify(observer, times(1)).onError(
                any(ClassCastException.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testCast() {
        Observable<?> source = Observable.just(1, 2);
        Observable<Integer> observable = source.cast(Integer.class);
        Observer<Integer> observer = TestHelper.mockObserver();
        observable.subscribe(observer);
        // Both source items should arrive, each exactly once.
        verify(observer, times(1)).onNext(1);
        verify(observer, times(1)).onNext(2);
        verify(observer, never()).onError(
                any(Throwable.class));
        verify(observer, times(1)).onComplete();
    }

Code example source: ReactiveX/RxJava

    // Excerpt: the opening of the enclosing anonymous class and call is not shown.
        @Override
        public Observable<Object> apply(Observable<? extends Throwable> attempts) {
            // Worker w = Schedulers.computation().createWorker();
            return attempts
                    // Pair each error with a count of 1.
                    .map(new Function<Throwable, Tuple>() {
                        @Override
                        public Tuple apply(Throwable n) {
                            return new Tuple(1L, n);
                        }
                    })
                    // Accumulate the retry count, keeping the latest error.
                    .scan(new BiFunction<Tuple, Tuple, Tuple>() {
                        @Override
                        public Tuple apply(Tuple t, Tuple n) {
                            return new Tuple(t.count + n.count, n.n);
                        }
                    })
                    // Give up after 20 retries; otherwise wait count milliseconds before retrying.
                    .flatMap(new Function<Tuple, Observable<Long>>() {
                        @Override
                        public Observable<Long> apply(Tuple t) {
                            System.out.println("Retry # " + t.count);
                            return t.count > 20
                                    ? Observable.<Long>error(t.n)
                                    : Observable.timer(t.count * 1L, TimeUnit.MILLISECONDS);
                        }
                    }).cast(Object.class);
        }
    }).subscribe(to);

Code example source: ReactiveX/RxJava

    /**
     * Filters the items emitted by an ObservableSource, only emitting those of the specified type.
     * <p>
     * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ofClass.png" alt="">
     * <dl>
     * <dt><b>Scheduler:</b></dt>
     * <dd>{@code ofType} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <U> the output type
     * @param clazz
     *            the class type to filter the items emitted by the source ObservableSource
     * @return an Observable that emits items from the source ObservableSource of type {@code clazz}
     * @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <U> Observable<U> ofType(final Class<U> clazz) {
        ObjectHelper.requireNonNull(clazz, "clazz is null");
        return filter(Functions.isInstanceOf(clazz)).cast(clazz);
    }
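
The ofType source above filters with an instance-of check and then casts, so items of other types are skipped instead of causing an error. As a rough plain-Java analogy (not RxJava code: OfTypeSketch and its ofType helper are hypothetical names, and java.util.stream stands in for the Observable chain), the same filter-then-cast shape looks like this:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy only; OfTypeSketch is a hypothetical name, not RxJava API.
final class OfTypeSketch {

    // Keeps only the items that are instances of clazz, then casts them.
    static <U> List<U> ofType(List<?> items, Class<U> clazz) {
        return items.stream()
                .filter(clazz::isInstance) // drop items of other types
                .map(clazz::cast)          // safe: every remaining item is a U
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.<Object>asList(1, "two", 3, 4.0);

        // Only the Integer items survive the filter.
        System.out.println(ofType(mixed, Integer.class)); // prints [1, 3]
    }
}
```

Because the filter runs first, the cast step here can never fail, which is the practical difference from calling cast on its own.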

Code example source: leftcoding/GankLy

    /**
     * Returns an Observable of the given type (eventType), selected by the given code and eventType.
     *
     * @param code      the event code
     * @param eventType the event type
     * @param <T>       the type of the events to emit
     * @return an Observable that emits the matching events, cast to eventType
     */
    public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
        return bus.ofType(Message.class)
                .filter(message -> message.getCode() == code && eventType.isInstance(message.getObject()))
                .map(message -> message.getObject())
                .cast(eventType);
    }
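
The toObservable method above selects messages by code, checks the payload type with isInstance, and only then casts. The following plain-Java sketch walks through the same selection logic on a List. It is not the project's code: MessageFilterSketch, its nested Message class, and select are hypothetical stand-ins for the bus and Message type used above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy only; all names here are hypothetical stand-ins.
final class MessageFilterSketch {

    // Minimal stand-in for a bus message: an integer code plus an arbitrary payload.
    static final class Message {
        final int code;
        final Object payload;

        Message(int code, Object payload) {
            this.code = code;
            this.payload = payload;
        }
    }

    // Returns the payloads whose message code matches and whose type is eventType.
    static <T> List<T> select(List<Message> bus, int code, Class<T> eventType) {
        List<T> out = new ArrayList<>();
        for (Message m : bus) {
            if (m.code == code && eventType.isInstance(m.payload)) {
                // The isInstance check above guarantees this cast succeeds.
                out.add(eventType.cast(m.payload));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Message> bus = Arrays.asList(
                new Message(1, "a"),
                new Message(1, 5),
                new Message(2, "b"));

        System.out.println(select(bus, 1, String.class)); // prints [a]
    }
}
```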

Code example source: Cognifide/knotx

    private void deployVerticles(JsonObject config, Future<Void> completion) {
        LOGGER.info("STARTING Knot.x {} @ {}", Version.getVersion(), Version.getBuildTime());
        Observable.fromIterable(config.getJsonArray(MODULES_ARRAY))
            // The JSON array yields Object items; cast them to String module names.
            .cast(String.class)
            .map(ModuleDescriptor::parse)
            .flatMap(item -> deployVerticle(config, item))
            .reduce(new ArrayList<ModuleDescriptor>(), (accumulator, item) -> {
                accumulator.add(item);
                return accumulator;
            })
            .subscribe(
                deployments -> {
                    deployedModules = Lists.newArrayList(deployments);
                    LOGGER.info("Knot.x STARTED {}", buildMessage());
                    if (completion != null) {
                        completion.complete();
                    }
                },
                error -> {
                    LOGGER.error("Verticle could not be deployed", error);
                    if (completion != null) {
                        completion.fail(error);
                    }
                }
            );
    }

Code example source: cescoffier/vertx-kubernetes-workshop

    /**
     * This method is called when the verticle is deployed.
     */
    @Override
    public void start(Future<Void> future) {
        discovery = ServiceDiscovery.create(vertx);
        ConfigRetriever retriever = ConfigRetriever.create(vertx, getConfigurationOptions());
        retriever.rxGetConfig()
            // Read the configuration, and deploy a MarketDataVerticle for each company listed in the configuration.
            .flatMap(config ->
                Observable.fromIterable(config.getJsonArray("companies"))
                    .cast(JsonObject.class)
                    // Deploy the verticle with a configuration.
                    .flatMapSingle(company -> vertx.rxDeployVerticle(MarketDataVerticle.class.getName(),
                        new DeploymentOptions().setConfig(company)))
                    .toList()
            )
            // Deploy another verticle
            .flatMap(l -> vertx.rxDeployVerticle(RestQuoteAPIVerticle.class.getName()))
            // Expose the market-data message source
            .flatMap(x -> discovery.rxPublish(MessageSource.createRecord("market-data", ADDRESS)))
            .subscribe((rec, err) -> {
                if (rec != null) {
                    this.record = rec;
                    future.complete();
                } else {
                    future.fail(err);
                }
            });
    }

Code example source: akarnokd/akarnokd-misc

    @Test
    public void test() throws Exception {
        Observable<String> first = Observable.fromCallable(() -> "HEY").delay(250, TimeUnit.MILLISECONDS);
        Observable<Integer> second = Observable.fromCallable(() -> 1).delay(350, TimeUnit.MILLISECONDS);
        List<Observable<?>> observables = com.google.common.collect.Lists.newArrayList(first, second);
        Map<Long, Object> someWeirdMapWithObject = com.google.common.collect.ImmutableMap.of(
            1L, new BrandBuilder(1),
            2L, new BrandBuilder(2)
        );
        Observable
            .fromIterable(observables)
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            // wait for all tasks to finish
            .lastOrError()
            .flattenAsObservable(x -> someWeirdMapWithObject.values())
            // The map values are typed as Object; cast them back to BrandBuilder.
            .<BrandBuilder>cast(BrandBuilder.class)
            .map(BrandBuilder::build)
            .toList().blockingGet();
    }
