本文整理了Java中io.reactivex.Observable.doOnSubscribe()方法的一些代码示例,展示了Observable.doOnSubscribe()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Observable.doOnSubscribe()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnSubscribe
[英]Modifies the source ObservableSource so that it invokes the given action when it is subscribed from its subscribers. Each subscription will result in an invocation of the given action except when the source ObservableSource is reference counted, in which case the source ObservableSource will invoke the given action for the first subscription.
Scheduler: doOnSubscribe does not operate by default on a particular Scheduler.
[中]修改源ObservableSource,使其在被订阅者订阅时调用给定的操作。每次订阅都会调用一次给定的操作;但如果源ObservableSource采用了引用计数,则只会在第一次订阅时调用给定的操作。
调度器:默认情况下,doOnSubscribe不会在特定的调度器上运行。
代码示例来源:origin: ReactiveX/RxJava
/**
 * Passes {@code null} as the {@code doOnSubscribe} callback; the test only succeeds when
 * that call throws a {@link NullPointerException}.
 */
@Test(expected = NullPointerException.class)
public void doOnSubscribeNull() {
just1.doOnSubscribe(null);
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Subscribes three times to the same {@code Observable} and expects the
 * {@code doOnSubscribe} callback to have run once per subscription.
 */
@Test
public void testDoOnSubscribe() throws Exception {
    final AtomicInteger invocations = new AtomicInteger();
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) {
            invocations.incrementAndGet();
        }
    };
    Observable<Integer> source = Observable.just(1).doOnSubscribe(countSubscription);

    for (int i = 0; i < 3; i++) {
        source.subscribe();
    }

    assertEquals(3, invocations.get());
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Attaches a counting {@code doOnSubscribe} callback both before and after {@code take(1)}
 * and expects a single subscription to trigger the callback at both positions.
 */
@Test
public void testDoOnSubscribe2() throws Exception {
    final AtomicInteger invocations = new AtomicInteger();
    // The same counting callback is registered at two points of the chain.
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) {
            invocations.incrementAndGet();
        }
    };
    Observable<Integer> upstream = Observable.just(1).doOnSubscribe(countSubscription);
    Observable<Integer> downstream = upstream.take(1).doOnSubscribe(countSubscription);

    downstream.subscribe();

    assertEquals(2, invocations.get());
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * With a non-empty main source, {@code switchIfEmpty} must deliver the main source's value
 * and must not subscribe to the fallback.
 */
@Test
public void testSwitchWhenNotEmpty() throws Exception {
    final AtomicBoolean fallbackSubscribed = new AtomicBoolean(false);
    Observable<Integer> fallback = Observable.just(2)
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable d) {
                    fallbackSubscribed.set(true);
                }
            });
    final Observable<Integer> result = Observable.just(4).switchIfEmpty(fallback);

    int value = result.blockingSingle().intValue();

    assertEquals(4, value);
    assertFalse(fallbackSubscribed.get());
}
代码示例来源:origin: ReactiveX/RxJava
}).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
代码示例来源:origin: ReactiveX/RxJava
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
代码示例来源:origin: ReactiveX/RxJava
final AtomicInteger nextCount = new AtomicInteger();
Observable<Integer> r = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
代码示例来源:origin: ReactiveX/RxJava
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
代码示例来源:origin: ReactiveX/RxJava
/**
 * Feeds two delayed, asynchronously subscribed sources into {@code ambArray} and expects
 * the shared {@code doOnSubscribe} counter to end at two, i.e. one subscription per source.
 */
@SuppressWarnings("unchecked")
@Test
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
    final AtomicLong subscriptions = new AtomicLong();
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) {
            subscriptions.incrementAndGet();
        }
    };

    // First amb candidate: asynchronous, delayed by 100 ms.
    Observable<Integer> first = Observable.just(1)
            .doOnSubscribe(countSubscription)
            .delay(100, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.computation());
    // Second amb candidate: built the same way as the first.
    Observable<Integer> second = Observable.just(1)
            .doOnSubscribe(countSubscription)
            .delay(100, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.computation());

    TestObserver<Integer> observer = new TestObserver<Integer>();
    Observable.ambArray(first, second).subscribe(observer);

    observer.awaitTerminalEvent(5, TimeUnit.SECONDS);
    observer.assertNoErrors();
    assertEquals(2, subscriptions.get());
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Wraps {@code source} with bookkeeping on a shared subscription counter.
 *
 * <p>On subscribe the counter is incremented, and the test fails if its previous value was
 * already {@code m} or more. On completion the counter is decremented, and the test fails
 * if it drops below zero.
 *
 * @param source the sequence to instrument
 * @param subscriptionCount the counter updated on subscribe and on completion
 * @param m the number of subscriptions tolerated before failing
 * @return the instrumented sequence
 */
