我有下面提到的不同的模型,我创建了kafka producer并调用了不同的方法,但不确定什么是正确的编程方法,这样流就不会中断,性能就不会受到影响。请帮忙。
模型1:
for(int i=1; i < 100; i++){
Producer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
topicName,
String.valueOf(i)
);
producer.send(data);
producer.close();
}
模型2:
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i=1; i < 100; i++) {
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
topicName,
String.valueOf(i)
);
producer.send(data);
producer.close();
}
模型3:
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i=1; i < 100; i++){
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
topicName,
String.valueOf(i)
);
producer.send(data);
}
producer.close();
模型4:
for(int i=1; i < 100; i++){
Producer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
topicName,
String.valueOf(i)
);
producer.send(data);
producer.flush();
producer.close();
}
模型5:
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i=1; i < 100; i++){
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
topicName,
String.valueOf(i)
);
producer.send(data);
producer.flush();
producer.close();
}
模型6:
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i=1; i < 100; i++){
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
topicName,
String.valueOf(i)
);
producer.send(data);
producer.flush();
}
producer.close();
2条答案
按热度按时间j9per5c41#
您可以使用以下示例:
众所周知,send()方法是异步的。调用时,它将记录添加到挂起记录发送的缓冲区中,并立即返回。这允许制作者将单个记录批处理在一起以提高效率。
我们可以将buffer.memory或batch.size设置为自动刷新
nkhmeac62#
模型3似乎应该是正确的以下变化