从cli手动触发的dag错误地用celeryexector记录任务成功

mpbci0fu  于 2021-06-10  发布在  Redis
关注(0)|答案(1)|浏览(487)

我用的是气流版本 1.10.11 ,并有一个简单的测试dag,如下所示:

from datetime import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from backend.logic.games import refresh_game_data

dag = DAG(
  dag_id='update_game_data',
  schedule_interval='@once',
  start_date=dt.utcnow()
  )

def refresh_game_with_context(**kwargs):
    game_id = kwargs['dag_run'].conf['game_id']
    refresh_game_data(game_id)

refresh_game_data_task = PythonOperator(
    task_id="refresh_game_data",
    python_callable=refresh_game_with_context,
    dag=dag,
    provide_context=True
)

refresh_game_data_task

当我通过ui手动调用任务时,它会成功,不会出现任何问题,并生成预期的输出:

调用cli时,我得到以下输出:

root@b324f7099e97:/home/backend# airflow trigger_dag 'update_game_data' --conf '{"game_id":3}'
[2020-07-30 02:46:06,264] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 02:46:06,267] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-07-30 02:46:06,267] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags/update_game_data.py
/usr/local/lib/python3.7/dist-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 character string: '800495'")
  result = self._query(query)
Created <DagRun update_game_data @ 2020-07-30 02:46:07+00:00: manual__2020-07-30T02:46:07+00:00, externally triggered: True>

与python中的本地客户端类似的结果:

In [1]: from airflow.api.client.local_client import Client 
   ...:  
   ...: afc = Client(None, None) 
   ...: res = afc.trigger_dag(dag_id='update_game_data', conf={"game_id": 3})                                                                        
[2020-07-30 02:46:43,612] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 02:46:43,618] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-07-30 02:46:43,619] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags/update_game_data.py
/usr/local/lib/python3.7/dist-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 character string: '800495'")
  result = self._query(query)

但是,在这两种情况下,任务实际上都不会发送到airflow celery worker,就像通过ui触发时一样。奇怪的是,在这两种情况下,ui都显示dag运行成功,但没有日志数据,也没有将dag中的任务标记为成功:

我确信这就是问题所在,但是可以用一个指向正确方向的指针。我使用redis作为我的任务后端,rmq作为我的代理,mysql作为我的metadb,如下所述。

7ajki6be

7ajki6be1#

我从外部触发的dag样本开始,没有任何问题。然而,当我开始交换自己的逻辑时,它开始崩溃。然后我发现这篇文章警告不要使用动态开始日期。有趣的是,虽然动态开始日期似乎可以摆脱气流调度器,但外部触发的示例使用了一个,而且它是有效的。
我两个都试过了 datetime.now() 以及 datetime.utcnow() 在这两种情况下,都会抛出一个错误,即任务执行日期必须大于开始日期。我试着把时间戳转换成日期,但是我不喜欢时间戳中没有tz信息。最终解决的问题是放弃了对 utcnow()/now() 并将我的dag设置为固定的开始日期,如下所示:

from datetime import datetime as dt
...
dag = DAG(
  dag_id='update_game_data',
  schedule_interval=None,
  start_date=dt(2000, 1, 1)
  )

通过固定的开始日期和遗漏的tz信息,我能够得到这个工作。为什么,确切地说,这是工作的,为什么上面的dag工作通过用户界面,而不是通过外部触发是一个问题,我将留给我们中间的气流忍者。这对于一次性、外部触发的DAG很好。如果/当我们的应用程序开始使用airflow作为beat调度程序时,我们将看到是否遇到同样的问题。目前,我们将celery 与redis结合使用来管理作业计划。

相关问题