我正在尝试构建一个传感器,它可以读取dag参数(当你用config触发dag时,你可以改变这些参数),以知道要等待多长时间。
from airflow.decorators import dag, task, task_group
from datetime import date, datetime, timedelta
import re
params = {
"time":"8h"
}
def parse_time(time_str):
regex = re.compile(r'^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$')
parts = regex.match(time_str)
if parts is None: return timedelta()
time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
return timedelta(**time_params)
@dag(
dag_id="test",
start_date=datetime(2023, 5, 1),
schedule_interval = None,
catchup=False,
default_args={"retries":0},
params=params,
tags=["test","debug"],
)
def test():
@task.sensor(
task_id=f"run_after",
poke_interval=60 * 5,
timeout=60 * 60 * 24 * 3,
mode="reschedule"
)
def run_after(**context):
run_after = context["params"].get("time","0h")
print(run_after)
target_time = parse_time(run_after)
time_since_midnight = datetime.now() - datetime.strptime(context["data_interval_end"].strftime("%Y%m%d"),"%Y%m%d")
return time_since_midnight > target_time
t=run_after()
test()
似乎无法在传感器中访问上下文变量(空dict)...我在正常任务中访问它没有问题。我做错了吗?是否有变通办法?(我想我可以在另一个任务中访问参数,并通过XComs将数据发送到传感器,但这会使dag更加复杂,而且似乎不是正确的方法)。
谢谢你的帮助
1条答案
按热度按时间jyztefdp1#
好的,我可以通过
然后在任务内部
所以问题解决了,无论如何,如果上下文也在kwargs中可用,那就太好了。
标记问题已解决。