private static <T> Observable<T> composer(Observable<T> source, final AtomicInteger subscriptionCount, final int m) {
    Consumer<Disposable> onSubscribe = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) {
            int before = subscriptionCount.getAndIncrement();
            if (before < m) {
                return;
            }
            Assert.fail("Too many subscriptions! " + (before + 1));
        }
    };
    Action onComplete = new Action() {
        @Override
        public void run() {
            int after = subscriptionCount.decrementAndGet();
            if (after >= 0) {
                return;
            }
            Assert.fail("Too many unsubscriptions! " + (after - 1));
        }
    };
    return source.doOnSubscribe(onSubscribe).doOnComplete(onComplete);
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Applies {@code timeout} with an indicator that counts its subscriptions, subscribes via
 * {@code test(true)} (presumably an observer that is already disposed, per the test name),
 * and expects the indicator never to have been subscribed.
 */
@Test
public void disposedUpfront() {
    final AtomicInteger indicatorSubscriptions = new AtomicInteger();
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) throws Exception {
            indicatorSubscriptions.incrementAndGet();
        }
    };
    Observable<Object> timeoutAndFallback = Observable.never().doOnSubscribe(countSubscription);
    PublishSubject<Integer> subject = PublishSubject.create();

    subject
            .timeout(timeoutAndFallback, Functions.justFunction(timeoutAndFallback))
            .test(true)
            .assertEmpty();

    assertEquals(0, indicatorSubscriptions.get());
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Same scenario as an upfront-disposed {@code timeout}, but using the overload that also
 * takes a fallback; the counting sequence serves as first-timeout indicator, per-item
 * indicator and fallback, and must never be subscribed.
 */
@Test
public void disposedUpfrontFallback() {
    final AtomicInteger indicatorSubscriptions = new AtomicInteger();
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) throws Exception {
            indicatorSubscriptions.incrementAndGet();
        }
    };
    Observable<Object> timeoutAndFallback = Observable.never().doOnSubscribe(countSubscription);
    PublishSubject<Object> subject = PublishSubject.create();

    subject
            .timeout(timeoutAndFallback, Functions.justFunction(timeoutAndFallback), timeoutAndFallback)
            .test(true)
            .assertEmpty();

    assertEquals(0, indicatorSubscriptions.get());
}
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * With {@code delaySubscription(other)}, the main source must stay unsubscribed until
 * {@code other} completes; completion then triggers exactly one subscription and the
 * observer receives the value followed by completion.
 */
@Test
public void testCompleteTriggersSubscription() {
    final AtomicInteger subscribed = new AtomicInteger();
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) {
            subscribed.getAndIncrement();
        }
    };
    PublishSubject<Object> trigger = PublishSubject.create();
    TestObserver<Integer> observer = new TestObserver<Integer>();

    Observable<Integer> delayed = Observable.just(1)
            .doOnSubscribe(countSubscription)
            .delaySubscription(trigger);
    delayed.subscribe(observer);

    // Nothing may have happened before the trigger signals.
    observer.assertNotComplete();
    observer.assertNoErrors();
    observer.assertNoValues();
    Assert.assertEquals("Premature subscription", 0, subscribed.get());

    trigger.onComplete();

    Assert.assertEquals("No subscription", 1, subscribed.get());
    observer.assertValue(1);
    observer.assertNoErrors();
    observer.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * With {@code delaySubscription(other)}, the main source must stay unsubscribed until
 * {@code other} emits an item; that item then triggers exactly one subscription and the
 * observer receives the value followed by completion.
 */
@Test
public void testNoPrematureSubscription() {
    final AtomicInteger subscribed = new AtomicInteger();
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) {
            subscribed.getAndIncrement();
        }
    };
    PublishSubject<Object> trigger = PublishSubject.create();
    TestObserver<Integer> observer = new TestObserver<Integer>();

    Observable<Integer> delayed = Observable.just(1)
            .doOnSubscribe(countSubscription)
            .delaySubscription(trigger);
    delayed.subscribe(observer);

    // Nothing may have happened before the trigger signals.
    observer.assertNotComplete();
    observer.assertNoErrors();
    observer.assertNoValues();
    Assert.assertEquals("Premature subscription", 0, subscribed.get());

    trigger.onNext(1);

    Assert.assertEquals("No subscription", 1, subscribed.get());
    observer.assertValue(1);
    observer.assertNoErrors();
    observer.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * With {@code delaySubscription(other)}, a second item from {@code other} must not cause a
 * second subscription to the main source: the counter stays at one and the observer sees a
 * single value followed by completion.
 */
@Test
public void testNoMultipleSubscriptions() {
    final AtomicInteger subscribed = new AtomicInteger();
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) {
            subscribed.getAndIncrement();
        }
    };
    PublishSubject<Object> trigger = PublishSubject.create();
    TestObserver<Integer> observer = new TestObserver<Integer>();

    Observable<Integer> delayed = Observable.just(1)
            .doOnSubscribe(countSubscription)
            .delaySubscription(trigger);
    delayed.subscribe(observer);

    // Nothing may have happened before the trigger signals.
    observer.assertNotComplete();
    observer.assertNoErrors();
    observer.assertNoValues();
    Assert.assertEquals("Premature subscription", 0, subscribed.get());

    // Two consecutive trigger items; only the first may start a subscription.
    trigger.onNext(1);
    trigger.onNext(2);

    Assert.assertEquals("No subscription", 1, subscribed.get());
    observer.assertValue(1);
    observer.assertNoErrors();
    observer.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Runs {@code combineLatestDelayError} over a list holding an empty source followed by a
 * failing source and expects an empty, completed result, no subscription to the failing
 * source, and no errors reported to the plugin error handler.
 */
