我使用Kafka消费者作为Asp.NET Core 6应用程序中的BackgroundService
。最初,我在ExecuteAsync
中没有Task.Run
,但是consumer.Consume
方法阻止了应用程序的启动。将Kafka循环 Package 到任务中解决了这个问题。但是在consumer.Consume
方法之后需要调用一些异步方法。如何做到这一点,使没有阻塞?
我试过,例如,像这样:FooAsync().GetAwaiter().GetResult();
但这不是推荐的方法。
我将感谢任何想法。
public class KafkaConsumer : BackgroundService
{
private readonly ICommand _command;
public KafkaConsumer(ICommand command) => _command = command;
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.Run(() => StartConsumerLoop(stoppingToken), stoppingToken);
}
private void StartConsumerLoop(CancellationToken stoppingToken)
{
using var consumer = new ConsumerBuilder<Ignore, string>(...).Build();
try
{
consumer.Subscribe(_consumerOptions.Topics);
while (!stoppingToken.IsCancellationRequested)
{
ConsumeResult<Ignore, string> result = consumer.Consume(stoppingToken);
// How to call some async method:
// await _command.FooAsync();
}
}
catch (Exception ex) { }
finally { consumer.Close(); }
}
}
1条答案
按热度按时间5vf7fwbs1#
使用.GetAwaiter().GetResult()可能会导致死锁。所以不建议使用它。相反,你可以使用numc/await不阻塞线程,并将等待操作完成。
下面是修改后的代码,你可以给予它试试:
这个链接提到了类似的问题:https://github.com/confluentinc/confluent-kafka-dotnet/issues/487