Efficiently merging two or more DataFrames/RDDs in PySpark

h79rfbju · asked 2021-07-13 · Spark

I am trying to merge three RDDs based on the same key. Here is the data:

    +------+---------+-----+
    |UserID|UserLabel|Total|
    +------+---------+-----+
    |     2|    Panda|   15|
    |     3|    Candy|   15|
    |     1|  Bahroze|   15|
    +------+---------+-----+

    +------+---------+-----+
    |UserID|UserLabel|Total|
    +------+---------+-----+
    |     2|    Panda| 7342|
    |     3|    Candy| 5669|
    |     1|  Bahroze| 8361|
    +------+---------+-----+

    +------+---------+-----+
    |UserID|UserLabel|Total|
    +------+---------+-----+
    |     2|    Panda|   37|
    |     3|    Candy|   27|
    |     1|  Bahroze|   39|
    +------+---------+-----+
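For reproducibility, here is a minimal sketch of how these sample DataFrames could be created (the names spark, conversions_df, revenue_df and visitors_df are placeholders of mine; which table holds conversions, revenue or visitors is inferred from the expected output further below):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    cols = ["UserID", "UserLabel", "Total"]

    # placeholder DataFrames mirroring the three tables shown above
    conversions_df = spark.createDataFrame([(2, "Panda", 15), (3, "Candy", 15), (1, "Bahroze", 15)], cols)
    revenue_df = spark.createDataFrame([(2, "Panda", 7342), (3, "Candy", 5669), (1, "Bahroze", 8361)], cols)
    visitors_df = spark.createDataFrame([(2, "Panda", 37), (3, "Candy", 27), (1, "Bahroze", 39)], cols)

    conversions_df.show()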

I am able to merge these three DataFrames: I convert each of them to an RDD of dicts with the line below (the same for all three):

    new_rdd = userTotalVisits.rdd.map(lambda row: row.asDict(True))

After the RDD conversion, I keep one as an RDD and collect the other two into lists. I then map over the first RDD and, for each row, add the other keys by matching on UserID. I am hoping there is a better way to do this in PySpark. Here is the code I wrote:

    def transform(row):
        # Add a new key to each row
        for x in conversion_list:  # first rdd, as a list of dicts after collect()
            if x['UserID'] == row['UserID']:
                row["Total"] = {"Visitors": row["Total"], "Conversions": x["Total"]}
        for y in Revenue_list:  # second rdd, as a list of dicts after collect()
            if y['UserID'] == row['UserID']:
                row["Total"]["Revenue"] = y["Total"]
        return row

    potato = new_rdd.map(lambda row: transform(row))  # first rdd

How can I merge these three RDDs/DataFrames efficiently? (I have to run three different jobs on one huge DataFrame, so I am looking for a more efficient approach.) P.S. I am still new to Spark. The result of my code is below, and it is exactly what I need:

    {'UserID': '2', 'UserLabel': 'Panda', 'Total': {'Visitors': 37, 'Conversions': 15, 'Revenue': 7342}}
    {'UserID': '3', 'UserLabel': 'Candy', 'Total': {'Visitors': 27, 'Conversions': 15, 'Revenue': 5669}}
    {'UserID': '1', 'UserLabel': 'Bahroze', 'Total': {'Visitors': 39, 'Conversions': 15, 'Revenue': 8361}}

Thank you.


fquxozlt #1

You just have to do left joins across all three DataFrames, but make sure that the first DataFrame you join from has all the UserID and UserLabel values. You can skip the groupBy that @blackbishop suggested and still get the required output.
I am showing how to do it in Scala, but you can do something similar in Python.

    // source data
    val visitorDF = Seq((2,"Panda",15),(3,"Candy",15),(1,"Bahroze",15),(4,"Test",25)).toDF("UserID","UserLabel","Total")
    val conversionsDF = Seq((2,"Panda",37),(3,"Candy",27),(1,"Bahroze",39)).toDF("UserID","UserLabel","Total")
    val revenueDF = Seq((2,"Panda",7342),(3,"Candy",5669),(1,"Bahroze",8361)).toDF("UserID","UserLabel","Total")

    import org.apache.spark.sql.functions._
    val finalDF = visitorDF.as("v")
      .join(conversionsDF.as("c"), Seq("UserID","UserLabel"), "left")
      .join(revenueDF.as("r"), Seq("UserID","UserLabel"), "left")
      .withColumn("TotalArray", struct($"v.Total".as("Visitor"), $"c.Total".as("Conversions"), $"r.Total".as("Revenue")))
      .drop("Total")
    display(finalDF)

The result has one row per UserID/UserLabel with a new TotalArray struct column holding Visitor, Conversions and Revenue.

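A rough PySpark equivalent of the Scala snippet above (a sketch only; visitor_df, conversions_df and revenue_df are assumed to hold the same data as the Scala DataFrames):

    from pyspark.sql import functions as F

    # visitor_df, conversions_df and revenue_df mirror visitorDF, conversionsDF and revenueDF above
    final_df = (
        visitor_df.alias("v")
        .join(conversions_df.alias("c"), ["UserID", "UserLabel"], "left")
        .join(revenue_df.alias("r"), ["UserID", "UserLabel"], "left")
        .withColumn(
            "TotalArray",
            F.struct(
                F.col("v.Total").alias("Visitor"),
                F.col("c.Total").alias("Conversions"),
                F.col("r.Total").alias("Revenue"),
            ),
        )
        .drop("Total")  # drops the three ambiguous Total columns
    )
    final_df.show(truncate=False)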

d8tt03nd #2

You can join the 3 DataFrames on the columns ["UserID", "UserLabel"], then create a new struct column Total from the 3 Total columns:

    from pyspark.sql import functions as F

    result = df1.alias("conv") \
        .join(df2.alias("rev"), ["UserID", "UserLabel"], "left") \
        .join(df3.alias("visit"), ["UserID", "UserLabel"], "left") \
        .select(
            F.col("UserID"),
            F.col("UserLabel"),
            F.struct(
                F.col("conv.Total").alias("Conversions"),
                F.col("rev.Total").alias("Revenue"),
                F.col("visit.Total").alias("Visitors")
            ).alias("Total")
        )

    # write into json file
    result.write.json("output")

    # print result:
    for i in result.toJSON().collect():
        print(i)

    # {"UserID":3,"UserLabel":"Candy","Total":{"Conversions":15,"Revenue":5669,"Visitors":27}}
    # {"UserID":1,"UserLabel":"Bahroze","Total":{"Conversions":15,"Revenue":8361,"Visitors":39}}
    # {"UserID":2,"UserLabel":"Panda","Total":{"Conversions":15,"Revenue":7342,"Visitors":37}}
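If you want the nested Python dicts from the question instead of JSON strings, the same result can be mapped through Row.asDict (a small follow-up sketch, not part of the original answer; it reuses the asDict(True) idea from the question code):

    # convert each Row, including the nested Total struct, into a plain Python dict
    dict_rdd = result.rdd.map(lambda row: row.asDict(recursive=True))
    for d in dict_rdd.collect():
        print(d)
    # e.g. {'UserID': 2, 'UserLabel': 'Panda', 'Total': {'Conversions': 15, 'Revenue': 7342, 'Visitors': 37}}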
