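This article collects code examples for the Java method rx.Observable.groupBy() and shows how Observable.groupBy() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are intended as reference material. Details of the Observable.groupBy() method are as follows: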
Package path: rx.Observable
Class name: Observable
Method name: groupBy
Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.
Note: A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservables that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like take(0) to them.
Backpressure Support: This operator does not support backpressure, as splitting a stream effectively turns it into a "hot observable" and blocking any one group would block the entire parent stream. If you need backpressure on individual groups, then you should use operators such as onBackpressureDrop or onBackpressureBuffer.
Scheduler: groupBy does not operate by default on a particular Scheduler.
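To make the grouping behaviour concrete, here is a minimal plain-Java sketch of the groups that a chain such as `source.groupBy(keySelector).flatMap(g -> g.toList())` would collect for a finite source. It uses `java.util` collections rather than RxJava, so it models only the resulting groups, not subscription timing, buffering, or backpressure. The names `GroupBySketch` and `groupByKey` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class GroupBySketch {
    // Collects items into one list per key, keeping keys in order of first appearance.
    // This is a model of the grouping result only; it is not the RxJava operator.
    static <T, K> Map<K, List<T>> groupByKey(List<T> items, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            K key = keySelector.apply(item);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(item);
        }
        return groups;
    }
}
```

For example, `GroupBySketch.groupByKey(Arrays.asList(1, 2, 3, 4, 5, 6), n -> n % 2 == 0)` returns `{false=[1, 3, 5], true=[2, 4, 6]}`: one group per distinct key, in the order the keys first appear.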
Code example source: origin: jhusain/learnrxjava
// Excerpts from several separate chains; the source Observable of each chain is not shown.
.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
    return g.toList();
})

.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
    return g.take(10).toList();
})

.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
    return g.filter(i -> i <= 20).toList();
})

.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
    return g.take(20).toList();
})

.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
    return g.takeWhile(i -> i < 30).toList();
})

.groupBy(n -> n)
.flatMap(g -> {
    return g.take(3).reduce((s, s2) -> s + s2);
})

.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
    return g.take(10).toList();
})

.groupBy(n -> n % 2 == 0)
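Code example source: origin: davidmoten/rxjava-extras
    @Override
    public Observable<GroupedObservable<K, R>> call(Observable<T> o) {
        // groupBy overload taking a key selector, an element selector and an evicting map factory
        return o.groupBy(keySelector, elementSelector, evictingMapFactory);
    }
};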
Code example source: origin: davidmoten/rxjava-file
private static Observable<Object> sampleModifyOrOverflowEventsOnly(Observable<?> events,
        final long sampleTimeMs) {
    return events
            // group by true if is modify or overflow, false otherwise
            .groupBy(IS_MODIFY_OR_OVERFLOW)
            // only sample if is modify or overflow
            .flatMap(sampleIfTrue(sampleTimeMs));
}
Code example source: origin: akarnokd/akarnokd-misc
public static void main(String[] args) throws Exception {
    Observable<Integer> source = range(1, 10000);
    source
            .doOnRequest(i -> System.out.println("Requested " + i))
            .groupBy(v -> v % 5)
            // flatMap with maxConcurrent = 4: at most four groups are subscribed at once
            .flatMap(g -> g.observeOn(Schedulers.io()).map(GroupByTest2::calculation), 4)
            .subscribe(i -> System.out.println("Got " + i));
    Thread.sleep(100000);
}
Code example source: origin: leeowenowen/rxjava-examples
    @Override
    public void run() {
        // Key each integer as "even" or "odd"
        Observable.range(1, 10).groupBy(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return (integer % 2 == 0) ? "even" : "odd";
            }
        }).subscribe(new Action1<GroupedObservable<String, Integer>>() {
            @Override
            public void call(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) {
                log("group ok:" + stringIntegerGroupedObservable.getKey());
                // Subscribe to each group as soon as it is emitted
                stringIntegerGroupedObservable.subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        log(integer + " of group " + stringIntegerGroupedObservable.getKey());
                    }
                });
            }
        });
    }
});
Code example source: origin: hawkular/hawkular-metrics
    @Override
    public Observable<Map<String, TaggedBucketPoint>> call(Observable<DataPoint<? extends Number>> dataPoints) {
        // Build one predicate that requires every configured tag to match (or not match, if prefixed with "!")
        Predicate<DataPoint<? extends Number>> filter = dataPoint -> true;
        for (Entry<String, String> entry : tags.entrySet()) {
            boolean positive = (!entry.getValue().startsWith("!"));
            Pattern pattern = filterPattern(entry.getValue());
            filter = filter.and(dataPoint -> {
                return dataPoint.getTags().containsKey(entry.getKey()) &&
                        (positive == pattern.matcher(dataPoint.getTags().get(entry.getKey())).matches());
            });
        }
        return dataPoints
                .filter(filter::test)
                // group by the data point's values for the configured tag names
                .groupBy(dataPoint -> tags.entrySet().stream().collect(
                        toMap(Entry::getKey, e -> dataPoint.getTags().get(e.getKey()))))
                .flatMap(group -> group.collect(() -> new TaggedDataPointCollector(group.getKey(), percentiles),
                        TaggedDataPointCollector::increment))
                .map(TaggedDataPointCollector::toBucketPoint)
                .toMap(bucketPoint -> bucketPoint.getTags().entrySet().stream().map(e ->
                        e.getKey() + ":" + e.getValue()).collect(joining(",")));
    }
}
Code example source: origin: au.gov.amsa.risky/formats
public static Observable<List<Fix>> writeFixes(final Func1<Fix, String> fileMapper,
        Observable<Fix> fixes, int bufferSize, boolean zip, BinaryFixesFormat format) {
    return fixes
            // group by filename
            .groupBy(fileMapper)
            // buffer fixes by filename
            .flatMap(buffer(bufferSize))
            // write each list to a file
            .doOnNext(writeFixList(fileMapper, zip, format));
}
Code example source: origin: hawkular/hawkular-metrics
    @Override
    public Observable<List<NumericBucketPoint>> call(Observable<DataPoint<? extends Number>> dataPoints) {
        return dataPoints
                // group data points by the index of the bucket their timestamp falls into
                .groupBy(dataPoint -> buckets.getIndex(dataPoint.getTimestamp()))
                .flatMap(group -> group.collect(()
                        -> new NumericDataPointCollector(buckets, group.getKey(), percentiles),
                        NumericDataPointCollector::increment))
                .map(NumericDataPointCollector::toBucketPoint)
                .toMap(NumericBucketPoint::getStart)
                .map(pointMap -> NumericBucketPoint.toList(pointMap, buckets));
    }
}
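The bucketing examples group data points by `buckets.getIndex(dataPoint.getTimestamp())`. As a rough plain-Java illustration of that idea, the sketch below assigns timestamps to fixed-width buckets and collects them per bucket index. The `bucketIndex` formula is an assumption made for this sketch (fixed-width buckets starting at `start`); it is not taken from the Hawkular `Buckets` class, and `BucketSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BucketSketch {
    // Assumed stand-in for buckets.getIndex(timestamp): fixed-width buckets beginning at 'start'.
    static int bucketIndex(long timestamp, long start, long step) {
        return (int) ((timestamp - start) / step);
    }

    // Groups timestamps by bucket index; the TreeMap keeps buckets sorted by index.
    static Map<Integer, List<Long>> groupByBucket(List<Long> timestamps, long start, long step) {
        Map<Integer, List<Long>> byBucket = new TreeMap<>();
        for (long ts : timestamps) {
            byBucket.computeIfAbsent(bucketIndex(ts, start, step), k -> new ArrayList<>()).add(ts);
        }
        return byBucket;
    }
}
```

With timestamps `[100, 150, 250, 199, 300]`, `start = 100` and `step = 100`, `groupByBucket` returns `{0=[100, 150, 199], 1=[250], 2=[300]}`. In the original chains, each such group is then reduced to a single bucket point by a collector.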
Code example source: origin: org.hawkular.metrics/hawkular-metrics-core-service
@Override
public Observable<List<AvailabilityBucketPoint>> findAvailabilityStats(MetricId<AvailabilityType> metricId,
        long start, long end, Buckets buckets) {
    checkArgument(isValidTimeRange(start, end), "Invalid time range");
    return this.findDataPoints(metricId, start, end, 0, ASC)
            // group data points by the index of the bucket their timestamp falls into
            .groupBy(dataPoint -> buckets.getIndex(dataPoint.getTimestamp()))
            .flatMap(group -> group.collect(() -> new AvailabilityDataPointCollector(buckets, group.getKey()),
                    AvailabilityDataPointCollector::increment))
            .map(AvailabilityDataPointCollector::toBucketPoint)
            .toMap(AvailabilityBucketPoint::getStart)
            .map(pointMap -> AvailabilityBucketPoint.toList(pointMap, buckets));
}
Code example source: origin: akarnokd/akarnokd-misc
    public static void main(String[] args) {
        Observable<String> names = Observable.just(
                "John", "Steve", "Ruth",
                "Sam", "Jane", "James");
        names.groupBy(s -> s.charAt(0))
                // emit only the first item of each group, but keep consuming the rest of the group
                .flatMap(grp -> grp.publish(o -> o.first().concatWith(o.ignoreElements())))
                .subscribe(s -> System.out.println(s));
    }
}
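The chain above keeps only the first name seen for each initial letter while still draining the rest of every group. A minimal plain-Java sketch of the values it is expected to deliver is shown below. It models the result only, not the `publish` mechanics, and `FirstPerGroupSketch` and `firstPerInitial` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FirstPerGroupSketch {
    // Keeps the first name seen for each initial, in order of first appearance.
    static List<String> firstPerInitial(List<String> names) {
        Map<Character, String> firstByInitial = new LinkedHashMap<>();
        for (String name : names) {
            // putIfAbsent ignores later names that share an initial already seen
            firstByInitial.putIfAbsent(name.charAt(0), name);
        }
        return new ArrayList<>(firstByInitial.values());
    }
}
```

For the six names in the example, `firstPerInitial` returns `[John, Steve, Ruth]`: "Sam" falls into the existing `S` group, and "Jane" and "James" into the existing `J` group.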
Code example source: origin: akarnokd/akarnokd-misc
    public static void main(String[] args) {
        System.setProperty("rx.ring-buffer.size", "16");
        // 10 dates in March 2016, 3 entries per date
        List<AppInfo> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            for (int j = 0; j < 3; j++) {
                AppInfo ai = new AppInfo();
                ai.name = i + " - " + j;
                ai.date = LocalDate.of(2016, 3, i + 1);
                list.add(ai);
            }
        }
        // group by month/year of the entry's date
        Observable<GroupedObservable<String, AppInfo>> o = Observable.from(list)
                .groupBy(v -> v.date.format(DateTimeFormatter.ofPattern("MM/yyyy")));
        Observable.concat(o)
                .subscribe(System.out::println);
    }
}
Code example source: origin: hawkular/hawkular-metrics
private Observable.Transformer<BoundStatement, Integer> applyMicroBatching() {
    return tObservable -> tObservable
            // group statements by the token range that owns their routing key
            .groupBy(b -> {
                ByteBuffer routingKey = b.getRoutingKey(ProtocolVersion.NEWEST_SUPPORTED,
                        codecRegistry);
                Token token = metadata.newToken(routingKey);
                for (TokenRange tokenRange : session.getCluster().getMetadata().getTokenRanges()) {
                    if (tokenRange.contains(token)) {
                        return tokenRange;
                    }
                }
                log.warn("Unable to find any Cassandra node to insert token " + token.toString());
                // fall back to the first token range
                return session.getCluster().getMetadata().getTokenRanges().iterator().next();
            })
            .flatMap(g -> g.compose(new BoundBatchStatementTransformer()))
            .flatMap(batch -> rxSession
                    .execute(batch)
                    .compose(applyInsertRetryPolicy())
                    .map(resultSet -> batch.size())
            );
}
Code example source: origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_157() throws Exception {
    Observable<BigDecimal> totalPrice = Observable
            .just("bread", "butter", "egg", "milk", "tomato",
                    "cheese", "tomato", "egg", "egg")
            // one group per distinct product name
            .groupBy(prod -> prod)
            // count the items in each group and pair the count with the product name
            .flatMap(grouped -> grouped
                    .count()
                    .map(quantity -> {
                        String productName = grouped.getKey();
                        return Pair.of(productName, quantity);
                    }))
            .flatMap(order -> rxGroceries
                    .purchase(order.getKey(), order.getValue())
                    .subscribeOn(schedulerA))
            .reduce(BigDecimal::add)
            .single();
}
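The `groupBy(prod -> prod)` step followed by `grouped.count()` turns the product list into one (product, quantity) pair per distinct product. A plain-Java sketch of that intermediate result, without RxJava and without the purchase step, might look like the following. `CountPerGroupSketch` and `countPerProduct` are hypothetical names for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CountPerGroupSketch {
    // Counts occurrences of each product, keeping products in order of first appearance.
    static Map<String, Integer> countPerProduct(List<String> products) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String product : products) {
            // start at 1 for a new product, otherwise add 1 to the running count
            counts.merge(product, 1, Integer::sum);
        }
        return counts;
    }
}
```

For the nine products in the example, this yields `{bread=1, butter=1, egg=3, milk=1, tomato=2, cheese=1}`, so each distinct product would be purchased once with its total quantity.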
Code example source: origin: org.hawkular.metrics/hawkular-metrics-core-service
@Override
public void exitObject(ObjectContext ctx) {
    if (ctx.logical_operator() != null) {
        Observable<MetricId<T>> leftObservable = popObservable(ctx.object(0).getText().hashCode());
        Observable<MetricId<T>> rightObservable = popObservable(ctx.object(1).getText().hashCode());
        observables.remove(ctx.object(0).getText().hashCode());
        observables.remove(ctx.object(1).getText().hashCode());
        Observable<MetricId<T>> result = leftObservable.concatWith(rightObservable);
        if (ctx.logical_operator().AND() != null) {
            //group by metric and then use one element from the groups with two elements
            //if a group has two elements it is in both sets, hence AND
            result = result
                    .groupBy(m -> m)
                    .flatMap(s -> s.skip(1).take(1));
        } else if (ctx.logical_operator().OR() != null) {
            result = result.distinct();
        }
        pushObservable(ctx.getText().hashCode(), result);
    } else {
        if (ctx.object(0) != null && ctx.object(0).getText().hashCode() != ctx.getText().hashCode()) {
            Observable<MetricId<T>> expressionObservable = popObservable(ctx.object(0).getText().hashCode());
            observables.remove(ctx.object(0).getText().hashCode());
            pushObservable(ctx.getText().hashCode(), expressionObservable);
        }
    }
}
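The AND branch computes a set intersection by concatenating both sides, grouping identical elements, and keeping the second element of each group. The plain-Java sketch below models that idea under the assumption that neither side contains duplicates of its own (otherwise an element seen twice on one side would also pass). `IntersectionSketch` and `seenTwice` are hypothetical names, not part of the Hawkular code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IntersectionSketch {
    // Returns each element at the moment it is seen for the second time
    // in the concatenation of 'left' followed by 'right'.
    static <T> List<T> seenTwice(List<T> left, List<T> right) {
        List<T> concatenated = new ArrayList<>(left);
        concatenated.addAll(right);

        Map<T, Integer> occurrences = new HashMap<>();
        List<T> result = new ArrayList<>();
        for (T item : concatenated) {
            int count = occurrences.merge(item, 1, Integer::sum);
            if (count == 2) {
                // mirrors skip(1).take(1): only the second element of a group is kept
                result.add(item);
            }
        }
        return result;
    }
}
```

For `left = [a, b, c]` and `right = [c, d, a]`, `seenTwice` returns `[c, a]`, the elements present on both sides, in the order their second occurrence appears.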
Code example source: origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_52() throws Exception {
    FactStore factStore = new CassandraFactStore();
    Observable<ReservationEvent> facts = factStore.observe();
    // one GroupedObservable per reservation UUID
    Observable<GroupedObservable<UUID, ReservationEvent>> grouped =
            facts.groupBy(ReservationEvent::getReservationUuid);
    grouped.subscribe(byUuid -> {
        byUuid.subscribe(this::updateProjection);
    });
}
Code example source: origin: akarnokd/akarnokd-misc
@Test
public void test1() {
    rx.Observable.range(1, 20)
            .groupBy(v -> v % 3)
            // collect each group into a list, one group after another
            .concatMap(g -> g.toList())
            .toList()
            .subscribe(System.out::println);
}
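As a rough model of what this chain collects, the plain-Java sketch below groups a range of integers by remainder and returns the groups as a list of lists, ordered by when each remainder first appears. It does not use RxJava and says nothing about buffering or request behaviour; `ModuloGroupsSketch` and `groupsByRemainder` are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ModuloGroupsSketch {
    // Groups the integers start, start+1, ..., start+count-1 by (value % divisor).
    // Groups are returned in the order their key first appears.
    static List<List<Integer>> groupsByRemainder(int start, int count, int divisor) {
        Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
        for (int v = start; v < start + count; v++) {
            groups.computeIfAbsent(v % divisor, k -> new ArrayList<>()).add(v);
        }
        return new ArrayList<>(groups.values());
    }
}
```

`groupsByRemainder(1, 20, 3)` returns `[[1, 4, 7, 10, 13, 16, 19], [2, 5, 8, 11, 14, 17, 20], [3, 6, 9, 12, 15, 18]]`. The group for remainder 1 comes first because 1 is the first value in the range.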
Code example source: origin: akarnokd/akarnokd-misc
    @Test
    public void so() throws Exception {
        rx.Observable
                .defer(() -> rx.Observable.from(longTimeFunc()))
                .doOnNext(strings -> sout("RX_TAG_FIRST", Arrays.toString(strings)))
                // group rows by their first column
                .groupBy(rows -> rows[0])
                .doOnNext(groupedObservable -> sout("RX_TAG_TWO", groupedObservable.getKey()))
                // fold each group of rows into a single Shop
                .concatMap(group -> group.collect(Shop::new, (shop, rows) -> {
                    shop.id = Integer.parseInt(rows[0]);
                    shop.name = rows[1];
                    shop.coordinates.add(new Coordinate(rows[2], rows[3]));
                }))
                .doOnNext(shop -> sout("RX_TAG_THREE", String.valueOf(shop)))
                .toList()
                .doOnNext(shops -> sout("RX_TAG_FOUR", String.valueOf(shops)))
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .subscribe(shops -> sout("RX_TAG_SUBSCRIBE", String.valueOf(shops)));
        Thread.sleep(1000);
    }
}