我正在尝试限制一个celery 任务的速度,下面是我的做法:
from project.celery import app
app.control.rate_limit('task_a', '10/m')
它运行得很好。但是,有一个陷阱。这个工人负责的其他任务也被阻止了。
假设已经调度了100个task_a
,由于它是有速率限制的,所以需要10分钟才能全部执行完,在这段时间里,task_b
也已经被调度了,只有在task_a
完成后才会执行。
- 是否可以不阻止
task_b
?**
- 是否可以不阻止
从表面上看,这就是它的工作原理,只是在阅读文档后我没有得到这样的印象。
其他选项包括:
- 仅为此任务分离工作进程和队列
- 将
eta
添加到任务task_a
,以便将所有任务都安排在夜间运行 - 这种情况下的最佳做法是什么?**
2条答案
按热度按时间bnlyeluc1#
这应该是任务声明的一部分,以便在每个任务的基础上工作。您通过
control
执行此操作的方式可能是它对其他任务具有此副作用的原因多阅读
请注意,这是每个工作进程示例的速率限制,而不是全局速率限制。要强制实施全局速率限制(例如,对于具有每秒最大请求数的API),必须限制到给定的队列。
您可能需要在单独的队列中执行此操作
jm81lzqq2#
最简单(不需要编码)的方法是将任务分离到它自己的队列中,并运行一个专用的工作器。
这并不可耻,有许多Celery队列和工人,每个人只专注于一个特定类型的工作,这是完全没有问题的。作为一个额外的好处,你可以得到一些更多的执行控制,你可以很容易地打开/关闭工人暂停某些过程,如果需要的话,等等。
另一方面,让许多专门的工作者在大部分时间里空闲(等待特定的作业排队)并不是特别内存高效。
因此,如果您需要对更多任务进行速率限制,并希望特定的工作线程大部分时间处于空闲状态,则可以考虑提高效率并实现Token Bucket。这样,您的所有工作线程都可以是通用的,并且您可以随着总体负载的增加而自然地扩展它们,因为您知道工作分配不会再受到单个任务速率限制的影响。