java—在kafka中,当消息被使用时,为什么不更新偏移量

shyt4zoc  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(547)

我正在实现kafka消费者类来接收消息。我每次只想收到新的信息。因此,我设定 enable.auto.commit 是的。然而,抵消似乎根本没有改变。尽管主题、消费群体和划分一直是相同的。
这是我的消费代码:

consumerConfig.put("bootstrap.servers", bootstrap);
    consumerConfig.put("group.id", KafkaTestConstants.KAFKA_GROUP);
    consumerConfig.put("enable.auto.commit", "true");
    consumerConfig.put("auto.offset.reset", "earliest");
    consumerConfig.put("auto.commit.interval", 1000);
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

    TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);
    long offset = kafkaConsumer.position(tp);
    System.out.println("Offset of partition: " + offset);

    List<String> messages = new ArrayList<String>();
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Message received: " + record.value());
        messages.add(record.value());
    }

    consumer.commitAsync();
    System.out.println("Offset commited.\n");
    consumer.close();

不管我运行它多少次,它总是显示偏移量为0。因此,它总是从一开始就接收所有消息。我错过了什么?
编辑:根据matthias的回答,我决定手动提交偏移量。然而 commitSync() 会被吊死。 commitAsync() 差不多吧。我稍后会解释“那种”。以下是代码的作用:

producer send 2 messages;
consumer initiates;
print out current position;
consumer.poll();
print received messages;
consumer.commitAsync();

这就是代码的行为方式。假设我有100条留言。现在生产者发送2条新消息。在消费者调查之前,它会显示当前的偏移位置为102,应该是100。因此,不会打印新消息。这几乎像是在生产者发送消息之后更新了偏移量。

zi8p0yeb

zi8p0yeb1#

自动提交仅在使用使用者组管理时有效,为此,您需要“订阅”主题,而不是手动“分配”分区。
比较 KafkaConsumer . 这是一本很长的读物,但需要理解如何正确使用消费者的微妙细节:https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html
此外,如果启用了自动提交,它将在 poll (即打电话给 poll() 可能会将上一次调用返回的消息提交给 poll() )而不是在遍历returne消息时。这也意味着,您的提交将“跳转”向前,比如从提交的偏移量0到100(如果您通过轮询为单个分区接收了100条消息)。

相关问题