创建kafka producer并调用send()、flush()和close()方法的正确顺序是什么?

w8f9ii69  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(1117)

我有下面提到的不同的模型,我创建了kafka producer并调用了不同的方法,但不确定什么是正确的编程方法,这样流就不会中断,性能就不会受到影响。请帮忙。
模型1:

for(int i=1; i < 100; i++){
    Producer<String, String> producer = new KafkaProducer<String, String>(props);

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
        topicName, 
        String.valueOf(i)
    );

    producer.send(data);
    producer.close();
}

模型2:

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i=1; i < 100; i++) {

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
        topicName, 
        String.valueOf(i)
    );

    producer.send(data);
    producer.close();
}

模型3:

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i=1; i < 100; i++){

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
        topicName, 
        String.valueOf(i)
    );

    producer.send(data);
}
producer.close();

模型4:

for(int i=1; i < 100; i++){
    Producer<String, String> producer = new KafkaProducer<String, String>(props);

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
        topicName,
        String.valueOf(i)
    );

    producer.send(data);
    producer.flush();
    producer.close();
}

模型5:

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i=1; i < 100; i++){
    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
        topicName, 
        String.valueOf(i)
    );

    producer.send(data);
    producer.flush();
    producer.close();
}

模型6:

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i=1; i < 100; i++){

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
        topicName, 
        String.valueOf(i)
    );

    producer.send(data);
    producer.flush();
}
producer.close();
j9per5c4

j9per5c41#

您可以使用以下示例:

Properties props = new Properties();
props.put("batch.size", 16384);
props.put("buffer.memory", 33554432);
Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

众所周知,send()方法是异步的。调用时,它将记录添加到挂起记录发送的缓冲区中,并立即返回。这允许制作者将单个记录批处理在一起以提高效率。
我们可以将buffer.memory或batch.size设置为自动刷新

nkhmeac6

nkhmeac62#

模型3似乎应该是正确的以下变化

Producer<String, String> producer = new KafkaProducer<String, String>(props);
    try {
        for (int i = 1; i < 100; i++) {
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(topicName, String.valueOf(i));
            producer.send(data);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        producer.close();
    }

相关问题