在我的.net C#项目中(使用Confluent Kafka库)目前我使用以下代码从Kafka主题中读取最新消息。但使用此代码我可以从定义的分区中读取最新消息。但Kafka服务器每次都将我的主题的值写入不同的分区(我的Kafka主题是为分区0、1、2配置的)。所以分区中最后一条(最新的)消息并不总是从数据源端发送给Kafka的最新消息。
我如何使我的代码适应三个分区?Kafka Confluent中是否有简单的函数?或者我必须每次从所有分区读取带有Offset.End的消息,检查它们的时间戳,并确定哪个是最新的?
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("My_Topic");
while (var_true)
{
TopicPartitionOffset tps = new TopicPartitionOffset(new TopicPartition("My_Topic", 1),Offset.End);
consumer.Assign(tps);
var consumeResult = consumer.Consume(cancellationToken);
Kafka_message_total = consumeResult.Message.Value;
// additional code to send the message value to an application
System.Threading.Thread.Sleep(2000);
}
consumer.Close();
}
字符串
1条答案
按热度按时间carvr3hs1#
当您禁用消费者组提交并设置AutoOffsetReset=latest时,对于所有分区,它将始终从主题的末尾开始阅读。
我是否必须每次从所有分区读取带有Offset.End的消息,检查它们的时间戳,并确定哪个是最新的?
是的。你可以使用Offset.End,或者在任何时候将消费者搜索到主题的末尾,或者计算结束偏移量并减去1,然后在那里搜索。
你只需要一个TopicPartition对象的循环来分配每个分区。然而,这并不像使用订阅API的消费者组那样并行化。
我看到消息有时到达P-0,有时到达P-1,有时到达P-3。我不知道Kafka是如何决定何时将传入消息写入哪个分区的
Kafka文档解释了生产者如何基于哈希算法对记录键进行分区,或者如何对空键进行循环