我正在尝试检索存储在mysql结果后端的所有celery链接任务的结果。
例如,我有下面两个celery 任务,
@celery.task(name='celery_fl.add')
def add(x, y, value=None):
if value is None:
try:
return x + y
except TypeError:
return None
return value
@celery.task(name='celery_fl.mul')
def mul(x, y, value=None):
if value is None:
try:
return x * y
except TypeError:
return None
return value
这是我怎么把它们锁起来的
parent = (add.s(2, 2) | mul.s(8)).apply_async()
这里parent.get()的输出将是最后一个链接任务的结果。parent.parent.get()将给予我第一个链接任务的输出。
我试图实现的是,我希望在稍后阶段使用任务id获得相同的输出。
task_id = 'bc5fc4b1-613e-4ef0-b5c8-900999d9a6f1'
parent = AsyncResult(task_id, app=celery)
假设我拥有的task_id属于链式事件中的第二个任务(父任务),那么如果我键入parent.parent.get(),我应该得到第一个链式任务的结果。但是不知何故,我得到的值是None。有没有其他方法可以得到具有task_id的任务,而不是AsyncResult()?
2条答案
按热度按时间uhry853o1#
当使用mysql后端来存储结果时,每个链接任务的结果都是单独存储的,但是任务示例不再可用,没有它就不可能使用主任务(Ref - Celery任务)来检索子任务的结果。
因此,为了检索所有任务的结果,每个任务的任务ID应该存储在数据库中的某个位置。
使用flask(python)的示例,
然后,您可以简单地从数据库中获取任务ID,并使用celepi.result.AsyncResult(task_id)按ID检索每个任务(ref - Async results)。
hgb9j2n62#
下面是一个获取最顶层父对象的解决方案:
然后,您可以稍后使用以下命令检索所有子项: