python-3.x 使用电子邮件操作员通知Airflow DAG中其他操作员的状态

o7jaxewo  于 2023-02-10  发布在  Python
关注(0)|答案(1)|浏览(133)

假设气流中有一个dag,其定义文件如下所示:

import airflow
from airflow import DAG

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner' : 'airflow',
    'retries' : 0
}

dag = DAG(
    dag_id = 'email_notification_test',
    start_date = airflow.utils.dates.days_ago(2),
    default_args = default_args,
    schedule_interval = None,
    catchup=False
)

start = DummyOperator(task_id = 'start',
                        dag = dag)

def built_to_fail(ds, **kwargs):
    raise Exception('This operator fails')

def built_to_succeed(ds, **kwargs):
    print('This Operator succeeds')
    return

operator_that_fails = PythonOperator(
    task_id='operator_that_fails',
    python_callable=built_to_fail,
    dag = dag
)

operator_that_succeeds = PythonOperator(
    task_id='operator_that_succeeds',
    python_callable=built_to_succeed,
    dag = dag
)

email = EmailOperator(
    task_id='send_email',
    to='<email address>',
    subject='DAG Run Complete',
    html_content="""run_id: {{ run_id }} </p>
                    dag_run: {{ dag_run }} </p>
                    dag_run.id: {{ dag_run.id }} </p>
                    dag_run.state: {{ dag_run.state }}""",
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

start >> [operator_that_fails, operator_that_succeeds] >> email

DAG TLDR:DAG有两个操作符,一个失败,一个成功。在两个操作符都执行完毕后,运行第三个任务-电子邮件操作符-发送前面操作符状态的通知摘要。以下是webui图形视图作为直观帮助:

正如我在email操作符的html_content部分所演示的,你可以使用jinja来引用对象和它们的属性,但是我真正需要的不仅仅是报告dag本身的状态,还有已经运行的单个操作符的状态,比如:

html_content="""operator_that_fails status : {{ <dynamic reference to preceeding task status> }} </p>
               operator_that_succeeds status: {{ <ditto for the other task> }}"""

我试图通过研究Airflow对象模型文档(即this page for the "dag run" object)来实现这一点,但无法找到获取先前任务状态的好方法。
有人知道怎样才能最好地实现我的目标吗?

ipakzgxi

ipakzgxi1#

您可以使用模板来访问上下文,如下所示:

def contact_endpoints(**context):
    return ("subject text", "body text")

execute =  PythonOperator(
        task_id='source_task_id',
        python_callable=contact_endpoints,
        provide_context=True,
        do_xcom_push = True
    )

email = EmailOperator(
    task_id='send_mail',
    to="email@email.com",
    subject=f"""[{dag.dag_id}] {{{{ task_instance.xcom_pull(task_ids='source_task_id', key='return_value')[0] }}}}""",
    html_content=f"""
        <br><br>
        <h3>Report</h3>
        {{{{ task_instance.xcom_pull(task_ids='source_task_id', key='return_value')[1] }}}}
        """)

在我的例子中,id在xcom-pull中被引用的任务返回一个元组(“part to insert in subject”,“part to insert in body”),必须在引用任务上将do_xcom_push设置为true。

相关问题