celery-result后端不返回组结果

z5btuh9x  于 2021-06-10  发布在  Redis
关注(0)|答案(0)|浏览(186)

更新日期:2020年7月25日星期六
总的问题是:我有一个包含 celery.group 链条没有给我我想的结果。在你告诉我之前 results.save() 然后恢复。我试过了,但我运气不好。下面是celery 工人的代码,我正在执行以获得所需的结果。
我有一个celery 工人正在运行,看起来像这样:

[2020-07-23 13:41:04,278: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2020-07-23 13:41:04,284: DEBUG/MainProcess] | Worker: Building graph...
[2020-07-23 13:41:04,285: DEBUG/MainProcess] | Worker: New boot order: {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
[2020-07-23 13:41:04,323: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2020-07-23 13:41:04,323: DEBUG/MainProcess] | Consumer: Building graph...
[2020-07-23 13:41:04,402: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Gossip, Heart, Agent, Tasks, Control, event loop}

 -------------- celery@worker1.deon-aarininc v4.4.0 (cliffs)
---*****----- 
--*******---- Linux-5.4.0-42-generic-x86_64-with-glibc2.29 2020-07-23 13:41:04
-***--- * --- 
-**---------- [config]
-**---------- .> app:         tasks:0x7fe4b1e98580
-**---------- .> transport:   redis://localhost:6379/1
-**---------- .> results:     redis://localhost:6379/0
-***--- * --- .> concurrency: 4 (prefork)
--*******---- .> task events: OFF (enable -E to monitor tasks in this worker)
---*****----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

下面的函数是我调用来创建并行进程链的函数。api调用-->转换dict-->并行进程-->结果。结果显示了工作人员在终端中的运行位置,但我需要他们去找解释器。

@app.task
def setup_chunk(nested_dict):
    """
    Setup nested data structure for parallel run
        - equivalant to an O(n) for loop call 

    Args:
        nested_dict - nested dictionary
    Returns:
        list of jobs
    """
    return [nested_dict[i::len(nested_dict)] for i in range(len(nested_dict))]

@app.task
def process_step(board, header=HEADER):
    """
    Process step in run

    Args:
        nested_dict - nested dictionary
    Returns:
        results of running Celery in parallel
    """

    api_bid_url = ""

    return api_request_get(api_bid_url, HEADER)

@app.task
def process_group(chunk):
    return group([process_step.s(chunk[i]) for i in range(len(chunk))]).apply_async()

$ ipython3

[LN x]: pipeline = (setup_chunk.s(board_data['data']) | \ 
    ...:             process_group.s()).apply_async()

而这正是通过 ipython3 解释器:所有作业任务ID。

Out[42]: 
[['0abee691-ace4-47b8-83cf-b29502c5499d', None],
 [[['08873986-5193-4035-8442-1c46039a93f1', None], None],
  [['12adf051-1a78-4b9a-ba49-8b0ad09aa157', None], None],
  [['043811f5-aba1-4942-bf9d-f90cac0623cd', None], None],
  [['d46f1f61-c753-49cd-a9d4-bf7a3e69a037', None], None],
  [['b62ee40f-a0d3-4410-b36c-5c3f7967f61a', None], None],
  [['334b6272-3667-4dd3-9eab-c3c05baad760', None], None],
  [['5a75d948-6941-4598-8b97-c60b8960bbd0', None], None],
  [['18a93b78-435e-4489-b25c-bb699addd160', None], None],
  [['3c0f9834-eb1c-417c-b71e-03872ba19e90', None], None],
  [['a7d0ff28-d139-4f02-8975-8575613aef7c', None], None],
  [['94edd270-e612-4b68-932e-bb16c2a8af35', None], None],
  [['13c95d8c-c65b-407e-8f34-3dd9fa171591', None], None],
  [['fb689945-1fed-49e4-b430-48344a83fcf4', None], None]]]

我要的是从候机楼的工人那里出来。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题