Usage and code examples of the io.reactivex.Observable.wrap() method

x33g5p2x, reposted on 2022-01-25 under Other

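This article collects code examples for the Java method io.reactivex.Observable.wrap() and shows how Observable.wrap() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.wrap() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: wrap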

About Observable.wrap

Wraps an ObservableSource into an Observable if it is not already an Observable. Scheduler: wrap does not operate by default on a particular Scheduler.
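
To make the contract above concrete, here is a minimal sketch of the wrap logic in plain Java. MiniSource and MiniObservable are hypothetical stand-ins for ObservableSource and Observable, not the RxJava classes, and they model only synchronous item delivery (no errors, completion, or disposal). The sketch assumes, as the description states, that an instance which is already of the richer type is returned as is and anything else is adapted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-in for ObservableSource: anything that can push items to a consumer.
interface MiniSource<T> {
    void subscribe(Consumer<? super T> consumer);
}

// Hypothetical stand-in for Observable: a MiniSource that also offers operators.
abstract class MiniObservable<T> implements MiniSource<T> {

    // Example operator that exists only on the richer type.
    List<T> toList() {
        List<T> items = new ArrayList<>();
        subscribe(items::add);
        return items;
    }

    // Reuse the instance if it is already a MiniObservable, otherwise adapt it.
    static <T> MiniObservable<T> wrap(MiniSource<T> source) {
        Objects.requireNonNull(source, "source is null");
        if (source instanceof MiniObservable) {
            return (MiniObservable<T>) source;
        }
        return new MiniObservable<T>() {
            @Override
            public void subscribe(Consumer<? super T> consumer) {
                source.subscribe(consumer); // delegate to the plain source
            }
        };
    }
}

final class WrapSketch {
    static List<Integer> demo() {
        // A plain source has no operators of its own...
        MiniSource<Integer> plain = consumer -> {
            consumer.accept(1);
            consumer.accept(2);
            consumer.accept(3);
        };
        // ...but after wrapping, operators such as toList() become available.
        return MiniObservable.wrap(plain).toList();
    }
}
```

Calling wrap a second time on the returned object hands back the same instance in this model, which is the "if not already an Observable" part of the description.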

Code examples

Example source: ReactiveX/RxJava

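// Excerpt from a function object; selector and scheduler are defined outside this excerpt.
@Override
public ObservableSource<R> apply(Observable<T> t) throws Exception {
  ObservableSource<R> apply = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null ObservableSource");
  // The selector may return any ObservableSource, so wrap it before chaining observeOn.
  return Observable.wrap(apply).observeOn(scheduler);
}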

Example source: ReactiveX/RxJava

@Override
public Iterator<T> iterator() {
  BlockingObservableLatestIterator<T> lio = new BlockingObservableLatestIterator<T>();
  Observable<Notification<T>> materialized = Observable.wrap(source).materialize();
  materialized.subscribe(lio);
  return lio;
}

Example source: ReactiveX/RxJava

/**
 * Transform an ObservableSource by applying a particular Transformer function to it.
 * <p>
 * This method operates on the ObservableSource itself whereas {@link #lift} operates on the ObservableSource's
 * Observers.
 * <p>
 * If the operator you are creating is designed to act on the individual items emitted by a source
 * ObservableSource, use {@link #lift}. If your operator is designed to transform the source ObservableSource as a whole
 * (for instance, by applying a particular set of existing RxJava operators to it) use {@code compose}.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code compose} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <R> the value type of the output ObservableSource
 * @param composer implements the function that transforms the source ObservableSource
 * @return the source ObservableSource, transformed by the transformer function
 * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
 */
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
  return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}
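
The transformer passed to compose is only required to return an ObservableSource, which is why the result goes through wrap before it is handed back as an Observable. A rough model of that step in plain Java follows. PlainSource and Pipeline are hypothetical stand-in types (not RxJava classes, synchronous items only), and java.util.function.Function is assumed here in place of ObservableTransformer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for ObservableSource.
interface PlainSource<T> {
    void subscribe(Consumer<? super T> consumer);
}

// Hypothetical stand-in for Observable with map, compose, and toList.
abstract class Pipeline<T> implements PlainSource<T> {

    static <T> Pipeline<T> wrap(PlainSource<T> source) {
        if (source instanceof Pipeline) {
            return (Pipeline<T>) source;
        }
        return new Pipeline<T>() {
            @Override
            public void subscribe(Consumer<? super T> consumer) {
                source.subscribe(consumer);
            }
        };
    }

    // Acts on individual items.
    <R> Pipeline<R> map(Function<? super T, ? extends R> mapper) {
        Pipeline<T> upstream = this;
        return new Pipeline<R>() {
            @Override
            public void subscribe(Consumer<? super R> consumer) {
                upstream.subscribe(item -> consumer.accept(mapper.apply(item)));
            }
        };
    }

    // Acts on the pipeline as a whole; the result is wrapped so chaining can continue.
    <R> Pipeline<R> compose(Function<? super Pipeline<T>, ? extends PlainSource<R>> transformer) {
        return wrap(transformer.apply(this));
    }

    List<T> toList() {
        List<T> items = new ArrayList<>();
        subscribe(items::add);
        return items;
    }
}

final class ComposeSketch {
    static List<String> demo() {
        PlainSource<Integer> numbers = c -> { c.accept(1); c.accept(2); c.accept(3); };
        // A reusable chain of operators, packaged as one transformer.
        Function<Pipeline<Integer>, PlainSource<String>> label =
                p -> p.map(n -> n * 10).map(n -> "item-" + n);
        return Pipeline.wrap(numbers).compose(label).toList();
    }
}
```

In this model the transformer happens to return a Pipeline, so wrap returns it unchanged; a transformer that returned a bare PlainSource would be adapted instead.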

Example source: ReactiveX/RxJava

/**
 * Concatenates an ObservableSource sequence of ObservableSources eagerly into a single stream of values.
 * <p>
 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
 * emitted source ObservableSources as they are observed. The operator buffers the values emitted by these
 * ObservableSources and then drains them in order, each one after the previous one completes.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatEager.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources a sequence of ObservableSources that need to be eagerly concatenated
 * @param maxConcurrency the maximum number of concurrently running inner ObservableSources; Integer.MAX_VALUE
 *                       is interpreted as all inner ObservableSources can be active at the same time
 * @param prefetch the number of elements to prefetch from each inner ObservableSource source
 * @return the new ObservableSource instance with the specified concatenation behavior
 * @since 2.0
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch) {
  return wrap(sources).concatMapEager((Function)Functions.identity(), maxConcurrency, prefetch);
}

Example source: ReactiveX/RxJava

/**
 * Concatenates a variable number of ObservableSource sources and delays errors from any of them
 * till all terminate.
 * <p>
 * <img width="640" height="290" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArray.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param sources the array of sources
 * @param <T> the common base value type
 * @return the new Observable instance
 * @throws NullPointerException if sources is null
 */
@SuppressWarnings({ "unchecked" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources) {
  if (sources.length == 0) {
    return empty();
  } else if (sources.length == 1) {
    return (Observable<T>)wrap(sources[0]);
  }
  return concatDelayError(fromArray(sources));
}

Example source: ReactiveX/RxJava

@Test
public void wrap() {
  Observable.wrap(new ObservableSource<Integer>() {
    @Override
    public void subscribe(Observer<? super Integer> observer) {
      observer.onSubscribe(Disposables.empty());
      observer.onNext(1);
      observer.onNext(2);
      observer.onNext(3);
      observer.onNext(4);
      observer.onNext(5);
      observer.onComplete();
    }
  })
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

Example source: ReactiveX/RxJava

@Test
public void onCompleteCrash() {
  Observable.wrap(new ObservableSource<Object>() {
    @Override
    public void subscribe(Observer<? super Object> observer) {
      observer.onSubscribe(Disposables.empty());
      observer.onComplete();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      throw new IOException();
    }
  })
  .test()
  .assertFailure(IOException.class);
}

Example source: ReactiveX/RxJava

/**
 * Concatenates a variable number of ObservableSource sources.
 * <p>
 * Note: named this way because of overload conflict with concat(ObservableSource&lt;ObservableSource&gt;)
 * <p>
 * <img width="640" height="290" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArray.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatArray} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param sources the array of sources
 * @param <T> the common base value type
 * @return the new Observable instance
 * @throws NullPointerException if sources is null
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources) {
  if (sources.length == 0) {
    return empty();
  } else if (sources.length == 1) {
    return wrap((ObservableSource<T>)sources[0]);
  }
  return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
}
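
The single-source branch above means that no concatenation operator is created when only one source is passed; the source is simply wrapped. A simplified model in plain Java is sketched below. Emitter and Sequence are hypothetical stand-in types (not RxJava classes), delivery is synchronous with no error handling, and the wildcard element types of the real signature are left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ObservableSource.
interface Emitter<T> {
    void subscribe(Consumer<? super T> consumer);
}

// Hypothetical stand-in for Observable.
abstract class Sequence<T> implements Emitter<T> {

    static <T> Sequence<T> wrap(Emitter<T> source) {
        if (source instanceof Sequence) {
            return (Sequence<T>) source;
        }
        return new Sequence<T>() {
            @Override
            public void subscribe(Consumer<? super T> consumer) {
                source.subscribe(consumer);
            }
        };
    }

    @SafeVarargs
    static <T> Sequence<T> concatArray(Emitter<T>... sources) {
        if (sources.length == 1) {
            // Single source: nothing to concatenate, so just wrap it.
            return wrap(sources[0]);
        }
        // Zero or several sources: replay each in order (an empty array emits nothing).
        return new Sequence<T>() {
            @Override
            public void subscribe(Consumer<? super T> consumer) {
                for (Emitter<T> source : sources) {
                    source.subscribe(consumer);
                }
            }
        };
    }

    List<T> toList() {
        List<T> items = new ArrayList<>();
        subscribe(items::add);
        return items;
    }
}
```

With one argument that is already a Sequence, concatArray in this model returns that very instance; with several arguments the items arrive in argument order.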

Example source: ReactiveX/RxJava

@Test
public void onCompleteCrashConditional() {
  Observable.wrap(new ObservableSource<Object>() {
    @Override
    public void subscribe(Observer<? super Object> observer) {
      observer.onSubscribe(Disposables.empty());
      observer.onComplete();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      throw new IOException();
    }
  })
  .filter(Functions.alwaysTrue())
  .test()
  .assertFailure(IOException.class);
}

Example source: ReactiveX/RxJava

@Test
public void onErrorAfterCrash() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onError(new TestException());
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .test()
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Example source: ReactiveX/RxJava

@Test
public void onCompleteAfterCrash() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onComplete();
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .test()
    .assertResult();
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Example source: ReactiveX/RxJava

@Test
public void ignoreCancel() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onNext(1);
        observer.onNext(2);
        observer.onError(new IOException());
        observer.onComplete();
      }
    })
    .doOnNext(new Consumer<Object>() {
      @Override
      public void accept(Object e) throws Exception {
        throw new TestException();
      }
    })
    .test()
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Example source: ReactiveX/RxJava

@Test
public void ignoreCancel() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Integer>() {
      @Override
      public void subscribe(Observer<? super Integer> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onNext(1);
        observer.onNext(2);
        observer.onNext(3);
        observer.onError(new IOException());
        observer.onComplete();
      }
    })
    .distinctUntilChanged(new BiPredicate<Integer, Integer>() {
      @Override
      public boolean test(Integer a, Integer b) throws Exception {
        throw new TestException();
      }
    })
    .test()
    .assertFailure(TestException.class, 1);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Example source: ReactiveX/RxJava

@Test
public void onErrorAfterCrashConditional() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onError(new TestException());
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .filter(Functions.alwaysTrue())
    .test()
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Example source: ReactiveX/RxJava

@Test
public void onCompleteAfterCrashConditional() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onComplete();
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .filter(Functions.alwaysTrue())
    .test()
    .assertResult();
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Example source: ReactiveX/RxJava

@Test
public void ignoreCancelConditional() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onNext(1);
        observer.onNext(2);
        observer.onError(new IOException());
        observer.onComplete();
      }
    })
    .doOnNext(new Consumer<Object>() {
      @Override
      public void accept(Object e) throws Exception {
        throw new TestException();
      }
    })
    .filter(Functions.alwaysTrue())
    .test()
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}
