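This article collects code examples for the Java method io.reactivex.Flowable.combineLatest() and shows how Flowable.combineLatest() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.combineLatest() method are as follows:
Fully qualified class name: io.reactivex.Flowable
Class name: Flowable
Method name: combineLatest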
Description: Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of the source Publishers each time an item is received from any of the source Publishers, where this aggregation is defined by a specified function.
Note on method signature: since Java doesn't allow creating a generic array with new T[], the implementation of this operator has to create an Object[] instead. Unfortunately, a Function<Integer[], R> passed to the method would trigger a ClassCastException.
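The note on the method signature can be reproduced without RxJava. The following is a minimal sketch using only java.util.function.Function; the class CombinerArrayCast and its members are hypothetical names chosen for illustration and are not part of the library. It only mimics the one relevant fact, namely that the caller hands the combiner an array whose runtime type is Object[].

```java
import java.util.function.Function;

final class CombinerArrayCast {
    // Combiner declared against Integer[]: compiles, but cannot accept an Object[] at runtime.
    static final Function<Integer[], Integer> TYPED = a -> a[0] + a[1];

    // Combiner declared against Object[]: casts each element itself.
    static final Function<Object[], Integer> UNTYPED = a -> (Integer) a[0] + (Integer) a[1];

    // Hypothetical stand-in for the operator: the varargs parameter produces an array
    // whose runtime type is Object[], since generic arrays (new T[]) cannot be created.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Object callWithObjectArray(Function combiner, Object... latest) {
        return combiner.apply(latest);
    }

    // Returns true if invoking the combiner with an Object[] fails with ClassCastException.
    static boolean throwsClassCast(Function<?, ?> combiner, Object... latest) {
        try {
            callWithObjectArray(combiner, latest);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsClassCast(TYPED, 1, 2));       // prints true
        System.out.println(callWithObjectArray(UNTYPED, 1, 2)); // prints 3
    }
}
```

This is why the examples further down declare their combiners as Function<Object[], ...> and cast the individual elements inside apply.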
If any of the sources never produces an item but only terminates (normally or with an error), the resulting sequence terminates immediately (normally or with all the errors accumulated until that point). If that input source is also synchronous, other sources after it will not be subscribed to.
If there are no source Publishers provided, the resulting sequence completes immediately without emitting any items and without any calls to the combiner function.
Backpressure: The returned Publisher honors backpressure from downstream. The source Publishers are requested in a bounded manner; however, their backpressure is not enforced (the operator won't signal MissingBackpressureException), which may lead to OutOfMemoryError due to internal buffer bloat.
Scheduler: combineLatest does not operate by default on a particular Scheduler.
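The emission rules described above can be modelled in a few lines of plain Java. This is a simplified sketch, not RxJava's implementation: the class CombineLatestModel, its Signal type, and the "|" completion marker are hypothetical names introduced for illustration, and errors, backpressure, and threading are deliberately left out. It replays a fixed list of signals and records what a subscriber would see.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class CombineLatestModel {
    /** One upstream signal: a value from source `index`, or its completion when value is null. */
    static final class Signal {
        final int index;
        final Object value;
        Signal(int index, Object value) { this.index = index; this.value = value; }
        static Signal next(int index, Object value) { return new Signal(index, value); }
        static Signal complete(int index) { return new Signal(index, null); }
    }

    /** Replays the signals in order; the returned list ends with "|" if the sequence completed. */
    static List<Object> replay(int sourceCount, List<Signal> signals,
                               Function<Object[], Object> combiner) {
        List<Object> out = new ArrayList<>();
        if (sourceCount == 0) {          // no sources: complete at once, combiner never called
            out.add("|");
            return out;
        }
        Object[] latest = new Object[sourceCount];
        boolean[] hasValue = new boolean[sourceCount];
        boolean[] done = new boolean[sourceCount];
        int withValue = 0;
        int completed = 0;
        for (Signal s : signals) {
            if (done[s.index]) {
                continue;                // ignore anything after a source has completed
            }
            if (s.value == null) {
                done[s.index] = true;
                completed++;
                // A source that ends without ever emitting ends the whole sequence;
                // otherwise the sequence ends once every source has completed.
                if (!hasValue[s.index] || completed == sourceCount) {
                    out.add("|");
                    return out;
                }
            } else {
                if (!hasValue[s.index]) {
                    hasValue[s.index] = true;
                    withValue++;
                }
                latest[s.index] = s.value;
                if (withValue == sourceCount) {   // emit only once every source has a value
                    out.add(combiner.apply(latest.clone()));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Function<Object[], Object> sum = a -> (Integer) a[0] + (Integer) a[1];
        List<Signal> signals = Arrays.asList(
            Signal.next(0, 1), Signal.next(1, 10), Signal.next(0, 2),
            Signal.next(1, 20), Signal.complete(0), Signal.complete(1));
        System.out.println(replay(2, signals, sum)); // prints [11, 12, 22, |]
    }
}
```

Feeding the model two values from the second source followed by the completion of the first yields only the completion marker, which mirrors the "first source never produces" test shown later in this article.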
Code example source: ReactiveX/RxJava
/**
 * This won't compile if super/extends isn't done correctly on generics.
 */
@Test
public void testCovarianceOfCombineLatest() {
    // combine, action and extendedAction are defined elsewhere in the enclosing test class (not shown).
    Flowable<HorrorMovie> horrors = Flowable.just(new HorrorMovie());
    Flowable<CoolRating> ratings = Flowable.just(new CoolRating());
    Flowable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
    Flowable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
    Flowable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(extendedAction);
    Flowable.<Media, Rating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
    Flowable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(action);
    Flowable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine);
}
Code example source: ReactiveX/RxJava
@Test
public void testZeroSources() {
    Flowable<Object> result = Flowable.combineLatest(
        Collections.<Flowable<Object>> emptyList(), new Function<Object[], Object>() {
            @Override
            public Object apply(Object[] args) {
                return args;
            }
        });
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    result.subscribe(subscriber);
    // With no sources, the sequence completes without emitting and without calling the combiner.
    verify(subscriber).onComplete();
    verify(subscriber, never()).onNext(any());
    verify(subscriber, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testCombineLatest2Types() {
    // getConcatStringIntegerCombineLatestFunction() is a helper of the enclosing test class (not shown).
    BiFunction<String, Integer, String> combineLatestFunction = getConcatStringIntegerCombineLatestFunction();
    /* define a Subscriber to receive aggregated events */
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    Flowable<String> w = Flowable.combineLatest(Flowable.just("one", "two"), Flowable.just(2, 3, 4), combineLatestFunction);
    w.subscribe(subscriber);
    verify(subscriber, never()).onError(any(Throwable.class));
    verify(subscriber, times(1)).onComplete();
    verify(subscriber, times(1)).onNext("two2");
    verify(subscriber, times(1)).onNext("two3");
    verify(subscriber, times(1)).onNext("two4");
}
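The expected values in the test above ("two2", "two3", "two4", and no "one..." item) follow from the sources being synchronous: the first source is drained completely before the second one is subscribed, so only its last value is still "latest" when the second source starts emitting. A plain-Java walk-through of that arithmetic might look like the following; the class SynchronousOrderSketch is a hypothetical name, and the string concatenation is an assumption about what the helper combiner does, inferred from the expected values.

```java
import java.util.ArrayList;
import java.util.List;

final class SynchronousOrderSketch {
    /** Emissions expected when the first source is fully drained before the second is subscribed. */
    static List<String> expected(String[] first, int[] second) {
        List<String> out = new ArrayList<>();
        if (first.length == 0) {
            return out;                                // nothing to pair with, no emissions
        }
        String latestFirst = first[first.length - 1];  // only the last value of the first source survives
        for (int value : second) {
            out.add(latestFirst + value);              // assumed combiner: string + integer concatenation
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(expected(new String[] {"one", "two"}, new int[] {2, 3, 4}));
        // prints [two2, two3, two4]
    }
}
```

With asynchronous sources the interleaving is not fixed, so this shortcut applies only to synchronous sources such as the Flowable.just(...) calls used in the test.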
Code example source: ReactiveX/RxJava
@Test
public void testCombineLatest3TypesA() {
    // getConcatStringIntegerIntArrayCombineLatestFunction() is a helper of the enclosing test class (not shown).
    Function3<String, Integer, int[], String> combineLatestFunction = getConcatStringIntegerIntArrayCombineLatestFunction();
    /* define a Subscriber to receive aggregated events */
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    Flowable<String> w = Flowable.combineLatest(Flowable.just("one", "two"), Flowable.just(2), Flowable.just(new int[] { 4, 5, 6 }), combineLatestFunction);
    w.subscribe(subscriber);
    verify(subscriber, never()).onError(any(Throwable.class));
    verify(subscriber, times(1)).onComplete();
    verify(subscriber, times(1)).onNext("two2[4, 5, 6]");
}
Code example source: ReactiveX/RxJava
@Test
public void testCombineLatest3TypesB() {
    // getConcatStringIntegerIntArrayCombineLatestFunction() is a helper of the enclosing test class (not shown).
    Function3<String, Integer, int[], String> combineLatestFunction = getConcatStringIntegerIntArrayCombineLatestFunction();
    /* define a Subscriber to receive aggregated events */
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    Flowable<String> w = Flowable.combineLatest(Flowable.just("one"), Flowable.just(2), Flowable.just(new int[] { 4, 5, 6 }, new int[] { 7, 8 }), combineLatestFunction);
    w.subscribe(subscriber);
    verify(subscriber, never()).onError(any(Throwable.class));
    verify(subscriber, times(1)).onComplete();
    verify(subscriber, times(1)).onNext("one2[4, 5, 6]");
    verify(subscriber, times(1)).onNext("one2[7, 8]");
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Override
public Publisher<Long> createPublisher(long elements) {
    // Iterable-of-sources overload; iterate(...) is a helper of the enclosing class (not shown).
    return Flowable.combineLatest(
        Arrays.asList(
            Flowable.just(1L),
            Flowable.fromIterable(iterate(elements))
        ),
        new Function<Object[], Long>() {
            @Override
            public Long apply(Object[] a) throws Exception {
                return (Long)a[0];
            }
        }
    );
}
Code example source: ReactiveX/RxJava
@Test
public void testFirstNeverProduces() {
    PublishProcessor<Integer> a = PublishProcessor.create();
    PublishProcessor<Integer> b = PublishProcessor.create();
    // `or` is a combiner defined elsewhere in the enclosing test class (not shown).
    Flowable<Integer> source = Flowable.combineLatest(a, b, or);
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    InOrder inOrder = inOrder(subscriber);
    source.subscribe(subscriber);
    b.onNext(0x10);
    b.onNext(0x20);
    // a completes without ever emitting, so the combined sequence completes with no items.
    a.onComplete();
    inOrder.verify(subscriber, times(1)).onComplete();
    verify(subscriber, never()).onNext(any());
    verify(subscriber, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void test2SourcesOverload() {
    Flowable<Integer> s1 = Flowable.just(1);
    Flowable<Integer> s2 = Flowable.just(2);
    Flowable<List<Integer>> result = Flowable.combineLatest(s1, s2,
        new BiFunction<Integer, Integer, List<Integer>>() {
            @Override
            public List<Integer> apply(Integer t1, Integer t2) {
                return Arrays.asList(t1, t2);
            }
        });
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    result.subscribe(subscriber);
    verify(subscriber).onNext(Arrays.asList(1, 2));
    verify(subscriber).onComplete();
    verify(subscriber, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testSecondNeverProduces() {
    PublishProcessor<Integer> a = PublishProcessor.create();
    PublishProcessor<Integer> b = PublishProcessor.create();
    // `or` is a combiner defined elsewhere in the enclosing test class (not shown).
    Flowable<Integer> source = Flowable.combineLatest(a, b, or);
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    InOrder inOrder = inOrder(subscriber);
    source.subscribe(subscriber);
    a.onNext(0x1);
    a.onNext(0x2);
    // b completes without ever emitting, so the combined sequence completes with no items.
    b.onComplete();
    a.onComplete();
    inOrder.verify(subscriber, times(1)).onComplete();
    verify(subscriber, never()).onNext(any());
    verify(subscriber, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void test3SourcesOverload() {
    Flowable<Integer> s1 = Flowable.just(1);
    Flowable<Integer> s2 = Flowable.just(2);
    Flowable<Integer> s3 = Flowable.just(3);
    Flowable<List<Integer>> result = Flowable.combineLatest(s1, s2, s3,
        new Function3<Integer, Integer, Integer, List<Integer>>() {
            @Override
            public List<Integer> apply(Integer t1, Integer t2, Integer t3) {
                return Arrays.asList(t1, t2, t3);
            }
        });
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    result.subscribe(subscriber);
    verify(subscriber).onNext(Arrays.asList(1, 2, 3));
    verify(subscriber).onComplete();
    verify(subscriber, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Override
public Publisher<Long> createPublisher(long elements) {
    // Varargs overload: the combiner comes first, followed by the sources.
    // iterate(...) is a helper of the enclosing class (not shown).
    return Flowable.combineLatest(
        new Function<Object[], Long>() {
            @Override
            public Long apply(Object[] a) throws Exception {
                return (Long)a[0];
            }
        },
        Flowable.just(1L),
        Flowable.fromIterable(iterate(elements))
    );
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void combineLatestEmpty() {
    // An empty array of sources yields the shared Flowable.empty() instance.
    assertSame(Flowable.empty(), Flowable.combineLatest(new Flowable[0], Functions.<Object[]>identity(), 16));
}
Code example source: ReactiveX/RxJava
@Test
public void disposed() {
    TestHelper.checkDisposed(Flowable.combineLatest(Flowable.never(), Flowable.never(), new BiFunction<Object, Object, Object>() {
        @Override
        public Object apply(Object a, Object b) throws Exception {
            return a;
        }
    }));
}
Code example source: ReactiveX/RxJava
@Test
public void test4SourcesOverload() {
    Flowable<Integer> s1 = Flowable.just(1);
    Flowable<Integer> s2 = Flowable.just(2);
    Flowable<Integer> s3 = Flowable.just(3);
    Flowable<Integer> s4 = Flowable.just(4);
    Flowable<List<Integer>> result = Flowable.combineLatest(s1, s2, s3, s4,
        new Function4<Integer, Integer, Integer, Integer, List<Integer>>() {
            @Override
            public List<Integer> apply(Integer t1, Integer t2, Integer t3, Integer t4) {
                return Arrays.asList(t1, t2, t3, t4);
            }
        });
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    result.subscribe(subscriber);
    verify(subscriber).onNext(Arrays.asList(1, 2, 3, 4));
    verify(subscriber).onComplete();
    verify(subscriber, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void combineLatestObject() {
    @SuppressWarnings("unchecked")
    final List<Flowable<Integer>> flowables = Arrays.asList(Flowable.just(1, 2, 3), Flowable.just(1, 2, 3));
    Flowable.combineLatest(flowables, new Function<Object[], Object>() {
        @Override
        public Object apply(final Object[] o) throws Exception {
            // Despite its name, `sum` accumulates the product of the latest values.
            int sum = 1;
            for (Object i : o) {
                sum *= (Integer) i;
            }
            return sum;
        }
    // The first source drains synchronously (latest value 3) before the second emits 1, 2, 3.
    }).test().assertResult(3, 6, 9);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void combineLatestArrayOfSources() {
    Flowable.combineLatest(new Flowable[] {
        Flowable.just(1), Flowable.just(2)
    }, new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] a) throws Exception {
            return Arrays.toString(a);
        }
    })
    .test()
    .assertResult("[1, 2]");
}
Code example source: ReactiveX/RxJava
@Test
public void error() {
    // The second source fails without emitting, so the combined sequence fails immediately.
    Flowable.combineLatest(Flowable.never(), Flowable.error(new TestException()), new BiFunction<Object, Object, Object>() {
        @Override
        public Object apply(Object a, Object b) throws Exception {
            return a;
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void combineLatestIterable() {
    Flowable<Integer> source = Flowable.just(1);
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.combineLatest(Arrays.asList(source, source),
        new Function<Object[], Integer>() {
            @Override
            public Integer apply(Object[] args) {
                return (Integer)args[0] + (Integer)args[1];
            }
        })
    .subscribe(ts);
    ts.assertValue(2);
    ts.assertNoErrors();
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void fusedNullCheck() {
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ASYNC);
    Flowable.combineLatest(Flowable.just(1), Flowable.just(2), new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer t1, Integer t2) throws Exception {
            // Returning null from the combiner is rejected with a NullPointerException.
            return null;
        }
    })
    .subscribe(ts);
    ts
    .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
    .assertFailureAndMessage(NullPointerException.class, "The combiner returned a null value");
}
Code example source: ReactiveX/RxJava
@Test
public void combineAsync() {
    Flowable<Integer> source = Flowable.range(1, 1000).subscribeOn(Schedulers.computation());
    Flowable.combineLatest(source, source, new BiFunction<Object, Object, Object>() {
        @Override
        public Object apply(Object a, Object b) throws Exception {
            return a;
        }
    })
    .take(500)
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertNoErrors()
    .assertComplete();
}