默认情况下,Spring Kafka的可重试主题将来自主主题的消息发送到与原始ConsumerRecord相同分区上的重试/DLT。
然而,在我的例子中,我在主主题、重试主题和DLT之间配置了不同数量的分区。我知道当一个分区不存在时,目标解析器会让Kafka选择它。
相反,我希望Kafka总是选择分区。
我已经看到了这个答案(https://stackoverflow.com/a/70836041/7612074),这似乎正是我所需要的。我使用spring-kafka 2.7.12,配置如下
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.maxAttempts(3)
.useSingleTopicForFixedDelays()
.fixedBackOff(3000)
.dltHandlerMethod(DltHandler.class, "handle")
.retryOn(List.of(ProcessRetryableException.class, RuntimeException.class))
.retryTopicSuffix(".RETRY")
.dltSuffix(".DLT")
.create(template);
}
}
正如前面的回答中所建议的,我添加了下面的bean声明
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
public DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory(DestinationTopicResolver resolver) {
return new DeadLetterPublishingRecovererFactory(resolver) {
@Override
protected TopicPartition resolveTopicPartition(ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
return new TopicPartition(nextDestination.getDestinationName(), -1); // Kafka Chooses
}
};
}
不幸的是,应用程序不会启动,因为它在spring上下文中找不到任何DestinationTopicResolverbean。
我无法手动创建它,因为自动配置将两个Map注入其中,包含有关主题的基本信息(用于确定重试流中的下一个目标主题),我将丢失所有这些信息。
它似乎不是一个spring bean,而是DestinationTopicResolver中引用的一个简单bean
我错过了什么?Thanks in advance
1条答案
按热度按时间xzlaal3s1#
正如加里拉塞尔指出的那样,这是该版本的局限性
解决了升级到2.9.12并将此添加到我的配置类