这是我的配置:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "1");
props.put(ProducerConfig.LINGER_MS_CONFIG, "1");
和
try {
producer.send(record);
} catch (Throwable ex) {
log.error(ex, "exception.");
}
但我们发现信息不见了。
如果网络抖动会导致这种情况?
我们需要回电吗?
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {}
})
1条答案
按热度按时间nbysray51#
Kafka制作者基本上有三种不同的模式向Kafka生成消息:
开火然后忘记
同步
异步
只打电话的时候
producer.send(record)
你在使用火和遗忘模式。配置acks
只有在所有复制都已确认该消息时,才能确保该消息被视为对kafka的成功写入。但是,您的制作人不会等待答复。正如您所提到的,您可以使用回调来理解来自代理的回复。这将是非同步模式。但也不要忘记
flush()
缓冲记录。另一种选择是应用同步方法,这可以通过阻塞方法实现
get
等待来自代理的ack响应。只需将唯一一行代码更改为编辑:将重试次数增加到大于
1
. 制作人Callback
可以返回可以通过多次重试解决的可检索异常。为了理解所有回调异常,您可以看看关于生产者回调的另一个so问题。