io.reactivex.Observable.bufferSize()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(176)

本文整理了Java中io.reactivex.Observable.bufferSize()方法的一些代码示例,展示了Observable.bufferSize()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.bufferSize()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:bufferSize

Observable.bufferSize介绍

[英]Returns the default 'island' size or capacity-increment hint for unbounded buffers.

Delegates to Flowable#bufferSize but is public for convenience.

The value can be overridden via system parameter rx2.buffer-sizebefore the Flowable class is loaded.
[中]返回无边界缓冲区的默认“孤岛”大小或容量增量提示。
代表可流动#缓冲区大小,但为方便起见是公开的。
该值可通过系统参数rx2覆盖。加载可流动类之前的缓冲区大小。

代码示例

代码示例来源:origin: ReactiveX/RxJava

SimplePlainQueue<T> getOrCreateQueue() {
  SimplePlainQueue<T> q = queue;
  if (q == null) {
    q = new SpscLinkedArrayQueue<T>(bufferSize());
    queue = q;
  }
  return q;
}

代码示例来源:origin: ReactiveX/RxJava

SimplePlainQueue<T> getOrCreateQueue() {
  SimplePlainQueue<T> q = queue;
  if (q == null) {
    q = new SpscLinkedArrayQueue<T>(bufferSize());
    queue = q;
  }
  return q;
}

代码示例来源:origin: ReactiveX/RxJava

SpscLinkedArrayQueue<R> getOrCreateQueue() {
  for (;;) {
    SpscLinkedArrayQueue<R> current = queue.get();
    if (current != null) {
      return current;
    }
    current = new SpscLinkedArrayQueue<R>(Observable.bufferSize());
    if (queue.compareAndSet(null, current)) {
      return current;
    }
  }
}

代码示例来源:origin: ReactiveX/RxJava

