本文整理了Java中io.reactivex.Flowable.bufferSize()
方法的一些代码示例,展示了Flowable.bufferSize()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.bufferSize()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:bufferSize
[英]Returns the default internal buffer size used by most async operators.
The value can be overridden via system parameter rx2.buffer-sizebefore the Flowable class is loaded.
[中]返回大多数异步运算符使用的默认内部缓冲区大小。
该值可通过系统参数rx2覆盖。加载可流动类之前的缓冲区大小*。
代码示例来源:origin: ReactiveX/RxJava
SpscLinkedArrayQueue<R> getOrCreateQueue() {
for (;;) {
SpscLinkedArrayQueue<R> current = queue.get();
if (current != null) {
return current;
}
current = new SpscLinkedArrayQueue<R>(Flowable.bufferSize());
if (queue.compareAndSet(null, current)) {
return current;
}
}
}
代码示例来源:origin: ReactiveX/RxJava
SimplePlainQueue<T> getOrCreateQueue() {
SimplePlainQueue<T> q = queue;
if (q == null) {
q = new SpscArrayQueue<T>(bufferSize());
queue = q;
}
return q;
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<? extends R> apply(List<Publisher<? extends T>> list) {
return Flowable.zipIterable(list, zipper, false, Flowable.bufferSize());
}
}
代码示例来源:origin: ReactiveX/RxJava
SpscLinkedArrayQueue<R> getOrCreateQueue() {
for (;;) {
SpscLinkedArrayQueue<R> current = queue.get();
if (current != null) {
return current;
}
current = new SpscLinkedArrayQueue<R>(Flowable.bufferSize());
if (queue.compareAndSet(null, current)) {
return current;
}
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Object apply(Integer i) {
return i % (Flowable.bufferSize() + 2);
}
})
代码示例来源:origin: ReactiveX/RxJava
MergeWithObserver(Subscriber<? super T> downstream) {
this.downstream = downstream;
this.mainSubscription = new AtomicReference<Subscription>();
this.otherObserver = new OtherObserver<T>(this);
this.error = new AtomicThrowable();
this.requested = new AtomicLong();
this.prefetch = bufferSize();
this.limit = prefetch - (prefetch >> 2);
}
代码示例来源:origin: ReactiveX/RxJava
MergeWithObserver(Subscriber<? super T> downstream) {
this.downstream = downstream;
this.mainSubscription = new AtomicReference<Subscription>();
this.otherObserver = new OtherObserver<T>(this);
this.error = new AtomicThrowable();
this.requested = new AtomicLong();
this.prefetch = bufferSize();
this.limit = prefetch - (prefetch >> 2);
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs)
* in a round-robin fashion.
* @param <T> the value type
* @param source the source Publisher
* @return the ParallelFlowable instance
*/
@CheckReturnValue
public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source) {
return from(source, Runtime.getRuntime().availableProcessors(), Flowable.bufferSize());
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion.
* @param <T> the value type
* @param source the source Publisher
* @param parallelism the number of parallel rails
* @return the new ParallelFlowable instance
*/
@CheckReturnValue
public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source, int parallelism) {
return from(source, parallelism, Flowable.bufferSize());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testNoBackpressure() {
ArrayList<Long> list = new ArrayList<Long>(Flowable.bufferSize() * 2);
for (long i = 1; i <= Flowable.bufferSize() * 2 + 1; i++) {
list.add(i);
}
Observable<Long> o = Observable.rangeLong(1, list.size());
TestObserver<Long> to = new TestObserver<Long>();
o.subscribe(to);
to.assertValueSequence(list);
to.assertTerminated();
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public void onNext(Integer t) {
super.onNext(t);
if (t == 1) {
for (int i = 0; i < Flowable.bufferSize() - 1; i++) {
pp.onNext(i + 2);
}
}
}
});
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Integer> apply(Integer t) {
return Flowable.range(1, Flowable.bufferSize() * 2)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
}
}).subscribe(ts);
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testSkipLastWithBackpressure() {
Flowable<Integer> f = Flowable.range(0, Flowable.bufferSize() * 2).skipLast(Flowable.bufferSize() + 10);
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
f.observeOn(Schedulers.computation()).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals((Flowable.bufferSize()) - 10, ts.valueCount());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testObserveOn() {
int num = (int) (Flowable.bufferSize() * 2.1);
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
incrementingIntegers(c).observeOn(Schedulers.computation()).take(num).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
System.out.println("testObserveOn => Received: " + ts.valueCount() + " Emitted: " + c.get());
assertEquals(num, ts.valueCount());
assertTrue(c.get() < Flowable.bufferSize() * 4);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressure() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(0, Flowable.bufferSize() * 2)
.ambWith(Flowable.range(0, Flowable.bufferSize() * 2))
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
.delay(1, TimeUnit.MICROSECONDS) // make it a slightly slow consumer
.subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(Flowable.bufferSize() * 2, ts.values().size());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testInnerBackpressureWithAlignedBoundaries() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(0, Flowable.bufferSize() * 2)
.concatWith(Flowable.range(0, Flowable.bufferSize() * 2))
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
.subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testInnerBackpressureWithoutAlignedBoundaries() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(0, (Flowable.bufferSize() * 2) + 10)
.concatWith(Flowable.range(0, (Flowable.bufferSize() * 2) + 10))
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
.subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals((Flowable.bufferSize() * 4) + 20, ts.valueCount());
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 500)
public void testWithObserveOn() throws InterruptedException {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(0, Flowable.bufferSize() * 10).onBackpressureDrop().observeOn(Schedulers.io()).subscribe(ts);
ts.awaitTerminalEvent();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressure2() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
.observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void longSequenceEquals() {
Flowable<Integer> source = Flowable.range(1, Flowable.bufferSize() * 4).subscribeOn(Schedulers.computation());
Flowable.sequenceEqual(source, source)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(true);
}
内容来源于网络,如有侵权,请联系作者删除!