io.reactivex.Flowable.blockingFirst()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.9k)|赞(0)|评价(0)|浏览(315)

本文整理了Java中io.reactivex.Flowable.blockingFirst()方法的一些代码示例,展示了Flowable.blockingFirst()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.blockingFirst()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:blockingFirst

Flowable.blockingFirst介绍

[英]Returns the first item emitted by this Flowable, or throws NoSuchElementException if it emits no items. Backpressure: The operator consumes the source Flowable in an unbounded manner (i.e., no backpressure applied to it). Scheduler: blockingFirst does not operate by default on a particular Scheduler. Error handling: If the source signals an error, the operator wraps a checked Exceptioninto RuntimeException and throws that. Otherwise, RuntimeExceptions and Errors are rethrown as they are.
[中]返回此Flowable发出的第一个项,如果它不发出任何项,则抛出NoTouchElementException。背压:操作员以无限制的方式消耗可流动源(即,不施加背压)。调度程序:默认情况下,blockingFirst不会在特定调度程序上运行。错误处理:如果源发出错误信号,操作员将选中的异常包装到RuntimeException中并抛出该异常。否则,运行时异常和错误将按原样重试。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Integer apply(Integer v) throws Exception {
    Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingFirst();
    return v;
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = TestException.class)
public void firstOnError() {
  Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
    @Override
    public void subscribe(Subscriber<? super Integer> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onError(new TestException());
    }
  });
  source.blockingFirst();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NoSuchElementException.class)
public void blockingFirstEmpty() {
  Flowable.empty().blockingFirst();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void firstFgnoredCancelAndOnNext() {
  Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
    @Override
    public void subscribe(Subscriber<? super Integer> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onNext(1);
      s.onNext(2);
    }
  });
  assertEquals(1, source.blockingFirst().intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void blockingFirstNormal() {
  assertEquals(1, Flowable.just(1, 2).blockingFirst(3).intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testListWithBlockingFirstFlowable() {
  Flowable<String> f = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
  List<String> actual = f.toList().toFlowable().blockingFirst();
  Assert.assertEquals(Arrays.asList("one", "two", "three"), actual);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void timerDelayZero() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    for (int i = 0; i < 1000; i++) {
      Flowable.timer(0, TimeUnit.MILLISECONDS).blockingFirst();
    }
    assertTrue(errors.toString(), errors.isEmpty());
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressureWithNoInitialValueObservable() throws InterruptedException {
  Flowable<Integer> source = Flowable.just(1, 2, 3, 4, 5, 6);
  Flowable<Integer> reduced = source.reduce(sum).toFlowable();
  Integer r = reduced.blockingFirst();
  assertEquals(21, r.intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void blockingFirstDefault() {
  assertEquals(1, Flowable.<Integer>empty()
      .subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void blockingFirst() {
  assertEquals(1, Flowable.range(1, 10)
      .subscribeOn(Schedulers.computation()).blockingFirst().intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressureWithInitialValueFlowable() throws InterruptedException {
  Flowable<Integer> source = Flowable.just(1, 2, 3, 4, 5, 6);
  Flowable<Integer> reduced = source.reduce(0, sum).toFlowable();
  Integer r = reduced.blockingFirst();
  assertEquals(21, r.intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWithFollowingFirstFlowable() {
  Flowable<Integer> f = Flowable.just(1, 3, 2, 5, 4);
  assertEquals(Arrays.asList(1, 2, 3, 4, 5), f.toSortedList().toFlowable().blockingFirst());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testElementAtWithIndexOutOfBoundsFlowable() {
  assertEquals(-100, Flowable.fromArray(1, 2).elementAt(2).toFlowable().blockingFirst(-100).intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 30000)
public void testIssue1527Flowable() throws InterruptedException {
  //https://github.com/ReactiveX/RxJava/pull/1527
  Flowable<Integer> source = Flowable.just(1, 2, 3, 4, 5, 6);
  Flowable<Integer> reduced = source.reduce(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer i1, Integer i2) {
      return i1 + i2;
    }
  }).toFlowable();
  Integer r = reduced.blockingFirst();
  assertEquals(21, r.intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstreamFlowable() {
  Flowable<Integer> source = Flowable.just(1).isEmpty()
    .flatMapPublisher(new Function<Boolean, Publisher<Integer>>() {
      @Override
      public Publisher<Integer> apply(Boolean t1) {
        return Flowable.just(2).delay(500, TimeUnit.MILLISECONDS);
      }
    });
  assertEquals((Object)2, source.blockingFirst());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstream() {
  Flowable<Integer> source = Flowable.just(1).isEmpty()
    .flatMapPublisher(new Function<Boolean, Publisher<Integer>>() {
      @Override
      public Publisher<Integer> apply(Boolean t1) {
        return Flowable.just(2).delay(500, TimeUnit.MILLISECONDS);
      }
    });
  assertEquals((Object)2, source.blockingFirst());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWithFollowingFirstFlowable() {
  Flowable<Integer> f = Flowable.fromArray(1, 3, 5, 6);
  Flowable<Boolean> anyEven = f.any(new Predicate<Integer>() {
    @Override
    public boolean test(Integer i) {
      return i % 2 == 0;
    }
  }).toFlowable();
  assertTrue(anyEven.blockingFirst());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testFollowingFirstFlowable() {
  Flowable<Integer> f = Flowable.fromArray(1, 3, 5, 6);
  Flowable<Boolean> allOdd = f.all(new Predicate<Integer>() {
    @Override
    public boolean test(Integer i) {
      return i % 2 == 1;
    }
  })
  .toFlowable()
  ;
  assertFalse(allOdd.blockingFirst());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testErrorThrownIssue1685() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    FlowableProcessor<Object> processor = ReplayProcessor.create();
    Flowable.error(new RuntimeException("oops"))
      .materialize()
      .delay(1, TimeUnit.SECONDS)
      .dematerialize(Functions.<Notification<Object>>identity())
      .subscribe(processor);
    processor.subscribe();
    processor.materialize().blockingFirst();
    System.out.println("Done");
    TestHelper.assertError(errors, 0, OnErrorNotImplementedException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstream() {
  Flowable<Integer> source = Flowable.just(1)
    .all(new Predicate<Integer>() {
      @Override
      public boolean test(Integer t1) {
        return false;
      }
    })
    .flatMapPublisher(new Function<Boolean, Publisher<Integer>>() {
      @Override
      public Publisher<Integer> apply(Boolean t1) {
        return Flowable.just(2).delay(500, TimeUnit.MILLISECONDS);
      }
    });
  assertEquals((Object)2, source.blockingFirst());
}

相关文章

Flowable类方法