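This article collects code examples for the Java method io.reactivex.Observable.flatMapIterable() and shows how Observable.flatMapIterable() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as a useful reference. Details of the Observable.flatMapIterable() method are as follows: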
Package path: io.reactivex.Observable
Class name: Observable
Method name: flatMapIterable
Description: Returns an Observable that merges each item emitted by the source ObservableSource with the values in an Iterable corresponding to that item that is generated by a selector.
Scheduler: flatMapIterable does not operate by default on a particular Scheduler.
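To make the description above concrete, here is a minimal plain-Java sketch of the flattening semantics only. It is not RxJava's implementation and does not call RxJava: the class name `FlatMapIterableSketch` and its static helper are hypothetical, a `List` stands in for the sequence of emitted items, and everything is evaluated eagerly rather than pushed to an observer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FlatMapIterableSketch {

    // Hypothetical helper (not an RxJava API): for every source item, ask the
    // mapper for an Iterable and append each of its values to the output, in order.
    public static <T, R> List<R> flatMapIterable(
            List<T> source,
            Function<? super T, ? extends Iterable<? extends R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            for (R value : mapper.apply(item)) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> result =
                flatMapIterable(Arrays.asList(1, 2, 3), v -> Arrays.asList(v, v * 10));
        System.out.println(result); // prints [1, 10, 2, 20, 3, 30]
    }
}
```

The sketch mirrors only the "one item in, many items out" shape; error signalling, null checks, and disposal in the real operator are outside its scope.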
Code example source: ReactiveX/RxJava
// Fragment: the opening of the enclosing call is not included in this excerpt.
    @Override
    public Object apply(Observable<Integer> o) throws Exception {
        // Every upstream item is mapped to the same two-element list.
        return o.flatMapIterable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                return Arrays.asList(10, 20);
            }
        });
    }
}, false, 1, 1, 10, 20);
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void flatMapIterableCombinerNull() {
    // Passing null as the result combiner is expected to throw NullPointerException.
    just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) {
            return Arrays.asList(1);
        }
    }, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void flatMapIterableMapperNull() {
    // Passing null as the mapper is expected to throw NullPointerException.
    just1.flatMapIterable(null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void flatMapIterableMapperIterableOneNull() {
    just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) {
            // The returned Iterable contains a null element.
            return Arrays.asList(1, null);
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void flatMapIterableMapperReturnsNull() {
    just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) {
            // The mapper itself returns null instead of an Iterable.
            return null;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void flatMapIterableCombinerReturnsNull() {
    just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) {
            return Arrays.asList(1);
        }
    }, new BiFunction<Integer, Integer, Object>() {
        @Override
        public Object apply(Integer a, Integer b) {
            // The combiner returns null for the (source item, inner value) pair.
            return null;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void flatMapIterableMapperIteratorNull() {
    just1.flatMapIterable(new Function<Integer, Iterable<Object>>() {
        @Override
        public Iterable<Object> apply(Integer v) {
            return new Iterable<Object>() {
                @Override
                public Iterator<Object> iterator() {
                    // The Iterable is non-null, but its iterator is null.
                    return null;
                }
            };
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
@Test
public void testResultFunctionThrows() {
    Observer<Object> o = TestHelper.mockObserver();
    final List<Integer> list = Arrays.asList(1, 2, 3);
    Function<Integer, List<Integer>> func = new Function<Integer, List<Integer>>() {
        @Override
        public List<Integer> apply(Integer t1) {
            return list;
        }
    };
    // The result function fails on the first (source item, inner value) pair.
    BiFunction<Integer, Integer, Integer> resFunc = new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer t1, Integer t2) {
            throw new TestException();
        }
    };
    List<Integer> source = Arrays.asList(16, 32, 64);
    Observable.fromIterable(source).flatMapIterable(func, resFunc).subscribe(o);
    // No items and no completion are delivered; only the error is.
    verify(o, never()).onComplete();
    verify(o, never()).onNext(any());
    verify(o).onError(any(TestException.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testCollectionFunctionThrows() {
    Observer<Object> o = TestHelper.mockObserver();
    // The mapper (collection function) fails before producing any Iterable.
    Function<Integer, List<Integer>> func = new Function<Integer, List<Integer>>() {
        @Override
        public List<Integer> apply(Integer t1) {
            throw new TestException();
        }
    };
    BiFunction<Integer, Integer, Integer> resFunc = new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer t1, Integer t2) {
            return t1 | t2;
        }
    };
    List<Integer> source = Arrays.asList(16, 32, 64);
    Observable.fromIterable(source).flatMapIterable(func, resFunc).subscribe(o);
    // No items and no completion are delivered; only the error is.
    verify(o, never()).onComplete();
    verify(o, never()).onNext(any());
    verify(o).onError(any(TestException.class));
}
Code example source: ReactiveX/RxJava
@Test
public void iterableMapperFunctionReturnsNull() {
    Observable.just(1)
        .flatMapIterable(new Function<Integer, Iterable<Object>>() {
            @Override
            public Iterable<Object> apply(Integer v) throws Exception {
                return null;
            }
        }, new BiFunction<Integer, Object, Object>() {
            @Override
            public Object apply(Integer v, Object w) throws Exception {
                return v;
            }
        })
        .test()
        // A null Iterable from the mapper is reported with this specific message.
        .assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Iterable");
}
Code example source: ReactiveX/RxJava
/**
 * Returns an Observable that emits the events emitted by source ObservableSource, in a
 * sorted order based on a specified comparison function.
 *
 * <p>Note that calling {@code sorted} with long, non-terminating or infinite sources
 * might cause {@link OutOfMemoryError}
 *
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code sorted} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param sortFunction
 *            a function that compares two items emitted by the source ObservableSource and returns an Integer
 *            that indicates their sort order
 * @return an Observable that emits the items emitted by the source ObservableSource in sorted order
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> sorted(Comparator<? super T> sortFunction) {
    ObjectHelper.requireNonNull(sortFunction, "sortFunction is null");
    // Collect into one list, sort it, then flatten the list back into individual items.
    return toList().toObservable().map(Functions.listSorter(sortFunction)).flatMapIterable(Functions.<List<T>>identity());
}
Code example source: ReactiveX/RxJava
/**
 * Returns an Observable that emits the events emitted by source ObservableSource, in a
 * sorted order. Each item emitted by the ObservableSource must implement {@link Comparable} with respect to all
 * other items in the sequence.
 * <p>
 * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sorted.png" alt="">
 * <p>
 * If any item emitted by this Observable does not implement {@link Comparable} with respect to
 * all other items emitted by this Observable, no items will be emitted and the
 * sequence is terminated with a {@link ClassCastException}.
 *
 * <p>Note that calling {@code sorted} with long, non-terminating or infinite sources
 * might cause {@link OutOfMemoryError}
 *
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code sorted} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @return an Observable that emits the items emitted by the source ObservableSource in sorted order
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> sorted() {
    // Same pipeline as sorted(Comparator), using the natural ordering of T.
    return toList().toObservable().map(Functions.listSorter(Functions.<T>naturalComparator())).flatMapIterable(Functions.<List<T>>identity());
}
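Both sorted() overloads above chain the same three steps: gather every item into a single list, sort that list, then hand the list to flatMapIterable with an identity mapper so its elements are re-emitted one at a time. The following plain-Java sketch walks through those steps under the assumption that a `List` stands in for the stream; `SortedViaFlattenSketch` and its method are hypothetical names, and none of this is RxJava code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SortedViaFlattenSketch {

    // Hypothetical helper (not an RxJava API) that mimics the shape of the pipeline.
    public static <T> List<T> sorted(List<T> source, Comparator<? super T> comparator) {
        // Step 1, like toList(): buffer all items into one list.
        List<T> collected = new ArrayList<>(source);
        // Step 2, like map(listSorter(...)): sort the buffered list.
        collected.sort(comparator);
        // At this point the "stream" holds exactly one item: the sorted list.
        List<List<T>> singleItem = Collections.singletonList(collected);
        // Step 3, like flatMapIterable(identity()): emit the list's elements individually.
        List<T> out = new ArrayList<>();
        for (List<T> list : singleItem) {
            for (T value : list) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> result = sorted(Arrays.asList(3, 1, 2), Comparator.<Integer>naturalOrder());
        System.out.println(result); // prints [1, 2, 3]
    }
}
```

Because step 1 buffers everything before anything is emitted, the sketch also illustrates why the Javadoc warns about long or infinite sources.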
Code example source: ReactiveX/RxJava
@Test
public void testNormal() {
    Observer<Object> o = TestHelper.mockObserver();
    final List<Integer> list = Arrays.asList(1, 2, 3);
    Function<Integer, List<Integer>> func = new Function<Integer, List<Integer>>() {
        @Override
        public List<Integer> apply(Integer t1) {
            return list;
        }
    };
    // Combine each source item with each inner value using bitwise OR.
    BiFunction<Integer, Integer, Integer> resFunc = new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer t1, Integer t2) {
            return t1 | t2;
        }
    };
    List<Integer> source = Arrays.asList(16, 32, 64);
    Observable.fromIterable(source).flatMapIterable(func, resFunc).subscribe(o);
    // Every (source item, inner value) combination must have been emitted.
    for (Integer s : source) {
        for (Integer v : list) {
            verify(o).onNext(s | v);
        }
    }
    verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
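The two-argument overload used in testNormal pairs each source item with every value of its mapped list through the result function. As a rough illustration of the values that test verifies, here is a plain-Java sketch; `FlatMapIterableCombinerSketch` and its method are hypothetical names, a `List` stands in for the stream, and RxJava itself is not involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

public class FlatMapIterableCombinerSketch {

    // Hypothetical helper (not an RxJava API): apply the combiner to each
    // (source item, inner value) pair and collect the results in order.
    public static <T, U, R> List<R> flatMapIterable(
            List<T> source,
            Function<? super T, ? extends Iterable<? extends U>> mapper,
            BiFunction<? super T, ? super U, ? extends R> combiner) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            for (U inner : mapper.apply(item)) {
                out.add(combiner.apply(item, inner));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> inner = Arrays.asList(1, 2, 3);
        List<Integer> result = flatMapIterable(
                Arrays.asList(16, 32, 64),
                s -> inner,
                (s, v) -> s | v);
        System.out.println(result); // prints [17, 18, 19, 33, 34, 35, 65, 66, 67]
    }
}
```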
Code example source: ReactiveX/RxJava
// Fragment: this excerpt is truncated in the source.
.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v)
Code example source: bitrich-info/xchange-stream
public Observable<PoloniexWebSocketEvent> subscribeCurrencyPairChannel(CurrencyPair currencyPair) {
    String channelName = currencyPair.counter.toString() + "_" + currencyPair.base.toString();
    return subscribeChannel(channelName)
            // Parse each message into a transaction and emit its events individually.
            .flatMapIterable(s -> {
                PoloniexWebSocketEventsTransaction transaction = objectMapper.treeToValue(s, PoloniexWebSocketEventsTransaction.class);
                return Arrays.asList(transaction.getEvents());
            }).share();
}
Code example source: bitrich-info/xchange-stream
    @Override
    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
        Observable<GeminiTrade[]> subscribedTrades = service.subscribeChannel(currencyPair, args)
                .filter(s -> filterEventsByReason(s, "trade", null))
                .map((JsonNode s) -> {
                    GeminiWebSocketTransaction transaction = mapper.treeToValue(s, GeminiWebSocketTransaction.class);
                    return transaction.toGeminiTrades();
                });
        // Adapt each GeminiTrade[] batch and emit the resulting trades one by one.
        return subscribedTrades.flatMapIterable(s -> adaptTrades(s, currencyPair).getTrades());
    }
}
Code example source: sczyh30/vertx-blueprint-todo-backend
@Override
public Maybe<Todo> getCertain(String todoID) {
    return client.rxQueryWithParams(SQL_QUERY, new JsonArray().add(todoID))
        .map(ResultSet::getRows)
        .toObservable()
        // Flatten the list of rows into individual row emissions.
        .flatMapIterable(e -> e)
        .singleElement()
        .map(Todo::new);
}
Code example source: bitrich-info/xchange-stream
    @Override
    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
        String channelName = "lightning_executions_" + currencyPair.base.toString() + "_" + currencyPair.counter.toString();
        // Each channel message is converted into a collection of trades that is emitted item by item.
        Observable<BitflyerTrade> tradeTransactions = streamingService.subscribeChannel(channelName).flatMapIterable(s -> {
            BitflyerPubNubTradesTransaction transaction = new BitflyerPubNubTradesTransaction(s);
            return transaction.toBitflyerTrades();
        });
        return tradeTransactions.map(s -> s.toTrade(currencyPair));
    }
}