在融合的kafka dotnet中丢失的消息

iklwldmw  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(458)

在我最近的c#项目中,我正在使用confluent kafka包。我用以下方式创建了一个制作人:

prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};

foreach(msg in msglist){
    using(var producer = new ProducerBuilder<Null, string>(prodConfig).Build()){
        producer.ProduceAsync(topic, new Message<Null, string> {Value = msg});
    }
}

但问题是,我的一些信息没有传达给消费者。他们在某处迷路了。但是,如果我对生产者使用wait,那么所有消息都会被传递。如何在没有等待的情况下传递我所有的信息(我有一个单独的分区)

vtwuwzda

vtwuwzda1#

首先,你应该只使用一个 Producer 把你的 msgList ,因为创建新的 Producer 因为每封信都很贵。
你能做的就是 Produce() 方法 Flush() . 与 Produce() 您将异步发送消息,而无需等待响应。然后打电话给 Flush() 将阻止,直到所有飞行中的消息都被传递。

var prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
using var producer = new ProducerBuilder<Null, string>(prodConfig).Build();

foreach (var msg in msglist)
{
    producer.Produce(topic, new Message<Null, string> { Value = msg });
}

producer.Flush();

没有 await 或者 Flush() 您可能会丢失消息,因为您的生产者可能在传递所有消息之前被释放。

相关问题