Usage and code examples of the io.reactivex.Flowable.blockingForEach() method

x33g5p2x, reposted on 2022-01-19 under Other

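This article collects code examples of the Java method io.reactivex.Flowable.blockingForEach() and shows how Flowable.blockingForEach() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. The details of Flowable.blockingForEach() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: blockingForEach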

About Flowable.blockingForEach

Invokes a method on each item emitted by this Flowable and blocks until the Flowable completes.

Note: This will block even if the underlying Flowable is asynchronous.

This is similar to Flowable#subscribe(Subscriber), but it blocks. Because it blocks, it does not need the Subscriber#onComplete() or Subscriber#onError(Throwable) methods. If the underlying Flowable terminates with an error, this method throws an exception rather than calling onError.

The difference between this method and #subscribe(Consumer) is that blockingForEach executes the onNext action on the current (calling) thread, whereas #subscribe(Consumer) executes it on the emission thread.

Backpressure: The operator consumes the source Flowable in an unbounded manner (i.e., no backpressure is applied to it).
Scheduler: blockingForEach does not operate by default on a particular Scheduler.
Error handling: If the source signals an error, the operator wraps a checked Exception into a RuntimeException and throws that. Otherwise, RuntimeExceptions and Errors are rethrown as they are.
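To make the blocking behaviour concrete, here is a minimal sketch, assuming RxJava 2.x (the io.reactivex package) is on the classpath. The class name BlockingForEachBasics and the method collectSquares are hypothetical names chosen for illustration. The source emits on a background scheduler, yet the call returns only after the source has completed, so a plain ArrayList filled by the consumer is complete when the method returns.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.List;

public class BlockingForEachBasics {

    /** Collects the squares of 1..5 from an asynchronous source. */
    static List<Integer> collectSquares() {
        final List<Integer> out = new ArrayList<>();

        Flowable.range(1, 5)
                .subscribeOn(Schedulers.computation()) // emit on a background thread
                .map(i -> i * i)
                // Blocks the calling thread until the source completes.
                .blockingForEach(out::add);

        // Safe to read here: blockingForEach has already returned.
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectSquares()); // prints [1, 4, 9, 16, 25]
    }
}
```

Because the call blocks, it is usually a poor fit for UI or event-loop threads; tests and command-line tools are the more typical callers.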

Code examples

Code example source: ReactiveX/RxJava

private static <K, V> Map<K, Collection<V>> toMap(Flowable<GroupedFlowable<K, V>> flowable) {
  final ConcurrentHashMap<K, Collection<V>> result = new ConcurrentHashMap<K, Collection<V>>();
  flowable.blockingForEach(new Consumer<GroupedFlowable<K, V>>() {
    @Override
    public void accept(final GroupedFlowable<K, V> f) {
      result.put(f.getKey(), new ConcurrentLinkedQueue<V>());
      f.subscribe(new Consumer<V>() {
        @Override
        public void accept(V v) {
          result.get(f.getKey()).add(v);
        }
      });
    }
  });
  return result;
}

Code example source: ReactiveX/RxJava

@Test(expected = TestException.class)
public void blockingForEachThrows() {
  Flowable.just(1)
  .blockingForEach(new Consumer<Integer>() {
    @Override
    public void accept(Integer e) throws Exception {
      throw new TestException();
    }
  });
}
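The test above covers an exception thrown by the consumer. As a complementary sketch of the error-handling rule for errors signalled by the source, the following assumes RxJava 2.x on the classpath; BlockingForEachErrors and thrownFor are hypothetical names. It captures whatever blockingForEach throws so that the wrapping of checked exceptions can be inspected.

```java
import io.reactivex.Flowable;

import java.io.IOException;

public class BlockingForEachErrors {

    /** Returns the Throwable that blockingForEach throws when the source fails. */
    static Throwable thrownFor(Throwable sourceError) {
        try {
            Flowable.<Integer>error(sourceError).blockingForEach(i -> { });
            return null; // not reached: the source always signals an error
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        // A checked exception arrives wrapped in a RuntimeException.
        Throwable wrapped = thrownFor(new IOException("disk"));
        System.out.println(wrapped.getClass() + " / cause: " + wrapped.getCause());

        // An unchecked exception is rethrown as the same instance.
        IllegalStateException boom = new IllegalStateException("boom");
        System.out.println(thrownFor(boom) == boom);
    }
}
```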

Code example source: ReactiveX/RxJava

private static <T> List<List<T>> toLists(Flowable<Flowable<T>> observables) {
  final List<List<T>> lists = new ArrayList<List<T>>();
  Flowable.concat(observables.map(new Function<Flowable<T>, Flowable<List<T>>>() {
    @Override
    public Flowable<List<T>> apply(Flowable<T> xs) {
      return xs.toList().toFlowable();
    }
  }))
      .blockingForEach(new Consumer<List<T>>() {
        @Override
        public void accept(List<T> xs) {
          lists.add(xs);
        }
      });
  return lists;
}

Code example source: ReactiveX/RxJava

/**
 * This won't compile if super/extends isn't done correctly on generics.
 */
@Test
public void testCovarianceOfCombineLatest() {
  Flowable<HorrorMovie> horrors = Flowable.just(new HorrorMovie());
  Flowable<CoolRating> ratings = Flowable.just(new CoolRating());
  Flowable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Flowable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Flowable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(extendedAction);
  Flowable.<Media, Rating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Flowable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Flowable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine);
}

Code example source: ReactiveX/RxJava

/**
 * This won't compile if super/extends isn't done correctly on generics.
 */
@Test
public void testCovarianceOfZip() {
  Flowable<HorrorMovie> horrors = Flowable.just(new HorrorMovie());
  Flowable<CoolRating> ratings = Flowable.just(new CoolRating());
  Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
  Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
  Flowable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
  Flowable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
  Flowable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
  Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);
}

Code example source: ReactiveX/RxJava

@Override
  public Integer apply(Integer v) throws Exception {
    Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingForEach(Functions.emptyConsumer());
    return v;
  }
})

Code example source: ReactiveX/RxJava

@Test(timeout = 5000, expected = TestException.class)
public void toFlowableError() {
  error.completable.toFlowable().blockingForEach(Functions.emptyConsumer());
}

Code example source: ReactiveX/RxJava

@Test(timeout = 5000)
public void toFlowableNormal() {
  normal.completable.toFlowable().blockingForEach(Functions.emptyConsumer());
}
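The two tests above depend on fixtures (normal and error) that are not shown in the excerpt. A self-contained sketch of the same idea, assuming RxJava 2.x on the classpath and using the hypothetical names CompletableBlockingForEach, consumerCallsOnComplete, and rethrown, might look like this. A Completable carries no items, so the consumer is never invoked; only completion or the error reaches the caller.

```java
import io.reactivex.Completable;

import java.util.concurrent.atomic.AtomicInteger;

public class CompletableBlockingForEach {

    /** Counts consumer invocations for a Completable that simply completes. */
    static int consumerCallsOnComplete() {
        AtomicInteger calls = new AtomicInteger();
        Completable.complete()
                .<Object>toFlowable()
                .blockingForEach(item -> calls.incrementAndGet());
        return calls.get();
    }

    /** Returns the exception rethrown by blockingForEach for a failing Completable. */
    static RuntimeException rethrown(RuntimeException failure) {
        try {
            Completable.error(failure)
                    .<Object>toFlowable()
                    .blockingForEach(item -> { });
            return null; // not reached: the Completable always fails
        } catch (RuntimeException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(consumerCallsOnComplete()); // prints 0
        System.out.println(rethrown(new IllegalArgumentException("bad")));
    }
}
```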

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribeScan() {
  FlowableEventStream.getEventStream("HTTP-ClusterB", 20)
  .scan(new HashMap<String, String>(), new BiFunction<HashMap<String, String>, Event, HashMap<String, String>>() {
    @Override
    public HashMap<String, String> apply(HashMap<String, String> accum, Event perInstanceEvent) {
      accum.put("instance", perInstanceEvent.instanceId);
      return accum;
    }
  })
  .take(10)
  .blockingForEach(new Consumer<HashMap<String, String>>() {
    @Override
    public void accept(HashMap<String, String> v) {
      System.out.println(v);
    }
  });
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void testMultiTake() {
  final AtomicInteger count = new AtomicInteger();
  Flowable.unsafeCreate(new Publisher<Integer>() {
    @Override
    public void subscribe(Subscriber<? super Integer> s) {
      BooleanSubscription bs = new BooleanSubscription();
      s.onSubscribe(bs);
      for (int i = 0; !bs.isCancelled(); i++) {
        System.out.println("Emit: " + i);
        count.incrementAndGet();
        s.onNext(i);
      }
    }
  }).take(100).take(1).blockingForEach(new Consumer<Integer>() {
    @Override
    public void accept(Integer t1) {
      System.out.println("Receive: " + t1);
    }
  });
  assertEquals(1, count.get());
}

Code example source: ReactiveX/RxJava

/**
 * Confirm that running on a NewThreadScheduler uses the same thread for the entire stream.
 */
@Test
public void testObserveOnWithNewThreadScheduler() {
  final AtomicInteger count = new AtomicInteger();
  final int _multiple = 99;
  Flowable.range(1, 100000).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer t1) {
      return t1 * _multiple;
    }
  }).observeOn(Schedulers.newThread())
  .blockingForEach(new Consumer<Integer>() {
    @Override
    public void accept(Integer t1) {
      assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
      // FIXME toBlocking methods run on the current thread
      String name = Thread.currentThread().getName();
      assertFalse("Wrong thread name: " + name, name.startsWith("Rx"));
    }
  });
}

Code example source: ReactiveX/RxJava

/**
 * Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered.
 */
@Test
public void testObserveOnWithThreadPoolScheduler() {
  final AtomicInteger count = new AtomicInteger();
  final int _multiple = 99;
  Flowable.range(1, 100000).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer t1) {
      return t1 * _multiple;
    }
  }).observeOn(Schedulers.computation())
  .blockingForEach(new Consumer<Integer>() {
    @Override
    public void accept(Integer t1) {
      assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
      // FIXME toBlocking methods run on the caller's thread
      String name = Thread.currentThread().getName();
      assertFalse("Wrong thread name: " + name, name.startsWith("Rx"));
    }
  });
}

Code example source: ReactiveX/RxJava

@Test
public void testWindow() {
  final ArrayList<List<Integer>> lists = new ArrayList<List<Integer>>();

  // Split the six items into windows of three and turn each window into a List.
  Flowable.concat(
    Flowable.just(1, 2, 3, 4, 5, 6)
    .window(3)
    .map(new Function<Flowable<Integer>, Flowable<List<Integer>>>() {
      @Override
      public Flowable<List<Integer>> apply(Flowable<Integer> xs) {
        return xs.toList().toFlowable();
      }
    })
  )
  .blockingForEach(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> xs) {
      lists.add(xs);
    }
  });

  // JUnit convention: expected value first, actual value second.
  assertArrayEquals(new Integer[] { 1, 2, 3 }, lists.get(0).toArray(new Integer[3]));
  assertArrayEquals(new Integer[] { 4, 5, 6 }, lists.get(1).toArray(new Integer[3]));
  assertEquals(2, lists.size());
}
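With Java 8 lambdas the same windowing pattern becomes considerably shorter. The following is a sketch under the assumption that RxJava 2.x is on the classpath; WindowToLists and windowsOf are hypothetical names, and concatMap is swapped in for the map plus Flowable.concat combination used in the test.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.List;

public class WindowToLists {

    /** Splits 1..6 into consecutive windows of the given size, one List per window. */
    static List<List<Integer>> windowsOf(int size) {
        List<List<Integer>> lists = new ArrayList<>();
        Flowable.just(1, 2, 3, 4, 5, 6)
                .window(size)
                // Turn each inner Flowable into a single List, preserving window order.
                .concatMap(w -> w.toList().toFlowable())
                .blockingForEach(lists::add);
        return lists;
    }

    public static void main(String[] args) {
        System.out.println(windowsOf(3)); // prints [[1, 2, 3], [4, 5, 6]]
    }
}
```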

Code example source: ReactiveX/RxJava

@Test
public final void testMergeWithCurrentThreadScheduler1() {
  final String currentThreadName = Thread.currentThread().getName();
  Flowable<Integer> f1 = Flowable.<Integer> just(1, 2, 3, 4, 5);
  Flowable<Integer> f2 = Flowable.<Integer> just(6, 7, 8, 9, 10);
  Flowable<String> f = Flowable.<Integer> merge(f1, f2).subscribeOn(Schedulers.trampoline()).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer t) {
      assertTrue(Thread.currentThread().getName().equals(currentThreadName));
      return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
    }
  });
  f.blockingForEach(new Consumer<String>() {
    @Override
    public void accept(String t) {
      System.out.println("t: " + t);
    }
  });
}

Code example source: ReactiveX/RxJava

@Test
public final void testMergeWithExecutorScheduler() {
  final String currentThreadName = Thread.currentThread().getName();
  Flowable<Integer> f1 = Flowable.<Integer> just(1, 2, 3, 4, 5);
  Flowable<Integer> f2 = Flowable.<Integer> just(6, 7, 8, 9, 10);
  Flowable<String> f = Flowable.<Integer> merge(f1, f2).subscribeOn(Schedulers.computation()).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer t) {
      assertFalse(Thread.currentThread().getName().equals(currentThreadName));
      assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
      return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
    }
  });
  f.blockingForEach(new Consumer<String>() {
    @Override
    public void accept(String t) {
      System.out.println("t: " + t);
    }
  });
}

Code example source: ReactiveX/RxJava

/**
 * IO scheduler defaults to using CachedThreadScheduler.
 */
@Test
public final void testIOScheduler() {
  Flowable<Integer> f1 = Flowable.just(1, 2, 3, 4, 5);
  Flowable<Integer> f2 = Flowable.just(6, 7, 8, 9, 10);
  Flowable<String> f = Flowable.merge(f1, f2).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer t) {
      assertTrue(Thread.currentThread().getName().startsWith("RxCachedThreadScheduler"));
      return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
    }
  });
  f.subscribeOn(Schedulers.io()).blockingForEach(new Consumer<String>() {
    @Override
    public void accept(String t) {
      System.out.println("t: " + t);
    }
  });
}
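The scheduler tests assert thread names inside map but only print inside the consumer. To see both sides at once, here is a small sketch assuming RxJava 2.x on the classpath; BlockingForEachThreads, mapThreads, consumerThreads, and run are hypothetical names. It records the thread names observed upstream (on the io() scheduler) and inside the blockingForEach consumer (on the calling thread).

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class BlockingForEachThreads {

    /** Thread names seen inside map(), which runs on the io() scheduler. */
    static final Set<String> mapThreads = ConcurrentHashMap.newKeySet();

    /** Thread names seen inside the blockingForEach consumer. */
    static final Set<String> consumerThreads = ConcurrentHashMap.newKeySet();

    static void run() {
        Flowable.just(1, 2, 3)
                .map(i -> {
                    mapThreads.add(Thread.currentThread().getName());
                    return i;
                })
                .subscribeOn(Schedulers.io())
                .blockingForEach(i ->
                        consumerThreads.add(Thread.currentThread().getName()));
    }

    public static void main(String[] args) {
        run();
        System.out.println("map ran on:      " + mapThreads);
        System.out.println("consumer ran on: " + consumerThreads);
    }
}
```

In RxJava 2.x the io() worker threads are named with the RxCachedThreadScheduler prefix, which is what the test above asserts; the consumer set should contain only the name of the thread that called run().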

Code example source: ReactiveX/RxJava

.blockingForEach(new Consumer<HashMap<String, String>>() {
  @Override
  public void accept(HashMap<String, String> v) {

Code example source: ReactiveX/RxJava

@Test
public void testTakeUnsubscribesOnGroupBy() {
  Flowable.merge(
    FlowableEventStream.getEventStream("HTTP-ClusterA", 50),
    FlowableEventStream.getEventStream("HTTP-ClusterB", 20)
  )
  // group by type (2 clusters)
  .groupBy(new Function<Event, Object>() {
    @Override
    public Object apply(Event event) {
      return event.type;
    }
  })
  .take(1)
  .blockingForEach(new Consumer<GroupedFlowable<Object, Event>>() {
    @Override
    public void accept(GroupedFlowable<Object, Event> v) {
      System.out.println(v);
      v.take(1).subscribe();  // FIXME groups need consumption to a certain degree to cancel upstream
    }
  });
  System.out.println("**** finished");
}

Code example source: ReactiveX/RxJava

@Test
public final void testComputationThreadPool1() {
  Flowable<Integer> f1 = Flowable.<Integer> just(1, 2, 3, 4, 5);
  Flowable<Integer> f2 = Flowable.<Integer> just(6, 7, 8, 9, 10);
  Flowable<String> f = Flowable.<Integer> merge(f1, f2).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer t) {
      assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
      return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
    }
  });
  f.subscribeOn(Schedulers.computation()).blockingForEach(new Consumer<String>() {
    @Override
    public void accept(String t) {
      System.out.println("t: " + t);
    }
  });
}

Code example source: ReactiveX/RxJava

.blockingForEach(new Consumer<Object>() {
  @Override
  public void accept(Object v) {
