本文整理了Java中io.reactivex.Observable.doOnComplete()
方法的一些代码示例,展示了Observable.doOnComplete()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnComplete()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnComplete
[英]Modifies the source ObservableSource so that it invokes an action when it calls onComplete.
Scheduler: doOnComplete does not operate by default on a particular Scheduler.
[中]修改源ObservableSource,以便在调用onComplete时调用操作。
调度器:默认情况下,doOnComplete不会在特定的调度器上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Observable<Integer> w) {
return w.startWith(indicator)
.doOnComplete(new Action() {
@Override
public void run() {
System.out.println("inner done: " + wip.incrementAndGet());
}
})
;
}
})
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnCompleteNull() {
just1.doOnComplete(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDoOnCompleted() {
final AtomicBoolean r = new AtomicBoolean();
String output = Observable.just("one").doOnComplete(new Action() {
@Override
public void run() {
r.set(true);
}
}).blockingSingle();
assertEquals("one", output);
assertTrue(r.get());
}
代码示例来源:origin: ReactiveX/RxJava
private static <T> Observable<T> composer(Observable<T> source, final AtomicInteger subscriptionCount, final int m) {
return source.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
int n = subscriptionCount.getAndIncrement();
if (n >= m) {
Assert.fail("Too many subscriptions! " + (n + 1));
}
}
}).doOnComplete(new Action() {
@Override
public void run() {
int n = subscriptionCount.decrementAndGet();
if (n < 0) {
Assert.fail("Too many unsubscriptions! " + (n - 1));
}
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void onCompleteCrash() {
Observable.wrap(new ObservableSource<Object>() {
@Override
public void subscribe(Observer<? super Object> observer) {
observer.onSubscribe(Disposables.empty());
observer.onComplete();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
throw new IOException();
}
})
.test()
.assertFailure(IOException.class);
}
代码示例来源:origin: ReactiveX/RxJava
.doOnNext(sourceNext)
.doOnDispose(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.doOnError(sourceError)
.subscribeOn(mockScheduler).replay();
代码示例来源:origin: ReactiveX/RxJava
.doOnNext(sourceNext)
.doOnDispose(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.replay();
代码示例来源:origin: ReactiveX/RxJava
.window(300, TimeUnit.MILLISECONDS)
.take(10)
.doOnComplete(new Action() {
@Override
public void run() {
代码示例来源:origin: ReactiveX/RxJava
@Test
public void onCompleteCrashConditional() {
Observable.wrap(new ObservableSource<Object>() {
@Override
public void subscribe(Observer<? super Object> observer) {
observer.onSubscribe(Disposables.empty());
observer.onComplete();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
throw new IOException();
}
})
.filter(Functions.alwaysTrue())
.test()
.assertFailure(IOException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUsingDisposesEagerlyBeforeCompletion() {
final List<String> events = new ArrayList<String>();
Callable<Resource> resourceFactory = createResourceFactory(events);
final Action completion = createOnCompletedAction(events);
final Action unsub = createUnsubAction(events);
Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
@Override
public Observable<String> apply(Resource resource) {
return Observable.fromArray(resource.getTextFromWeb().split(" "));
}
};
Observer<String> observer = TestHelper.mockObserver();
Observable<String> o = Observable.using(resourceFactory, observableFactory,
new DisposeAction(), true)
.doOnDispose(unsub)
.doOnComplete(completion);
o.safeSubscribe(observer);
assertEquals(Arrays.asList("disposed", "completed" /* , "unsub" */), events);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
final List<String> events = new ArrayList<String>();
Callable<Resource> resourceFactory = createResourceFactory(events);
final Action completion = createOnCompletedAction(events);
final Action unsub = createUnsubAction(events);
Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
@Override
public Observable<String> apply(Resource resource) {
return Observable.fromArray(resource.getTextFromWeb().split(" "));
}
};
Observer<String> observer = TestHelper.mockObserver();
Observable<String> o = Observable.using(resourceFactory, observableFactory,
new DisposeAction(), false)
.doOnDispose(unsub)
.doOnComplete(completion);
o.safeSubscribe(observer);
assertEquals(Arrays.asList("completed", /*"unsub",*/ "disposed"), events);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedOnErrorCrash() {
TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
final int[] call = { 0 };
Observable.range(1, 5)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
throw new TestException();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
call[0]++;
}
})
.subscribe(to);
to.assertOf(ObserverFusion.<Integer>assertFuseable())
.assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
.assertFailure(TestException.class);
assertEquals(0, call[0]);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fused() {
TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
final int[] call = { 0, 0 };
Observable.range(1, 5)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
call[0]++;
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
call[1]++;
}
})
.subscribe(to);
to.assertOf(ObserverFusion.<Integer>assertFuseable())
.assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
.assertResult(1, 2, 3, 4, 5);
assertEquals(5, call[0]);
assertEquals(1, call[1]);
}
代码示例来源:origin: ReactiveX/RxJava
ConnectableObservable<Integer> is = Observable.range(1, Flowable.bufferSize() * 2).publish();
Observable<Integer> fast = is.observeOn(Schedulers.computation())
.doOnComplete(new Action() {
@Override
public void run() {
}).doOnComplete(new Action() {
代码示例来源:origin: ReactiveX/RxJava
@Override
public Observable<String> apply(final GroupedObservable<Integer, Integer> group) {
if (group.getKey() < 3) {
return group.map(new Function<Integer, String>() {
@Override
public String apply(Integer t1) {
return "first groups: " + t1;
}
})
// must take(2) so an onComplete + unsubscribe happens on these first 2 groups
.take(2).doOnComplete(new Action() {
@Override
public void run() {
first.countDown();
}
});
} else {
return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
@Override
public String apply(Integer t1) {
return "last group: " + t1;
}
}).doOnEach(new Consumer<Notification<String>>() {
@Override
public void accept(Notification<String> t1) {
System.err.println("subscribeOn notification => " + t1);
}
});
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedAsync() {
TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
final int[] call = { 0, 0 };
UnicastSubject<Integer> up = UnicastSubject.create();
up
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
call[0]++;
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
call[1]++;
}
})
.subscribe(to);
TestHelper.emit(up, 1, 2, 3, 4, 5);
to.assertOf(ObserverFusion.<Integer>assertFuseable())
.assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
.assertResult(1, 2, 3, 4, 5);
assertEquals(5, call[0]);
assertEquals(1, call[1]);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedOnErrorCrashConditional() {
TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
final int[] call = { 0 };
Observable.range(1, 5)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
throw new TestException();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
call[0]++;
}
})
.filter(Functions.alwaysTrue())
.subscribe(to);
to.assertOf(ObserverFusion.<Integer>assertFuseable())
.assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
.assertFailure(TestException.class);
assertEquals(0, call[0]);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedConditional() {
TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
final int[] call = { 0, 0 };
Observable.range(1, 5)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
call[0]++;
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
call[1]++;
}
})
.filter(Functions.alwaysTrue())
.subscribe(to);
to.assertOf(ObserverFusion.<Integer>assertFuseable())
.assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
.assertResult(1, 2, 3, 4, 5);
assertEquals(5, call[0]);
assertEquals(1, call[1]);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedAsyncConditional() {
TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
final int[] call = { 0, 0 };
UnicastSubject<Integer> up = UnicastSubject.create();
up
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
call[0]++;
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
call[1]++;
}
})
.filter(Functions.alwaysTrue())
.subscribe(to);
TestHelper.emit(up, 1, 2, 3, 4, 5);
to.assertOf(ObserverFusion.<Integer>assertFuseable())
.assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
.assertResult(1, 2, 3, 4, 5);
assertEquals(5, call[0]);
assertEquals(1, call[1]);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedAsyncConditional2() {
TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
final int[] call = { 0, 0 };
UnicastSubject<Integer> up = UnicastSubject.create();
up.hide()
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
call[0]++;
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
call[1]++;
}
})
.filter(Functions.alwaysTrue())
.subscribe(to);
TestHelper.emit(up, 1, 2, 3, 4, 5);
to.assertOf(ObserverFusion.<Integer>assertFuseable())
.assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.NONE))
.assertResult(1, 2, 3, 4, 5);
assertEquals(5, call[0]);
assertEquals(1, call[1]);
}
内容来源于网络,如有侵权,请联系作者删除!