rx.Observable.reduce()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.0k)|赞(0)|评价(0)|浏览(162)

本文整理了Java中rx.Observable.reduce()方法的一些代码示例,展示了Observable.reduce()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.reduce()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:reduce

Observable.reduce介绍

[英]Returns an Observable that applies a function of your choosing to the first item emitted by a source Observable and a specified seed value, then feeds the result of that function along with the second item emitted by an Observable into the same function, and so on until all items have been emitted by the source Observable, emitting the final result from the final call to your function as its sole item.

This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject method that does a similar operation on lists. Backpressure Support: This operator does not support backpressure because by intent it will receive all values and reduce them to a single onNext. Scheduler: reduce does not operate by default on a particular Scheduler.
[中]返回一个Observable,该函数将您选择的函数应用于源Observable发出的第一个项和指定的种子值,然后将该函数的结果与源Observable发出的第二个项一起提供给同一个函数,依此类推,直到源Observable发出所有项,将函数的最终调用的最终结果作为其唯一项发出。
这种技术在这里被称为“reduce”,在其他编程环境中被称为“聚合”、“折叠”、“累积”、“压缩”或“注入”。例如,Groovy有一个inject方法,可以对列表执行类似的操作。背压支持:此运算符不支持背压,因为它会接收所有值,并将它们减少到单个onNext。调度程序:默认情况下,reduce不会在特定调度程序上运行。

代码示例

代码示例来源:origin: PipelineAI/pipeline

@Override
  public Observable<Histogram> call(Observable<Histogram> window) {
    return window.reduce(distributionAggregator);
  }
};

代码示例来源:origin: PipelineAI/pipeline

@Override
  public Observable<Integer> call(Observable<Integer> observedConcurrency) {
    return observedConcurrency.reduce(0, reduceToMax);
  }
};

代码示例来源:origin: PipelineAI/pipeline

@Override
  public Observable<Histogram> call(Observable<Event> bucket) {
    return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
  }
};

代码示例来源:origin: PipelineAI/pipeline

@Override
  public Observable<Bucket> call(Observable<Event> eventBucket) {
    return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
  }
};

代码示例来源:origin: Netflix/servo

};
 return response.getContent()
   .reduce(new ByteArrayOutputStream(), accumulator)
   .map(ByteArrayOutputStream::toByteArray);
};

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {
  HttpClient client = vertx.createHttpClient();
  HttpClientRequest req = client.request(HttpMethod.GET, 8080, "localhost", "/");
  req.toObservable().

    // Status code check and -> Observable<Buffer>
    flatMap(resp -> {
     if (resp.statusCode() != 200) {
      throw new RuntimeException("Wrong status code " + resp.statusCode());
     }
     return Observable.just(Buffer.buffer()).mergeWith(resp.toObservable());
    }).

    // Reduce all buffers in a single buffer
    reduce(Buffer::appendBuffer).

    // Turn in to a string
    map(buffer -> buffer.toString("UTF-8")).

    // Get a single buffer
    subscribe(data -> System.out.println("Server content " + data));

  // End request
  req.end();
 }
}

代码示例来源:origin: apache/usergrid

@Override
public int migrate( final int currentVersion, final ProgressObserver observer ) {
  final int migrationVersion = getMaxVersion();
  observer.start();
  connectionService.deDupeConnections( allApplicationsObservable.getData() ).reduce( 0l, ( count, deDuped ) -> {
    final long newCount = count + 1;
    /**
     * Update our progress observer
     */
    if ( newCount % UPDATE_COUNT == 0 ) {
      logger.info( "De duped {} edges", newCount );
      observer.update( migrationVersion, String.format( "De duped %d edges", newCount ) );
    }
    return newCount;
  } ).doOnNext( total -> {
    logger.info( "Completed de-duping {} edges", total );
    observer.complete();
  } ).toBlocking().lastOrDefault( null ); //want this to run through all records
  return migrationVersion;
}

代码示例来源:origin: apache/usergrid

/**
 * Tests that reduce emits
 */
@Test
public void testReduceEmpty(){
  final int result =  Observable.range( 0, 100 ).filter( value -> value == -1 ).reduce( 0, ( integer, integer2 ) -> integer + 1 ).toBlocking().last();
  assertEquals(0, result);
}

代码示例来源:origin: apache/usergrid

}, 10 )
.reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> {
  final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns =
    rows.iterator();

代码示例来源:origin: apache/usergrid

.flatMap(writeEvent -> {
    return Observable.just(writeEvent).doOnNext(doWork);
  }, 10).reduce(0, heartbeatReducer).toBlocking().last();
  return Observable.just(entityWrapper).doOnNext(doWork).subscribeOn(Schedulers.io());
}, 10).reduce(0, heartbeatReducer).toBlocking().last();

