Flask中使用rabbitmq和Celery的异步任务

dw1jzc5e  于 2023-08-05  发布在  RabbitMQ
关注(0)|答案(1)|浏览(229)

我创建了一个Django restful API。我向Django API发送一个post请求,然后它向rabbitmq发送一条消息。我有一个微服务(Flask),它监听该队列并处理消息代理发送的信息。我希望Flask能够异步处理django发送的队列中的消息。我尝试使用Celery,但我被卡住了,因为我看到任务已接收但未处理。下面是我的代码(抱歉,混乱):

cel.conf.update(app.config, broker_url='amqp://guest:guest@localhost:5672/', result_backend='rpc://')

@cel.task()
def process_task(data):
    print("Processing task with data:", data)
    time.sleep(5)
    print("Processing task with data:", data)
    return "TASK IS DONE"

@app.route('/persons/')
def hello_world():
    def test_x(ch, method, properties, body):
        decode_data = body.decode('utf-8')
        print("Process start")
        process_task.apply_async(args=('a',))
        print("Done!")
    
        ch.basic_ack(delivery_tag=method.delivery_tag)

    connection_parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(connection_parameters)
    channel = connection.channel()
    channel.queue_declare(queue='test_micro', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='test_micro', on_message_callback=test_x)

    print('Started test creating Consuming')
    channel.start_consuming()```

字符串

jdgnovmf

jdgnovmf1#

首先,我认为你正在使架构复杂化。如果你想异步处理任务,你不必在中间使用flask层。你可以直接使用celery和django,两者都能很好地相互支持。你不需要鼠兔。在django中创建相同的端点,然后直接从那里排队任务。
对于处理任务,您必须在不同的进程中启动celery worker,通常通过celery -A celery_app worker --loglevel INFO在CLI上启动。我猜你的任务没有得到处理,因为你没有开始任何celery 工人。

相关问题