Usage and code examples of the rx.Observable.onBackpressureDrop() method

Reposted by x33g5p2x on 2022-01-25, category: Other

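This article collects code examples of the Java method rx.Observable.onBackpressureDrop() and shows how Observable.onBackpressureDrop() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as useful references. Details of Observable.onBackpressureDrop() are as follows:
Package path: rx.Observable
Class name: Observable
Method name: onBackpressureDrop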

Introduction to Observable.onBackpressureDrop

Use this operator when the upstream does not natively support backpressure and you wish to drop onNext when unable to handle further events.

If the downstream request count hits 0, then onNext will be dropped until request(long n) is invoked again to increase the request count. Scheduler: onBackpressureDrop does not operate by default on a particular Scheduler.
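The request-count rule described above can be illustrated with a small model that uses only the Java standard library. This is a simplified, single-threaded sketch, not RxJava's implementation: the class `DropModel` and its methods `request`, `onNext`, `delivered`, and `dropped` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified model of drop-on-backpressure (illustrative only, not RxJava code). */
final class DropModel {
    // Outstanding downstream demand, increased by request(n).
    private final AtomicLong requested = new AtomicLong();
    private final List<Integer> delivered = new ArrayList<>();
    private int dropped;

    /** Downstream signals that it can accept n more items. */
    void request(long n) {
        requested.addAndGet(n);
    }

    /** Upstream pushes an item regardless of demand. */
    void onNext(int value) {
        if (requested.get() > 0) {
            requested.decrementAndGet(); // consume one unit of demand
            delivered.add(value);
        } else {
            dropped++;                   // no demand: the item is discarded
        }
    }

    List<Integer> delivered() {
        return delivered;
    }

    int dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        DropModel m = new DropModel();
        m.request(2);
        for (int i = 1; i <= 5; i++) {
            m.onNext(i);                 // 1 and 2 are delivered; 3, 4, 5 are dropped
        }
        m.request(1);
        m.onNext(6);                     // demand is available again, so 6 is delivered
        System.out.println(m.delivered() + " dropped=" + m.dropped());
        // prints "[1, 2, 6] dropped=3"
    }
}
```

Dropped items are never replayed: once demand returns, delivery resumes with whatever the upstream emits next, which is why this strategy suits metrics or UI streams where only fresh values matter.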

Code examples

Code example source: origin: PipelineAI/pipeline (the same code also appears in com.netflix.hystrix/hystrix-core)

protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
  final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
  for (int i = 0; i < numBuckets; i++) {
    emptyRollingMaxBuckets.add(0);
  }
  rollingMaxStream = inputEventStream
      .observe()
      .map(getConcurrencyCountFromEvent)
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
      .flatMap(reduceStreamToMax)
      .startWith(emptyRollingMaxBuckets)
      .window(numBuckets, 1)
      .flatMap(reduceStreamToMax)
      .share()
      .onBackpressureDrop();
}

Code example source: origin: PipelineAI/pipeline

/**
 * @deprecated Not for public use.  Please use {@link #getInstance()}.  This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixUtilizationStream(final int intervalInMilliseconds) {
  this.intervalInMilliseconds = intervalInMilliseconds;
  this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
      .map(getAllUtilization)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

Code example source: origin: PipelineAI/pipeline

/**
 * @deprecated Not for public use.  Please use {@link #getInstance()}.  This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
  this.intervalInMilliseconds = intervalInMilliseconds;
  this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
      .map(getAllConfig)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

Code example source: origin: PipelineAI/pipeline (the same code also appears in com.netflix.hystrix/hystrix-core)

protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                  final Func2<Histogram, Event, Histogram> addValuesToBucket) {
  final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
  for (int i = 0; i < numBuckets; i++) {
    emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
  }
  final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
    @Override
    public Observable<Histogram> call(Observable<Event> bucket) {
      return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
    }
  };
  rollingDistributionStream = stream
      .observe()
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
      .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
      .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
      .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
      .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
      .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
      .share()
      .onBackpressureDrop();
}

Code example source: origin: PipelineAI/pipeline (the same code also appears in com.netflix.hystrix/hystrix-core)

private HystrixDashboardStream(int delayInMs) {
  this.delayInMs = delayInMs;
  this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
      .map(new Func1<Long, DashboardData>() {
        @Override
        public DashboardData call(Long timestamp) {
          return new DashboardData(
              HystrixCommandMetrics.getInstances(),
              HystrixThreadPoolMetrics.getInstances(),
              HystrixCollapserMetrics.getInstances()
          );
        }
      })
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

Code example source: origin: PipelineAI/pipeline (the same code also appears in com.netflix.hystrix/hystrix-core)

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                    final Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
  Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
    @Override
    public Observable<Output> call(Observable<Bucket> window) {
      return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
  };
  this.sourceStream = bucketedStream      //stream broken up into buckets
      .window(numBuckets, 1)          //emit overlapping windows of buckets
      .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

Code example source: origin: PipelineAI/pipeline (the same code also appears in com.netflix.hystrix/hystrix-core)

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                     Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                     Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
  this.sourceStream = bucketedStream
      .scan(getEmptyOutputValue(), reduceBucket)
      .skip(numBuckets)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

Code example source: origin: davidmoten/rxjava-extras (also published as com.github.davidmoten/rxjava-extras)

private static <Out> Observable<Notification<Out>> applyBackpressure(
    Observable<Notification<Out>> o, final BackpressureStrategy backpressureStrategy) {
  if (backpressureStrategy == BackpressureStrategy.BUFFER)
    return o.onBackpressureBuffer();
  else if (backpressureStrategy == BackpressureStrategy.DROP)
    return o.onBackpressureDrop();
  else if (backpressureStrategy == BackpressureStrategy.LATEST)
    return o.onBackpressureLatest();
  else
    throw new IllegalArgumentException(
        "backpressure strategy not supported: " + backpressureStrategy);
}

Code example source: origin: avluis/Hentoid

void process(File rootDir, String name) {
  cancelPrevOp();
  Observable<File> observable = new MakeDirObservable().create(rootDir, name);
  Observer<File> observer = new MakeDirObserver(dirTree, bus);
  subscription = observable.observeOn(Schedulers.io())
      .onBackpressureDrop()
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(observer);
}

Code example source: origin: hawkular/hawkular-metrics (also published as org.hawkular.metrics/hawkular-metrics-core-service)

private <T> Observable.Transformer<T, T> applyRetryPolicy() {
  return tObservable -> tObservable
      .retryWhen(observable -> {
        Integer maxRetries = Integer.getInteger("hawkular.metrics.temp-table-cleaner.max-retries", 10);
        Integer maxDelay = Integer.getInteger("hawkular.metrics.temp-table-cleaner.max-delay", 300);
        Observable<Integer> range = Observable.range(1, maxRetries);
        Observable<Observable<?>> zipWith = observable.zipWith(range, (t, i) -> {
          int delay = Math.min((int) Math.pow(2, i), maxDelay);
          logger.debugf(t, "The findTables query failed. Attempting retry # %d seconds", delay);
          return Observable.timer(delay, TimeUnit.SECONDS).onBackpressureDrop();
        });
        return Observable.merge(zipWith);
      });
}

Code example source: origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_213() throws Exception {
  Observable<Picture> fast = Observable
      .interval(10, MICROSECONDS)
      .map(Picture::new);
  Observable<Picture> slow = Observable
      .interval(11, MICROSECONDS)
      .map(Picture::new);
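  // Zipping two timer-driven sources directly: once subscribed, this can fail with
  // MissingBackpressureException because interval() cannot slow down on request.
  Observable
      .zip(fast, slow, (f, s) -> f + " : " + s);
  // Dropping on each side discards the values zip has not requested,
  // which avoids the overflow at the cost of losing elements.
  Observable
      .zip(
          fast.onBackpressureDrop(),
          slow.onBackpressureDrop(),
          (f, s) -> f + " : " + s);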
}

Code example source: origin: org.hawkular.metrics/hawkular-metrics-core-service (the same code also appears in hawkular/hawkular-metrics)

private <T> Observable.Transformer<T, T> applyRetryPolicy() {
  return tObservable -> tObservable
      .retryWhen(observable -> {
        Observable<Integer> range = Observable.range(1, Integer.MAX_VALUE);
        Observable<Observable<?>> zipWith = observable.zipWith(range, (t, i) -> {
          log.debug("Attempt #" + i + " to retry the operation after Cassandra client" +
              " exception");
          if (t instanceof DriverException) {
            return Observable.timer(i, TimeUnit.SECONDS).onBackpressureDrop();
          } else {
            return Observable.error(t);
          }
        });
        return Observable.merge(zipWith);
      })
      .doOnError(t -> log.error("Failure while trying to apply compression, skipping block", t))
      .onErrorResumeNext(Observable.empty());
}
