azure 自动加载器过滤器重复

nc1teljy  于 2023-08-07  发布在  其他
关注(0)|答案(1)|浏览(69)

我heaving流 Dataframe 和想知道我如何才能消除duplicataes加上只选择最新的modifiedon行。
比如说。
| 改良的| modifiedon |
| --| ------------ |
| 2019 - 08 - 21 10:00:00| 03/08/2023 |
| 2019 - 08 - 21 10:00:00| 03/08/2023 |
| 2019 - 08 - 21 00:00:00| 02/08/2023 |
| 2019 - 08 - 21 10:00:00| 03/08/2023 |
期望df
| 改良的| modifiedon |
| --| ------------ |
| 2019 - 08 - 21 10:00:00| 03/08/2023 |
| 2019 - 08 - 21 10:00:00| 03/08/2023 |

因此,如果存在idential(100%)相同的行,如id 1,则需要删除所有行,每个ID仅保留1行。而且,如果同一个ID被多次表示(多次更新),我需要根据最大修改的列进行选择。

df = (
    spark.readStream
    .option("delimiter", ",")
    .option("quote", '"')
    .option("mode", "permissive")
    .option("lineSep", "\r\n")
    .option("multiLine", "true")
    .format("cloudFiles")
    .option("cloudFiles.format", source_format)
    .option("cloudFiles.schemaLocation", checkpoint_directory)
    .option("header", "false")
    .option("escape", '"')
    .schema(schema)
    .load(data_source)
)

字符串

uyto3xhc

uyto3xhc1#

你可以遵循下面的方法。
下面是我使用的数据。


的数据

df = df.dropDuplicates().withColumn("modifiedon",to_timestamp("modifiedon","dd/MM/yyyy"))
df = df.withWatermark("modifiedon", "1 day")
df = df.groupBy("id").agg(max("modifiedon").alias("modifiedon"))
display(df)

字符串
这里,在1天的时间段内,对流数据应用聚合并在列modifiedon上执行最大值。
输出量:


相关问题