没有消息时从kafka消费者处返回

nhjlsmyf  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(552)

我想使用合流dotnet客户端在应用程序启动中处理一个主题。假设以下示例:

while (true)
    {
        try
        {
            var cr = c.Consume();
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occured: {e.Error.Reason}");
        }
    }

当Kafka中没有新消息时,c.consume将被阻止。因为我想用它来启动应用程序(比如缓存预热),所以当我发现没有新消息时,我想继续我的代码。
我知道有一个超负荷设置超时像 c.Consume(timeout) 但是这种方法的问题是,如果您的主题中有一条消息,并且读取该消息的持续时间超过了超时时间,则会收到不需要的空输出。

44u64gxh

44u64gxh1#

消费者不应该知道生产者。
现在,如果您想知道您从开始消费的那一刻起就已经阅读了主题中的所有内容,您可以:
在开始消耗之前加载最新的偏移量。
然后开始消费信息。
如果消息的偏移量与之前加载的最新偏移量相同,请停止使用。
我不是一个 C# 但是从我在dotnet confluent文档中读到的内容来看 QueryWatermarkOffsets 在消费者得到最老和最新的抵消。https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/confluent.kafka.consumer.html#confluent_kafka_consumer_querywatermarkoffsets_confluent_kafka_topicpartition_
然后,在 Message 你有一个班级吗 Offset 存取器。所以整个事情不应该太难实现。https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/confluent.kafka.message.html#confluent_kafka_message_offset

bxfogqkk

bxfogqkk2#

你可以用 OnPartitionEOF 事件,指示已到达分区结尾。

CancellationTokenSource source = new CancellationTokenSource();
bool isContinue = true;

c.OnPartitionEOF += (o, e) =>
    {
        Console.WriteLine($"You have reached end of partition");
        isContinue = false;
        source.Cancel();
    };    
while (isContinue)
{
    try
    {
        var cr = c.Consume(source.Token);
        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Error occured: {e.Error.Reason}");
    }
}

相关问题