rx.Observable.onBackpressureBuffer()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.5k)|赞(0)|评价(0)|浏览(184)

本文整理了Java中rx.Observable.onBackpressureBuffer()方法的一些代码示例,展示了Observable.onBackpressureBuffer()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.onBackpressureBuffer()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:onBackpressureBuffer

Observable.onBackpressureBuffer介绍

[英]Instructs an Observable that is emitting items faster than its observer can consume them to buffer these items indefinitely until they can be emitted.

Scheduler: onBackpressureBuffer does not operate by default on a particular Scheduler.
[中]指示发射项目的速度快于其观察者使用项目的速度的可观察者无限期地缓冲这些项目,直到它们可以发射。
调度程序:onBackpressureBuffer默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: Rukey7/MvpApp

  1. /**
  2. * 一个默认的订阅方法
  3. *
  4. * @param type
  5. * @param next
  6. * @param error
  7. * @param <T>
  8. * @return
  9. */
  10. public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
  11. return toObservable(type)
  12. // 加上背压处理,不然有些地方会有异常,关于背压参考这里:https://gold.xitu.io/post/582d413c8ac24700619cceed
  13. .onBackpressureBuffer()
  14. .subscribeOn(Schedulers.io())
  15. .observeOn(AndroidSchedulers.mainThread())
  16. .subscribe(next, error);
  17. }

代码示例来源:origin: mesosphere/mesos-rxjava

  1. /**
  2. * Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that
  3. * a burst of incoming Mesos messages is handled by an unbounded buffer rather than a
  4. * MissingBackpressureException.
  5. *
  6. * As an example, this may be necessary for Mesos schedulers that launch large numbers
  7. * of tasks at a time and then request reconciliation.
  8. *
  9. * @return this builder (allowing for further chained calls)
  10. * @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
  11. */
  12. @NotNull
  13. public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
  14. ) {
  15. this.backpressureTransformer = observable -> observable.onBackpressureBuffer();
  16. return this;
  17. }

代码示例来源:origin: com.mesosphere.mesos.rx.java/mesos-rxjava-client

  1. /**
  2. * Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that
  3. * a burst of incoming Mesos messages is handled by an unbounded buffer rather than a
  4. * MissingBackpressureException.
  5. *
  6. * As an example, this may be necessary for Mesos schedulers that launch large numbers
  7. * of tasks at a time and then request reconciliation.
  8. *
  9. * @return this builder (allowing for further chained calls)
  10. * @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
  11. */
  12. @NotNull
  13. public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
  14. ) {
  15. this.backpressureTransformer = observable -> observable.onBackpressureBuffer();
  16. return this;
  17. }

代码示例来源:origin: com.mesosphere.mesos.rx.java/mesos-rxjava-client

  1. /**
  2. * Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that
  3. * a burst of incoming Mesos messages is handled by a bounded buffer rather than a
  4. * MissingBackpressureException. If the buffer is overflown, a {@link java.nio.BufferOverflowException}
  5. * is thrown.
  6. *
  7. * As an example, this may be necessary for Mesos schedulers that launch large numbers
  8. * of tasks at a time and then request reconciliation.
  9. *
  10. * @param capacity number of slots available in the buffer.
  11. * @return this builder (allowing for further chained calls)
  12. * @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
  13. */
  14. @NotNull
  15. public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
  16. final long capacity
  17. ) {
  18. this.backpressureTransformer = observable -> observable.onBackpressureBuffer(capacity);
  19. return this;
  20. }

代码示例来源:origin: HelloChenJinJun/TestChat

  1. public <T> Subscription registerEvent(Class<T> type, Action1<T> action1, Action1<Throwable> error) {
  2. return getObservable(type)
  3. .onBackpressureBuffer().subscribeOn(Schedulers.io())
  4. .observeOn(AndroidSchedulers.mainThread())
  5. .subscribe(action1, error);
  6. }

代码示例来源:origin: HelloChenJinJun/TestChat

  1. public <T> Subscription registerEvent(Class<T> type, Action1<T> action1, Action1<Throwable> error) {
  2. return getObservable(type)
  3. .onBackpressureBuffer().subscribeOn(Schedulers.io())
  4. .observeOn(AndroidSchedulers.mainThread())
  5. .subscribe(action1, error);
  6. }

代码示例来源:origin: SmartDengg/RxBlur

  1. @CheckResult @NonNull
  2. public static Observable<Bitmap> animatorBlur(@NonNull Context context, @DrawableRes int resId, int radius,
  3. long duration) {
  4. return Observable.create(new AnimationBlurOnSubscribe(context, resId, radius, duration)).onBackpressureBuffer();
  5. }

