如何识别Kafka主题消费后未处理的消息

laawzig2  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(511)

脚本:
stream create[streamname]——定义“kafka-zkconnect=10.10.10.1:2181——主题=| mycompositemodule”——部署
我们在分布式模式下运行这个流,redis是传输总线。据我所知,kafka source通过[streamname]-kafka offsets主题维护mycompositemodule(通过'module compose'进程创建的模块作为接收器)所使用的消息的偏移量。这是不可读的,我会很感激如果有一个方法来阅读这个主题的数据。
另外,当我从kafka源推送消息时,消息在redis传输中排队,然后模块从这个队列中获取它们。
如果kafka consumer模块开始从kafka redis队列消费1000条消息-复合模块在收到10条消息或随机处理10条消息后失败。那么如何识别剩余的990[1000(已消费)-10(已处理)=990]条未处理的消息。即使我们检查Kafka偏移量,它也会显示消耗的消息计数。示例:-kafka.offsets-在我们的过程中无法读取。
因此,当我们在springxd中使用redis时,所有未处理的消息都将在redis队列中。那么,有谁能帮我识别未处理的消息并将其重新发送到复合模块进行处理呢。
基本上,我正在寻找关于健壮交付的优雅解决方案的建议,在使用kafka源代码时在springxd流中添加故障处理能力。

5m1hhzi4

5m1hhzi41#

如果消息有效地从kafka消费并移动到总线,那么从偏移管理器的Angular 来看,它们将被确认为已消费。
您可以尝试为redis消息总线启用重试和死信,如下所述:http://docs.spring.io/spring-xd/docs/current/reference/html/#error-处理邮件传递失败。
干杯,马吕斯

相关问题