Usage and code examples of the io.reactivex.Flowable.concatMapEager() method

x33g5p2x, reposted on 2022-01-19 under Other

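This article collects code examples for the Java method io.reactivex.Flowable.concatMapEager() and shows how Flowable.concatMapEager() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and should serve as a useful reference. The details of Flowable.concatMapEager() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: concatMapEager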

Introduction to Flowable.concatMapEager

Maps a sequence of values into Publishers and concatenates these Publishers eagerly into a single Publisher.

Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the source Publishers. The operator buffers the values emitted by these Publishers and then drains them in order, each one after the previous one completes.
Backpressure: backpressure is honored towards the downstream; however, due to the eagerness requirement, sources are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
Scheduler: this method does not operate by default on a particular Scheduler.
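The eager behaviour described above can be pictured with a plain-JDK analogy. The sketch below does not use RxJava and is not how the operator is implemented; `EagerConcatSketch`, `eagerConcat`, and `slowPair` are hypothetical names made up for illustration. It starts one task per source item immediately, then reads the buffered results strictly in source order, so the output order does not depend on which task finishes first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Plain-JDK analogy of eager concatenation; this is NOT RxJava's implementation.
class EagerConcatSketch {

    // Hypothetical helper: starts one task per source item right away ("eager"),
    // then reads the buffered results strictly in source order ("concat").
    static <T, R> List<R> eagerConcat(List<T> sources, Function<T, List<R>> mapper) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<List<R>>> inners = new ArrayList<>();
            for (T item : sources) {
                // Every inner task is started before any result is consumed.
                inners.add(CompletableFuture.supplyAsync(() -> mapper.apply(item), pool));
            }
            List<R> out = new ArrayList<>();
            for (CompletableFuture<List<R>> inner : inners) {
                // join() waits for this inner task to complete before moving on to the next.
                out.addAll(inner.join());
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    // Earlier items sleep longer, so they finish last yet still come out first.
    static List<Integer> slowPair(int t) {
        try {
            Thread.sleep((4 - t) * 50L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Arrays.asList(t, t + 1);
    }

    public static void main(String[] args) {
        System.out.println(eagerConcat(Arrays.asList(1, 2, 3), EagerConcatSketch::slowPair));
        // prints [1, 2, 2, 3, 3, 4]
    }
}
```

The analogy only covers ordering and eager start. It does not model backpressure, cancellation, or error propagation.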

Code examples

Code example source: origin: ReactiveX/RxJava

    // Fragment: the enclosing call that receives this anonymous Function
    // (closed by the final "});") is not shown in the excerpt.
    @Override
    public Flowable<Object> apply(Flowable<Object> f) throws Exception {
        return f.concatMapEager(new Function<Object, Flowable<Object>>() {
            @Override
            public Flowable<Object> apply(Object v) throws Exception {
                return Flowable.just(v);
            }
        });
    }
    });

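Code example source: origin: ReactiveX/RxJava

    // toJust is a field of the test class and is not shown in this excerpt.
    @Test(expected = IllegalArgumentException.class)
    public void testInvalidMaxConcurrent() {
        // A maxConcurrency of 0 is rejected.
        Flowable.just(1).concatMapEager(toJust, 0, Flowable.bufferSize());
    }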

Code example source: origin: ReactiveX/RxJava

    @Test(expected = IllegalArgumentException.class)
    public void testInvalidCapacityHint() {
        // A prefetch (third argument) of 0 is rejected.
        Flowable.just(1).concatMapEager(toJust, Flowable.bufferSize(), 0);
    }

Code example source: origin: ReactiveX/RxJava

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Test
    public void mappingBadCapacityHint() throws Exception {
        Flowable<Integer> source = Flowable.just(1);
        try {
            // A negative prefetch is rejected with a descriptive message.
            Flowable.just(source, source, source).concatMapEager((Function)Functions.identity(), 10, -99);
        } catch (IllegalArgumentException ex) {
            assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
        }
    }
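The three tests above all exercise the same kind of argument check: the second and third arguments must be strictly positive. A minimal plain-JDK sketch of such a check follows. The class `ArgumentCheckSketch` and the helper `verifyPositive` are hypothetical names used for illustration, written only to reproduce the message format asserted in the test; they are not presented as the library's own code.

```java
// Hypothetical sketch of a positive-argument check; not taken from the library.
class ArgumentCheckSketch {

    // Returns the value unchanged when it is positive, otherwise throws
    // with a message in the format asserted by the test above.
    static int verifyPositive(int value, String paramName) {
        if (value <= 0) {
            throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            verifyPositive(-99, "prefetch");
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
            // prints: prefetch > 0 required but it was -99
        }
    }
}
```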

Code example source: origin: ReactiveX/RxJava

    @Test
    public void normal() {
        Flowable.range(1, 5)
        .concatMapEager(new Function<Integer, Publisher<Integer>>() {
            @Override
            public Publisher<Integer> apply(Integer t) {
                // Each source value t becomes the two-element sequence t, t + 1.
                return Flowable.range(t, 2);
            }
        })
        .test()
        .assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
    }
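To see where the expected values in `normal()` come from, the same mapping can be worked through with `java.util.stream`. This is only a value-order analogy under the assumption that `range(t, 2)` yields `t` and `t + 1`; streams are synchronous and say nothing about eager subscription. `NormalResultSketch` and `expected` are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Value-order analogy for the normal() test; not an RxJava pipeline.
class NormalResultSketch {

    // For each t in 1..5, emit t and t + 1, keeping the source order.
    static List<Integer> expected() {
        return IntStream.rangeClosed(1, 5)
                .flatMap(t -> IntStream.range(t, t + 2)) // two values starting at t
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(expected());
        // prints [1, 2, 2, 3, 3, 4, 4, 5, 5, 6]
    }
}
```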

Code example source: origin: ReactiveX/RxJava

    @Test
    public void dispose() {
        TestHelper.checkDisposed(Flowable.just(1).hide().concatMapEager(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer v) throws Exception {
                return Flowable.range(1, 2);
            }
        }));
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void innerErrorMaxConcurrency() {
        Flowable.<Integer>just(1).hide().concatMapEager(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer v) throws Exception {
                return Flowable.error(new TestException());
            }
        }, 1, 128) // maxConcurrency = 1, prefetch = 128
        .test()
        .assertFailure(TestException.class);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void innerError() {
        Flowable.<Integer>just(1).hide().concatMapEager(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer v) throws Exception {
                // The inner source fails immediately; the error reaches the downstream.
                return Flowable.error(new TestException());
            }
        })
        .test()
        .assertFailure(TestException.class);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void empty() {
        Flowable.<Integer>empty().hide().concatMapEager(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer v) throws Exception {
                return Flowable.range(1, 2);
            }
        })
        .test()
        .assertResult(); // completes without emitting any value
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void innerErrorFused() {
        Flowable.<Integer>just(1).hide().concatMapEager(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer v) throws Exception {
                return Flowable.range(1, 2).map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer v) throws Exception {
                        throw new TestException();
                    }
                });
            }
        })
        .test()
        .assertFailure(TestException.class);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void innerCallableThrows() {
        Flowable.<Integer>just(1).hide().concatMapEager(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer v) throws Exception {
                return Flowable.fromCallable(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        throw new TestException();
                    }
                });
            }
        })
        .test()
        .assertFailure(TestException.class);
    }

Code example source: origin: ReactiveX/RxJava

    // toJust and ts are fields of the test class and are not shown in this excerpt.
    @Test
    public void testSimple() {
        Flowable.range(1, 100).concatMapEager(toJust).subscribe(ts);
        ts.assertNoErrors();
        ts.assertValueCount(100);
        ts.assertComplete();
    }

Code example source: origin: ReactiveX/RxJava

    // toRange and ts are fields of the test class and are not shown in this excerpt.
    @Test
    public void testSimple2() {
        Flowable.range(1, 100).concatMapEager(toRange).subscribe(ts);
        ts.assertNoErrors();
        ts.assertValueCount(200);
        ts.assertComplete();
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void testMapperThrows() {
        Flowable.just(1).concatMapEager(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer t) {
                // The mapper itself throws, so no inner source is ever created.
                throw new TestException();
            }
        }).subscribe(ts);
        ts.assertNoValues();
        ts.assertNotComplete();
        ts.assertError(TestException.class);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void mapperCancels() {
        final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        Flowable.just(1).hide()
        .concatMapEager(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer v) throws Exception {
                // Cancel the downstream subscriber from inside the mapper.
                ts.cancel();
                return Flowable.never();
            }
        }, 1, 128)
        .subscribe(ts);
        ts.assertEmpty();
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void testMainError() {
        Flowable.<Integer>error(new TestException()).concatMapEager(toJust).subscribe(ts);
        ts.assertNoValues();
        ts.assertError(TestException.class);
        ts.assertNotComplete();
    }

Code example source: origin: ReactiveX/RxJava

    // Ignored test: Reactive Streams does not allow null values.
    @Test
    @Ignore("Null values are not allowed in RS")
    public void testInnerNull() {
        Flowable.just(1).concatMapEager(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer t) {
                return Flowable.just(null);
            }
        }).subscribe(ts);
        ts.assertNoErrors();
        ts.assertComplete();
        ts.assertValue(null);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void unboundedIn() {
        int n = Flowable.bufferSize() * 2;
        Flowable.range(1, n)
        .concatMapEager(new Function<Integer, Publisher<Integer>>() {
            @Override
            public Publisher<Integer> apply(Integer v) throws Exception {
                return Flowable.just(1);
            }
        }, Integer.MAX_VALUE, 16) // maxConcurrency = Integer.MAX_VALUE, prefetch = 16
        .test()
        .assertValueCount(n)
        .assertComplete()
        .assertNoErrors();
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void longEager() {
        Flowable.range(1, 2 * Flowable.bufferSize())
        .concatMapEager(new Function<Integer, Publisher<Integer>>() {
            @Override
            public Publisher<Integer> apply(Integer v) {
                return Flowable.just(1);
            }
        })
        .test()
        .assertValueCount(2 * Flowable.bufferSize())
        .assertNoErrors()
        .assertComplete();
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void innerLong() {
        int n = Flowable.bufferSize() * 2;
        Flowable.just(1).hide()
        .concatMapEager(Functions.justFunction(Flowable.range(1, n).hide()))
        .rebatchRequests(1)
        .test()
        .assertValueCount(n)
        .assertComplete()
        .assertNoErrors();
    }
