使用spring云流Kafka时如何启用deliveryAttemptHeader获取DefaultErrorHandler重试计数

rsaldnfx  于 2023-02-22  发布在  Spring
关注(0)|答案(1)|浏览(112)

我正在使用spring-cloud-stream-binder-kafka,并且已经使用DefaultErrorHandler实现了有状态重试,我发现通过启用容器属性的deliveryAttemptHeader,我可以从消息头访问重试计数或deliveryAttempt计数,但我无法启用它。
我尝试将该值设置为true,如下所示

@Bean
  public ContainerCustomizer<String, Message, ConcurrentMessageListenerContainer<String, Message>> containerCustomizer(
      ConcurrentKafkaListenerContainerFactory<String, Message> factory) {
    ContainerCustomizer<String, Message, ConcurrentMessageListenerContainer<String, Message>> custCustomizer = container -> {
      container.getContainerProperties().setDeliveryAttemptHeader(true);
    };
    factory.setContainerCustomizer(custCustomizer);
    return custCustomizer;
  }

在这种配置下,当我启动应用程序并调试KafkaMessageListenerContainer.java#L1078时,我仍然看到deliveryAttemptHeader被禁用,而且我创建的ContainerCustomizer示例也没有被调用。

hmmo2u0o

hmmo2u0o1#

spring-cloud-stream不使用 Boot 的容器工厂;请使用它的ListenerContainerCustomizer
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_advanced_consumer_configuration

相关问题