Spark streaming -具有重复行为的withWatermark()

3ks5zfa0  于 2023-11-21  发布在  Apache
关注(0)|答案(1)|浏览(134)

我阅读数据从Kafka(startingOffsets作为最早)和写入控制台上的数据进行几次测试。已给出水印持续时间为10秒。以下Spark文档-https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication

result_df = parsed_df\
    .withWatermark("event_time", "10 seconds")\
    .dropDuplicates(["uuid"])

result_df\
    .writeStream\
    .option("checkpointLocation", "gs://test/checkpoint_ts/")\
    .format("console")\
    .start()

字符串

**将以下记录逐个传递到Kafka,并从spark结构化流中阅读每条记录:**忽略以下#

{“uuid”:10150,“event_time”:“2023-08- 07 T08:00:00.004071876Z”} #通过,显示在控制台上
{“uuid”:10151,“event_time”:“2023-08- 07 T09:00:00.004071876Z”} #通过,显示在控制台上
{“uuid”:10152,“event_time”:“2023-08- 07 T10:00:00.004071876Z”} #通过,显示在控制台上
{“uuid”:10150,“event_time”:“2023-08- 07 T11:00:00.004071876Z”} #已丢弃,未显示在控制台上(这是预期的吗?鉴于水印阈值现在为2023-08- 07 T10:00:00 - 10秒,因此该记录不应通过?)
{“uuid”:10153,“event_time”:“2023-08- 07 T06:00:00.004071876Z”} #已丢弃,预期给定的event_time早于水印阈值

问题:当阅读第4条uuid:10150的记录时,水印阈值为2023-08- 07 T10:00:00 - 10 sec。由于第一条具有相同uuid:10150的记录(具有不同的event_time福尔斯)比当前水印阈值更旧,因此第4条记录是否应通过给定的spark流式传输状态存储,而不是将第1条记录保存在存储器中,以便spark基于uuid将其检测为重复?

y1aodyip

y1aodyip1#

dropDuplicates运算符有点微妙。下面是它工作的两种“模式”:
1.如果你没有传入一个事件时间列,它将无限期地保持状态。在这种情况下,它会进行适当的“全局”重复数据删除,因为它会记住它看到的所有内容。
1.如果你传入一个事件时间列,它将清除小于水印的状态。它还使用你提供的所有列执行重复数据删除。
你做了什么
在你的例子中,你只传入了一个“value”列,即非事件时间列。这是“mode”1:

result_df = parsed_df\
    .withWatermark("event_time", "10 seconds")\
    .dropDuplicates(["uuid"]) # <--- event_time not being passed in!

字符串
因此,dropDuplicates将无限期地保留状态。因此,示例中的记录0、1、2和4将被允许通过,但记录3将被删除重复数据,因为记录0中存在其UUID 10150

你想做的事

真的,你想做第二个“模式”:

result_df = parsed_df\
    .withWatermark("event_time", "10 seconds")\
    .dropDuplicates(["uuid", "event_time"])


让我们考虑一下你的例子,假设你这样做了。我将假设这些记录中的每一个都在它自己的批次中被处理,这样水印就更新了每一个记录(结构化流只更新每一个批次的水印)。
1.第一条记录被处理。(10150,8小时)被添加到状态。水印更新到07:59:50。我们发出记录,因为它是唯一的。
1.第二条记录被处理。(10151,9 hours)被添加到状态,水印更新为08:59:50。我们发出记录。由于水印现在大于事件时间(10150,8 hours),其他记录处于状态,我们从状态中删除(10150,8 hours)。现在,状态中唯一的内容是(10151,9 hours)。
1.同样的事情。(10152,10 hours)被添加到状态,水印更新为09:59:50,我们发出记录。由于水印大于(10151,9 hours)的事件时间,状态中的另一条记录,我们从状态中删除(10151,9 hours)。现在,状态中唯一的东西是(10152,10 hours)。
1.现在,为了一些微妙之处:下一条记录的uuid 10150不是全局唯一的,但是由于我们由于水印而删除了state,操作员不知道它以前是否见过10150。因此,我们实际上发出了这条记录。(10150,11 hours)被添加到state,水印更新为10:59:50,并且我们从state中删除(10152,10 hours)。
1.此时,水印为10:59:50。由于最后一条记录的时间戳小于水印,因此我们将其删除,即使它是全局唯一的。在StreamingQueryProgress中,您将看到numRowsDroppedByWatermark为1。

为什么会掉记录

如果您遵循了这个示例,您会注意到我们删除记录有两个原因。
1.第一个原因是我们发现了一个重复的记录。如果我们收到的记录已经存在于状态中,就会发生这种情况。
1.第二个原因是由于一个 late 记录。如果我们收到一个事件时间小于水印的记录,我们会删除它(即使它是全局唯一的)。
删除记录的第二个原因实际上与去重无关。它只与水印的语义有关。水印为我们提供了一个事件时间t,在此之前我们可以预期不会收到更多记录。如果引擎确实收到了一个事件时间小于t的记录,它将忽略它。
在Structured Streaming中,任何内置的有状态操作符都是如此,无论是重复数据删除、聚合还是流-流连接。

结语

  • 如果您的流上有一个withWatermark调用,那么您应该将正在使用的事件时间列的名称传递给dropDuplicates操作符。
  • 事件时间小于水印的记录将被删除。
  • 如果记录不是全局唯一的,但其副本已从状态存储中删除,则会发出这些记录。这可能会导致下游副本,特别是当水印延迟太小时。
  • 如果你真的不能容忍重复,设置一个大的水印延迟或做真正的重复数据删除没有水印。

相关问题