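This article collects code examples for the io.reactivex.Flowable.subscribeWith() method in Java and shows how Flowable.subscribeWith() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.subscribeWith() method are as follows: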
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: subscribeWith
Subscribes a given Subscriber (subclass) to this Flowable and returns the given Subscriber as is.
Usage example:
Flowable<Integer> source = Flowable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();
ResourceSubscriber<Integer> rs = new ResourceSubscriber<Integer>() {
    // ...
};
composite.add(source.subscribeWith(rs));
Backpressure: The backpressure behavior/expectation is determined by the supplied Subscriber.
Scheduler: subscribeWith does not operate by default on a particular Scheduler.
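The two points above (the subscriber is returned as is, and the subscriber decides the backpressure) can be illustrated without RxJava on the classpath. The following is only a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces (Java 9+), not RxJava's implementation. The names SubscribeWithSketch, subscribeWith, range, CollectingSubscriber, and requestMore are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Sketch only: mimics the idea of Flowable.subscribeWith using JDK Flow types.
final class SubscribeWithSketch {

    // Hypothetical helper: subscribes the given subscriber and returns that same instance.
    static <T, S extends Flow.Subscriber<? super T>> S subscribeWith(Flow.Publisher<T> source, S subscriber) {
        source.subscribe(subscriber);
        return subscriber;
    }

    // Hypothetical synchronous source that emits count integers from start, at most n per request(n).
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = start;
            final int end = start + count;
            boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && next < end && !done; i++) {
                    subscriber.onNext(next++);
                }
                // Complete once every item of the range has been delivered.
                if (next == end && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscriber that records items; its initial request amount sets the backpressure.
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> values = new ArrayList<>();
        boolean completed;
        private final long initialRequest;
        private Flow.Subscription upstream;

        CollectingSubscriber(long initialRequest) {
            this.initialRequest = initialRequest;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            subscription.request(initialRequest);
        }

        @Override
        public void onNext(Integer item) {
            values.add(item);
        }

        @Override
        public void onError(Throwable error) {
            throw new IllegalStateException(error);
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        void requestMore(long n) {
            upstream.request(n);
        }
    }

    public static void main(String[] args) {
        // The call returns the subscriber itself, so its state can be inspected right away.
        CollectingSubscriber subscriber = subscribeWith(range(1, 10), new CollectingSubscriber(3));
        System.out.println(subscriber.values);    // prints [1, 2, 3]
        System.out.println(subscriber.completed); // prints false

        subscriber.requestMore(7);
        System.out.println(subscriber.values);    // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(subscriber.completed); // prints true
    }
}
```

Because the helper returns the concrete subscriber type rather than void, the caller can chain further calls on it, which is the same convenience the RxJava tests below rely on when they call assertResult directly after subscribeWith.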
Code example source: ReactiveX/RxJava
// ts, afterNext, and values are defined elsewhere in the test class (not shown here).
@Test
public void range() {
    Flowable.range(1, 5)
        .doAfterNext(afterNext)
        .subscribeWith(ts)
        .assertResult(1, 2, 3, 4, 5);
    assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
}
Code example source: ReactiveX/RxJava
@Test
public void empty() {
    Flowable.<Integer>empty()
        .doAfterNext(afterNext)
        .subscribeWith(ts)
        .assertResult();
    assertTrue(values.isEmpty());
}
Code example source: ReactiveX/RxJava
@Test
public void just() {
    Flowable.just(1)
        .doAfterNext(afterNext)
        .subscribeWith(ts)
        .assertResult(1);
    assertEquals(Arrays.asList(1, -1), values);
}
Code example source: ReactiveX/RxJava
@Test
public void withFlowable() {
    Flowable.range(1, 10)
        .subscribeWith(new TestSubscriber<Integer>())
        .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Code example source: ReactiveX/RxJava
@Test
public void error() {
    Flowable.<Integer>error(new TestException())
        .doAfterNext(afterNext)
        .subscribeWith(ts)
        .assertFailure(TestException.class);
    assertTrue(values.isEmpty());
}
Code example source: ReactiveX/RxJava
@Test
public void rangeConditional() {
    Flowable.range(1, 5)
        .doAfterNext(afterNext)
        .filter(Functions.alwaysTrue())
        .subscribeWith(ts)
        .assertResult(1, 2, 3, 4, 5);
    assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
}
Code example source: ReactiveX/RxJava
@Test
public void emptyConditional() {
    Flowable.<Integer>empty()
        .doAfterNext(afterNext)
        .filter(Functions.alwaysTrue())
        .subscribeWith(ts)
        .assertResult();
    assertTrue(values.isEmpty());
}
Code example source: ReactiveX/RxJava
@Test
public void justConditional() {
    Flowable.just(1)
        .doAfterNext(afterNext)
        .filter(Functions.alwaysTrue())
        .subscribeWith(ts)
        .assertResult(1);
    assertEquals(Arrays.asList(1, -1), values);
}
Code example source: ReactiveX/RxJava
@Test
public void errorConditional() {
    Flowable.<Integer>error(new TestException())
        .doAfterNext(afterNext)
        .filter(Functions.alwaysTrue())
        .subscribeWith(ts)
        .assertFailure(TestException.class);
    assertTrue(values.isEmpty());
}
Code example source: ReactiveX/RxJava
@Test
public void fusionCrash() {
    MulticastProcessor<Integer> mp = Flowable.range(1, 5)
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer v) throws Exception {
                throw new IOException();
            }
        })
        .subscribeWith(MulticastProcessor.<Integer>create());
    mp.test().assertFailure(IOException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void conditionalSlowPathCancel() {
    Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5 })
        .filter(Functions.alwaysTrue())
        .subscribeWith(new TestSubscriber<Integer>(5L) {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    cancel();
                    onComplete();
                }
            }
        })
        .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void disposeInOnNext() {
    final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    BehaviorProcessor.createDefault(1)
        .debounce(new Function<Integer, Flowable<Object>>() {
            @Override
            public Flowable<Object> apply(Integer o) throws Exception {
                ts.cancel();
                return Flowable.never();
            }
        })
        .subscribeWith(ts)
        .assertEmpty();
    assertTrue(ts.isDisposed());
}
Code example source: ReactiveX/RxJava
@Test
public void disposedInOnComplete() {
    final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    new Flowable<Integer>() {
        @Override
        protected void subscribeActual(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new BooleanSubscription());
            ts.cancel();
            subscriber.onComplete();
        }
    }
    .debounce(Functions.justFunction(Flowable.never()))
    .subscribeWith(ts)
    .assertEmpty();
}
Code example source: ReactiveX/RxJava
@Test
public void onSuccessSlowPath() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    final SingleSubject<Integer> cs = SingleSubject.create();
    TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 1) {
                cs.onSuccess(2);
            }
        }
    });
    pp.onNext(1);
    pp.onNext(3);
    pp.onComplete();
    ts.assertResult(1, 2, 3);
}
Code example source: ReactiveX/RxJava
@Test
public void nextWindowMissingBackpressureDrainOnSize() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    TestSubscriber<Flowable<Integer>> ts = pp.window(1, TimeUnit.MINUTES, 1)
        .subscribeWith(new TestSubscriber<Flowable<Integer>>(2) {
            int calls;
            @Override
            public void onNext(Flowable<Integer> t) {
                super.onNext(t);
                if (++calls == 2) {
                    pp.onNext(2);
                }
            }
        });
    pp.onNext(1);
    ts.assertValueCount(2)
        .assertError(MissingBackpressureException.class)
        .assertNotComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void onSuccessSlowPath() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    final MaybeSubject<Integer> cs = MaybeSubject.create();
    TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 1) {
                cs.onSuccess(2);
            }
        }
    });
    pp.onNext(1);
    pp.onNext(3);
    pp.onComplete();
    ts.assertResult(1, 2, 3);
}
Code example source: ReactiveX/RxJava
@Test
public void drainExactRequestCancel() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    final SingleSubject<Integer> cs = SingleSubject.create();
    TestSubscriber<Integer> ts = pp.mergeWith(cs)
        .limit(2)
        .subscribeWith(new TestSubscriber<Integer>(2) {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    cs.onSuccess(2);
                }
            }
        });
    pp.onNext(1);
    pp.onComplete();
    ts.request(2);
    ts.assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void drainExactRequestCancel() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    final MaybeSubject<Integer> cs = MaybeSubject.create();
    TestSubscriber<Integer> ts = pp.mergeWith(cs)
        .limit(2)
        .subscribeWith(new TestSubscriber<Integer>(2) {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    cs.onSuccess(2);
                }
            }
        });
    pp.onNext(1);
    pp.onComplete();
    ts.request(2);
    ts.assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void onSuccessSlowPathBackpressured() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    final MaybeSubject<Integer> cs = MaybeSubject.create();
    TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>(1) {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 1) {
                cs.onSuccess(2);
            }
        }
    });
    pp.onNext(1);
    pp.onNext(3);
    pp.onComplete();
    ts.request(2);
    ts.assertResult(1, 2, 3);
}
Code example source: ReactiveX/RxJava
@Test
public void onNextSlowPath() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    final MaybeSubject<Integer> cs = MaybeSubject.create();
    TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 1) {
                pp.onNext(2);
            }
        }
    });
    pp.onNext(1);
    cs.onSuccess(3);
    pp.onNext(4);
    pp.onComplete();
    ts.assertResult(1, 2, 3, 4);
}