本文整理了Java中io.reactivex.Flowable.blockingFirst()
方法的一些代码示例,展示了Flowable.blockingFirst()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.blockingFirst()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:blockingFirst
[英]Returns the first item emitted by this Flowable, or throws NoSuchElementException if it emits no items. Backpressure: The operator consumes the source Flowable in an unbounded manner (i.e., no backpressure applied to it). Scheduler: blockingFirst does not operate by default on a particular Scheduler. Error handling: If the source signals an error, the operator wraps a checked Exceptioninto RuntimeException and throws that. Otherwise, RuntimeExceptions and Errors are rethrown as they are.
[中]返回此Flowable发出的第一个项,如果它不发出任何项,则抛出NoTouchElementException。背压:操作员以无限制的方式消耗可流动源(即,不施加背压)。调度程序:默认情况下,blockingFirst不会在特定调度程序上运行。错误处理:如果源发出错误信号,操作员将选中的异常包装到RuntimeException中并抛出该异常。否则,运行时异常和错误将按原样重试。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Integer apply(Integer v) throws Exception {
Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingFirst();
return v;
}
})
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = TestException.class)
public void firstOnError() {
Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
s.onSubscribe(new BooleanSubscription());
s.onError(new TestException());
}
});
source.blockingFirst();
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NoSuchElementException.class)
public void blockingFirstEmpty() {
Flowable.empty().blockingFirst();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void firstFgnoredCancelAndOnNext() {
Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
s.onSubscribe(new BooleanSubscription());
s.onNext(1);
s.onNext(2);
}
});
assertEquals(1, source.blockingFirst().intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void blockingFirstNormal() {
assertEquals(1, Flowable.just(1, 2).blockingFirst(3).intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testListWithBlockingFirstFlowable() {
Flowable<String> f = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
List<String> actual = f.toList().toFlowable().blockingFirst();
Assert.assertEquals(Arrays.asList("one", "two", "three"), actual);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void timerDelayZero() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
for (int i = 0; i < 1000; i++) {
Flowable.timer(0, TimeUnit.MILLISECONDS).blockingFirst();
}
assertTrue(errors.toString(), errors.isEmpty());
} finally {
RxJavaPlugins.reset();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressureWithNoInitialValueObservable() throws InterruptedException {
Flowable<Integer> source = Flowable.just(1, 2, 3, 4, 5, 6);
Flowable<Integer> reduced = source.reduce(sum).toFlowable();
Integer r = reduced.blockingFirst();
assertEquals(21, r.intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void blockingFirstDefault() {
assertEquals(1, Flowable.<Integer>empty()
.subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void blockingFirst() {
assertEquals(1, Flowable.range(1, 10)
.subscribeOn(Schedulers.computation()).blockingFirst().intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressureWithInitialValueFlowable() throws InterruptedException {
Flowable<Integer> source = Flowable.just(1, 2, 3, 4, 5, 6);
Flowable<Integer> reduced = source.reduce(0, sum).toFlowable();
Integer r = reduced.blockingFirst();
assertEquals(21, r.intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testWithFollowingFirstFlowable() {
Flowable<Integer> f = Flowable.just(1, 3, 2, 5, 4);
assertEquals(Arrays.asList(1, 2, 3, 4, 5), f.toSortedList().toFlowable().blockingFirst());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testElementAtWithIndexOutOfBoundsFlowable() {
assertEquals(-100, Flowable.fromArray(1, 2).elementAt(2).toFlowable().blockingFirst(-100).intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 30000)
public void testIssue1527Flowable() throws InterruptedException {
//https://github.com/ReactiveX/RxJava/pull/1527
Flowable<Integer> source = Flowable.just(1, 2, 3, 4, 5, 6);
Flowable<Integer> reduced = source.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer i1, Integer i2) {
return i1 + i2;
}
}).toFlowable();
Integer r = reduced.blockingFirst();
assertEquals(21, r.intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstreamFlowable() {
Flowable<Integer> source = Flowable.just(1).isEmpty()
.flatMapPublisher(new Function<Boolean, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Boolean t1) {
return Flowable.just(2).delay(500, TimeUnit.MILLISECONDS);
}
});
assertEquals((Object)2, source.blockingFirst());
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstream() {
Flowable<Integer> source = Flowable.just(1).isEmpty()
.flatMapPublisher(new Function<Boolean, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Boolean t1) {
return Flowable.just(2).delay(500, TimeUnit.MILLISECONDS);
}
});
assertEquals((Object)2, source.blockingFirst());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testWithFollowingFirstFlowable() {
Flowable<Integer> f = Flowable.fromArray(1, 3, 5, 6);
Flowable<Boolean> anyEven = f.any(new Predicate<Integer>() {
@Override
public boolean test(Integer i) {
return i % 2 == 0;
}
}).toFlowable();
assertTrue(anyEven.blockingFirst());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testFollowingFirstFlowable() {
Flowable<Integer> f = Flowable.fromArray(1, 3, 5, 6);
Flowable<Boolean> allOdd = f.all(new Predicate<Integer>() {
@Override
public boolean test(Integer i) {
return i % 2 == 1;
}
})
.toFlowable()
;
assertFalse(allOdd.blockingFirst());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testErrorThrownIssue1685() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
FlowableProcessor<Object> processor = ReplayProcessor.create();
Flowable.error(new RuntimeException("oops"))
.materialize()
.delay(1, TimeUnit.SECONDS)
.dematerialize(Functions.<Notification<Object>>identity())
.subscribe(processor);
processor.subscribe();
processor.materialize().blockingFirst();
System.out.println("Done");
TestHelper.assertError(errors, 0, OnErrorNotImplementedException.class);
} finally {
RxJavaPlugins.reset();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstream() {
Flowable<Integer> source = Flowable.just(1)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer t1) {
return false;
}
})
.flatMapPublisher(new Function<Boolean, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Boolean t1) {
return Flowable.just(2).delay(500, TimeUnit.MILLISECONDS);
}
});
assertEquals((Object)2, source.blockingFirst());
}
内容来源于网络,如有侵权,请联系作者删除!