风暴过后向Kafka提交补偿

9cbw7uwe  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(227)

当批次螺栓处理完一个批次时,只提交每个部分的最高偏移量的正确方法是什么?我主要担心的是机器在批量加工时会死机,因为整个shebang将在aws现场运行。
我是新的风暴发展,我似乎找不到一个答案,国际海事组织是相当直接的使用Kafka和风暴。
脚本:
根据保证消息处理指南,假设我有一个 ("word",count) tuples,批量处理x tuples,进行一些聚合并创建csv文件,将文件上传到hdfs/db和acks。
在非strom“naive”实现中,我将读取x个msg(或读取y秒),聚合,写入hdfs,一旦上传完成,将每个分区的最新(最高)偏移提交给kafka。如果机器或进程在db提交之前死亡-下一次迭代将从上一个地方开始。
在storm中,我可以创建一个batch bolt,它将锚定所有批处理元组并立即确认它们,但是我找不到将每个分区的最高偏移量提交给kafka的方法,因为喷口不知道该批处理,所以一旦batch bolt确认了元组,每个喷口示例都将逐个确认其元组,因此我认为我可以:
在喷口上的每个ack上提交acked消息的偏移量。这将导致许多提交(每个批可以是几个tuples),可能是无序的,并且如果在提交偏移量时spoutworks终止,我将部分地重放一些事件。
与1相同。但是我可以在提交的最高偏移量中添加一些本地偏移量管理(修复无序的偏移量提交),并提交每隔几秒钟出现的最高偏移量(减少提交的高数量),但是如果喷口死亡,我仍然可以得到部分提交的偏移量
将偏移提交逻辑移到bolt中-我可以将每个消息的分区和偏移添加到发送到批处理bolt的数据中,并将每个分区的最高处理偏移作为批处理的一部分提交(在批处理结束时发送到“offset submitter”bolt)。这将解决偏移跟踪、多次提交和空间重放的问题,但这将为螺栓添加Kafka特定的逻辑,从而将螺栓代码与Kafka连接起来,在我看来,一般来说似乎是重新发明了车轮。
更进一步与车轮改造和手动管理最高处理的分配偏移组合在zk和读取这个值时,我初始化喷口。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题