如何在pyspark中根据多个条件更新框架行值?

qnzebej0  于 11个月前  发布在  Spark
关注(0)|答案(1)|浏览(150)

在pyspark中,我有当前的 Dataframe df_A:

+-------------+-------------+--------+-----------+--------+
|           id|         Type|id_count|     Value1|  Value2|
+-------------+-------------+--------+- ---------+--------+
|           18|          AAA|       2|      null|     null|
|           18|          CCC|       2|      null|     null|
|           16|          AAA|       2|      null|     null|
|           16|          BBB|       2|      null|     null|
|           17|          CCC|       1|      null|     null|
+-------------+-------------+--------+----------+---------+

字符串
我想循环遍历每一行,并根据条件更新Value 1和Value 2。

  • 如果id_count == 1且Type == CCC,我想在当前行中为Value 1输入一个值。
  • 如果id_count == 2且Type == CCC,我想在当前行中为Value 1输入一个值。
  • 如果id_count == 2且Type == AAA,我想在当前行中为Value 2输入一个值。

我已经在df.foreach和rdd.map上尝试过了,但是我无法更新行中的值。我不知道如何从foreach返回一个“行”,以便在我现有的数据框中更新。
我所尝试的是沿着这条线的某个地方,使用一个test-string作为value:

def update_row(row):
     row.Value1 = "test_value"
     return row

updated_df = df_A.foreach(lambda row: update_row(row)

  • 如何更新该值?
  • 它是否应返回行?
  • 如果返回一行,则将其追加到updated_df.

有人能帮我解释一下怎么做吗?

odopli94

odopli941#

你需要的确实是一种不同的方法。首先,要接近列,你需要使用DataFrame的.withColumn method。但是你不仅要更新,而且要有条件地更新。在这种情况下你需要的函数是“when”,可以像这样导入

from pyspark.sql.functions import when

字符串
然后,您将以以下方式处理该问题:

updated_df = df.withColumn("Value1", when(df.id_count == "1", "certain_value").otherwise(df.Value1))


这行代码将在id_count=1时将列“Value1”更新为“certain_value”,否则将保持原样。
你只需要合并适当地结合你的条件和价值观来达到你的目标。

相关问题