当我从最新的offset启动spark structured streaming 3.0.1应用程序时,它运行得很好。但是当我想从一些早期的偏移开始时-例如:
开始偏移到“最早”
开始偏移到特定偏移,如{“mytopic-v1”:{“0”:1686734237}
我可以在日志中看到,起始偏移量被正确拾取,但随后会发生一系列寻道(从我定义的位置开始),直到它到达当前的最新偏移量。
我删除了我的检查点目录,并尝试了几个选项,但情况总是一样的-它报告正确的起始偏移量,但然后需要很长时间,只是慢慢寻找到最新的,并开始处理-知道为什么,我应该额外检查什么吗?
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786734237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786734737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786735237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786735737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786736237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786736737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786737237 for partition MyTopic-v1-0
我让应用程序运行了更长的时间,它最终开始生成文件,但我的处理触发器(100秒)没有满足要求,数据显示得更晚了——20-30分钟之后。
(我也在spark 2.4.5上测试了它-同样的问题-可能是Kafka的配置?)
1条答案
按热度按时间inb24sb21#
使用选项
startingOffsets
使用如您所示的json对象应该可以很好地工作。您所观察到的是,在应用程序的第一次启动时,结构化流作业将读取所有(!)从提供的偏移量(1686734237)到主题中最后一个可用偏移量。由于这可能是相当多的消息,因此处理这一大块消息将使第一个微批处理非常繁忙。
记住
Trigger
选项仅定义微批次的触发频率。您应该确保将此触发速率与预期的处理时间对齐。我在这里看到了两个基本的选择:使用选项
maxOffsetsPerTriger
限制每个触发器/微批从kafka获取的偏移量避免使用任何触发器,因为默认情况下,这将允许流在前一个触发器处理完数据后立即触发