我想使用合流dotnet客户端在应用程序启动中处理一个主题。假设以下示例:
while (true)
{
try
{
var cr = c.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
当Kafka中没有新消息时,c.consume将被阻止。因为我想用它来启动应用程序(比如缓存预热),所以当我发现没有新消息时,我想继续我的代码。
我知道有一个超负荷设置超时像 c.Consume(timeout)
但是这种方法的问题是,如果您的主题中有一条消息,并且读取该消息的持续时间超过了超时时间,则会收到不需要的空输出。
2条答案
按热度按时间44u64gxh1#
消费者不应该知道生产者。
现在,如果您想知道您从开始消费的那一刻起就已经阅读了主题中的所有内容,您可以:
在开始消耗之前加载最新的偏移量。
然后开始消费信息。
如果消息的偏移量与之前加载的最新偏移量相同,请停止使用。
我不是一个
C#
但是从我在dotnet confluent文档中读到的内容来看QueryWatermarkOffsets
在消费者得到最老和最新的抵消。https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/confluent.kafka.consumer.html#confluent_kafka_consumer_querywatermarkoffsets_confluent_kafka_topicpartition_然后,在
Message
你有一个班级吗Offset
存取器。所以整个事情不应该太难实现。https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/confluent.kafka.message.html#confluent_kafka_message_offsetbxfogqkk2#
你可以用
OnPartitionEOF
事件,指示已到达分区结尾。