RabbitMQ:快速处理耗时的事件

jdgnovmf  于 2023-04-30  发布在  RabbitMQ
关注(0)|答案(1)|浏览(198)

我正在试用RabbitMQ,希望有人能给予我一些提示。
我尝试做以下事情:使用一个消费者尽可能快地处理耗时的事件。
测试:5个事件,每个事件需要2秒来处理。
第一次尝试:来自RabbitMQ(https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html)的'Hello world'示例。

static void Main(string[] args)
{
    // Sender

    var factory = new ConnectionFactory { HostName = "localhost"};
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

    const string message = "Hello World!";

    for (int i = 1; i <= 5; i++)
    {
        var body = Encoding.UTF8.GetBytes($"{message} {i}");
        channel.BasicPublish(exchange: string.Empty,
                                routingKey: "hello",
                                basicProperties: null,
                                body: body);

        Console.WriteLine($"Sent {message} {i}");
    }

    // Consumer

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Thread.Sleep(2000);
        Console.WriteLine($"Received {message}");
    };

    channel.BasicConsume(queue: "hello",
                            autoAck: true,
                            consumer: consumer);

    Console.WriteLine("Press [enter] to exit.");
    Console.ReadLine();
}

结论:太慢,因为事件是同步处理的。
第二次尝试:使用AsyncEventingBasicConsumer进行异步处理。
代码变更:

  • DispatchConsumersAsync-属性在ConnectionFactory中设置
  • 使用AsyncEventingBasicConsumer
  • 消费者。接收为异步
static void Main(string[] args)
{
    // Sender

    var factory = new ConnectionFactory { HostName = "localhost", DispatchConsumersAsync = true };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

    const string message = "Hello World!";

    for (int i = 1; i <= 5; i++)
    {
        var body = Encoding.UTF8.GetBytes($"{message} {i}");
        channel.BasicPublish(exchange: string.Empty,
                                routingKey: "hello",
                                basicProperties: null,
                                body: body);

        Console.WriteLine($"Sent {message} {i}");
    }

    // Consumer

    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.Received += async (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        await Task.Delay(2000);
        Console.WriteLine($"Received {message}");
    };

    channel.BasicConsume(queue: "hello",
                            autoAck: true,
                            consumer: consumer);

    Console.WriteLine("Press [enter] to exit.");
    Console.ReadLine();
}

结论:太慢,因为事件仍然是同步处理的。
第三次尝试:使用者的异步事件处理程序。代码变更:

  • DispatchConsumersAsync-property在ConnectionFactory中被删除。
  • EventingBasicConsumer再次使用
static void Main(string[] args)
{
    // Sender

    var factory = new ConnectionFactory { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

    const string message = "Hello World!";

    for (int i = 1; i <= 101; i++)
    {
        var body = Encoding.UTF8.GetBytes($"{message} {i}");
        channel.BasicPublish(exchange: string.Empty,
                                routingKey: "hello",
                                basicProperties: null,
                                body: body);

        Console.WriteLine($"Sent {message} {i}");
    }

    // Consumer

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += async (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        await Task.Delay(2000);
        Console.WriteLine($"Received {message}");
    };

    channel.BasicConsume(queue: "hello",
                            autoAck: true,
                            consumer: consumer);

    Console.WriteLine("Press [enter] to exit.");
    Console.ReadLine();
}

结论:这就是我想要的结果。即使发布了1000个事件,消息的处理速度也非常快。
但是,这是否适合在生产环境中使用呢?我希望有人能帮助澄清这一点或分享他们在生产环境中快速处理事件的经验。

qvtsj1bj

qvtsj1bj1#

你的第二种方法绝对是正确的方法。我已经在生产中使用它很长一段时间了,它的速度非常快。
此外,如果您使用这种方法,则应该并行处理消息。只要确保您设置了一个适当的预取值。
此外,您可能需要考虑尝试手动确认。

相关问题