如何在ASP.NET Core App中使用异步操作的Kafka消费者

wz1wpwve  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(114)

我使用Kafka消费者作为Asp.NET Core 6应用程序中的BackgroundService。最初,我在ExecuteAsync中没有Task.Run,但是consumer.Consume方法阻止了应用程序的启动。将Kafka循环 Package 到任务中解决了这个问题。但是在consumer.Consume方法之后需要调用一些异步方法。如何做到这一点,使没有阻塞?
我试过,例如,像这样:FooAsync().GetAwaiter().GetResult();但这不是推荐的方法。
我将感谢任何想法。

  1. public class KafkaConsumer : BackgroundService
  2. {
  3. private readonly ICommand _command;
  4. public KafkaConsumer(ICommand command) => _command = command;
  5. protected override Task ExecuteAsync(CancellationToken stoppingToken)
  6. {
  7. return Task.Run(() => StartConsumerLoop(stoppingToken), stoppingToken);
  8. }
  9. private void StartConsumerLoop(CancellationToken stoppingToken)
  10. {
  11. using var consumer = new ConsumerBuilder<Ignore, string>(...).Build();
  12. try
  13. {
  14. consumer.Subscribe(_consumerOptions.Topics);
  15. while (!stoppingToken.IsCancellationRequested)
  16. {
  17. ConsumeResult<Ignore, string> result = consumer.Consume(stoppingToken);
  18. // How to call some async method:
  19. // await _command.FooAsync();
  20. }
  21. }
  22. catch (Exception ex) { }
  23. finally { consumer.Close(); }
  24. }
  25. }
5vf7fwbs

5vf7fwbs1#

使用.GetAwaiter().GetResult()可能会导致死锁。所以不建议使用它。相反,你可以使用numc/await不阻塞线程,并将等待操作完成。
下面是修改后的代码,你可以给予它试试:

  1. public class KafkaConsumer : BackgroundService
  2. {
  3. private readonly ICommand _command;
  4. public KafkaConsumer(ICommand command) => _command = command;
  5. protected override Task ExecuteAsync(CancellationToken stoppingToken)
  6. {
  7. return StartConsumerLoop(stoppingToken);
  8. }
  9. private async Task StartConsumerLoop(CancellationToken stoppingToken)
  10. {
  11. using var consumer = new ConsumerBuilder<Ignore, string>(...).Build();
  12. try
  13. {
  14. consumer.Subscribe(_consumerOptions.Topics);
  15. while (!stoppingToken.IsCancellationRequested)
  16. {
  17. ConsumeResult<Ignore, string> result = consumer.Consume(stoppingToken);
  18. await _command.FooAsync();
  19. }
  20. }
  21. catch (Exception ex)
  22. {
  23. // Handle the exception appropriately
  24. }
  25. finally
  26. {
  27. consumer.Close();
  28. }
  29. }
  30. }

这个链接提到了类似的问题:https://github.com/confluentinc/confluent-kafka-dotnet/issues/487

展开查看全部

相关问题