
x33g5p2x, reposted on 2022-01-25 under "Other"



Converts an ObservableSource that emits ObservableSources into an ObservableSource that emits the items emitted by the most recently emitted of those ObservableSources.

switchOnNext subscribes to an ObservableSource that emits ObservableSources. Each time it observes one of these emitted ObservableSources, the ObservableSource returned by switchOnNext begins emitting the items emitted by that ObservableSource. When a new ObservableSource is emitted, switchOnNext stops emitting items from the earlier-emitted ObservableSource and begins emitting items from the new one.
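The switching behavior described above can be sketched with subjects, which let the emissions be driven by hand. This is a minimal illustration, assuming RxJava 2.x (the `io.reactivex` packages) is on the classpath; the class name `SwitchOnNextDemo` and the variable names are hypothetical and not part of the library.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; only Observable.switchOnNext and PublishSubject come from RxJava.
public class SwitchOnNextDemo {

    static List<String> run() {
        // Outer source: emits the inner sources to switch between.
        PublishSubject<Observable<String>> outer = PublishSubject.create();
        PublishSubject<String> first = PublishSubject.create();
        PublishSubject<String> second = PublishSubject.create();
        List<String> seen = new ArrayList<>();

        Observable.switchOnNext(outer)
                .subscribe(seen::add, e -> seen.add("error"), () -> seen.add("done"));

        outer.onNext(first);
        first.onNext("a1");   // forwarded: first is the current inner source
        outer.onNext(second); // switch: first is disposed, second becomes current
        first.onNext("a2");   // dropped: first is no longer observed
        second.onNext("b1");  // forwarded
        outer.onComplete();   // no completion yet: second is still active
        second.onNext("b2");  // forwarded
        second.onComplete();  // outer and last inner are both complete
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a1, b1, b2, done]
    }
}
```

Subjects are used here only so that the order of events is explicit; with timed sources the same rule applies, but the outcome depends on when each inner source is emitted.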

The resulting ObservableSource completes once both the outer ObservableSource and the last inner ObservableSource, if any, have completed. If the outer ObservableSource signals onError, the inner ObservableSource is disposed and the error is delivered in-sequence.

Scheduler: switchOnNext does not operate by default on a particular Scheduler.
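The termination rules above can be made concrete with a small state machine. The sketch below is a plain-Java model written for illustration only; it is not RxJava code and not how the library implements the operator. Every name in it (`SwitchTerminationModel`, `outerNext`, `innerNext`, and so on) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the completion and error rules; not part of RxJava.
public class SwitchTerminationModel {
    private int activeInner = -1;  // id of the current inner source, -1 when none is active
    private boolean outerDone;     // the outer source has completed
    private boolean terminated;    // a terminal signal has been delivered downstream
    final List<String> events = new ArrayList<>();

    // The outer source emits a new inner source: it replaces the previous one.
    void outerNext(int innerId) {
        if (!terminated && !outerDone) activeInner = innerId;
    }

    // Only items from the current inner source are forwarded.
    void innerNext(int innerId, String item) {
        if (!terminated && innerId == activeInner) events.add(item);
    }

    // The current inner source completes: complete only if the outer is done too.
    void innerComplete(int innerId) {
        if (terminated || innerId != activeInner) return;
        activeInner = -1;
        if (outerDone) finish("complete");
    }

    // The outer source completes: complete only if no inner source is still active.
    void outerComplete() {
        if (terminated) return;
        outerDone = true;
        if (activeInner == -1) finish("complete");
    }

    // The outer source fails: drop ("dispose") the inner source and deliver the error.
    void outerError(String message) {
        if (terminated) return;
        activeInner = -1;
        finish("error: " + message);
    }

    private void finish(String signal) {
        terminated = true;
        events.add(signal);
    }

    public static void main(String[] args) {
        SwitchTerminationModel m = new SwitchTerminationModel();
        m.outerNext(1);
        m.innerNext(1, "a");
        m.outerComplete();      // inner 1 still active, so no completion yet
        m.innerNext(1, "b");
        m.innerComplete(1);     // both are done now
        System.out.println(m.events); // [a, b, complete]
    }
}
```

The model tracks a single active inner source on purpose: that is the property distinguishing switching from merging, where several inner sources would be live at once.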


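Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void switchOnNextNull() {
  Observable.switchOnNext(null); // [restored] body elided in the excerpt; a null source must be rejected
}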

Code example source: ReactiveX/RxJava

/**
 * Converts an ObservableSource that emits ObservableSources into an ObservableSource that emits the items emitted by the
 * most recently emitted of those ObservableSources.
 * <p>
 * <img width="640" height="370" src="" alt="">
 * <p>
 * {@code switchOnNext} subscribes to an ObservableSource that emits ObservableSources. Each time it observes one of
 * these emitted ObservableSources, the ObservableSource returned by {@code switchOnNext} begins emitting the items
 * emitted by that ObservableSource. When a new ObservableSource is emitted, {@code switchOnNext} stops emitting items
 * from the earlier-emitted ObservableSource and begins emitting items from the new one.
 * <p>
 * The resulting ObservableSource completes if both the outer ObservableSource and the last inner ObservableSource, if any, complete.
 * If the outer ObservableSource signals an onError, the inner ObservableSource is disposed and the error delivered in-sequence.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the item type
 * @param sources
 *            the source ObservableSource that emits ObservableSources
 * @return an Observable that emits the items emitted by the ObservableSource most recently emitted by the source
 *         ObservableSource
 * @see <a href="">ReactiveX operators documentation: Switch</a>
 */
public static <T> Observable<T> switchOnNext(ObservableSource<? extends ObservableSource<? extends T>> sources) {
  return switchOnNext(sources, bufferSize());
}

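Code example source: ReactiveX/RxJava

public void testSwitchWhenOuterCompleteBeforeInner() {
  // observer, scheduler, publishNext and publishCompleted are fixtures of the original test class (not shown).
  // Lines marked [restored] were missing from the excerpt.
  Observable<Observable<String>> source = Observable.unsafeCreate(new ObservableSource<Observable<String>>() {
    public void subscribe(Observer<? super Observable<String>> outerObserver) {
      outerObserver.onSubscribe(Disposables.empty()); // [restored]
      publishNext(outerObserver, 50, Observable.unsafeCreate(new ObservableSource<String>() {
        public void subscribe(Observer<? super String> innerObserver) {
          innerObserver.onSubscribe(Disposables.empty()); // [restored]
          publishNext(innerObserver, 70, "one");
          publishNext(innerObserver, 100, "two");
          publishCompleted(innerObserver, 200);
        }
      }));
      publishCompleted(outerObserver, 60); // the outer completes before the inner does
    }
  });
  Observable<String> sampled = Observable.switchOnNext(source);
  sampled.subscribe(observer); // [restored] without a subscription nothing is emitted
  InOrder inOrder = inOrder(observer);
  scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS);
  inOrder.verify(observer, times(2)).onNext(anyString());
  inOrder.verify(observer, times(1)).onComplete();
}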

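Code example source: ReactiveX/RxJava

public void testUnsubscribe() {
  final AtomicBoolean isUnsubscribed = new AtomicBoolean();
  Observable.switchOnNext( // [restored] lines so marked were elided in the excerpt
      Observable.unsafeCreate(new ObservableSource<Observable<Integer>>() {
        public void subscribe(final Observer<? super Observable<Integer>> observer) {
          Disposable bs = Disposables.empty();
          observer.onSubscribe(bs); // [restored]
          observer.onNext(Observable.just(1)); // [restored] take(1) disposes upstream after this item
          isUnsubscribed.set(bs.isDisposed()); // [restored]
        }
      })).take(1).subscribe(); // [restored]
  assertTrue("Switch doesn't propagate 'unsubscribe'", isUnsubscribed.get());
}
/** The upstream producer hijacked the switch producer stopping the requests aimed at the inner observables. */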

Code example source: ReactiveX/RxJava

Observable<String> sampled = Observable.switchOnNext(source);


