本文整理了Java中io.reactivex.Flowable
类的一些代码示例,展示了Flowable
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable
类的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
[英]The Flowable class that implements the Reactive-Streams Pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.
Reactive-Streams operates with Publishers which Flowable extends. Many operators therefore accept general Publishers directly and allow direct interoperation with other Reactive-Streams implementations.
The Flowable hosts the default buffer size of 128 elements for operators, accessible via #bufferSize(), that can be overridden globally via the system parameter rx2.buffer-size. Most operators, however, have overloads that allow setting their internal buffer size explicitly.
The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
For more information see the ReactiveX documentation.
[中]实现反应流模式并提供工厂方法、中间运算符和使用反应数据流的能力的可流动类。
反应流与可流动扩展的发布服务器一起运行。因此,许多运营商直接接受通用发布服务器,并允许与其他反应流实现直接互操作。
Flowable为运算符承载128个元素的默认缓冲区大小,可通过#bufferSize()访问,可通过系统参数rx2全局覆盖。缓冲区大小。但是,大多数运算符都有重载,允许显式设置其内部缓冲区大小。
这个类的文档使用大理石图。以下图例说明了这些图表:
有关更多信息,请参阅ReactiveX documentation。
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testFirstOrElseWithPredicateOfSomeFlowable() {
Flowable<String> src = Flowable.just("a", "b", "c", "d", "e", "f");
src.filter(IS_D).first("default").toFlowable().subscribe(w);
verify(w, times(1)).onNext(anyString());
verify(w, times(1)).onNext("d");
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressure2() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
.observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepeatTake() {
Flowable<Integer> xs = Flowable.just(1, 2);
Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Integer> apply(Integer v) throws Exception {
return Flowable.range(1, 2).map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer w) throws Exception {
throw new TestException();
}
});
}
}, true)
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Publisher<List<Long>> createPublisher(long elements) {
return
Flowable.fromIterable(iterate(elements))
.window(Flowable.just(1).concatWith(Flowable.<Integer>never()))
.onBackpressureBuffer()
.flatMap((Function)Functions.identity())
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Long> createPublisher(long elements) {
return
Flowable.switchOnNext(Flowable.just(
Flowable.fromIterable(iterate(elements)))
)
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalError() {
Flowable.create(source, BackpressureStrategy.ERROR).subscribe(ts);
source.onNext(1);
source.onNext(2);
source.onComplete();
ts.assertNoValues();
ts.assertError(MissingBackpressureException.class);
ts.assertNotComplete();
Assert.assertEquals("create: could not emit value due to lack of requests", ts.errors().get(0).getMessage());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void nonFusedCancelAfterRequestConditional2() {
final TestSubscriber<Integer> ts = new TestSubscriber<Integer>(2L);
Flowable.range(1, 2).hide()
.observeOn(Schedulers.single())
.filter(Functions.alwaysTrue())
.subscribe(ts);
ts
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void workerNotDisposedPrematurelyNormalInAsyncOutConditional() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
TestSubscriber<Integer> ts = new TestSubscriberFusedCanceling();
Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()).subscribe(ts);
assertEquals(1, s.disposedCount.get());
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void innerEscapeCompleted() {
Flowable<Integer> source = Flowable.just(0);
Flowable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
TestSubscriber<Object> ts = new TestSubscriber<Object>();
m.subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
System.out.println(ts.values());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void workerNotDisposedPrematurelyNormalInNormalOutConditional() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
Flowable.concat(
Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()),
Flowable.just(2)
)
.test()
.assertResult(1, 2);
assertEquals(1, s.disposedCount.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testListFlowable() {
Flowable<String> w = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
Flowable<List<String>> flowable = w.toList().toFlowable();
Subscriber<List<String>> subscriber = TestHelper.mockSubscriber();
flowable.subscribe(subscriber);
verify(subscriber, times(1)).onNext(Arrays.asList("one", "two", "three"));
verify(subscriber, Mockito.never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testTake2() {
Flowable<Integer> f = Flowable.just(1, 2, 3, 4, 5);
Iterable<String> it = Arrays.asList("a", "b", "c", "d", "e");
SquareStr squareStr = new SquareStr();
f.map(squareStr).zipWith(it, concat2Strings).take(2).subscribe(printer);
assertEquals(2, squareStr.counter.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testErrorDelayedAsync() {
Flowable<Integer> source = Flowable.just(1, 2, 3)
.concatWith(Flowable.<Integer>error(new TestException()));
TestSubscriber<Integer> ts = TestSubscriber.create();
source.observeOn(Schedulers.computation(), true).subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
ts.assertValues(1, 2, 3);
ts.assertError(TestException.class);
ts.assertNotComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testError2() {
Flowable<Integer> source = Flowable.concat(Flowable.just(0),
Flowable.<Integer> error(new TestException("Forced failure")));
Flowable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
TestSubscriber<Object> ts = new TestSubscriber<Object>();
m.subscribe(ts);
ts.awaitTerminalEvent();
assertEquals(1, ts.errorCount());
ts.assertValueCount(1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testFirstOrElseWithPredicateOfNoneMatchingThePredicate() {
Flowable<String> src = Flowable.just("a", "b", "c");
src.filter(IS_D).first("default").subscribe(wo);
verify(wo, times(1)).onSuccess(anyString());
verify(wo, times(1)).onSuccess("default");
verify(wo, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testFlatMapTransformsMergeException() {
Flowable<Integer> onNext = Flowable.error(new TestException());
Flowable<Integer> onComplete = Flowable.fromIterable(Arrays.asList(4));
Flowable<Integer> onError = Flowable.fromIterable(Arrays.asList(5));
Flowable<Integer> source = Flowable.fromIterable(Arrays.asList(10, 20, 30));
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
source.flatMap(just(onNext), just(onError), funcThrow0(onComplete)).subscribe(subscriber);
verify(subscriber).onError(any(TestException.class));
verify(subscriber, never()).onNext(any());
verify(subscriber, never()).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void flatMapMaxConcurrentJustJust() {
TestSubscriber<Integer> ts = TestSubscriber.create();
Flowable.just(Flowable.just(1)).flatMap((Function)Functions.identity(), 5).subscribe(ts);
ts.assertValue(1);
ts.assertNoErrors();
ts.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void backFusedErrorConditional() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
Flowable.<Integer>error(new TestException())
.observeOn(ImmediateThinScheduler.INSTANCE)
.filter(Functions.alwaysTrue())
.subscribe(ts);
SubscriberFusion.assertFusion(ts, QueueFuseable.ASYNC)
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void repeatScheduled() {
TestSubscriber<Integer> ts = TestSubscriber.create();
Flowable.just(1).subscribeOn(Schedulers.computation()).repeat(5).subscribe(ts);
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
ts.assertValues(1, 1, 1, 1, 1);
ts.assertNoErrors();
ts.assertComplete();
}
内容来源于网络,如有侵权,请联系作者删除!