我有许多Airflow DAG,我希望在任何任务失败时自动向同一收件人列表(存储在Airflow变量中)发送电子邮件通知,因此我使用在DAG级别定义的以下默认操作员配置:
dag = DAG(
...
default_args = {
...
"email": "{{ ','.join( var.json.get("my_email_list", []) ) }}",
"email_on_failure": True,
...
},
...
)
不幸的是,email
参数似乎不支持模板化,它只是原样传递到电子邮件后端而没有呈现,因此我的方法不起作用。
有谁能为我的特殊情况提出一个合适的解决方案吗?我真的不想在源代码中硬编码电子邮件地址列表,因为将它们存储在Airflow变量中会提供更大的灵活性。
2条答案
按热度按时间xv8emn3q1#
有两种方法可以读取Airflow变量:
1.使用jinja模板在任务执行时读取它们
1.在创建任务之前,使用类
Variable
在调度程序中读取它们:bhmjp9jg2#
以下是我的解决方案,虽然有点笨拙,因为
email
既没有在BaseOperator
中模板化,也没有在任务级别调整template_fields
的方法(我所说的任务是指一个操作符的配置示例),而且我并不真的想为每个内置操作符定义虚拟子类,只是为了将email
添加到template_fields
,例如:所以我决定坚持使用下面类似猴子打补丁的方法,动态地将自定义模板字段添加到当前模块作用域中所有可见/可访问的操作符中(函数必须在共享/公共模块中的某个地方定义,然后在每个DAG的模块级别导入和调用):
我仍然愿意接受一个更优雅的解决方案,只是还没有找到一个更好的(我使用的是气流2.2.5)。