SpscLinkedArrayQueue<R> getOrCreateQueue() {
  for (;;) {
    SpscLinkedArrayQueue<R> current = queue.get();
    if (current != null) {
      return current;
    }
    current = new SpscLinkedArrayQueue<R>(Observable.bufferSize());
    if (queue.compareAndSet(null, current)) {
      return current;
    }
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<? extends R> apply(List<ObservableSource<? extends T>> list) {
    return Observable.zipIterable(list, zipper, false, Observable.bufferSize());
  }
}

代码示例来源:origin: ReactiveX/RxJava

JoinDisposable(Observer<? super R> actual,
    Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd,
    Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd,
        BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector) {
  this.downstream = actual;
  this.disposables = new CompositeDisposable();
  this.queue = new SpscLinkedArrayQueue<Object>(bufferSize());
  this.lefts = new LinkedHashMap<Integer, TLeft>();
  this.rights = new LinkedHashMap<Integer, TRight>();
  this.error = new AtomicReference<Throwable>();
  this.leftEnd = leftEnd;
  this.rightEnd = rightEnd;
  this.resultSelector = resultSelector;
  this.active = new AtomicInteger(2);
}

代码示例来源:origin: ReactiveX/RxJava

GroupJoinDisposable(
    Observer<? super R> actual,
    Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd,
    Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd,
    BiFunction<? super TLeft, ? super Observable<TRight>, ? extends R> resultSelector) {
  this.downstream = actual;
  this.disposables = new CompositeDisposable();
  this.queue = new SpscLinkedArrayQueue<Object>(bufferSize());
  this.lefts = new LinkedHashMap<Integer, UnicastSubject<TRight>>();
  this.rights = new LinkedHashMap<Integer, TRight>();
  this.error = new AtomicReference<Throwable>();
  this.leftEnd = leftEnd;
  this.rightEnd = rightEnd;
  this.resultSelector = resultSelector;
  this.active = new AtomicInteger(2);
}

代码示例来源:origin: ReactiveX/RxJava

BufferBoundaryObserver(Observer<? super C> actual,
    ObservableSource<? extends Open> bufferOpen,
    Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose,
    Callable<C> bufferSupplier
) {
  this.downstream = actual;
  this.bufferSupplier = bufferSupplier;
  this.bufferOpen = bufferOpen;
  this.bufferClose = bufferClose;
  this.queue = new SpscLinkedArrayQueue<C>(bufferSize());
  this.observers = new CompositeDisposable();
  this.upstream = new AtomicReference<Disposable>();
  this.buffers = new LinkedHashMap<Long, C>();
  this.errors = new AtomicThrowable();
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Converts this {@code Observable} into an {@link Iterable}.
 * <p>
 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/blockingIterable.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code blockingIterable} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @return an {@link Iterable} version of this {@code Observable}
 * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingIterable() {
  return blockingIterable(bufferSize());
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Concatenates the ObservableSource sequence of ObservableSources into a single sequence by subscribing to each inner ObservableSource,
 * one after the other, one at a time and delays any errors till the all inner and the outer ObservableSources terminate.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the ObservableSource sequence of ObservableSources
 * @return the new ObservableSource with the concatenating behavior
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
  return concatDelayError(sources, bufferSize(), true);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = IllegalArgumentException.class)
public void testInvalidMaxConcurrent() {
  Observable.just(1).concatMapEager(toJust, 0, Observable.bufferSize());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = IllegalArgumentException.class)
public void testInvalidCapacityHint() {
  Observable.just(1).concatMapEager(toJust, Observable.bufferSize(), 0);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onTerminateCalledWhenOnError() {
  final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
  UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
    @Override public void run() {
      didRunOnTerminate.set(true);
    }
  });
  assertEquals(false, didRunOnTerminate.get());
  us.onError(new RuntimeException("some error"));
  assertEquals(true, didRunOnTerminate.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onTerminateCalledWhenOnError() {
  final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
  UnicastProcessor<Integer> us = UnicastProcessor.create(Observable.bufferSize(), new Runnable() {
    @Override public void run() {
      didRunOnTerminate.set(true);
    }
  });
  assertEquals(false, didRunOnTerminate.get());
  us.onError(new RuntimeException("some error"));
  assertEquals(true, didRunOnTerminate.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onTerminateCalledWhenCanceled() {
  final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
  UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
    @Override public void run() {
      didRunOnTerminate.set(true);
    }
  });
  final Disposable subscribe = us.subscribe();
  assertEquals(false, didRunOnTerminate.get());
  subscribe.dispose();
  assertEquals(true, didRunOnTerminate.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onTerminateCalledWhenCanceled() {
  final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
  UnicastProcessor<Integer> us = UnicastProcessor.create(Observable.bufferSize(), new Runnable() {
    @Override public void run() {
      didRunOnTerminate.set(true);
    }
  });
  final Disposable subscribe = us.subscribe();
  assertEquals(false, didRunOnTerminate.get());
  subscribe.dispose();
  assertEquals(true, didRunOnTerminate.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onTerminateCalledWhenOnComplete() {
  final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
  UnicastProcessor<Integer> us = UnicastProcessor.create(Observable.bufferSize(), new Runnable() {
    @Override public void run() {
      didRunOnTerminate.set(true);
    }
  });
  assertEquals(false, didRunOnTerminate.get());
  us.onComplete();
  assertEquals(true, didRunOnTerminate.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onTerminateCalledWhenOnComplete() {
  final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
  UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
    @Override public void run() {
      didRunOnTerminate.set(true);
    }
  });
  assertEquals(false, didRunOnTerminate.get());
  us.onComplete();
  assertEquals(true, didRunOnTerminate.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Integer t) {
    return Observable.range(1, Observable.bufferSize() * 2)
        .doOnNext(new Consumer<Integer>() {
          @Override
          public void accept(Integer t) {
            count.getAndIncrement();
          }
        }).hide();
  }
}).subscribe(to);

代码示例来源:origin: ReactiveX/RxJava

@Test
public void longEager() {
  Observable.range(1, 2 * Observable.bufferSize())
  .concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer v) {
      return Observable.just(1);
    }
  })
  .test()
  .assertValueCount(2 * Observable.bufferSize())
  .assertNoErrors()
  .assertComplete();
}

相关文章

Observable类方法