我正在尝试了解如何Spring Boot KafkaTemplate与异步生产者和处理异常。我想处理所有类型的错误,包括网络错误。尝试与重试配置,但其重试次数超过我提供给
@Service
public class UserInfoService {
private static final Logger LOGGER = LoggerFactory.getLogger(UserInfoService.class);
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendUserInfo(UserInfo data) {
final ProducerRecord<String, UserInfo> record = new ProducerRecord<>("usr-test-data", "test-app", data);
try {
ListenableFuture<SendResult<String, UserInfo>> future = kafkaTemplate.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<String, UserInfo>>() {
@Override
public void onFailure(Throwable ex) {
handleFailure(ex);
}
@Override
public void onSuccess(SendResult<String, UserInfo> result) {
handleSuccess(result);
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void handleSuccess(SendResult<String, UserInfo> result) {
LOGGER.info("Message sent successfully with offset: {}", result.getRecordMetadata().offset());
}
private void handleFailure(Throwable ex) {
LOGGER.info("Unable to send message- Error: {}", ex.getMessage());
}
}
尝试用configProps.put(ProducerConfig.RETRIES_CONFIG, "3");
限制重试次数,希望这最终会抛出异常,我可以捕获。但它仍然尝试了3次以上,似乎没有工作。以下是我的完整配置类:
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_DOC, "true");
configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CountingProducerInterceptor.class.getName());
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, "3");
configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
想知道我将来能捕捉到什么。在父代码块中的OnFailure和try catch。
1条答案
按热度按时间p8h8hvxi1#
Future不会在try-catch的生命周期内完成。您需要在onFailure的主体内抛出异常。
根据我的经验,Kafka网络错误不容易被发现