Usage and code examples for the io.reactivex.Observable.bufferSize() method

Reposted by x33g5p2x on 2022-01-25 under Other

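This article collects code examples for the Java method io.reactivex.Observable.bufferSize() and shows how Observable.bufferSize() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.bufferSize() method are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: bufferSize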

Introduction to Observable.bufferSize

Returns the default 'island' size or capacity-increment hint for unbounded buffers.

Delegates to Flowable#bufferSize but is public for convenience.

The value can be overridden via the system parameter rx2.buffer-size before the Flowable class is loaded.
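As a rough illustration of the behaviour described above, the sketch below reads a system property once, during class initialization, and falls back to a default when the property is missing or malformed. The class name BufferSizeSketch, the helper resolve, and the fallback value of 128 are assumptions made for this example; it is not the RxJava source.

```java
// Sketch only: models the documented behaviour, not the library implementation.
final class BufferSizeSketch {
    // Property name taken from the method description.
    static final String PROPERTY = "rx2.buffer-size";
    // Assumed fallback used when the property is absent or not a number.
    static final int ASSUMED_DEFAULT = 128;

    // Captured once when this class is initialized; later property changes are ignored.
    static final int BUFFER_SIZE = resolve(System.getProperty(PROPERTY));

    // Parses the raw property value and clamps it to at least 1.
    static int resolve(String raw) {
        if (raw == null) {
            return ASSUMED_DEFAULT;
        }
        try {
            return Math.max(1, Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            return ASSUMED_DEFAULT;
        }
    }

    static int bufferSize() {
        return BUFFER_SIZE;
    }

    public static void main(String[] args) {
        System.out.println("buffer size in use: " + bufferSize());
    }
}
```

Because the value is held in a static final field, setting the property after the class has been initialized has no effect, which is consistent with the note that the override must happen before the Flowable class is loaded. Passing -Drx2.buffer-size=256 on the JVM command line is one way to set it early enough.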

Code examples

Code example source: ReactiveX/RxJava

    SimplePlainQueue<T> getOrCreateQueue() {
        // Lazily create the queue on first use, sized by the default buffer size.
        SimplePlainQueue<T> q = queue;
        if (q == null) {
            q = new SpscLinkedArrayQueue<T>(bufferSize());
            queue = q;
        }
        return q;
    }
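The queue created above receives bufferSize() as its 'island' size. To make the idea of an unbounded buffer that grows in fixed-size islands more concrete, here is a simplified, single-threaded sketch. IslandQueue and all of its members are hypothetical names chosen for this example; it is not thread-safe and does not reproduce how SpscLinkedArrayQueue is implemented.

```java
// Single-threaded sketch of an unbounded queue that grows one fixed-size island at a time.
final class IslandQueue<T> {
    // One fixed-size array segment plus a link to the next segment.
    private static final class Island {
        final Object[] slots;
        Island next;

        Island(int size) {
            slots = new Object[size];
        }
    }

    private final int islandSize;
    private Island head;      // island currently being read
    private Island tail;      // island currently being written
    private int readIndex;    // next slot to read in head
    private int writeIndex;   // next slot to write in tail
    private int islands = 1;  // number of islands allocated so far

    IslandQueue(int islandSize) {
        if (islandSize <= 0) {
            throw new IllegalArgumentException("islandSize > 0 required");
        }
        this.islandSize = islandSize;
        head = tail = new Island(islandSize);
    }

    void offer(T item) {
        if (writeIndex == islandSize) {
            // Current island is full: link a new one instead of copying into a bigger array.
            Island next = new Island(islandSize);
            tail.next = next;
            tail = next;
            writeIndex = 0;
            islands++;
        }
        tail.slots[writeIndex++] = item;
    }

    @SuppressWarnings("unchecked")
    T poll() {
        if (head == tail && readIndex == writeIndex) {
            return null; // queue is empty
        }
        if (readIndex == islandSize) {
            // Finished the current island: move on so it can be garbage collected.
            head = head.next;
            readIndex = 0;
        }
        T item = (T) head.slots[readIndex];
        head.slots[readIndex++] = null;
        return item;
    }

    int islandsAllocated() {
        return islands;
    }

    public static void main(String[] args) {
        IslandQueue<Integer> q = new IslandQueue<>(2);
        for (int i = 1; i <= 5; i++) {
            q.offer(i);
        }
        System.out.println("islands allocated: " + q.islandsAllocated());
    }
}
```

In this sketch a larger island size means fewer allocations but more memory reserved up front, which is the trade-off a capacity-increment hint is meant to tune.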

Code example source: ReactiveX/RxJava

    SpscLinkedArrayQueue<R> getOrCreateQueue() {
        // Retry until a queue is visible: either an existing one or the one installed here.
        for (;;) {
            SpscLinkedArrayQueue<R> current = queue.get();
            if (current != null) {
                return current;
            }
            current = new SpscLinkedArrayQueue<R>(Observable.bufferSize());
            if (queue.compareAndSet(null, current)) {
                return current;
            }
        }
    }
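The loop above is an instance of lock-free lazy initialization with compareAndSet. A self-contained version of the same pattern, using only the JDK, might look like the following. LazyQueueHolder is a hypothetical class, and ConcurrentLinkedQueue stands in for SpscLinkedArrayQueue purely so that the example compiles without RxJava.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of lazy, lock-free initialization of a shared queue.
final class LazyQueueHolder<T> {
    private final AtomicReference<Queue<T>> queue = new AtomicReference<>();

    Queue<T> getOrCreateQueue() {
        for (;;) {
            Queue<T> current = queue.get();
            if (current != null) {
                return current; // another caller (or an earlier call) already installed it
            }
            current = new ConcurrentLinkedQueue<>();
            if (queue.compareAndSet(null, current)) {
                return current; // this caller won the race and installed its queue
            }
            // Lost the race: discard the local queue and re-read the winner's on the next pass.
        }
    }

    public static void main(String[] args) {
        LazyQueueHolder<String> holder = new LazyQueueHolder<>();
        System.out.println(holder.getOrCreateQueue() == holder.getOrCreateQueue());
    }
}
```

Every caller ends up with the same queue instance, and no lock is taken; the cost is that a losing thread may allocate a queue that is immediately discarded.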

Code example source: ReactiveX/RxJava

        @Override
        public ObservableSource<? extends R> apply(List<ObservableSource<? extends T>> list) {
            return Observable.zipIterable(list, zipper, false, Observable.bufferSize());
        }
    } // end of the enclosing class, whose declaration is not part of this excerpt

Code example source: ReactiveX/RxJava

    JoinDisposable(Observer<? super R> actual,
            Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd,
            Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd,
            BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector) {
        this.downstream = actual;
        this.disposables = new CompositeDisposable();
        this.queue = new SpscLinkedArrayQueue<Object>(bufferSize());
        this.lefts = new LinkedHashMap<Integer, TLeft>();
        this.rights = new LinkedHashMap<Integer, TRight>();
        this.error = new AtomicReference<Throwable>();
        this.leftEnd = leftEnd;
        this.rightEnd = rightEnd;
        this.resultSelector = resultSelector;
        this.active = new AtomicInteger(2);
    }

Code example source: ReactiveX/RxJava

    GroupJoinDisposable(
            Observer<? super R> actual,
            Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd,
            Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd,
            BiFunction<? super TLeft, ? super Observable<TRight>, ? extends R> resultSelector) {
        this.downstream = actual;
        this.disposables = new CompositeDisposable();
        this.queue = new SpscLinkedArrayQueue<Object>(bufferSize());
        this.lefts = new LinkedHashMap<Integer, UnicastSubject<TRight>>();
        this.rights = new LinkedHashMap<Integer, TRight>();
        this.error = new AtomicReference<Throwable>();
        this.leftEnd = leftEnd;
        this.rightEnd = rightEnd;
        this.resultSelector = resultSelector;
        this.active = new AtomicInteger(2);
    }

Code example source: ReactiveX/RxJava

    BufferBoundaryObserver(Observer<? super C> actual,
            ObservableSource<? extends Open> bufferOpen,
            Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose,
            Callable<C> bufferSupplier
    ) {
        this.downstream = actual;
        this.bufferSupplier = bufferSupplier;
        this.bufferOpen = bufferOpen;
        this.bufferClose = bufferClose;
        this.queue = new SpscLinkedArrayQueue<C>(bufferSize());
        this.observers = new CompositeDisposable();
        this.upstream = new AtomicReference<Disposable>();
        this.buffers = new LinkedHashMap<Long, C>();
        this.errors = new AtomicThrowable();
    }

Code example source: ReactiveX/RxJava

    /**
     * Converts this {@code Observable} into an {@link Iterable}.
     * <p>
     * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/blockingIterable.o.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code blockingIterable} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @return an {@link Iterable} version of this {@code Observable}
     * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Iterable<T> blockingIterable() {
        return blockingIterable(bufferSize());
    }

Code example source: ReactiveX/RxJava

    /**
     * Concatenates the ObservableSource sequence of ObservableSources into a single sequence by subscribing to each inner ObservableSource,
     * one after the other, one at a time, and delays any errors until all inner and the outer ObservableSources terminate.
     * <p>
     * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <T> the common element base type
     * @param sources the ObservableSource sequence of ObservableSources
     * @return the new ObservableSource with the concatenating behavior
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
        return concatDelayError(sources, bufferSize(), true);
    }

Code example source: ReactiveX/RxJava

    @Test(expected = IllegalArgumentException.class)
    public void testInvalidMaxConcurrent() {
        // A maxConcurrency of 0 is rejected.
        Observable.just(1).concatMapEager(toJust, 0, Observable.bufferSize());
    }

Code example source: ReactiveX/RxJava

    @Test(expected = IllegalArgumentException.class)
    public void testInvalidCapacityHint() {
        // A prefetch (capacity hint) of 0 is rejected.
        Observable.just(1).concatMapEager(toJust, Observable.bufferSize(), 0);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onTerminateCalledWhenOnError() {
        final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
        UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
            @Override public void run() {
                didRunOnTerminate.set(true);
            }
        });
        assertEquals(false, didRunOnTerminate.get());
        us.onError(new RuntimeException("some error"));
        assertEquals(true, didRunOnTerminate.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onTerminateCalledWhenOnError() {
        final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
        UnicastProcessor<Integer> us = UnicastProcessor.create(Observable.bufferSize(), new Runnable() {
            @Override public void run() {
                didRunOnTerminate.set(true);
            }
        });
        assertEquals(false, didRunOnTerminate.get());
        us.onError(new RuntimeException("some error"));
        assertEquals(true, didRunOnTerminate.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onTerminateCalledWhenCanceled() {
        final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
        UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
            @Override public void run() {
                didRunOnTerminate.set(true);
            }
        });
        final Disposable subscribe = us.subscribe();
        assertEquals(false, didRunOnTerminate.get());
        subscribe.dispose();
        assertEquals(true, didRunOnTerminate.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onTerminateCalledWhenCanceled() {
        final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
        UnicastProcessor<Integer> us = UnicastProcessor.create(Observable.bufferSize(), new Runnable() {
            @Override public void run() {
                didRunOnTerminate.set(true);
            }
        });
        final Disposable subscribe = us.subscribe();
        assertEquals(false, didRunOnTerminate.get());
        subscribe.dispose();
        assertEquals(true, didRunOnTerminate.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onTerminateCalledWhenOnComplete() {
        final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
        UnicastProcessor<Integer> us = UnicastProcessor.create(Observable.bufferSize(), new Runnable() {
            @Override public void run() {
                didRunOnTerminate.set(true);
            }
        });
        assertEquals(false, didRunOnTerminate.get());
        us.onComplete();
        assertEquals(true, didRunOnTerminate.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onTerminateCalledWhenOnComplete() {
        final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
        UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
            @Override public void run() {
                didRunOnTerminate.set(true);
            }
        });
        assertEquals(false, didRunOnTerminate.get());
        us.onComplete();
        assertEquals(true, didRunOnTerminate.get());
    }
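The onTerminate tests above check that the callback runs when the subject errors, completes, or is disposed. One way a run-at-most-once termination callback could be arranged is sketched below with plain JDK classes. OnceOnTerminate and its methods are hypothetical and only model the behaviour those tests observe; this is not the UnicastSubject implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: a termination hook that fires on the first terminal event and never again.
final class OnceOnTerminate {
    private final AtomicReference<Runnable> onTerminate;

    OnceOnTerminate(Runnable onTerminate) {
        this.onTerminate = new AtomicReference<>(onTerminate);
    }

    // Atomically takes the callback out of the reference so that only one caller can run it.
    private void doTerminate() {
        Runnable r = onTerminate.getAndSet(null);
        if (r != null) {
            r.run();
        }
    }

    void onError(Throwable t) {
        doTerminate();
    }

    void onComplete() {
        doTerminate();
    }

    void dispose() {
        doTerminate();
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        OnceOnTerminate hook = new OnceOnTerminate(runs::incrementAndGet);
        hook.onError(new RuntimeException("some error"));
        hook.onComplete();
        hook.dispose();
        System.out.println("callback runs: " + runs.get());
    }
}
```

Swapping the callback out with getAndSet(null) keeps the hook safe even if two terminal events race on different threads.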

Code example source: ReactiveX/RxJava

        // Excerpt: the operator call that opens this anonymous Function is not shown.
        @Override
        public Observable<Integer> apply(Integer t) {
            return Observable.range(1, Observable.bufferSize() * 2)
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer t) {
                            count.getAndIncrement();
                        }
                    }).hide();
        }
    }).subscribe(to);

Code example source: ReactiveX/RxJava

    @Test
    public void longEager() {
        // Emit twice the default buffer size to exercise the operator beyond a single buffer.
        Observable.range(1, 2 * Observable.bufferSize())
        .concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(Integer v) {
                return Observable.just(1);
            }
        })
        .test()
        .assertValueCount(2 * Observable.bufferSize())
        .assertNoErrors()
        .assertComplete();
    }
