Java: How to handle Kafka producer exceptions

hjzp0vay  posted on 2023-01-01 in Java

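I am trying to understand how Spring Boot's KafkaTemplate works with an asynchronous producer and how to handle its exceptions. I want to handle every kind of error, including network errors. I tried a retry configuration, but the producer retries more times than the limit I set.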

@Service
public class UserInfoService {
     private static final Logger LOGGER = LoggerFactory.getLogger(UserInfoService.class);

     @Autowired
     private KafkaTemplate kafkaTemplate;

     public void sendUserInfo(UserInfo data) {
         final ProducerRecord<String, UserInfo> record = new ProducerRecord<>("usr-test-data", "test-app", data);
         
         try {
             ListenableFuture<SendResult<String, UserInfo>> future = kafkaTemplate.send(record);
             future.addCallback(new ListenableFutureCallback<SendResult<String, UserInfo>>() {
                 @Override
                 public void onFailure(Throwable ex) {
                     handleFailure(ex);
                 }

                 @Override
                 public void onSuccess(SendResult<String, UserInfo> result) {
                     handleSuccess(result);
                 }
             });
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
         
     }

     private void handleSuccess(SendResult<String, UserInfo> result) {
         LOGGER.info("Message sent successfully with offset: {}", result.getRecordMetadata().offset());
     }
     private void handleFailure(Throwable ex) {
         // Log at ERROR level and pass the throwable so the stack trace is kept
         LOGGER.error("Unable to send message", ex);
     }
 }

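I tried limiting the number of retries with configProps.put(ProducerConfig.RETRIES_CONFIG, "3");, hoping that an exception I could catch would eventually be thrown. But it still retried more than 3 times, so the setting did not seem to take effect. Here is my full configuration class: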

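import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {
     @Value(value = "${spring.kafka.bootstrap-servers}")
     private String bootstrapAddress;

     // Values are UserInfo objects serialized as JSON, so the value type is UserInfo
     @Bean
     public ProducerFactory<String, UserInfo> producerFactory() {
         Map<String, Object> configProps = new HashMap<>();
         configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
         // ENABLE_IDEMPOTENCE_CONFIG is the property key; ENABLE_IDEMPOTENCE_DOC is only its description text
         configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
         configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CountingProducerInterceptor.class.getName());
         configProps.put(ProducerConfig.ACKS_CONFIG, "all");
         configProps.put(ProducerConfig.RETRIES_CONFIG, "3");
         configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);
         configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
         return new DefaultKafkaProducerFactory<>(configProps);
     }

     @Bean
     public KafkaTemplate<String, UserInfo> kafkaTemplate() {
         return new KafkaTemplate<>(producerFactory());
     }
 }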

I would like to know which errors I can catch in the future's onFailure callback and which in the try-catch of the enclosing block.

p8h8hvxi  #1

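The Future does not complete within the lifetime of the try-catch. You need to handle the exception inside the body of onFailure; the surrounding try-catch only sees exceptions thrown synchronously by send() itself.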
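To illustrate the point, here is a minimal sketch that uses only the JDK's CompletableFuture as a stand-in for the future returned by KafkaTemplate.send(). The class AsyncFailureDemo and the method fakeSend are hypothetical names made up for this example; they are not part of Spring Kafka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncFailureDemo {

    // Hypothetical stand-in for an asynchronous send: it returns immediately
    // and fails 100 ms later on another thread.
    static CompletableFuture<String> fakeSend() {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)
                .execute(() -> future.completeExceptionally(
                        new IllegalStateException("broker unreachable")));
        return future;
    }

    // Returns { what the try-catch saw, what the callback saw }.
    static String[] run() {
        AtomicReference<String> seenByCallback = new AtomicReference<>("none");
        String caughtByTryCatch = "none";
        CompletableFuture<Void> done;
        try {
            // The callback plays the role of onFailure/onSuccess.
            done = fakeSend().<Void>handle((result, ex) -> {
                if (ex != null) {
                    seenByCallback.set(ex.getMessage());
                }
                return null;
            });
        } catch (Exception e) {
            // Only reached if fakeSend() itself throws before returning.
            caughtByTryCatch = e.getMessage();
            done = CompletableFuture.completedFuture(null);
        }
        // Wait for the callback to run so the result can be inspected.
        done.join();
        return new String[] { caughtByTryCatch, seenByCallback.get() };
    }

    public static void main(String[] args) {
        String[] outcome = run();
        System.out.println("try-catch saw: " + outcome[0]);
        System.out.println("callback saw:  " + outcome[1]);
    }
}
```

The try block has already exited by the time the future fails, so the failure reaches only the callback. With a real KafkaTemplate the same split should apply: failures reported after send() has returned go to onFailure, while the try-catch can only see what send() throws before it returns.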
In my experience, Kafka network errors are not easy to detect.
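One likely reason, as far as I understand the producer, is that reconnect attempts to an unreachable broker are not counted against retries, so the client can appear to retry forever. What bounds the wait are the timeouts: max.block.ms limits how long send() itself may block, and delivery.timeout.ms limits how long after send() returns a success or failure is reported. The sketch below uses the standard producer property names as plain strings; the values and the helper class BoundedProducerProps are illustrative assumptions, not recommendations.

```java
import java.util.HashMap;
import java.util.Map;

public class BoundedProducerProps {

    // Builds producer properties whose timeouts bound how long a send can hang.
    // All numeric values here are assumptions chosen for illustration.
    static Map<String, Object> build(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("retry.backoff.ms", 1_000);
        props.put("linger.ms", 0);
        // Upper bound on how long send() may block, e.g. waiting for metadata.
        props.put("max.block.ms", 5_000);
        // Upper bound on waiting for a broker response to a single request.
        props.put("request.timeout.ms", 5_000);
        // Upper bound on reporting success or failure after send() returns.
        props.put("delivery.timeout.ms", 20_000);
        return props;
    }

    // Kafka requires delivery.timeout.ms >= linger.ms + request.timeout.ms.
    static boolean isConsistent(Map<String, Object> props) {
        int linger = (Integer) props.get("linger.ms");
        int request = (Integer) props.get("request.timeout.ms");
        int delivery = (Integer) props.get("delivery.timeout.ms");
        return delivery >= linger + request;
    }

    // Rough worst case until the callback hears about a failure:
    // time blocked inside send() plus the delivery timeout afterwards.
    static long worstCaseMillis(Map<String, Object> props) {
        int maxBlock = (Integer) props.get("max.block.ms");
        int delivery = (Integer) props.get("delivery.timeout.ms");
        return (long) maxBlock + delivery;
    }

    public static void main(String[] args) {
        Map<String, Object> props = build("localhost:9092");
        System.out.println("consistent: " + isConsistent(props));
        System.out.println("worst case ms: " + worstCaseMillis(props));
    }
}
```

A map like this could be passed to DefaultKafkaProducerFactory together with the serializer settings. With these values, a send to an unreachable broker should fail in the callback after roughly 25 seconds rather than appearing to retry indefinitely.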
