kafka producer config立即发送消息

fykwrbwg  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(520)

我使用的是kafka producer 0.8.2,我尝试向主题发送一条消息,以立即发送消息的方式。我有一个控制台消费者来观察消息是否到达。我注意到消息不会立即发送,除非在发送后立即运行producer.close(),这不是我想要做的。
正确的生产者配置设置是什么?我正在使用以下内容(我知道它看起来像是一堆不同的配置/版本,但我在文档中找不到像我期望的那样工作的东西):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersStr);
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put("producer.type", "sync");
props.put("batch.num.messages", "1");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
ndasle7k

ndasle7k1#

老职位,但我已经挣扎了很多,以错过一个职位在这里。
我偶然发现同样的行为试图运行Kafka的例子和这个 .get() 是唯一能给Kafka传递信息的东西。的javadoc KafkaProducer.send(…) 说明此方法是异步的。在我的测试代码中,消息因此被发送到kafka,而我的代码继续运行,实际上刚刚到达运行的末尾,并在消息实际发送到内部之前终止 Future .
所以这个 .get() 就在街上几个街区 Future 直到它实现。这实际上消除了 Future . 一个更干净的方法可以是等一等 Thread.sleep(…) 就在那之后 .send(…) (取决于您的用例)。

kulphzqa

kulphzqa2#

我找到了一个似乎合理的解决方案,包括在producer的send()命令返回的future上运行get()。我将send命令更改为:

producer.send(record);

以下内容:

producer.send(record).get();

如果这种方法有什么问题的话,很高兴听到更有经验的Kafka用户的意见?另外,我还想了解是否有一个用于生产者实现相同功能的配置设置(也就是说,在不运行get()的情况下立即发送一条消息)。

相关问题