Usage and code examples of the io.reactivex.Observable.wrap() method

Reposted by x33g5p2x on 2022-01-25 under "Other"

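This article collects code examples for the Java method io.reactivex.Observable.wrap() and shows how Observable.wrap() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. The details of Observable.wrap() are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: wrap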

Introduction to Observable.wrap

Wraps an ObservableSource into an Observable if it is not already an Observable. Scheduler: wrap does not operate by default on a particular Scheduler.
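The description above can be illustrated with a minimal sketch, assuming RxJava 2.x is on the classpath. The class names `WrapDemo` and `ThreeInts` are hypothetical and exist only for this illustration. The sketch wraps a hand-written ObservableSource so that regular operators become available, and checks that an existing Observable is handed back unchanged (this assumes no assembly hook has been installed through RxJavaPlugins).

```java
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposables;

import java.util.List;

public class WrapDemo {

    /** A hand-written source that is not an Observable (hypothetical name). */
    public static final class ThreeInts implements ObservableSource<Integer> {
        @Override
        public void subscribe(Observer<? super Integer> observer) {
            // Protocol order: onSubscribe first, then items, then one terminal event.
            observer.onSubscribe(Disposables.empty());
            observer.onNext(1);
            observer.onNext(2);
            observer.onNext(3);
            observer.onComplete();
        }
    }

    /** Wraps the plain source, then uses ordinary Observable operators on it. */
    public static List<Integer> doubled() {
        return Observable.wrap(new ThreeInts())
                .map(x -> x * 2)
                .toList()
                .blockingGet();
    }

    /** Checks whether wrap() returns the very same object for an existing Observable. */
    public static boolean returnsSameInstance() {
        Observable<Integer> already = Observable.just(1, 2, 3);
        Observable<Integer> wrapped = Observable.wrap(already);
        return wrapped == already;
    }

    public static void main(String[] args) {
        System.out.println(doubled());             // [2, 4, 6]
        System.out.println(returnsSameInstance()); // true when no assembly hook is set
    }
}
```

Because wrap only adapts the source, the wrapped ObservableSource is still responsible for following the Observer protocol itself, as `ThreeInts` does here.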

Code examples

Code example source: ReactiveX/RxJava

    @Override
    public ObservableSource<R> apply(Observable<T> t) throws Exception {
        ObservableSource<R> apply = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null ObservableSource");
        return Observable.wrap(apply).observeOn(scheduler);
    }

Code example source: ReactiveX/RxJava

    @Override
    public Iterator<T> iterator() {
        BlockingObservableLatestIterator<T> lio = new BlockingObservableLatestIterator<T>();
        Observable<Notification<T>> materialized = Observable.wrap(source).materialize();
        materialized.subscribe(lio);
        return lio;
    }

Code example source: ReactiveX/RxJava

    /**
     * Transform an ObservableSource by applying a particular Transformer function to it.
     * <p>
     * This method operates on the ObservableSource itself whereas {@link #lift} operates on the ObservableSource's
     * Observers.
     * <p>
     * If the operator you are creating is designed to act on the individual items emitted by a source
     * ObservableSource, use {@link #lift}. If your operator is designed to transform the source ObservableSource as a whole
     * (for instance, by applying a particular set of existing RxJava operators to it) use {@code compose}.
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code compose} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <R> the value type of the output ObservableSource
     * @param composer implements the function that transforms the source ObservableSource
     * @return the source ObservableSource, transformed by the transformer function
     * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
     */
    @SuppressWarnings("unchecked")
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
        return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
    }
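As a usage illustration for the compose method above, here is a small sketch assuming RxJava 2.x. The class `ComposeDemo` and the transformer `evensAsText` are hypothetical names invented for this example. An ObservableTransformer returns an ObservableSource, and compose passes that result through wrap, so the caller always gets an Observable back.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;

import java.util.List;

public class ComposeDemo {

    /** Hypothetical reusable transformer: keep even numbers and render them as text. */
    public static ObservableTransformer<Integer, String> evensAsText() {
        return upstream -> upstream
                .filter(x -> x % 2 == 0)
                .map(x -> "#" + x);
    }

    /** Applies the transformer to the whole sequence 1..6 through compose(). */
    public static List<String> run() {
        return Observable.range(1, 6)
                .compose(evensAsText())
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [#2, #4, #6]
    }
}
```

Packaging a chain of operators as a transformer keeps the call site fluent while letting the same chain be reused on other sequences.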

Code example source: ReactiveX/RxJava

    /**
     * Concatenates an ObservableSource sequence of ObservableSources eagerly into a single stream of values.
     * <p>
     * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
     * emitted source ObservableSources as they are observed. The operator buffers the values emitted by these
     * ObservableSources and then drains them in order, each one after the previous one completes.
     * <p>
     * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatEager.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param <T> the value type
     * @param sources a sequence of ObservableSources that need to be eagerly concatenated
     * @param maxConcurrency the maximum number of concurrently running inner ObservableSources; Integer.MAX_VALUE
     *                       is interpreted as all inner ObservableSources can be active at the same time
     * @param prefetch the number of elements to prefetch from each inner ObservableSource source
     * @return the new ObservableSource instance with the specified concatenation behavior
     * @since 2.0
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch) {
        return wrap(sources).concatMapEager((Function)Functions.identity(), maxConcurrency, prefetch);
    }
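The following sketch shows one way the concatEager overload above might be called, assuming RxJava 2.x. The class name `ConcatEagerDemo` and the chosen values for maxConcurrency (2) and prefetch (8) are arbitrary picks for this illustration, not recommendations.

```java
import io.reactivex.Observable;

import java.util.List;

public class ConcatEagerDemo {

    /** Eagerly concatenates two inner sequences while preserving their order. */
    public static List<Integer> run() {
        // An outer sequence whose items are themselves sequences.
        Observable<Observable<Integer>> sources = Observable.just(
                Observable.just(1, 2),
                Observable.just(3, 4));

        // At most 2 inner sources active at once, prefetching up to 8 items from each.
        return Observable.concatEager(sources, 2, 8)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 4]
    }
}
```

The inner sources here are synchronous, so eagerness has no visible effect. With asynchronous inner sources, all of them would start running on subscription while their items are still delivered in source order.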

Code example source: ReactiveX/RxJava

    /**
     * Concatenates a variable number of ObservableSource sources and delays errors from any of them
     * till all terminate.
     * <p>
     * <img width="640" height="290" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArray.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param sources the array of sources
     * @param <T> the common base value type
     * @return the new Observable instance
     * @throws NullPointerException if sources is null
     */
    @SuppressWarnings({ "unchecked" })
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources) {
        if (sources.length == 0) {
            return empty();
        } else if (sources.length == 1) {
            return (Observable<T>)wrap(sources[0]);
        }
        return concatDelayError(fromArray(sources));
    }
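To make the "delays errors till all terminate" behavior concrete, here is a hedged sketch assuming RxJava 2.x. The class `ConcatDelayErrorDemo` and the event strings it records are made up for this example. The first source fails after one item, yet the second source still runs before the error is delivered.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

public class ConcatDelayErrorDemo {

    /** Records every event seen by the subscriber, in arrival order. */
    public static List<String> run() {
        List<String> events = new ArrayList<>();

        // Emits 1 and then fails.
        Observable<Integer> failing = Observable.just(1)
                .concatWith(Observable.<Integer>error(new IllegalStateException("boom")));
        // Emits 2 and 3 and then completes normally.
        Observable<Integer> healthy = Observable.just(2, 3);

        // All sources are synchronous, so subscribe() returns after the terminal event.
        Observable.concatArrayDelayError(failing, healthy)
                .subscribe(
                        v -> events.add("next " + v),
                        e -> events.add("error " + e.getClass().getSimpleName()));
        return events;
    }

    public static void main(String[] args) {
        // The three items arrive first; the error event arrives last.
        System.out.println(run());
    }
}
```

With plain concatArray the sequence would stop at the first error and the second source would never be subscribed.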

Code example source: ReactiveX/RxJava

    @Test
    public void wrap() {
        Observable.wrap(new ObservableSource<Integer>() {
            @Override
            public void subscribe(Observer<? super Integer> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onNext(1);
                observer.onNext(2);
                observer.onNext(3);
                observer.onNext(4);
                observer.onNext(5);
                observer.onComplete();
            }
        })
        .test()
        .assertResult(1, 2, 3, 4, 5);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onCompleteCrash() {
        Observable.wrap(new ObservableSource<Object>() {
            @Override
            public void subscribe(Observer<? super Object> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onComplete();
            }
        })
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                throw new IOException();
            }
        })
        .test()
        .assertFailure(IOException.class);
    }

Code example source: ReactiveX/RxJava

    /**
     * Concatenates a variable number of ObservableSource sources.
     * <p>
     * Note: named this way because of overload conflict with concat(ObservableSource&lt;ObservableSource&gt;)
     * <p>
     * <img width="640" height="290" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArray.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code concatArray} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param sources the array of sources
     * @param <T> the common base value type
     * @return the new Observable instance
     * @throws NullPointerException if sources is null
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources) {
        if (sources.length == 0) {
            return empty();
        } else if (sources.length == 1) {
            return wrap((ObservableSource<T>)sources[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
    }
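The single-source shortcut in concatArray above goes straight through wrap, which can be observed from the outside. The sketch below assumes RxJava 2.x with no assembly hooks installed via RxJavaPlugins; the class name `ConcatArrayDemo` is hypothetical.

```java
import io.reactivex.Observable;

import java.util.List;

public class ConcatArrayDemo {

    /** With exactly one source, concatArray delegates to wrap and returns that source. */
    public static boolean singleSourceIsReturnedAsIs() {
        Observable<Integer> only = Observable.just(1, 2, 3);
        Observable<Integer> result = Observable.concatArray(only);
        return result == only;
    }

    /** With several sources, the items are emitted one source after another. */
    public static List<Integer> concatTwo() {
        return Observable.concatArray(Observable.just(1, 2), Observable.just(3, 4))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(singleSourceIsReturnedAsIs()); // true when no assembly hook is set
        System.out.println(concatTwo());                  // [1, 2, 3, 4]
    }
}
```

The shortcut avoids allocating a concatenation operator when there is nothing to concatenate.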

Code example source: ReactiveX/RxJava

    @Test
    public void onCompleteCrashConditional() {
        Observable.wrap(new ObservableSource<Object>() {
            @Override
            public void subscribe(Observer<? super Object> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onComplete();
            }
        })
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                throw new IOException();
            }
        })
        .filter(Functions.alwaysTrue())
        .test()
        .assertFailure(IOException.class);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onErrorAfterCrash() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Observable.wrap(new ObservableSource<Object>() {
                @Override
                public void subscribe(Observer<? super Object> observer) {
                    observer.onSubscribe(Disposables.empty());
                    observer.onError(new TestException());
                }
            })
            .doAfterTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    throw new IOException();
                }
            })
            .test()
            .assertFailure(TestException.class);
            TestHelper.assertUndeliverable(errors, 0, IOException.class);
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onCompleteAfterCrash() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Observable.wrap(new ObservableSource<Object>() {
                @Override
                public void subscribe(Observer<? super Object> observer) {
                    observer.onSubscribe(Disposables.empty());
                    observer.onComplete();
                }
            })
            .doAfterTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    throw new IOException();
                }
            })
            .test()
            .assertResult();
            TestHelper.assertUndeliverable(errors, 0, IOException.class);
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void ignoreCancel() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Observable.wrap(new ObservableSource<Object>() {
                @Override
                public void subscribe(Observer<? super Object> observer) {
                    observer.onSubscribe(Disposables.empty());
                    observer.onNext(1);
                    observer.onNext(2);
                    observer.onError(new IOException());
                    observer.onComplete();
                }
            })
            .doOnNext(new Consumer<Object>() {
                @Override
                public void accept(Object e) throws Exception {
                    throw new TestException();
                }
            })
            .test()
            .assertFailure(TestException.class);
            TestHelper.assertUndeliverable(errors, 0, IOException.class);
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void ignoreCancel() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Observable.wrap(new ObservableSource<Integer>() {
                @Override
                public void subscribe(Observer<? super Integer> observer) {
                    observer.onSubscribe(Disposables.empty());
                    observer.onNext(1);
                    observer.onNext(2);
                    observer.onNext(3);
                    observer.onError(new IOException());
                    observer.onComplete();
                }
            })
            .distinctUntilChanged(new BiPredicate<Integer, Integer>() {
                @Override
                public boolean test(Integer a, Integer b) throws Exception {
                    throw new TestException();
                }
            })
            .test()
            .assertFailure(TestException.class, 1);
            TestHelper.assertUndeliverable(errors, 0, IOException.class);
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onErrorAfterCrashConditional() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Observable.wrap(new ObservableSource<Object>() {
                @Override
                public void subscribe(Observer<? super Object> observer) {
                    observer.onSubscribe(Disposables.empty());
                    observer.onError(new TestException());
                }
            })
            .doAfterTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    throw new IOException();
                }
            })
            .filter(Functions.alwaysTrue())
            .test()
            .assertFailure(TestException.class);
            TestHelper.assertUndeliverable(errors, 0, IOException.class);
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onCompleteAfterCrashConditional() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Observable.wrap(new ObservableSource<Object>() {
                @Override
                public void subscribe(Observer<? super Object> observer) {
                    observer.onSubscribe(Disposables.empty());
                    observer.onComplete();
                }
            })
            .doAfterTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    throw new IOException();
                }
            })
            .filter(Functions.alwaysTrue())
            .test()
            .assertResult();
            TestHelper.assertUndeliverable(errors, 0, IOException.class);
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void ignoreCancelConditional() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Observable.wrap(new ObservableSource<Object>() {
                @Override
                public void subscribe(Observer<? super Object> observer) {
                    observer.onSubscribe(Disposables.empty());
                    observer.onNext(1);
                    observer.onNext(2);
                    observer.onError(new IOException());
                    observer.onComplete();
                }
            })
            .doOnNext(new Consumer<Object>() {
                @Override
                public void accept(Object e) throws Exception {
                    throw new TestException();
                }
            })
            .filter(Functions.alwaysTrue())
            .test()
            .assertFailure(TestException.class);
            TestHelper.assertUndeliverable(errors, 0, IOException.class);
        } finally {
            RxJavaPlugins.reset();
        }
    }