代码示例来源:origin: davidmoten/rxjava-extras

  1. private static <Out> Observable<Notification<Out>> applyBackpressure(
  2. Observable<Notification<Out>> o, final BackpressureStrategy backpressureStrategy) {
  3. if (backpressureStrategy == BackpressureStrategy.BUFFER)
  4. return o.onBackpressureBuffer();
  5. else if (backpressureStrategy == BackpressureStrategy.DROP)
  6. return o.onBackpressureDrop();
  7. else if (backpressureStrategy == BackpressureStrategy.LATEST)
  8. return o.onBackpressureLatest();
  9. else
  10. throw new IllegalArgumentException(
  11. "backpressure strategy not supported: " + backpressureStrategy);
  12. }

代码示例来源:origin: com.github.davidmoten/rxjava-extras

  1. private static <Out> Observable<Notification<Out>> applyBackpressure(
  2. Observable<Notification<Out>> o, final BackpressureStrategy backpressureStrategy) {
  3. if (backpressureStrategy == BackpressureStrategy.BUFFER)
  4. return o.onBackpressureBuffer();
  5. else if (backpressureStrategy == BackpressureStrategy.DROP)
  6. return o.onBackpressureDrop();
  7. else if (backpressureStrategy == BackpressureStrategy.LATEST)
  8. return o.onBackpressureLatest();
  9. else
  10. throw new IllegalArgumentException(
  11. "backpressure strategy not supported: " + backpressureStrategy);
  12. }

代码示例来源:origin: marcoRS/rxjava-essentials

  1. private void loadList(List<AppInfo> apps) {
  2. mRecyclerView.setVisibility(View.VISIBLE);
  3. getObservableApps(apps).onBackpressureBuffer()
  4. .subscribeOn(Schedulers.computation())
  5. .observeOn(AndroidSchedulers.mainThread())
  6. .subscribe(new Observer<AppInfo>() {
  7. @Override public void onCompleted() {
  8. mSwipeRefreshLayout.setRefreshing(false);
  9. Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
  10. }
  11. @Override public void onError(Throwable e) {
  12. Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
  13. if (mSwipeRefreshLayout != null) {
  14. mSwipeRefreshLayout.setRefreshing(false);
  15. }
  16. }
  17. @Override public void onNext(AppInfo appInfo) {
  18. mAddedApps.add(appInfo);
  19. mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
  20. }
  21. });
  22. }

代码示例来源:origin: easemob/kefu-android-demo

  1. /**
  2. * 订阅文件下载进度
  3. */
  4. private void subscribeLoadProgress() {
  5. rxSubscriptions.add(RxBus.getInstance()
  6. .toObservable(FileLoadEvent.class)
  7. .onBackpressureBuffer()
  8. .subscribeOn(Schedulers.io())
  9. .observeOn(AndroidSchedulers.mainThread())
  10. .subscribe(new Action1<FileLoadEvent>() {
  11. @Override
  12. public void call(FileLoadEvent fileLoadEvent) {
  13. progress(fileLoadEvent.getProgress(), fileLoadEvent.getTotal());
  14. }
  15. }));
  16. }

