python-3.x How to trigger a DAG in Airflow each time an external event's state is true (event-based triggering)

jfgube3f · posted 2022-11-26 in Python

Airflow's core model does not allow a DAG to be triggered at irregular intervals. In practice I want to trigger a DAG every time a new file lands on a remote server (e.g. HTTPS, SFTP, S3, ...), but Airflow expects a defined data_interval. Using an HttpSensor, for example, only works once within the scheduled time window. In my current example I use Redis to keep track of the state of the current file.

""" DAG for operational District heating """
import json
from datetime import datetime

import redis
import requests
from airflow import DAG
from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
from airflow.providers.http.sensors.http import HttpSensor

def check_up_for_new_file(
        response: requests.models.Response,
) -> bool:
    """ uses redis to check if a new file is on the server"""
    current_header = {
        key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
        for key, value in response.headers.items()
    }

    # load the headers cached in Redis by the previous poke
    conn = redis.Redis(host='redis', port=6379)
    recent_header = conn.hgetall("header_dict")

    recent_header = {
        key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
        for key, value in recent_header.items()
    }

    # first poke: nothing cached yet, store the current headers and report no new file
    if 'Content-Length' not in recent_header.keys():
        conn.hmset("header_dict", current_header)
        return False

    # a changed Content-Length means a new file is on the server; refresh the cache
    if recent_header['Content-Length'] != current_header['Content-Length']:
        conn.hmset("header_dict", current_header)
        return True
    else:
        return False

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['info@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'concurrency': 6
}

with DAG(
        dag_id='check_ext',
        start_date=datetime(2022, 11, 24),
        tags=['test'],
        catchup=False,
        default_args=default_args,
) as dag:
    check_for_new_file = HttpSensor(
        task_id='check_up_for_new_file',
        http_conn_id='_conn_id',
        endpoint='<some-url>',
        poke_interval=20,
        dag=dag,
        response_check=check_up_for_new_file
    )
    invoke_lambda_function_dwd_obs = AwsLambdaInvokeFunctionOperator(
        task_id='run_etl_of_dwd_observation_data',
        function_name='DWD_OBSERVATION_DATA',
        payload=json.dumps({"source_type": "dwd_national"}),
    )
    check_for_new_file >> invoke_lambda_function_dwd_obs
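
A minimal sketch of how the response_check callback behaves when called directly (the endpoint URL is a placeholder, and Redis has to be reachable at redis:6379 as in the DAG):

# Minimal sketch: call the callback with a real HTTP response to see the caching behaviour.
# The URL is a placeholder; check_up_for_new_file is the function defined in the DAG file.
import requests

resp = requests.get("https://remote_server.com/<some-url>")
print(check_up_for_new_file(resp))  # False on the very first call (headers get cached in Redis)
print(check_up_for_new_file(resp))  # still False while Content-Length is unchanged
# once the file on the server changes, Content-Length differs and the callback returns True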

How can I restart this DAG after it succeeds so that it checks for a new file again?

szqfcxe2 · 1#

You have to take care of two things to make the DAG run every time the sensor detects the external event:
1. Schedule interval: use the preset None
2. Use the TriggerDagRunOperator
By design this creates an endless loop that keeps checking the external source.

""" DAG for operational District heating """
import json
from datetime import datetime

import redis
import requests
from airflow import DAG
from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def check_up_for_new_file(
        response: requests.models.Response,
) -> bool:
    """ uses redis to check if a new file is on the server"""
    current_header = {
        key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
        for key, value in response.headers.items()
    }

    conn = redis.Redis(host='redis', port=6379)
    recent_header = conn.hgetall("header_dict")

    recent_header = {
        key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
        for key, value in recent_header.items()
    }

    if 'Content-Length' not in recent_header.keys():
        conn.hmset("header_dict", current_header)
        return False

    if recent_header['Content-Length'] != current_header['Content-Length']:
        conn.hmset("header_dict", current_header)
        return True
    else:
        return False

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['info@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'concurrency': 6
}

with DAG(
        dag_id='check_ext',
        start_date=datetime(2022, 11, 24),
        tags=['test'],
        catchup=False,
        schedule_interval=None,
        default_args=default_args,
) as dag:
    check_for_new_file = HttpSensor(
        task_id='check_up_for_new_file',
        http_conn_id='_conn_id',
        endpoint='<some-url>',
        poke_interval=20,
        dag=dag,
        response_check=check_up_for_new_file
    )
    invoke_lambda_function_dwd_obs = AwsLambdaInvokeFunctionOperator(
        task_id='run_etl_of_dwd_observation_data',
        function_name='DWD_OBSERVATION_DATA',
        payload=json.dumps({"source_type": "dwd_national"}),
    )
    restart_dag = TriggerDagRunOperator(
        task_id='restart_dag_dwd',
        # trigger this very DAG ('check_ext') again so it keeps polling for new files
        trigger_dag_id='check_ext',
        dag=dag
    )
    check_for_new_file >> invoke_lambda_function_dwd_obs >> restart_dag
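
With schedule_interval=None the loop does not start on its own: the very first run has to be triggered manually (from the UI or with airflow dags trigger check_ext); every subsequent run is then created by the TriggerDagRunOperator at the end of the previous one.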

For anyone not familiar with the HttpSensor: the base URL of the server has to be defined as the environment variable AIRFLOW_CONN_{_CONN_ID}=https://remote_server.com. The connection is then resolved by the matching _CONN_ID.
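
A minimal sketch of that mapping, assuming the conn_id '_conn_id' from the DAG above and a placeholder host (in practice the variable is set in the environment of the Airflow scheduler and workers rather than in Python):

# Minimal sketch: Airflow upper-cases the conn_id, so '_conn_id' maps to AIRFLOW_CONN__CONN_ID.
import os
os.environ["AIRFLOW_CONN__CONN_ID"] = "https://remote_server.com"

# Verify that Airflow resolves the connection, e.g. in a local test session:
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection("_conn_id")
print(conn.conn_type, conn.host)  # expected: https remote_server.com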
