
x33g5p2x  于2022-01-25 转载在 其他  



[英]Returns an Observable that applies a function of your choosing to the first item emitted by a source Observable and a specified seed value, then feeds the result of that function along with the second item emitted by an Observable into the same function, and so on until all items have been emitted by the source Observable, emitting the final result from the final call to your function as its sole item.

This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject method that does a similar operation on lists. Backpressure Support: This operator does not support backpressure because by intent it will receive all values and reduce them to a single onNext. Scheduler: reduce does not operate by default on a particular Scheduler.


代码示例来源:origin: PipelineAI/pipeline

  public Observable<Histogram> call(Observable<Histogram> window) {
    return window.reduce(distributionAggregator);

代码示例来源:origin: PipelineAI/pipeline

  public Observable<Integer> call(Observable<Integer> observedConcurrency) {
    return observedConcurrency.reduce(0, reduceToMax);

代码示例来源:origin: PipelineAI/pipeline

  public Observable<Histogram> call(Observable<Event> bucket) {
    return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);

代码示例来源:origin: PipelineAI/pipeline

  public Observable<Bucket> call(Observable<Event> eventBucket) {
    return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);

代码示例来源:origin: Netflix/servo

 return response.getContent()
   .reduce(new ByteArrayOutputStream(), accumulator)

代码示例来源:origin: vert-x3/vertx-examples

 public void start() throws Exception {
  HttpClient client = vertx.createHttpClient();
  HttpClientRequest req = client.request(HttpMethod.GET, 8080, "localhost", "/");

    // Status code check and -> Observable<Buffer>
    flatMap(resp -> {
     if (resp.statusCode() != 200) {
      throw new RuntimeException("Wrong status code " + resp.statusCode());
     return Observable.just(Buffer.buffer()).mergeWith(resp.toObservable());

    // Reduce all buffers in a single buffer

    // Turn in to a string
    map(buffer -> buffer.toString("UTF-8")).

    // Get a single buffer
    subscribe(data -> System.out.println("Server content " + data));

  // End request

代码示例来源:origin: apache/usergrid

public int migrate( final int currentVersion, final ProgressObserver observer ) {
  final int migrationVersion = getMaxVersion();
  connectionService.deDupeConnections( allApplicationsObservable.getData() ).reduce( 0l, ( count, deDuped ) -> {
    final long newCount = count + 1;
     * Update our progress observer
    if ( newCount % UPDATE_COUNT == 0 ) {
      logger.info( "De duped {} edges", newCount );
      observer.update( migrationVersion, String.format( "De duped %d edges", newCount ) );
    return newCount;
  } ).doOnNext( total -> {
    logger.info( "Completed de-duping {} edges", total );
  } ).toBlocking().lastOrDefault( null ); //want this to run through all records
  return migrationVersion;

代码示例来源:origin: apache/usergrid

 * Tests that reduce emits
public void testReduceEmpty(){
  final int result =  Observable.range( 0, 100 ).filter( value -> value == -1 ).reduce( 0, ( integer, integer2 ) -> integer + 1 ).toBlocking().last();
  assertEquals(0, result);

代码示例来源:origin: apache/usergrid

}, 10 )
.reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> {
  final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns =

代码示例来源:origin: apache/usergrid

.flatMap(writeEvent -> {
    return Observable.just(writeEvent).doOnNext(doWork);
  }, 10).reduce(0, heartbeatReducer).toBlocking().last();
  return Observable.just(entityWrapper).doOnNext(doWork).subscribeOn(Schedulers.io());
}, 10).reduce(0, heartbeatReducer).toBlocking().last();

代码示例来源:origin: jhusain/learnrxjava

 * Retrieve the largest number.
 * Use reduce to select the maximum value in a list of numbers.
public Observable<Integer> exerciseReduce(Observable<Integer> nums) {
  return nums.reduce((max, item) -> {
    if (item > max) {
      return item;
    } else {
      return max;

代码示例来源:origin: PipelineAI/pipeline

Observable < Boolean > reduced = zipped.reduce(true, new Func2<Boolean, Boolean, Boolean>() {
  public Boolean call(Boolean a, Boolean b) {

代码示例来源:origin: davidmoten/rxjava-jdbc

 * Executes the update query immediately, blocking till completion and
 * returns total of counts of records affected.
 * @return total of counts of records affected by update queries
public int execute() {
  return count().reduce(0, TotalHolder.TOTAL).toBlocking().single();

代码示例来源:origin: jhusain/learnrxjava

 * Retrieve the id, title, and smallest box art url for every video.
 * Now let's try combining reduce() with our other functions to build more complex queries.
 * This is a variation of the problem we solved earlier, where we retrieved the url of the boxart with a
 * width of 150px. This time we'll use reduce() instead of filter() to retrieve the _smallest_ box art in
 * the boxarts list.
 * See Exercise 19 of ComposableListExercises
public Observable<JSON> exerciseMovie(Observable<Movies> movies) {
  return movies.flatMap(ml -> {
    return ml.videos.<JSON> flatMap(v -> {
      return v.boxarts.reduce((max, box) -> {
        int maxSize = max.height * max.width;
        int boxSize = box.height * box.width;
        if (boxSize < maxSize) {
          return box;
        } else {
          return max;
      }).map(maxBoxart -> {
        return json("id", v.id, "title", v.title, "boxart", maxBoxart.url);

代码示例来源:origin: davidmoten/rxjava-jdbc

 * Returns an {@link Observable} that is the result of running a sequence of
 * update commands (insert/update/delete, ddl) read from the given
 * {@link Observable} sequence.
 * @param commands
 * @return
public Observable<Integer> run(Observable<String> commands) {
  return commands.reduce(Observable.<Integer> empty(),
      new Func2<Observable<Integer>, String, Observable<Integer>>() {
        public Observable<Integer> call(Observable<Integer> dep, String command) {
          return update(command).dependsOn(dep).count();
      }).flatMap(Functions.<Observable<Integer>> identity());

代码示例来源:origin: jhusain/learnrxjava

public static void main(String... args) {
    Observable.range(0, 10).reduce(new ArrayList<>(), (list, i) -> {
      return list;

    System.out.println("... vs ...");

    Observable.range(0, 10).scan(new ArrayList<>(), (list, i) -> {
      return list;

代码示例来源:origin: davidmoten/rtree

private Rectangle calculateMaxView(RTree<T, S> tree) {
  return tree.entries().reduce(Optional.<Rectangle>absent(),
      new Func2<Optional<Rectangle>, Entry<T, S>, Optional<Rectangle>>() {
        public Optional<Rectangle> call(Optional<Rectangle> r, Entry<T, S> entry) {
          if (r.isPresent())
            return of(r.get().add(entry.geometry().mbr()));
            return of(entry.geometry().mbr());
      }).toBlocking().single().or(rectangle(0, 0, 0, 0));

代码示例来源:origin: jhusain/learnrxjava

.groupBy(n -> n)
.flatMap(g -> {
  return g.take(3).reduce((s, s2) -> s + s2);

代码示例来源:origin: com.netflix.rxjava/rxjava-math

private static <T> Observable<T> minMax(Observable<T> source,
    final Comparator<? super T> comparator, final long flag) {
  return source.reduce(new Func2<T, T, T>() {
    public T call(T acc, T value) {
      if (flag * comparator.compare(acc, value) > 0) {
        return acc;
      return value;

代码示例来源:origin: jhusain/learnrxjava

.reduce(new StreamState(), (state, playEvent) -> {
  System.out.println("    state: " + state + "  event: " + playEvent.id + "-" + playEvent.session);

