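This article collects code examples for the Java method io.reactivex.Observable.bufferSize() and shows how Observable.bufferSize() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. The details of Observable.bufferSize() are as follows: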
Package: io.reactivex
Class name: Observable
Method name: bufferSize
Returns the default 'island' size or capacity-increment hint for unbounded buffers.
Delegates to Flowable#bufferSize but is public for convenience.
The value can be overridden via the system parameter rx2.buffer-size before the Flowable class is loaded.
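The override described above can be illustrated without RxJava on the classpath. The following is a minimal sketch, assuming the value is resolved from the rx2.buffer-size system property with a fallback of 128 and a lower bound of 1; the class BufferSizeSketch and the method resolveBufferSize are hypothetical names used only for illustration. Unlike this sketch, which re-reads the property on every call, the library reads it once when the Flowable class is loaded, which is why the property has to be set beforehand.

```java
class BufferSizeSketch {
    // Hypothetical helper, not part of RxJava: resolves the buffer size
    // from the system property, assuming a default of 128 and a minimum of 1.
    static int resolveBufferSize() {
        return Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }

    public static void main(String[] args) {
        System.clearProperty("rx2.buffer-size");
        System.out.println(resolveBufferSize()); // property unset: fallback value

        System.setProperty("rx2.buffer-size", "256");
        System.out.println(resolveBufferSize()); // property set: overridden value

        System.setProperty("rx2.buffer-size", "0");
        System.out.println(resolveBufferSize()); // non-positive values are clamped to 1
    }
}
```

In a real application the property would typically be passed on the command line, for example with -Drx2.buffer-size=256, so that it is in place before any RxJava class is touched.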
Code example source: ReactiveX/RxJava
SimplePlainQueue<T> getOrCreateQueue() {
    SimplePlainQueue<T> q = queue;
    if (q == null) {
        // Allocate the queue lazily, using the default buffer size as the capacity hint
        q = new SpscLinkedArrayQueue<T>(bufferSize());
        queue = q;
    }
    return q;
}
Code example source: ReactiveX/RxJava
SpscLinkedArrayQueue<R> getOrCreateQueue() {
    for (;;) {
        SpscLinkedArrayQueue<R> current = queue.get();
        if (current != null) {
            return current;
        }
        current = new SpscLinkedArrayQueue<R>(Observable.bufferSize());
        // Publish the new queue only if no other thread has installed one in the meantime
        if (queue.compareAndSet(null, current)) {
            return current;
        }
    }
}
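The compare-and-set loop in the snippet above is a general pattern for lock-free lazy initialization. The following is a minimal, self-contained sketch of the same idea using only the JDK; the class LazyQueueHolder is a hypothetical name, and ConcurrentLinkedQueue stands in for RxJava's internal SpscLinkedArrayQueue, which is an assumption made purely so the example runs without the library.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

class LazyQueueHolder<R> {
    // Starts out empty; the queue is created on first use.
    private final AtomicReference<Queue<R>> queue = new AtomicReference<>();

    Queue<R> getOrCreateQueue() {
        for (;;) {
            Queue<R> current = queue.get();
            if (current != null) {
                return current; // already created, possibly by another thread
            }
            // Stand-in for new SpscLinkedArrayQueue<R>(Observable.bufferSize())
            current = new ConcurrentLinkedQueue<>();
            if (queue.compareAndSet(null, current)) {
                return current; // this thread won the race and published its queue
            }
            // Another thread won the race: loop again and return its queue
        }
    }

    public static void main(String[] args) {
        LazyQueueHolder<String> holder = new LazyQueueHolder<>();
        Queue<String> first = holder.getOrCreateQueue();
        Queue<String> second = holder.getOrCreateQueue();
        System.out.println(first == second); // both calls return the same instance
    }
}
```

If two threads race, the loser's freshly allocated queue is simply discarded, so every caller ends up sharing a single instance without taking a lock.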
Code example source: ReactiveX/RxJava
    @Override
    public ObservableSource<? extends R> apply(List<ObservableSource<? extends T>> list) {
        return Observable.zipIterable(list, zipper, false, Observable.bufferSize());
    }
}
Code example source: ReactiveX/RxJava
JoinDisposable(Observer<? super R> actual,
        Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd,
        Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd,
        BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector) {
    this.downstream = actual;
    this.disposables = new CompositeDisposable();
    this.queue = new SpscLinkedArrayQueue<Object>(bufferSize());
    this.lefts = new LinkedHashMap<Integer, TLeft>();
    this.rights = new LinkedHashMap<Integer, TRight>();
    this.error = new AtomicReference<Throwable>();
    this.leftEnd = leftEnd;
    this.rightEnd = rightEnd;
    this.resultSelector = resultSelector;
    this.active = new AtomicInteger(2);
}
Code example source: ReactiveX/RxJava
GroupJoinDisposable(
        Observer<? super R> actual,
        Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd,
        Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd,
        BiFunction<? super TLeft, ? super Observable<TRight>, ? extends R> resultSelector) {
    this.downstream = actual;
    this.disposables = new CompositeDisposable();
    this.queue = new SpscLinkedArrayQueue<Object>(bufferSize());
    this.lefts = new LinkedHashMap<Integer, UnicastSubject<TRight>>();
    this.rights = new LinkedHashMap<Integer, TRight>();
    this.error = new AtomicReference<Throwable>();
    this.leftEnd = leftEnd;
    this.rightEnd = rightEnd;
    this.resultSelector = resultSelector;
    this.active = new AtomicInteger(2);
}
Code example source: ReactiveX/RxJava
BufferBoundaryObserver(Observer<? super C> actual,
        ObservableSource<? extends Open> bufferOpen,
        Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose,
        Callable<C> bufferSupplier
) {
    this.downstream = actual;
    this.bufferSupplier = bufferSupplier;
    this.bufferOpen = bufferOpen;
    this.bufferClose = bufferClose;
    this.queue = new SpscLinkedArrayQueue<C>(bufferSize());
    this.observers = new CompositeDisposable();
    this.upstream = new AtomicReference<Disposable>();
    this.buffers = new LinkedHashMap<Long, C>();
    this.errors = new AtomicThrowable();
}
Code example source: ReactiveX/RxJava
/**
 * Converts this {@code Observable} into an {@link Iterable}.
 * <p>
 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/blockingIterable.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code blockingIterable} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @return an {@link Iterable} version of this {@code Observable}
 * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingIterable() {
    return blockingIterable(bufferSize());
}
Code example source: ReactiveX/RxJava
/**
 * Concatenates the ObservableSource sequence of ObservableSources into a single sequence by subscribing to each inner ObservableSource,
 * one after the other, one at a time and delays any errors till the all inner and the outer ObservableSources terminate.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the ObservableSource sequence of ObservableSources
 * @return the new ObservableSource with the concatenating behavior
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
    return concatDelayError(sources, bufferSize(), true);
}
Code example source: ReactiveX/RxJava
@Test(expected = IllegalArgumentException.class)
public void testInvalidMaxConcurrent() {
    // A maxConcurrency of 0 is rejected
    Observable.just(1).concatMapEager(toJust, 0, Observable.bufferSize());
}
Code example source: ReactiveX/RxJava
@Test(expected = IllegalArgumentException.class)
public void testInvalidCapacityHint() {
    // A prefetch (capacity hint) of 0 is rejected
    Observable.just(1).concatMapEager(toJust, Observable.bufferSize(), 0);
}
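The two tests above expect an IllegalArgumentException when either size argument of concatMapEager is 0. The kind of check that produces such a failure can be sketched as follows; PositiveArgCheck and verifyPositive are hypothetical stand-ins written for this example, and the exact wording of the library's own exception message is not confirmed by the snippets here.

```java
class PositiveArgCheck {
    // Hypothetical stand-in for an argument check: rejects zero and negative sizes.
    static int verifyPositive(int value, String paramName) {
        if (value <= 0) {
            throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        // A positive size passes through unchanged
        System.out.println(verifyPositive(128, "prefetch"));

        // A size of 0 is rejected, as in the two tests above
        try {
            verifyPositive(0, "maxConcurrency");
        } catch (IllegalArgumentException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```

Passing Observable.bufferSize() for such parameters is safe in this respect, because the value it returns is always at least 1.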
Code example source: ReactiveX/RxJava
@Test
public void onTerminateCalledWhenOnError() {
    final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
    UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
        @Override public void run() {
            didRunOnTerminate.set(true);
        }
    });
    assertEquals(false, didRunOnTerminate.get());
    us.onError(new RuntimeException("some error"));
    assertEquals(true, didRunOnTerminate.get());
}
Code example source: ReactiveX/RxJava
@Test
public void onTerminateCalledWhenOnError() {
    final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
    UnicastProcessor<Integer> us = UnicastProcessor.create(Observable.bufferSize(), new Runnable() {
        @Override public void run() {
            didRunOnTerminate.set(true);
        }
    });
    assertEquals(false, didRunOnTerminate.get());
    us.onError(new RuntimeException("some error"));
    assertEquals(true, didRunOnTerminate.get());
}
Code example source: ReactiveX/RxJava
@Test
public void onTerminateCalledWhenCanceled() {
    final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
    UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
        @Override public void run() {
            didRunOnTerminate.set(true);
        }
    });
    final Disposable subscribe = us.subscribe();
    assertEquals(false, didRunOnTerminate.get());
    subscribe.dispose();
    assertEquals(true, didRunOnTerminate.get());
}
Code example source: ReactiveX/RxJava
@Test
public void onTerminateCalledWhenCanceled() {
    final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
    UnicastProcessor<Integer> us = UnicastProcessor.create(Observable.bufferSize(), new Runnable() {
        @Override public void run() {
            didRunOnTerminate.set(true);
        }
    });
    final Disposable subscribe = us.subscribe();
    assertEquals(false, didRunOnTerminate.get());
    subscribe.dispose();
    assertEquals(true, didRunOnTerminate.get());
}
Code example source: ReactiveX/RxJava
@Test
public void onTerminateCalledWhenOnComplete() {
    final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
    UnicastProcessor<Integer> us = UnicastProcessor.create(Observable.bufferSize(), new Runnable() {
        @Override public void run() {
            didRunOnTerminate.set(true);
        }
    });
    assertEquals(false, didRunOnTerminate.get());
    us.onComplete();
    assertEquals(true, didRunOnTerminate.get());
}
Code example source: ReactiveX/RxJava
@Test
public void onTerminateCalledWhenOnComplete() {
    final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
    UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
        @Override public void run() {
            didRunOnTerminate.set(true);
        }
    });
    assertEquals(false, didRunOnTerminate.get());
    us.onComplete();
    assertEquals(true, didRunOnTerminate.get());
}
Code example source: ReactiveX/RxJava
    @Override
    public Observable<Integer> apply(Integer t) {
        return Observable.range(1, Observable.bufferSize() * 2)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer t) {
                        count.getAndIncrement();
                    }
                }).hide();
    }
}).subscribe(to);
Code example source: ReactiveX/RxJava
@Test
public void longEager() {
    // Emit twice the default buffer size so the source is longer than a single buffer
    Observable.range(1, 2 * Observable.bufferSize())
            .concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> apply(Integer v) {
                    return Observable.just(1);
                }
            })
            .test()
            .assertValueCount(2 * Observable.bufferSize())
            .assertNoErrors()
            .assertComplete();
}