Usage and code examples for io.reactivex.Observable.blockingForEach()

Reposted by x33g5p2x on 2022-01-25, category: Other

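This article collects code examples for the Java method io.reactivex.Observable.blockingForEach() and shows how Observable.blockingForEach() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.blockingForEach() are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: blockingForEach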

Introduction to Observable.blockingForEach

Invokes a method on each item emitted by this Observable and blocks until the Observable completes.

Note: this blocks even if the underlying Observable is asynchronous.

This is similar to Observable#subscribe(Observer), but it blocks. Because it blocks, it does not need the Observer#onComplete() or Observer#onError(Throwable) methods. If the underlying Observable terminates with an error, this method throws an exception rather than calling onError.

The difference between this method and #subscribe(Consumer) is that the onNext action is executed on the current (calling) thread rather than on the emission thread. The ReactiveX/RxJava tests quoted below confirm this: they assert that the consumer's thread name does not start with "Rx".

Scheduler: blockingForEach does not operate by default on a particular Scheduler.

Error handling: if the source signals an error, the operator wraps a checked Exception into a RuntimeException and throws that. Otherwise, RuntimeExceptions and Errors are rethrown as they are.
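To make the blocking behaviour concrete, here is a minimal JDK-only sketch. It is a simplified model, not RxJava's implementation: the class BlockingForEachModel, the helpers emitAsync and run, and the DONE sentinel are all hypothetical names introduced for illustration. A separate thread emits the items, while the caller blocks and runs the callback on its own thread. This matches what the ReactiveX/RxJava tests quoted further down assert, namely that the consumer does not run on an Rx scheduler thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical JDK-only model; none of these names come from RxJava.
final class BlockingForEachModel {
    // Sentinel that stands in for onComplete().
    private static final Object DONE = new Object();

    // Starts a thread named "emitter" that pushes 1..n and then the completion sentinel.
    static BlockingQueue<Object> emitAsync(int n) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        Thread emitter = new Thread(() -> {
            for (int i = 1; i <= n; i++) {
                queue.add(i);
            }
            queue.add(DONE);
        }, "emitter");
        emitter.start();
        return queue;
    }

    // Blocks the caller until the sentinel arrives; onNext runs on the caller's thread.
    static void blockingForEach(BlockingQueue<Object> queue, Consumer<Integer> onNext) {
        try {
            for (Object item = queue.take(); item != DONE; item = queue.take()) {
                onNext.accept((Integer) item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new RuntimeException(e);
        }
    }

    // Records each value together with the name of the thread that consumed it.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        blockingForEach(emitAsync(3), v -> seen.add(v + "@" + Thread.currentThread().getName()));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Every recorded entry carries the name of the thread that called run(), not "emitter", even though the values were produced on the emitter thread. run() returns only after the completion sentinel has been consumed.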

Code examples

Code example source: ReactiveX/RxJava

    private static <K, V> Map<K, Collection<V>> toMap(Observable<GroupedObservable<K, V>> observable) {
        final ConcurrentHashMap<K, Collection<V>> result = new ConcurrentHashMap<K, Collection<V>>();
        observable.blockingForEach(new Consumer<GroupedObservable<K, V>>() {
            @Override
            public void accept(final GroupedObservable<K, V> o) {
                result.put(o.getKey(), new ConcurrentLinkedQueue<V>());
                o.subscribe(new Consumer<V>() {
                    @Override
                    public void accept(V v) {
                        result.get(o.getKey()).add(v);
                    }
                });
            }
        });
        return result;
    }

Code example source: ReactiveX/RxJava

    @Test(expected = TestException.class)
    public void blockingForEachThrows() {
        Observable.just(1)
        .blockingForEach(new Consumer<Integer>() {
            @Override
            public void accept(Integer e) throws Exception {
                throw new TestException();
            }
        });
    }
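The error-handling rule in the description (checked exceptions are wrapped in a RuntimeException; RuntimeExceptions and Errors are rethrown as they are) can be sketched with plain JDK code. This is an illustrative model only: ErrorWrappingModel, ThrowingConsumer, wrapOrThrow, forEach and failWith are hypothetical names, not RxJava API. The sketch applies the rule to an exception thrown by the callback, as in the test above, whereas the description states it for errors signalled by the source.

```java
import java.io.IOException;
import java.util.Arrays;

// Hypothetical JDK-only model; none of these names come from RxJava.
final class ErrorWrappingModel {
    // Like a Consumer, but allowed to throw checked exceptions.
    interface ThrowingConsumer<T> {
        void accept(T value) throws Exception;
    }

    // Errors are rethrown, RuntimeExceptions pass through, checked exceptions get wrapped.
    static RuntimeException wrapOrThrow(Throwable t) {
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t instanceof RuntimeException) {
            return (RuntimeException) t;
        }
        return new RuntimeException(t);
    }

    // Calls onNext for each item and converts any failure using the rule above.
    static <T> void forEach(Iterable<T> items, ThrowingConsumer<T> onNext) {
        for (T item : items) {
            try {
                onNext.accept(item);
            } catch (Throwable t) {
                throw wrapOrThrow(t);
            }
        }
    }

    // Returns the exception that escapes forEach when onNext fails with the given cause.
    static RuntimeException failWith(Exception cause) {
        try {
            forEach(Arrays.asList(1), v -> { throw cause; });
        } catch (RuntimeException e) {
            return e;
        }
        throw new IllegalStateException("expected a failure");
    }

    public static void main(String[] args) {
        RuntimeException unchecked = new IllegalArgumentException("unchecked");
        System.out.println(failWith(unchecked) == unchecked);        // prints true
        IOException checked = new IOException("checked");
        System.out.println(failWith(checked).getCause() == checked); // prints true
    }
}
```

Under this model an unchecked exception reaches the caller as the same instance, which is why the test above can declare expected = TestException.class directly. A checked exception arrives as the cause of a new RuntimeException.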

Code example source: ReactiveX/RxJava

    private static <T> List<List<T>> toLists(Observable<Observable<T>> observables) {
        final List<List<T>> lists = new ArrayList<List<T>>();
        Observable.concat(observables.map(new Function<Observable<T>, Observable<List<T>>>() {
            @Override
            public Observable<List<T>> apply(Observable<T> xs) {
                return xs.toList().toObservable();
            }
        }))
        .blockingForEach(new Consumer<List<T>>() {
            @Override
            public void accept(List<T> xs) {
                lists.add(xs);
            }
        });
        return lists;
    }

Code example source: ReactiveX/RxJava

    @Override
    public Integer apply(Integer v) throws Exception {
        Observable.just(1).delay(10, TimeUnit.SECONDS).blockingForEach(Functions.emptyConsumer());
        return v;
    }
    })

Code example source: ReactiveX/RxJava

    /**
     * This won't compile if super/extends isn't done correctly on generics.
     */
    @Test
    public void testCovarianceOfZip() {
        Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
        Observable<CoolRating> ratings = Observable.just(new CoolRating());
        Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
        Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
        Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
        Observable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
        Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
        Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);
    }

Code example source: ReactiveX/RxJava

    /**
     * This won't compile if super/extends isn't done correctly on generics.
     */
    @Test
    public void testCovarianceOfCombineLatest() {
        Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
        Observable<CoolRating> ratings = Observable.just(new CoolRating());
        Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
        Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
        Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(extendedAction);
        Observable.<Media, Rating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
        Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(action);
        Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine);
    }

Code example source: ReactiveX/RxJava

    /**
     * Confirm that running on a NewThreadScheduler uses the same thread for the entire stream.
     */
    @Test
    public void testObserveOnWithNewThreadScheduler() {
        final AtomicInteger count = new AtomicInteger();
        final int _multiple = 99;
        Observable.range(1, 100000).map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer t1) {
                return t1 * _multiple;
            }
        }).observeOn(Schedulers.newThread())
        .blockingForEach(new Consumer<Integer>() {
            @Override
            public void accept(Integer t1) {
                assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
                // FIXME toBlocking methods run on the current thread
                String name = Thread.currentThread().getName();
                assertFalse("Wrong thread name: " + name, name.startsWith("Rx"));
            }
        });
    }

Code example source: ReactiveX/RxJava

    /**
     * Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered.
     */
    @Test
    public void testObserveOnWithThreadPoolScheduler() {
        final AtomicInteger count = new AtomicInteger();
        final int _multiple = 99;
        Observable.range(1, 100000).map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer t1) {
                return t1 * _multiple;
            }
        }).observeOn(Schedulers.computation())
        .blockingForEach(new Consumer<Integer>() {
            @Override
            public void accept(Integer t1) {
                assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
                // FIXME toBlocking methods run on the caller's thread
                String name = Thread.currentThread().getName();
                assertFalse("Wrong thread name: " + name, name.startsWith("Rx"));
            }
        });
    }

Code example source: ReactiveX/RxJava

    .blockingForEach(new Consumer<Integer>() {

Code example source: ReactiveX/RxJava

    @Test
    public void testWindow() {
        final ArrayList<List<Integer>> lists = new ArrayList<List<Integer>>();
        Observable.concat(
            Observable.just(1, 2, 3, 4, 5, 6)
            .window(3)
            .map(new Function<Observable<Integer>, Observable<List<Integer>>>() {
                @Override
                public Observable<List<Integer>> apply(Observable<Integer> xs) {
                    return xs.toList().toObservable();
                }
            })
        )
        .blockingForEach(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> xs) {
                lists.add(xs);
            }
        });
        assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[] { 1, 2, 3 });
        assertArrayEquals(lists.get(1).toArray(new Integer[3]), new Integer[] { 4, 5, 6 });
        assertEquals(2, lists.size());
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testMultiTake() {
        final AtomicInteger count = new AtomicInteger();
        Observable.unsafeCreate(new ObservableSource<Integer>() {
            @Override
            public void subscribe(Observer<? super Integer> observer) {
                Disposable bs = Disposables.empty();
                observer.onSubscribe(bs);
                for (int i = 0; !bs.isDisposed(); i++) {
                    System.out.println("Emit: " + i);
                    count.incrementAndGet();
                    observer.onNext(i);
                }
            }
        }).take(100).take(1).blockingForEach(new Consumer<Integer>() {
            @Override
            public void accept(Integer t1) {
                System.out.println("Receive: " + t1);
            }
        });
        assertEquals(1, count.get());
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 5000)
    public void toObservableNormal() {
        normal.completable.toObservable().blockingForEach(Functions.emptyConsumer());
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 5000, expected = TestException.class)
    public void toObservableError() {
        error.completable.toObservable().blockingForEach(Functions.emptyConsumer());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testUnsubscribeScan() throws Exception {
        ObservableEventStream.getEventStream("HTTP-ClusterB", 20)
        .scan(new HashMap<String, String>(), new BiFunction<HashMap<String, String>, Event, HashMap<String, String>>() {
            @Override
            public HashMap<String, String> apply(HashMap<String, String> accum, Event perInstanceEvent) {
                accum.put("instance", perInstanceEvent.instanceId);
                return accum;
            }
        })
        .take(10)
        .blockingForEach(new Consumer<HashMap<String, String>>() {
            @Override
            public void accept(HashMap<String, String> pv) {
                System.out.println(pv);
            }
        });
        Thread.sleep(200); // make sure the event streams receive their interrupt
    }

Code example source: ReactiveX/RxJava

    Observable.merge(source).take(6).blockingForEach(new Consumer<Long>() {

Code example source: ReactiveX/RxJava

    }).blockingForEach(new Consumer<String>() {

Code example source: ReactiveX/RxJava

    .blockingForEach(new Consumer<Object>() {
        @Override
        public void accept(Object pv) {

Code example source: ReactiveX/RxJava

    @Test
    public void testTakeUnsubscribesOnGroupBy() throws Exception {
        Observable.merge(
            ObservableEventStream.getEventStream("HTTP-ClusterA", 50),
            ObservableEventStream.getEventStream("HTTP-ClusterB", 20)
        )
        // group by type (2 clusters)
        .groupBy(new Function<Event, String>() {
            @Override
            public String apply(Event event) {
                return event.type;
            }
        })
        .take(1)
        .blockingForEach(new Consumer<GroupedObservable<String, Event>>() {
            @Override
            public void accept(GroupedObservable<String, Event> v) {
                System.out.println(v);
                v.take(1).subscribe(); // FIXME groups need consumption to a certain degree to cancel upstream
            }
        });
        System.out.println("**** finished");
        Thread.sleep(200); // make sure the event streams receive their interrupt
    }
