Kafka Streams:如何确保在处理完成后提交偏移量

e5nszbig  于 2024-01-06  发布在  Apache
关注(0)|答案(2)|浏览(222)

我想使用Kafka流处理Kafka主题中的消息。
处理的最后一步是将结果放入数据库表中。为了避免与数据库争用相关的问题(程序将24*7运行并处理数百万条消息),我将使用JDBC调用。
但在这种情况下,有可能会丢失消息(在一个场景中,我从一个主题中读取了500条消息,流将标记偏移量,现在程序失败。JDBC批量更新中存在的消息丢失,但偏移量被标记为这些消息)。
我想在数据库插入/更新完成后手动标记最后一条消息的偏移量,但根据以下问题:How to commit manually with Kafka Stream?,这是不可能的。
有没有人能提出任何可能的解决办法

ruarlubt

ruarlubt1#

正如@sun007的回答中所暗示的,我宁愿稍微改变你的方法:

  • 使用Kafka Streams处理输入数据。让Kafka Streams应用程序将其输出写入Kafka,而不是关系数据库。
  • 使用Kafka Connect(例如,即用型JDBC连接器)将数据从Kafka摄取到关系数据库。根据需要配置和调优连接器,例如,用于批量插入数据库。

这种处理的解耦(Kafka Streams)和摄取(Kafka Connect)通常是一个更好的设计。例如,您不再将处理步骤与数据库的可用性相结合:如果数据库关闭,为什么您的KStreams应用程序应该停止?这是一个与处理逻辑无关的操作问题,在处理逻辑中,您肯定不希望处理超时、重试,(即使您使用Kafka Streams以外的工具进行处理,这种解耦仍然是一种更好的设置。)

p5cysglq

p5cysglq2#

Kafka Stream不支持手动提交,同时也不支持批处理。根据您的用例,有几种可能性:
1.使用普通消费者,实现批量处理和控制手工抵销。
1.按照以下Kafka Spark Structured Stream使用Spark Kafka结构化流
1.尝试Spring Kafka [ Spring Kafka ] 2
1.在这种情况下,也可以考虑使用JDBC Kafka Connector。Kafka JDBC Connector

相关问题