postgresql celery任务在postgres数据库中的结果是字节格式吗?

7qhs6swi  于 2024-01-07  发布在  PostgreSQL
关注(0)|答案(2)|浏览(170)

我是Celery的新手。我尝试分别使用redis和postgres db作为我的结果后端。虽然对于redis后端,结果检索成功,对于postgres后端,检索的结果似乎是字节格式。我想知道我是否在代码/设置中遗漏了什么,或者它是预期的?
我在我的笔记本电脑上在docker容器中启动了redis和postgres,如下所示:

docker run -d --name redis_broker -p 6379:6379 redis:latest
docker run -d --name pg_backend -p 5432:5432 -e POSTGRES_PASSWORD=123 postgres:latest

字符串
然后我写了以下非常简单的Python代码:

tasks.py

from celery import Celery

app = Celery('tasks',
             #backend='redis://localhost:6379/0',
             backend='db+postgresql://postgres:123@localhost:5432/celery',
             broker='redis://localhost:6379/0')

@app.task(name='tasks.add')
def add(x:int, y:int) -> int:
    z = x+y
    return z

来电.py

from tasks import add

result = add.delay(3,2)
print(result.get())


接下来,我在终端中启动celery worker,如下所示:

celery -A tasks workder --loglevel=INFO


在另一个终端窗口中,我运行caller.py如下:

python caller.py


完成caller.py后,终端打印出5,这是正确的。
然后我尝试从我的结果后端检索python的结果。而对于redis后端,我得到了正确的结果:

>>> import redis
>>> backend = redis.Redis(host='localhost', port=6379)
>>> backend.get('celery-task-meta-1523014d-0ea7-42c2-83b9-272bcdb72891')
b'{"status": "SUCCESS", "result": 5, "traceback": null, "children": [], "date_done": "2021-11-13T07:23:55.012370", "task_id": "1523014d-0ea7-42c2-83b9-272bcdb72891"}'


对于postgres后端,我得到了以下内容:

>>> import sqlalchemy
>>> from sqlalchemy import create_engine
>>> engine = create_engine('postgresql+psycopg2://postgres:123@localhost:5432/celery')
>>> conn = engine.connect()
>>> qry = 'SELECT * FROM celery_taskmeta;'
>>> cursor = conn.execute(qry)
>>> cursor.fetchall()
(1, 'eb21609d-0f84-47c7-afb4-9bccbe41eda4', 'SUCCESS', <memory at 0x11051ef40>, datetime.datetime(2021, 11, 13, 7, 33, 55, 559665), None, None, None, None, None, None, None)]


我也试过在postgres中查询

celery=> select id, task_id, result, status from celery_taskmeta;
 id |               task_id                |              result              | status
----+--------------------------------------+----------------------------------+---------
  1 | eb21609d-0f84-47c7-afb4-9bccbe41eda4 | \x80054b052e                     | SUCCESS
(1 rows)

celery=>


在这种情况下,如果我的应用程序的其他组件需要从我的(postgres)结果后端检索任务结果,我如何读取这个“内存对象”?
我想知道我是否在我的设置或代码中遗漏了什么?
提前感谢您的任何建议!

s4chpxco

s4chpxco1#

好吧,我是通过查看Celery(https://github.com/celery/celery/blob/master/celery/backends/database/models.py)的源代码自己弄明白的。在Celery源代码中,对于使用SQLAlchemy的数据库后端,结果是序列化的,其类型是:PickleType。因此,我的问题的答案很简单!只需简单地通过调用pickle.loads()将其序列化,如下所示:

>>> import sqlalchemy, pickle
>>> from sqlalchemy import create_engine
>>> engine = create_engine('postgresql+psycopg2://postgres:123@localhost:5432/celery')
>>> conn = engine.connect()
>>> qry = 'SELECT * FROM celery_taskmeta;'
>>> cursor = conn.execute(qry)
>>> res = cursor.fetchall()
>>> res
[(1, 'eb21609d-0f84-47c7-afb4-9bccbe41eda4', 'SUCCESS', <memory at 0x11051ef40>, datetime.datetime(2021, 11, 13, 7, 33, 55, 559665), None, None, None, None, None, None, None)]
>>> pickle.loads(res[0][3])
5

字符串

**编辑:**以上示例代码仅用于故障排除,在生产环境下,应使用ORM(对象关系Map),此时序列化/非序列化由SQLAlchemy自动处理,无需显式调用pickle方法。

fquxozlt

fquxozlt2#

你可以使用AsyncResult方法得到结果。
添加caller.py

from tasks import app  # tasks is your tasks.py module

result = app.AsyncResult(task_id)
print("status", result.status, "result", result.result)

字符串

相关问题