io.reactivex.Flowable.zipWith()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.1k)|赞(0)|评价(0)|浏览(182)

本文整理了Java中io.reactivex.Flowable.zipWith()方法的一些代码示例,展示了Flowable.zipWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.zipWith()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:zipWith

Flowable.zipWith介绍

[英]Returns a Flowable that emits items that are the result of applying a specified function to pairs of values, one each from the source Publisher and a specified Iterable sequence.

Note that the other Iterable is evaluated as items are observed from the source Publisher; it is not pre-consumed. This allows you to zip infinite streams on either side. Backpressure: The operator expects backpressure from the sources and honors backpressure from the downstream. (I.e., zipping with #interval(long,TimeUnit) may result in MissingBackpressureException, use one of the onBackpressureX to handle similar, backpressure-ignoring sources. Scheduler: zipWith does not operate by default on a particular Scheduler.
[中]返回一个可流动项,该可流动项是将指定函数应用于值对的结果,每个值对来自源发布服务器和指定的Iterable序列。
请注意,另一个Iterable是在从源发布服务器观察项目时进行评估的;它不是预先消费的。这允许您在任意一侧压缩无限流。背压:操作员期望来自源头的背压,并尊重来自下游的背压。(即,使用#间隔(长,时间单位)压缩可能会导致背压缺失异常,使用OnBackPressureEx之一处理类似的背压源。Scheduler:zipWith默认情况下不会在特定的计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Object> apply(Flowable<Integer> f) throws Exception {
    return f.zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
      @Override
      public Object apply(Integer a, Integer b) throws Exception {
        return a + b;
      }
    });
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithIterableNull() {
  just1.zipWith((Iterable<Integer>)null, new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return 1;
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithPublisherNull() {
  just1.zipWith((Publisher<Integer>)null, new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return 1;
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithFlowableNull() {
  just1.zipWith((Flowable<Integer>)null, new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return 1;
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Object> apply(Flowable<? extends Throwable> attempt) {
    return attempt.zipWith(Flowable.just(1), new BiFunction<Throwable, Integer, Object>() {
      @Override
      public Object apply(Throwable o, Integer integer) {
        return 0;
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithCombinerNull() {
  just1.zipWith(just1, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithIterableCombinerNull() {
  just1.zipWith(Arrays.asList(1), null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithIterableCombinerReturnsNull() {
  just1.zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithCombinerReturnsNull() {
  just1.zipWith(just1, new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> createPublisher(long elements) {
    return
        Flowable.range(0, (int)elements)
        .zipWith(Flowable.range((int)elements, (int)elements), new BiFunction<Integer, Integer, Integer>() {
          @Override
          public Integer apply(Integer a, Integer b) throws Exception {
            return a + b;
          }
        })
    ;
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> createPublisher(long elements) {
    return
        Flowable.range(0, (int)elements)
        .zipWith(iterate(elements), new BiFunction<Integer, Long, Integer>() {
          @Override
          public Integer apply(Integer a, Long b) throws Exception {
            return a + b.intValue();
          }
        })
    ;
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Flowable.just(1).zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) throws Exception {
      return a + b;
    }
  }));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithIterableIteratorNull() {
  just1.zipWith(new Iterable<Object>() {
    @Override
    public Iterator<Object> iterator() {
      return null;
    }
  }, new BiFunction<Integer, Object, Object>() {
    @Override
    public Object apply(Integer a, Object b) {
      return 1;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithIterableOneIsNull() {
  Flowable.just(1, 2).zipWith(Arrays.asList(1, null), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return 1;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTake2() {
  Flowable<Integer> f = Flowable.just(1, 2, 3, 4, 5);
  Iterable<String> it = Arrays.asList("a", "b", "c", "d", "e");
  SquareStr squareStr = new SquareStr();
  f.map(squareStr).zipWith(it, concat2Strings).take(2).subscribe(printer);
  assertEquals(2, squareStr.counter.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void iteratorThrows() {
  Flowable.just(1).zipWith(new CrashingIterable(100, 1, 100), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) throws Exception {
      return a + b;
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "hasNext()");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void zipWithDelayError() {
  Flowable.just(1)
  .zipWith(Flowable.just(2), new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) throws Exception {
      return a + b;
    }
  }, true)
  .test()
  .assertResult(3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void zipWithDelayErrorBufferSize() {
    Flowable.just(1)
    .zipWith(Flowable.just(2), new BiFunction<Integer, Integer, Integer>() {
      @Override
      public Integer apply(Integer a, Integer b) throws Exception {
        return a + b;
      }
    }, true, 16)
    .test()
    .assertResult(3);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testStartAsync() throws InterruptedException {
  Flowable<String> os = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1)).onBackpressureBuffer()
      .zipWith(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1)).onBackpressureBuffer(), new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer a, Integer b) {
          return a + "-" + b;
        }
      }).take(5);
  TestSubscriber<String> ts = new TestSubscriber<String>();
  os.subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  assertEquals(5, ts.valueCount());
  assertEquals("1-1", ts.values().get(0));
  assertEquals("2-2", ts.values().get(1));
  assertEquals("5-5", ts.values().get(4));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUnboundedDownstreamOverrequesting() {
  Flowable<Integer> source = Flowable.range(1, 2).zipWith(Flowable.range(1, 2), new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) {
      return t1 + 10 * t2;
    }
  });
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      request(5);
    }
  };
  source.subscribe(ts);
  ts.assertNoErrors();
  ts.assertTerminated();
  ts.assertValues(11, 22);
}

相关文章

Flowable类方法