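This article collects code examples for the Java method io.reactivex.Observable.subscribeWith() and shows how Observable.subscribeWith() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as reference material. Details of Observable.subscribeWith() are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: subscribeWith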
Subscribes a given Observer (subclass) to this Observable and returns the given Observer as is.
Usage example:
Observable<Integer> source = Observable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();
DisposableObserver<Integer> ds = new DisposableObserver<Integer>() {
    // ...
};
composite.add(source.subscribeWith(ds));
Scheduler: subscribeWith does not operate by default on a particular Scheduler.
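To make the "returns the given Observer as is" contract concrete, here is a minimal, library-free sketch. MiniObserver, CollectingObserver, and MiniRange are hypothetical stand-ins invented for this illustration, not RxJava types; the only point is that subscribeWith subscribes and then hands back the very same instance, which is what allows the call to be chained or passed to a container.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscribeWithSketch {

    /** Hypothetical stand-in for an observer interface (not the RxJava one). */
    interface MiniObserver<T> {
        void onNext(T value);
        void onComplete();
    }

    /** Hypothetical observer subclass that records what it receives. */
    static class CollectingObserver implements MiniObserver<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;

        @Override
        public void onNext(Integer value) {
            received.add(value);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    /** Hypothetical source that emits start, start + 1, ... for count items. */
    static class MiniRange {
        private final int start;
        private final int count;

        MiniRange(int start, int count) {
            this.start = start;
            this.count = count;
        }

        void subscribe(MiniObserver<? super Integer> observer) {
            for (int i = 0; i < count; i++) {
                observer.onNext(start + i);
            }
            observer.onComplete();
        }

        /** Subscribes the observer, then returns that same instance with its own type. */
        <E extends MiniObserver<? super Integer>> E subscribeWith(E observer) {
            subscribe(observer);
            return observer;
        }
    }

    /** Returns true if subscribeWith handed back the identical observer instance. */
    static boolean returnsSameInstance() {
        CollectingObserver original = new CollectingObserver();
        CollectingObserver returned = new MiniRange(1, 10).subscribeWith(original);
        return returned == original;
    }

    /** Chains directly on the returned observer, as the examples in this article do. */
    static List<Integer> demo() {
        return new MiniRange(1, 10).subscribeWith(new CollectingObserver()).received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the return type is the caller's own subclass (E), no cast is needed to reach fields or methods that the plain observer interface does not declare.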
Code example source: android10/Android-CleanArchitecture
/**
 * Executes the current use case.
 *
 * @param observer {@link DisposableObserver} which will be listening to the observable built
 * by the {@link #buildUseCaseObservable(Params)} method.
 * @param params Parameters (Optional) used to build/execute this use case.
 */
public void execute(DisposableObserver<T> observer, Params params) {
    Preconditions.checkNotNull(observer);
    final Observable<T> observable = this.buildUseCaseObservable(params)
        .subscribeOn(Schedulers.from(threadExecutor))
        .observeOn(postExecutionThread.getScheduler());
    addDisposable(observable.subscribeWith(observer));
}
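The execute method above relies on subscribeWith returning the observer so that it can be registered for later disposal in a single expression. The following is a rough, library-free sketch of that bookkeeping only. Handle, RecordingObserver, and this UseCase class are hypothetical names made up for the illustration, and the threading done by subscribeOn/observeOn is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

public class UseCaseSketch {

    /** Hypothetical stand-in for a disposable resource handle. */
    interface Handle {
        void dispose();
        boolean isDisposed();
    }

    /** Hypothetical observer that is also its own handle. */
    static class RecordingObserver implements Handle {
        final List<String> log = new ArrayList<>();
        private boolean disposed;

        void onNext(String item) {
            if (!disposed) {
                log.add(item);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    /** Hypothetical use case that remembers every observer it subscribed. */
    static class UseCase {
        private final List<Handle> disposables = new ArrayList<>();

        /** Same shape as execute(): null check, subscribe, keep the returned observer. */
        void execute(RecordingObserver observer, String param) {
            if (observer == null) {
                throw new NullPointerException("observer");
            }
            disposables.add(subscribeWith(observer, param));
        }

        /** Delivers one synthetic result and returns the same observer instance. */
        private <E extends RecordingObserver> E subscribeWith(E observer, String param) {
            observer.onNext("result for " + param);
            return observer;
        }

        /** Releases everything that was registered through execute(). */
        void dispose() {
            for (Handle handle : disposables) {
                handle.dispose();
            }
            disposables.clear();
        }
    }

    /** Returns true if disposing the use case disposed both observers. */
    static boolean demo() {
        UseCase useCase = new UseCase();
        RecordingObserver first = new RecordingObserver();
        RecordingObserver second = new RecordingObserver();
        useCase.execute(first, "x");
        useCase.execute(second, "y");
        useCase.dispose();
        return first.isDisposed() && second.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```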
Code example source: ReactiveX/RxJava
@Test
public void just() {
    Observable.just(1)
    .doAfterNext(afterNext)
    .subscribeWith(to)
    .assertResult(1);
    assertEquals(Arrays.asList(1, -1), values);
}
Code example source: ReactiveX/RxJava
@Test
public void justHidden() {
    Observable.just(1)
    .hide()
    .doAfterNext(afterNext)
    .subscribeWith(to)
    .assertResult(1);
    assertEquals(Arrays.asList(1, -1), values);
}
Code example source: ReactiveX/RxJava
@Test
public void justConditional() {
    Observable.just(1)
    .doAfterNext(afterNext)
    .filter(Functions.alwaysTrue())
    .subscribeWith(to)
    .assertResult(1);
    assertEquals(Arrays.asList(1, -1), values);
}
Code example source: ReactiveX/RxJava
@Test
public void range() {
    Observable.range(1, 5)
    .doAfterNext(afterNext)
    .subscribeWith(to)
    .assertResult(1, 2, 3, 4, 5);
    assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
}
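The doAfterNext tests reference fields named afterNext, to, and values whose definitions are not part of these excerpts. The expected list (1, -1, 2, -2, ...) suggests that the observer records each item as v and the afterNext callback records it as -v once the observer's onNext has returned. The sketch below models that interleaving with plain Java under exactly that assumption; it is not the RxJava test fixture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

public class DoAfterNextSketch {

    /** Emits start..start+count-1 and records the assumed callback order. */
    static List<Integer> run(int start, int count) {
        final List<Integer> values = new ArrayList<>();
        // Assumption: the observer (`to` in the tests) appends the item itself.
        IntConsumer observerOnNext = v -> values.add(v);
        // Assumption: the `afterNext` callback appends the negated item.
        IntConsumer afterNext = v -> values.add(-v);

        for (int v = start; v < start + count; v++) {
            observerOnNext.accept(v); // downstream sees the item first
            afterNext.accept(v);      // the after-hook runs once onNext has returned
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(run(1, 5));
    }
}
```

With an empty or failing source the loop body never runs, which is consistent with the empty and error tests asserting that values stays empty.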
Code example source: ReactiveX/RxJava
@Test
public void empty() {
    Observable.<Integer>empty()
    .doAfterNext(afterNext)
    .subscribeWith(to)
    .assertResult();
    assertTrue(values.isEmpty());
}
Code example source: ReactiveX/RxJava
@Test
public void withObservable() {
    Observable.range(1, 10)
    .subscribeWith(new TestObserver<Integer>())
    .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Code example source: ReactiveX/RxJava
@Test
public void error() {
    Observable.<Integer>error(new TestException())
    .doAfterNext(afterNext)
    .subscribeWith(to)
    .assertFailure(TestException.class);
    assertTrue(values.isEmpty());
}
Code example source: ReactiveX/RxJava
@Test
public void rangeConditional() {
    Observable.range(1, 5)
    .doAfterNext(afterNext)
    .filter(Functions.alwaysTrue())
    .subscribeWith(to)
    .assertResult(1, 2, 3, 4, 5);
    assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
}
Code example source: ReactiveX/RxJava
@Test
public void emptyConditional() {
    Observable.<Integer>empty()
    .doAfterNext(afterNext)
    .filter(Functions.alwaysTrue())
    .subscribeWith(to)
    .assertResult();
    assertTrue(values.isEmpty());
}
Code example source: ReactiveX/RxJava
@Test
public void errorConditional() {
    Observable.<Integer>error(new TestException())
    .doAfterNext(afterNext)
    .filter(Functions.alwaysTrue())
    .subscribeWith(to)
    .assertFailure(TestException.class);
    assertTrue(values.isEmpty());
}
Code example source: ReactiveX/RxJava
@Test
public void outputFusedOneSignal() {
    final BehaviorSubject<Integer> bs = BehaviorSubject.createDefault(1);
    bs.observeOn(ImmediateThinScheduler.INSTANCE)
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v)
                throws Exception {
            return Observable.just(v + 1);
        }
    })
    .subscribeWith(new TestObserver<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 2) {
                bs.onNext(2);
            }
        }
    })
    .assertValuesOnly(2, 3);
}
Code example source: ReactiveX/RxJava
@Test
public void disposedInOnComplete() {
    final TestObserver<Integer> to = new TestObserver<Integer>();
    new Observable<Integer>() {
        @Override
        protected void subscribeActual(Observer<? super Integer> observer) {
            observer.onSubscribe(Disposables.empty());
            to.cancel();
            observer.onComplete();
        }
    }
    .debounce(Functions.justFunction(Observable.never()))
    .subscribeWith(to)
    .assertEmpty();
}
Code example source: ReactiveX/RxJava
@Test
public void disposeInOnNext() {
    final TestObserver<Integer> to = new TestObserver<Integer>();
    BehaviorSubject.createDefault(1)
    .debounce(new Function<Integer, ObservableSource<Object>>() {
        @Override
        public ObservableSource<Object> apply(Integer o) throws Exception {
            to.cancel();
            return Observable.never();
        }
    })
    .subscribeWith(to)
    .assertEmpty();
    assertTrue(to.isDisposed());
}
Code example source: ReactiveX/RxJava
@Test
public void onSuccessSlowPath() {
    final PublishSubject<Integer> ps = PublishSubject.create();
    final SingleSubject<Integer> cs = SingleSubject.create();
    TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 1) {
                cs.onSuccess(2);
            }
        }
    });
    ps.onNext(1);
    ps.onNext(3);
    ps.onComplete();
    to.assertResult(1, 2, 3);
}
Code example source: ReactiveX/RxJava
@Test
public void onSuccessSlowPath() {
    final PublishSubject<Integer> ps = PublishSubject.create();
    final MaybeSubject<Integer> cs = MaybeSubject.create();
    TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 1) {
                cs.onSuccess(2);
            }
        }
    });
    ps.onNext(1);
    ps.onNext(3);
    ps.onComplete();
    to.assertResult(1, 2, 3);
}
Code example source: ReactiveX/RxJava
@Test
public void onNextSlowPath() {
    final PublishSubject<Integer> ps = PublishSubject.create();
    final SingleSubject<Integer> cs = SingleSubject.create();
    TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 1) {
                ps.onNext(2);
            }
        }
    });
    ps.onNext(1);
    cs.onSuccess(3);
    ps.onNext(4);
    ps.onComplete();
    to.assertResult(1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@Test
public void onNextSlowPath() {
    final PublishSubject<Integer> ps = PublishSubject.create();
    final MaybeSubject<Integer> cs = MaybeSubject.create();
    TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 1) {
                ps.onNext(2);
            }
        }
    });
    ps.onNext(1);
    cs.onSuccess(3);
    ps.onNext(4);
    ps.onComplete();
    to.assertResult(1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@Test
public void onNextSlowPathCreateQueue() {
    final PublishSubject<Integer> ps = PublishSubject.create();
    final MaybeSubject<Integer> cs = MaybeSubject.create();
    TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 1) {
                ps.onNext(2);
                ps.onNext(3);
            }
        }
    });
    cs.onSuccess(0);
    ps.onNext(1);
    ps.onNext(4);
    ps.onComplete();
    to.assertResult(0, 1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@Test
public void onNextSlowPathCreateQueue() {
    final PublishSubject<Integer> ps = PublishSubject.create();
    final SingleSubject<Integer> cs = SingleSubject.create();
    TestObserver<Integer> to = ps.mergeWith(cs).subscribeWith(new TestObserver<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 1) {
                ps.onNext(2);
                ps.onNext(3);
            }
        }
    });
    cs.onSuccess(0);
    ps.onNext(1);
    ps.onNext(4);
    ps.onComplete();
    to.assertResult(0, 1, 2, 3, 4);
}
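The slow-path tests emit new items from inside the observer's own onNext and still expect them to arrive in order, one at a time. One common way to get that behaviour is a queue-drain loop: an emission that arrives while a delivery is in progress is parked in a queue and delivered after the current onNext returns. The sketch below is a single-threaded illustration of that idea with a hypothetical Serializer class. It is not the RxJava operator's code, and a thread-safe version would normally rely on an atomic work-in-progress counter, which is omitted here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.IntConsumer;

public class QueueDrainSketch {

    /** Hypothetical serializer: delivers one value at a time and defers re-entrant emissions. */
    static class Serializer {
        private final Queue<Integer> queue = new ArrayDeque<>();
        private IntConsumer downstream = v -> { };
        private boolean emitting;

        void setDownstream(IntConsumer downstream) {
            this.downstream = downstream;
        }

        void emit(int value) {
            if (emitting) {
                // Slow path: a delivery is already running, so park the value.
                queue.add(value);
                return;
            }
            emitting = true;
            try {
                // Fast path: nobody else is delivering, hand the value over directly.
                downstream.accept(value);
                // Drain whatever was emitted while the downstream was running.
                Integer next;
                while ((next = queue.poll()) != null) {
                    downstream.accept(next);
                }
            } finally {
                emitting = false;
            }
        }
    }

    /** Re-creates the call pattern of onNextSlowPathCreateQueue with plain method calls. */
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        Serializer merged = new Serializer();
        merged.setDownstream(v -> {
            received.add(v);
            if (v == 1) {
                merged.emit(2); // re-entrant: queued, not delivered yet
                merged.emit(3); // re-entrant: queued behind 2
            }
        });
        merged.emit(0);
        merged.emit(1);
        merged.emit(4);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the downstream callback is never entered recursively, so each item is fully handled before the next one is delivered.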