本文整理了Java中rx.Observable.toMap()
方法的一些代码示例,展示了Observable.toMap()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.toMap()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:toMap
[英]Returns an Observable that emits a single HashMap containing all items emitted by the source Observable, mapped by the keys returned by a specified keySelector function.
If more than one source item maps to the same key, the HashMap will contain the latest of those items. Backpressure Support: This operator does not support backpressure as by intent it is requesting and buffering everything. Scheduler: toMap does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable发出一个HashMap,其中包含源Observable发出的所有项,由指定KeySelect函数返回的键映射。
如果多个源项映射到同一个键,HashMap将包含这些项中的最新项。背压支持:该操作员无意支持背压,因为它正在请求和缓冲所有内容。调度程序:默认情况下,toMap不会在特定的调度程序上运行。
代码示例来源:origin: au.gov.amsa.risky/ihs-reader
public static Observable<Map<String, Map<String, String>>> fromZipAsMapByImo(File file) {
return fromZip(file).toMap(map -> map.get(Key.LRIMOShipNo.name()));
}
代码示例来源:origin: au.gov.amsa.risky/geotools-extras
public Shapes(String resource) {
InputStreamReader r = new InputStreamReader(
Shapes.class.getResourceAsStream(resource));
shapes = Strings
.lines(r)
// ignore comment lines
.filter(line -> !line.startsWith("#"))
// ignore blank lines
.filter(line -> line.trim().length() > 0)
// split by | character
.map(line -> line.split("\\|"))
// log
.doOnNext(items -> {
log.info("loading " + items[0]);
})
// build map
.toMap(items -> items[0].trim(),
items -> shapefileFromZip(items[1].trim(),
Double.parseDouble(items[2].trim())))
// go
.toBlocking().single();
}
代码示例来源:origin: au.gov.amsa.risky/ihs-reader
public static Observable<Map<String, Map<String, String>>> fromZipAsMapByMmsi(File file) {
return fromZip(file)
// only ships with an mmsi
.filter(map -> map.get(Key.MaritimeMobileServiceIdentityMMSINumber.name()) != null)
// as map
.toMap(map -> map.get(Key.MaritimeMobileServiceIdentityMMSINumber.name()));
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
@Override
public Observable<Map<String, TaggedBucketPoint>> call(Observable<DataPoint<? extends Number>> dataPoints) {
Predicate<DataPoint<? extends Number>> filter = dataPoint -> true;
for (Entry<String, String> entry : tags.entrySet()) {
boolean positive = (!entry.getValue().startsWith("!"));
Pattern pattern = filterPattern(entry.getValue());
filter = filter.and(dataPoint -> {
return dataPoint.getTags().containsKey(entry.getKey()) &&
(positive == pattern.matcher(dataPoint.getTags().get(entry.getKey())).matches());
});
}
return dataPoints
.filter(filter::test)
.groupBy(dataPoint -> tags.entrySet().stream().collect(
toMap(Entry::getKey, e -> dataPoint.getTags().get(e.getKey()))))
.flatMap(group -> group.collect(() -> new TaggedDataPointCollector(group.getKey(), percentiles),
TaggedDataPointCollector::increment))
.map(TaggedDataPointCollector::toBucketPoint)
.toMap(bucketPoint -> bucketPoint.getTags().entrySet().stream().map(e ->
e.getKey() + ":" + e.getValue()).collect(joining(",")));
}
}
代码示例来源:origin: henrymorgen/android-advanced-light
private void toMap() {
Swordsman s1 = new Swordsman("韦一笑", "A");
Swordsman s2 = new Swordsman("张三丰", "SS");
Swordsman s3 = new Swordsman("周芷若", "S");
Observable.just(s1,s2,s3).toMap(new Func1<Swordsman, String>() {
@Override
public String call(Swordsman swordsman) {
return swordsman.getLevel();
}
}).subscribe(new Action1<Map<String, Swordsman>>() {
@Override
public void call(Map<String, Swordsman> stringSwordsmanMap) {
Log.i("wangshu", "toMap:" + stringSwordsmanMap.get("SS").getName());
}
});
}
代码示例来源:origin: hawkular/hawkular-metrics
@Override
public Observable<Map<String, TaggedBucketPoint>> call(Observable<DataPoint<? extends Number>> dataPoints) {
Predicate<DataPoint<? extends Number>> filter = dataPoint -> true;
for (Entry<String, String> entry : tags.entrySet()) {
boolean positive = (!entry.getValue().startsWith("!"));
Pattern pattern = filterPattern(entry.getValue());
filter = filter.and(dataPoint -> {
return dataPoint.getTags().containsKey(entry.getKey()) &&
(positive == pattern.matcher(dataPoint.getTags().get(entry.getKey())).matches());
});
}
return dataPoints
.filter(filter::test)
.groupBy(dataPoint -> tags.entrySet().stream().collect(
toMap(Entry::getKey, e -> dataPoint.getTags().get(e.getKey()))))
.flatMap(group -> group.collect(() -> new TaggedDataPointCollector(group.getKey(), percentiles),
TaggedDataPointCollector::increment))
.map(TaggedDataPointCollector::toBucketPoint)
.toMap(bucketPoint -> bucketPoint.getTags().entrySet().stream().map(e ->
e.getKey() + ":" + e.getValue()).collect(joining(",")));
}
}
代码示例来源:origin: com.netflix.eureka2/eureka-bridge
.toMap(new Func1<InstanceInfo, String>() {
@Override
public String call(InstanceInfo instanceInfo) {
代码示例来源:origin: oembedler/spring-graphql-common
public ExecutionResult doExecute(ExecutionContext executionContext, GraphQLObjectType parentType, Object source, Map<String, List<Field>> fields) {
List<Observable<Pair<String, Object>>> observablesResult = new ArrayList<>();
List<Observable<Double>> observablesComplexity = new ArrayList<>();
for (String fieldName : fields.keySet()) {
final List<Field> fieldList = fields.get(fieldName);
ExecutionResult executionResult = resolveField(executionContext, parentType, source, fieldList);
observablesResult.add(unwrapExecutionResult(fieldName, executionResult));
observablesComplexity.add(calculateFieldComplexity(executionContext, parentType, fieldList,
executionResult != null ? ((GraphQLRxExecutionResult) executionResult).getComplexityObservable() : Observable.just(0.0)));
}
Observable<Map<String, Object>> result =
Observable.merge(observablesResult)
.toMap(Pair::getLeft, Pair::getRight);
GraphQLRxExecutionResult graphQLRxExecutionResult = new GraphQLRxExecutionResult(result, Observable.just(executionContext.getErrors()), MathObservable.sumDouble(Observable.merge(observablesComplexity)));
return graphQLRxExecutionResult;
}
代码示例来源:origin: com.netflix.eureka/eureka2-bridge
private void loadUpdatesFromV1Registry() {
logger.info("Starting new round of replication from v1 to v2");
registry.forSnapshot(Interests.forFullRegistry(), Source.matcherFor(Source.Origin.LOCAL))
.filter(new Func1<InstanceInfo, Boolean>() { // filter self so it's not take into account
@Override
public Boolean call(InstanceInfo instanceInfo) {
return !instanceInfo.getId().equals(self.getId());
}
})
.toMap(new Func1<InstanceInfo, String>() {
@Override
public String call(InstanceInfo instanceInfo) {
return instanceInfo.getId();
}
})
.subscribe(new Subscriber<Map<String, InstanceInfo>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
logger.warn("Error generating snapshot of registry", e);
}
@Override
public void onNext(Map<String, InstanceInfo> currentSnapshot) {
diff(currentSnapshot);
}
});
}
代码示例来源:origin: hawkular/hawkular-metrics
@Override
public Observable<List<NumericBucketPoint>> call(Observable<DataPoint<? extends Number>> dataPoints) {
return dataPoints
.groupBy(dataPoint -> buckets.getIndex(dataPoint.getTimestamp()))
.flatMap(group -> group.collect(()
-> new NumericDataPointCollector(buckets, group.getKey(), percentiles),
NumericDataPointCollector::increment))
.map(NumericDataPointCollector::toBucketPoint)
.toMap(NumericBucketPoint::getStart)
.map(pointMap -> NumericBucketPoint.toList(pointMap, buckets));
}
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
@Override
public Observable<List<NumericBucketPoint>> call(Observable<DataPoint<? extends Number>> dataPoints) {
return dataPoints
.groupBy(dataPoint -> buckets.getIndex(dataPoint.getTimestamp()))
.flatMap(group -> group.collect(()
-> new NumericDataPointCollector(buckets, group.getKey(), percentiles),
NumericDataPointCollector::increment))
.map(NumericDataPointCollector::toBucketPoint)
.toMap(NumericBucketPoint::getStart)
.map(pointMap -> NumericBucketPoint.toList(pointMap, buckets));
}
}
代码示例来源:origin: hawkular/hawkular-metrics
public Observable<Configuration> load(String id) {
return session.executeAndFetch(findConfigurationGroup.bind(id))
.toMap(row -> row.getString(0), row -> row.getString(1))
.map(map -> new Configuration(id, map));
}
代码示例来源:origin: oembedler/spring-graphql-common
.toMap(Pair::getLeft, Pair::getRight);
代码示例来源:origin: hawkular/hawkular-metrics
@Override
public Observable<List<AvailabilityBucketPoint>> findAvailabilityStats(MetricId<AvailabilityType> metricId,
long start, long end, Buckets buckets) {
checkArgument(isValidTimeRange(start, end), "Invalid time range");
return this.findDataPoints(metricId, start, end, 0, ASC)
.groupBy(dataPoint -> buckets.getIndex(dataPoint.getTimestamp()))
.flatMap(group -> group.collect(() -> new AvailabilityDataPointCollector(buckets, group.getKey()),
AvailabilityDataPointCollector::increment))
.map(AvailabilityDataPointCollector::toBucketPoint)
.toMap(AvailabilityBucketPoint::getStart)
.map(pointMap -> AvailabilityBucketPoint.toList(pointMap, buckets));
}
代码示例来源:origin: nfl/graphql-rxjava
@Override
public ExecutionResult execute(ExecutionContext executionContext, GraphQLObjectType parentType, Object source, Map<String, List<Field>> fields) {
List<Observable<Pair<String, ?>>> observables = new ArrayList<>();
for (String fieldName : fields.keySet()) {
final List<Field> fieldList = fields.get(fieldName);
ExecutionResult executionResult = resolveField(executionContext, parentType, source, fieldList);
if (executionResult instanceof RxExecutionResult) {
RxExecutionResult rxResult = (RxExecutionResult)executionResult;
Observable<?> unwrapped = rxResult.getDataObservable().flatMap(potentialResult -> {
if (potentialResult instanceof RxExecutionResult) {
return ((RxExecutionResult) potentialResult).getDataObservable();
}
if (potentialResult instanceof ExecutionResult) {
return Observable.just(((ExecutionResult) potentialResult).getData());
}
return Observable.just(potentialResult);
});
observables.add(Observable.zip(Observable.just(fieldName), unwrapped, Pair::of));
} else {
observables.add(Observable.just(Pair.of(fieldName, executionResult != null ? executionResult.getData() : null)));
}
}
Observable<Map<String, Object>> result =
Observable.merge(observables)
.toMap(Pair::getLeft, Pair::getRight);
return new RxExecutionResult(result, Observable.just(executionContext.getErrors()));
}
代码示例来源:origin: hawkular/hawkular-metrics
public Observable<Configuration> load(String id, Scheduler scheduler) {
return session.executeAndFetch(updateConfigurationValue.bind(id), scheduler)
.toMap(row -> row.getString(0), row -> row.getString(1))
.map(map -> new Configuration(id, map));
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
@Override
public Observable<List<AvailabilityBucketPoint>> findAvailabilityStats(MetricId<AvailabilityType> metricId,
long start, long end, Buckets buckets) {
checkArgument(isValidTimeRange(start, end), "Invalid time range");
return this.findDataPoints(metricId, start, end, 0, ASC)
.groupBy(dataPoint -> buckets.getIndex(dataPoint.getTimestamp()))
.flatMap(group -> group.collect(() -> new AvailabilityDataPointCollector(buckets, group.getKey()),
AvailabilityDataPointCollector::increment))
.map(AvailabilityDataPointCollector::toBucketPoint)
.toMap(AvailabilityBucketPoint::getStart)
.map(pointMap -> AvailabilityBucketPoint.toList(pointMap, buckets));
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
SumNumericBucketPointCollector::increment))
.map(SumNumericBucketPointCollector::toBucketPoint)
.toMap(NumericBucketPoint::getStart)
.map(pointMap -> NumericBucketPoint.toList(pointMap, buckets));
代码示例来源:origin: nmoskalenko/RxFirebase
RxFirebaseDatabase.observeSingleValueEvent(mockDatabase)
.subscribeOn(Schedulers.immediate())
.toMap(new Func1<DataSnapshot, String>() {
@Override
public String call(DataSnapshot dataSnapshot) {
代码示例来源:origin: com.couchbase.client/java-client
.singleOrDefault(null),
asqr.facets()
.toMap(new Func1<FacetResult, String>() {
@Override
public String call(FacetResult facetResult) {
内容来源于网络,如有侵权,请联系作者删除!