python-3.x 在Celery中按id分别检索所有链接任务结果

njthzxwz  于 2023-02-26  发布在  Python
关注(0)|答案(2)|浏览(210)

我正在尝试检索存储在mysql结果后端的所有celery链接任务的结果。
例如,我有下面两个celery 任务,

@celery.task(name='celery_fl.add')
def add(x, y, value=None):
    if value is None:
        try:
            return x + y
        except TypeError:
            return None
    return value

@celery.task(name='celery_fl.mul')
def mul(x, y, value=None):
    if value is None:
        try:
            return x * y
        except TypeError:
            return None
    return value

这是我怎么把它们锁起来的

parent = (add.s(2, 2) | mul.s(8)).apply_async()

这里parent.get()的输出将是最后一个链接任务的结果。parent.parent.get()将给予我第一个链接任务的输出。
我试图实现的是,我希望在稍后阶段使用任务id获得相同的输出。

task_id = 'bc5fc4b1-613e-4ef0-b5c8-900999d9a6f1'
parent = AsyncResult(task_id, app=celery)

假设我拥有的task_id属于链式事件中的第二个任务(父任务),那么如果我键入parent.parent.get(),我应该得到第一个链式任务的结果。但是不知何故,我得到的值是None。有没有其他方法可以得到具有task_id的任务,而不是AsyncResult()?

uhry853o

uhry853o1#

当使用mysql后端来存储结果时,每个链接任务的结果都是单独存储的,但是任务示例不再可用,没有它就不可能使用主任务(Ref - Celery任务)来检索子任务的结果。
因此,为了检索所有任务的结果,每个任务的任务ID应该存储在数据库中的某个位置。
使用flask(python)的示例,

chain = (s3_init.s(order.name, order.id)|create_order_sheet.s(order.id, order.name) | create_order_info.s(order.id, order.name))
res = chain()
process = {
   's3_init': res.parent.parent.parent.parent.parent.parent.id,
   'order_sheet': res.parent.parent.parent.parent.id,
   'order_info': res.parent.parent.parent.id
}
order.update(process_id=json.dumps(process))

然后,您可以简单地从数据库中获取任务ID,并使用celepi.result.AsyncResult(task_id)按ID检索每个任务(ref - Async results)。

hgb9j2n6

hgb9j2n62#

下面是一个获取最顶层父对象的解决方案:

from celery import chain, Celery

app = Celery("my-tasks")

@app.task
def run_task(item_id):
    res = chain(
        long_task_1.s(item_id),
        long_task_2.s(),
        long_task_3.s(),
    ).delay()

    while getattr(res, "parent", None):
        res = res.parent

    item = MyItem.objects.get(id=item_id)
    item.celery_root_task_id = res.id
    item.save()

    return res

然后,您可以稍后使用以下命令检索所有子项:

from celery.result import AsyncResult

root_result = AsyncResult(obj.celery_task_id, app=app)

task_results = ", ".join(
    [
        f"{t._cache.get('task_name')}: {t.status}"
        for t, _ in root_result.collect()
    ]
)

相关问题