我创建了一个Django restful API。我向Django API发送一个post请求,然后它向rabbitmq发送一条消息。我有一个微服务(Flask),它监听该队列并处理消息代理发送的信息。我希望Flask能够异步处理django发送的队列中的消息。我尝试使用Celery,但我被卡住了,因为我看到任务已接收但未处理。下面是我的代码(抱歉,混乱):
cel.conf.update(app.config, broker_url='amqp://guest:guest@localhost:5672/', result_backend='rpc://')
@cel.task()
def process_task(data):
print("Processing task with data:", data)
time.sleep(5)
print("Processing task with data:", data)
return "TASK IS DONE"
@app.route('/persons/')
def hello_world():
def test_x(ch, method, properties, body):
decode_data = body.decode('utf-8')
print("Process start")
process_task.apply_async(args=('a',))
print("Done!")
ch.basic_ack(delivery_tag=method.delivery_tag)
connection_parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(connection_parameters)
channel = connection.channel()
channel.queue_declare(queue='test_micro', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='test_micro', on_message_callback=test_x)
print('Started test creating Consuming')
channel.start_consuming()```
字符串
1条答案
按热度按时间jdgnovmf1#
首先,我认为你正在使架构复杂化。如果你想异步处理任务,你不必在中间使用flask层。你可以直接使用celery和django,两者都能很好地相互支持。你不需要鼠兔。在django中创建相同的端点,然后直接从那里排队任务。
对于处理任务,您必须在不同的进程中启动celery worker,通常通过
celery -A celery_app worker --loglevel INFO
在CLI上启动。我猜你的任务没有得到处理,因为你没有开始任何celery 工人。