本文整理了Java中io.reactivex.Flowable.zipWith()
方法的一些代码示例,展示了Flowable.zipWith()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.zipWith()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:zipWith
[英]Returns a Flowable that emits items that are the result of applying a specified function to pairs of values, one each from the source Publisher and a specified Iterable sequence.
Note that the other Iterable is evaluated as items are observed from the source Publisher; it is not pre-consumed. This allows you to zip infinite streams on either side. Backpressure: The operator expects backpressure from the sources and honors backpressure from the downstream. (I.e., zipping with #interval(long,TimeUnit) may result in MissingBackpressureException, use one of the onBackpressureX to handle similar, backpressure-ignoring sources. Scheduler: zipWith does not operate by default on a particular Scheduler.
[中]返回一个可流动项,该可流动项是将指定函数应用于值对的结果,每个值对来自源发布服务器和指定的Iterable序列。
请注意,另一个Iterable是在从源发布服务器观察项目时进行评估的;它不是预先消费的。这允许您在任意一侧压缩无限流。背压:操作员期望来自源头的背压,并尊重来自下游的背压。(即,使用#间隔(长,时间单位)压缩可能会导致背压缺失异常,使用OnBackPressureEx之一处理类似的背压源。Scheduler:zipWith默认情况下不会在特定的计划程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Object> apply(Flowable<Integer> f) throws Exception {
return f.zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) throws Exception {
return a + b;
}
});
}
});
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithIterableNull() {
just1.zipWith((Iterable<Integer>)null, new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) {
return 1;
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithPublisherNull() {
just1.zipWith((Publisher<Integer>)null, new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) {
return 1;
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithFlowableNull() {
just1.zipWith((Flowable<Integer>)null, new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) {
return 1;
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Object> apply(Flowable<? extends Throwable> attempt) {
return attempt.zipWith(Flowable.just(1), new BiFunction<Throwable, Integer, Object>() {
@Override
public Object apply(Throwable o, Integer integer) {
return 0;
}
});
}
})
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithCombinerNull() {
just1.zipWith(just1, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithIterableCombinerNull() {
just1.zipWith(Arrays.asList(1), null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithIterableCombinerReturnsNull() {
just1.zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) {
return null;
}
}).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithCombinerReturnsNull() {
just1.zipWith(just1, new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) {
return null;
}
}).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Integer> createPublisher(long elements) {
return
Flowable.range(0, (int)elements)
.zipWith(Flowable.range((int)elements, (int)elements), new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer a, Integer b) throws Exception {
return a + b;
}
})
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Integer> createPublisher(long elements) {
return
Flowable.range(0, (int)elements)
.zipWith(iterate(elements), new BiFunction<Integer, Long, Integer>() {
@Override
public Integer apply(Integer a, Long b) throws Exception {
return a + b.intValue();
}
})
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void dispose() {
TestHelper.checkDisposed(Flowable.just(1).zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) throws Exception {
return a + b;
}
}));
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithIterableIteratorNull() {
just1.zipWith(new Iterable<Object>() {
@Override
public Iterator<Object> iterator() {
return null;
}
}, new BiFunction<Integer, Object, Object>() {
@Override
public Object apply(Integer a, Object b) {
return 1;
}
}).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithIterableOneIsNull() {
Flowable.just(1, 2).zipWith(Arrays.asList(1, null), new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) {
return 1;
}
}).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testTake2() {
Flowable<Integer> f = Flowable.just(1, 2, 3, 4, 5);
Iterable<String> it = Arrays.asList("a", "b", "c", "d", "e");
SquareStr squareStr = new SquareStr();
f.map(squareStr).zipWith(it, concat2Strings).take(2).subscribe(printer);
assertEquals(2, squareStr.counter.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void iteratorThrows() {
Flowable.just(1).zipWith(new CrashingIterable(100, 1, 100), new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) throws Exception {
return a + b;
}
})
.test()
.assertFailureAndMessage(TestException.class, "hasNext()");
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void zipWithDelayError() {
Flowable.just(1)
.zipWith(Flowable.just(2), new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer a, Integer b) throws Exception {
return a + b;
}
}, true)
.test()
.assertResult(3);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void zipWithDelayErrorBufferSize() {
Flowable.just(1)
.zipWith(Flowable.just(2), new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer a, Integer b) throws Exception {
return a + b;
}
}, true, 16)
.test()
.assertResult(3);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testStartAsync() throws InterruptedException {
Flowable<String> os = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1)).onBackpressureBuffer()
.zipWith(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1)).onBackpressureBuffer(), new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer a, Integer b) {
return a + "-" + b;
}
}).take(5);
TestSubscriber<String> ts = new TestSubscriber<String>();
os.subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(5, ts.valueCount());
assertEquals("1-1", ts.values().get(0));
assertEquals("2-2", ts.values().get(1));
assertEquals("5-5", ts.values().get(4));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUnboundedDownstreamOverrequesting() {
Flowable<Integer> source = Flowable.range(1, 2).zipWith(Flowable.range(1, 2), new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + 10 * t2;
}
});
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
@Override
public void onNext(Integer t) {
super.onNext(t);
request(5);
}
};
source.subscribe(ts);
ts.assertNoErrors();
ts.assertTerminated();
ts.assertValues(11, 22);
}
内容来源于网络,如有侵权,请联系作者删除!