本文整理了Java中io.reactivex.Flowable.concatArrayEager()
方法的一些代码示例,展示了Flowable.concatArrayEager()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.concatArrayEager()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:concatArrayEager
[英]Concatenates a sequence of Publishers eagerly into a single stream of values.
Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the source Publishers. The operator buffers the values emitted by these Publishers and then drains them in order, each one after the previous one completes. Backpressure: The operator honors backpressure from downstream. The Publishersources are expected to honor backpressure as well. If any of the source Publishers violate this, the operator will signal a MissingBackpressureException. Scheduler: This method does not operate by default on a particular Scheduler.
[中]将一系列发布者急切地连接成一个单一的值流。
即时连接意味着一旦订阅者订阅,该操作员就订阅所有源发布者。运算符缓冲这些发布服务器发出的值,然后依次将其排出,在前一个发布服务器完成后再排出。背压:操作员接受来自下游的背压。预计出版商资源也将承受背压。如果任何源发布者违反此规定,操作员将发出MissingBackpressureException信号。调度程序:默认情况下,此方法不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
/**
* Concatenates an array of Publishers eagerly into a single stream of values.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatArrayEager.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source Publishers. The operator buffers the values emitted by these Publishers and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The {@code Publisher}
* sources are expected to honor backpressure as well.
* If any of the source {@code Publisher}s violate this, the operator will signal a
* {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources an array of Publishers that need to be eagerly concatenated
* @return the new Publisher instance with the specified concatenation behavior
* @since 2.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatArrayEager(Publisher<? extends T>... sources) {
return concatArrayEager(bufferSize(), bufferSize(), sources);
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Override
public Publisher<Long> createPublisher(long elements) {
return
Flowable.concatArrayEager(
Flowable.fromIterable(iterate(elements / 2)),
Flowable.fromIterable(iterate(elements - elements / 2))
)
;
}
}
代码示例来源:origin: redisson/redisson
/**
* Concatenates a sequence of Publishers eagerly into a single stream of values.
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source Publishers. The operator buffers the values emitted by these Publishers and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The {@code Publisher}
* sources are expected to honor backpressure as well.
* If any of the source {@code Publisher}s violate this, the operator will signal a
* {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of Publishers that need to be eagerly concatenated
* @return the new Publisher instance with the specified concatenation behavior
* @since 2.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatArrayEager(Publisher<? extends T>... sources) {
return concatArrayEager(bufferSize(), bufferSize(), sources);
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testInnerEmpty() {
Flowable.concatArrayEager(Flowable.empty(), Flowable.empty()).subscribe(ts);
ts.assertNoValues();
ts.assertNoErrors();
ts.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness5() {
final AtomicInteger count = new AtomicInteger();
Flowable<Integer> source = Flowable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Flowable.concatArrayEager(source, source, source, source, source).subscribe(tsBp);
Assert.assertEquals(5, count.get());
tsBp.assertNoErrors();
tsBp.assertNotComplete();
tsBp.assertNoValues();
tsBp.request(Long.MAX_VALUE);
tsBp.assertValueCount(count.get());
tsBp.assertNoErrors();
tsBp.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness8() {
final AtomicInteger count = new AtomicInteger();
Flowable<Integer> source = Flowable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Flowable.concatArrayEager(source, source, source, source, source, source, source, source).subscribe(tsBp);
Assert.assertEquals(8, count.get());
tsBp.assertNoErrors();
tsBp.assertNotComplete();
tsBp.assertNoValues();
tsBp.request(Long.MAX_VALUE);
tsBp.assertValueCount(count.get());
tsBp.assertNoErrors();
tsBp.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness9() {
final AtomicInteger count = new AtomicInteger();
Flowable<Integer> source = Flowable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Flowable.concatArrayEager(source, source, source, source, source, source, source, source, source).subscribe(tsBp);
Assert.assertEquals(9, count.get());
tsBp.assertNoErrors();
tsBp.assertNotComplete();
tsBp.assertNoValues();
tsBp.request(Long.MAX_VALUE);
tsBp.assertValueCount(count.get());
tsBp.assertNoErrors();
tsBp.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testInnerError() {
Flowable.concatArrayEager(Flowable.just(1), Flowable.error(new TestException())).subscribe(ts);
ts.assertValue(1);
ts.assertError(TestException.class);
ts.assertNotComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness2() {
final AtomicInteger count = new AtomicInteger();
Flowable<Integer> source = Flowable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Flowable.concatArrayEager(source, source).subscribe(tsBp);
Assert.assertEquals(2, count.get());
tsBp.assertNoErrors();
tsBp.assertNotComplete();
tsBp.assertNoValues();
tsBp.request(Long.MAX_VALUE);
tsBp.assertValueCount(count.get());
tsBp.assertNoErrors();
tsBp.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness3() {
final AtomicInteger count = new AtomicInteger();
Flowable<Integer> source = Flowable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Flowable.concatArrayEager(source, source, source).subscribe(tsBp);
Assert.assertEquals(3, count.get());
tsBp.assertNoErrors();
tsBp.assertNotComplete();
tsBp.assertNoValues();
tsBp.request(Long.MAX_VALUE);
tsBp.assertValueCount(count.get());
tsBp.assertNoErrors();
tsBp.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness4() {
final AtomicInteger count = new AtomicInteger();
Flowable<Integer> source = Flowable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Flowable.concatArrayEager(source, source, source, source).subscribe(tsBp);
Assert.assertEquals(4, count.get());
tsBp.assertNoErrors();
tsBp.assertNotComplete();
tsBp.assertNoValues();
tsBp.request(Long.MAX_VALUE);
tsBp.assertValueCount(count.get());
tsBp.assertNoErrors();
tsBp.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness6() {
final AtomicInteger count = new AtomicInteger();
Flowable<Integer> source = Flowable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Flowable.concatArrayEager(source, source, source, source, source, source).subscribe(tsBp);
Assert.assertEquals(6, count.get());
tsBp.assertNoErrors();
tsBp.assertNotComplete();
tsBp.assertNoValues();
tsBp.request(Long.MAX_VALUE);
tsBp.assertValueCount(count.get());
tsBp.assertNoErrors();
tsBp.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness7() {
final AtomicInteger count = new AtomicInteger();
Flowable<Integer> source = Flowable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Flowable.concatArrayEager(source, source, source, source, source, source, source).subscribe(tsBp);
Assert.assertEquals(7, count.get());
tsBp.assertNoErrors();
tsBp.assertNotComplete();
tsBp.assertNoValues();
tsBp.request(Long.MAX_VALUE);
tsBp.assertValueCount(count.get());
tsBp.assertNoErrors();
tsBp.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@SuppressWarnings("unchecked")
public void testBackpressure() {
Flowable.concatArrayEager(Flowable.just(1), Flowable.just(1)).subscribe(tsBp);
tsBp.assertNoErrors();
tsBp.assertNoValues();
tsBp.assertNotComplete();
tsBp.request(1);
tsBp.assertValue(1);
tsBp.assertNoErrors();
tsBp.assertNotComplete();
tsBp.request(1);
tsBp.assertValues(1, 1);
tsBp.assertNoErrors();
tsBp.assertComplete();
}
内容来源于网络,如有侵权,请联系作者删除!