我正在使用spring-cloud-stream-binder-kafka
,并且已经使用DefaultErrorHandler
实现了有状态重试,我发现通过启用容器属性的deliveryAttemptHeader
,我可以从消息头访问重试计数或deliveryAttempt
计数,但我无法启用它。
我尝试将该值设置为true
,如下所示
@Bean
public ContainerCustomizer<String, Message, ConcurrentMessageListenerContainer<String, Message>> containerCustomizer(
ConcurrentKafkaListenerContainerFactory<String, Message> factory) {
ContainerCustomizer<String, Message, ConcurrentMessageListenerContainer<String, Message>> custCustomizer = container -> {
container.getContainerProperties().setDeliveryAttemptHeader(true);
};
factory.setContainerCustomizer(custCustomizer);
return custCustomizer;
}
在这种配置下,当我启动应用程序并调试KafkaMessageListenerContainer.java#L1078时,我仍然看到deliveryAttemptHeader
被禁用,而且我创建的ContainerCustomizer
示例也没有被调用。
1条答案
按热度按时间hmmo2u0o1#
spring-cloud-stream
不使用 Boot 的容器工厂;请使用它的ListenerContainerCustomizer
。https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_advanced_consumer_configuration