本文整理了Java中rx.Observable.onBackpressureDrop()
方法的一些代码示例,展示了Observable.onBackpressureDrop()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.onBackpressureDrop()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:onBackpressureDrop
[英]Use this operator when the upstream does not natively support backpressure and you wish to drop onNext when unable to handle further events.
If the downstream request count hits 0 then onNext will be dropped until request(long n)is invoked again to increase the request count. Scheduler: onBackpressureDrop does not operate by default on a particular Scheduler.
[中]
代码示例来源:origin: PipelineAI/pipeline
protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
for (int i = 0; i < numBuckets; i++) {
emptyRollingMaxBuckets.add(0);
}
rollingMaxStream = inputEventStream
.observe()
.map(getConcurrencyCountFromEvent)
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.flatMap(reduceStreamToMax)
.startWith(emptyRollingMaxBuckets)
.window(numBuckets, 1)
.flatMap(reduceStreamToMax)
.share()
.onBackpressureDrop();
}
代码示例来源:origin: PipelineAI/pipeline
/**
* @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
* @param intervalInMilliseconds milliseconds between data emissions
*/
@Deprecated //deprecated in 1.5.4.
public HystrixUtilizationStream(final int intervalInMilliseconds) {
this.intervalInMilliseconds = intervalInMilliseconds;
this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
.map(getAllUtilization)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}
代码示例来源:origin: PipelineAI/pipeline
/**
* @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
* @param intervalInMilliseconds milliseconds between data emissions
*/
@Deprecated //deprecated in 1.5.4.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
this.intervalInMilliseconds = intervalInMilliseconds;
this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
.map(getAllConfig)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}
代码示例来源:origin: PipelineAI/pipeline
protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
final Func2<Histogram, Event, Histogram> addValuesToBucket) {
final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
for (int i = 0; i < numBuckets; i++) {
emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
}
final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
@Override
public Observable<Histogram> call(Observable<Event> bucket) {
return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
}
};
rollingDistributionStream = stream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
.flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms
.startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty
.window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms
.flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram
.map(cacheHistogramValues) //convert to CachedValueHistogram (commonly-accessed values are cached)
.share()
.onBackpressureDrop();
}
代码示例来源:origin: PipelineAI/pipeline
private HystrixDashboardStream(int delayInMs) {
this.delayInMs = delayInMs;
this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
.map(new Func1<Long, DashboardData>() {
@Override
public DashboardData call(Long timestamp) {
return new DashboardData(
HystrixCommandMetrics.getInstances(),
HystrixThreadPoolMetrics.getInstances(),
HystrixCollapserMetrics.getInstances()
);
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}
代码示例来源:origin: PipelineAI/pipeline
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
final Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
this.sourceStream = bucketedStream //stream broken up into buckets
.window(numBuckets, 1) //emit overlapping windows of buckets
.flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}
代码示例来源:origin: PipelineAI/pipeline
protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> reduceCommandCompletion,
Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
this.sourceStream = bucketedStream
.scan(getEmptyOutputValue(), reduceBucket)
.skip(numBuckets)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}
代码示例来源:origin: davidmoten/rxjava-extras
private static <Out> Observable<Notification<Out>> applyBackpressure(
Observable<Notification<Out>> o, final BackpressureStrategy backpressureStrategy) {
if (backpressureStrategy == BackpressureStrategy.BUFFER)
return o.onBackpressureBuffer();
else if (backpressureStrategy == BackpressureStrategy.DROP)
return o.onBackpressureDrop();
else if (backpressureStrategy == BackpressureStrategy.LATEST)
return o.onBackpressureLatest();
else
throw new IllegalArgumentException(
"backpressure strategy not supported: " + backpressureStrategy);
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
private static <Out> Observable<Notification<Out>> applyBackpressure(
Observable<Notification<Out>> o, final BackpressureStrategy backpressureStrategy) {
if (backpressureStrategy == BackpressureStrategy.BUFFER)
return o.onBackpressureBuffer();
else if (backpressureStrategy == BackpressureStrategy.DROP)
return o.onBackpressureDrop();
else if (backpressureStrategy == BackpressureStrategy.LATEST)
return o.onBackpressureLatest();
else
throw new IllegalArgumentException(
"backpressure strategy not supported: " + backpressureStrategy);
}
代码示例来源:origin: avluis/Hentoid
void process(File rootDir, String name) {
cancelPrevOp();
Observable<File> observable = new MakeDirObservable().create(rootDir, name);
Observer<File> observer = new MakeDirObserver(dirTree, bus);
subscription = observable.observeOn(Schedulers.io())
.onBackpressureDrop()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
代码示例来源:origin: com.netflix.hystrix/hystrix-core
protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
for (int i = 0; i < numBuckets; i++) {
emptyRollingMaxBuckets.add(0);
}
rollingMaxStream = inputEventStream
.observe()
.map(getConcurrencyCountFromEvent)
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.flatMap(reduceStreamToMax)
.startWith(emptyRollingMaxBuckets)
.window(numBuckets, 1)
.flatMap(reduceStreamToMax)
.share()
.onBackpressureDrop();
}
代码示例来源:origin: hawkular/hawkular-metrics
private <T> Observable.Transformer<T, T> applyRetryPolicy() {
return tObservable -> tObservable
.retryWhen(observable -> {
Integer maxRetries = Integer.getInteger("hawkular.metrics.temp-table-cleaner.max-retries", 10);
Integer maxDelay = Integer.getInteger("hawkular.metrics.temp-table-cleaner.max-delay", 300);
Observable<Integer> range = Observable.range(1, maxRetries);
Observable<Observable<?>> zipWith = observable.zipWith(range, (t, i) -> {
int delay = Math.min((int) Math.pow(2, i), maxDelay);
logger.debugf(t, "The findTables query failed. Attempting retry # %d seconds", delay);
return Observable.timer(delay, TimeUnit.SECONDS).onBackpressureDrop();
});
return Observable.merge(zipWith);
});
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
private <T> Observable.Transformer<T, T> applyRetryPolicy() {
return tObservable -> tObservable
.retryWhen(observable -> {
Integer maxRetries = Integer.getInteger("hawkular.metrics.temp-table-cleaner.max-retries", 10);
Integer maxDelay = Integer.getInteger("hawkular.metrics.temp-table-cleaner.max-delay", 300);
Observable<Integer> range = Observable.range(1, maxRetries);
Observable<Observable<?>> zipWith = observable.zipWith(range, (t, i) -> {
int delay = Math.min((int) Math.pow(2, i), maxDelay);
logger.debugf(t, "The findTables query failed. Attempting retry # %d seconds", delay);
return Observable.timer(delay, TimeUnit.SECONDS).onBackpressureDrop();
});
return Observable.merge(zipWith);
});
}
代码示例来源:origin: com.netflix.hystrix/hystrix-core
protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
final Func2<Histogram, Event, Histogram> addValuesToBucket) {
final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
for (int i = 0; i < numBuckets; i++) {
emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
}
final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
@Override
public Observable<Histogram> call(Observable<Event> bucket) {
return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
}
};
rollingDistributionStream = stream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
.flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms
.startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty
.window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms
.flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram
.map(cacheHistogramValues) //convert to CachedValueHistogram (commonly-accessed values are cached)
.share()
.onBackpressureDrop();
}
代码示例来源:origin: com.netflix.hystrix/hystrix-core
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
final Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
this.sourceStream = bucketedStream //stream broken up into buckets
.window(numBuckets, 1) //emit overlapping windows of buckets
.flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}
代码示例来源:origin: com.netflix.hystrix/hystrix-core
private HystrixDashboardStream(int delayInMs) {
this.delayInMs = delayInMs;
this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
.map(new Func1<Long, DashboardData>() {
@Override
public DashboardData call(Long timestamp) {
return new DashboardData(
HystrixCommandMetrics.getInstances(),
HystrixThreadPoolMetrics.getInstances(),
HystrixCollapserMetrics.getInstances()
);
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_213() throws Exception {
Observable<Picture> fast = Observable
.interval(10, MICROSECONDS)
.map(Picture::new);
Observable<Picture> slow = Observable
.interval(11, MICROSECONDS)
.map(Picture::new);
Observable
.zip(fast, slow, (f, s) -> f + " : " + s);
Observable
.zip(
fast.onBackpressureDrop(),
slow.onBackpressureDrop(),
(f, s) -> f + " : " + s);
}
代码示例来源:origin: com.netflix.hystrix/hystrix-core
protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> reduceCommandCompletion,
Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
this.sourceStream = bucketedStream
.scan(getEmptyOutputValue(), reduceBucket)
.skip(numBuckets)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
private <T> Observable.Transformer<T, T> applyRetryPolicy() {
return tObservable -> tObservable
.retryWhen(observable -> {
Observable<Integer> range = Observable.range(1, Integer.MAX_VALUE);
Observable<Observable<?>> zipWith = observable.zipWith(range, (t, i) -> {
log.debug("Attempt #" + i + " to retry the operation after Cassandra client" +
" exception");
if (t instanceof DriverException) {
return Observable.timer(i, TimeUnit.SECONDS).onBackpressureDrop();
} else {
return Observable.error(t);
}
});
return Observable.merge(zipWith);
})
.doOnError(t -> log.error("Failure while trying to apply compression, skipping block", t))
.onErrorResumeNext(Observable.empty());
}
代码示例来源:origin: hawkular/hawkular-metrics
private <T> Observable.Transformer<T, T> applyRetryPolicy() {
return tObservable -> tObservable
.retryWhen(observable -> {
Observable<Integer> range = Observable.range(1, Integer.MAX_VALUE);
Observable<Observable<?>> zipWith = observable.zipWith(range, (t, i) -> {
log.debug("Attempt #" + i + " to retry the operation after Cassandra client" +
" exception");
if (t instanceof DriverException) {
return Observable.timer(i, TimeUnit.SECONDS).onBackpressureDrop();
} else {
return Observable.error(t);
}
});
return Observable.merge(zipWith);
})
.doOnError(t -> log.error("Failure while trying to apply compression, skipping block", t))
.onErrorResumeNext(Observable.empty());
}
内容来源于网络,如有侵权,请联系作者删除!