pyspark 存在流式聚合时不支持追加输出模式

ivqmmu1c  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(114)

我的问题是Append output mode not supported when there are streaming aggregations。正如前面提到的here,我需要在下面的代码中的groupby语句中添加modifiedon,如下所示
agg = df.groupBy("id","modifiedon").agg(max("modifiedon").alias("modifiedon"))

from pyspark.sql.functions import max
df = df.dropDuplicates()
df = df.withWatermark("modifiedon", "1 day")
agg = df.groupBy("id").agg(max("modifiedon").alias("modifiedon"))
final =df.join(agg, on=["id", "modifiedon"], how="inner")
dfUpdates = final.withColumnRenamed("id","BK_id")

但这会产生问题,因为final中仍然包含重复的Id s。由于我没有在groupby中添加该列,所以稍后我在创建merge into delta表时遇到了问题。

final.writeStream.format("delta").foreachBatch(update_insert).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start("abfss://[email protected]/D365/msdyn_workorder_autoloader_nodups")
31moq8wy

31moq8wy1#

在这里,你需要给予timestamp类型的列在group by中使用window函数或timestamp列,但在你的情况下,你不能给予modifiedon列,即使它是timestamp类型,因为你的要求是对modifiedon列本身进行聚合。
因此,正如我前面提到的,使用具有更大天数的窗口,您可以确定这些天数在您的数据范围内。
在这里,我用了20000天。

agg = df.groupBy(window("modifiedon","20000 day"),"id").agg(max("modifiedon").alias("modifiedon"))
final =df.join(agg, on=["id", "modifiedon"], how="inner")

和输出。

final.writeStream.format("delta").option("checkpointLocation", "/csv_chk_pnt/").start("/out_csv/final/")

给予大量天数为很长的旧记录,甚至可能是五万天也。

相关问题