reduce-on-pojo字段与apache-flink一起使用java

jum4pzuy  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(404)

目前,我正在为一些分布式处理工具构建一个基准测试工具,在使用apache flink时遇到了一些问题。
设置很简单:logpojo是一个简单的pojo,有三个字段(长日期、双值、字符串数据)。在一个列表中,我正在寻找一个具有最小“value”字段的logpojo。基本相当于:

  1. pojoList.stream().min(new LogPojo.Comp()).get().getValue();

我的flink设置如下所示:

  1. public double processLogs(List<LogPojo> logs) {
  2. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  3. DataSet<LogPojo> logSet = env.fromCollection(logs);
  4. double result = 0.0;
  5. try {
  6. ReduceOperator ro = logSet.reduce(new LogReducer());
  7. List<LogPojo> c = ro.collect();
  8. result = c.get(0).getValue();
  9. } catch (Exception ex) {
  10. System.out.println("Exception caught" + ex);
  11. }
  12. return result;
  13. }
  14. public class LogReducer implements ReduceFunction<LogPojo> {
  15. @Override
  16. public LogPojo reduce(LogPojo o1, LogPojo o2) {
  17. return (o1.getValue() < o2.getValue()) ? o1 : o2;
  18. }
  19. }

它停止于:

  1. Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;

所以它似乎无法应用reduce函数。我就是找不到,为什么。有什么提示吗?

mbskvtky

mbskvtky1#

首先你应该检查一下你的进口货。您从scala类中得到一个异常,但是您的程序是用java实现的。您可能意外地导入了scala数据集api。使用javaapi不应导致scala异常(除非您使用的是依赖于scala的类)。
不管怎样,flink有一个用于 min , max 等等。

  1. DataSet<LogPojo> logSet = env.fromCollection(logs);
  2. // map LogPojo to a Tuple1<Double>
  3. // (Flink's built-in aggregation functions work only on Tuple types)
  4. DataSet<Tuple1<Double>> values = logSet.map(new MapFunction<LogPojo, Tuple1<Double>>() {
  5. @Override
  6. public Tuple1<Double> map(LogPojo l) throws Exception {
  7. return new Tuple1<>(l.value);
  8. }
  9. });
  10. // fetch the min value (at position 0 in the Tuple)
  11. List<Tuple1<Double>> c = values.min(0).collect();
  12. // get the first field of the Tuple
  13. Double minVal = c.get(0).f0;

相关问题