基于pyspark中的其他列值创建新行

gwbalxhn  于 2024-01-06  发布在  Spark
关注(0)|答案(4)|浏览(244)

我有一个pyspark框架如下:

  1. c1 c2
  2. 111 null
  3. null 222
  4. 333 444
  5. null null

字符串
我需要有一个额外的列像下面的最终框架

  1. c1 c2 new_col
  2. 111 null 111
  3. null 222 222
  4. 333 444 333
  5. 333 444 444
  6. null null null


如果两个列都有值,那么我需要创建一个新的行,其中包含cols1和col2的值。

  1. df = df.withColumn('new_col', when(col('c1').isNull(), col('c2')) \
  2. .otherwise(when(col('c2').isNull(), col('c1')).otherwise(col(c2'))))


如果列c1和c2都有值,我会创建一个新行。有人能提出解决方案吗?

pn9klfpd

pn9klfpd1#

可以使用unionAll创建新行。在Scala上,可以轻松转换为Python:

  1. df.withColumn("new_col", coalesce($"c1", $"c2"))
  2. .unionAll(
  3. df.where($"c1".isNotNull && $"c2".isNotNull)
  4. .withColumn("new_col", $"c2")
  5. )

字符串
测试结果:

  1. +----+----+-------+
  2. |c1 |c2 |new_col|
  3. +----+----+-------+
  4. |111 |null|111 |
  5. |null|222 |222 |
  6. |333 |444 |333 |
  7. |null|null|null |
  8. |333 |444 |444 |
  9. +----+----+-------+

展开查看全部
o75abkj4

o75abkj42#

  1. from pyspark.sql import functions as F
  2. data = [(111, None), (None, 222), (333, 444), (None, None)]
  3. columns = ["c1", "c2"]
  4. df = spark.createDataFrame(data, columns)
  5. df = df.withColumn("c1", F.col("c1").cast('int'))
  6. df = df.withColumn("c2", F.col("c2").cast('int'))
  7. df1 = df.filter((F.col("c1").isNotNull()) & (F.col("c2").isNotNull()))
  8. df2 = df.filter(~((F.col("c1").isNotNull()) & (F.col("c2").isNotNull())))
  9. df1 = (df1.withColumn("new_col", F.array(df["c1"], df["c2"]))
  10. .withColumn("new_col", F.explode("new_col"))
  11. .withColumn("new_col", F.col("new_col").cast("int")))
  12. df2 = df2.withColumn("new_col", F.when(df2["c1"].isNull(), df2["c2"])
  13. .when(df2["c2"].isNull(), df2["c1"])
  14. .when((df2["c2"].isNull() & df2["c1"].isNull()), F.lit(None).cast('int'))
  15. .otherwise(F.lit(None).cast('int')))
  16. df1.show()
  17. df2.show()
  18. final_df = df2.unionByName(df1)
  19. final_df.show()

字符串
这里是结果,应该是你想要的输出

  1. +----+----+-------+
  2. | c1| c2|new_col|
  3. +----+----+-------+
  4. | 111|NULL| 111|
  5. |NULL| 222| 222|
  6. |NULL|NULL| NULL|
  7. | 333| 444| 333|
  8. | 333| 444| 444|
  9. +----+----+-------+

展开查看全部
wmvff8tz

wmvff8tz3#

你可以使用coalesce来创建一个数组,然后使用explode来从中创建行:

  1. from pyspark.sql.functions import expr, explode, coalesce
  2. df \
  3. .withColumn(
  4. "array_col",
  5. expr(
  6. "CASE WHEN c1 IS NOT NULL AND c2 IS NOT NULL THEN array(c1, c2)" +
  7. "ELSE array(coalesce(c1, c2))" +
  8. "END"
  9. )
  10. ) \
  11. .withColumn("new_col", explode("array_col")) \
  12. .drop("array_col") \
  13. .show()

字符串
输出量:

  1. +----+----+-------+
  2. | c1| c2|new_col|
  3. +----+----+-------+
  4. | 111|NULL| 111|
  5. |NULL| 222| 222|
  6. | 333| 444| 333|
  7. | 333| 444| 444|
  8. |NULL|NULL| NULL|
  9. +----+----+-------+

展开查看全部
jecbmhm3

jecbmhm34#

要实现所需的结果,您可以使用union操作将DataFrame与其自身的修改版本组合在一起,其中新列(new_col)根据您提到的条件填充。以下是如何做到这一点:

  1. from pyspark.sql import SparkSession
  2. from pyspark.sql import functions as F
  3. # Create a Spark session
  4. spark = SparkSession.builder.appName("example").getOrCreate()
  5. # Sample DataFrame
  6. data = [(111, None), (None, 222), (333, 444), (None, None)]
  7. columns = ["c1", "c2"]
  8. df = spark.createDataFrame(data, columns)
  9. # Create a new DataFrame with a modified new_col
  10. new_df = df.withColumn("new_col", F.when(df["c1"].isNull(), df["c2"])
  11. .when(df["c2"].isNull(), df["c1"])
  12. .otherwise(F.array(df["c1"], df["c2"])))
  13. # Explode the array in new_col to create separate rows
  14. result_df = new_df.select("c1", "c2", F.explode("new_col").alias("new_col"))
  15. # Show the result
  16. result_df.show()

字符串
这将为您提供以下DataFrame:

  1. +----+----+-------+
  2. | c1| c2|new_col|
  3. +----+----+-------+
  4. | 111|null| 111|
  5. |null| 222| 222|
  6. | 333| 444| 333|
  7. | 333| 444| 444|
  8. |null|null| null|
  9. +----+----+-------+


在这里,F.array(df["c1"], df["c2"])用于创建一个数组列new_col,其中包含c1c2值。然后使用F.explode函数将该数组分解为单独的行。这样,您可以为数组中的每个值获得一个新行。

展开查看全部

相关问题