在Windows服务中处理RabbitMQ事件的正确方法是什么?

qaxu7uf2  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(284)

我以前从未创建过Windows服务,现在被要求这样做。我需要创建一个服务来处理RabbitMQ使用者引发的事件。
在我看来,一旦挂接了事件处理程序,就需要一些东西来“保持服务的活动”;否则服务将结束,处理程序也将消失。我看到的代码示例似乎表明您将应用程序置于while(true)循环中--这感觉不对。
这是我目前掌握的情况

public class MyWorker : BackgroundService
{
   protected override async Task ExcecuteAsync(CancellationToken stoppingToken)
   {
       var _factory = new ConnectionFactory() {
          HostName = "localhost",
          UserName = "guest",
          Password = "guest"
       };

      var _connection = _factory.CreateConnection();
      var _channel = _connection.CreateModel();

      _factory.DispatchConsumersAsync = true;
      var _consumer = new AsyncEventingBasicConsumer(_channel);
      _consumer.Received += async (sender, ea) => {
          // Code to handle queue item here
          _channel.BasicAck(ea.DeliveryTag,true);
          await Task.Yield();
      };
      var tag = _consumer.BasicConsume("MyQueue",false,_consumer);

      while(!stoppingToken.IsCancellationRequested){};

      _consumer.BasicCancel(tag);
}

这是在服务中进行此操作的正确方式吗?while循环是否会消耗CPU周期?

8zzbczxx

8zzbczxx1#

是的,你的代码会运行得很好。我花了点时间扩展了一下-

namespace rabbitmq_backgroundservice;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class Worker : BackgroundService
{
    private const string _queueName = "MyQueue";
    private readonly TimeSpan _stoppingCheckInterval = TimeSpan.FromSeconds(5);
    private readonly ILogger<Worker> _logger;
    private readonly IConnection _connection;
    private readonly IModel _channel;
    private readonly EventingBasicConsumer _consumer;
    private readonly string _consumerTag;

    public Worker(ILogger<Worker> logger)
    {
        _logger = logger;

        var factory = new ConnectionFactory {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest"
        };

        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.QueueDeclare(_queueName, true, true);

        _consumer = new EventingBasicConsumer(_channel);
        _consumer.Received += ReceivedHandler;

        _consumerTag = _channel.BasicConsume(_queueName, false, _consumer);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using (_connection)
        using (_channel)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
                await Task.Delay(_stoppingCheckInterval, stoppingToken);
            }

            _logger.LogInformation("Worker STOPPING at: {time}", DateTimeOffset.Now);
            _channel.BasicCancel(_consumerTag);
        }
    }

    private void ReceivedHandler(object? sender, BasicDeliverEventArgs ea)
    {
        var tag = ea.DeliveryTag;
        _logger.LogInformation("Received message. tag: {tag}  at: {time}", tag, DateTimeOffset.Now);
        _channel.BasicAck(tag, false);
    }
}

存储库在此处:
https://github.com/lukebakken/rabbitmq-backgroundservice
我引用了以下内容:
https://learn.microsoft.com/en-us/dotnet/core/extensions/windows-service

**注意:**RabbitMQ团队监控rabbitmq-users邮件列表,仅在某些时候回答StackOverflow上的问题。

相关问题