如何在ASP.NET Core App中使用异步操作的Kafka消费者

wz1wpwve  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(112)

我使用Kafka消费者作为Asp.NET Core 6应用程序中的BackgroundService。最初,我在ExecuteAsync中没有Task.Run,但是consumer.Consume方法阻止了应用程序的启动。将Kafka循环 Package 到任务中解决了这个问题。但是在consumer.Consume方法之后需要调用一些异步方法。如何做到这一点,使没有阻塞?
我试过,例如,像这样:FooAsync().GetAwaiter().GetResult();但这不是推荐的方法。
我将感谢任何想法。

public class KafkaConsumer : BackgroundService
{
    private readonly ICommand _command;

    public KafkaConsumer(ICommand command) => _command = command;

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return Task.Run(() => StartConsumerLoop(stoppingToken), stoppingToken);
    }

    private void StartConsumerLoop(CancellationToken stoppingToken)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(...).Build();
        try
        {
            consumer.Subscribe(_consumerOptions.Topics);
            while (!stoppingToken.IsCancellationRequested)
            {
                ConsumeResult<Ignore, string> result = consumer.Consume(stoppingToken);

                // How to call some async method:
                // await _command.FooAsync();
            }
        }
        catch (Exception ex) { }
        finally { consumer.Close(); }
    }
}
5vf7fwbs

5vf7fwbs1#

使用.GetAwaiter().GetResult()可能会导致死锁。所以不建议使用它。相反,你可以使用numc/await不阻塞线程,并将等待操作完成。
下面是修改后的代码,你可以给予它试试:

public class KafkaConsumer : BackgroundService
{
    private readonly ICommand _command;

    public KafkaConsumer(ICommand command) => _command = command;

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return StartConsumerLoop(stoppingToken);
    }

    private async Task StartConsumerLoop(CancellationToken stoppingToken)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(...).Build();
        try
        {
            consumer.Subscribe(_consumerOptions.Topics);
            while (!stoppingToken.IsCancellationRequested)
            {
                ConsumeResult<Ignore, string> result = consumer.Consume(stoppingToken);

                await _command.FooAsync();
            }
        }
        catch (Exception ex) 
        { 
            // Handle the exception appropriately
        }
        finally 
        {
            consumer.Close();
        }
    }
}

这个链接提到了类似的问题:https://github.com/confluentinc/confluent-kafka-dotnet/issues/487

相关问题