我要根据指定的预定义类别对源中的所有项进行分组。每个类别的项目数可能在数百万左右。groupby帮助我实现了这一点,但是我想了解在分组之前对产品类型重新分区是否会更有效?
spark作业的来源是配置单元表。spark的版本是最新的2.4.4版本。对我来说,问题是我想为给定类别中的每个项目和其他项目运行一个定制的相似性算法。所以,在这个操作结束时,对于每一个项目,我都会有10个最相似的项目。
因为这涉及到groupby操作,而且groupby涉及到数据的洗牌,所以我认为首先应该根据类别重新划分数据。我甚至可以将分区的数量设置为我拥有的类别的数量(数量级为100)。
一旦数据在单个worker上被重新分区发送,那么运行groupby应该是一个本地操作—如果我在同一类型上执行groupby的话。这个假设正确吗?
// For demo, I am reading from CSV. The final source is a hive table
Dataset<Row> rows = spark.read().option("sep", "\t")
.csv("<some path>")
.repartition(20, new Column("category"))
.cache();
Dataset<Row> ids_grouped_by_category = rows.map((MapFunction<Row, Row>) items -> {
// Some transformation returns a row in the format I need.
return new-row;
}, <encoder>)
.groupBy(functions.col("category"))
.agg(functions.collect_list("category").as("ids"));
在这个操作结束时,我已经能够将给定类别的所有项目ID分组到一个列表中。像这样:
+---------------------------+------------------------------------------+
|category | ids |
+---------------------------+------------------------------------------+
|category-1 | [id1, id2...] |
|category-2 | [idx, idy...] |
+---------------------------+------------------------------------------+
我已经能够得到我需要的格式的数据,但我想了解的是,这样做一个小组的方式是正确的?另外,执行collectlist操作意味着什么?它能把所有的东西都载入内存吗?
暂无答案!
目前还没有任何答案,快来回答吧!