在pyspark中,我有当前的 Dataframe df_A:
+-------------+-------------+--------+-----------+--------+
| id| Type|id_count| Value1| Value2|
+-------------+-------------+--------+- ---------+--------+
| 18| AAA| 2| null| null|
| 18| CCC| 2| null| null|
| 16| AAA| 2| null| null|
| 16| BBB| 2| null| null|
| 17| CCC| 1| null| null|
+-------------+-------------+--------+----------+---------+
字符串
我想循环遍历每一行,并根据条件更新Value 1和Value 2。
- 如果id_count == 1且Type == CCC,我想在当前行中为Value 1输入一个值。
- 如果id_count == 2且Type == CCC,我想在当前行中为Value 1输入一个值。
- 如果id_count == 2且Type == AAA,我想在当前行中为Value 2输入一个值。
我已经在df.foreach和rdd.map上尝试过了,但是我无法更新行中的值。我不知道如何从foreach返回一个“行”,以便在我现有的数据框中更新。
我所尝试的是沿着这条线的某个地方,使用一个test-string作为value:
def update_row(row):
row.Value1 = "test_value"
return row
updated_df = df_A.foreach(lambda row: update_row(row)
型
- 如何更新该值?
- 它是否应返回行?
- 如果返回一行,则将其追加到updated_df.
有人能帮我解释一下怎么做吗?
1条答案
按热度按时间odopli941#
你需要的确实是一种不同的方法。首先,要接近列,你需要使用DataFrame的.withColumn method。但是你不仅要更新,而且要有条件地更新。在这种情况下你需要的函数是“when”,可以像这样导入
字符串
然后,您将以以下方式处理该问题:
型
这行代码将在id_count=1时将列“Value1”更新为“certain_value”,否则将保持原样。
你只需要合并适当地结合你的条件和价值观来达到你的目标。