@SuppressWarnings("unchecked")
@Test
public void dontSubscribeIfDone2() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        final int[] count = { 0 };

        Observable<Object> failing = Observable.error(new TestException())
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable d) throws Exception {
                        count[0]++;
                    }
                });
        List<Observable<Object>> sources = Arrays.asList(Observable.<Object>empty(), failing);
        Function<Object[], Object> combiner = new Function<Object[], Object>() {
            @Override
            public Object apply(Object[] a) throws Exception {
                return 0;
            }
        };

        Observable.combineLatestDelayError(sources, combiner)
                .test()
                .assertResult();

        assertEquals(0, count[0]);
        assertTrue(errors.toString(), errors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * With {@code delaySubscription(other)} over a failing source, the source must stay
 * unsubscribed until {@code other} completes; afterwards it is subscribed once and the
 * observer receives the {@code TestException} without values or completion.
 */
@Test
public void testNoPrematureSubscriptionToError() {
    final AtomicInteger subscribed = new AtomicInteger();
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) {
            subscribed.getAndIncrement();
        }
    };
    PublishSubject<Object> trigger = PublishSubject.create();
    TestObserver<Integer> observer = new TestObserver<Integer>();

    Observable<Integer> delayed = Observable.<Integer>error(new TestException())
            .doOnSubscribe(countSubscription)
            .delaySubscription(trigger);
    delayed.subscribe(observer);

    // Nothing may have happened before the trigger signals.
    observer.assertNotComplete();
    observer.assertNoErrors();
    observer.assertNoValues();
    Assert.assertEquals("Premature subscription", 0, subscribed.get());

    trigger.onComplete();

    Assert.assertEquals("No subscription", 1, subscribed.get());
    observer.assertNoValues();
    observer.assertNotComplete();
    observer.assertError(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * With {@code delaySubscription(other)}, an error from {@code other} must reach the
 * observer while the main source is never subscribed at all.
 */
@Test
public void testNoSubscriptionIfOtherErrors() {
    final AtomicInteger subscribed = new AtomicInteger();
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) {
            subscribed.getAndIncrement();
        }
    };
    PublishSubject<Object> trigger = PublishSubject.create();
    TestObserver<Integer> observer = new TestObserver<Integer>();

    Observable<Integer> delayed = Observable.<Integer>error(new TestException())
            .doOnSubscribe(countSubscription)
            .delaySubscription(trigger);
    delayed.subscribe(observer);

    // Nothing may have happened before the trigger signals.
    observer.assertNotComplete();
    observer.assertNoErrors();
    observer.assertNoValues();
    Assert.assertEquals("Premature subscription", 0, subscribed.get());

    trigger.onError(new TestException());

    // The main source must still be untouched after the trigger failed.
    Assert.assertEquals("Premature subscription", 0, subscribed.get());
    observer.assertNoValues();
    observer.assertNotComplete();
    observer.assertError(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Runs the two-source {@code combineLatest} over an empty source and a failing source and
 * expects an empty, completed result, no subscription to the failing source, and no errors
 * reported to the plugin error handler.
 */
@Test
public void dontSubscribeIfDone() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        final int[] count = { 0 };

        Observable<Object> empty = Observable.empty();
        Observable<Object> failing = Observable.error(new TestException())
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable d) throws Exception {
                        count[0]++;
                    }
                });
        BiFunction<Object, Object, Object> combiner = new BiFunction<Object, Object, Object>() {
            @Override
            public Object apply(Object a, Object b) throws Exception {
                return 0;
            }
        };

        Observable.combineLatest(empty, failing, combiner)
                .test()
                .assertResult();

        assertEquals(0, count[0]);
        assertTrue(errors.toString(), errors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Shares a range through {@code publish().refCount(2)} and runs three rounds of two
 * observers each. In every round the first observer must stay empty until the second one
 * arrives, after which both receive the full range; the upstream must have been subscribed
 * once per round.
 */
@Test
public void byCount() {
    final int[] subscriptions = { 0 };
    Consumer<Disposable> countSubscription = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) throws Exception {
            subscriptions[0]++;
        }
    };
    Observable<Integer> shared = Observable.range(1, 5)
            .doOnSubscribe(countSubscription)
            .publish()
            .refCount(2);

    final int rounds = 3;
    for (int round = 0; round < rounds; round++) {
        TestObserver<Integer> first = shared.test();
        // A single observer is below the refCount threshold, so nothing flows yet.
        first.assertEmpty();

        TestObserver<Integer> second = shared.test();

        first.assertResult(1, 2, 3, 4, 5);
        second.assertResult(1, 2, 3, 4, 5);
    }

    assertEquals(rounds, subscriptions[0]);
}
内容来源于网络,如有侵权,请联系作者删除!