为什么Kafka消费者没有听到第一条信息?

yqyhoc1h  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(291)

在我的测试程序中,我启动listener。然后循环发送消息。如果我发送一条消息,它就不会列出该消息。如果我发两条信息,它只听一条。如果我发三条,它只听两条。为什么?
制作人

KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);

if (log.isDebugEnabled()) {
    log.debug("producing messages to topic : " + topic + "file : " + payload.get("name"));
}

for (int i = 0; i < 3; i++) {
    producer.send(message);
    System.out.println("producing ..");
}

消费者

public void run() {

    try {
        ConsumerIterator<byte[], byte[]> itr = m_stream.iterator();
        log.info("Kafka listener is ready to listen..");
        System.out.println("listens....");

        while (itr.hasNext()) {
            byte[] data = itr.next().message();
            System.out.println("Message received : " + data);
        }
    }
}

消费者财产

enable.auto.commit=true

auto.commit.interval.ms=101

session.timeout.ms=7000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

zookeeper.connect=zk1.xx\:2181

heartbeat.interval.ms=1000

auto.offset.reset=smallest

serializer.class=kafka.serializer.DefaultEncoder

bootstrap.servers=kk1.xx\:9092

group.id=test

consumer.timeout.ms=-1

fetch.min.bytes=1

receive.buffer.bytes=262144
toiithl6

toiithl61#

我通过在producer中设置以下属性来修复此问题。

request.required.acks=1

相关问题