Usage and code examples of the io.reactivex.Observable.subscribeWith() method

Reposted by x33g5p2x on 2022-01-25 in Other

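This article collects code examples of the io.reactivex.Observable.subscribeWith() method in Java and shows how Observable.subscribeWith() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as practical reference material. Details of the Observable.subscribeWith() method are as follows:
Fully qualified class name: io.reactivex.Observable
Class name: Observable
Method name: subscribeWith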

Introduction to Observable.subscribeWith

Subscribes a given Observer (subclass) to this Observable and returns the given Observer as is.

Usage example:

Observable<Integer> source = Observable.range(1, 10); 
CompositeDisposable composite = new CompositeDisposable(); 
DisposableObserver<Integer> ds = new DisposableObserver<Integer>() { 
// ... 
}; 
composite.add(source.subscribeWith(ds));

Scheduler: subscribeWith does not operate by default on a particular Scheduler.
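
To make the "returns the given Observer as is" part concrete, here is a minimal sketch that does not use RxJava at all. MiniObservable, MiniObserver, and CollectingObserver are hypothetical stand-ins invented for this illustration; only the generic shape of subscribeWith (an E bounded by the observer type, returned unchanged) mirrors the signature of the real method. Disposal, error handling, and onSubscribe are left out on purpose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubscribeWithSketch {

    /** Hypothetical, simplified observer; NOT RxJava's Observer interface. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onComplete();
    }

    /** Hypothetical, simplified source that replays a fixed list of items. */
    static final class MiniObservable<T> {
        private final List<T> items;

        MiniObservable(List<T> items) {
            this.items = items;
        }

        /** Plain subscribe returns nothing, so the caller must keep its own reference. */
        void subscribe(MiniObserver<? super T> observer) {
            for (T item : items) {
                observer.onNext(item);
            }
            observer.onComplete();
        }

        /** Subscribes the observer and hands the same instance back, typed as E. */
        <E extends MiniObserver<? super T>> E subscribeWith(E observer) {
            subscribe(observer);
            return observer;
        }
    }

    /** Observer subclass with extra state that the plain interface does not expose. */
    static final class CollectingObserver<T> implements MiniObserver<T> {
        final List<T> received = new ArrayList<>();
        boolean completed;

        @Override
        public void onNext(T item) {
            received.add(item);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        MiniObservable<Integer> source = new MiniObservable<>(Arrays.asList(1, 2, 3));
        // The call returns the concrete subclass, so its extra fields are usable in a chain.
        CollectingObserver<Integer> observer =
                source.subscribeWith(new CollectingObserver<Integer>());
        System.out.println(observer.received + " completed=" + observer.completed);
        // prints "[1, 2, 3] completed=true"
    }
}
```

Because the return type is the observer's own type rather than a general interface, the result can be passed straight to something that expects that subclass, which is what makes composite.add(source.subscribeWith(ds)) in the usage example above a single expression.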

Code examples

Code example source: android10/Android-CleanArchitecture

/**
 * Executes the current use case.
 *
 * @param observer {@link DisposableObserver} which will be listening to the observable built
 * by the {@link #buildUseCaseObservable(Params)} method.
 * @param params Parameters (Optional) used to build/execute this use case.
 */
public void execute(DisposableObserver<T> observer, Params params) {
 Preconditions.checkNotNull(observer);
 final Observable<T> observable = this.buildUseCaseObservable(params)
   .subscribeOn(Schedulers.from(threadExecutor))
   .observeOn(postExecutionThread.getScheduler());
 addDisposable(observable.subscribeWith(observer));
}

Code example source: ReactiveX/RxJava

@Test
public void just() {
  Observable.just(1)
  .doAfterNext(afterNext)
  .subscribeWith(to)
  .assertResult(1);
  assertEquals(Arrays.asList(1, -1), values);
}

Code example source: ReactiveX/RxJava

@Test
public void justHidden() {
  Observable.just(1)
  .hide()
  .doAfterNext(afterNext)
  .subscribeWith(to)
  .assertResult(1);
  assertEquals(Arrays.asList(1, -1), values);
}

Code example source: ReactiveX/RxJava

@Test
public void justConditional() {
  Observable.just(1)
  .doAfterNext(afterNext)
  .filter(Functions.alwaysTrue())
  .subscribeWith(to)
  .assertResult(1);
  assertEquals(Arrays.asList(1, -1), values);
}

Code example source: ReactiveX/RxJava

@Test
public void range() {
  Observable.range(1, 5)
  .doAfterNext(afterNext)
  .subscribeWith(to)
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
}

Code example source: ReactiveX/RxJava

@Test
public void empty() {
  Observable.<Integer>empty()
  .doAfterNext(afterNext)
  .subscribeWith(to)
  .assertResult();
  assertTrue(values.isEmpty());
}

Code example source: ReactiveX/RxJava

@Test
public void withObservable() {
  Observable.range(1, 10)
  .subscribeWith(new TestObserver<Integer>())
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

Code example source: ReactiveX/RxJava

@Test
public void error() {
  Observable.<Integer>error(new TestException())
  .doAfterNext(afterNext)
  .subscribeWith(to)
  .assertFailure(TestException.class);
  assertTrue(values.isEmpty());
}

Code example source: ReactiveX/RxJava

@Test
public void rangeConditional() {
  Observable.range(1, 5)
  .doAfterNext(afterNext)
  .filter(Functions.alwaysTrue())
  .subscribeWith(to)
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
}

Code example source: ReactiveX/RxJava

@Test
public void emptyConditional() {
  Observable.<Integer>empty()
  .doAfterNext(afterNext)
  .filter(Functions.alwaysTrue())
  .subscribeWith(to)
  .assertResult();
  assertTrue(values.isEmpty());
}

Code example source: ReactiveX/RxJava

@Test
public void errorConditional() {
  Observable.<Integer>error(new TestException())
  .doAfterNext(afterNext)
  .filter(Functions.alwaysTrue())
  .subscribeWith(to)
  .assertFailure(TestException.class);
  assertTrue(values.isEmpty());
}

Code example source: ReactiveX/RxJava

@Test
public void outputFusedOneSignal() {
  final BehaviorSubject<Integer> bs = BehaviorSubject.createDefault(1);
  bs.observeOn(ImmediateThinScheduler.INSTANCE)
  .concatMap(new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer v)
        throws Exception {
      return Observable.just(v + 1);
    }
  })
  .subscribeWith(new TestObserver<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 2) {
        bs.onNext(2);
      }
    }
  })
  .assertValuesOnly(2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void disposedInOnComplete() {
  final TestObserver<Integer> to = new TestObserver<Integer>();
  new Observable<Integer>() {
    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
      observer.onSubscribe(Disposables.empty());
      to.cancel();
      observer.onComplete();
    }
  }
  .debounce(Functions.justFunction(Observable.never()))
  .subscribeWith(to)
  .assertEmpty();
}

Code example source: ReactiveX/RxJava

@Test
public void disposeInOnNext() {
  final TestObserver<Integer> to = new TestObserver<Integer>();
  BehaviorSubject.createDefault(1)
  .debounce(new Function<Integer, ObservableSource<Object>>() {
    @Override
    public ObservableSource<Object> apply(Integer o) throws Exception {
      to.cancel();
      return Observable.never();
    }
  })
  .subscribeWith(to)
  .assertEmpty();
  assertTrue(to.isDisposed());
}

Code example source: ReactiveX/RxJava

@Test
public void onSuccessSlowPath() {
  final PublishSubject<Integer> ps = PublishSubject.create();
  final SingleSubject<Integer> cs = SingleSubject.create();
  TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        cs.onSuccess(2);
      }
    }
  });
  ps.onNext(1);
  ps.onNext(3);
  ps.onComplete();
  to.assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void onSuccessSlowPath() {
  final PublishSubject<Integer> ps = PublishSubject.create();
  final MaybeSubject<Integer> cs = MaybeSubject.create();
  TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        cs.onSuccess(2);
      }
    }
  });
  ps.onNext(1);
  ps.onNext(3);
  ps.onComplete();
  to.assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void onNextSlowPath() {
  final PublishSubject<Integer> ps = PublishSubject.create();
  final SingleSubject<Integer> cs = SingleSubject.create();
  TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        ps.onNext(2);
      }
    }
  });
  ps.onNext(1);
  cs.onSuccess(3);
  ps.onNext(4);
  ps.onComplete();
  to.assertResult(1, 2, 3, 4);
}

Code example source: ReactiveX/RxJava

@Test
public void onNextSlowPath() {
  final PublishSubject<Integer> ps = PublishSubject.create();
  final MaybeSubject<Integer> cs = MaybeSubject.create();
  TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        ps.onNext(2);
      }
    }
  });
  ps.onNext(1);
  cs.onSuccess(3);
  ps.onNext(4);
  ps.onComplete();
  to.assertResult(1, 2, 3, 4);
}

Code example source: ReactiveX/RxJava

@Test
public void onNextSlowPathCreateQueue() {
  final PublishSubject<Integer> ps = PublishSubject.create();
  final MaybeSubject<Integer> cs = MaybeSubject.create();
  TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        ps.onNext(2);
        ps.onNext(3);
      }
    }
  });
  cs.onSuccess(0);
  ps.onNext(1);
  ps.onNext(4);
  ps.onComplete();
  to.assertResult(0, 1, 2, 3, 4);
}

Code example source: ReactiveX/RxJava

@Test
public void onNextSlowPathCreateQueue() {
  final PublishSubject<Integer> ps = PublishSubject.create();
  final SingleSubject<Integer> cs = SingleSubject.create();
  TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        ps.onNext(2);
        ps.onNext(3);
      }
    }
  });
  cs.onSuccess(0);
  ps.onNext(1);
  ps.onNext(4);
  ps.onComplete();
  to.assertResult(0, 1, 2, 3, 4);
}
