io.reactivex.Observable.groupBy(): usage and code examples

Reposted by x33g5p2x on 2022-01-25, category: Other

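This article collects code examples for the Java method io.reactivex.Observable.groupBy() and shows how Observable.groupBy() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Observable.groupBy() method:
Fully qualified class: io.reactivex.Observable
Class name: Observable
Method name: groupBy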

Observable.groupBy overview

Groups the items emitted by an ObservableSource according to a specified criterion and emits these grouped items as GroupedObservables. Each emitted GroupedObservable allows only a single Observer during its lifetime. If this Observer calls dispose() before the source terminates, the next emission by the source that has the same key triggers the emission of a new GroupedObservable.
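
As a minimal sketch of the grouping described above, the following groups words by their length and collects each group into a list. It assumes RxJava 2.x is on the classpath; the class name `GroupBySketch` and the method name `groupByLength` are hypothetical and chosen only for illustration.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupBySketch {

    /**
     * Groups the given words by length. Each group is subscribed to as soon
     * as it is emitted (via flatMapSingle), so no group is left buffering.
     */
    static Map<Integer, List<String>> groupByLength(String... words) {
        // The source is synchronous, so a plain TreeMap is sufficient here.
        Map<Integer, List<String>> result = new TreeMap<>();
        Observable.fromArray(words)
                .groupBy(String::length)                 // key = word length
                .flatMapSingle(group -> group.toList()   // collect one group
                        .doOnSuccess(list -> result.put(group.getKey(), list)))
                .blockingSubscribe();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(groupByLength("one", "two", "three", "four", "five", "six"));
    }
}
```

Every group is consumed inside `flatMapSingle`, which respects the single-Observer rule because each GroupedObservable is subscribed to exactly once.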

Note: A GroupedObservable caches the items it is to emit until it is subscribed to. To avoid memory leaks, you should therefore not simply ignore the GroupedObservables that do not concern you. Instead, signal that they may discard their buffers by applying an operator such as ignoreElements() to them.

Scheduler: groupBy does not operate by default on a particular Scheduler.
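
The advice in the note can be sketched as follows: groups that are not needed are still subscribed to, but with ignoreElements() applied so that their items are dropped rather than buffered. This is only an illustration under the assumption of RxJava 2.x; the names `DrainUnwantedGroupsSketch`, `keepEven`, and `evensOnly` are hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.observables.GroupedObservable;

import java.util.List;

public class DrainUnwantedGroupsSketch {

    /**
     * Passes the "even" group through unchanged. The "odd" group is still
     * subscribed to, but all of its items are discarded by ignoreElements().
     */
    static Observable<Integer> keepEven(GroupedObservable<Boolean, Integer> group) {
        if (group.getKey()) {
            return group;
        }
        return group.ignoreElements().toObservable();
    }

    /** Returns only the even numbers, in source order. */
    static List<Integer> evensOnly(Integer... numbers) {
        return Observable.fromArray(numbers)
                .groupBy(n -> n % 2 == 0)   // key: true for even, false for odd
                .flatMap(DrainUnwantedGroupsSketch::keepEven)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(evensOnly(1, 2, 3, 4, 5, 6));
    }
}
```

If only one category is ever needed, filtering the source before grouping avoids creating the unwanted group in the first place; the sketch keeps groupBy to show the draining pattern itself.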

Code examples

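Code example source: ReactiveX/RxJava

    // The source snippet carries no @Test annotation; just1 is a field of the test class (not shown).
    public void groupByKeyNull() {
        just1.groupBy(new Function<Integer, Object>() {
            @Override
            public Object apply(Integer v) {
                return null; // the key selector returns a null key
            }
        }).blockingSubscribe();
    }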

Code example source: ReactiveX/RxJava

    // Passing null as the value selector is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void groupByValueNull() {
        just1.groupBy(new Function<Integer, Object>() {
            @Override
            public Object apply(Integer v) {
                return v;
            }
        }, null);
    }

Code example source: ReactiveX/RxJava

    // Passing null as the key selector is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void groupByNull() {
        just1.groupBy(null);
    }

Code example source: ReactiveX/RxJava

    // length and toMap are helpers defined in the test class (not shown).
    @Test
    public void testGroupByWithElementSelector() {
        Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
        // length is used as both the key selector and the value selector
        Observable<GroupedObservable<Integer, Integer>> grouped = source.groupBy(length, length);
        Map<Integer, Collection<Integer>> map = toMap(grouped);
        assertEquals(3, map.size());
        assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
        assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
        assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
    }

Code example source: ReactiveX/RxJava

    // Same body as testGroupByWithElementSelector; length and toMap are test-class helpers (not shown).
    @Test
    public void testGroupByWithElementSelector2() {
        Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
        Observable<GroupedObservable<Integer, Integer>> grouped = source.groupBy(length, length);
        Map<Integer, Collection<Integer>> map = toMap(grouped);
        assertEquals(3, map.size());
        assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
        assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
        assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testGroupByWithNullKey() {
        final String[] key = new String[]{"uninitialized"};
        final List<String> values = new ArrayList<String>();
        Observable.just("a", "b", "c").groupBy(new Function<String, String>() {
            @Override
            public String apply(String value) {
                return null; // every item is assigned the null key
            }
        }).subscribe(new Consumer<GroupedObservable<String, String>>() {
            @Override
            public void accept(GroupedObservable<String, String> groupedObservable) {
                key[0] = groupedObservable.getKey();
                groupedObservable.subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        values.add(s);
                    }
                });
            }
        });
        // a single group with a null key receives all three items
        assertEquals(null, key[0]);
        assertEquals(Arrays.asList("a", "b", "c"), values);
    }

Code example source: ReactiveX/RxJava

    // length and toMap are helpers defined in the test class (not shown).
    @Test
    public void testGroupBy() {
        Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
        Observable<GroupedObservable<Integer, String>> grouped = source.groupBy(length);
        Map<Integer, Collection<String>> map = toMap(grouped);
        assertEquals(3, map.size());
        assertArrayEquals(Arrays.asList("one", "two", "six").toArray(), map.get(3).toArray());
        assertArrayEquals(Arrays.asList("four", "five").toArray(), map.get(4).toArray());
        assertArrayEquals(Arrays.asList("three").toArray(), map.get(5).toArray());
    }

Code example source: ReactiveX/RxJava

    // A value selector that returns null is expected to fail with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void groupByValueReturnsNull() {
        just1.groupBy(new Function<Integer, Object>() {
            @Override
            public Object apply(Integer v) {
                return v;
            }
        }, new Function<Integer, Object>() {
            @Override
            public Object apply(Integer v) {
                return null;
            }
        }).blockingSubscribe();
    }

Code example source: ReactiveX/RxJava

    // TestHelper and Functions are utilities from the RxJava code base (not shown).
    @Test
    public void dispose() {
        TestHelper.checkDisposed(Observable.just(1).groupBy(Functions.justFunction(1)));
        Observable.just(1)
                .groupBy(Functions.justFunction(1))
                .doOnNext(new Consumer<GroupedObservable<Integer, Integer>>() {
                    @Override
                    public void accept(GroupedObservable<Integer, Integer> g) throws Exception {
                        TestHelper.checkDisposed(g);
                    }
                })
                .test();
    }

Code example source: ReactiveX/RxJava

    // IS_EVEN is a key selector defined in the test class (not shown).
    @Test
    public void testGroupByOnAsynchronousSourceAcceptsMultipleSubscriptions() throws InterruptedException {
        // choose an asynchronous source
        Observable<Long> source = Observable.interval(10, TimeUnit.MILLISECONDS).take(1);
        // apply groupBy to the source
        Observable<GroupedObservable<Boolean, Long>> stream = source.groupBy(IS_EVEN);
        // create two observers
        Observer<GroupedObservable<Boolean, Long>> o1 = TestHelper.mockObserver();
        Observer<GroupedObservable<Boolean, Long>> o2 = TestHelper.mockObserver();
        // subscribe with the observers
        stream.subscribe(o1);
        stream.subscribe(o2);
        // check that subscriptions were successful
        verify(o1, never()).onError(Mockito.<Throwable> any());
        verify(o2, never()).onError(Mockito.<Throwable> any());
    }

Code example source: ReactiveX/RxJava

    /**
     * Assert we get an IllegalStateException if trying to subscribe to an inner GroupedObservable more than once.
     * identity and dbl are selectors defined in the test class (not shown).
     */
    @Test
    public void testExceptionIfSubscribeToChildMoreThanOnce() {
        Observable<Integer> source = Observable.just(0);
        final AtomicReference<GroupedObservable<Integer, Integer>> inner = new AtomicReference<GroupedObservable<Integer, Integer>>();
        Observable<GroupedObservable<Integer, Integer>> m = source.groupBy(identity, dbl);
        m.subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
            @Override
            public void accept(GroupedObservable<Integer, Integer> t1) {
                inner.set(t1);
            }
        });
        // first subscription is accepted
        inner.get().subscribe();
        // second subscription must be rejected with an error
        Observer<Integer> o2 = TestHelper.mockObserver();
        inner.get().subscribe(o2);
        verify(o2, never()).onComplete();
        verify(o2, never()).onNext(anyInt());
        verify(o2).onError(any(IllegalStateException.class));
    }
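
The single-Observer rule that this test checks can also be observed without Mockito. The following is a rough sketch, assuming RxJava 2.x and its TestObserver; the names `SingleObserverSketch` and `secondSubscriptionError` are hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.observables.GroupedObservable;
import io.reactivex.observers.TestObserver;

import java.util.concurrent.atomic.AtomicReference;

public class SingleObserverSketch {

    /**
     * Subscribes twice to the same inner group and returns the error
     * signalled to the second observer.
     */
    static Throwable secondSubscriptionError() {
        AtomicReference<GroupedObservable<Integer, Integer>> inner = new AtomicReference<>();
        Observable.just(0)
                .groupBy(v -> v)          // the item itself is the key
                .subscribe(inner::set);   // capture the group without subscribing to it

        GroupedObservable<Integer, Integer> group = inner.get();

        // The first observer receives the cached item and the completion signal.
        TestObserver<Integer> first = group.test();
        first.assertResult(0);

        // The second observer is rejected with an error.
        TestObserver<Integer> second = group.test();
        return second.errors().get(0);
    }

    public static void main(String[] args) {
        System.out.println(secondSubscriptionError());
    }
}
```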

Code example source: ReactiveX/RxJava

    @Test
    public void testGroupByUnsubscribe() {
        final Disposable upstream = mock(Disposable.class);
        // a source that only hands out the mocked Disposable and never emits
        Observable<Integer> o = Observable.unsafeCreate(
                new ObservableSource<Integer>() {
                    @Override
                    public void subscribe(Observer<? super Integer> observer) {
                        observer.onSubscribe(upstream);
                    }
                }
        );
        TestObserver<Object> to = new TestObserver<Object>();
        o.groupBy(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) {
                return null;
            }
        }).subscribe(to);
        // disposing the downstream observer must dispose the upstream
        to.dispose();
        verify(upstream).dispose();
    }

Code example source: ReactiveX/RxJava

    // length and toMap are helpers defined in the test class (not shown).
    @Test
    public void testEmpty() {
        Observable<String> source = Observable.empty();
        Observable<GroupedObservable<Integer, String>> grouped = source.groupBy(length);
        Map<Integer, Collection<String>> map = toMap(grouped);
        assertTrue(map.isEmpty());
    }

Code example source: ReactiveX/RxJava

    // identity, dbl and FLATTEN_INTEGER are defined in the test class (not shown).
    @Test
    public void innerEscapeCompleted() {
        Observable<Integer> source = Observable.just(0);
        Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
        TestObserver<Object> to = new TestObserver<Object>();
        m.subscribe(to);
        to.awaitTerminalEvent();
        to.assertNoErrors();
        System.out.println(to.values());
    }

Code example source: ReactiveX/RxJava

    // groupBy(keySelector, delayError) overload with delayError = true
    @Test
    public void delayErrorSimpleComplete() {
        Observable.just(1)
                .groupBy(Functions.justFunction(1), true)
                .flatMap(Functions.<Observable<Integer>>identity())
                .test()
                .assertResult(1);
    }

Code example source: ReactiveX/RxJava

    // fail(0), dbl and FLATTEN_INTEGER are defined in the test class (not shown).
    @Test
    public void keySelectorThrows() {
        Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
        Observable<Integer> m = source.groupBy(fail(0), dbl).flatMap(FLATTEN_INTEGER);
        TestObserver<Integer> to = new TestObserver<Integer>();
        m.subscribe(to);
        to.awaitTerminalEvent();
        assertEquals(1, to.errorCount());
        to.assertNoValues();
    }

Code example source: ReactiveX/RxJava

    // identity, fail(0) and FLATTEN_INTEGER are defined in the test class (not shown).
    @Test
    public void valueSelectorThrows() {
        Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
        Observable<Integer> m = source.groupBy(identity, fail(0)).flatMap(FLATTEN_INTEGER);
        TestObserver<Integer> to = new TestObserver<Integer>();
        m.subscribe(to);
        to.awaitTerminalEvent();
        assertEquals(1, to.errorCount());
        to.assertNoValues();
    }

Code example source: ReactiveX/RxJava

    // identity, dbl and FLATTEN_INTEGER are defined in the test class (not shown).
    @Test
    public void testError2() {
        // one item followed by an error
        Observable<Integer> source = Observable.concat(Observable.just(0),
                Observable.<Integer> error(new TestException("Forced failure")));
        Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
        TestObserver<Object> to = new TestObserver<Object>();
        m.subscribe(to);
        to.awaitTerminalEvent();
        assertEquals(1, to.errorCount());
        to.assertValueCount(1);
    }

Code example source: ReactiveX/RxJava

    // groupBy(keySelector, delayError): the item 1 is delivered before the TestException.
    @Test
    public void keySelectorAndDelayError() {
        Observable.just(1).concatWith(Observable.<Integer>error(new TestException()))
                .groupBy(Functions.<Integer>identity(), true)
                .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(GroupedObservable<Integer, Integer> g) throws Exception {
                        return g;
                    }
                })
                .test()
                .assertFailure(TestException.class, 1);
    }

Code example source: ReactiveX/RxJava

    // groupBy(keySelector, valueSelector, delayError): the item 1 is delivered before the TestException.
    @Test
    public void keyAndValueSelectorAndDelayError() {
        Observable.just(1).concatWith(Observable.<Integer>error(new TestException()))
                .groupBy(Functions.<Integer>identity(), Functions.<Integer>identity(), true)
                .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(GroupedObservable<Integer, Integer> g) throws Exception {
                        return g;
                    }
                })
                .test()
                .assertFailure(TestException.class, 1);
    }
