rabbitmq 使用auto_ack=True的消息时出现pika异常错误

esyap4oy  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(2)|浏览(273)

我正在使用pika和托管的CloudAMQP示例。
正在以短脉冲将消息发布到队列:~10条消息/秒,然后几分钟内什么都没有。消费者有时可能需要长达~30秒的时间来处理一条消息。我的简单消费者代码是:

import pika, os, time

url = os.environ.get('CLOUDAMQP_URL')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

def callback(ch, method, properties, body):
  print("Received " + str(body), method, properties)
  # ... long task that is equivalent to:
  time.sleep(30)

queue_name = 'test-queue'

channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)

channel.basic_consume(queue_name, callback, auto_ack=True)

channel.start_consuming()
connection.close()

有时,我会看到以下行为:

  • 在几次快速爆发中向队列发布了约20-30条消息
  • 消费者获取所有排队的消息,一次自动确认它们,即它们都从队列中“消失”
  • 在处理自动确认的消息后,Pika将引发以下异常:
pika.exceptions.StreamLostError: Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')

(full追溯如下)
我通过禁用auto_ack=True和手动确认消息来解决我的问题(见下文)。
是否有其他方法可以解决这个问题?EOF异常的发生是因为CloudAMQP/RabbitMQ服务器没有及时得到心跳,并关闭了连接?还是因为pika的内部超时?谢谢!
追溯:

Traceback (most recent call last):
 File "/app/app.py", line 146, in <module>
   pika_obj['pika_channel'].start_consuming()
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1866, in start_consuming
   self._process_data_events(time_limit=None)
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2027, in _process_data_events
   self.connection.process_data_events(time_limit=time_limit)
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 825, in process_data_events
   self._flush_output(common_terminator)
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
   raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')
cs7cruho

cs7cruho1#

我能够通过引入一个简单的更改来修复上面的代码:设置auto_ack=False,处理完每条报文后手工调用basic_ack

url = os.environ.get('CLOUDAMQP_URL')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

def callback(ch, method, properties, body):
  print("Received " + str(body), method, properties)
  # ... long task that is equivalent to:
  time.sleep(30)
  # ack the message manually
  ch.basic_ack(delivery_tag=method.delivery_tag)

queue_name = 'test-queue'

channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)

channel.basic_consume(queue_name, callback, auto_ack=False)

channel.start_consuming()
connection.close()
c9qzyr3d

c9qzyr3d2#

这个问题已经很久了,但有一点让我很清楚,那就是当消费者使用从队列接收的消息时,队列一直处于停滞状态。我认为这随时都会导致灾难。MQ是异步介质是有原因的,但这种设计扼杀了这个目的。我有一个类似的设置,消息每分钟都会突发到达,但在回调函数中,我只是将它们放入本地队列(queue.Queue),这允许MQ客户端(pika)立即确认提供者队列上的消息。在本地端,我有一个线程不断地轮询内部队列中的消息并使用它们。我仍然将消息保存到磁盘中,以防本地端发生任何故障,但是我解除了MQ保留我的消息超过需要的时间的责任。希望这对其他正在寻找这种情况的解决方案的人有所帮助。

相关问题