Usage and code examples for the rx.Observable.scan() method

x33g5p2x, reposted on 2022-01-25 under Other

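This article collects code examples for the Java method rx.Observable.scan() and shows how Observable.scan() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.scan():
Fully qualified class: rx.Observable
Class name: Observable
Method name: scan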

About Observable.scan

Returns an Observable that applies a function of your choosing to the first item emitted by a source Observable and a seed value, then feeds the result of that function along with the second item emitted by the source Observable into the same function, and so on until all items have been emitted by the source Observable, emitting the result of each of these iterations.

This sort of function is sometimes called an accumulator.

Note that the Observable that results from this method will emit initialValue as its first emitted item. This description applies to the overload that takes a seed, scan(initialValue, accumulator).

Scheduler: scan does not operate by default on a particular Scheduler.
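
The seeded behaviour described above can be modelled without RxJava. The following is a minimal plain-Java sketch, assuming a hypothetical helper scanWithSeed that uses a List in place of an Observable. It illustrates the emitted sequence only and is not RxJava's implementation: the seed comes out first, followed by one accumulated value per source item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Illustrative model only: a List stands in for an Observable.
// The class and method names are hypothetical, not part of RxJava.
class SeededScanSketch {

    static <T, R> List<R> scanWithSeed(List<T> source, R seed, BiFunction<R, T, R> accumulator) {
        List<R> emitted = new ArrayList<>();
        R state = seed;
        emitted.add(state);                          // the seed is the first emitted item
        for (T item : source) {
            state = accumulator.apply(state, item);  // combine previous result with next item
            emitted.add(state);                      // every intermediate result is emitted
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> sums = scanWithSeed(Arrays.asList(1, 2, 3, 4), 0, (acc, x) -> acc + x);
        System.out.println(sums); // prints [0, 1, 3, 6, 10]
    }
}
```

A source of n items therefore yields n + 1 emissions, which is why several of the examples below follow a seeded scan with skip(...).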

Code examples

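Code example source: PipelineAI/pipeline

// Excerpt: the declaration that encloses this method is not shown.
@Override
public Observable<Output> call(Observable<Bucket> window) {
  // Fold the buckets into a running Output, then skip the first numBuckets emissions.
  return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}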

Code example source: bumptech/glide

// Excerpt: truncated in the source; the accumulator body is not shown.
.scan(new Func2<List<Image>, List<Image>, List<Image>>() {
  @Override
  public List<Image> call(List<Image> images, List<Image> images2) {

Code example source: PipelineAI/pipeline

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                     Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                     Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
  this.sourceStream = bucketedStream
      .scan(getEmptyOutputValue(), reduceBucket)
      .skip(numBuckets)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

Code example source: davidmoten/rtree

/**
 * Returns the Observable sequence of trees created by progressively adding
 * entries.
 * 
 * @param entries
 *            the entries to add
 * @return a sequence of trees
 */
public Observable<RTree<T, S>> add(Observable<Entry<T, S>> entries) {
  return entries.scan(this, new Func2<RTree<T, S>, Entry<T, S>, RTree<T, S>>() {
    @Override
    public RTree<T, S> call(RTree<T, S> tree, Entry<T, S> entry) {
      return tree.add(entry);
    }
  });
}

Code example source: davidmoten/rtree

/**
 * Returns the Observable sequence of trees created by progressively deleting
 * entries.
 * 
 * @param entries
 *            the entries to delete
 * @param all
 *            if true delete all matching otherwise just first matching
 * @return a sequence of trees
 */
public Observable<RTree<T, S>> delete(Observable<Entry<T, S>> entries, final boolean all) {
  return entries.scan(this, new Func2<RTree<T, S>, Entry<T, S>, RTree<T, S>>() {
    @Override
    public RTree<T, S> call(RTree<T, S> tree, Entry<T, S> entry) {
      return tree.delete(entry, all);
    }
  });
}

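Code example source: jhusain/learnrxjava

// Excerpt: the enclosing class is not shown.
public static void main(String... args) {
  // reduce emits a single item: the fully accumulated list.
  Observable.range(0, 10).reduce(new ArrayList<>(), (list, i) -> {
    list.add(i);
    return list;
  }).forEach(System.out::println);

  System.out.println("... vs ...");

  // scan emits the seed and then the accumulator's result after every source item.
  Observable.range(0, 10).scan(new ArrayList<>(), (list, i) -> {
    list.add(i);
    return list;
  }).forEach(System.out::println);
}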
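
To make the contrast between reduce and scan concrete, here is a plain-Java sketch in which lists stand in for Observables. The class ReduceVsScanSketch and its methods are hypothetical names used for illustration only; the sketch models what each operator emits and is not RxJava code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; names are hypothetical.
class ReduceVsScanSketch {

    // Models reduce: only the final accumulated list is produced.
    static List<Integer> reduceToList(int start, int count) {
        List<Integer> acc = new ArrayList<>();
        for (int i = start; i < start + count; i++) {
            acc.add(i);
        }
        return acc;
    }

    // Models scan: the seed and every intermediate list are produced.
    // Each step is copied, because the accumulator mutates one shared list.
    static List<List<Integer>> scanToLists(int start, int count) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> acc = new ArrayList<>();
        emitted.add(new ArrayList<>(acc));     // the empty seed
        for (int i = start; i < start + count; i++) {
            acc.add(i);
            emitted.add(new ArrayList<>(acc)); // snapshot after each item
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(reduceToList(0, 3)); // prints [0, 1, 2]
        System.out.println(scanToLists(0, 3));  // prints [[], [0], [0, 1], [0, 1, 2]]
    }
}
```

In the RxJava example the accumulator returns the same mutable list at every step, so a subscriber that stores the emitted values would see them all change together. Copying at each step, as in the sketch, avoids that aliasing.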

Code example source: davidmoten/rxjava-extras

public static Observable<Double> solveWithNewtonsMethod(final Func1<Double, Double> f,
    final Func1<Double, Double> dfdx, double x0) {
  return Observable.just(1).repeat().scan(x0, new Func2<Double, Integer, Double>() {
    @Override
    public Double call(Double xn, Integer n) {
      return xn - f.call(xn) / dfdx.call(xn);
    }
  });
}
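
The example above drives scan from an endless stream of dummy items and ignores the item itself, so each emission is simply the next Newton iterate x(n+1) = x(n) - f(x(n)) / f'(x(n)). The plain-Java sketch below models that iteration. The class NewtonScanSketch, the method iterates, and the steps parameter are hypothetical; a caller of the RxJava version would presumably bound the infinite sequence with something like take(steps + 1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleUnaryOperator;

// Illustrative model only; names are hypothetical.
class NewtonScanSketch {

    // Returns x0 followed by `steps` Newton iterates, mirroring a seeded scan
    // whose accumulator ignores the source item.
    static List<Double> iterates(DoubleUnaryOperator f, DoubleUnaryOperator dfdx, double x0, int steps) {
        List<Double> emitted = new ArrayList<>();
        double xn = x0;
        emitted.add(xn);                                            // the seed x0 comes first
        for (int n = 0; n < steps; n++) {
            xn = xn - f.applyAsDouble(xn) / dfdx.applyAsDouble(xn); // one Newton step
            emitted.add(xn);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Solve x^2 - 2 = 0 starting from x0 = 1; the iterates approach sqrt(2).
        List<Double> xs = iterates(x -> x * x - 2, x -> 2 * x, 1.0, 6);
        System.out.println(xs.get(xs.size() - 1));
    }
}
```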

Code example source: com.netflix.rxjava/rxjava-core

// Excerpt: the enclosing class is not shown; predicate is defined outside this excerpt.
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> ts) {
  // The accumulated Notification carries a counter that grows while the predicate holds.
  return ts.scan(Notification.createOnNext(0), new Func2<Notification<Integer>, Notification<?>, Notification<Integer>>() {
    @SuppressWarnings("unchecked")
    @Override
    public Notification<Integer> call(Notification<Integer> n, Notification<?> term) {
      final int value = n.getValue();
      if (predicate.call(value, term.getThrowable()).booleanValue())
        return Notification.createOnNext(value + 1);
      else
        return (Notification<Integer>) term;
    }
  });
}

Code example source: leeowenowen/rxjava-examples

// Excerpt: the enclosing call is not shown.
@Override
public void run() {
  // Seedless scan: emits the running sum of 1..10.
  Observable.range(1, 10).scan(new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) {
      return integer + integer2;
    }
  }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
      log(integer);
    }
  });
}
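
Unlike the seeded overload, scan without a seed emits the first source item unchanged and applies the accumulator from the second item onward. A minimal plain-Java sketch of that behaviour follows, with a List standing in for the Observable. SeedlessScanSketch and its scan method are hypothetical names, and the code models the emissions rather than RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative model only; names are hypothetical.
class SeedlessScanSketch {

    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        boolean first = true;
        T state = null;
        for (T item : source) {
            // The first item passes through as-is; later items are combined
            // with the previous result.
            state = first ? item : accumulator.apply(state, item);
            first = false;
            emitted.add(state);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> oneToTen = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            oneToTen.add(i);
        }
        System.out.println(scan(oneToTen, Integer::sum));
        // prints [1, 3, 6, 10, 15, 21, 28, 36, 45, 55]
    }
}
```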

Code example source: davidmoten/rxjava-extras

public static <T> Observable<List<Integer>> permutations(int size) {
  List<Integer> indexes = new ArrayList<Integer>(size);
  for (int i = 0; i < size; i++) {
    indexes.add(i);
  }
  return Observable.from(Permutations.iterable(indexes)).scan(indexes,
      new Func2<List<Integer>, Swap<Integer>, List<Integer>>() {
        @Override
        public List<Integer> call(List<Integer> a, Swap<Integer> swap) {
          List<Integer> b = new ArrayList<Integer>(a);
          b.set(swap.left(), a.get(swap.right()));
          b.set(swap.right(), a.get(swap.left()));
          return b;
        }
      });
}

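Code example source: davidmoten/rxjava-extras

// Excerpt: the declaration that encloses this method is not shown.
@Override
public Observable<Statistics> call(Observable<T> o) {
  // Emit the statistics accumulated so far after each source item.
  return o.scan(Statistics.create(), Functions.collectStats());
}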

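Code example source: davidmoten/rxjava-extras

// Excerpt: the declaration that encloses this method is not shown; function is defined outside this excerpt.
@Override
public Observable<Pair<T, Statistics>> call(Observable<T> source) {
  // Pair each item with the statistics accumulated up to and including it.
  return source.scan(Pair.create((T) null, Statistics.create()),
      new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
        @Override
        public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
          return Pair.create(t, pair.b().add(function.call(t)));
        }
      }).skip(1); // drop the seed pair, whose item is null
}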

Code example source: au.gov.amsa.risky/formats

public static <T extends Fix> Transformer<T, T> ignoreOutOfOrderFixes(final boolean enabled) {
  return o -> {
    return o.scan((a, b) -> {
      if (!enabled || b.time() > a.time())
        return b;
      else
        return a;
    }).distinctUntilChanged();
  };
}
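
The transformer above combines a seedless scan, which always carries forward the fix with the latest time seen so far, with distinctUntilChanged, which drops the repeats that the scan produces for late arrivals. The plain-Java sketch below models the enabled case only, using bare timestamps instead of Fix objects. OutOfOrderSketch and ignoreOutOfOrder are hypothetical names, and the code is an illustration rather than the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only; names are hypothetical.
class OutOfOrderSketch {

    static List<Long> ignoreOutOfOrder(List<Long> times) {
        List<Long> emitted = new ArrayList<>();
        Long latest = null;
        for (Long t : times) {
            // scan step: keep the newer of the previous result and the current item
            Long next = (latest == null || t > latest) ? t : latest;
            // distinctUntilChanged step: emit only when the scan result changed
            if (!next.equals(latest)) {
                emitted.add(next);
            }
            latest = next;
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(ignoreOutOfOrder(Arrays.asList(1L, 3L, 2L, 5L, 4L, 6L)));
        // prints [1, 3, 5, 6]
    }
}
```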

Code example source: nurkiewicz/rxjava-book-examples

Observable<String> speak(String quote, long millisPerChar) {
  String[] tokens = quote.replaceAll("[:,]", "").split(" ");
  Observable<String> words = Observable.from(tokens);
  Observable<Long> absoluteDelay = words
      .map(String::length)
      .map(len -> len * millisPerChar)
      .scan((total, current) -> total + current);
  return words
      .zipWith(absoluteDelay.startWith(0L), Pair::of)
      .flatMap(pair -> just(pair.getLeft())
          .delay(pair.getRight(), MILLISECONDS));
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_431() throws Exception {
  Observable<BigInteger> factorials = Observable
      .range(2, 100)
      .scan(BigInteger.ONE, (big, cur) ->
          big.multiply(BigInteger.valueOf(cur)));
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_419() throws Exception {
  Observable<Long> progress = transferFile();
  Observable<Long> totalProgress = progress
      .scan((total, chunk) -> total + chunk);
  totalProgress
      .toBlocking()
      .subscribe(System.out::println);
}