我的问题是Append output mode not supported when there are streaming aggregations
。正如前面提到的here,我需要在下面的代码中的groupby
语句中添加modifiedon
,如下所示agg = df.groupBy("id","modifiedon").agg(max("modifiedon").alias("modifiedon"))
from pyspark.sql.functions import max
df = df.dropDuplicates()
df = df.withWatermark("modifiedon", "1 day")
agg = df.groupBy("id").agg(max("modifiedon").alias("modifiedon"))
final =df.join(agg, on=["id", "modifiedon"], how="inner")
dfUpdates = final.withColumnRenamed("id","BK_id")
但这会产生问题,因为final
中仍然包含重复的Id
s。由于我没有在groupby中添加该列,所以稍后我在创建merge into
delta表时遇到了问题。
final.writeStream.format("delta").foreachBatch(update_insert).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start("abfss://[email protected]/D365/msdyn_workorder_autoloader_nodups")
1条答案
按热度按时间31moq8wy1#
在这里,你需要给予timestamp类型的列在group by中使用
window
函数或timestamp列,但在你的情况下,你不能给予modifiedon
列,即使它是timestamp类型,因为你的要求是对modifiedon
列本身进行聚合。因此,正如我前面提到的,使用具有更大天数的窗口,您可以确定这些天数在您的数据范围内。
在这里,我用了20000天。
和输出。
给予大量天数为很长的旧记录,甚至可能是五万天也。