rx.Observable.scan(): usage and code examples

Reposted by x33g5p2x on 2022-01-25 under Other

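This article collects code examples for the rx.Observable.scan() method in Java and shows how Observable.scan() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.scan() method:
Fully qualified class: rx.Observable
Class name: Observable
Method name: scan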

About Observable.scan

Returns an Observable that applies a function of your choosing to the first item emitted by a source Observable and a seed value, then feeds the result of that function along with the second item emitted by the source Observable into the same function, and so on until all items have been emitted by the source Observable, emitting the result of each of these iterations.

This sort of function is sometimes called an accumulator.

Note that the Observable that results from this method will emit initialValue as its first emitted item.

Scheduler: scan does not operate by default on a particular Scheduler.
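
The behaviour described above can be modelled without RxJava. The following is a minimal plain-Java sketch, not the library's implementation: the class `SeededScanSketch` and its `scan` helper are hypothetical names introduced here, and a `List` stands in for the source Observable. It shows that the seed is emitted first, followed by one accumulated value per source item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper class: a plain-Java model of scan(initialValue, accumulator).
class SeededScanSketch {

    // Returns what a seeded scan would emit for the given source items.
    static <T, R> List<R> scan(List<T> source, R seed, BiFunction<R, T, R> accumulator) {
        List<R> emitted = new ArrayList<>();
        R state = seed;
        emitted.add(state);                          // the seed is the first emission
        for (T item : source) {
            state = accumulator.apply(state, item);  // fold the next item into the state
            emitted.add(state);                      // emit every intermediate result
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Roughly corresponds to Observable.just(1, 2, 3, 4).scan(0, (sum, x) -> sum + x)
        List<Integer> out = scan(Arrays.asList(1, 2, 3, 4), 0, (sum, x) -> sum + x);
        System.out.println(out); // [0, 1, 3, 6, 10]
    }
}
```

Four source items produce five emissions, because the seed counts as one.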

Code examples

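Source: PipelineAI/pipeline

    @Override
    public Observable<Output> call(Observable<Bucket> window) {
        // Fold the buckets into a running Output, then skip the first numBuckets emissions.
        return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
    }; // closes an enclosing anonymous class that is not shown in this excerpt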

Source: bumptech/glide

    .scan(new Func2<List<Image>, List<Image>, List<Image>>() {
        @Override
        public List<Image> call(List<Image> images, List<Image> images2) {
            // the excerpt ends here in the source

Source: PipelineAI/pipeline

    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
        this.sourceStream = bucketedStream
                .scan(getEmptyOutputValue(), reduceBucket)
                .skip(numBuckets)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                // multiple subscribers should get same data
                .onBackpressureDrop();  // if there are slow consumers, data should not buffer
    }

Source: davidmoten/rtree

    /**
     * Returns the Observable sequence of trees created by progressively adding
     * entries.
     *
     * @param entries
     *            the entries to add
     * @return a sequence of trees
     */
    public Observable<RTree<T, S>> add(Observable<Entry<T, S>> entries) {
        return entries.scan(this, new Func2<RTree<T, S>, Entry<T, S>, RTree<T, S>>() {
            @Override
            public RTree<T, S> call(RTree<T, S> tree, Entry<T, S> entry) {
                return tree.add(entry);
            }
        });
    }

Source: davidmoten/rtree

    /**
     * Returns the Observable sequence of trees created by progressively deleting
     * entries.
     *
     * @param entries
     *            the entries to delete
     * @param all
     *            if true delete all matching otherwise just first matching
     * @return a sequence of trees
     */
    public Observable<RTree<T, S>> delete(Observable<Entry<T, S>> entries, final boolean all) {
        return entries.scan(this, new Func2<RTree<T, S>, Entry<T, S>, RTree<T, S>>() {
            @Override
            public RTree<T, S> call(RTree<T, S> tree, Entry<T, S> entry) {
                return tree.delete(entry, all);
            }
        });
    }

Source: jhusain/learnrxjava

    public static void main(String... args) {
        // reduce emits only the final accumulated list
        Observable.range(0, 10).reduce(new ArrayList<>(), (list, i) -> {
            list.add(i);
            return list;
        }).forEach(System.out::println);
        System.out.println("... vs ...");
        // scan emits the seed and then the list after every added item
        Observable.range(0, 10).scan(new ArrayList<>(), (list, i) -> {
            list.add(i);
            return list;
        }).forEach(System.out::println);
    }
    } // closes the enclosing class, which is not shown in this excerpt
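
The contrast between reduce and scan in the snippet above can be sketched in plain Java. This is an illustrative model under stated assumptions, not RxJava code: `ReduceVsScanSketch`, `reduce`, `scan`, and `append` are hypothetical names, and a `List` stands in for the source. Unlike the snippet above, which mutates and re-emits one shared `ArrayList`, this sketch copies the list on every step, so each emitted value keeps its own contents even if a consumer stores it for later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model contrasting reduce (one final value) with scan (every intermediate value).
class ReduceVsScanSketch {

    // Copying append, so previously emitted lists are never modified.
    static List<Integer> append(List<Integer> list, Integer item) {
        List<Integer> copy = new ArrayList<>(list);
        copy.add(item);
        return copy;
    }

    // reduce: only the final accumulated list is produced.
    static List<Integer> reduce(List<Integer> source) {
        List<Integer> acc = new ArrayList<>();
        for (Integer i : source) {
            acc = append(acc, i);
        }
        return acc;
    }

    // scan: the empty seed and every intermediate list are produced.
    static List<List<Integer>> scan(List<Integer> source) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> acc = new ArrayList<>();
        emitted.add(acc);
        for (Integer i : source) {
            acc = append(acc, i);
            emitted.add(acc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(0, 1, 2);
        System.out.println(reduce(source)); // [0, 1, 2]
        System.out.println("... vs ...");
        System.out.println(scan(source));   // [[], [0], [0, 1], [0, 1, 2]]
    }
}
```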

Source: davidmoten/rxjava-extras

    public static Observable<Double> solveWithNewtonsMethod(final Func1<Double, Double> f,
            final Func1<Double, Double> dfdx, double x0) {
        // The endless stream of 1s only drives the iteration; its values are ignored.
        return Observable.just(1).repeat().scan(x0, new Func2<Double, Integer, Double>() {
            @Override
            public Double call(Double xn, Integer n) {
                // One Newton step: x(n+1) = x(n) - f(x(n)) / f'(x(n))
                return xn - f.call(xn) / dfdx.call(xn);
            }
        });
    }
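
As a worked illustration of the iteration that the scan above performs, here is a plain-Java sketch that applies the same update rule a fixed number of times. `NewtonSketch` and `iterate` are hypothetical names, and the choice of f(x) = x² − 2 with x0 = 1 is an assumption made only for this example. The first array element is the seed, matching the fact that a seeded scan emits its initial value first.

```java
import java.util.function.DoubleUnaryOperator;

// Hypothetical model of the Newton iteration driven by scan in the snippet above.
class NewtonSketch {

    // Returns x0 followed by `steps` successive Newton iterates.
    static double[] iterate(DoubleUnaryOperator f, DoubleUnaryOperator dfdx, double x0, int steps) {
        double[] xs = new double[steps + 1];
        xs[0] = x0; // the seed is the first value
        for (int n = 0; n < steps; n++) {
            double xn = xs[n];
            xs[n + 1] = xn - f.applyAsDouble(xn) / dfdx.applyAsDouble(xn);
        }
        return xs;
    }

    public static void main(String[] args) {
        // Example assumption: solve x^2 - 2 = 0 starting from x0 = 1.
        double[] xs = iterate(x -> x * x - 2, x -> 2 * x, 1.0, 5);
        for (double x : xs) {
            System.out.println(x); // approaches Math.sqrt(2)
        }
    }
}
```

Because the RxJava version scans over an endless source, a caller would normally bound it, for example with take(n), before subscribing.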

Source: com.netflix.rxjava/rxjava-core

    @Override
    public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> ts) {
        // The seed is a counter starting at 0, wrapped in an onNext notification.
        return ts.scan(Notification.createOnNext(0), new Func2<Notification<Integer>, Notification<?>, Notification<Integer>>() {
            @SuppressWarnings("unchecked")
            @Override
            public Notification<Integer> call(Notification<Integer> n, Notification<?> term) {
                final int value = n.getValue();
                if (predicate.call(value, term.getThrowable()).booleanValue())
                    return Notification.createOnNext(value + 1);
                else
                    return (Notification<Integer>) term;
            }
        });
    }
    } // closes an enclosing scope that is not shown in this excerpt

Source: leeowenowen/rxjava-examples

    @Override
    public void run() {
        // No seed: the first item is emitted as is, then each running sum follows.
        Observable.range(1, 10).scan(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                log(integer);
            }
        });
    }
    }); // closes an enclosing call that is not shown in this excerpt
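
The snippet above uses the overload of scan that takes no seed. As a rough plain-Java model of that overload (not the RxJava implementation; `UnseededScanSketch` and its `scan` helper are hypothetical names), the first source item is passed through unchanged and every later item is combined with the running result:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical plain-Java model of scan(accumulator) without a seed.
class UnseededScanSketch {

    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T state = null;
        boolean first = true;
        for (T item : source) {
            // The first item becomes the initial state; later items are accumulated.
            state = first ? item : accumulator.apply(state, item);
            first = false;
            emitted.add(state);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same input as Observable.range(1, 10): the integers 1 through 10.
        List<Integer> range = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        System.out.println(scan(range, Integer::sum));
        // [1, 3, 6, 10, 15, 21, 28, 36, 45, 55]
    }
}
```

Here ten source items give exactly ten emissions, one fewer than the seeded overload would produce for the same input.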

Source: davidmoten/rxjava-extras

    public static <T> Observable<List<Integer>> permutations(int size) {
        List<Integer> indexes = new ArrayList<Integer>(size);
        for (int i = 0; i < size; i++) {
            indexes.add(i);
        }
        return Observable.from(Permutations.iterable(indexes)).scan(indexes,
                new Func2<List<Integer>, Swap<Integer>, List<Integer>>() {
                    @Override
                    public List<Integer> call(List<Integer> a, Swap<Integer> swap) {
                        // Apply the swap to a copy so earlier emissions are not mutated.
                        List<Integer> b = new ArrayList<Integer>(a);
                        b.set(swap.left(), a.get(swap.right()));
                        b.set(swap.right(), a.get(swap.left()));
                        return b;
                    }
                });
    }

Source: com.netflix.hystrix/hystrix-core

    @Override
    public Observable<Output> call(Observable<Bucket> window) {
        return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
    }; // closes an enclosing anonymous class that is not shown in this excerpt

Source: davidmoten/rxjava-extras

    @Override
    public Observable<Statistics> call(Observable<T> o) {
        return o.scan(Statistics.create(), Functions.collectStats());
    }
    }; // closes an enclosing anonymous class that is not shown in this excerpt

Source: davidmoten/rxjava-extras

    @Override
    public Observable<Pair<T, Statistics>> call(Observable<T> source) {
        return source.scan(Pair.create((T) null, Statistics.create()),
                new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
                    @Override
                    public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
                        return Pair.create(t, pair.b().add(function.call(t)));
                    }
                }).skip(1); // drop the seed pair, whose first element is null
    }
    }; // closes an enclosing anonymous class that is not shown in this excerpt

Source: au.gov.amsa.risky/formats

    public static <T extends Fix> Transformer<T, T> ignoreOutOfOrderFixes(final boolean enabled) {
        return o -> {
            // When enabled, a fix that is not newer than the running latest is replaced
            // by that latest fix, and distinctUntilChanged() then suppresses the repeat.
            return o.scan((a, b) -> {
                if (!enabled || b.time() > a.time())
                    return b;
                else
                    return a;
            }).distinctUntilChanged();
        };
    }
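
To make the effect of combining scan with distinctUntilChanged() more concrete, here is a plain-Java sketch of the same filtering idea. It is a simplified model under stated assumptions: `OutOfOrderSketch` and `ignoreOutOfOrder` are hypothetical names, and bare `Long` timestamps stand in for the `Fix` objects of the snippet above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model: timestamps stand in for Fix objects.
class OutOfOrderSketch {

    static List<Long> ignoreOutOfOrder(List<Long> times, boolean enabled) {
        List<Long> out = new ArrayList<>();
        Long latest = null; // running result of the scan step
        for (Long t : times) {
            // scan step: keep the newer timestamp, or always take t when disabled
            Long next = (latest == null || !enabled || t > latest) ? t : latest;
            // distinctUntilChanged step: emit only if it differs from the previous result
            if (!next.equals(latest)) {
                out.add(next);
            }
            latest = next;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> times = Arrays.asList(1L, 3L, 2L, 5L, 4L, 6L);
        System.out.println(ignoreOutOfOrder(times, true));  // [1, 3, 5, 6]
        System.out.println(ignoreOutOfOrder(times, false)); // [1, 3, 2, 5, 4, 6]
    }
}
```

With filtering enabled, the late arrivals 2 and 4 never reach the output, because the scan step re-emits the previous latest value and the duplicate is then dropped.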

Source: nurkiewicz/rxjava-book-examples

    Observable<String> speak(String quote, long millisPerChar) {
        String[] tokens = quote.replaceAll("[:,]", "").split(" ");
        Observable<String> words = Observable.from(tokens);
        // Running total of the time needed to speak all words so far
        Observable<Long> absoluteDelay = words
                .map(String::length)
                .map(len -> len * millisPerChar)
                .scan((total, current) -> total + current);
        // Pair each word with the total delay of the words before it (0 for the first)
        return words
                .zipWith(absoluteDelay.startWith(0L), Pair::of)
                .flatMap(pair -> just(pair.getLeft())
                        .delay(pair.getRight(), MILLISECONDS));
    }
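
The timing logic of the snippet above can be checked by hand with a small plain-Java sketch. `SpeakDelaysSketch` and `schedule` are hypothetical names, and the sample quote and the 10 ms per character are assumptions chosen only for this example. Each word is labelled with the delay after which it would be emitted, which is the running total of the durations of all earlier words.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the cumulative-delay computation.
class SpeakDelaysSketch {

    // Returns entries of the form "word@delayInMillis".
    static List<String> schedule(String quote, long millisPerChar) {
        String[] tokens = quote.replaceAll("[:,]", "").split(" ");
        List<String> out = new ArrayList<>();
        long total = 0L; // plays the role of startWith(0L)
        for (String word : tokens) {
            out.add(word + "@" + total);
            total += word.length() * millisPerChar; // the scan step: add this word's duration
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(schedule("Hello, brave new world", 10));
        // [Hello@0, brave@50, new@100, world@130]
    }
}
```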

Source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_431() throws Exception {
        // Emits the seed 1, then the running product 2!, 3!, 4!, ...
        Observable<BigInteger> factorials = Observable
                .range(2, 100)
                .scan(BigInteger.ONE, (big, cur) ->
                        big.multiply(BigInteger.valueOf(cur)));
    }
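
For a concrete view of what the factorial stream above emits, the following plain-Java sketch models the same seeded scan over a shorter range. `FactorialScanSketch` and `factorials` are hypothetical names; the `count` parameter mirrors the second argument of `Observable.range(start, count)`, which is a number of items rather than an upper bound.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of Observable.range(2, count).scan(BigInteger.ONE, multiply).
class FactorialScanSketch {

    static List<BigInteger> factorials(int count) {
        List<BigInteger> emitted = new ArrayList<>();
        BigInteger big = BigInteger.ONE;
        emitted.add(big); // the seed is emitted first
        for (int cur = 2; cur < 2 + count; cur++) {
            big = big.multiply(BigInteger.valueOf(cur)); // running product
            emitted.add(big);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(factorials(4)); // [1, 2, 6, 24, 120]
    }
}
```

By the same reasoning, range(2, 100) supplies the integers 2 through 101, so the original stream would emit 101 values ending at 101!.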

Source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_419() throws Exception {
        Observable<Long> progress = transferFile();
        // Turn per-chunk sizes into a running total of bytes transferred
        Observable<Long> totalProgress = progress
                .scan((total, chunk) -> total + chunk);
        totalProgress
                .toBlocking()
                .subscribe(System.out::println);
    }
