如何拥有sync producer批处理消息?

ycl3bljg  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(405)

我们使用的是kafka0.8async生产者,但它正在删除消息(并且没有来自另一个线程的aysnc响应,或者我们可以继续使用async)。
我们已经设定了 batch.num.messages 而我们的消费者并没有改变。我读到了 batch.num.messages 只适用于异步生产者和不同步,所以我需要批自己。我们正在使用 compression.codec=snappy 以及我们自己的序列化程序类。
我的问题有两个:
我可以假设我可以使用我们自己的序列化程序类,然后自己发送消息吗?
我是否需要担心Kafka可能使用的任何特殊的快速选项/参数?

j0pj023g

j0pj023g1#

是的,因为 batch.num.messages 仅控制异步生产者的行为。这在相关参数指南中有明确说明:
使用异步模式时一批中要发送的消息数。生产者将等待此数量的消息准备发送或达到queue.buffer.max.ms。
为了对sync producer进行批处理,您必须发送消息列表:

public void trySend(List<M> messages) {
    List<KeyedMessage<String, M>> keyedMessages = Lists.newArrayListWithExpectedSize(messages.size());
    for (M m : messages) {
        keyedMessages.add(new KeyedMessage<String, M>(topic, m));
    }
    try {
        producer.send(keyedMessages);
    } catch (Exception ex) {
        log.error(ex)
    }
}

注意,我正在使用 kafka.javaapi.producer.Producer 在这里。
一次 send 执行,发送批处理。
我可以假设我可以使用我们自己的序列化程序类,然后自己发送消息吗?我是否需要担心Kafka可能使用的任何特殊的快速选项/参数?
压缩和序列化程序都是正交特性,它们不影响批处理,但实际应用于单个消息。
请注意,将会有api更改,并且将统一异步/同步api。

相关问题