pyspark sql合并组合两个具有相同值但不同名称的列

7z5jn7bk  于 2024-01-06  发布在  Spark
关注(0)|答案(2)|浏览(148)

我有两个结构如下的表
表1

  1. lang created_date
  2. java 11-01-23
  3. python 11-11-23

字符串
表2

  1. lang ingested_date
  2. scala 11-21-23


我想创建具有预期结果的组合表:
表3

  1. lang created_date
  2. java 11-01-23
  3. python 11-11-23
  4. scala 11-21-23


实际结果

  1. lang created_date
  2. java 11-01-23
  3. python 11-11-23
  4. scala 11-21-23
  5. scala null


我使用下面的python pyspark代码,但它给了我额外的一行,在created_date列中有空值。

  1. table1DF = sparkSession.read.table("Table1")
  2. table2DF = sparkSession.read.table("Table2")
  3. table1 = table1DF.col("lang").col("created_date")
  4. table2 = table2DF.col("lang").col("ingested_date").alias("created_date")
  5. merged_table = table1.union(table2)
  6. final_table = merged_table.groupBy("lang", "created_date")


当我使用union合并两个表中的数据时,如何避免在其中一列中获得最后一行空值?

9bfwbjaz

9bfwbjaz1#

可以使用coalesce函数从两列中选择非空值

  1. from pyspark.sql.functions import coalesce
  2. table1DF = sparkSession.read.table("Table1")
  3. table2DF = sparkSession.read.table("Table2")
  4. table1 = table1DF.select("lang", "created_date")
  5. table2 = table2DF.select("lang", "ingested_date").alias("created_date")
  6. merged_table = table1.union(table2)
  7. final_table = merged_table.groupBy("lang").agg(coalesce("created_date", "ingested_date").alias("created_date"))

字符串

wz1wpwve

wz1wpwve2#

下面是我针对你的问题测试的一段代码,union函数不关心列的名称,它关心它们的编号和类型,它将数据堆叠在彼此的顶部,获取行('scala',NULL)很奇怪,因为我测试过了,它工作正常,除非你已经在table 2中有了行。

  1. from pyspark.sql import SparkSession
  2. from pyspark.sql.types import *
  3. from pyspark.sql import Row
  4. from pyspark.sql.functions import coalesce
  5. spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
  6. # Creating the data sample
  7. schema = StructType([
  8. StructField("lang", StringType(), True),
  9. StructField("created_date", StringType(), True)
  10. ])
  11. rows = [Row("java", "11-01-23"), Row("python", "11-11-23")]
  12. rows2 = [Row("scala", "11-21-23"), Row("scala", None)]
  13. schema2 = StructType([
  14. StructField("lang", StringType(), True),
  15. StructField("ingested_date", StringType(), True)
  16. ])
  17. # Creating the dataframes
  18. df = spark.createDataFrame(rows, schema)
  19. df2 = spark.createDataFrame(rows2, schema2)
  20. # Union the dataframes
  21. result = df.union(df2)
  22. result.dropna().show()

字符串
即使表1 created_date中的column 2和表2中的column 2被称为ingested_dateunion也会堆叠数据,因为它会检查 Dataframe 的列数和每列的类型。
关于你的代码,不幸的是,它是不可复制的。但是,如果你在执行union后仍然有一个额外的无用行,你可以做以下事情,你会很好:

  1. merged_table = table1.union(table2).dropna()


好奇和开放的讨论,我希望这有助于!

展开查看全部

相关问题