io.reactivex.Flowable.doOnComplete()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(10.3k)|赞(0)|评价(0)|浏览(340)

本文整理了Java中io.reactivex.Flowable.doOnComplete()方法的一些代码示例,展示了Flowable.doOnComplete()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.doOnComplete()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:doOnComplete

Flowable.doOnComplete介绍

[英]Modifies the source Publisher so that it invokes an action when it calls onComplete.

Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: doOnComplete does not operate by default on a particular Scheduler.
[中]修改源发布服务器,使其在调用onComplete时调用操作。
背压:操作员不会干扰由源发布者的背压行为确定的背压。调度程序:默认情况下,doOnComplete不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Flowable<Integer> w) {
    return w.startWith(indicator)
        .doOnComplete(new Action() {
          @Override
          public void run() {
            System.out.println("inner done: " + wip.incrementAndGet());
          }
        })
        ;
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnCompleteNull() {
  just1.doOnComplete(null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDoOnCompleted() {
  final AtomicBoolean r = new AtomicBoolean();
  String output = Flowable.just("one").doOnComplete(new Action() {
    @Override
    public void run() {
      r.set(true);
    }
  }).blockingSingle();
  assertEquals("one", output);
  assertTrue(r.get());
}

代码示例来源:origin: ReactiveX/RxJava

private static <T> Flowable<T> composer(Flowable<T> source, final AtomicInteger subscriptionCount, final int m) {
  return source.doOnSubscribe(new Consumer<Subscription>() {
    @Override
    public void accept(Subscription s) {
        int n = subscriptionCount.getAndIncrement();
        if (n >= m) {
          Assert.fail("Too many subscriptions! " + (n + 1));
        }
    }
  }).doOnComplete(new Action() {
    @Override
    public void run() {
        int n = subscriptionCount.decrementAndGet();
        if (n < 0) {
          Assert.fail("Too many unsubscriptions! " + (n - 1));
        }
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

.doOnNext(sourceNext)
.doOnCancel(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.replay();

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onCompleteCrash() {
  Flowable.fromPublisher(new Publisher<Object>() {
    @Override
    public void subscribe(Subscriber<? super Object> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onComplete();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      throw new IOException();
    }
  })
  .test()
  .assertFailure(IOException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onCompleteCrashConditional() {
  Flowable.fromPublisher(new Publisher<Object>() {
    @Override
    public void subscribe(Subscriber<? super Object> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onComplete();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      throw new IOException();
    }
  })
  .filter(Functions.alwaysTrue())
  .test()
  .assertFailure(IOException.class);
}

代码示例来源:origin: ReactiveX/RxJava

.doOnComplete(new Action() {
  @Override
  public void run() {

代码示例来源:origin: ReactiveX/RxJava

.window(300, TimeUnit.MILLISECONDS)
.take(10)
.doOnComplete(new Action() {
  @Override
  public void run() {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDisposesEagerlyBeforeCompletion() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Action completion = createOnCompletedAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Resource resource) {
      return Flowable.fromArray(resource.getTextFromWeb().split(" "));
    }
  };
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
      new DisposeAction(), true)
  .doOnCancel(unsub)
  .doOnComplete(completion);
  flowable.safeSubscribe(subscriber);
  assertEquals(Arrays.asList("disposed", "completed"), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Action completion = createOnCompletedAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Resource resource) {
      return Flowable.fromArray(resource.getTextFromWeb().split(" "));
    }
  };
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
      new DisposeAction(), false)
  .doOnCancel(unsub)
  .doOnComplete(completion);
  flowable.safeSubscribe(subscriber);
  assertEquals(Arrays.asList("completed", "disposed"), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedOnErrorCrash() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0 };
  Flowable.range(1, 5)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[0]++;
    }
  })
  .subscribe(ts);
  ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  .assertFailure(TestException.class);
  assertEquals(0, call[0]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fused() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0, 0 };
  Flowable.range(1, 5)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      call[0]++;
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[1]++;
    }
  })
  .subscribe(ts);
  ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, call[0]);
  assertEquals(1, call[1]);
}

代码示例来源:origin: ReactiveX/RxJava

ConnectableFlowable<Integer> is = Flowable.range(1, Flowable.bufferSize() * 2).publish();
Flowable<Integer> fast = is.observeOn(Schedulers.computation())
.doOnComplete(new Action() {
  @Override
  public void run() {
}).doOnComplete(new Action() {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedAsync() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0, 0 };
  UnicastProcessor<Integer> up = UnicastProcessor.create();
  up
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      call[0]++;
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[1]++;
    }
  })
  .subscribe(ts);
  TestHelper.emit(up, 1, 2, 3, 4, 5);
  ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, call[0]);
  assertEquals(1, call[1]);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public Flowable<String> apply(final GroupedFlowable<Integer, Integer> group) {
  if (group.getKey() < 3) {
    return group.map(new Function<Integer, String>() {
      @Override
      public String apply(Integer t1) {
        return "first groups: " + t1;
      }
    })
        // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
        .take(2).doOnComplete(new Action() {
          @Override
          public void run() {
            first.countDown();
          }
        });
  } else {
    return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
      @Override
      public String apply(Integer t1) {
        return "last group: " + t1;
      }
    }).doOnEach(new Consumer<Notification<String>>() {
      @Override
      public void accept(Notification<String> t1) {
        System.err.println("subscribeOn notification => " + t1);
      }
    });
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedOnErrorCrashConditional() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0 };
  Flowable.range(1, 5)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[0]++;
    }
  })
  .filter(Functions.alwaysTrue())
  .subscribe(ts);
  ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  .assertFailure(TestException.class);
  assertEquals(0, call[0]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedConditional() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0, 0 };
  Flowable.range(1, 5)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      call[0]++;
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[1]++;
    }
  })
  .filter(Functions.alwaysTrue())
  .subscribe(ts);
  ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, call[0]);
  assertEquals(1, call[1]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedAsyncConditional() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0, 0 };
  UnicastProcessor<Integer> up = UnicastProcessor.create();
  up
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      call[0]++;
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[1]++;
    }
  })
  .filter(Functions.alwaysTrue())
  .subscribe(ts);
  TestHelper.emit(up, 1, 2, 3, 4, 5);
  ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, call[0]);
  assertEquals(1, call[1]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedAsyncConditional2() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0, 0 };
  UnicastProcessor<Integer> up = UnicastProcessor.create();
  up.hide()
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      call[0]++;
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[1]++;
    }
  })
  .filter(Functions.alwaysTrue())
  .subscribe(ts);
  TestHelper.emit(up, 1, 2, 3, 4, 5);
  ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.NONE))
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, call[0]);
  assertEquals(1, call[1]);
}

相关文章

Flowable类方法