Spring Boot RetrayableTopic -让Kafka选择重试/dlt分区

ktecyv1j  于 2023-10-16  发布在  Spring
关注(0)|答案(1)|浏览(113)

默认情况下,Spring Kafka的可重试主题将来自主主题的消息发送到与原始ConsumerRecord相同分区上的重试/DLT。
然而,在我的例子中,我在主主题、重试主题和DLT之间配置了不同数量的分区。我知道当一个分区不存在时,目标解析器会让Kafka选择它。
相反,我希望Kafka总是选择分区。
我已经看到了这个答案(https://stackoverflow.com/a/70836041/7612074),这似乎正是我所需要的。我使用spring-kafka 2.7.12,配置如下

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .maxAttempts(3)
                .useSingleTopicForFixedDelays()
                .fixedBackOff(3000)
                .dltHandlerMethod(DltHandler.class, "handle")
                .retryOn(List.of(ProcessRetryableException.class, RuntimeException.class))
                .retryTopicSuffix(".RETRY")
                .dltSuffix(".DLT")
                .create(template);
    }
}

正如前面的回答中所建议的,我添加了下面的bean声明

@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
public DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory(DestinationTopicResolver resolver) {
    return new DeadLetterPublishingRecovererFactory(resolver) {
        @Override
        protected TopicPartition resolveTopicPartition(ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
            return new TopicPartition(nextDestination.getDestinationName(), -1); // Kafka Chooses
        }
    };
}

不幸的是,应用程序不会启动,因为它在spring上下文中找不到任何DestinationTopicResolverbean。
我无法手动创建它,因为自动配置将两个Map注入其中,包含有关主题的基本信息(用于确定重试流中的下一个目标主题),我将丢失所有这些信息。
它似乎不是一个spring bean,而是DestinationTopicResolver中引用的一个简单bean
我错过了什么?Thanks in advance

xzlaal3s

xzlaal3s1#

正如加里拉塞尔指出的那样,这是该版本的局限性
解决了升级到2.9.12并将此添加到我的配置类

@Configuration
@EnableKafka
public class KafkaConfig extends RetryTopicConfigurationSupport {

    @Bean
    public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .maxAttempts(3)
                .useSingleTopicForFixedDelays()
                .fixedBackOff(3000)
                .dltHandlerMethod("dltProcessor", "process")
                .retryOn(List.of(ProcessRetryableException.class, RuntimeException.class))
                .retryTopicSuffix(".RETRY")
                .dltSuffix(".DLT")
                .create(template);
    }

    @Override
    public RetryTopicComponentFactory createComponentFactory() {

        return new RetryTopicComponentFactory() {

            @Override
            public DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {

                return new DeadLetterPublishingRecovererFactory(destinationTopicResolver) {

                    @Override
                    public DeadLetterPublishingRecoverer create() {
                        super.neverLogListenerException();
                        return super.create();
                    }

                    @Override
                    protected TopicPartition resolveTopicPartition(ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
                        return new TopicPartition(nextDestination.getDestinationName(), -1); // Kafka Chooses
                    }
                };
            }
        };
    }
}

相关问题