Kafka 为什么在发送消息失败时,producer.send方法中的callback没有被执行?

koaltpgm  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(194)

我现在给Kafka发消息,像这样:

try {
    producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            System.out.println(metadata);
            System.out.println(e);
            if (e != null)
            {
                System.out.println(e);
                try
                {
                    throw(e);
                } catch (InvalidTopicException er) {
                    logger.error("Unrecoverable error: ", er);
                    System.exit(-1);
                } //...otherfatalexceptions...
                catch (UnknownServerException er) {
                    logger.error("Unrecoverable error: ", er);
                    System.exit(-1);
                }catch (Exception er) {
                    logger.error("Failed to send message to topic:" + topic, er);
                }
            }
        }
        });
}catch (Exception e) {
    logger.error("Failed to send message to topic:" + topic, e);
}

我正在使用producer.send,并且在发送队列中的下一个记录之前不等待记录元数据返回,因为它会花费很长时间。我使用onCallback方法来处理这些数据。
代码背后的想法是终止程序,以防我得到一个不可恢复的错误。
Producer是这样初始化的:

producer = new KafkaProducer<String, String>(properties);

具有以下特性

Properties mainKafkaProperties = new Properties();
mainKafkaProperties.put("bootstrap.servers", kafkaUrl);
mainKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
mainKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
mainKafkaProperties.put("acks", "all");

当消息被正确发送时,System.out.println(元数据);行正确打印元数据,显示消息正在正确发送,但是当我删除主题以模拟InvalidTopicException而不是执行catch方法中的代码时,我只得到以下警告:[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 97 : {em_om_messages_local=UNKNOWN_TOPIC_OR_PARTITION}
使用的Kafka库是:org.apache.Kafka.clients我正在eclipse调试器上运行程序
无论主题有多少个分区,我都会得到相同的错误Java版本17.0.6+10 Spring未使用

x6yk4ghg

x6yk4ghg1#

检查javadoc

  • InvalidTopicException*

例如,主题名称太长,包含无效字符等。
如果您发送的是一个 valid 的主题名,则不会抛出该异常,而是会得到一个警告日志,这不是一个可以捕获的异常。
也没有理由throw(e)。使用instanceof检查回调中Exception的类型

相关问题