Usage and code examples for the io.reactivex.Observable.blockingIterable() method

x33g5p2x, reposted on 2022-01-25 under Other

This article collects code examples for the Java method io.reactivex.Observable.blockingIterable() and shows how Observable.blockingIterable() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, extracted from selected projects, and are meant as a practical reference. Details of the method:
Package: io.reactivex
Class: Observable
Method: blockingIterable

About Observable.blockingIterable

Converts this Observable into an Iterable. Iterating the result blocks the calling thread until the next item arrives or the source terminates.

Scheduler: blockingIterable does not operate by default on a particular Scheduler.
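As a minimal sketch of the description above, the following drains a finite Observable with a plain for-each loop. It assumes RxJava 2.x (the io.reactivex package) is on the classpath; the class name BlockingIterableBasics and the helper collect are hypothetical names chosen for illustration.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

public class BlockingIterableBasics {

    // Hypothetical helper: pulls every item of a finite Observable into a List.
    // Each step of the loop blocks the calling thread until an item is available.
    static List<String> collect(Observable<String> source) {
        List<String> out = new ArrayList<>();
        for (String s : source.blockingIterable()) {
            out.add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [one, two, three]
        System.out.println(collect(Observable.just("one", "two", "three")));
    }
}
```

The loop only ends when the source completes, so this pattern suits finite sources; an Observable that never terminates would keep the calling thread blocked.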

Code examples

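Code example source: ReactiveX/RxJava

    // Excerpt: body of a Function passed to an operator; the enclosing call is not shown in the source.
    @Override
    public Integer apply(Integer v) throws Exception {
        // Blocks the calling thread until the delayed item arrives, about 10 seconds later.
        Observable.just(1).delay(10, TimeUnit.SECONDS).blockingIterable().iterator().next();
        return v;
    }
    })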

Code example source: ReactiveX/RxJava

    // Excerpt: the snippet is truncated in the source.
    Iterator<T> it = blockingIterable().iterator();
    while (it.hasNext()) {
        try {

Code example source: ReactiveX/RxJava

    /**
     * Converts this {@code Observable} into an {@link Iterable}.
     * <p>
     * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/blockingIterable.o.png" alt="">
     * <dl>
     * <dt><b>Scheduler:</b></dt>
     * <dd>{@code blockingIterable} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @return an {@link Iterable} version of this {@code Observable}
     * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Iterable<T> blockingIterable() {
        return blockingIterable(bufferSize());
    }
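The source above delegates to an overload that takes a buffer size. The sketch below calls that overload directly, assuming RxJava 2.x on the classpath; the class name BlockingIterableBufferHint, the helper sum, and the value 16 are hypothetical choices for illustration. The argument must be positive.

```java
import io.reactivex.Observable;

public class BlockingIterableBufferHint {

    // Hypothetical helper: sums 1..count while passing an explicit buffer size
    // to blockingIterable(int) instead of relying on the default bufferSize().
    static int sum(int count, int bufferSize) {
        int total = 0;
        for (int v : Observable.range(1, count).blockingIterable(bufferSize)) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // Prints 5050
        System.out.println(sum(100, 16));
    }
}
```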

Code example source: ReactiveX/RxJava

    @Test
    public void testWhenMaxConcurrentIsOne() {
        for (int i = 0; i < 100; i++) {
            List<Observable<String>> os = new ArrayList<Observable<String>>();
            os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
            os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
            os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
            List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
            // With maxConcurrency = 1 the sources are subscribed one at a time, so the order is deterministic.
            Iterator<String> iter = Observable.merge(os, 1).blockingIterable().iterator();
            List<String> actual = new ArrayList<String>();
            while (iter.hasNext()) {
                actual.add(iter.next());
            }
            assertEquals(expected, actual);
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testRangeWithOverflow5() {
        // A zero-length range is empty, so the iterator has nothing to return.
        assertFalse(Observable.rangeLong(Long.MIN_VALUE, 0).blockingIterable().iterator().hasNext());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testRangeWithOverflow5() {
        assertFalse(Observable.range(Integer.MIN_VALUE, 0).blockingIterable().iterator().hasNext());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testToIterator() {
        Observable<String> obs = Observable.just("one", "two", "three");
        Iterator<String> it = obs.blockingIterable().iterator();
        assertEquals(true, it.hasNext());
        assertEquals("one", it.next());
        assertEquals(true, it.hasNext());
        assertEquals("two", it.next());
        assertEquals(true, it.hasNext());
        assertEquals("three", it.next());
        assertEquals(false, it.hasNext());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testMergeALotOfSourcesOneByOneSynchronously() {
        int n = 10000;
        List<Observable<Integer>> sourceList = new ArrayList<Observable<Integer>>(n);
        for (int i = 0; i < n; i++) {
            sourceList.add(Observable.just(i));
        }
        Iterator<Integer> it = Observable.merge(Observable.fromIterable(sourceList), 1).blockingIterable().iterator();
        int j = 0;
        while (it.hasNext()) {
            assertEquals((Integer)j, it.next());
            j++;
        }
        assertEquals(j, n);
    }

Code example source: ReactiveX/RxJava

    @Ignore("subscribe() should not throw")
    @Test(expected = TestException.class)
    public void testExceptionThrownFromOnSubscribe() {
        Iterable<String> strings = Observable.unsafeCreate(new ObservableSource<String>() {
            @Override
            public void subscribe(Observer<? super String> observer) {
                throw new TestException("intentional");
            }
        }).blockingIterable();
        for (String string : strings) {
            // never reaches here
            System.out.println(string);
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testMergeALotOfSourcesOneByOneSynchronouslyTakeHalf() {
        int n = 10000;
        List<Observable<Integer>> sourceList = new ArrayList<Observable<Integer>>(n);
        for (int i = 0; i < n; i++) {
            sourceList.add(Observable.just(i));
        }
        Iterator<Integer> it = Observable.merge(Observable.fromIterable(sourceList), 1).take(n / 2).blockingIterable().iterator();
        int j = 0;
        while (it.hasNext()) {
            assertEquals((Integer)j, it.next());
            j++;
        }
        assertEquals(j, n / 2);
    }

Code example source: ReactiveX/RxJava

    @Test(expected = TestException.class)
    public void testToIteratorWithException() {
        Observable<String> obs = Observable.unsafeCreate(new ObservableSource<String>() {
            @Override
            public void subscribe(Observer<? super String> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onNext("one");
                observer.onError(new TestException());
            }
        });
        // The source's TestException is rethrown to the caller while iterating.
        Iterator<String> it = obs.blockingIterable().iterator();
        assertEquals(true, it.hasNext());
        assertEquals("one", it.next());
        assertEquals(true, it.hasNext());
        it.next();
    }
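The test above relies on a source error surfacing to the code that iterates. A minimal sketch of handling that at the call site follows, assuming RxJava 2.x on the classpath; the class name BlockingIterableErrors and the helper drain are hypothetical. It uses a source that only signals an error, so it makes no claim about items emitted before the failure.

```java
import io.reactivex.Observable;

public class BlockingIterableErrors {

    // Hypothetical helper: iterates the source to the end and reports the outcome.
    // An error signalled by the source is thrown from the iterator as an unchecked exception.
    static String drain(Observable<String> source) {
        try {
            for (String s : source.blockingIterable()) {
                // Items would be processed here.
            }
            return "completed";
        } catch (RuntimeException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Prints "completed"
        System.out.println(drain(Observable.just("a", "b")));
        // Prints "failed: boom"
        System.out.println(drain(Observable.<String>error(new IllegalStateException("boom"))));
    }
}
```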

Code example source: ReactiveX/RxJava

    @Test
    public void testMaxConcurrent() {
        for (int times = 0; times < 100; times++) {
            int observableCount = 100;
            // Test maxConcurrent from 2 to 11
            int maxConcurrent = 2 + (times % 10);
            AtomicInteger subscriptionCount = new AtomicInteger(0);
            List<Observable<String>> os = new ArrayList<Observable<String>>();
            List<SubscriptionCheckObservable> scos = new ArrayList<SubscriptionCheckObservable>();
            for (int i = 0; i < observableCount; i++) {
                SubscriptionCheckObservable sco = new SubscriptionCheckObservable(subscriptionCount, maxConcurrent);
                scos.add(sco);
                os.add(Observable.unsafeCreate(sco));
            }
            Iterator<String> iter = Observable.merge(os, maxConcurrent).blockingIterable().iterator();
            List<String> actual = new ArrayList<String>();
            while (iter.hasNext()) {
                actual.add(iter.next());
            }
            // System.out.println("actual: " + actual);
            assertEquals(5 * observableCount, actual.size());
            for (SubscriptionCheckObservable sco : scos) {
                assertFalse(sco.failed);
            }
        }
    }

Code example source: akarnokd/RxJava2Jdk8Interop

    /**
     * Returns a blocking Stream of the elements of the Observable.
     * <p>
     * Closing the Stream will cancel the flow.
     * @param <T> the value type
     * @return the Function to be used with {@code Observable.to}.
     */
    public static <T> Function<Observable<T>, Stream<T>> toStream() {
        return f -> ZeroOneIterator.toStream(f.blockingIterable().iterator());
    }

Code example source: PacktPublishing/Learning-RxJava

    @Test
    public void testIterable() {
        Observable<String> source =
            Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");
        // Keep only the five-letter strings, then iterate them in a blocking fashion.
        Iterable<String> allWithLengthFive = source.filter(s -> s.length() == 5)
            .blockingIterable();
        for (String s : allWithLengthFive) {
            assertTrue(s.length() == 5);
        }
    }

Code example source: xjdr/xio

    @Test
    public void testHttp1toHttp1ServerPostMany() throws Exception {
        setupClient(NUM_REQUESTS, true);
        setupFrontBack(false, false);
        verify(multipleAsyncRequests(true).blockingIterable());
        assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
        assertProxiedRequests(NUM_REQUESTS * 2);
    }

Code example source: xjdr/xio

    @Test
    public void testHttp2toHttp1ServerGetMany() throws Exception {
        setupClient(NUM_REQUESTS, true);
        setupFrontBack(true, false);
        verify(multipleAsyncRequests(false).blockingIterable());
        assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
        assertProxiedRequests(NUM_REQUESTS * 2);
    }

Code example source: xjdr/xio

    @Test
    public void testHttp1toHttp2ServerPostMany() throws Exception {
        setupClient(NUM_REQUESTS, false);
        setupFrontBack(false, true);
        verify(multipleAsyncRequests(true).blockingIterable());
        assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
        assertProxiedRequests(NUM_REQUESTS * 2);
    }

Code example source: xjdr/xio

    @Test
    public void testHttp1toHttp1ServerGetMany() throws Exception {
        setupClient(NUM_REQUESTS, true);
        setupFrontBack(false, false);
        verify(multipleAsyncRequests(false).blockingIterable());
        assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
        assertProxiedRequests(NUM_REQUESTS * 2);
    }
