python 如何从Airflow捕获错误日志并通过邮件发送?

7hiiyaii  于 2023-01-08  发布在  Python
关注(0)|答案(1)|浏览(245)

我们正在尝试报告Airflow作业执行期间发生的故障,并从日志中捕获异常,然后通过电子邮件发送。
目前,我们在用pyt编写的失败电子邮件中显示以下内容。

failure_msg = """
            :red_circle: Task Failed.
            *Dag*: {dag}  
            *Task*: {task}
            *Execution Time*: {exec_date}
            *Log Url*: {log_url} 
            """.format(
        dag=context.get('task_instance').dag_id,
        task=context.get('task_instance').task_id,
        ti=context.get('task_instance'),
        exec_date=context.get('execution_date'),
        log_url=context.get('task_instance').log_url

我正在寻找捕捉异常消息从气流。上述消息显示高层次的信息,如达格ID,任务ID,网址等。
参考下面的Airflow文档,但到目前为止还没有获得捕获确切异常消息的任何方法。
目前,我在其中一个DAG中手动抛出错误,如下所示

def scan_table():
    try:
        raise ValueError('File not parsed completely/correctly')
        logging.info(message)
    except Exception as error:
        raise ValueError('File not parsed completely/correctly inside exception block')
        print("Error while fetching data from backend", error)

尝试使用此exception=context.get('task_instance').log.exception,但显示为

<bound method Logger.exception of <Logger airflow.task (INFO)>>

在Airflow UI的DAG日志输出中,异常抛出为:

[2023-01-04, 09:05:07 UTC] {taskinstance.py:1909} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/bitnami/airflow/dags/git_airflow-dags/scan_table.py", line 37, in scan_table
    raise ValueError('File not parsed completely/correctly')
ValueError: File not parsed completely/correctly

我想在Python代码片段的failure_msg中捕获这部分日志和打印。

zxlwwiss

zxlwwiss1#

我能找到答案。
我们可以使用下面的代码访问Airflow抛出的异常。
context.get(“异常”)

相关问题