我有一个@streamlistener方法,它将在其中执行rest调用。当rest调用返回异常时,@streamlistener方法将抛出runtimeexception并执行重试@streamlistener方法在抛出runtimeexception时将无限次重试
spring云流重试配置:
spring.cloud.stream.kafka.bindings.inputChannel.consumer.enableDlq=true
spring.cloud.stream.bindings.inputChannel.consumer.maxAttempts=3
spring.cloud.stream.bindings.inputChannel.consumer.concurrency=3
spring.cloud.stream.bindings.inputChannel.consumer.backOffInitialInterval=300000
spring.cloud.stream.bindings.inputChannel.consumer.backOffMaxInterval=600000
springboot微服务依赖项版本:
Spring Boot 2.0.3
Spring Cloud Stream Elmhurst.RELEASE
Kafka broker 1.1.0
3条答案
按热度按时间7ajki6be1#
使用retrytemplate或增加maxattempts属性具有重试应在
max.poll.interval.ms
,否则kafka代理会认为使用者已关闭,并将分区重新分配给另一个使用者(如果可用)。另一种方法是让监听器使用
consumer.seek
方法。wqlqzqxt2#
经过几次尝试和错误,我们发现kafka配置:max.poll.interval.ms默认为5分钟。由于我们的消费者重试机制,在最坏的情况下,我们的整个重试过程将需要15分钟。
因此,在第一条消息被消费5分钟后,kafka分区决定消费者没有提供任何响应,执行自动平衡并将相同的消息分配给另一个分区。
f4t66c6m3#
你当然可以增加尝试的次数(
maxAttempts
(财产)类似的东西Integer.MAX_VALUE
,也可以提供自己的示例RetryTemplate
可以按您的意愿配置的bean。在这里你可以得到更多的信息https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_retry_template