我现在给Kafka发消息,像这样:
try {
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
System.out.println(metadata);
System.out.println(e);
if (e != null)
{
System.out.println(e);
try
{
throw(e);
} catch (InvalidTopicException er) {
logger.error("Unrecoverable error: ", er);
System.exit(-1);
} //...otherfatalexceptions...
catch (UnknownServerException er) {
logger.error("Unrecoverable error: ", er);
System.exit(-1);
}catch (Exception er) {
logger.error("Failed to send message to topic:" + topic, er);
}
}
}
});
}catch (Exception e) {
logger.error("Failed to send message to topic:" + topic, e);
}
我正在使用producer.send,并且在发送队列中的下一个记录之前不等待记录元数据返回,因为它会花费很长时间。我使用onCallback
方法来处理这些数据。
代码背后的想法是终止程序,以防我得到一个不可恢复的错误。
Producer是这样初始化的:
producer = new KafkaProducer<String, String>(properties);
具有以下特性
Properties mainKafkaProperties = new Properties();
mainKafkaProperties.put("bootstrap.servers", kafkaUrl);
mainKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
mainKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
mainKafkaProperties.put("acks", "all");
当消息被正确发送时,System.out.println(元数据);行正确打印元数据,显示消息正在正确发送,但是当我删除主题以模拟InvalidTopicException
而不是执行catch方法中的代码时,我只得到以下警告:[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 97 : {em_om_messages_local=UNKNOWN_TOPIC_OR_PARTITION}
使用的Kafka库是:org.apache.Kafka.clients我正在eclipse调试器上运行程序
无论主题有多少个分区,我都会得到相同的错误Java版本17.0.6+10 Spring未使用
1条答案
按热度按时间x6yk4ghg1#
检查javadoc
例如,主题名称太长,包含无效字符等。
如果您发送的是一个 valid 的主题名,则不会抛出该异常,而是会得到一个警告日志,这不是一个可以捕获的异常。
也没有理由
throw(e)
。使用instanceof
检查回调中Exception的类型