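This article collects code examples for the Java method io.reactivex.Observable.combineLatestDelayError() and shows how Observable.combineLatestDelayError() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms; they were extracted from selected projects and should serve as a useful reference. Details of Observable.combineLatestDelayError() are as follows: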
Package path: io.reactivex.Observable
Class name: Observable
Method name: combineLatestDelayError
Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of the source ObservableSources each time an item is received from any of the source ObservableSources, where this aggregation is defined by a specified function and delays any error from the sources until all source ObservableSources terminate.
Note on method signature: since Java doesn't allow creating a generic array with new T[], the implementation of this operator has to create an Object[] instead. Unfortunately, a Function<Integer[], R> passed to the method would trigger a ClassCastException.
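The signature note above can be reproduced with plain JDK types. The following is a minimal sketch, not RxJava code: callCombiner is a hypothetical stand-in for the operator, and SAFE_SUM and TYPED_SUM are illustrative names. It assumes only that the latest values are handed to the combiner as an Object[].

```java
import java.util.function.Function;

// Plain-JDK sketch (no RxJava): shows why a combiner typed on Integer[] fails
// when the caller can only hand it an Object[].
final class CombinerArrayTypeDemo {

    // Hypothetical stand-in for the operator: the latest values are collected
    // into an Object[] because a generic T[] cannot be instantiated.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Object callCombiner(Function combiner, Object first, Object second) {
        Object[] latest = new Object[] { first, second }; // runtime type is Object[]
        return combiner.apply(latest);
    }

    // Combiner declared on Object[]: works, each element is cast individually.
    static final Function<Object[], Integer> SAFE_SUM =
            values -> (Integer) values[0] + (Integer) values[1];

    // Combiner declared on Integer[]: compiles, but the Object[] argument
    // cannot be cast to Integer[] at run time.
    static final Function<Integer[], Integer> TYPED_SUM =
            values -> values[0] + values[1];

    // Returns true if calling the Integer[] combiner raises ClassCastException.
    static boolean typedCombinerThrows() {
        try {
            callCombiner(TYPED_SUM, 1, 2);
            return false;
        } catch (ClassCastException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(callCombiner(SAFE_SUM, 1, 2)); // prints 3
        System.out.println(typedCombinerThrows());        // prints true
    }
}
```

This is why the test snippets on this page all declare the combiner as Function<Object[], Object> and cast individual elements inside apply.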
If any of the sources never produces an item but only terminates (normally or with an error), the resulting sequence terminates immediately (normally or with all the errors accumulated till that point). If that input source is also synchronous, other sources after it will not be subscribed to.
If there are no ObservableSources provided, the resulting sequence completes immediately without emitting any items and without any calls to the combiner function.
Scheduler: combineLatestDelayError does not operate by default on a particular Scheduler.
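The termination rules described above can be traced with a small synchronous model. This is a simplified sketch built on plain JDK types, not RxJava's implementation: Signal, Result and run are hypothetical names, signals are replayed from a list in arrival order, and every source is assumed to terminate eventually.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Simplified model of the delay-error rule; all names here are hypothetical.
final class CombineLatestDelayErrorModel {

    // One signal from one source: an item, a normal completion, or an error.
    static final class Signal {
        final int source;
        final Object value;    // non-null for an item
        final Throwable error; // non-null for an error; both null means completion

        private Signal(int source, Object value, Throwable error) {
            this.source = source;
            this.value = value;
            this.error = error;
        }

        static Signal item(int source, Object value) { return new Signal(source, value, null); }
        static Signal complete(int source) { return new Signal(source, null, null); }
        static Signal error(int source, Throwable error) { return new Signal(source, null, error); }
    }

    // Items emitted downstream, plus the errors reported once the sequence ends.
    static final class Result {
        final List<Object> items = new ArrayList<>();
        final List<Throwable> errors = new ArrayList<>();
    }

    static Result run(int sourceCount, List<Signal> signals, Function<Object[], Object> combiner) {
        Result result = new Result();
        Object[] latest = new Object[sourceCount];
        int sourcesWithValue = 0;
        int terminatedSources = 0;
        for (Signal signal : signals) {
            if (signal.value != null) {
                if (latest[signal.source] == null) {
                    sourcesWithValue++;
                }
                latest[signal.source] = signal.value;
                // Combine only once every source has produced at least one item.
                if (sourcesWithValue == sourceCount) {
                    result.items.add(combiner.apply(latest.clone()));
                }
                continue;
            }
            if (signal.error != null) {
                result.errors.add(signal.error); // remembered, not delivered yet
            }
            boolean neverEmitted = latest[signal.source] == null;
            terminatedSources++;
            // End at once if this source never emitted, or when all sources are done.
            if (neverEmitted || terminatedSources == sourceCount) {
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Function<Object[], Object> sum = a -> (Integer) a[0] + (Integer) a[1];
        Result delayed = run(2, Arrays.asList(
                Signal.item(0, 21),
                Signal.error(0, new IllegalStateException("boom")),
                Signal.item(1, 21),
                Signal.complete(1)), sum);
        // prints "[42] then 1 error(s)"
        System.out.println(delayed.items + " then " + delayed.errors.size() + " error(s)");
    }
}
```

In the model, the error from source 0 is held back until source 1 has emitted and completed, so the combined item 42 is still produced. If a source terminates before emitting anything, the loop stops immediately and no item is produced.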
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsNull() {
    Observable.combineLatestDelayError(new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] v) {
            return 1;
        }
    }, 128, (Observable<Object>[])null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableNull() {
    Observable.combineLatestDelayError((Iterable<Observable<Object>>)null, new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] v) {
            return 1;
        }
    }, 128);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableFunctionNull() {
    // just1 is a field of the enclosing test class and is not shown in this excerpt
    Observable.combineLatestDelayError(Arrays.asList(just1), null, 128);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableFunctionReturnsNull() {
    // just1 is a field of the enclosing test class and is not shown in this excerpt
    Observable.combineLatestDelayError(Arrays.asList(just1), new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] v) {
            return null;
        }
    }, 128).blockingLast();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsFunctionNull() {
    Observable.combineLatestDelayError(null, 128, Observable.never());
}
Code example source: ReactiveX/RxJava
@Test
@SuppressWarnings("unchecked")
public void combineLatestDelayErrorIterableOfSources() {
    Observable.combineLatestDelayError(Arrays.asList(
            Observable.just(1), Observable.just(2)
    ), new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] a) throws Exception {
            return Arrays.toString(a);
        }
    })
    .test()
    .assertResult("[1, 2]");
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsFunctionReturnsNull() {
    // just1 is a field of the enclosing test class and is not shown in this excerpt
    Observable.combineLatestDelayError(new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] v) {
            return null;
        }
    }, 128, just1).blockingLast();
}
Code example source: ReactiveX/RxJava
public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends ObservableSource<? extends T>> sources,
        Function<? super Object[], ? extends R> combiner) {
    return combineLatestDelayError(sources, combiner, bufferSize());
}
Code example source: ReactiveX/RxJava
public static <T, R> Observable<R> combineLatestDelayError(ObservableSource<? extends T>[] sources,
        Function<? super Object[], ? extends R> combiner) {
    return combineLatestDelayError(sources, combiner, bufferSize());
}
Code example source: ReactiveX/RxJava
@Test
@SuppressWarnings("unchecked")
public void combineLatestDelayErrorArrayOfSources() {
    Observable.combineLatestDelayError(new ObservableSource[] {
            Observable.just(1), Observable.just(2)
    }, new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] a) throws Exception {
            return Arrays.toString(a);
        }
    })
    .test()
    .assertResult("[1, 2]");
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableIteratorNull() {
    Observable.combineLatestDelayError(new Iterable<Observable<Object>>() {
        @Override
        public Iterator<Observable<Object>> iterator() {
            return null;
        }
    }, new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] v) {
            return 1;
        }
    }, 128).blockingLast();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableOneIsNull() {
    Observable.combineLatestDelayError(Arrays.asList(Observable.never(), null), new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] v) {
            return 1;
        }
    }, 128).blockingLast();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsOneIsNull() {
    Observable.combineLatestDelayError(new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] v) {
            return 1;
        }
    }, 128, Observable.never(), null).blockingLast();
}
Code example source: ReactiveX/RxJava
@Test
@SuppressWarnings("unchecked")
public void combineLatestDelayErrorIterableOfSourcesWithError() {
    Observable.combineLatestDelayError(Arrays.asList(
            Observable.just(1), Observable.just(2).concatWith(Observable.<Integer>error(new TestException()))
    ), new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] a) throws Exception {
            return Arrays.toString(a);
        }
    })
    .test()
    .assertFailure(TestException.class, "[1, 2]");
}
Code example source: ReactiveX/RxJava
@Test
@SuppressWarnings("unchecked")
public void combineLatestDelayErrorArrayOfSourcesWithError() {
    Observable.combineLatestDelayError(new ObservableSource[] {
            Observable.just(1), Observable.just(2).concatWith(Observable.<Integer>error(new TestException()))
    }, new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] a) throws Exception {
            return Arrays.toString(a);
        }
    })
    .test()
    .assertFailure(TestException.class, "[1, 2]");
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void combineLatestDelayErrorEmpty() {
    assertSame(Observable.empty(), Observable.combineLatestDelayError(new ObservableSource[0], Functions.<Object[]>identity(), 16));
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void errorDelayed() {
    Observable.combineLatestDelayError(
            new Function<Object[], Object>() {
                @Override
                public Object apply(Object[] a) throws Exception {
                    return a;
                }
            },
            128,
            Observable.error(new TestException()),
            Observable.just(1)
    )
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
@SuppressWarnings("unchecked")
public void syncFirstErrorsAfterItemDelayError() {
    Observable.combineLatestDelayError(Arrays.asList(
            Observable.just(21).concatWith(Observable.<Integer>error(new TestException())),
            Observable.just(21).delay(100, TimeUnit.MILLISECONDS)
    ),
    new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] a) throws Exception {
            return (Integer)a[0] + (Integer)a[1];
        }
    }
    )
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertFailure(TestException.class, 42);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void dontSubscribeIfDone2() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        final int[] count = { 0 };
        Observable.combineLatestDelayError(
                Arrays.asList(Observable.empty(),
                    Observable.error(new TestException())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(Disposable d) throws Exception {
                            count[0]++;
                        }
                    })
                ),
                new Function<Object[], Object>() {
                    @Override
                    public Object apply(Object[] a) throws Exception {
                        return 0;
                    }
                })
        .test()
        .assertResult();
        assertEquals(0, count[0]);
        assertTrue(errors.toString(), errors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void errorDelayed2() {
    Observable.combineLatestDelayError(
            new Function<Object[], Object>() {
                @Override
                public Object apply(Object[] a) throws Exception {
                    return a;
                }
            },
            128,
            Observable.error(new TestException()).startWith(1),
            Observable.empty()
    )
    .test()
    .assertFailure(TestException.class);
}