控制台.WriteLine在使用者.received上不工作- RabbitMQ C#

x33g5p2x  于 2022-12-29  发布在  RabbitMQ
关注(0)|答案(1)|浏览(218)

我写了这些代码行,但是我不知道如何在consumer.receive中的控制台上打印一些值。这段代码可以工作,因为我在RabbitMQ CloudAMQP上检查了一些值,但是我在控制台上看不到任何变化。
问题就在这里:

Console.WriteLine(Encoding.UTF8.GetString(e.Body.ToArray()) + " received");

完整代码:

// publisher
 ConnectionFactory factory = new ConnectionFactory();
 factory.Uri = new Uri("amqps://guest:guest@localhost");
 
 using (IConnection connection = factory.CreateConnection())
 using (IModel channel = connection.CreateModel())
 {
      channel.ExchangeDeclare("kuyruk", type: ExchangeType.Fanout);

      for (int i = 1; i <= 100; i++)
      {
         byte [] bytemessage = Encoding.UTF8.GetBytes($"is - {i}");
 
         IBasicProperties properties = channel.CreateBasicProperties();
         properties.Persistent = true;
 
         channel.BasicPublish(exchange: "kuyruk", routingKey: "", basicProperties: properties, body: bytemessage);
     }     
 }
 
 // Consumer
 ConnectionFactory factory = new ConnectionFactory();
 factory.Uri = new Uri("amqps://guest:guest@localhost");
 
 using (IConnection connection = factory.CreateConnection())
 using (IModel channel = connection.CreateModel())
 {
     channel.ExchangeDeclare("kuyruk", type: ExchangeType.Fanout);
 
     // Here consumer İçin Oluşturulacak Kuyruklara Random İsim Oluşturma
     string queueName = channel.QueueDeclare().QueueName;
     channel.QueueBind(queue: queueName, exchange: "kuyruk", routingKey: "");
 
     channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

     EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
     channel.BasicConsume(queueName, false, consumer);
     consumer.Received += (sender, e) =>
     {
         Thread.Sleep(500);
         Console.WriteLine(Encoding.UTF8.GetString(e.Body.ToArray()) + " received");
         channel.BasicAck(e.DeliveryTag, false);
     };

     Console.Read();     
 }
bfrts1fy

bfrts1fy1#

首先,您必须创建队列。当您在此点之后发布消息时,该消息将被排队。如果您先发布消息,则该消息将消失,因为没有可将消息发送到的队列。
代码中的另一个错误是:您正在创建使用者通道。您定义了消息到达时要执行的操作。然后,当您的代码运行到下面的发布进程时,您退出 Package 您的使用者通道的using块。因此,使用者已关闭,无法使用消息。

//consumer

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory
{
    Uri = new Uri("amqp://guest:guest@localhost")
};

using (IConnection connection = factory.CreateConnection())
using (IModel consumeChannel = connection.CreateModel())
{
    consumeChannel.ExchangeDeclare("kuyruk", type: ExchangeType.Fanout);

    //#region Her Consumer İçin Oluşturulacak Kuyruklara Random İsim Oluşturma
    string queueName = consumeChannel.QueueDeclare().QueueName;
    consumeChannel.QueueBind(queue: queueName, exchange: "kuyruk", routingKey: "");
    //#endregion

    consumeChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

    var consumer = new EventingBasicConsumer(consumeChannel);
    consumeChannel.BasicConsume(queueName, false, consumer);
    consumer.Received += (sender, e) =>
                         {
                             Thread.Sleep(500);
                             Console.WriteLine(Encoding.UTF8.GetString(e.Body.ToArray()) + " recieved");
                             consumeChannel.BasicAck(e.DeliveryTag, false);
                         };

    //publisher
    using (IModel publishChannel = connection.CreateModel())
    {
        publishChannel.ExchangeDeclare("kuyruk", type: ExchangeType.Fanout);
        for (int i = 1; i <= 100; i++)
        {
            var str = $"is - {i}";
            byte[] byteMessage = Encoding.UTF8.GetBytes(str);

            IBasicProperties properties = publishChannel.CreateBasicProperties();
            properties.Persistent = true;

            publishChannel.BasicPublish(exchange: "kuyruk", routingKey: "", basicProperties: properties, body: byteMessage);
        }
    }

    Console.Read();
}

相关问题