代码示例来源:origin: shanyao0/DownLoadManager

  1. /**
  2. * 订阅文件下载进度
  3. */
  4. private void subscribeLoadProgress() {
  5. rxSubscriptions.add(RxBus.getDefault()
  6. .toObservable(FileLoadingBean.class)
  7. .onBackpressureBuffer()
  8. .subscribeOn(Schedulers.io())
  9. .observeOn(AndroidSchedulers.mainThread())
  10. .subscribe(new Action1<FileLoadingBean>() {
  11. @Override
  12. public void call(FileLoadingBean fileLoadEvent) {
  13. onLoading(fileLoadEvent.getProgress(), fileLoadEvent.getTotal());
  14. }
  15. }));
  16. }
  17. /**

代码示例来源:origin: org.springframework.cloud/spring-cloud-netflix-hystrix

  1. public Function<HystrixObservableCommand<T>, Observable<T>> getObservableFunction() {
  2. Function<HystrixObservableCommand<T>, Observable<T>> observableFunc;
  3. if (this.toObservable != null) {
  4. observableFunc = this.toObservable;
  5. } else if (this.eager) {
  6. observableFunc = cmd -> cmd.observe();
  7. } else { // apply a default onBackpressureBuffer if not eager
  8. observableFunc = cmd -> cmd.toObservable().onBackpressureBuffer();
  9. }
  10. return observableFunc;
  11. }

代码示例来源:origin: dswarm/dswarm

  1. protected ConnectableObservable<GDMModel> transformResultModel(final Observable<org.dswarm.persistence.model.internal.Model> model) {
  2. final AtomicInteger resultCounter = new AtomicInteger(0);
  3. return model
  4. .onBackpressureBuffer(10000)
  5. .doOnSubscribe(() -> GDMModelTransformationFlow.LOG.debug("subscribed to results observable in transformation engine"))
  6. .doOnNext(resultObj -> {
  7. resultCounter.incrementAndGet();
  8. if (resultCounter.get() == 1) {
  9. GDMModelTransformationFlow.LOG.debug("received first result in transformation engine");
  10. }
  11. })
  12. .doOnCompleted(() -> GDMModelTransformationFlow.LOG.debug("received '{}' results in transformation engine overall", resultCounter.get()))
  13. .cast(org.dswarm.persistence.model.internal.gdm.GDMModel.class)
  14. .onBackpressureBuffer(10000)
  15. .publish();
  16. }

代码示例来源:origin: techery/janet

  1. @Override public Observable<ActionState<A>> call() {
  2. return pipeline.asObservable()
  3. .onBackpressureBuffer()
  4. .map(new Func1<ActionPair, ActionState>() {
  5. @Override public ActionState call(ActionPair pair) {
  6. return pair.state;
  7. }
  8. })
  9. .filter(new Func1<ActionState, Boolean>() {
  10. @Override public Boolean call(ActionState actionState) {
  11. return actionClass.isInstance(actionState.action);
  12. }
  13. }).compose(new CastToState<A>());
  14. }
  15. }, new Action1<A>() {

代码示例来源:origin: dswarm/dswarm

  1. @Override
  2. public Observable<JsonNode> generate(final Observable<GDMModel> recordGDM,
  3. final OutputStream outputStream) throws XMLStreamException {
  4. final StreamRDF writer = StreamRDFWriter.getWriterStream(outputStream, rdfSerializationFormat);
  5. writer.start();
  6. final ConcurrentHashMap<String, org.apache.jena.graph.Node> resourceNodeCache = new ConcurrentHashMap<>();
  7. final ConcurrentHashMap<String, org.apache.jena.graph.Node> predicateCache = new ConcurrentHashMap<>();
  8. return recordGDM
  9. .doOnSubscribe(() -> LOG.debug("subscribed to RDF export; will return data as '{}'", mediaType.toString()))
  10. .onBackpressureBuffer(10000)
  11. .map(recordGDMModel -> processRecordGDMModel(writer, resourceNodeCache, predicateCache, recordGDMModel))
  12. .map(org.dswarm.persistence.model.internal.Model::toGDMCompactJSON)
  13. .flatMapIterable(nodes -> {
  14. final ArrayList<JsonNode> nodeList = new ArrayList<>();
  15. Iterators.addAll(nodeList, nodes.elements());
  16. return nodeList;
  17. })
  18. .doOnCompleted(writer::finish)
  19. .doOnCompleted(() -> LOG.debug("finished RDF export; return data as '{}'", mediaType.toString()));
  20. }

代码示例来源:origin: davidmoten/rxjava-extras

  1. @Override
  2. public Observable<T> call() {
  3. final OperatorPassThroughAdjustedRequest<T> op = new OperatorPassThroughAdjustedRequest<T>();
  4. return o.lift(op).onBackpressureBuffer().doOnRequest(new Action1<Long>() {
  5. @Override
  6. public void call(Long n) {
  7. op.requestMore(n);
  8. }
  9. });
  10. }
  11. });

代码示例来源:origin: com.github.davidmoten/rxjava-extras

  1. @Override
  2. public Observable<T> call() {
  3. final OperatorPassThroughAdjustedRequest<T> op = new OperatorPassThroughAdjustedRequest<T>();
  4. return o.lift(op).onBackpressureBuffer().doOnRequest(new Action1<Long>() {
  5. @Override
  6. public void call(Long n) {
  7. op.requestMore(n);
  8. }
  9. });
  10. }
  11. });

代码示例来源:origin: dswarm/dswarm

  1. protected ConnectableObservable<org.dswarm.persistence.model.internal.Model> doPostProcessingOfResultModel(final GDMModelReceiver writer, final Scheduler scheduler) {
  2. final ConnectableObservable<GDMModel> modelConnectableObservable = writer.getObservable()
  3. .observeOn(scheduler)
  4. .onBackpressureBuffer(10000)
  5. .publish();
  6. final ConnectableObservable<org.dswarm.persistence.model.internal.Model> model = doPostProcessingOfResultModel(modelConnectableObservable);
  7. modelConnectableObservable.connect();
  8. return model;
  9. }

代码示例来源:origin: avluis/Hentoid

  1. void process(File rootDir) {
  2. if (rootDir.canRead()) {
  3. cancelPrevOp();
  4. updateDirList(rootDir);
  5. Observable<File> observable = new ListDirObservable().create(rootDir);
  6. Observer<File> observer = new ListDirObserver(dirTree, bus);
  7. subscription = observable.subscribeOn(Schedulers.io())
  8. .onBackpressureBuffer()
  9. .observeOn(AndroidSchedulers.mainThread())
  10. .subscribe(observer);
  11. } else {
  12. Timber.d("Failed to process directory list.");
  13. bus.post(new OpFailedEvent());
  14. }
  15. }

相关文章

Observable类方法