Airflow的基本概念是不允许在不规则的时间间隔内触发一个Dag。实际上,我想在每次有新文件放到远程服务器上时触发一个dag(比如https,sftp,s3 ...),但是Airflow需要一个定义好的data_interval。例如,使用HttpSensor只在预定的时间窗口内工作一次。在我当前的例子中,我使用redis来保持当前文件的状态。
""" DAG for operational District heating """
import json
from datetime import datetime
import redis
import requests
from airflow import DAG
from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
from airflow.providers.http.sensors.http import HttpSensor
def check_up_for_new_file(
response: requests.models.Response,
) -> bool:
""" uses redis to check if a new file is on the server"""
current_header = {
key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
for key, value in response.headers.items()
}
conn = redis.Redis(host='redis', port=6379)
recent_header = conn.hgetall("header_dict")
recent_header = {
key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
for key, value in recent_header.items()
}
if 'Content-Length' not in recent_header.keys():
conn.hmset("header_dict", current_header)
return False
if recent_header['Content-Length'] != current_header['Content-Length']:
conn.hmset("header_dict", current_header)
return True
else:
return False
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['info@airflow.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'concurrency': 6
}
with DAG(
dag_id='check_ext',
start_date=datetime(2022, 11, 24),
tags=['test'],
catchup=False,
default_args=default_args,
) as dag:
check_for_new_file = HttpSensor(
task_id='check_up_for_new_file',
http_conn_id='_conn_id',
endpoint='<some-url>',
poke_interval=20,
dag=dag,
response_check=check_up_for_new_file
)
invoke_lambda_function_dwd_obs = AwsLambdaInvokeFunctionOperator(
task_id='run_etl_of_dwd_observation_data',
function_name='DWD_OBSERVATION_DATA',
payload=json.dumps({"source_type": "dwd_national"}),
)
check_for_new_file >> invoke_lambda_function_dwd_obs
如何在成功后重新启动此dag以再次检查新文件?
1条答案
按热度按时间szqfcxe21#
您必须注意以下两点,以使Dag在每次传感器识别外部事件时运行。
1.计划间隔:使用预设
None
1.使用TriggerDagRun运算符
它是通过设计创建一个无限循环来 checkout 外部
对于一些不熟悉HttpSensor的人来说,服务器的基本路径必须定义为
AIRFLOW_CONN_{_CONN_ID}=https://remote_server.com
环境变量。然后,您可以通过匹配_CONN_ID来调用连接。