spark:如何合并转换

2wnc66cl  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(419)

我有1000个json文件,我需要对每个文件进行一些转换,然后创建一个合并的输出文件,它可以对值进行重叠操作(例如,它不应该有重复的值)
所以,如果我把文件读成 wholeTextFiles ,作为 title,content 配对,然后在 map 函数,我解析 content 作为 json tree 并执行转换,在何处以及如何合并输出?
我是否需要对结果rdd进行另一个转换来合并这些值,这将如何工作?我是否可以在所有Map块中拥有一个共享对象(列表、Map或rdd(?),这些对象将作为转换的一部分进行更新,以便在那里检查重复的值?
p、 s:即使输出创建了零件文件,我仍然希望没有重复。
代码:

//read the files as JavaPairRDD , which gives <filename, content> pairs
String filename = "/sample_jsons";
JavaPairRDD<String,String> distFile = sc.wholeTextFiles(filename);

//then create a JavaRDD from the content.
JavaRDD<String> jsonContent = distFile.map(x -> x._2);

//apply transformations, the map function will return an ArrayList which would
//have property names.

JavaRDD<ArrayList<String>> apm = jsonContent.map(
                new Function< String, ArrayList<String> >() {
                            @Override
                            public ArrayList<String> call(String arg0) throws Exception {

                                JsonNode rootNode = mapper.readTree(arg0);
                                return parseJsonAndFindKey(rootNode, "type", "rootParent");
                            }
                });

所以,这样我就可以在一个 ArrayList ,从每个json文件。
现在我需要期末考试 ArrayList ,作为所有这些ArrayList的并集,删除重复项。我怎样才能做到这一点?

lokaqttq

lokaqttq1#

为什么1000个json文件需要1000个rdd?
您认为将输入阶段的1000个json文件合并到一个rdd中有什么问题吗?
如果您将从输入阶段使用一个rdd,那么在这个rdd上执行所有需要的操作应该并不困难。

相关问题