Efficiently merging two or more DataFrames/RDDs in PySpark

h79rfbju · published 2021-07-13 in Spark

I am trying to merge three RDDs based on the same key. Below is the data.

+------+---------+-----+                                    
|UserID|UserLabel|Total|
+------+---------+-----+
|     2|    Panda|   15|
|     3|    Candy|   15|
|     1|  Bahroze|   15|
+------+---------+-----+

+------+---------+-----+
|UserID|UserLabel|Total|
+------+---------+-----+
|     2|    Panda| 7342|
|     3|    Candy| 5669|
|     1|  Bahroze| 8361|
+------+---------+-----+

+------+---------+-----+
|UserID|UserLabel|Total|
+------+---------+-----+
|     2|    Panda|   37|
|     3|    Candy|   27|
|     1|  Bahroze|   39|
+------+---------+-----+

I can merge these three DataFrames. I convert them to RDDs of dicts, using the code below for all three:

new_rdd = userTotalVisits.rdd.map(lambda row: row.asDict(True))

After the RDD conversion, I keep one of them as an RDD and collect the other two as lists. I then map over the first RDD and add the other keys to each row based on the matching UserID. I am hoping there is a better way to do this with PySpark. Here is the code I wrote:

def transform(row):
    # Add a nested "Total" dict to each row
    for x in conversion_list:  # first rdd, collected as a list of dicts with collect()
        if x['UserID'] == row['UserID']:
            row["Total"] = {"Visitors": row["Total"], "Conversions": x["Total"]}

    for y in Revenue_list:  # second rdd, collected as a list of dicts with collect()
        if y['UserID'] == row['UserID']:
            row["Total"]["Revenue"] = y["Total"]
    return row

potato = new_rdd.map(lambda row: transform(row))  # map over the first rdd

How can I merge these three RDDs/DFs efficiently? (I have to run three different jobs on a huge DataFrame.) I'm looking for a more efficient approach. P.S. I'm still a beginner. The result of my code is below, and it is exactly what I need:

{'UserID': '2', 'UserLabel': 'Panda', 'Total': {'Visitors': 37, 'Conversions': 15, 'Revenue': 7342}}
{'UserID': '3', 'UserLabel': 'Candy', 'Total': {'Visitors': 27, 'Conversions': 15, 'Revenue': 5669}}
{'UserID': '1', 'UserLabel': 'Bahroze', 'Total': {'Visitors': 39, 'Conversions': 15, 'Revenue': 8361}}

Thank you.


fquxozlt1#

You just need to do a left join across all three DataFrames, but make sure the first DataFrame you join from contains all the UserID and UserLabel values. You can skip the groupBy suggested by @blackbishop and it will still give the desired output.
I'm showing how to do it in Scala, but you can do something similar in Python.

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF on Seq and the $ column syntax outside notebook/shell environments

// source data
val visitorDF = Seq((2,"Panda",15),(3,"Candy",15),(1,"Bahroze",15),(4,"Test",25)).toDF("UserID","UserLabel","Total")
val conversionsDF = Seq((2,"Panda",37),(3,"Candy",27),(1,"Bahroze",39)).toDF("UserID","UserLabel","Total")
val revenueDF = Seq((2,"Panda",7342),(3,"Candy",5669),(1,"Bahroze",8361)).toDF("UserID","UserLabel","Total")

// left-join on the shared key columns and collect the three Total columns into one struct
val finalDF = visitorDF.as("v")
  .join(conversionsDF.as("c"), Seq("UserID","UserLabel"), "left")
  .join(revenueDF.as("r"), Seq("UserID","UserLabel"), "left")
  .withColumn("TotalArray", struct($"v.Total".as("Visitors"), $"c.Total".as("Conversions"), $"r.Total".as("Revenue")))
  .drop("Total")

display(finalDF)

The resulting finalDF has one row per user, with a TotalArray struct holding the Visitors, Conversions and Revenue values.
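Since the question uses PySpark, here is a rough equivalent of the same joins (an untested sketch; the DataFrame names visitorDF, conversionsDF and revenueDF mirror the Scala example above):

from pyspark.sql import functions as F

# left-join the three DataFrames on the shared key columns,
# then collect the three Total columns into one struct
finalDF = (
    visitorDF.alias("v")
    .join(conversionsDF.alias("c"), ["UserID", "UserLabel"], "left")
    .join(revenueDF.alias("r"), ["UserID", "UserLabel"], "left")
    .withColumn("TotalArray", F.struct(
        F.col("v.Total").alias("Visitors"),
        F.col("c.Total").alias("Conversions"),
        F.col("r.Total").alias("Revenue")))
    .drop("Total")
)

finalDF.show(truncate=False)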


d8tt03nd2#

You can join the 3 DataFrames on the columns ["UserID", "UserLabel"], then create a new struct Total from the 3 Total columns:

from pyspark.sql import functions as F

result = df1.alias("conv") \
    .join(df2.alias("rev"), ["UserID", "UserLabel"], "left") \
    .join(df3.alias("visit"), ["UserID", "UserLabel"], "left") \
    .select(
        F.col("UserID"),
        F.col("UserLabel"),
        F.struct(
            F.col("conv.Total").alias("Conversions"),
            F.col("rev.Total").alias("Revenue"),
            F.col("visit.Total").alias("Visitors")
        ).alias("Total")
    )

# write into json file
result.write.json("output")

# print result:
for i in result.toJSON().collect():
    print(i)

# {"UserID":3,"UserLabel":"Candy","Total":{"Conversions":15,"Revenue":5669,"Visitors":27}}
# {"UserID":1,"UserLabel":"Bahroze","Total":{"Conversions":15,"Revenue":8361,"Visitors":39}}
# {"UserID":2,"UserLabel":"Panda","Total":{"Conversions":15,"Revenue":7342,"Visitors":37}}
