Usage and code examples for io.reactivex.Observable.collect()

x33g5p2x, reposted on 2022-01-25 under Other

This article gathers code examples for the io.reactivex.Observable.collect() method in Java and shows how Observable.collect() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven. Details of the method:
Fully qualified class: io.reactivex.Observable
Class name: Observable
Method name: collect

About Observable.collect

Collects items emitted by the finite source ObservableSource into a single mutable data structure and returns a Single that emits this structure.

This is a simplified version of reduce that does not need to return the state on each pass.

Note that this operator requires the upstream to signal onComplete for the accumulator object to be emitted. A source that is infinite and never completes will never emit anything through this operator, and an infinite source may lead to a fatal OutOfMemoryError.

Scheduler: collect does not operate by default on a particular Scheduler.
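
To make the contract described above concrete, here is a minimal, dependency-free sketch in plain Java. It is a model, not RxJava's implementation: the class CollectContractSketch and its reduce and collect helpers are hypothetical names, they work on an Iterable rather than an Observable, and java.util.function types stand in for RxJava's Callable and BiConsumer. The sketch contrasts a reduce-style accumulator, which must return the state on each pass, with a collect-style accumulator, which only mutates the container.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical, dependency-free model of the collect contract (not RxJava code).
class CollectContractSketch {

    // reduce-style: the accumulator must return the next state on every pass.
    static <T, R> R reduce(Iterable<T> source, R seed, BiFunction<R, T, R> reducer) {
        R state = seed;
        for (T item : source) {
            state = reducer.apply(state, item);
        }
        return state;
    }

    // collect-style: the accumulator mutates one container in place and returns nothing.
    static <T, U> U collect(Iterable<T> source, Supplier<U> supplier, BiConsumer<U, T> collector) {
        Objects.requireNonNull(supplier, "supplier is null");
        Objects.requireNonNull(collector, "collector is null");
        U container = Objects.requireNonNull(supplier.get(), "supplier returned null");
        for (T item : source) {
            collector.accept(container, item);
        }
        // The container is handed over only once the finite source is exhausted.
        return container;
    }

    static int sumWithReduce(List<Integer> items) {
        return reduce(items, 0, (acc, v) -> acc + v);
    }

    static List<Integer> toListWithCollect(List<Integer> items) {
        Supplier<List<Integer>> supplier = ArrayList::new;
        BiConsumer<List<Integer>, Integer> collector = (list, v) -> list.add(v);
        return collect(items, supplier, collector);
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3);
        System.out.println(sumWithReduce(items));      // prints 6
        System.out.println(toListWithCollect(items));  // prints [1, 2, 3]
    }
}
```

Because the sketch iterates an Iterable, it can only return once the source ends, which mirrors the requirement that the upstream must signal onComplete before the container is emitted.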

Code examples

Code example source: ReactiveX/RxJava

    // Excerpt: method of an anonymous class passed to an enclosing call that is not shown.
    @Override
    public SingleSource<List<Integer>> apply(Observable<Integer> o) throws Exception {
        return o.collect(new Callable<List<Integer>>() {
            @Override
            public List<Integer> call() throws Exception {
                return new ArrayList<Integer>();
            }
        }, new BiConsumer<List<Integer>, Integer>() {
            @Override
            public void accept(List<Integer> a, Integer b) throws Exception {
                a.add(b);
            }
        });
    }
    }); // closes the enclosing anonymous class and call (opening not shown)

Code example source: ReactiveX/RxJava

    // A null collector is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void collectInitialCollectorNull() {
        just1.collect(new Callable<Object>() {
            @Override
            public Object call() {
                return 1;
            }
        }, null);
    }

Code example source: ReactiveX/RxJava

    // A null initial-value supplier is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void collectInitialSupplierNull() {
        just1.collect((Callable<Integer>)null, new BiConsumer<Integer, Integer>() {
            @Override
            public void accept(Integer a, Integer b) { }
        });
    }

Code example source: ReactiveX/RxJava

    // Excerpt: method of an anonymous class passed to an enclosing call that is not shown.
    @Override
    public ObservableSource<List<Integer>> apply(Observable<Integer> o) throws Exception {
        return o.collect(new Callable<List<Integer>>() {
            @Override
            public List<Integer> call() throws Exception {
                return new ArrayList<Integer>();
            }
        }, new BiConsumer<List<Integer>, Integer>() {
            @Override
            public void accept(List<Integer> a, Integer b) throws Exception {
                a.add(b);
            }
        }).toObservable(); // convert the resulting Single back to an Observable
    }
    }); // closes the enclosing anonymous class and call (opening not shown)

Code example source: ReactiveX/RxJava

    // Excerpt: method of an anonymous class passed to an enclosing call that is not shown.
    @Override
    public Object apply(Observable<Integer> o) throws Exception {
        return o.collect(new Callable<List<Integer>>() {
            @Override
            public List<Integer> call() throws Exception {
                return new ArrayList<Integer>();
            }
        }, new BiConsumer<List<Integer>, Integer>() {
            @Override
            public void accept(List<Integer> a, Integer b) throws Exception {
                a.add(b);
            }
        }).toObservable();
    }
    }, false, 1, 2, Arrays.asList(1)); // remaining arguments of the enclosing call (opening not shown)

Code example source: ReactiveX/RxJava

    // A supplier that returns null fails with a NullPointerException once the Single is subscribed.
    @Test(expected = NullPointerException.class)
    public void collectInitialSupplierReturnsNull() {
        just1.collect(new Callable<Object>() {
            @Override
            public Object call() {
                return null;
            }
        }, new BiConsumer<Object, Integer>() {
            @Override
            public void accept(Object a, Integer b) { }
        }).blockingGet();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testCollectToString() {
        String value = Observable.just(1, 2, 3).collect(new Callable<StringBuilder>() {
            @Override
            public StringBuilder call() {
                return new StringBuilder();
            }
        },
        new BiConsumer<StringBuilder, Integer>() {
            @Override
            public void accept(StringBuilder sb, Integer v) {
                // Insert a separator before every item except the first.
                if (sb.length() > 0) {
                    sb.append("-");
                }
                sb.append(v);
            }
        }).blockingGet().toString();
        assertEquals("1-2-3", value);
    }
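
For readers who want to try the same supplier-plus-accumulator pattern without RxJava on the classpath, the JDK's three-argument Stream.collect is a close analog. The sketch below is only an analogy, not RxJava code: StreamCollectAnalog and joinWithDash are hypothetical names, and the third argument (the combiner) is a Stream-specific extra that RxJava's collect does not have.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// JDK-only analog of the test above; java.util.stream stands in for RxJava here.
class StreamCollectAnalog {

    static String joinWithDash(List<Integer> items) {
        // Creates the fresh mutable container.
        Supplier<StringBuilder> supplier = StringBuilder::new;
        // Mutates the container in place; nothing is returned.
        BiConsumer<StringBuilder, Integer> accumulator = (sb, v) -> {
            if (sb.length() > 0) {
                sb.append("-");
            }
            sb.append(v);
        };
        // Merges partial results; only relevant for parallel streams.
        BiConsumer<StringBuilder, StringBuilder> combiner = (left, right) -> {
            if (left.length() > 0 && right.length() > 0) {
                left.append("-");
            }
            left.append(right);
        };
        return items.stream().collect(supplier, accumulator, combiner).toString();
    }

    public static void main(String[] args) {
        System.out.println(joinWithDash(Arrays.asList(1, 2, 3))); // prints 1-2-3
    }
}
```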

Code example source: ReactiveX/RxJava

    @Test
    public void testCollectToStringObservable() {
        String value = Observable.just(1, 2, 3).collect(new Callable<StringBuilder>() {
            @Override
            public StringBuilder call() {
                return new StringBuilder();
            }
        },
        new BiConsumer<StringBuilder, Integer>() {
            @Override
            public void accept(StringBuilder sb, Integer v) {
                if (sb.length() > 0) {
                    sb.append("-");
                }
                sb.append(v);
            }
        }).toObservable().blockingLast().toString();
        assertEquals("1-2-3", value);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testCollectToList() {
        Single<List<Integer>> o = Observable.just(1, 2, 3)
        .collect(new Callable<List<Integer>>() {
            @Override
            public List<Integer> call() {
                return new ArrayList<Integer>();
            }
        }, new BiConsumer<List<Integer>, Integer>() {
            @Override
            public void accept(List<Integer> list, Integer v) {
                list.add(v);
            }
        });

        List<Integer> list = o.blockingGet();
        assertEquals(3, list.size());
        assertEquals(1, list.get(0).intValue());
        assertEquals(2, list.get(1).intValue());
        assertEquals(3, list.get(2).intValue());

        // test multiple subscribe
        List<Integer> list2 = o.blockingGet();
        assertEquals(3, list2.size());
        assertEquals(1, list2.get(0).intValue());
        assertEquals(2, list2.get(1).intValue());
        assertEquals(3, list2.get(2).intValue());
    }

Code example source: ReactiveX/RxJava

    /**
     * Returns a Single that emits a single HashMap containing all items emitted by the
     * finite source ObservableSource, mapped by the keys returned by a specified
     * {@code keySelector} function.
     * <p>
     * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toMap.2.png" alt="">
     * <p>
     * If more than one source item maps to the same key, the HashMap will contain the latest of those items.
     * <p>
     * Note that this operator requires the upstream to signal {@code onComplete} for the accumulated map to
     * be emitted. Sources that are infinite and never complete will never emit anything through this
     * operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
     * <dl>
     * <dt><b>Scheduler:</b></dt>
     * <dd>{@code toMap} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <K> the key type of the Map
     * @param keySelector
     *            the function that extracts the key from a source item to be used in the HashMap
     * @return a Single that emits a single item: a HashMap containing the mapped items from the source
     *         ObservableSource
     * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K> keySelector) {
        ObjectHelper.requireNonNull(keySelector, "keySelector is null");
        return collect(HashMapSupplier.<K, T>asCallable(), Functions.toMapKeySelector(keySelector));
    }
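
The excerpt above builds toMap on top of collect: a HashMap plays the role of the container and the key selector drives the accumulator. The following plain-Java sketch models that idea under stated assumptions. ToMapViaCollectSketch, toMap, and byFirstLetter are hypothetical names, and the loop over an Iterable stands in for the Observable machinery. It also illustrates the documented rule that the latest item wins when keys collide.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical plain-Java model of toMap as "collect into a HashMap" (not RxJava code).
class ToMapViaCollectSketch {

    static <T, K> Map<K, T> toMap(Iterable<T> source, Function<? super T, ? extends K> keySelector) {
        Objects.requireNonNull(keySelector, "keySelector is null");
        // Role of the supplier: a fresh container per invocation.
        Map<K, T> map = new HashMap<>();
        for (T item : source) {
            // Role of the accumulator: a later item overwrites an earlier one with the same key.
            map.put(keySelector.apply(item), item);
        }
        // Handed over only after the finite source has ended.
        return map;
    }

    static Map<Character, String> byFirstLetter(List<String> words) {
        Function<String, Character> firstLetter = word -> word.charAt(0);
        return toMap(words, firstLetter);
    }

    public static void main(String[] args) {
        Map<Character, String> index = byFirstLetter(Arrays.asList("apple", "avocado", "banana"));
        System.out.println(index.get('a')); // prints avocado
        System.out.println(index.get('b')); // prints banana
    }
}
```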

Code example source: ReactiveX/RxJava

    @Test
    public void testCollectToListObservable() {
        Observable<List<Integer>> o = Observable.just(1, 2, 3)
        .collect(new Callable<List<Integer>>() {
            @Override
            public List<Integer> call() {
                return new ArrayList<Integer>();
            }
        }, new BiConsumer<List<Integer>, Integer>() {
            @Override
            public void accept(List<Integer> list, Integer v) {
                list.add(v);
            }
        }).toObservable();

        List<Integer> list = o.blockingLast();
        assertEquals(3, list.size());
        assertEquals(1, list.get(0).intValue());
        assertEquals(2, list.get(1).intValue());
        assertEquals(3, list.get(2).intValue());

        // test multiple subscribe
        List<Integer> list2 = o.blockingLast();
        assertEquals(3, list2.size());
        assertEquals(1, list2.get(0).intValue());
        assertEquals(2, list2.get(1).intValue());
        assertEquals(3, list2.get(2).intValue());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void dispose() {
        // Single variant
        TestHelper.checkDisposed(Observable.range(1, 3).collect(new Callable<List<Integer>>() {
            @Override
            public List<Integer> call() throws Exception {
                return new ArrayList<Integer>();
            }
        }, new BiConsumer<List<Integer>, Integer>() {
            @Override
            public void accept(List<Integer> a, Integer b) throws Exception {
                a.add(b);
            }
        }));

        // Observable variant
        TestHelper.checkDisposed(Observable.range(1, 3).collect(new Callable<List<Integer>>() {
            @Override
            public List<Integer> call() throws Exception {
                return new ArrayList<Integer>();
            }
        }, new BiConsumer<List<Integer>, Integer>() {
            @Override
            public void accept(List<Integer> a, Integer b) throws Exception {
                a.add(b);
            }
        }).toObservable());
    }

Code example source: ReactiveX/RxJava

    // Excerpt: tail of a method body; the enclosing method signature is not shown.
    ObjectHelper.requireNonNull(mapSupplier, "mapSupplier is null");
    ObjectHelper.requireNonNull(collectionFactory, "collectionFactory is null");
    return collect(mapSupplier, Functions.toMultimapKeyValueSelector(keySelector, valueSelector, collectionFactory));

Code example source: ReactiveX/RxJava

    @Test
    public void testCollectorFailureDoesNotResultInTwoErrorEmissions() {
        try {
            final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
            RxJavaPlugins.setErrorHandler(addToList(list));
            final RuntimeException e1 = new RuntimeException();
            final RuntimeException e2 = new RuntimeException();

            Burst.items(1).error(e2) //
                .collect(callableListCreator(), biConsumerThrows(e1)) //
                .test() //
                .assertError(e1) //
                .assertNotComplete();

            // e2 is routed to the global error handler instead of being emitted downstream
            assertEquals(1, list.size());
            assertEquals(e2, list.get(0).getCause());
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testCollectorFailureDoesNotResultInTwoErrorEmissionsObservable() {
        try {
            final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
            RxJavaPlugins.setErrorHandler(addToList(list));
            final RuntimeException e1 = new RuntimeException();
            final RuntimeException e2 = new RuntimeException();

            Burst.items(1).error(e2) //
                .collect(callableListCreator(), biConsumerThrows(e1)) //
                .toObservable()
                .test() //
                .assertError(e1) //
                .assertNotComplete();

            assertEquals(1, list.size());
            assertEquals(e2, list.get(0).getCause());
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissions() {
        final RuntimeException e = new RuntimeException();
        Burst.item(1).create() //
            .collect(callableListCreator(), biConsumerThrows(e)) //
            .test() //
            .assertError(e) //
            .assertNotComplete();
    }

Code example source: ReactiveX/RxJava

    /**
     * This uses the public API collect which uses scan under the covers.
     */
    @Test
    public void testSeedFactory() {
        Observable<List<Integer>> o = Observable.range(1, 10)
            .collect(new Callable<List<Integer>>() {
                @Override
                public List<Integer> call() {
                    return new ArrayList<Integer>();
                }
            }, new BiConsumer<List<Integer>, Integer>() {
                @Override
                public void accept(List<Integer> list, Integer t2) {
                    list.add(t2);
                }
            }).toObservable().takeLast(1);

        // Subscribing twice must yield the same result both times.
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.blockingSingle());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.blockingSingle());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissionsObservable() {
        final RuntimeException e = new RuntimeException();
        final AtomicBoolean added = new AtomicBoolean();
        BiConsumer<Object, Integer> throwOnFirstOnly = new BiConsumer<Object, Integer>() {
            boolean once = true;

            @Override
            public void accept(Object o, Integer t) {
                if (once) {
                    once = false;
                    throw e;
                } else {
                    added.set(true);
                }
            }
        };
        Burst.items(1, 2).create() //
            .collect(callableListCreator(), throwOnFirstOnly)//
            .test() //
            .assertError(e) //
            .assertNoValues() //
            .assertNotComplete();
        // The collector must not be invoked again after it has thrown.
        assertFalse(added.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissions() {
        final RuntimeException e = new RuntimeException();
        final AtomicBoolean added = new AtomicBoolean();
        BiConsumer<Object, Integer> throwOnFirstOnly = new BiConsumer<Object, Integer>() {
            boolean once = true;

            @Override
            public void accept(Object o, Integer t) {
                if (once) {
                    once = false;
                    throw e;
                } else {
                    added.set(true);
                }
            }
        };
        Burst.items(1, 2).create() //
            .collect(callableListCreator(), throwOnFirstOnly)//
            .test() //
            .assertError(e) //
            .assertNoValues() //
            .assertNotComplete();
        assertFalse(added.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissionsObservable() {
        final RuntimeException e = new RuntimeException();
        Burst.item(1).create() //
            .collect(callableListCreator(), biConsumerThrows(e)) //
            .toObservable()
            .test() //
            .assertError(e) //
            .assertNotComplete();
    }
