我们正在尝试报告Airflow作业执行期间发生的故障,并从日志中捕获异常,然后通过电子邮件发送。
目前,我们在用pyt编写的失败电子邮件中显示以下内容。
failure_msg = """
:red_circle: Task Failed.
*Dag*: {dag}
*Task*: {task}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
dag=context.get('task_instance').dag_id,
task=context.get('task_instance').task_id,
ti=context.get('task_instance'),
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url
我正在寻找捕捉异常消息从气流。上述消息显示高层次的信息,如达格ID,任务ID,网址等。
参考下面的Airflow文档,但到目前为止还没有获得捕获确切异常消息的任何方法。
目前,我在其中一个DAG中手动抛出错误,如下所示
def scan_table():
try:
raise ValueError('File not parsed completely/correctly')
logging.info(message)
except Exception as error:
raise ValueError('File not parsed completely/correctly inside exception block')
print("Error while fetching data from backend", error)
尝试使用此exception=context.get('task_instance').log.exception
,但显示为
<bound method Logger.exception of <Logger airflow.task (INFO)>>
在Airflow UI的DAG日志输出中,异常抛出为:
[2023-01-04, 09:05:07 UTC] {taskinstance.py:1909} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/bitnami/airflow/dags/git_airflow-dags/scan_table.py", line 37, in scan_table
raise ValueError('File not parsed completely/correctly')
ValueError: File not parsed completely/correctly
我想在Python代码片段的failure_msg中捕获这部分日志和打印。
1条答案
按热度按时间zxlwwiss1#
我能找到答案。
我们可以使用下面的代码访问Airflow抛出的异常。
context.get(“异常”)