在storm bolt中获取单个元组的kafka偏移量

7gcisfzg  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(344)

用例
使用apachestorm将kafka消息持久化到s3
迄今为止的故事
我试过用secor(https://github.com/pinterest/secor),工作很好,服务宗旨。但是,按照经理的说法,这可能是维护过度(他们说他总是对的)
我们已经有了ApacheKafkaApacheStorm稳定集群,所以计划利用infra。
议程与问题
来自Kafka的消息将在Storm中批处理,并以文件的形式写入本地磁盘
在一定的间隔和/或大小标准之后,它将被上传到s3
为了管理故障,每个bolt应该能够跟踪kafka分区,并理想地按元组进行偏移,因为bolt将随机分布在集群中。
分区/偏移可以持久化到zookeeper,但首先,如何从bolt中的元组获取它们?除了把它们从Kafka的喷口转给博尔特,还有别的办法吗?

guykilcj

guykilcj1#

kafka已经在zookeeper中跟踪了主题的偏移量,所以您不需要在bolt中实现这个逻辑。
Kafka喷口将emmit元组和拓扑结构将跟踪它。当元组被螺栓钉住时,它就穿过了。喷口将认为元组已交付。在emmiting元组后面,喷口将跟踪zookeeper中的当前偏移量,所以如果出现问题,您可以开始读取消息,而不是来自Begging。
上述拓扑将保证至少交付一次。使用三叉戟拓扑,您可以保证只交付一次。在这两种情况下,请看 topology.max.spout.pending 设置。正确设置它是至关重要的,因为您将使用批处理。

ocebsuys

ocebsuys2#

配置 KafkaSpoutorg.apache.storm.kafka.StringMessageAndMetadataScheme 将偏移量和分区添加到喷口发射值

相关问题