我尝试使用sparkDataframe而不是rdd,因为它们看起来比rdd更高级,并且倾向于生成更可读的代码。
在一个14节点的google dataproc集群中,我有大约600万个名字被两个不同的系统翻译成ids: sa
以及 sb
. 每个 Row
包含 name
, id_sa
以及 id_sb
. 我的目标是从 id_sa
至 id_sb
这样每个 id_sa
,对应的 id_sb
是附加到的所有名称中最常见的id id_sa
.
让我们用一个例子来说明。如果我有以下行:
[Row(name='n1', id_sa='a1', id_sb='b1'),
Row(name='n2', id_sa='a1', id_sb='b2'),
Row(name='n3', id_sa='a1', id_sb='b2'),
Row(name='n4', id_sa='a2', id_sb='b2')]
我的目标是从 a1
至 b2
. 事实上,与 a1
是 n1
, n2
以及 n3
,分别Map到 b1
, b2
以及 b2
,所以 b2
是关联到的名称中最频繁的Map a1
. 同样地, a2
将Map到 b2
. 我们可以假设总会有一个赢家:不需要打破僵局。
我希望我能 groupBy(df.id_sa)
但我不知道下一步该怎么办。我希望聚合最终能产生以下行:
[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]
但也许我尝试使用错误的工具,我应该回去使用RDD。
2条答案
按热度按时间a9wyjsp71#
我认为您可能需要的是窗口功能:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.window
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
下面是scala中的一个示例(我现在没有带配置单元的spark shell,所以我无法测试代码,但我认为它应该可以工作):
可能有更有效的方法来实现与窗口函数相同的结果,但我希望这为您指明了正确的方向。
7nbnzgx92#
使用
join
(如果是平局,将导致多行分组):使用窗口函数(将删除关系):
使用
struct
订购:另请参见如何选择每组的第一行?