当RabbitMQ队列中添加新消息时,如何获得通知?

pexxcrt2  于 2023-02-22  发布在  RabbitMQ
关注(0)|答案(2)|浏览(253)

我是RabbitMQ的新手。我知道通过使用while循环,我可以在队列中查找新添加的消息。但我不想这样做。我也尝试了EventingBasicConsumer Received事件处理程序,但它只触发订阅者代码第一次运行时出现的消息。任何新添加的消息都不会触发此代码。有没有一种方法可以让RibbitMQ在队列有新消息时通知我?我希望我的Web应用程序在有新消息时得到RabbitMQ的通知。

Receiver : 
 public static class RabbitMQConsumer2
    {
        public static void Receive()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                channel.QueueDeclare("AAA", true, false, false, null);
                channel.QueueBind(queue: "AAA",
                                  exchange: "logs",
                                  routingKey: "");

                Debug.WriteLine(" [*] Waiting for logs.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Debug.WriteLine(" [x] {0}", message);
                };
                channel.BasicConsume(queue: "AAA",
                                     noAck: true,
                                     consumer: consumer);

                Debug.WriteLine(" Press [enter] to exit.");
            }
        }
    }
gzszwxb4

gzszwxb41#

问题是你在调用BasicConsume之后立即关闭了通道和连接(因为它到达了你的usings作用域的末尾),你需要启动一个长期的后台线程,在那里连接和通道保持活动状态,然后你的接收处理程序会在有消息的时候立即执行。
事件消费者通常是长期存在的,更适合Windows服务或控制台应用程序。然而,如果你真的需要它在一个Web应用程序中,你真的只需要它坐在那里在后台接收消息,那么你可以启动一个线程或长期任务,并锚它在一些静态对象,使它不会死。但不知道你的用例,我不能说这是最好的建议。

hs1rzwqc

hs1rzwqc2#

我创建了一个控制台应用程序,并从多个启动项目区启动了我的web API。
所以,我同意“Vanlighty”的回答。
你可以这样试试

internal class Program
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory
        {
            Uri = new Uri("amqps://asdasd:kaPv9awYo7O_SsSSASassam4GZ@woodpecker.rmq.cloudamqp.com/asdasd")
        };

        using var connection = factory.CreateConnection();

        var channel = connection.CreateModel();
        channel.QueueDeclare("Test QueueDeclare", exclusive: false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume("Test QueueDeclare", true, consumer);

        consumer.Received += (sender, e) =>
        {
            Console.WriteLine(Encoding.UTF8.GetString(e.Body.Span));
        };
        Console.ReadLine();
    }
}

相关问题