我正试图合并三个基于同一密钥的rdd。以下是数据。
+------+---------+-----+
|UserID|UserLabel|Total|
+------+---------+-----+
| 2| Panda| 15|
| 3| Candy| 15|
| 1| Bahroze| 15|
+------+---------+-----+
+------+---------+-----+
|UserID|UserLabel|Total|
+------+---------+-----+
| 2| Panda| 7342|
| 3| Candy| 5669|
| 1| Bahroze| 8361|
+------+---------+-----+
+------+---------+-----+
|UserID|UserLabel|Total|
+------+---------+-----+
| 2| Panda| 37|
| 3| Candy| 27|
| 1| Bahroze| 39|
+------+---------+-----+
我可以合并这三个数据框。我将它们转换为rdd dict,并使用以下三种代码
new_rdd = userTotalVisits.rdd.map(lambda row: row.asDict(True))
在rdd转换之后,我将一个rdd和另外两个作为列表。Map第一个rdd,然后基于相同的用户ID向其添加其他键。我希望有一个更好的方法来使用pyspark。这是我写的代码。
def transform(row):
# Add a new key to each row
for x in conversion_list: # first rdd in list of object as[{}] after using collect()
if( x['UserID'] == row['UserID'] ):
row["Total"] = { "Visitors": row["Total"], "Conversions": x["Total"] }
for y in Revenue_list: # second rdd in list of object as[{}] after using collect()
if( y['UserID'] == row['UserID'] ):
row["Total"]["Revenue"] = y["Total"]
return row
potato = new_rdd.map(lambda row: transform(row)) #first rdd
如何有效地合并这三个RDD/DF(因为我必须在一个巨大的df上执行三个不同的任务。寻找一个更有效的想法。ps我还是个新手。我的代码的结果如下,这是我需要的。
{'UserID': '2', 'UserLabel': 'Panda', 'Total': {'Visitors': 37, 'Conversions': 15, 'Revenue': 7342}}
{'UserID': '3', 'UserLabel': 'Candy', 'Total': {'Visitors': 27, 'Conversions': 15, 'Revenue': 5669}}
{'UserID': '1', 'UserLabel': 'Bahroze', 'Total': {'Visitors': 39, 'Conversions': 15, 'Revenue': 8361}}
谢谢您。
2条答案
按热度按时间fquxozlt1#
您只需在所有三个Dataframe上执行左连接,但请确保您使用的第一个Dataframe具有所有userid和userlabel值。您可以忽略@blackbishop建议的groupby操作,但它仍然会提供所需的输出
我展示了如何在scala中完成它,但是您可以在python中做类似的事情。
您可以看到如下输出:
d8tt03nd2#
可以在列上连接3个Dataframe
["UserID", "UserLabel"]
,创建新结构total
从总共3列中: