希望每个人都有一个伟大的周末,我有以下的订阅方法
public void Subscribe(string queueName, Func<string, Task<bool>> onMessageReceived, Action<Exception> onError)
{
var channel = _connection.CreateModel();
channel.QueueDeclare(queueName, false, false, false, null);
var consumer = new EventingBasicConsumer(channel);
//receive only one message at a time and wait for the method to return before receiving the next message
channel.BasicQos(0, 1, false);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
try
{
Console.WriteLine(" [x] Received {0} , consumer-id {1}", message, ea.ConsumerTag);
if (!await onMessageReceived(message))
{
throw new Exception("Message processing failed");
}
channel.BasicAck(ea.DeliveryTag, false);
}
catch (Exception e)
{
onError(e);
channel.BasicNack(ea.DeliveryTag, false, true);
}
};
channel.BasicConsume(queueName, false, consumer);
}
它在我的开发环境中运行良好,问题是当我在Docker机器上旋转这个坏男孩的一些示例时,它们似乎在大约30分钟后死亡,没有错误,没有任何东西,执行继续,但没有收到任何消息,它们从rabbitMQ消费者的数组中消失。
我错过了什么?
1条答案
按热度按时间vuv7lop31#
消费者应更改为
工厂应该有参数