更新日期:2020年7月25日星期六
总的问题是:我有一个包含 celery.group
链条没有给我我想的结果。在你告诉我之前 results.save()
然后恢复。我试过了,但我运气不好。下面是celery 工人的代码,我正在执行以获得所需的结果。
我有一个celery 工人正在运行,看起来像这样:
[2020-07-23 13:41:04,278: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2020-07-23 13:41:04,284: DEBUG/MainProcess] | Worker: Building graph...
[2020-07-23 13:41:04,285: DEBUG/MainProcess] | Worker: New boot order: {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
[2020-07-23 13:41:04,323: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2020-07-23 13:41:04,323: DEBUG/MainProcess] | Consumer: Building graph...
[2020-07-23 13:41:04,402: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Gossip, Heart, Agent, Tasks, Control, event loop}
-------------- celery@worker1.deon-aarininc v4.4.0 (cliffs)
---*****-----
--*******---- Linux-5.4.0-42-generic-x86_64-with-glibc2.29 2020-07-23 13:41:04
-***--- * ---
-**---------- [config]
-**---------- .> app: tasks:0x7fe4b1e98580
-**---------- .> transport: redis://localhost:6379/1
-**---------- .> results: redis://localhost:6379/0
-***--- * --- .> concurrency: 4 (prefork)
--*******---- .> task events: OFF (enable -E to monitor tasks in this worker)
---*****-----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
下面的函数是我调用来创建并行进程链的函数。api调用-->转换dict-->并行进程-->结果。结果显示了工作人员在终端中的运行位置,但我需要他们去找解释器。
@app.task
def setup_chunk(nested_dict):
"""
Setup nested data structure for parallel run
- equivalant to an O(n) for loop call
Args:
nested_dict - nested dictionary
Returns:
list of jobs
"""
return [nested_dict[i::len(nested_dict)] for i in range(len(nested_dict))]
@app.task
def process_step(board, header=HEADER):
"""
Process step in run
Args:
nested_dict - nested dictionary
Returns:
results of running Celery in parallel
"""
api_bid_url = ""
return api_request_get(api_bid_url, HEADER)
@app.task
def process_group(chunk):
return group([process_step.s(chunk[i]) for i in range(len(chunk))]).apply_async()
$ ipython3
[LN x]: pipeline = (setup_chunk.s(board_data['data']) | \
...: process_group.s()).apply_async()
而这正是通过 ipython3
解释器:所有作业任务ID。
Out[42]:
[['0abee691-ace4-47b8-83cf-b29502c5499d', None],
[[['08873986-5193-4035-8442-1c46039a93f1', None], None],
[['12adf051-1a78-4b9a-ba49-8b0ad09aa157', None], None],
[['043811f5-aba1-4942-bf9d-f90cac0623cd', None], None],
[['d46f1f61-c753-49cd-a9d4-bf7a3e69a037', None], None],
[['b62ee40f-a0d3-4410-b36c-5c3f7967f61a', None], None],
[['334b6272-3667-4dd3-9eab-c3c05baad760', None], None],
[['5a75d948-6941-4598-8b97-c60b8960bbd0', None], None],
[['18a93b78-435e-4489-b25c-bb699addd160', None], None],
[['3c0f9834-eb1c-417c-b71e-03872ba19e90', None], None],
[['a7d0ff28-d139-4f02-8975-8575613aef7c', None], None],
[['94edd270-e612-4b68-932e-bb16c2a8af35', None], None],
[['13c95d8c-c65b-407e-8f34-3dd9fa171591', None], None],
[['fb689945-1fed-49e4-b430-48344a83fcf4', None], None]]]
我要的是从候机楼的工人那里出来。
暂无答案!
目前还没有任何答案,快来回答吧!