使用下面这样的生产者配置,我将创建一个在整个应用程序中使用的单例生产者:
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.consul1:9092,kafka.consul2:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
我已连接到k8s托管的kafka群集。经纪人的 advertised.listeners
配置为返回ip地址而不是主机名。虽然正常情况下一切正常工作,但问题发生在Kafka重启时,有时ip地址会发生变化。由于生产者只知道旧的ip,它一直试图连接到该主机发送消息,没有消息通过。
我注意到 org.apache.kafka.common.errors.TimeoutException
发送失败时引发异常。当前消息是异步发送的:
producer.send(data,
(RecordMetadata recordMetadata, Exception e) -> {
if (e != null) {
LOGGER.error("Exception while sending message to kafka", e);
}
});
现在应该如何处理timeoutexception?由于生产者是在整个应用程序中共享的,因此在回调中关闭和重新创建听起来并不正确。
1条答案
按热度按时间f45qwnt81#
根据回调接口上的javadocs
TimeoutException
是一个可重试的异常,可以通过增加retries
制作人的名字。在Kafka文档中,您可以找到
retries
配置:重试次数(默认值为0):设置一个大于零的值将导致客户端重新发送其发送失败且可能出现暂时性错误的任何记录。请注意,此重试与客户端在收到错误时重新发送记录没有什么不同。允许重试而不将max.in.flight.requests.per.connection设置为1可能会更改记录的顺序,因为如果将两个批发送到单个分区,并且第一个失败并重试,但第二个成功,则第二个批中的记录可能会首先出现。