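My application has a stream queue that I want to stop consuming from when a certain condition is met. Since start_consuming blocks forever on a BlockingConnection, the only way out seems to be calling stop_consuming from inside the callback.
Unfortunately, this does not work, because stop_consuming rejects all pending messages (from the pika docs):
NOTE: pending non-ackable messages will be lost; pending ackable messages will be rejected.
...and you cannot reject messages on a stream queue, as this error message confirms: operation basic.reject caused a connection exception not_implemented: "basic.nack and basic.reject not supported by stream queues queue 'stream-queue' in vhost '/'"
Here is a simple example. The server code: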
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel_stream = connection.channel()
channel_stream.queue_declare(
    "stream-queue",
    auto_delete=False, exclusive=False, durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)
channel_stream.basic_qos(
    prefetch_count=1,
)


class Server(object):
    def __init__(self):
        channel_stream.basic_consume(
            queue="stream-queue",
            on_message_callback=self.stream_callback,
        )

    def stream_callback(self, channel, method, props, body):
        print(f"received '{body.decode()}' via {method.routing_key}")
        channel_stream.stop_consuming()


server = Server()
try:
    channel_stream.start_consuming()
except KeyboardInterrupt:
    connection.close()
And the client code:
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel_stream = connection.channel()
channel_stream.queue_declare(
    "stream-queue",
    durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)

# This publishes twice: once to trigger the server's callback
# and again to get a message in the queue that the server is forced
# to nack, causing a crash.
for i in range(2):
    channel_stream.basic_publish(
        exchange='',
        routing_key='stream-queue',
        body="stream data".encode()
    )

connection.close()
And the full output:
~/anaconda3/envs/py310/bin/python ~/workspace/rabbitmq_train/stream_bug/server_stream_only.py
received 'stream data' via stream-queue
Traceback (most recent call last):
  File "~/workspace/rabbitmq_train/stream_bug/server_stream_only.py", line 36, in <module>
    channel_stream.start_consuming()
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1880, in start_consuming
    self._process_data_events(time_limit=None)
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 2041, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 848, in process_data_events
    self._dispatch_channel_events()
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1507, in _dispatch_events
    consumer_info.on_message_callback(self, evt.method,
  File "~/workspace/rabbitmq_train/stream_bug/server_stream_only.py", line 30, in stream_callback
    channel_stream.stop_consuming()
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1893, in stop_consuming
    self._cancel_all_consumers()
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1494, in _cancel_all_consumers
    self.basic_cancel(consumer_tag)
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1802, in basic_cancel
    self._flush_output(
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1350, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.ConnectionClosedByBroker: (540, "NOT_IMPLEMENTED - basic.nack and basic.reject not supported by stream queues queue 'stream-queue' in vhost '/'")
Process finished with exit code 1
This is how I start the broker:
docker run -it --rm -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
1 Answer
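You can't use the same channel for different kinds of queues. In general, it is not a best practice: you should have one channel per consumer. To solve your problem: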
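A minimal sketch of the "one channel per consumer" advice, not necessarily the answerer's exact code. It assumes a connection object that behaves like pika.BlockingConnection; the helper name open_consumer_channels and its parameters are hypothetical and not part of pika.

```python
def open_consumer_channels(connection, queues, prefetch_count=1):
    """Open one dedicated channel per queue and return {queue_name: channel}.

    `connection` is assumed to expose channel() the way
    pika.BlockingConnection does. This helper is illustrative only.
    """
    channels = {}
    for queue in queues:
        # A separate channel for each consumer, so that cancelling or
        # breaking one consumer does not affect the others.
        channel = connection.channel()
        # RabbitMQ requires a prefetch limit for stream consumers, so set
        # QoS on every channel before it starts consuming.
        channel.basic_qos(prefetch_count=prefetch_count)
        channels[queue] = channel
    return channels
```

With pika this could be wired up as channels = open_consumer_channels(connection, ["stream-queue"]), after which each channel registers its own callback with basic_consume. The sketch only separates the consumers; it does not change what stop_consuming does with pending deliveries.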
The multi-threaded version would be:
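As a sketch of what a threaded variant might look like (an assumption, not the answerer's code): the consumer runs in its own thread with its own connection, and other threads ask it to stop through add_callback_threadsafe, which pika provides on BlockingConnection because connections are not thread-safe. The class name ConsumerThread and the connect argument are hypothetical.

```python
import threading


class ConsumerThread(threading.Thread):
    """Run a blocking consumer in its own thread (illustrative sketch).

    `connect` is a hypothetical zero-argument callable that returns a new
    connection behaving like pika.BlockingConnection.
    """

    def __init__(self, connect, queue, on_message):
        super().__init__(daemon=True)
        self._connect = connect
        self._queue = queue
        self._on_message = on_message
        self._ready = threading.Event()
        self._connection = None
        self._channel = None

    def run(self):
        # Create the connection inside the thread that will use it.
        self._connection = self._connect()
        self._channel = self._connection.channel()
        self._channel.basic_qos(prefetch_count=1)
        self._channel.basic_consume(
            queue=self._queue,
            on_message_callback=self._on_message,
        )
        self._ready.set()
        # Blocks until stop_consuming() runs on this thread.
        self._channel.start_consuming()
        self._connection.close()

    def stop(self, timeout=5.0):
        """Request a stop from another thread."""
        if not self._ready.wait(timeout):
            raise RuntimeError("consumer did not start in time")
        # Do not call stop_consuming() directly from a foreign thread;
        # schedule it on the thread that owns the connection instead.
        self._connection.add_callback_threadsafe(self._channel.stop_consuming)
```

A caller would start the thread, evaluate its stop condition elsewhere, then call stop() followed by join(). This only moves the stop request onto the consumer's thread; whether stopping a stream consumer this way avoids the basic.reject error shown in the question is not something the sketch establishes.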
Note about multi-thread