我确定每个城市活动部门的员工和企业数量:
|codeCommune|nomCommune |regroupement|section|libelleAPE |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+---------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|14654 |Saint-Pierre-en-Auge|84.11Z |O |Administration publique générale |3 |153.5 |169.5 |
|14654 |Saint-Pierre-en-Auge|16.24Z |C |Fabrication d'emballages en bois |1 |149.5 |150.5 |
|14654 |Saint-Pierre-en-Auge|10.11Z |C |Transformation et conservation de la viande de boucherie |1 |149.5 |150.5 |
具有分组级别( regroupement
以下)由用户设置:
+-----------+--------------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|codeCommune|nomCommune |regroupement|section|libelleAPE |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|14654 |Saint-Pierre-en-Auge|10 |C |Industries alimentaires |16 |208.0 |225.0 |
|14654 |Saint-Pierre-en-Auge|86 |Q |Activités pour la santé humaine |46 |169.5 |218.5 |
|14654 |Saint-Pierre-en-Auge|84 |O |Administration publique et défense ; sécurité sociale obligatoire |5 |153.5 |171.5 |
工作是这样做的:
从 Dataset
对于按部门代码(大致为城市代码的前两个字符)划分的企业和机构,选择以下列: city_code
, city_name
, grouping
(我们保留的活动代码部分: 84.11Z
或者 84
), section
(概括一项活动的部门的代码:工业、商业等), activity_description
, siren
(企业标识符:一个企业可能有许多机构), number_of_workers
, number_of_actives_people
一 groupBy
完成时间:
RelationalGroupedDataset group = enterprisesAndEstablishments
.groupBy("city_code", "city_name", "grouping", "section", "activity_description");
我通过聚合执行计算,然后:
group.agg(countDistinct("siren").as("nombreEntreprises"),
sum("number_of_workers").as("nombreSalaries"),
sum("number_of_actives_people").as("nombreActifs"));
我的问题是 groupBy
方法不关心数据集分区,而是从数据集的任何分区收集数据 enterprisesAndEstablishments
对大量数据进行全局排序,
当只针对一个部分时效率会更高:此示例中的所有活动都在分区中 [codeDepartement=14]
.
我希望它尊重这些分区并这样做 groupBy
以避免混乱。
我在找 sortWithPartitions
的同伴 groupBy
. 一些被称为 groupWithinPartitions
但我没找到。
实现我所追求的目标的最佳方式是什么,
或者如果没有工具,我应该选择什么替代方法?
2条答案
按热度按时间vs91vp4v1#
如果您绝对确定每个分区包含属于该组的所有行,那么可以使用mappartitions在纯java中计算每个组的值,而不使用spark。
这样,分组只在一个分区内进行,不会发生洗牌。缺点是不能使用spark的聚合函数。它们必须被纯java解决方案所取代,例如流。由于这需要相当多的代码行,我不确定是否值得付出努力。
如果分区是正确的,则
aggregated
Dataframe将与中的相同group.agg(...)
在这个问题上。方法
combine
用于组合三个采集器的参数取自此问题和此链接:mefy6pfw2#
您可以使用rdd低级函数实现同样的功能
aggregateByKey
它是聚合函数之一(其他函数是reduceByKey
&groupByKey
)可在Spark一个区别,使它成为一个强大的三个之一。聚合键不需要对同一数据类型进行操作,可以在分区内进行不同的聚合(最大、最小、平均、总和和计数),并在分区之间进行不同的聚合。
aggregatebykey需要3个主要输入:
zerovalue:初始值,它不会影响聚合值。
组合器函数:该函数接受两个参数。第二个参数合并到第一个参数中。此函数用于组合/合并单个分区中的值。
reduce/merge函数:这个函数还接受两个参数。在这里,参数被合并成一个跨rdd分区的参数。
输出: