Usage and code examples for io.reactivex.Observable.flatMapIterable()

Reposted by x33g5p2x on 2022-01-25, filed under Other

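This article collects code examples for the Java method io.reactivex.Observable.flatMapIterable() and shows how Observable.flatMapIterable() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as a useful reference. Details of Observable.flatMapIterable():
Fully qualified class name: io.reactivex.Observable
Class name: Observable
Method name: flatMapIterable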

Introduction to Observable.flatMapIterable

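Returns an Observable that merges each item emitted by the source ObservableSource with the values in an Iterable corresponding to that item that is generated by a selector. In other words, the mapper function turns each source item into an Iterable, and the elements of those Iterables are emitted one by one by the resulting Observable.

Scheduler: flatMapIterable does not operate by default on a particular Scheduler.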
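
The flattening behaviour described above can be modelled in plain Java. The sketch below is not RxJava's implementation: it replaces the Observable with a `List` and uses `java.util.function.Function` instead of `io.reactivex.functions.Function`. The class `FlatMapIterableModel` and its helper method are hypothetical names introduced only for illustration. The null check mirrors the error message that appears in one of the RxJava tests quoted later in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of flatMapIterable; a List stands in for the Observable.
class FlatMapIterableModel {

    // Applies the mapper to every source item and appends the elements of each
    // returned Iterable to one flat result list, in source order.
    static <T, R> List<R> flatMapIterable(
            List<T> source,
            Function<? super T, ? extends Iterable<? extends R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            Iterable<? extends R> values = mapper.apply(item);
            if (values == null) {
                // Assumption: a null Iterable is treated as an error, as in the RxJava tests.
                throw new NullPointerException("The mapper returned a null Iterable");
            }
            for (R value : values) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> result =
                flatMapIterable(Arrays.asList(1, 2, 3), v -> Arrays.asList(v, v * 10));
        System.out.println(result); // prints [1, 10, 2, 20, 3, 30]
    }
}
```

Each source item contributes two output elements here, so three inputs yield six outputs. With the real operator, the same mapper would be passed to `Observable.flatMapIterable` and the elements would arrive through `onNext` rather than in a returned list.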

Code examples

Code example source: ReactiveX/RxJava

    // Fragment: the opening of the enclosing call is missing from this excerpt.
    // The mapper ignores its input and always returns the Iterable [10, 20].
        @Override
        public Object apply(Observable<Integer> o) throws Exception {
            return o.flatMapIterable(new Function<Object, Iterable<Integer>>() {
                @Override
                public Iterable<Integer> apply(Object v) throws Exception {
                    return Arrays.asList(10, 20);
                }
            });
        }
    }, false, 1, 1, 10, 20);

Code example source: ReactiveX/RxJava

    // Passing null as the result selector (second argument) must throw a NullPointerException.
    // just1 is a field of the enclosing test class and is not shown in this excerpt.
    @Test(expected = NullPointerException.class)
    public void flatMapIterableCombinerNull() {
        just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) {
                return Arrays.asList(1);
            }
        }, null);
    }

Code example source: ReactiveX/RxJava

    // Passing null as the mapper must throw a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void flatMapIterableMapperNull() {
        just1.flatMapIterable(null);
    }

Code example source: ReactiveX/RxJava

    // An Iterable that contains a null element fails with a NullPointerException on subscription.
    @Test(expected = NullPointerException.class)
    public void flatMapIterableMapperIterableOneNull() {
        just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) {
                return Arrays.asList(1, null);
            }
        }).blockingSubscribe();
    }

Code example source: ReactiveX/RxJava

    // A mapper that returns null instead of an Iterable fails with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void flatMapIterableMapperReturnsNull() {
        just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) {
                return null;
            }
        }).blockingSubscribe();
    }

Code example source: ReactiveX/RxJava

    // A result selector that returns null fails with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void flatMapIterableCombinerReturnsNull() {
        just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) {
                return Arrays.asList(1);
            }
        }, new BiFunction<Integer, Integer, Object>() {
            @Override
            public Object apply(Integer a, Integer b) {
                return null;
            }
        }).blockingSubscribe();
    }

Code example source: ReactiveX/RxJava

    // An Iterable whose iterator() returns null fails with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void flatMapIterableMapperIteratorNull() {
        just1.flatMapIterable(new Function<Integer, Iterable<Object>>() {
            @Override
            public Iterable<Object> apply(Integer v) {
                return new Iterable<Object>() {
                    @Override
                    public Iterator<Object> iterator() {
                        return null;
                    }
                };
            }
        }).blockingSubscribe();
    }

Code example source: ReactiveX/RxJava

    // The result selector throws, so the observer sees onError only: no onNext, no onComplete.
    @Test
    public void testResultFunctionThrows() {
        Observer<Object> o = TestHelper.mockObserver();
        final List<Integer> list = Arrays.asList(1, 2, 3);
        Function<Integer, List<Integer>> func = new Function<Integer, List<Integer>>() {
            @Override
            public List<Integer> apply(Integer t1) {
                return list;
            }
        };
        BiFunction<Integer, Integer, Integer> resFunc = new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer t1, Integer t2) {
                throw new TestException();
            }
        };
        List<Integer> source = Arrays.asList(16, 32, 64);
        Observable.fromIterable(source).flatMapIterable(func, resFunc).subscribe(o);
        verify(o, never()).onComplete();
        verify(o, never()).onNext(any());
        verify(o).onError(any(TestException.class));
    }

Code example source: ReactiveX/RxJava

    // The mapper (collection function) throws, so the observer sees onError only.
    @Test
    public void testCollectionFunctionThrows() {
        Observer<Object> o = TestHelper.mockObserver();
        Function<Integer, List<Integer>> func = new Function<Integer, List<Integer>>() {
            @Override
            public List<Integer> apply(Integer t1) {
                throw new TestException();
            }
        };
        BiFunction<Integer, Integer, Integer> resFunc = new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer t1, Integer t2) {
                return t1 | t2;
            }
        };
        List<Integer> source = Arrays.asList(16, 32, 64);
        Observable.fromIterable(source).flatMapIterable(func, resFunc).subscribe(o);
        verify(o, never()).onComplete();
        verify(o, never()).onNext(any());
        verify(o).onError(any(TestException.class));
    }

Code example source: ReactiveX/RxJava

    // With the two-argument overload, a null Iterable from the mapper is reported
    // as a NullPointerException with a descriptive message.
    @Test
    public void iterableMapperFunctionReturnsNull() {
        Observable.just(1)
            .flatMapIterable(new Function<Integer, Iterable<Object>>() {
                @Override
                public Iterable<Object> apply(Integer v) throws Exception {
                    return null;
                }
            }, new BiFunction<Integer, Object, Object>() {
                @Override
                public Object apply(Integer v, Object w) throws Exception {
                    return v;
                }
            })
            .test()
            .assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Iterable");
    }

Code example source: ReactiveX/RxJava

    /**
     * Returns an Observable that emits the events emitted by source ObservableSource, in a
     * sorted order based on a specified comparison function.
     *
     * <p>Note that calling {@code sorted} with long, non-terminating or infinite sources
     * might cause {@link OutOfMemoryError}
     *
     * <dl>
     * <dt><b>Scheduler:</b></dt>
     * <dd>{@code sorted} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param sortFunction
     *            a function that compares two items emitted by the source ObservableSource and returns an Integer
     *            that indicates their sort order
     * @return an Observable that emits the items emitted by the source ObservableSource in sorted order
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> sorted(Comparator<? super T> sortFunction) {
        ObjectHelper.requireNonNull(sortFunction, "sortFunction is null");
        return toList().toObservable().map(Functions.listSorter(sortFunction)).flatMapIterable(Functions.<List<T>>identity());
    }

Code example source: ReactiveX/RxJava

    /**
     * Returns an Observable that emits the events emitted by source ObservableSource, in a
     * sorted order. Each item emitted by the ObservableSource must implement {@link Comparable} with respect to all
     * other items in the sequence.
     * <p>
     * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sorted.png" alt="">
     * <p>
     * If any item emitted by this Observable does not implement {@link Comparable} with respect to
     * all other items emitted by this Observable, no items will be emitted and the
     * sequence is terminated with a {@link ClassCastException}.
     *
     * <p>Note that calling {@code sorted} with long, non-terminating or infinite sources
     * might cause {@link OutOfMemoryError}
     *
     * <dl>
     * <dt><b>Scheduler:</b></dt>
     * <dd>{@code sorted} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @return an Observable that emits the items emitted by the source ObservableSource in sorted order
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> sorted() {
        return toList().toObservable().map(Functions.listSorter(Functions.<T>naturalComparator())).flatMapIterable(Functions.<List<T>>identity());
    }
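
The `sorted` implementations above chain three steps: buffer everything into one list, sort that list, then use `flatMapIterable` with an identity mapper to emit the list's elements individually again. The following plain-Java sketch models those three steps with `List` in place of Observable. It is an illustration under that assumption, not RxJava code, and the class `SortedViaFlattenModel` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of sorted() built on a flatMapIterable-style helper.
class SortedViaFlattenModel {

    // Flattens the Iterables produced by the mapper into one list, in source order.
    static <T, R> List<R> flatMapIterable(
            List<T> source,
            Function<? super T, ? extends Iterable<? extends R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            for (R value : mapper.apply(item)) {
                out.add(value);
            }
        }
        return out;
    }

    static <T> List<T> sorted(List<T> source, Comparator<? super T> sortFunction) {
        // Step 1 (models toList): buffer the whole sequence into a single list.
        List<T> buffered = new ArrayList<>(source);
        // Step 2 (models map with a list sorter): sort the buffered list.
        buffered.sort(sortFunction);
        // The "stream" now holds exactly one item: the sorted list.
        List<List<T>> singleItemStream = Collections.singletonList(buffered);
        // Step 3 (models flatMapIterable(identity)): emit the list's elements one by one.
        return flatMapIterable(singleItemStream, Function.<List<T>>identity());
    }

    public static void main(String[] args) {
        List<Integer> result = sorted(Arrays.asList(3, 1, 2), Comparator.<Integer>naturalOrder());
        System.out.println(result); // prints [1, 2, 3]
    }
}
```

Because step 1 buffers the entire input, the model also makes the Javadoc warning concrete: an infinite or very long source never gets past the buffering step.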

Code example source: ReactiveX/RxJava

    // Normal case: every source item s is combined with every list element v via s | v,
    // and the sequence then completes without an error.
    @Test
    public void testNormal() {
        Observer<Object> o = TestHelper.mockObserver();
        final List<Integer> list = Arrays.asList(1, 2, 3);
        Function<Integer, List<Integer>> func = new Function<Integer, List<Integer>>() {
            @Override
            public List<Integer> apply(Integer t1) {
                return list;
            }
        };
        BiFunction<Integer, Integer, Integer> resFunc = new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer t1, Integer t2) {
                return t1 | t2;
            }
        };
        List<Integer> source = Arrays.asList(16, 32, 64);
        Observable.fromIterable(source).flatMapIterable(func, resFunc).subscribe(o);
        for (Integer s : source) {
            for (Integer v : list) {
                verify(o).onNext(s | v);
            }
        }
        verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }
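
To make the values in `testNormal` concrete, the sketch below models the two-argument overload in plain Java: the mapper supplies an Iterable per source item, and the result selector combines the source item with each element. It uses the same inputs as the test (16, 32, 64 against 1, 2, 3 with bitwise OR). This is a model with `List` standing in for Observable and `java.util.function` types standing in for RxJava's functional interfaces, and `ResultSelectorModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical plain-Java model of flatMapIterable(mapper, resultSelector).
class ResultSelectorModel {

    // For each source item, combines the item with every element of its Iterable.
    static <T, U, V> List<V> flatMapIterable(
            List<T> source,
            Function<? super T, ? extends Iterable<? extends U>> mapper,
            BiFunction<? super T, ? super U, ? extends V> resultSelector) {
        List<V> out = new ArrayList<>();
        for (T item : source) {
            for (U element : mapper.apply(item)) {
                out.add(resultSelector.apply(item, element));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Function<Integer, List<Integer>> func = s -> Arrays.asList(1, 2, 3);
        BiFunction<Integer, Integer, Integer> resFunc = (s, v) -> s | v;
        List<Integer> result = flatMapIterable(Arrays.asList(16, 32, 64), func, resFunc);
        System.out.println(result); // prints [17, 18, 19, 33, 34, 35, 65, 66, 67]
    }
}
```

These nine values are the ones the nested `verify(o).onNext(s | v)` loop in the test checks for.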

Code example source: ReactiveX/RxJava

    // Excerpt is truncated in the source: the mapper body and the rest of the chain are missing.
    .flatMapIterable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v)

Code example source: bitrich-info/xchange-stream

    public Observable<PoloniexWebSocketEvent> subscribeCurrencyPairChannel(CurrencyPair currencyPair) {
        String channelName = currencyPair.counter.toString() + "_" + currencyPair.base.toString();
        return subscribeChannel(channelName)
            // Each channel message is parsed into a transaction; its events are emitted one by one.
            .flatMapIterable(s -> {
                PoloniexWebSocketEventsTransaction transaction = objectMapper.treeToValue(s, PoloniexWebSocketEventsTransaction.class);
                return Arrays.asList(transaction.getEvents());
            }).share();
    }

Code example source: bitrich-info/xchange-stream

        @Override
        public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
            // Keep only "trade" events and map each message to an array of Gemini trades.
            Observable<GeminiTrade[]> subscribedTrades = service.subscribeChannel(currencyPair, args)
                .filter(s -> filterEventsByReason(s, "trade", null))
                .map((JsonNode s) -> {
                    GeminiWebSocketTransaction transaction = mapper.treeToValue(s, GeminiWebSocketTransaction.class);
                    return transaction.toGeminiTrades();
                });
            // Adapt each array to a trade list and emit its trades individually.
            return subscribedTrades.flatMapIterable(s -> adaptTrades(s, currencyPair).getTrades());
        }
    } // closes the enclosing class, which is not shown in this excerpt

Code example source: sczyh30/vertx-blueprint-todo-backend

    @Override
    public Maybe<Todo> getCertain(String todoID) {
        return client.rxQueryWithParams(SQL_QUERY, new JsonArray().add(todoID))
            .map(ResultSet::getRows)
            .toObservable()
            // Identity mapper: the list of rows is emitted as individual rows.
            .flatMapIterable(e -> e)
            // At most one row is expected for a single ID.
            .singleElement()
            .map(Todo::new);
    }

Code example source: bitrich-info/xchange-stream

        @Override
        public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
            String channelName = "lightning_executions_" + currencyPair.base.toString() + "_" + currencyPair.counter.toString();
            // Each channel message is wrapped in a transaction; its trades are emitted one by one.
            Observable<BitflyerTrade> tradeTransactions = streamingService.subscribeChannel(channelName).flatMapIterable(s -> {
                BitflyerPubNubTradesTransaction transaction = new BitflyerPubNubTradesTransaction(s);
                return transaction.toBitflyerTrades();
            });
            return tradeTransactions.map(s -> s.toTrade(currencyPair));
        }
    } // closes the enclosing class, which is not shown in this excerpt
