如何在一个单独的框架的列值上分割PySpark框架?

sbtkgmzw  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(115)

我有一个PySpark嵌套框架,如果一列的值存在于另一个嵌套框架中,我想根据条件将它分成两个嵌套框架。
例如,我的输入框看起来像:

| Product     | Category        |
| ----------- | ----------------|
| Product A   | Food            |
| Product B   | Food            |
| Product C   | leisure goods.  |
| Product D   | drinks          |

第二个框架是:

| Product Categories |
| -------------------|
| Food               |
| leisure goods      |

因此,我想有两个由类别分裂的第二个嵌套框架:

df1.show()
| Product     | Category        |
| ----------- | ----------------|
| Product A   | Food            |
| Product B   | Food            |
| Product C   | leisure goods.  |

df2.show()
| Product     | Category        |
| ----------- | ----------------|
| Product D   | drinks          |

当然,我可以在同一个框架上做两个过滤器操作,但我希望运行时间更长。

oalqel3c

oalqel3c1#

我尝试使用join操作来重新生成PySpark,根据Category列存在于第二个DataFrame中的条件来过滤DataFrame。
下面是代码:

from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("SplitDataFrame").getOrCreate()
data1 = [("Product A", "Food"),
("Product B", "Food"),
("Product C", "leisure goods"),
("Product D", "drinks")]
columns1 = ["Product", "Category"]
df1 = spark.createDataFrame(data1, columns1)
data2 = [("Food",),
("leisure goods",)]
columns2 = ["Product Categories"]
df2 = spark.createDataFrame(data2, columns2)
df1_filtered = df1.join(df2, df1["Category"] == df2["Product Categories"], "inner").select(df1["Product"], df1["Category"])
df2_filtered = df1.join(df2, df1["Category"] == df2["Product Categories"], "left_anti").select(df1["Product"], df1["Category"])
df1_filtered.show()
df2_filtered.show()

  • 我创建了两个 Dataframe df1和df2
  • 我使用联接操作根据df1中的Category列存在于df2中的ProductCategories列的条件来过滤df1。这将导致df1_filtered。
  • 使用左反联接(left_anti)过滤df1以获取df2中不存在类别的行。这将导致df2_filtered。

通过遵循上述方法,您可以根据条件df1_filtered包含df2中存在的类别的行,而df2_filtered包含df2中不存在的类别的行,将DataFrame拆分为两个DataFrame。

相关问题