我正在运行一个flask应用程序,该应用程序旨在接收零星但繁重的工作。我实现了一系列运行flask应用程序、celery 、redis(作为代理)和memcached(作为后端)的docker容器。我使用celery 将处理重量分成块,然后使用get()检索所有结果:
# Multithreading
jobs = group(processing_fn.s(c) for c in chunks)
result = jobs.apply_async()
while not result.ready() :
time.sleep(30)
resultset = result.get()
虽然这确实工作得又快又好,但我在命令celery 释放用于存储任务结果的ram时遇到了麻烦。最终,服务器内存耗尽,必须重新启动,这远远不是最佳状态。
我试过用 .forget
在结果集上(甚至在结果集中的每个结果上):
result = result.get()
result.forget()
...
resultset = result.get()
for r in result :
r.forget()
然而,这些都没有释放记忆。。。有什么想法吗?
以下是celery 应用程序的示例化方式:
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker = "redis://redis:6379/",
backend = "cache+memcached://memcached:11211"
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args,**kwargs):
with app.app_context():
return self.run(*args,**kwargs)
celery.Task = ContextTask
return celery
...
celery = make_celery(app)
1条答案
按热度按时间yhuiod9q1#
默认情况下,celery 将任何任务的结果存储1天:
请参阅:您可以通过:celery \任务\结果\过期或https://docs.celeryproject.org/en/latest/userguide/configuration.html#result-过期