io.reactivex.Observable.doOnComplete()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.8k)|赞(0)|评价(0)|浏览(206)

本文整理了Java中io.reactivex.Observable.doOnComplete()方法的一些代码示例,展示了Observable.doOnComplete()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnComplete()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnComplete

Observable.doOnComplete介绍

[英]Modifies the source ObservableSource so that it invokes an action when it calls onComplete.

Scheduler: doOnComplete does not operate by default on a particular Scheduler.
[中]修改源ObservableSource,以便在调用onComplete时调用操作。
调度器:默认情况下,doOnComplete不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Observable<Integer> apply(Observable<Integer> w) {
  3. return w.startWith(indicator)
  4. .doOnComplete(new Action() {
  5. @Override
  6. public void run() {
  7. System.out.println("inner done: " + wip.incrementAndGet());
  8. }
  9. })
  10. ;
  11. }
  12. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void doOnCompleteNull() {
  3. just1.doOnComplete(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDoOnCompleted() {
  3. final AtomicBoolean r = new AtomicBoolean();
  4. String output = Observable.just("one").doOnComplete(new Action() {
  5. @Override
  6. public void run() {
  7. r.set(true);
  8. }
  9. }).blockingSingle();
  10. assertEquals("one", output);
  11. assertTrue(r.get());
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. private static <T> Observable<T> composer(Observable<T> source, final AtomicInteger subscriptionCount, final int m) {
  2. return source.doOnSubscribe(new Consumer<Disposable>() {
  3. @Override
  4. public void accept(Disposable d) {
  5. int n = subscriptionCount.getAndIncrement();
  6. if (n >= m) {
  7. Assert.fail("Too many subscriptions! " + (n + 1));
  8. }
  9. }
  10. }).doOnComplete(new Action() {
  11. @Override
  12. public void run() {
  13. int n = subscriptionCount.decrementAndGet();
  14. if (n < 0) {
  15. Assert.fail("Too many unsubscriptions! " + (n - 1));
  16. }
  17. }
  18. });
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onCompleteCrash() {
  3. Observable.wrap(new ObservableSource<Object>() {
  4. @Override
  5. public void subscribe(Observer<? super Object> observer) {
  6. observer.onSubscribe(Disposables.empty());
  7. observer.onComplete();
  8. }
  9. })
  10. .doOnComplete(new Action() {
  11. @Override
  12. public void run() throws Exception {
  13. throw new IOException();
  14. }
  15. })
  16. .test()
  17. .assertFailure(IOException.class);
  18. }

代码示例来源:origin: ReactiveX/RxJava

  1. .doOnNext(sourceNext)
  2. .doOnDispose(sourceUnsubscribed)
  3. .doOnComplete(sourceCompleted)
  4. .doOnError(sourceError)
  5. .subscribeOn(mockScheduler).replay();

代码示例来源:origin: ReactiveX/RxJava

  1. .doOnNext(sourceNext)
  2. .doOnDispose(sourceUnsubscribed)
  3. .doOnComplete(sourceCompleted)
  4. .replay();

代码示例来源:origin: ReactiveX/RxJava

  1. .window(300, TimeUnit.MILLISECONDS)
  2. .take(10)
  3. .doOnComplete(new Action() {
  4. @Override
  5. public void run() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onCompleteCrashConditional() {
  3. Observable.wrap(new ObservableSource<Object>() {
  4. @Override
  5. public void subscribe(Observer<? super Object> observer) {
  6. observer.onSubscribe(Disposables.empty());
  7. observer.onComplete();
  8. }
  9. })
  10. .doOnComplete(new Action() {
  11. @Override
  12. public void run() throws Exception {
  13. throw new IOException();
  14. }
  15. })
  16. .filter(Functions.alwaysTrue())
  17. .test()
  18. .assertFailure(IOException.class);
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testUsingDisposesEagerlyBeforeCompletion() {
  3. final List<String> events = new ArrayList<String>();
  4. Callable<Resource> resourceFactory = createResourceFactory(events);
  5. final Action completion = createOnCompletedAction(events);
  6. final Action unsub = createUnsubAction(events);
  7. Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
  8. @Override
  9. public Observable<String> apply(Resource resource) {
  10. return Observable.fromArray(resource.getTextFromWeb().split(" "));
  11. }
  12. };
  13. Observer<String> observer = TestHelper.mockObserver();
  14. Observable<String> o = Observable.using(resourceFactory, observableFactory,
  15. new DisposeAction(), true)
  16. .doOnDispose(unsub)
  17. .doOnComplete(completion);
  18. o.safeSubscribe(observer);
  19. assertEquals(Arrays.asList("disposed", "completed" /* , "unsub" */), events);
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
  3. final List<String> events = new ArrayList<String>();
  4. Callable<Resource> resourceFactory = createResourceFactory(events);
  5. final Action completion = createOnCompletedAction(events);
  6. final Action unsub = createUnsubAction(events);
  7. Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
  8. @Override
  9. public Observable<String> apply(Resource resource) {
  10. return Observable.fromArray(resource.getTextFromWeb().split(" "));
  11. }
  12. };
  13. Observer<String> observer = TestHelper.mockObserver();
  14. Observable<String> o = Observable.using(resourceFactory, observableFactory,
  15. new DisposeAction(), false)
  16. .doOnDispose(unsub)
  17. .doOnComplete(completion);
  18. o.safeSubscribe(observer);
  19. assertEquals(Arrays.asList("completed", /*"unsub",*/ "disposed"), events);
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Fusion not supported yet") // TODO decide/implement fusion
  3. public void fusedOnErrorCrash() {
  4. TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  5. final int[] call = { 0 };
  6. Observable.range(1, 5)
  7. .doOnNext(new Consumer<Integer>() {
  8. @Override
  9. public void accept(Integer v) throws Exception {
  10. throw new TestException();
  11. }
  12. })
  13. .doOnComplete(new Action() {
  14. @Override
  15. public void run() throws Exception {
  16. call[0]++;
  17. }
  18. })
  19. .subscribe(to);
  20. to.assertOf(ObserverFusion.<Integer>assertFuseable())
  21. .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  22. .assertFailure(TestException.class);
  23. assertEquals(0, call[0]);
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Fusion not supported yet") // TODO decide/implement fusion
  3. public void fused() {
  4. TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  5. final int[] call = { 0, 0 };
  6. Observable.range(1, 5)
  7. .doOnNext(new Consumer<Integer>() {
  8. @Override
  9. public void accept(Integer v) throws Exception {
  10. call[0]++;
  11. }
  12. })
  13. .doOnComplete(new Action() {
  14. @Override
  15. public void run() throws Exception {
  16. call[1]++;
  17. }
  18. })
  19. .subscribe(to);
  20. to.assertOf(ObserverFusion.<Integer>assertFuseable())
  21. .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  22. .assertResult(1, 2, 3, 4, 5);
  23. assertEquals(5, call[0]);
  24. assertEquals(1, call[1]);
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. ConnectableObservable<Integer> is = Observable.range(1, Flowable.bufferSize() * 2).publish();
  2. Observable<Integer> fast = is.observeOn(Schedulers.computation())
  3. .doOnComplete(new Action() {
  4. @Override
  5. public void run() {
  6. }).doOnComplete(new Action() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Observable<String> apply(final GroupedObservable<Integer, Integer> group) {
  3. if (group.getKey() < 3) {
  4. return group.map(new Function<Integer, String>() {
  5. @Override
  6. public String apply(Integer t1) {
  7. return "first groups: " + t1;
  8. }
  9. })
  10. // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
  11. .take(2).doOnComplete(new Action() {
  12. @Override
  13. public void run() {
  14. first.countDown();
  15. }
  16. });
  17. } else {
  18. return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
  19. @Override
  20. public String apply(Integer t1) {
  21. return "last group: " + t1;
  22. }
  23. }).doOnEach(new Consumer<Notification<String>>() {
  24. @Override
  25. public void accept(Notification<String> t1) {
  26. System.err.println("subscribeOn notification => " + t1);
  27. }
  28. });
  29. }
  30. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Fusion not supported yet") // TODO decide/implement fusion
  3. public void fusedAsync() {
  4. TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  5. final int[] call = { 0, 0 };
  6. UnicastSubject<Integer> up = UnicastSubject.create();
  7. up
  8. .doOnNext(new Consumer<Integer>() {
  9. @Override
  10. public void accept(Integer v) throws Exception {
  11. call[0]++;
  12. }
  13. })
  14. .doOnComplete(new Action() {
  15. @Override
  16. public void run() throws Exception {
  17. call[1]++;
  18. }
  19. })
  20. .subscribe(to);
  21. TestHelper.emit(up, 1, 2, 3, 4, 5);
  22. to.assertOf(ObserverFusion.<Integer>assertFuseable())
  23. .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  24. .assertResult(1, 2, 3, 4, 5);
  25. assertEquals(5, call[0]);
  26. assertEquals(1, call[1]);
  27. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Fusion not supported yet") // TODO decide/implement fusion
  3. public void fusedOnErrorCrashConditional() {
  4. TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  5. final int[] call = { 0 };
  6. Observable.range(1, 5)
  7. .doOnNext(new Consumer<Integer>() {
  8. @Override
  9. public void accept(Integer v) throws Exception {
  10. throw new TestException();
  11. }
  12. })
  13. .doOnComplete(new Action() {
  14. @Override
  15. public void run() throws Exception {
  16. call[0]++;
  17. }
  18. })
  19. .filter(Functions.alwaysTrue())
  20. .subscribe(to);
  21. to.assertOf(ObserverFusion.<Integer>assertFuseable())
  22. .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  23. .assertFailure(TestException.class);
  24. assertEquals(0, call[0]);
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Fusion not supported yet") // TODO decide/implement fusion
  3. public void fusedConditional() {
  4. TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  5. final int[] call = { 0, 0 };
  6. Observable.range(1, 5)
  7. .doOnNext(new Consumer<Integer>() {
  8. @Override
  9. public void accept(Integer v) throws Exception {
  10. call[0]++;
  11. }
  12. })
  13. .doOnComplete(new Action() {
  14. @Override
  15. public void run() throws Exception {
  16. call[1]++;
  17. }
  18. })
  19. .filter(Functions.alwaysTrue())
  20. .subscribe(to);
  21. to.assertOf(ObserverFusion.<Integer>assertFuseable())
  22. .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  23. .assertResult(1, 2, 3, 4, 5);
  24. assertEquals(5, call[0]);
  25. assertEquals(1, call[1]);
  26. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Fusion not supported yet") // TODO decide/implement fusion
  3. public void fusedAsyncConditional() {
  4. TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  5. final int[] call = { 0, 0 };
  6. UnicastSubject<Integer> up = UnicastSubject.create();
  7. up
  8. .doOnNext(new Consumer<Integer>() {
  9. @Override
  10. public void accept(Integer v) throws Exception {
  11. call[0]++;
  12. }
  13. })
  14. .doOnComplete(new Action() {
  15. @Override
  16. public void run() throws Exception {
  17. call[1]++;
  18. }
  19. })
  20. .filter(Functions.alwaysTrue())
  21. .subscribe(to);
  22. TestHelper.emit(up, 1, 2, 3, 4, 5);
  23. to.assertOf(ObserverFusion.<Integer>assertFuseable())
  24. .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  25. .assertResult(1, 2, 3, 4, 5);
  26. assertEquals(5, call[0]);
  27. assertEquals(1, call[1]);
  28. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Fusion not supported yet") // TODO decide/implement fusion
  3. public void fusedAsyncConditional2() {
  4. TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  5. final int[] call = { 0, 0 };
  6. UnicastSubject<Integer> up = UnicastSubject.create();
  7. up.hide()
  8. .doOnNext(new Consumer<Integer>() {
  9. @Override
  10. public void accept(Integer v) throws Exception {
  11. call[0]++;
  12. }
  13. })
  14. .doOnComplete(new Action() {
  15. @Override
  16. public void run() throws Exception {
  17. call[1]++;
  18. }
  19. })
  20. .filter(Functions.alwaysTrue())
  21. .subscribe(to);
  22. TestHelper.emit(up, 1, 2, 3, 4, 5);
  23. to.assertOf(ObserverFusion.<Integer>assertFuseable())
  24. .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.NONE))
  25. .assertResult(1, 2, 3, 4, 5);
  26. assertEquals(5, call[0]);
  27. assertEquals(1, call[1]);
  28. }

相关文章

Observable类方法