代码示例来源:origin: jhusain/learnrxjava

/**
 * Retrieve the largest number.
 * 
 * Use reduce to select the maximum value in a list of numbers.
 */
public Observable<Integer> exerciseReduce(Observable<Integer> nums) {
  return nums.reduce((max, item) -> {
    if (item > max) {
      return item;
    } else {
      return max;
    }
  });
}

代码示例来源:origin: PipelineAI/pipeline

Observable < Boolean > reduced = zipped.reduce(true, new Func2<Boolean, Boolean, Boolean>() {
  @Override
  public Boolean call(Boolean a, Boolean b) {

代码示例来源:origin: davidmoten/rxjava-jdbc

/**
 * Executes the update query immediately, blocking till completion and
 * returns total of counts of records affected.
 * 
 * @return total of counts of records affected by update queries
 */
public int execute() {
  return count().reduce(0, TotalHolder.TOTAL).toBlocking().single();
}

代码示例来源:origin: jhusain/learnrxjava

/**
 * Retrieve the id, title, and smallest box art url for every video.
 * 
 * Now let's try combining reduce() with our other functions to build more complex queries.
 * 
 * This is a variation of the problem we solved earlier, where we retrieved the url of the boxart with a
 * width of 150px. This time we'll use reduce() instead of filter() to retrieve the _smallest_ box art in
 * the boxarts list.
 * 
 * See Exercise 19 of ComposableListExercises
 */
public Observable<JSON> exerciseMovie(Observable<Movies> movies) {
  return movies.flatMap(ml -> {
    return ml.videos.<JSON> flatMap(v -> {
      return v.boxarts.reduce((max, box) -> {
        int maxSize = max.height * max.width;
        int boxSize = box.height * box.width;
        if (boxSize < maxSize) {
          return box;
        } else {
          return max;
        }
      }).map(maxBoxart -> {
        return json("id", v.id, "title", v.title, "boxart", maxBoxart.url);
      });
    });
  });
}

代码示例来源:origin: davidmoten/rxjava-jdbc

/**
 * Returns an {@link Observable} that is the result of running a sequence of
 * update commands (insert/update/delete, ddl) read from the given
 * {@link Observable} sequence.
 * 
 * @param commands
 * @return
 */
public Observable<Integer> run(Observable<String> commands) {
  return commands.reduce(Observable.<Integer> empty(),
      new Func2<Observable<Integer>, String, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Observable<Integer> dep, String command) {
          return update(command).dependsOn(dep).count();
        }
      }).flatMap(Functions.<Observable<Integer>> identity());
}

代码示例来源:origin: jhusain/learnrxjava

public static void main(String... args) {
    Observable.range(0, 10).reduce(new ArrayList<>(), (list, i) -> {
      list.add(i);
      return list;
    }).forEach(System.out::println);

    System.out.println("... vs ...");

    Observable.range(0, 10).scan(new ArrayList<>(), (list, i) -> {
      list.add(i);
      return list;
    }).forEach(System.out::println);
  }
}

代码示例来源:origin: davidmoten/rtree

private Rectangle calculateMaxView(RTree<T, S> tree) {
  return tree.entries().reduce(Optional.<Rectangle>absent(),
      new Func2<Optional<Rectangle>, Entry<T, S>, Optional<Rectangle>>() {
        @Override
        public Optional<Rectangle> call(Optional<Rectangle> r, Entry<T, S> entry) {
          if (r.isPresent())
            return of(r.get().add(entry.geometry().mbr()));
          else
            return of(entry.geometry().mbr());
        }
      }).toBlocking().single().or(rectangle(0, 0, 0, 0));
}

代码示例来源:origin: jhusain/learnrxjava

.groupBy(n -> n)
.flatMap(g -> {
  return g.take(3).reduce((s, s2) -> s + s2);
}).forEach(System.out::println);

代码示例来源:origin: com.netflix.rxjava/rxjava-math

private static <T> Observable<T> minMax(Observable<T> source,
    final Comparator<? super T> comparator, final long flag) {
  return source.reduce(new Func2<T, T, T>() {
    @Override
    public T call(T acc, T value) {
      if (flag * comparator.compare(acc, value) > 0) {
        return acc;
      }
      return value;
    }
  });
}

代码示例来源:origin: jhusain/learnrxjava

})
.reduce(new StreamState(), (state, playEvent) -> {
  System.out.println("    state: " + state + "  event: " + playEvent.id + "-" + playEvent.session);

相关文章

Observable类方法