我有一个使用dlq方法的项目,其中 @KafkaListener
,错误将被发送到具有该结构的主题 error-<topic>-<consumergroup>
. 当我们想要手动重试来自这个dlq的kafka消息时,我们将生成一个具有类似结构的主题 retry-<topic>-<consumergroup>
. 例如,如果我们有一个主题 foo
,由消费群体消费 bar
,我们将讨论以下主题: foo
, error-foo-bar
以及 retry-foo-bar
.
这样,我们可以为特定的使用者组重试消息。消息处理程序同时读取主主题和重试主题。
由于项目中有多个监听器都使用同一个容器,因此我们将主题(main、error和retry)放在应用程序属性中,并配置了 DeadLetterPublishingRecoverer
要查找相应的dlq(注意,这是kotlin):
val recoverer = DeadLetterPublishingRecoverer(template) { record, _ ->
val dlq = kafkaProperties.topics.values.firstOrNull { it.main == record.topic() || it.retry == record.topic() }!!.dlq
return@DeadLetterPublishingRecoverer TopicPartition(dlq, -1)
}
一切正常。不过,现在我想补充一点 @KafkaListener
和另一个听众在同一个主题上交谈。因此,我需要给这个听众一个单独的消费群体。
但是,如果我使用与上面相同的逻辑来查找相应的dlq主题,那么两个侦听器中的一个将使用另一个的dlq,因为它不在这里查看消费者组。
所以我的问题是:在 Spring Kafka,有没有办法找到哪个消费群体(又名 groupId
)发生错误了吗?我查看了文档和源代码,但自己找不到。感谢您的帮助。
1条答案
按热度按时间vlju58qv1#
这个
group.id
消费者可拨打KafkaUtils.getGroupId()
(存储在ThreadLocal
容器线程启动时)。