python 如何从Airflow中的变量呈现默认的电子邮件地址列表?

z18hc3ub  于 2023-01-01  发布在  Python
关注(0)|答案(2)|浏览(109)

我有许多Airflow DAG,我希望在任何任务失败时自动向同一收件人列表(存储在Airflow变量中)发送电子邮件通知,因此我使用在DAG级别定义的以下默认操作员配置:

dag = DAG(
    ...
    default_args = {
        ...
        "email": "{{ ','.join( var.json.get("my_email_list", []) ) }}",
        "email_on_failure": True,
        ...
    },
    ...
)

不幸的是,email参数似乎不支持模板化,它只是原样传递到电子邮件后端而没有呈现,因此我的方法不起作用。
有谁能为我的特殊情况提出一个合适的解决方案吗?我真的不想在源代码中硬编码电子邮件地址列表,因为将它们存储在Airflow变量中会提供更大的灵活性。

xv8emn3q

xv8emn3q1#

有两种方法可以读取Airflow变量:
1.使用jinja模板在任务执行时读取它们
1.在创建任务之前,使用类Variable在调度程序中读取它们:

from airflow.models import Variable

emails = Variable.get("my_email_list", deserialize_json=True)
bhmjp9jg

bhmjp9jg2#

以下是我的解决方案,虽然有点笨拙,因为email既没有在BaseOperator中模板化,也没有在任务级别调整template_fields的方法(我所说的任务是指一个操作符的配置示例),而且我并不真的想为每个内置操作符定义虚拟子类,只是为了将email添加到template_fields,例如:

from airflow.operators.python import PythonOperator

class MyPythonOperator(PythonOperator):
    template_fields = PythonOperator.template_fields + ("email",)  # this sucks!

所以我决定坚持使用下面类似猴子打补丁的方法,动态地将自定义模板字段添加到当前模块作用域中所有可见/可访问的操作符中(函数必须在共享/公共模块中的某个地方定义,然后在每个DAG的模块级别导入和调用):

from airflow.models import BaseOperator

def extend_operator_template_fields_with(
    extra_template_fields,
    base_operator_class=BaseOperator,
) -> None:
    for operator_class in base_operator_class.__subclasses__():
        # Use a dict (w/o values) instead of a set to keep the original order of template fields for the operator class.
        template_fields_dict = dict.fromkeys(operator_class.template_fields)
        template_fields_dict.update(dict.fromkeys(extra_template_fields))
        operator_class.template_fields = tuple(template_fields_dict.keys())

        extend_operator_template_fields_with(extra_template_fields, base_operator_class=operator_class)

我仍然愿意接受一个更优雅的解决方案,只是还没有找到一个更好的(我使用的是气流2.2.5)。

相关问题