PySpark: how can I use a Spark join to combine two DataFrames into a new DataFrame?

iecba09b  posted on 2023-01-20  in Spark

Here are my two input PySpark DataFrames.
DataFrame 1

li = [('abc', 'xyz')]
liColumns = ["aid", "bid"]
tempDF = spark.createDataFrame(data=li, schema = liColumns)
tempDF.printSchema()
tempDF.show(truncate=False)
+---+---+
|aid|bid|
+---+---+
|abc|xyz|
+---+---+

DataFrame 2

other_li = [('abc', '111', 'desc111'), ('abc', '112', 'desc112'), ('xyz', 'A123', 'city'), ('xyz', 'A456', 'state'), ('xyz', 'A789', 'zip')]
otherColumns = ['real_aid', 'code', 'some_value']
otherDF = spark.createDataFrame(data=other_li, schema = otherColumns)
otherDF.printSchema()
otherDF.show(truncate=False)
+--------+----+----------+
|real_aid|code|some_value|
+--------+----+----------+
|abc     |111 |desc111   |
|abc     |112 |desc112   |
|xyz     |A123|city      |
|xyz     |A456|state     |
|xyz     |A789|zip       |
+--------+----+----------+

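Question: how can I combine these two DataFrames to get a third DataFrame? I know how to do this with append/union, but is there a way to do it with joins? Or is there a more efficient way to do it? I need to do this on two large tables.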

Expected DataFrame

output_li = [('abc', '111', 'desc111'), ('abc', '112', 'desc112'), ('abc', 'A123', 'city'), ('abc', 'A456', 'state'), ('abc', 'A789', 'zip'), ('xyz', 'A123', 'city'), ('xyz', 'A456', 'state'), ('xyz', 'A789', 'zip')]
outputColumns = ['real_aid', 'code', 'some_value']
outputDF = spark.createDataFrame(data=output_li, schema = outputColumns)
outputDF.printSchema()
outputDF.show(truncate=False)
+--------+----+----------+
|real_aid|code|some_value|
+--------+----+----------+
|abc     |111 |desc111   |
|abc     |112 |desc112   |
|abc     |A123|city      |
|abc     |A456|state     |
|abc     |A789|zip       |
|xyz     |A123|city      |
|xyz     |A456|state     |
|xyz     |A789|zip       |
+--------+----+----------+
vuv7lop3  #1

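As I understand it, you want to join the two DataFrames by matching bid against real_aid, and then, where aid is not equal to real_aid, "expand" the row so that it appears once for each of the two ids.
You can do it like this: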

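from pyspark.sql import functions as F

# Rename bid so it can serve as the join key; the right join keeps every row of otherDF.
# explode(array(real_aid, aid)) then emits one row per id, and the filter drops the null aids.
tempDF\
    .withColumnRenamed("bid", "real_aid")\
    .join(otherDF, ['real_aid'], "right")\
    .withColumn("real_aid", F.explode(F.array("real_aid", "aid")))\
    .drop("aid")\
    .filter(F.col("real_aid").isNotNull())\
    .show()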
+--------+----+----------+
|real_aid|code|some_value|
+--------+----+----------+
|     abc| 111|   desc111|
|     abc| 112|   desc112|
|     xyz|A123|      city|
|     abc|A123|      city|
|     xyz|A456|     state|
|     abc|A456|     state|
|     xyz|A789|       zip|
|     abc|A789|       zip|
+--------+----+----------+
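
To see why the right join followed by explode produces these rows, it may help to trace the same steps in plain Python on the sample data. This is only an illustrative sketch, not Spark code: the names temp_rows, other_rows and expand_rows are made up for this example, and it ignores how Spark treats null join keys.

```python
# Plain-Python trace of the rename + right join + explode + filter steps (no Spark needed).
temp_rows = [("abc", "xyz")]  # (aid, bid), same data as tempDF
other_rows = [
    ("abc", "111", "desc111"),
    ("abc", "112", "desc112"),
    ("xyz", "A123", "city"),
    ("xyz", "A456", "state"),
    ("xyz", "A789", "zip"),
]  # (real_aid, code, some_value), same data as otherDF


def expand_rows(temp_rows, other_rows):
    """Hypothetical helper mimicking the join/explode/filter chain row by row."""
    result = []
    for real_aid, code, some_value in other_rows:
        # Right join: every row of other_rows survives; aid is None when no bid matches.
        matching_aids = [aid for aid, bid in temp_rows if bid == real_aid] or [None]
        for aid in matching_aids:
            # explode(array(real_aid, aid)) emits one row per array element.
            for key in (real_aid, aid):
                if key is not None:  # corresponds to filter(real_aid is not null)
                    result.append((key, code, some_value))
    return result


expanded = expand_rows(temp_rows, other_rows)
for row in expanded:
    print(row)
```

The 'abc' rows have no matching bid, so their aid is null and only the original row is kept. The 'xyz' rows match bid 'xyz' with aid 'abc', so each one is emitted twice, once per id, which gives the eight rows of the expected DataFrame.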
