springbootmicroservice@streamlistener在抛出runtimeexception时重试无限时间

zzzyeukh  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(731)

我有一个@streamlistener方法,它将在其中执行rest调用。当rest调用返回异常时,@streamlistener方法将抛出runtimeexception并执行重试@streamlistener方法在抛出runtimeexception时将无限次重试
spring云流重试配置:

spring.cloud.stream.kafka.bindings.inputChannel.consumer.enableDlq=true
spring.cloud.stream.bindings.inputChannel.consumer.maxAttempts=3
spring.cloud.stream.bindings.inputChannel.consumer.concurrency=3
spring.cloud.stream.bindings.inputChannel.consumer.backOffInitialInterval=300000
spring.cloud.stream.bindings.inputChannel.consumer.backOffMaxInterval=600000

springboot微服务依赖项版本:

Spring Boot 2.0.3
Spring Cloud Stream Elmhurst.RELEASE
Kafka broker 1.1.0
7ajki6be

7ajki6be1#

使用retrytemplate或增加maxattempts属性具有重试应在 max.poll.interval.ms ,否则kafka代理会认为使用者已关闭,并将分区重新分配给另一个使用者(如果可用)。
另一种方法是让监听器使用 consumer.seek 方法。

@StreamListener("events")
public void handleEvent(@Payload String eventString, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer,
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             @Header(KafkaHeaders.OFFSET) String offset) {
   try {
       //do the logic (example: REST call)
   } catch (Exception e) { // Catch only specific exceptions that can be retried
        consumer.seek(new TopicPartition(topic, Integer.parseInt(partitionId)), Long.parseLong(offset));
   }
}
wqlqzqxt

wqlqzqxt2#

经过几次尝试和错误,我们发现kafka配置:max.poll.interval.ms默认为5分钟。由于我们的消费者重试机制,在最坏的情况下,我们的整个重试过程将需要15分钟。
因此,在第一条消息被消费5分钟后,kafka分区决定消费者没有提供任何响应,执行自动平衡并将相同的消息分配给另一个分区。

f4t66c6m

f4t66c6m3#

你当然可以增加尝试的次数( maxAttempts (财产)类似的东西 Integer.MAX_VALUE ,也可以提供自己的示例 RetryTemplate 可以按您的意愿配置的bean。在这里你可以得到更多的信息https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_retry_template

相关问题