Usage and code examples for the io.reactivex.Observable.subscribeWith() method

x33g5p2x, reposted on 2022-01-25 under "Other"

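This article collects code examples for the Java method io.reactivex.Observable.subscribeWith() and shows how Observable.subscribeWith() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from a selection of projects to serve as a practical reference. The details of Observable.subscribeWith() are as follows:
Package: io.reactivex
Class name: Observable
Method name: subscribeWith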

Introduction to Observable.subscribeWith

Subscribes a given Observer (subclass) to this Observable and returns the given Observer as is.

Usage example:

    Observable<Integer> source = Observable.range(1, 10);
    CompositeDisposable composite = new CompositeDisposable();
    // Explicit type argument: the diamond with anonymous classes needs Java 9+.
    DisposableObserver<Integer> ds = new DisposableObserver<Integer>() {
        // ...
    };
    composite.add(source.subscribeWith(ds));

Scheduler: subscribeWith does not operate by default on a particular Scheduler.
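
The phrase "returns the given Observer as is" can be illustrated with a small, library-free model. The sketch below does not use RxJava: `MiniObserver`, `MiniRange`, and `CollectingObserver` are hypothetical stand-ins introduced only for illustration. It assumes that the essential behaviour is "subscribe, then return the same instance with its concrete type", which is what lets the caller keep using subclass-specific members without a cast.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscribeWithSketch {

    /** Hypothetical stand-in for an observer; not the RxJava interface. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onComplete();
    }

    /** Hypothetical stand-in for a synchronous range source. */
    static final class MiniRange {
        private final int start;
        private final int count;

        MiniRange(int start, int count) {
            this.start = start;
            this.count = count;
        }

        /** Plain subscribe: returns nothing, so the caller must keep its own reference. */
        void subscribe(MiniObserver<? super Integer> observer) {
            for (int i = 0; i < count; i++) {
                observer.onNext(start + i);
            }
            observer.onComplete();
        }

        /** Subscribes the observer, then returns the very same instance typed as E. */
        <E extends MiniObserver<? super Integer>> E subscribeWith(E observer) {
            subscribe(observer);
            return observer;
        }
    }

    /** Observer subclass with extra state that the base interface does not expose. */
    static final class CollectingObserver implements MiniObserver<Integer> {
        final List<Integer> items = new ArrayList<>();
        boolean completed;

        @Override
        public void onNext(Integer item) {
            items.add(item);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    static CollectingObserver run() {
        CollectingObserver observer = new CollectingObserver();
        // E is inferred as CollectingObserver, so no cast is needed on the result.
        CollectingObserver returned = new MiniRange(1, 3).subscribeWith(observer);
        if (returned != observer) {
            throw new AssertionError("subscribeWith must return the same instance");
        }
        return returned;
    }

    public static void main(String[] args) {
        CollectingObserver result = run();
        System.out.println(result.items + " completed=" + result.completed);
    }
}
```

In the usage example above, the same idea is what allows `source.subscribeWith(ds)` to be passed straight to `composite.add(...)`: the returned value is still a `DisposableObserver`, and therefore a `Disposable`.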

Code examples

Code example source: android10/Android-CleanArchitecture

    /**
     * Executes the current use case.
     *
     * @param observer {@link DisposableObserver} which will be listening to the observable built
     * by the {@link #buildUseCaseObservable(Params)} method.
     * @param params Parameters (Optional) used to build/execute this use case.
     */
    public void execute(DisposableObserver<T> observer, Params params) {
        Preconditions.checkNotNull(observer);
        final Observable<T> observable = this.buildUseCaseObservable(params)
                .subscribeOn(Schedulers.from(threadExecutor))
                .observeOn(postExecutionThread.getScheduler());
        // subscribeWith returns the observer itself, which is also a Disposable.
        addDisposable(observable.subscribeWith(observer));
    }

Code example source: ReactiveX/RxJava

    // afterNext, to, and values are defined outside this method (not shown).
    @Test
    public void just() {
        Observable.just(1)
                .doAfterNext(afterNext)
                .subscribeWith(to)
                .assertResult(1);
        assertEquals(Arrays.asList(1, -1), values);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void justHidden() {
        Observable.just(1)
                .hide()
                .doAfterNext(afterNext)
                .subscribeWith(to)
                .assertResult(1);
        assertEquals(Arrays.asList(1, -1), values);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void justConditional() {
        Observable.just(1)
                .doAfterNext(afterNext)
                .filter(Functions.alwaysTrue())
                .subscribeWith(to)
                .assertResult(1);
        assertEquals(Arrays.asList(1, -1), values);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void range() {
        Observable.range(1, 5)
                .doAfterNext(afterNext)
                .subscribeWith(to)
                .assertResult(1, 2, 3, 4, 5);
        assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void empty() {
        Observable.<Integer>empty()
                .doAfterNext(afterNext)
                .subscribeWith(to)
                .assertResult();
        assertTrue(values.isEmpty());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void withObservable() {
        Observable.range(1, 10)
                .subscribeWith(new TestObserver<Integer>())
                .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void error() {
        Observable.<Integer>error(new TestException())
                .doAfterNext(afterNext)
                .subscribeWith(to)
                .assertFailure(TestException.class);
        assertTrue(values.isEmpty());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void rangeConditional() {
        Observable.range(1, 5)
                .doAfterNext(afterNext)
                .filter(Functions.alwaysTrue())
                .subscribeWith(to)
                .assertResult(1, 2, 3, 4, 5);
        assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void emptyConditional() {
        Observable.<Integer>empty()
                .doAfterNext(afterNext)
                .filter(Functions.alwaysTrue())
                .subscribeWith(to)
                .assertResult();
        assertTrue(values.isEmpty());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void errorConditional() {
        Observable.<Integer>error(new TestException())
                .doAfterNext(afterNext)
                .filter(Functions.alwaysTrue())
                .subscribeWith(to)
                .assertFailure(TestException.class);
        assertTrue(values.isEmpty());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void outputFusedOneSignal() {
        final BehaviorSubject<Integer> bs = BehaviorSubject.createDefault(1);
        bs.observeOn(ImmediateThinScheduler.INSTANCE)
                .concatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer v)
                            throws Exception {
                        return Observable.just(v + 1);
                    }
                })
                .subscribeWith(new TestObserver<Integer>() {
                    @Override
                    public void onNext(Integer t) {
                        super.onNext(t);
                        if (t == 2) {
                            bs.onNext(2);
                        }
                    }
                })
                .assertValuesOnly(2, 3);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void disposedInOnComplete() {
        final TestObserver<Integer> to = new TestObserver<Integer>();
        new Observable<Integer>() {
            @Override
            protected void subscribeActual(Observer<? super Integer> observer) {
                observer.onSubscribe(Disposables.empty());
                to.cancel();
                observer.onComplete();
            }
        }
        .debounce(Functions.justFunction(Observable.never()))
        .subscribeWith(to)
        .assertEmpty();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void disposeInOnNext() {
        final TestObserver<Integer> to = new TestObserver<Integer>();
        BehaviorSubject.createDefault(1)
                .debounce(new Function<Integer, ObservableSource<Object>>() {
                    @Override
                    public ObservableSource<Object> apply(Integer o) throws Exception {
                        to.cancel();
                        return Observable.never();
                    }
                })
                .subscribeWith(to)
                .assertEmpty();
        assertTrue(to.isDisposed());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onSuccessSlowPath() {
        final PublishSubject<Integer> ps = PublishSubject.create();
        final SingleSubject<Integer> cs = SingleSubject.create();
        TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    cs.onSuccess(2);
                }
            }
        });
        ps.onNext(1);
        ps.onNext(3);
        ps.onComplete();
        to.assertResult(1, 2, 3);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onSuccessSlowPath() {
        final PublishSubject<Integer> ps = PublishSubject.create();
        final MaybeSubject<Integer> cs = MaybeSubject.create();
        TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    cs.onSuccess(2);
                }
            }
        });
        ps.onNext(1);
        ps.onNext(3);
        ps.onComplete();
        to.assertResult(1, 2, 3);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onNextSlowPath() {
        final PublishSubject<Integer> ps = PublishSubject.create();
        final SingleSubject<Integer> cs = SingleSubject.create();
        TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    ps.onNext(2);
                }
            }
        });
        ps.onNext(1);
        cs.onSuccess(3);
        ps.onNext(4);
        ps.onComplete();
        to.assertResult(1, 2, 3, 4);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onNextSlowPath() {
        final PublishSubject<Integer> ps = PublishSubject.create();
        final MaybeSubject<Integer> cs = MaybeSubject.create();
        TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    ps.onNext(2);
                }
            }
        });
        ps.onNext(1);
        cs.onSuccess(3);
        ps.onNext(4);
        ps.onComplete();
        to.assertResult(1, 2, 3, 4);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onNextSlowPathCreateQueue() {
        final PublishSubject<Integer> ps = PublishSubject.create();
        final MaybeSubject<Integer> cs = MaybeSubject.create();
        TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    ps.onNext(2);
                    ps.onNext(3);
                }
            }
        });
        cs.onSuccess(0);
        ps.onNext(1);
        ps.onNext(4);
        ps.onComplete();
        to.assertResult(0, 1, 2, 3, 4);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onNextSlowPathCreateQueue() {
        final PublishSubject<Integer> ps = PublishSubject.create();
        final SingleSubject<Integer> cs = SingleSubject.create();
        TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    ps.onNext(2);
                    ps.onNext(3);
                }
            }
        });
        cs.onSuccess(0);
        ps.onNext(1);
        ps.onNext(4);
        ps.onComplete();
        to.assertResult(0, 1, 2, 3, 4);
    }
