django 获取任务的celery节拍触发时间

qqrboqgw  于 2023-08-08  发布在  Go
关注(0)|答案(3)|浏览(121)

我试图找到一种方法来获取触发celery beat启动任务的时间条件。
获取datetime.now()的时间经常偏离任务按celery beat排队的时间,因为所有celery worker都很忙碌。举例来说:我把任务设置为每天12:30执行,但由于所有工作人员都很忙碌,所以任务在12:31开始运行。
我需要知道哪个时间条件触发了任务,而不管任务执行的时间。
编辑:
我是这样定义我的周期性任务的:

CELERYBEAT_SCHEDULE = {
'periodic_clear_task': {
    'task': 'app.tasks.periodic_clear_task',
    'schedule': crontab(hour=2),
    'args': ()
},
'periodic_update_task': {
    'task': 'app.tasks.periodic_update_task',
    'schedule': crontab(minute='00,30'),
    'args': ()
},
}

字符串

kq0g1dla

kq0g1dla1#

类似地,对于这个问题中的另一个answer,我使用了expires(秒)来解决这个问题。使用Celery Beat时间表时,可以按如下方式指定:

limit = 60

CELERYBEAT_SCHEDULE = {
    ...,

    'periodic_update_task': {
        'task': 'app.tasks.periodic_update_task',
        'schedule': crontab(minute='00,30'),
        'args': (),
        'options': {'expires': 60 * 60 * 24 * limit }
    },
}

字符串
这里的限制很重要,因为现在我们分配了一个到期值,如果达到这个限制,任务将不会执行。因此,需要将其选择为足够大,以便根据需要处理任务。请记住,到期值以秒为单位指定。更多信息在这里。
当指定expires参数时,Celery会自动将秒值转换为内部使用的日期时间值。要访问函数中的过期时间,必须绑定该方法,然后从任务请求对象中检索。此任务对象应与下面的任务对象类似。

<Context: {'lang': 'py', 'task': 'app.tasks.periodic_update_task', 'expires': '2021-10-23T21:11:26.031443+01:00', 'id': 'ef0e301a-b9b3-4930-8fa3-ca3fdf6b36cb', ... }>


一旦你有时间,你可以做简单的计算,以重新获得触发时间。

@app.task(bind=True)
def periodic_update_task(self):
    trigger_time = parser.parse(self.request.expires)-timedelta(days=limit))

inn6fuwd

inn6fuwd2#

我遇到了同样的问题,我想在周期性celery 任务中使用任务触发时间而不是执行时间。我找不到celery 任务中有任务触发时间的确切字段。作为一种方法,我已经使用了到期时间的任务。

task_expires_day =30

@app.on_after_configure.connect
def df_beat_setup(sender, **kwargs):
    pipeline_config = {}
    sender.add_periodic_task(
            crontab(
                    minute=0,
                    hour=8,
            ),
            df_scheduler_task.s(),
            args=(pipeline_config),
            name="df_trigger",
            queue="df_scheduler_queue",
            expires=60*60*24*task_expires_day,
            options={"time": datetime.now()}
    )

@app.task(name="df_scheduler_queue", bind=True, acks_late=True)
def df_scheduler_task(task: "celery.Task", pipeline_config: Dict, time: str) -> dict:
    task_trigger_time = parser.parse(task.request.expires)-timedelta(days=task_expires_day))
    
    ...

字符串

yc0p9oo0

yc0p9oo03#

after_task_publish信号可能会有所帮助。

演示代码

from django.core.cache import cache
from django.utils import timezone
from celery import Celery
from celery.signals import after_task_publish

app = Celery()
app.conf.beat_schedule = {
    "test-every-minute": {
        "task": "xxx.tasks.test",
        "schedule": 60
    }
}

@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    info = headers if "task" in headers else body
    task_id = info["id"]
    trigger_time = timezone.now()
    cache.set(task_id, trigger_time)  # cache trigger_time
    print(f"task {task_id} sent at {trigger_time}")

@app.task(bind=True)
def test(self):
    execute_time = timezone.now()
    task_id = self.request.id
    print(f"task {task_id} executed at {execute_time}")
    trigger_time = cache.get(task_id)  # get trigger_time
    print(f"task {task_id} triggered at {trigger_time}")

字符串

日志

Beat

[2023-07-31 02:17:40,573: INFO/MainProcess] Scheduler: Sending due task test-every-minute (xxx.tasks.test)
[2023-07-31 02:17:40,576: WARNING/MainProcess] task 041af551-01e7-45f8-931f-f3b2fbbad14f sent at 2023-07-31 02:17:40.575942+00:00

Worker

[2023-07-31 02:17:40,582: WARNING/MainProcess] task 041af551-01e7-45f8-931f-f3b2fbbad14f executed at 2023-07-31 02:17:40.582441+00:00
[2023-07-31 02:17:40,585: WARNING/MainProcess] task 041af551-01e7-45f8-931f-f3b2fbbad14f triggered at 2023-07-31 02:17:40.575942+00:00
[2023-07-31 02:17:40,597: INFO/MainProcess] Task rule.tasks.tasks.test[041af551-01e7-45f8-931f-f3b2fbbad14f] succeeded in 0.015511923003941774s: None

相关问题