本文整理了Java中rx.Observable.scan()
方法的一些代码示例,展示了Observable.scan()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.scan()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:scan
[英]Returns an Observable that applies a function of your choosing to the first item emitted by a source Observable and a seed value, then feeds the result of that function along with the second item emitted by the source Observable into the same function, and so on until all items have been emitted by the source Observable, emitting the result of each of these iterations.
This sort of function is sometimes called an accumulator.
Note that the Observable that results from this method will emit initialValue as its first emitted item. Scheduler: scan does not operate by default on a particular Scheduler.
[中]返回一个Observable,将您选择的函数应用于源Observable发出的第一项和种子值,然后将该函数的结果与源Observable发出的第二项一起输入到同一个函数中,依此类推,直到源Observable发出所有项,发出每个迭代的结果。
这种函数有时被称为累加器。
请注意,此方法产生的可观察项将发出initialValue作为其第一个发出的项。计划程序:默认情况下,扫描不会在特定计划程序上运行。
代码示例来源:origin: PipelineAI/pipeline
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
代码示例来源:origin: bumptech/glide
.scan(new Func2<List<Image>, List<Image>, List<Image>>() {
@Override
public List<Image> call(List<Image> images, List<Image> images2) {
代码示例来源:origin: PipelineAI/pipeline
protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> reduceCommandCompletion,
Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
this.sourceStream = bucketedStream
.scan(getEmptyOutputValue(), reduceBucket)
.skip(numBuckets)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}
代码示例来源:origin: davidmoten/rtree
/**
* Returns the Observable sequence of trees created by progressively adding
* entries.
*
* @param entries
* the entries to add
* @return a sequence of trees
*/
public Observable<RTree<T, S>> add(Observable<Entry<T, S>> entries) {
return entries.scan(this, new Func2<RTree<T, S>, Entry<T, S>, RTree<T, S>>() {
@Override
public RTree<T, S> call(RTree<T, S> tree, Entry<T, S> entry) {
return tree.add(entry);
}
});
}
代码示例来源:origin: davidmoten/rtree
/**
* Returns the Observable sequence of trees created by progressively deleting
* entries.
*
* @param entries
* the entries to add
* @param all
* if true delete all matching otherwise just first matching
* @return a sequence of trees
*/
public Observable<RTree<T, S>> delete(Observable<Entry<T, S>> entries, final boolean all) {
return entries.scan(this, new Func2<RTree<T, S>, Entry<T, S>, RTree<T, S>>() {
@Override
public RTree<T, S> call(RTree<T, S> tree, Entry<T, S> entry) {
return tree.delete(entry, all);
}
});
}
代码示例来源:origin: jhusain/learnrxjava
public static void main(String... args) {
Observable.range(0, 10).reduce(new ArrayList<>(), (list, i) -> {
list.add(i);
return list;
}).forEach(System.out::println);
System.out.println("... vs ...");
Observable.range(0, 10).scan(new ArrayList<>(), (list, i) -> {
list.add(i);
return list;
}).forEach(System.out::println);
}
}
代码示例来源:origin: davidmoten/rxjava-extras
public static Observable<Double> solveWithNewtonsMethod(final Func1<Double, Double> f,
final Func1<Double, Double> dfdx, double x0) {
return Observable.just(1).repeat().scan(x0, new Func2<Double, Integer, Double>() {
@Override
public Double call(Double xn, Integer n) {
return xn - f.call(xn) / dfdx.call(xn);
}
});
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
public static Observable<Double> solveWithNewtonsMethod(final Func1<Double, Double> f,
final Func1<Double, Double> dfdx, double x0) {
return Observable.just(1).repeat().scan(x0, new Func2<Double, Integer, Double>() {
@Override
public Double call(Double xn, Integer n) {
return xn - f.call(xn) / dfdx.call(xn);
}
});
}
代码示例来源:origin: com.netflix.rxjava/rxjava-core
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> ts) {
return ts.scan(Notification.createOnNext(0), new Func2<Notification<Integer>, Notification<?>, Notification<Integer>>() {
@SuppressWarnings("unchecked")
@Override
public Notification<Integer> call(Notification<Integer> n, Notification<?> term) {
final int value = n.getValue();
if (predicate.call(value, term.getThrowable()).booleanValue())
return Notification.createOnNext(value + 1);
else
return (Notification<Integer>) term;
}
});
}
}
代码示例来源:origin: leeowenowen/rxjava-examples
@Override
public void run() {
Observable.range(1, 10).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer);
}
});
}
});
代码示例来源:origin: davidmoten/rxjava-extras
public static <T> Observable<List<Integer>> permutations(int size) {
List<Integer> indexes = new ArrayList<Integer>(size);
for (int i = 0; i < size; i++) {
indexes.add(i);
}
return Observable.from(Permutations.iterable(indexes)).scan(indexes,
new Func2<List<Integer>, Swap<Integer>, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> a, Swap<Integer> swap) {
List<Integer> b = new ArrayList<Integer>(a);
b.set(swap.left(), a.get(swap.right()));
b.set(swap.right(), a.get(swap.left()));
return b;
}
});
}
代码示例来源:origin: com.netflix.hystrix/hystrix-core
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
代码示例来源:origin: davidmoten/rxjava-extras
@Override
public Observable<Statistics> call(Observable<T> o) {
return o.scan(Statistics.create(), Functions.collectStats());
}
};
代码示例来源:origin: com.github.davidmoten/rxjava-extras
@Override
public Observable<Statistics> call(Observable<T> o) {
return o.scan(Statistics.create(), Functions.collectStats());
}
};
代码示例来源:origin: davidmoten/rxjava-extras
@Override
public Observable<Pair<T, Statistics>> call(Observable<T> source) {
return source.scan(Pair.create((T) null, Statistics.create()),
new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
@Override
public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
return Pair.create(t, pair.b().add(function.call(t)));
}
}).skip(1);
}
};
代码示例来源:origin: au.gov.amsa.risky/formats
public static <T extends Fix> Transformer<T, T> ignoreOutOfOrderFixes(final boolean enabled) {
return o -> {
return o.scan((a, b) -> {
if (!enabled || b.time() > a.time())
return b;
else
return a;
}).distinctUntilChanged();
};
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
@Override
public Observable<Pair<T, Statistics>> call(Observable<T> source) {
return source.scan(Pair.create((T) null, Statistics.create()),
new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
@Override
public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
return Pair.create(t, pair.b().add(function.call(t)));
}
}).skip(1);
}
};
代码示例来源:origin: nurkiewicz/rxjava-book-examples
Observable<String> speak(String quote, long millisPerChar) {
String[] tokens = quote.replaceAll("[:,]", "").split(" ");
Observable<String> words = Observable.from(tokens);
Observable<Long> absoluteDelay = words
.map(String::length)
.map(len -> len * millisPerChar)
.scan((total, current) -> total + current);
return words
.zipWith(absoluteDelay.startWith(0L), Pair::of)
.flatMap(pair -> just(pair.getLeft())
.delay(pair.getRight(), MILLISECONDS));
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_431() throws Exception {
Observable<BigInteger> factorials = Observable
.range(2, 100)
.scan(BigInteger.ONE, (big, cur) ->
big.multiply(BigInteger.valueOf(cur)));
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_419() throws Exception {
Observable<Long> progress = transferFile();
Observable<Long> totalProgress = progress
.scan((total, chunk) -> total + chunk);
totalProgress
.toBlocking()
.subscribe(System.out::println);
}
内容来源于网络,如有侵权,请联系作者删除!