spark group-by具有自定义逻辑性能

0kjbasz6  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(209)

我在hdfs上有Parquet文件,其中包含以下形式的记录:

unique_id_1 | state_1| prop1| prop2 | prop3
unique_id_1 | state_2| prop4| prop5 | prop6
unique_id_1 | state_3| prop8| prop8 | prop9 
unique_id_2 | state_1| prop1| prop2 | prop3
unique_id_2 | state_2| prop4| prop5 | prop6
unique_id_2 | state_3| prop7| prop8 | prop9

每个唯一的\u id \u x记录正好出现3次,每个唯一键有3个单独的状态,以及不同的其他属性。
我需要做的是将所有记录按唯一的\u id分组,并合并每个组中的属性-创建一个全新的记录,该记录将写入输出Parquet文件。
我已经研究了spark的rdd groupby,然后是mapvalues——这给了我一个具有3种不同状态的3行的iterable——我可以很好地构造我的新行。
但是我看到很多反对使用rdd.groupby().mapvalues()方法的建议,因为性能原因-它需要洗牌大量数据(这3条记录需要在同一个reducer中结束),并且它不使用分区本地的组合器来减少数据。
这里列出的其他方法建议使用aggregate或groupbykey—但是如果您只有一个简单的sum()或count()聚合函数,而不是像我需要的一些自定义逻辑,这似乎是可行的。
有没有比现在的groupby更好的方法来实现我的最终目标?
编辑:我看到了关于groupby的这个答案:“groupbykey很适合我们需要每个键的值的“小”集合的情况,如问题中所示。”考虑到我的情况,每个键总是有3个值-这种方法最适合这种情况吗?
编辑2:附加代码:

SQLContext sqlContext = SQLContext.getOrCreate(sparkContext.sc());

    JavaRDD<MyBean> rdd = sqlContext
            .read()
            .parquet(inputLocation)
            .toJavaRDD()
            .groupBy((Function<Row, String>) v1 -> v1.getAs("record_id"))
            .mapValues((Function<Iterable<Row>, MyBean>) records -> {
            //getting the 3 states from the interator and merging them
                Supplier<Stream<Row>> rowSupplier = () -> StreamSupport.stream(records.spliterator(), false);
                Optional<Row> state1Row = rowSupplier.get().filter(row -> row.getAs("state").equals("state_1")).findFirst();
                Optional<Row> state2Row = rowSupplier.get().filter(row -> row.getAs("state").equals("state_2")).findFirst();
                Optional<Row> state3Row = rowSupplier.get().filter(row -> row.getAs("state").equals("state_3")).findFirst();

                return merge(state1Row, state2Row, state3Row); //contains the merge logic - returns an instance of MyBean

            })
            .map((Function<Tuple2<String, MyBean>, MyBean>) Tuple2::_2); //not interested in the record_id anymore
     sqlContext.createDataFrame(rdd, MyBean.class).write().parquet(outputLocation);

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题