Usage and code examples of the rx.Observable.toMap() method

x33g5p2x, reposted on 2022-01-25 in Other

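This article collects code examples of the Java method rx.Observable.toMap() and shows how Observable.toMap() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as useful references. Details of the Observable.toMap() method:
Package: rx (fully qualified class name: rx.Observable)
Class name: Observable
Method name: toMap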

Introduction to Observable.toMap

Returns an Observable that emits a single HashMap containing all items emitted by the source Observable, mapped by the keys returned by a specified keySelector function.

If more than one source item maps to the same key, the HashMap will contain the latest of those items.
Backpressure support: this operator does not support backpressure because, by design, it requests and buffers everything.
Scheduler: toMap does not operate by default on a particular Scheduler.
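The "latest item wins" rule for duplicate keys can be illustrated without RxJava on the classpath. The following is a minimal plain-Java sketch that only models the collecting behavior described above. The class `ToMapSketch` and its two `toMap` helpers are hypothetical names introduced for illustration and are not part of the rx.Observable API. In RxJava 1.x the corresponding calls would be along the lines of `Observable.from(words).toMap(keySelector)` and `Observable.from(words).toMap(keySelector, valueSelector)`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the toMap semantics; this is NOT RxJava code.
class ToMapSketch {

    // Models toMap(keySelector): each item is stored under its key.
    static <T, K> Map<K, T> toMap(List<T> items,
                                  Function<? super T, ? extends K> keySelector) {
        return toMap(items, keySelector, item -> item);
    }

    // Models toMap(keySelector, valueSelector): the stored value is derived from the item.
    static <T, K, V> Map<K, V> toMap(List<T> items,
                                     Function<? super T, ? extends K> keySelector,
                                     Function<? super T, ? extends V> valueSelector) {
        Map<K, V> result = new HashMap<>();
        for (T item : items) {
            // put() overwrites any earlier entry with the same key,
            // so the latest item for a key is the one that remains.
            result.put(keySelector.apply(item), valueSelector.apply(item));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "avocado", "banana");

        // "apple" and "avocado" share the key "a"; the later one is kept.
        Map<String, String> byInitial = toMap(words, w -> w.substring(0, 1));
        System.out.println(byInitial.get("a")); // avocado
        System.out.println(byInitial.size());   // 2

        // With a value selector, the map holds derived values instead of the items.
        Map<String, Integer> lengthByInitial =
                toMap(words, w -> w.substring(0, 1), String::length);
        System.out.println(lengthByInitial.get("b")); // 6
    }
}
```

The sketch collects eagerly from a List, which mirrors why the real operator has to buffer everything from the source before it can emit its single map.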

Code examples

Code example source: au.gov.amsa.risky/ihs-reader

public static Observable<Map<String, Map<String, String>>> fromZipAsMapByImo(File file) {
  return fromZip(file).toMap(map -> map.get(Key.LRIMOShipNo.name()));
}

Code example source: au.gov.amsa.risky/geotools-extras

public Shapes(String resource) {
  InputStreamReader r = new InputStreamReader(
      Shapes.class.getResourceAsStream(resource));
  shapes = Strings
      .lines(r)
      // ignore comment lines
      .filter(line -> !line.startsWith("#"))
      // ignore blank lines
      .filter(line -> line.trim().length() > 0)
      // split by | character
      .map(line -> line.split("\\|"))
      // log
      .doOnNext(items -> {
        log.info("loading " + items[0]);
      })
      // build map
      .toMap(items -> items[0].trim(),
          items -> shapefileFromZip(items[1].trim(),
              Double.parseDouble(items[2].trim())))
      // go
      .toBlocking().single();
}

Code example source: au.gov.amsa.risky/ihs-reader

public static Observable<Map<String, Map<String, String>>> fromZipAsMapByMmsi(File file) {
  return fromZip(file)
      // only ships with an mmsi
      .filter(map -> map.get(Key.MaritimeMobileServiceIdentityMMSINumber.name()) != null)
      // as map
      .toMap(map -> map.get(Key.MaritimeMobileServiceIdentityMMSINumber.name()));
}

Code example source: org.hawkular.metrics/hawkular-metrics-core-service (also found in hawkular/hawkular-metrics)

@Override
public Observable<Map<String, TaggedBucketPoint>> call(Observable<DataPoint<? extends Number>> dataPoints) {
  // start with a predicate that accepts everything, then AND in one condition per tag
  Predicate<DataPoint<? extends Number>> filter = dataPoint -> true;
  for (Entry<String, String> entry : tags.entrySet()) {
    // positive is false when the tag filter value starts with "!"
    boolean positive = (!entry.getValue().startsWith("!"));
    Pattern pattern = filterPattern(entry.getValue());
    filter = filter.and(dataPoint -> {
      return dataPoint.getTags().containsKey(entry.getKey()) &&
          (positive == pattern.matcher(dataPoint.getTags().get(entry.getKey())).matches());
    });
  }
  return dataPoints
      .filter(filter::test)
      // the unqualified toMap(...) inside collect(...) is a stream collector, not Observable.toMap
      .groupBy(dataPoint -> tags.entrySet().stream().collect(
          toMap(Entry::getKey, e -> dataPoint.getTags().get(e.getKey()))))
      .flatMap(group -> group.collect(() -> new TaggedDataPointCollector(group.getKey(), percentiles),
          TaggedDataPointCollector::increment))
      .map(TaggedDataPointCollector::toBucketPoint)
      // Observable.toMap: key each bucket point by its "key:value,key:value" tag string
      .toMap(bucketPoint -> bucketPoint.getTags().entrySet().stream().map(e ->
          e.getKey() + ":" + e.getValue()).collect(joining(",")));
}

Code example source: henrymorgen/android-advanced-light

private void toMap() {
  Swordsman s1 = new Swordsman("韦一笑", "A");
  Swordsman s2 = new Swordsman("张三丰", "SS");
  Swordsman s3 = new Swordsman("周芷若", "S");
  Observable.just(s1,s2,s3).toMap(new Func1<Swordsman, String>() {
    @Override
    public String call(Swordsman swordsman) {
      return swordsman.getLevel();
    }
  }).subscribe(new Action1<Map<String, Swordsman>>() {
    @Override
    public void call(Map<String, Swordsman> stringSwordsmanMap) {
      Log.i("wangshu", "toMap:" + stringSwordsmanMap.get("SS").getName());
    }
  });
}

Code example source: oembedler/spring-graphql-common

public ExecutionResult doExecute(ExecutionContext executionContext, GraphQLObjectType parentType, Object source, Map<String, List<Field>> fields) {
  List<Observable<Pair<String, Object>>> observablesResult = new ArrayList<>();
  List<Observable<Double>> observablesComplexity = new ArrayList<>();
  for (String fieldName : fields.keySet()) {
    final List<Field> fieldList = fields.get(fieldName);
    ExecutionResult executionResult = resolveField(executionContext, parentType, source, fieldList);
    observablesResult.add(unwrapExecutionResult(fieldName, executionResult));
    observablesComplexity.add(calculateFieldComplexity(executionContext, parentType, fieldList,
        executionResult != null ? ((GraphQLRxExecutionResult) executionResult).getComplexityObservable() : Observable.just(0.0)));
  }
  Observable<Map<String, Object>> result =
      Observable.merge(observablesResult)
          .toMap(Pair::getLeft, Pair::getRight);
  GraphQLRxExecutionResult graphQLRxExecutionResult = new GraphQLRxExecutionResult(result, Observable.just(executionContext.getErrors()), MathObservable.sumDouble(Observable.merge(observablesComplexity)));
  return graphQLRxExecutionResult;
}

Code example source: com.netflix.eureka/eureka2-bridge (also found in com.netflix.eureka2/eureka-bridge)

private void loadUpdatesFromV1Registry() {
  logger.info("Starting new round of replication from v1 to v2");
  registry.forSnapshot(Interests.forFullRegistry(), Source.matcherFor(Source.Origin.LOCAL))
      .filter(new Func1<InstanceInfo, Boolean>() {  // filter out self so it is not taken into account
        @Override
        public Boolean call(InstanceInfo instanceInfo) {
          return !instanceInfo.getId().equals(self.getId());
        }
      })
      .toMap(new Func1<InstanceInfo, String>() {
        @Override
        public String call(InstanceInfo instanceInfo) {
          return instanceInfo.getId();
        }
      })
      .subscribe(new Subscriber<Map<String, InstanceInfo>>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
          logger.warn("Error generating snapshot of registry", e);
        }
        @Override
        public void onNext(Map<String, InstanceInfo> currentSnapshot) {
          diff(currentSnapshot);
        }
      });
}

Code example source: hawkular/hawkular-metrics (also found in org.hawkular.metrics/hawkular-metrics-core-service)

@Override
public Observable<List<NumericBucketPoint>> call(Observable<DataPoint<? extends Number>> dataPoints) {
  return dataPoints
      .groupBy(dataPoint -> buckets.getIndex(dataPoint.getTimestamp()))
      .flatMap(group -> group.collect(
          () -> new NumericDataPointCollector(buckets, group.getKey(), percentiles),
          NumericDataPointCollector::increment))
      .map(NumericDataPointCollector::toBucketPoint)
      .toMap(NumericBucketPoint::getStart)
      .map(pointMap -> NumericBucketPoint.toList(pointMap, buckets));
}

Code example source: hawkular/hawkular-metrics

public Observable<Configuration> load(String id) {
  return session.executeAndFetch(findConfigurationGroup.bind(id))
      .toMap(row -> row.getString(0), row -> row.getString(1))
      .map(map -> new Configuration(id, map));
}

Code example source: hawkular/hawkular-metrics (also found in org.hawkular.metrics/hawkular-metrics-core-service)

@Override
public Observable<List<AvailabilityBucketPoint>> findAvailabilityStats(MetricId<AvailabilityType> metricId,
    long start, long end, Buckets buckets) {
  checkArgument(isValidTimeRange(start, end), "Invalid time range");
  return this.findDataPoints(metricId, start, end, 0, ASC)
      .groupBy(dataPoint -> buckets.getIndex(dataPoint.getTimestamp()))
      .flatMap(group -> group.collect(() -> new AvailabilityDataPointCollector(buckets, group.getKey()),
          AvailabilityDataPointCollector::increment))
      .map(AvailabilityDataPointCollector::toBucketPoint)
      .toMap(AvailabilityBucketPoint::getStart)
      .map(pointMap -> AvailabilityBucketPoint.toList(pointMap, buckets));
}

Code example source: nfl/graphql-rxjava

@Override
public ExecutionResult execute(ExecutionContext executionContext, GraphQLObjectType parentType, Object source, Map<String, List<Field>> fields) {
  List<Observable<Pair<String, ?>>> observables = new ArrayList<>();
  for (String fieldName : fields.keySet()) {
    final List<Field> fieldList = fields.get(fieldName);
    ExecutionResult executionResult = resolveField(executionContext, parentType, source, fieldList);
    if (executionResult instanceof RxExecutionResult) {
      RxExecutionResult rxResult = (RxExecutionResult)executionResult;
      Observable<?> unwrapped = rxResult.getDataObservable().flatMap(potentialResult -> {
        if (potentialResult instanceof RxExecutionResult) {
          return ((RxExecutionResult) potentialResult).getDataObservable();
        }
        if (potentialResult instanceof ExecutionResult) {
          return Observable.just(((ExecutionResult) potentialResult).getData());
        }
        return Observable.just(potentialResult);
      });
      observables.add(Observable.zip(Observable.just(fieldName), unwrapped, Pair::of));
    } else {
      observables.add(Observable.just(Pair.of(fieldName, executionResult != null ? executionResult.getData() : null)));
    }
  }
  Observable<Map<String, Object>> result =
      Observable.merge(observables)
          .toMap(Pair::getLeft, Pair::getRight);
  return new RxExecutionResult(result, Observable.just(executionContext.getErrors()));
}

Code example source: hawkular/hawkular-metrics

public Observable<Configuration> load(String id, Scheduler scheduler) {
  return session.executeAndFetch(updateConfigurationValue.bind(id), scheduler)
      .toMap(row -> row.getString(0), row -> row.getString(1))
      .map(map -> new Configuration(id, map));
}

Code example source: org.hawkular.metrics/hawkular-metrics-core-service

// ... (the start of this call chain is truncated in the source)
SumNumericBucketPointCollector::increment))
.map(SumNumericBucketPointCollector::toBucketPoint)
.toMap(NumericBucketPoint::getStart)
.map(pointMap -> NumericBucketPoint.toList(pointMap, buckets));

Code example source: nmoskalenko/RxFirebase

RxFirebaseDatabase.observeSingleValueEvent(mockDatabase)
    .subscribeOn(Schedulers.immediate())
    .toMap(new Func1<DataSnapshot, String>() {
      @Override
      public String call(DataSnapshot dataSnapshot) {
        // ... (the rest of this snippet is truncated in the source)

Code example source: com.couchbase.client/java-client

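// ... (the start of this expression is truncated in the source)
.singleOrDefault(null),
asqr.facets()
    .toMap(new Func1<FacetResult, String>() {
      @Override
      public String call(FacetResult facetResult) {
        // ... (the rest of this snippet is truncated in the source)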
