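This article collects code examples for the Java method io.reactivex.Observable.wrap() and shows how Observable.wrap() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.wrap() method are as follows: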
Fully qualified class: io.reactivex.Observable
Class name: Observable
Method name: wrap
Description: Wraps an ObservableSource into an Observable if not already an Observable. Scheduler: wrap does not operate by default on a particular Scheduler.
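The contract in the description (hand back the argument unchanged when it is already an Observable, otherwise adapt it) can be modeled without RxJava on the classpath. The sketch below is a simplified, dependency-free illustration, not the library's implementation: `MiniSource`, `MiniObservable`, and `toList` are hypothetical stand-ins invented for this example, and the model is synchronous with no completion, error, or disposal signals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Dependency-free model of the wrap() contract. MiniSource and MiniObservable are
// hypothetical stand-ins for ObservableSource and Observable, not RxJava types.
final class WrapSketch {

    /** Stand-in for ObservableSource: anything that can push items to a consumer. */
    interface MiniSource<T> {
        void subscribe(Consumer<? super T> observer);
    }

    /** Stand-in for Observable: a MiniSource that also carries fluent operators. */
    abstract static class MiniObservable<T> implements MiniSource<T> {

        /** Returns the argument itself if it is already a MiniObservable, otherwise an adapter. */
        static <T> MiniObservable<T> wrap(MiniSource<T> source) {
            if (source == null) {
                throw new NullPointerException("source is null");
            }
            if (source instanceof MiniObservable) {
                return (MiniObservable<T>) source;
            }
            return new MiniObservable<T>() {
                @Override
                public void subscribe(Consumer<? super T> observer) {
                    source.subscribe(observer);
                }
            };
        }

        /** Example operator that only exists on the wrapped type: collects all items. */
        List<T> toList() {
            List<T> items = new ArrayList<>();
            subscribe(items::add);
            return items;
        }
    }

    public static void main(String[] args) {
        // A bare source has no operators of its own.
        MiniSource<Integer> raw = observer -> {
            observer.accept(1);
            observer.accept(2);
            observer.accept(3);
        };
        MiniObservable<Integer> wrapped = MiniObservable.wrap(raw);
        System.out.println(wrapped.toList());
        // Wrapping something that is already a MiniObservable returns the same instance.
        System.out.println(MiniObservable.wrap(wrapped) == wrapped);
    }
}
```

Running `main` prints `[1, 2, 3]` followed by `true`: the bare source gains the operator only after wrapping, and a second wrap is a no-op.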
Code example source: ReactiveX/RxJava (the same code also appears in redisson/redisson)
@Override
public ObservableSource<R> apply(Observable<T> t) throws Exception {
    ObservableSource<R> apply = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null ObservableSource");
    return Observable.wrap(apply).observeOn(scheduler);
}
Code example source: ReactiveX/RxJava (the same code also appears in redisson/redisson)
@Override
public Iterator<T> iterator() {
    BlockingObservableLatestIterator<T> lio = new BlockingObservableLatestIterator<T>();
    Observable<Notification<T>> materialized = Observable.wrap(source).materialize();
    materialized.subscribe(lio);
    return lio;
}
Code example source: ReactiveX/RxJava (the same code also appears in redisson/redisson)
/**
* Transform an ObservableSource by applying a particular Transformer function to it.
* <p>
* This method operates on the ObservableSource itself whereas {@link #lift} operates on the ObservableSource's
* Observers.
* <p>
* If the operator you are creating is designed to act on the individual items emitted by a source
* ObservableSource, use {@link #lift}. If your operator is designed to transform the source ObservableSource as a whole
* (for instance, by applying a particular set of existing RxJava operators to it) use {@code compose}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code compose} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the value type of the output ObservableSource
* @param composer implements the function that transforms the source ObservableSource
* @return the source ObservableSource, transformed by the transformer function
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
    return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}
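The compose method applies the transformer to the whole source and then passes the result through wrap, because the transformer is only required to return an ObservableSource. The following dependency-free sketch illustrates that shape under simplifying assumptions: `MiniSource`, `MiniObservable`, `fromList`, `map`, and `toList` are hypothetical stand-ins written for this example, a plain `java.util.function.Function` plays the role of the transformer, and the model is synchronous with no terminal signals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Dependency-free model; every type here is a hypothetical stand-in, not an RxJava class.
final class ComposeSketch {

    interface MiniSource<T> {
        void subscribe(Consumer<? super T> observer);
    }

    abstract static class MiniObservable<T> implements MiniSource<T> {

        /** Returns the source itself if it is already a MiniObservable, otherwise an adapter. */
        static <T> MiniObservable<T> wrap(MiniSource<T> source) {
            if (source == null) {
                throw new NullPointerException("source is null");
            }
            if (source instanceof MiniObservable) {
                return (MiniObservable<T>) source;
            }
            return new MiniObservable<T>() {
                @Override
                public void subscribe(Consumer<? super T> observer) {
                    source.subscribe(observer);
                }
            };
        }

        static <T> MiniObservable<T> fromList(List<T> items) {
            MiniSource<T> source = observer -> items.forEach(observer);
            return wrap(source);
        }

        /** Per-item operator: applies the mapper to every emitted item. */
        <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            MiniObservable<T> upstream = this;
            return new MiniObservable<R>() {
                @Override
                public void subscribe(Consumer<? super R> observer) {
                    upstream.subscribe(item -> observer.accept(mapper.apply(item)));
                }
            };
        }

        /** Whole-source operator: the transformer may return any MiniSource, so wrap the result. */
        <R> MiniObservable<R> compose(Function<? super MiniObservable<T>, ? extends MiniSource<R>> transformer) {
            return wrap(transformer.apply(this));
        }

        List<T> toList() {
            List<T> items = new ArrayList<>();
            subscribe(items::add);
            return items;
        }
    }

    public static void main(String[] args) {
        // A reusable transformer that bundles two existing operators.
        Function<MiniObservable<Integer>, MiniSource<String>> label =
                upstream -> upstream.map(n -> n * 10).map(n -> "#" + n);

        MiniObservable<Integer> source = MiniObservable.fromList(Arrays.asList(1, 2, 3));
        System.out.println(source.compose(label).toList());
    }
}
```

Running `main` prints `[#10, #20, #30]`. Because the transformer's declared return type is the bare source interface, the wrap call is what keeps the chain fluent after `compose`.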
Code example source: ReactiveX/RxJava
/**
* Concatenates an ObservableSource sequence of ObservableSources eagerly into a single stream of values.
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source ObservableSources as they are observed. The operator buffers the values emitted by these
* ObservableSources and then drains them in order, each one after the previous one completes.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatEager.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of ObservableSources that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrently running inner ObservableSources; Integer.MAX_VALUE
* is interpreted as all inner ObservableSources can be active at the same time
* @param prefetch the number of elements to prefetch from each inner ObservableSource source
* @return the new ObservableSource instance with the specified concatenation behavior
* @since 2.0
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch) {
    return wrap(sources).concatMapEager((Function)Functions.identity(), maxConcurrency, prefetch);
}
Code example source: ReactiveX/RxJava
/**
* Concatenates a variable number of ObservableSource sources and delays errors from any of them
* till all terminate.
* <p>
* <img width="640" height="290" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArray.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the array of sources
* @param <T> the common base value type
* @return the new Observable instance
* @throws NullPointerException if sources is null
*/
@SuppressWarnings({ "unchecked" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources) {
    if (sources.length == 0) {
        return empty();
    } else if (sources.length == 1) {
        return (Observable<T>)wrap(sources[0]);
    }
    return concatDelayError(fromArray(sources));
}
Code example source: ReactiveX/RxJava
@Test
public void wrap() {
    Observable.wrap(new ObservableSource<Integer>() {
        @Override
        public void subscribe(Observer<? super Integer> observer) {
            observer.onSubscribe(Disposables.empty());
            observer.onNext(1);
            observer.onNext(2);
            observer.onNext(3);
            observer.onNext(4);
            observer.onNext(5);
            observer.onComplete();
        }
    })
    .test()
    .assertResult(1, 2, 3, 4, 5);
}
Code example source: ReactiveX/RxJava
@Test
public void onCompleteCrash() {
    Observable.wrap(new ObservableSource<Object>() {
        @Override
        public void subscribe(Observer<? super Object> observer) {
            observer.onSubscribe(Disposables.empty());
            observer.onComplete();
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            throw new IOException();
        }
    })
    .test()
    .assertFailure(IOException.class);
}
Code example source: ReactiveX/RxJava
/**
* Concatenates a variable number of ObservableSource sources.
* <p>
* Note: named this way because of overload conflict with concat(ObservableSource<ObservableSource>)
* <p>
* <img width="640" height="290" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArray.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatArray} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the array of sources
* @param <T> the common base value type
* @return the new Observable instance
* @throws NullPointerException if sources is null
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources) {
    if (sources.length == 0) {
        return empty();
    } else if (sources.length == 1) {
        return wrap((ObservableSource<T>)sources[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
}
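The concatArray method short-circuits two cases before building a concatenation operator: no sources yields an empty sequence, and a single source is simply passed through wrap. A rough, dependency-free sketch of that branching follows; `MiniSource`, `MiniObservable`, `fromList`, and `toList` are hypothetical stand-ins for this illustration, and the multi-source branch here just subscribes to each source in turn, which is only valid in this synchronous toy model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Dependency-free model; every type here is a hypothetical stand-in, not an RxJava class.
final class ConcatArraySketch {

    interface MiniSource<T> {
        void subscribe(Consumer<? super T> observer);
    }

    abstract static class MiniObservable<T> implements MiniSource<T> {

        /** Returns the source itself if it is already a MiniObservable, otherwise an adapter. */
        static <T> MiniObservable<T> wrap(MiniSource<T> source) {
            if (source == null) {
                throw new NullPointerException("source is null");
            }
            if (source instanceof MiniObservable) {
                return (MiniObservable<T>) source;
            }
            return new MiniObservable<T>() {
                @Override
                public void subscribe(Consumer<? super T> observer) {
                    source.subscribe(observer);
                }
            };
        }

        static <T> MiniObservable<T> fromList(List<T> items) {
            MiniSource<T> source = observer -> items.forEach(observer);
            return wrap(source);
        }

        @SafeVarargs
        static <T> MiniObservable<T> concatArray(MiniSource<T>... sources) {
            if (sources.length == 0) {
                // Nothing to concatenate: an empty sequence.
                return fromList(new ArrayList<T>());
            } else if (sources.length == 1) {
                // One source: no operator needed, just make sure it is a MiniObservable.
                return wrap(sources[0]);
            }
            // Several sources: in this synchronous model, subscribing in order concatenates them.
            MiniSource<T> inOrder = observer -> {
                for (MiniSource<T> source : sources) {
                    source.subscribe(observer);
                }
            };
            return wrap(inOrder);
        }

        List<T> toList() {
            List<T> items = new ArrayList<>();
            subscribe(items::add);
            return items;
        }
    }

    public static void main(String[] args) {
        MiniObservable<String> a = MiniObservable.fromList(Arrays.asList("a", "b"));
        MiniObservable<String> b = MiniObservable.fromList(Arrays.asList("c"));
        System.out.println(MiniObservable.concatArray(a, b).toList());
        // The single-source shortcut returns the very same instance.
        System.out.println(MiniObservable.concatArray(a) == a);
    }
}
```

Running `main` prints `[a, b, c]` followed by `true`, showing that the single-source path allocates no extra operator when the argument is already of the wrapped type.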
Code example source: ReactiveX/RxJava
@Test
public void onCompleteCrashConditional() {
    Observable.wrap(new ObservableSource<Object>() {
        @Override
        public void subscribe(Observer<? super Object> observer) {
            observer.onSubscribe(Disposables.empty());
            observer.onComplete();
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            throw new IOException();
        }
    })
    .filter(Functions.alwaysTrue())
    .test()
    .assertFailure(IOException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void onErrorAfterCrash() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Observable.wrap(new ObservableSource<Object>() {
            @Override
            public void subscribe(Observer<? super Object> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onError(new TestException());
            }
        })
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                throw new IOException();
            }
        })
        .test()
        .assertFailure(TestException.class);
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void onCompleteAfterCrash() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Observable.wrap(new ObservableSource<Object>() {
            @Override
            public void subscribe(Observer<? super Object> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onComplete();
            }
        })
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                throw new IOException();
            }
        })
        .test()
        .assertResult();
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void ignoreCancel() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Observable.wrap(new ObservableSource<Object>() {
            @Override
            public void subscribe(Observer<? super Object> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onNext(1);
                observer.onNext(2);
                observer.onError(new IOException());
                observer.onComplete();
            }
        })
        .doOnNext(new Consumer<Object>() {
            @Override
            public void accept(Object e) throws Exception {
                throw new TestException();
            }
        })
        .test()
        .assertFailure(TestException.class);
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void ignoreCancel() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Observable.wrap(new ObservableSource<Integer>() {
            @Override
            public void subscribe(Observer<? super Integer> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onNext(1);
                observer.onNext(2);
                observer.onNext(3);
                observer.onError(new IOException());
                observer.onComplete();
            }
        })
        .distinctUntilChanged(new BiPredicate<Integer, Integer>() {
            @Override
            public boolean test(Integer a, Integer b) throws Exception {
                throw new TestException();
            }
        })
        .test()
        .assertFailure(TestException.class, 1);
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void onErrorAfterCrashConditional() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Observable.wrap(new ObservableSource<Object>() {
            @Override
            public void subscribe(Observer<? super Object> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onError(new TestException());
            }
        })
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                throw new IOException();
            }
        })
        .filter(Functions.alwaysTrue())
        .test()
        .assertFailure(TestException.class);
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void onCompleteAfterCrashConditional() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Observable.wrap(new ObservableSource<Object>() {
            @Override
            public void subscribe(Observer<? super Object> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onComplete();
            }
        })
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                throw new IOException();
            }
        })
        .filter(Functions.alwaysTrue())
        .test()
        .assertResult();
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void ignoreCancelConditional() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Observable.wrap(new ObservableSource<Object>() {
            @Override
            public void subscribe(Observer<? super Object> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onNext(1);
                observer.onNext(2);
                observer.onError(new IOException());
                observer.onComplete();
            }
        })
        .doOnNext(new Consumer<Object>() {
            @Override
            public void accept(Object e) throws Exception {
                throw new TestException();
            }
        })
        .filter(Functions.alwaysTrue())
        .test()
        .assertFailure(TestException.class);
        TestHelper.assertUndeliverable(errors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}