Using the rx.Observable.window() method, with code examples

x33g5p2x, reposted on 2022-01-25 under Other

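This article collects code examples for the Java method rx.Observable.window() and shows how Observable.window() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of Observable.window() are as follows:
Package path: rx.Observable
Class name: Observable
Method name: window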

About Observable.window

Returns an Observable that emits windows of items it collects from the source Observable. The resulting Observable emits connected, non-overlapping windows, each containing count items. When the source Observable completes or encounters an error, the resulting Observable emits the current window and propagates the notification from the source Observable.

Backpressure Support: This operator does not support backpressure as it uses count to control data flow.
Scheduler: This version of window does not operate by default on a particular Scheduler.
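
To make the count-based behaviour described above concrete, here is a minimal plain-Java sketch that models it on an in-memory list. It is not RxJava code and does not use rx.Observable; the class name CountWindowSketch and the method windows are hypothetical names chosen for illustration. It only shows how items are grouped into consecutive, non-overlapping windows of at most count items, with a shorter final window when the source ends early.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of window(count) grouping; this is NOT RxJava code.
class CountWindowSketch {

    // Splits items into consecutive, non-overlapping windows of at most `count` items.
    // The last window may be shorter, mirroring how the open window is emitted on completion.
    static <T> List<List<T>> windows(List<T> items, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += count) {
            int end = Math.min(start + count, items.size());
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(windows(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3));
    }
}
```

In the real operator each window is itself an Observable that emits items as they arrive, rather than a finished list; the sketch captures only the grouping.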

Code examples

Example source: PipelineAI/pipeline (the same code also appears in com.netflix.hystrix/hystrix-core)

    @Override
    public Observable<Bucket> call() {
        return inputEventStream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) // bucket by the counter window so we emit to the next operator in time chunks, not on every OnNext
                .flatMap(reduceBucketToSummary)                // for a given bucket, turn it into a long array containing counts of event types
                .startWith(emptyEventCountsToStart);           // start with empty arrays to make consumer logic as generic as possible (windows are always full)
    }
    }); // closes an enclosing scope that this excerpt omits

Example source: PipelineAI/pipeline (the same code also appears in com.netflix.hystrix/hystrix-core)

    protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
        final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
        for (int i = 0; i < numBuckets; i++) {
            emptyRollingMaxBuckets.add(0);
        }
        rollingMaxStream = inputEventStream
                .observe()
                .map(getConcurrencyCountFromEvent)
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) // split the stream into time buckets
                .flatMap(reduceStreamToMax)                    // reduce each bucket to a single value
                .startWith(emptyRollingMaxBuckets)             // seed with numBuckets zero-valued buckets
                .window(numBuckets, 1)                         // overlapping windows of numBuckets buckets, advancing one bucket at a time
                .flatMap(reduceStreamToMax)                    // reduce each rolling window to a single value
                .share()
                .onBackpressureDrop();
    }
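
The call window(numBuckets, 1) in the example above uses the two-argument form, where the first argument is the window size and the second is how many items to skip before opening the next window. The following plain-Java sketch models that grouping on a list. It is not RxJava code; SlidingWindowSketch and windows are hypothetical names, and the handling of trailing windows (emitting windows that are still open at the end with fewer items) is an assumption about RxJava 1.x behaviour rather than something the source states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of window(count, skip) grouping; this is NOT RxJava code.
class SlidingWindowSketch {

    // Opens a new window every `skip` items; each window holds at most `count` items.
    // Assumption: windows still open when the input ends are emitted with whatever they hold.
    static <T> List<List<T>> windows(List<T> items, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += skip) {
            int end = Math.min(start + count, items.size());
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
        System.out.println(windows(Arrays.asList(1, 2, 3, 4, 5), 3, 1));
    }
}
```

With skip equal to count the windows do not overlap, which matches how window(count) delegates to window(count, count) in the Javadoc excerpt further down.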

Example source: PipelineAI/pipeline (the same code also appears in com.netflix.hystrix/hystrix-core)

    protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                                        final Func2<Histogram, Event, Histogram> addValuesToBucket) {
        final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
        for (int i = 0; i < numBuckets; i++) {
            emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
        }
        final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
            @Override
            public Observable<Histogram> call(Observable<Event> bucket) {
                return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
            }
        };
        rollingDistributionStream = stream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) // stream of unaggregated buckets
                .flatMap(reduceBucketToSingleDistribution)     // stream of aggregated Histograms
                .startWith(emptyDistributionsToStart)          // stream of aggregated Histograms that starts with n empty
                .window(numBuckets, 1)                         // windowed stream: each OnNext is a stream of n Histograms
                .flatMap(reduceWindowToSingleDistribution)     // reduced stream: each OnNext is a single Histogram
                .map(cacheHistogramValues)                     // convert to CachedValuesHistogram (commonly accessed values are cached)
                .share()
                .onBackpressureDrop();
    }

Example source: PipelineAI/pipeline

    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream      // stream broken up into buckets
                .window(numBuckets, 1)          // emit overlapping windows of buckets
                .flatMap(reduceWindowToSummary) // convert a window of bucket-summaries into a single summary
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        // multiple subscribers should get same data
                .onBackpressureDrop();          // if there are slow consumers, data should not buffer
    }
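
In the example above, each window is reduced with scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets). One way to read this (an interpretation, not something the source states): a seeded scan emits the seed and then one running value per item, so a window of k items yields k + 1 values, and skipping numBuckets of them leaves a value only when the window is full. The plain-Java sketch below models that on a list. It is not RxJava code; ScanSkipSketch and scanThenSkip are hypothetical names, and integer addition stands in for reduceBucket purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of window.scan(seed, reducer).skip(n); this is NOT RxJava code.
// Assumption: integer addition stands in for the reduceBucket function.
class ScanSkipSketch {

    static List<Integer> scanThenSkip(List<Integer> window, int seed, int n) {
        List<Integer> scanned = new ArrayList<>();
        int running = seed;
        scanned.add(running);          // a seeded scan emits the seed first
        for (int value : window) {
            running += value;
            scanned.add(running);      // then one running total per item
        }
        // Drop the first n values; nothing is left if the window was too short.
        int from = Math.min(Math.max(n, 0), scanned.size());
        return new ArrayList<>(scanned.subList(from, scanned.size()));
    }

    public static void main(String[] args) {
        System.out.println(scanThenSkip(Arrays.asList(1, 2, 3), 0, 3)); // prints [6]
        System.out.println(scanThenSkip(Arrays.asList(4, 5), 0, 3));    // prints []
    }
}
```

Under this reading, a full window of three buckets produces exactly one summary, while a shorter window produces none.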

Example source: jhusain/learnrxjava

    private static void flatMapWindowedExampleAsync() {
        Observable.range(0, 5000).window(500).flatMap(work -> {
            return work.observeOn(Schedulers.computation()).map(item -> {
                // simulate computational work
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
                }
                return item + " processed " + Thread.currentThread();
            });
        }).toBlocking().forEach(System.out::println);
    }

Example source: jhusain/learnrxjava

    public static void main(String[] args) {
        // window every 500ms (using 999999999 to mark the start of each window's output)
        hotStream().window(500, TimeUnit.MILLISECONDS).take(10).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println);
        // window 10 items at a time (using 999999999 to mark the start of each window's output)
        hotStream().window(10).take(2).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println);
        System.out.println("Done");
    }

Example source: com.netflix.rxjava/rxjava-core

    /**
     * Returns an Observable that emits windows of items it collects from the source Observable. The resulting
     * Observable emits connected, non-overlapping windows, each containing {@code count} items. When the source
     * Observable completes or encounters an error, the resulting Observable emits the current window and
     * propagates the notification from the source Observable.
     * <p>
     * <img width="640" height="400" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window3.png" alt="">
     * <dl>
     * <dt><b>Backpressure Support:</b></dt>
     * <dd>This operator does not support backpressure as it uses {@code count} to control data flow.</dd>
     * <dt><b>Scheduler:</b></dt>
     * <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param count
     *            the maximum size of each window before it should be emitted
     * @return an Observable that emits connected, non-overlapping windows, each containing at most
     *         {@code count} items from the source Observable
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#window">RxJava wiki: window</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window.aspx">MSDN: Observable.Window</a>
     */
    public final Observable<Observable<T>> window(int count) {
        return window(count, count);
    }

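Example source: hawkular/hawkular-metrics (the same code also appears in org.hawkular.metrics/hawkular-metrics-core-service)

    @Override
    public Observable<BatchStatement> call(Observable<BoundStatement> statements) {
        return statements
                .window(batchSize) // group statements into windows of at most batchSize
                .flatMap(window -> window.collect(batchStatementFactory, BatchStatement::add)); // collect each window into one BatchStatement
    }
    } // closes an enclosing scope that this excerpt omits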

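Example source: hawkular/hawkular-metrics (the same code also appears in org.hawkular.metrics/hawkular-metrics-core-service)

    @Override
    public Observable<BatchStatement> call(Observable<Statement> statements) {
        return statements
                .window(batchSize) // group statements into windows of at most batchSize
                .flatMap(window -> window.collect(batchStatementFactory, BatchStatement::add)); // collect each window into one BatchStatement
    }
    } // closes an enclosing scope that this excerpt omits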

Example source: leeowenowen/rxjava-examples

    @Override
    public void run() {
        Observable.range(1, 10).window(3).subscribe(new Action1<Observable<Integer>>() {
            @Override
            public void call(final Observable<Integer> integerObservable) {
                // each emitted window is itself an Observable, so subscribe to it to receive its items
                integerObservable.subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        log(integer + " of window " + integerObservable);
                    }
                });
            }
        });
    }
    }); // closes an enclosing scope that this excerpt omits

Example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_182() throws Exception {
        Observable
                .range(0, Integer.MAX_VALUE)
                .map(Picture::new)
                .window(1, TimeUnit.SECONDS) // one window per second
                .flatMap(Observable::count)  // number of items in each window
                .subscribe(System.out::println);
    }
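
The pattern window(1, TimeUnit.SECONDS).flatMap(Observable::count) in the example above yields one count per time window. The following plain-Java sketch models that idea by assigning timestamped events to fixed-length buckets and counting them. It is not RxJava code; TimeBucketSketch, countsPerBucket, and the sample timestamps are hypothetical. The real operator relies on a Scheduler clock, so events close to a boundary may be assigned differently than in this simplified model.

```java
import java.util.Arrays;

// Plain-Java model of counting events per fixed time window; this is NOT RxJava code.
class TimeBucketSketch {

    // timestampsMs are event times in milliseconds relative to the start of the stream.
    // Returns the number of events in each of the first numBuckets buckets, including empty ones.
    static long[] countsPerBucket(long[] timestampsMs, long bucketSizeInMs, int numBuckets) {
        if (bucketSizeInMs <= 0 || numBuckets < 0) {
            throw new IllegalArgumentException("bucketSizeInMs must be positive and numBuckets non-negative");
        }
        long[] counts = new long[numBuckets];
        for (long t : timestampsMs) {
            if (t < 0) {
                continue; // events before the start are ignored
            }
            long bucket = t / bucketSizeInMs;
            if (bucket < numBuckets) {
                counts[(int) bucket]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] events = {10, 120, 499, 500, 1700};
        // prints [3, 1, 0, 1]
        System.out.println(Arrays.toString(countsPerBucket(events, 500, 4)));
    }
}
```

The empty third bucket in the output reflects a period with no events, which still appears as a window with a count of zero.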

Example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_192() throws Exception {
        Observable
                .range(0, Integer.MAX_VALUE)
                .map(Picture::new)
                .window(10, TimeUnit.SECONDS)
                .flatMap(Observable::distinct); // de-duplicate within each 10-second window
    }

Example source: com.microsoft.azure/azure-cosmosdb-gateway

    @Override
    public Observable<FeedResponse<T>> call(Observable<T> source) {
        return source
                // window: creates an Observable of Observables in which each inner Observable
                // emits at most maxPageSize elements
                .window(maxPageSize)
                .map(o -> o.toList())
                // flattens the Observable<Observable<List<T>>> to Observable<List<T>>;
                // maxConcurrent = 1 subscribes to one inner Observable at a time, preserving page order
                .flatMap(resultListObs -> resultListObs, 1)
                // translates Observable<List<T>> to Observable<FeedResponse<T>>
                .map(resultList -> {
                    // construct a page from the result list
                    return BridgeInternal.createFeedResponse(resultList, headerResponse(tracker.getAndResetCharge()));
                }).switchIfEmpty(
                        Observable.defer(() -> {
                            // create an empty page if there is no result
                            return Observable.just(BridgeInternal.createFeedResponse(
                                    Utils.immutableListOf(),
                                    headerResponse(tracker.getAndResetCharge())));
                        }));
    }
    } // closes an enclosing scope that this excerpt omits

Example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_216() throws Exception {
        Observable<KeyEvent> keyEvents = empty();
        Observable<Observable<KeyEvent>> windows = keyEvents.window(1, SECONDS);
        Observable<Integer> eventPerSecond = windows
                .flatMap(eventsInSecond -> eventsInSecond.count());
    }

Example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_82() throws Exception {
        HystrixCommandCompletionStream
                .getInstance(HystrixCommandKey.Factory.asKey("FetchRating"))
                .observe()
                .filter(e -> e.getEventCounts().getCount(FAILURE) > 0) // keep only events that recorded a failure
                .window(1, TimeUnit.SECONDS)
                .flatMap(Observable::count)
                .subscribe(x -> log.info("{} failures/s", x));
    }
