我写了这些代码行,但是我不知道如何在consumer.receive
中的控制台上打印一些值。这段代码可以工作,因为我在RabbitMQ CloudAMQP上检查了一些值,但是我在控制台上看不到任何变化。
问题就在这里:
Console.WriteLine(Encoding.UTF8.GetString(e.Body.ToArray()) + " received");
完整代码:
// publisher
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri("amqps://guest:guest@localhost");
using (IConnection connection = factory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare("kuyruk", type: ExchangeType.Fanout);
for (int i = 1; i <= 100; i++)
{
byte [] bytemessage = Encoding.UTF8.GetBytes($"is - {i}");
IBasicProperties properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "kuyruk", routingKey: "", basicProperties: properties, body: bytemessage);
}
}
// Consumer
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri("amqps://guest:guest@localhost");
using (IConnection connection = factory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare("kuyruk", type: ExchangeType.Fanout);
// Here consumer İçin Oluşturulacak Kuyruklara Random İsim Oluşturma
string queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "kuyruk", routingKey: "");
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queueName, false, consumer);
consumer.Received += (sender, e) =>
{
Thread.Sleep(500);
Console.WriteLine(Encoding.UTF8.GetString(e.Body.ToArray()) + " received");
channel.BasicAck(e.DeliveryTag, false);
};
Console.Read();
}
1条答案
按热度按时间bfrts1fy1#
首先,您必须创建队列。当您在此点之后发布消息时,该消息将被排队。如果您先发布消息,则该消息将消失,因为没有可将消息发送到的队列。
代码中的另一个错误是:您正在创建使用者通道。您定义了消息到达时要执行的操作。然后,当您的代码运行到下面的发布进程时,您退出 Package 您的使用者通道的using块。因此,使用者已关闭,无法使用消息。