kafka消费者民意调查最新消息

5lwkijsr  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(414)

我用Kafka编程Kafka消费者。我希望当我的消费者开始时,它只轮询新到达的消息(即消息在消费者开始时间之后到达),而不是消费者偏移量处的消息。

// Construct the configuration
Configuration config = {
    { "metadata.broker.list", "127.0.0.1:9092" },
    { "group.id", "1" },
    // Disable auto commit
    { "enable.auto.commit", false },
    // Set offest to latest to receive latest message when consumer start working
    { "auto.offset.reset", "latest" },
};

// Create the consumer
Consumer consumer(config);

consumer.set_assignment_callback([](TopicPartitionList& partitions) {
    cout << "Got assigned: " << partitions << endl;
});

// Print the revoked partitions on revocation
consumer.set_revocation_callback([](const TopicPartitionList& partitions) {
    cout << "Got revoked: " << partitions << endl;
});

string topic_name = "test_topic";
// Subscribe to the topic
consumer.subscribe({ topic_name });

据我所知,配置 auto.offset.reset 设置为 latest 仅当使用者在开始读取分配的分区时没有提交的偏移量时才起作用。所以我想我应该打电话给你 consumer.poll() 没有承诺,但感觉不对,我担心我会打破一些沿途。有人能告诉我怎样才能达到我的要求吗?

jmo0nnb3

jmo0nnb31#

如果“enable.auto.commit”设置为false,并且您没有在代码中提交偏移量,那么每次使用者启动时,如果auto.offset.reset=earliest,它都会从主题中的第一条消息开始消耗消息。
auto.offset.reset的默认值是“latest”,这意味着缺少有效的偏移量,使用者将开始读取最新记录(在使用者开始运行后写入的记录)。
根据您上面的问题,auto.offset.reset=latest应该可以解决您的问题。
但是如果你需要一个基于实时的补偿,你需要在你的消费者中应用时间过滤器。这意味着从主题获取消息,将偏移时间与消息负载中的某个自定义字段或消息的meta属性(consumerrecord.timestamp())进行比较,并相应地进行进一步处理。
也可以参考这个答案从Kafka检索基于时间戳的数据

相关问题