我在Kubernetes上使用git-sync来同步git仓库中的DAG,我可以成功导入DAG,但我发现一个问题,即旧更改与新更改一起保留在Airflow UI中。
示例:
在我的远程存储库中,我有以下文件:dags/dag_test.py
此文件的内容如下:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'owner' :'airflow',
'depends_on_past': False,
'start_date' : datetime(2023, 1, 1),
'retries' : 0
}
dag = DAG(dag_id='my_dag3', default_args=default_args, catchup=False, schedule_interval='@once')
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
DAG my_dag3
由Airflow拾取并在UI中可见。DAG可以成功运行,到目前为止一切顺利。
现在,我向master分支推送一个更改,将dag_id
重命名为dag_id='my_dag4'
,不对文件或文件名进行其他更改-这应该有效地定义了一个新的DAG dag4
并删除dag3
。
当此更改到达UI时,dag3
将继续保留,并且可以继续运行一小段时间。同时,您可以看到旁边的替换dag4
。两个DAG仍然可以运行。
此外,当我查看dag3
的代码时,它现在包含更改后的代码,即代码与DAG的定义不匹配:
这两个DAG会在短时间内继续可见,直到最终dag3
消失。即使这个问题发生的时间很短,它也可能会导致运行过时代码的一些严重错误。为什么会发生这种情况,我如何确保Airflow UI只显示基于当前存储库快照的DAG?
我的部署:
气流图:第8.6.1节。
气流图像:2.4.3-Python
在minikube中本地运行
1条答案
按热度按时间3ks5zfa01#
现在,我将一个更改推送到master分支,将dag_id重命名为dag_id ='my_dag4',不对文件或文件名进行其他更改-这应该有效地定义了一个新的DAG dag 4并删除dag 3。
不正确。重命名现有的dag_id将注册新的dag,但不会删除以前的dag记录。这些记录仍存在于数据库中。如果要删除它们,则需要单击以前的dag_id上的删除按钮以清除所有关联的记录(dag运行、任务示例等)。请记住,用户可能需要保留以前的dag记录以用于历史记录/审计/其他原因。
注意,Airflow的工作方式是seralized dags。这意味着您的代码同步到DB中,所有Airflow服务(包括Web服务器)都看到数据库表。因此,从Web服务器的Angular 来看,它看到dag表中的两条记录。它不知道您对python文件做了什么更改。
在Airflow 2.5.0中,添加了parsing_cleanup_interval配置以自动停用过时DAG。
检查应停用的过时DAG(预期文件中不再存在的DAG)以及不再引用并应标记为孤立的数据集的频率(秒)。