我怎样才能从一个有多个分区的Kafka Topic(在我的例子中是三个分区)中读取最新(最后)的消息?

3zwtqj6y  于 2023-11-16  发布在  Apache
关注(0)|答案(1)|浏览(117)

在我的.net C#项目中(使用Confluent Kafka库)目前我使用以下代码从Kafka主题中读取最新消息。但使用此代码我可以从定义的分区中读取最新消息。但Kafka服务器每次都将我的主题的值写入不同的分区(我的Kafka主题是为分区0、1、2配置的)。所以分区中最后一条(最新的)消息并不总是从数据源端发送给Kafka的最新消息。
我如何使我的代码适应三个分区?Kafka Confluent中是否有简单的函数?或者我必须每次从所有分区读取带有Offset.End的消息,检查它们的时间戳,并确定哪个是最新的?

CancellationTokenSource source = new CancellationTokenSource();
        CancellationToken cancellationToken = source.Token;
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe("My_Topic");               

            while (var_true)
            {
                TopicPartitionOffset tps = new TopicPartitionOffset(new TopicPartition("My_Topic", 1),Offset.End);
                consumer.Assign(tps);
                var consumeResult = consumer.Consume(cancellationToken);                      
                
                Kafka_message_total = consumeResult.Message.Value;

                // additional code to send the message value to an application

                System.Threading.Thread.Sleep(2000);

            }

            consumer.Close();
        }

字符串

carvr3hs

carvr3hs1#

当您禁用消费者组提交并设置AutoOffsetReset=latest时,对于所有分区,它将始终从主题的末尾开始阅读。
我是否必须每次从所有分区读取带有Offset.End的消息,检查它们的时间戳,并确定哪个是最新的?
是的。你可以使用Offset.End,或者在任何时候将消费者搜索到主题的末尾,或者计算结束偏移量并减去1,然后在那里搜索。
你只需要一个TopicPartition对象的循环来分配每个分区。然而,这并不像使用订阅API的消费者组那样并行化。
我看到消息有时到达P-0,有时到达P-1,有时到达P-3。我不知道Kafka是如何决定何时将传入消息写入哪个分区的
Kafka文档解释了生产者如何基于哈希算法对记录键进行分区,或者如何对空键进行循环

相关问题