Usage and code examples for the rx.Observable.onBackpressureDrop() method

Reposted by x33g5p2x on 2022-01-25, filed under Other

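This article collects code examples of the Java method rx.Observable.onBackpressureDrop() and shows how Observable.onBackpressureDrop() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.onBackpressureDrop() are as follows:
Package path: rx.Observable
Class name: Observable
Method name: onBackpressureDrop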

Observable.onBackpressureDrop overview

Use this operator when the upstream does not natively support backpressure and you want to drop onNext events when the downstream cannot handle further events.

If the downstream request count hits 0, onNext events are dropped until request(long n) is invoked again to increase the request count.

Scheduler: onBackpressureDrop does not operate by default on a particular Scheduler.
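The request-count rule described above can be modeled in a few lines of plain Java. This is only a single-threaded toy sketch with no RxJava dependency, not how RxJava implements the operator; the class DropGate and its methods are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy, single-threaded model of drop-on-backpressure (hypothetical; not RxJava code). */
final class DropGate<T> {
    private final Consumer<T> downstream;
    private long requested; // outstanding downstream demand
    private long dropped;   // number of items discarded so far

    DropGate(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Downstream signals that it can accept n more items. */
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        requested += n;
    }

    /** Upstream pushes an item: deliver it if demand remains, otherwise drop it. */
    void onNext(T item) {
        if (requested > 0) {
            requested--;
            downstream.accept(item);
        } else {
            dropped++;
        }
    }

    long droppedCount() {
        return dropped;
    }

    /** Small walk-through: demand of 2, four pushes, then one more request and push. */
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        DropGate<Integer> gate = new DropGate<>(received::add);
        gate.request(2);
        gate.onNext(1); // delivered, demand falls to 1
        gate.onNext(2); // delivered, demand falls to 0
        gate.onNext(3); // dropped, no demand
        gate.onNext(4); // dropped, no demand
        gate.request(1);
        gate.onNext(5); // delivered
        return received;
    }
}
```

Calling DropGate.demo() returns the list [1, 2, 5]: items 3 and 4 arrive while the request count is 0 and are discarded rather than buffered.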

Code examples

Code example source: PipelineAI/pipeline (also in com.netflix.hystrix/hystrix-core)

    protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
        final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
        for (int i = 0; i < numBuckets; i++) {
            emptyRollingMaxBuckets.add(0);
        }
        rollingMaxStream = inputEventStream
                .observe()
                .map(getConcurrencyCountFromEvent)
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                .flatMap(reduceStreamToMax)
                .startWith(emptyRollingMaxBuckets)
                .window(numBuckets, 1)
                .flatMap(reduceStreamToMax)
                .share()
                .onBackpressureDrop(); // drop emissions the downstream has not requested
    }

Code example source: PipelineAI/pipeline

    /**
     * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
     * @param intervalInMilliseconds milliseconds between data emissions
     */
    @Deprecated //deprecated in 1.5.4.
    public HystrixUtilizationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds;
        this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
                .map(getAllUtilization)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Code example source: PipelineAI/pipeline

    /**
     * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
     * @param intervalInMilliseconds milliseconds between data emissions
     */
    @Deprecated //deprecated in 1.5.4.
    public HystrixConfigurationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds;
        this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
                .map(getAllConfig)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Code example source: PipelineAI/pipeline (also in com.netflix.hystrix/hystrix-core)

    protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                                        final Func2<Histogram, Event, Histogram> addValuesToBucket) {
        final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
        for (int i = 0; i < numBuckets; i++) {
            emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
        }
        final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
            @Override
            public Observable<Histogram> call(Observable<Event> bucket) {
                return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
            }
        };
        rollingDistributionStream = stream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
                .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
                .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
                .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
                .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
                .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
                .share()
                .onBackpressureDrop();
    }

Code example source: PipelineAI/pipeline (also in com.netflix.hystrix/hystrix-core)

    private HystrixDashboardStream(int delayInMs) {
        this.delayInMs = delayInMs;
        this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, DashboardData>() {
                    @Override
                    public DashboardData call(Long timestamp) {
                        return new DashboardData(
                                HystrixCommandMetrics.getInstances(),
                                HystrixThreadPoolMetrics.getInstances(),
                                HystrixCollapserMetrics.getInstances()
                        );
                    }
                })
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Code example source: PipelineAI/pipeline (also in com.netflix.hystrix/hystrix-core)

    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream      //stream broken up into buckets
                .window(numBuckets, 1)          //emit overlapping windows of buckets
                .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

Code example source: PipelineAI/pipeline (also in com.netflix.hystrix/hystrix-core)

    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
        this.sourceStream = bucketedStream
                .scan(getEmptyOutputValue(), reduceBucket)
                .skip(numBuckets)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()               //multiple subscribers should get same data
                .onBackpressureDrop(); //if there are slow consumers, data should not buffer
    }

Code example source: davidmoten/rxjava-extras (also in com.github.davidmoten/rxjava-extras)

    private static <Out> Observable<Notification<Out>> applyBackpressure(
            Observable<Notification<Out>> o, final BackpressureStrategy backpressureStrategy) {
        if (backpressureStrategy == BackpressureStrategy.BUFFER)
            return o.onBackpressureBuffer();
        else if (backpressureStrategy == BackpressureStrategy.DROP)
            return o.onBackpressureDrop();
        else if (backpressureStrategy == BackpressureStrategy.LATEST)
            return o.onBackpressureLatest();
        else
            throw new IllegalArgumentException(
                    "backpressure strategy not supported: " + backpressureStrategy);
    }

Code example source: avluis/Hentoid

    void process(File rootDir, String name) {
        cancelPrevOp();
        Observable<File> observable = new MakeDirObservable().create(rootDir, name);
        Observer<File> observer = new MakeDirObserver(dirTree, bus);
        subscription = observable.observeOn(Schedulers.io())
                .onBackpressureDrop()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(observer);
    }

Code example source: hawkular/hawkular-metrics (also in org.hawkular.metrics/hawkular-metrics-core-service)

    private <T> Observable.Transformer<T, T> applyRetryPolicy() {
        return tObservable -> tObservable
                .retryWhen(observable -> {
                    Integer maxRetries = Integer.getInteger("hawkular.metrics.temp-table-cleaner.max-retries", 10);
                    Integer maxDelay = Integer.getInteger("hawkular.metrics.temp-table-cleaner.max-delay", 300);
                    Observable<Integer> range = Observable.range(1, maxRetries);
                    Observable<Observable<?>> zipWith = observable.zipWith(range, (t, i) -> {
                        int delay = Math.min((int) Math.pow(2, i), maxDelay);
                        logger.debugf(t, "The findTables query failed. Attempting retry # %d seconds", delay);
                        return Observable.timer(delay, TimeUnit.SECONDS).onBackpressureDrop();
                    });
                    return Observable.merge(zipWith);
                });
    }
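The delay computed inside the retry policy above doubles with each attempt and is capped at maxDelay. The following is a minimal plain-Java sketch of just that arithmetic, with no RxJava dependency; the class and method names (BackoffSchedule, delaysInSeconds) are hypothetical and not part of hawkular-metrics.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the capped exponential delays used by the retry policy. */
final class BackoffSchedule {
    /** Returns min(2^attempt, maxDelay) in seconds for attempt = 1..maxRetries. */
    static List<Integer> delaysInSeconds(int maxRetries, int maxDelay) {
        List<Integer> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            // Same expression as in the example: exponential growth, capped at maxDelay
            delays.add(Math.min((int) Math.pow(2, attempt), maxDelay));
        }
        return delays;
    }
}
```

With the default values from the example (10 retries, a 300-second cap), BackoffSchedule.delaysInSeconds(10, 300) yields 2, 4, 8, 16, 32, 64, 128, 256, 300, 300.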

Code example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_213() throws Exception {
        Observable<Picture> fast = Observable
                .interval(10, MICROSECONDS)
                .map(Picture::new);
        Observable<Picture> slow = Observable
                .interval(11, MICROSECONDS)
                .map(Picture::new);
        Observable
                .zip(fast, slow, (f, s) -> f + " : " + s);
        Observable
                .zip(
                        fast.onBackpressureDrop(), // each source drops items that zip has not requested yet
                        slow.onBackpressureDrop(),
                        (f, s) -> f + " : " + s);
    }

Code example source: org.hawkular.metrics/hawkular-metrics-core-service (also in hawkular/hawkular-metrics)

    private <T> Observable.Transformer<T, T> applyRetryPolicy() {
        return tObservable -> tObservable
                .retryWhen(observable -> {
                    Observable<Integer> range = Observable.range(1, Integer.MAX_VALUE);
                    Observable<Observable<?>> zipWith = observable.zipWith(range, (t, i) -> {
                        log.debug("Attempt #" + i + " to retry the operation after Cassandra client" +
                                " exception");
                        if (t instanceof DriverException) {
                            return Observable.timer(i, TimeUnit.SECONDS).onBackpressureDrop();
                        } else {
                            return Observable.error(t);
                        }
                    });
                    return Observable.merge(zipWith);
                })
                .doOnError(t -> log.error("Failure while trying to apply compression, skipping block", t))
                .onErrorResumeNext(Observable.empty());
    }
