python 气流传感器无法访问上下文变量

6ju8rftf  于 2023-05-05  发布在  Python
关注(0)|答案(1)|浏览(90)

我正在尝试构建一个传感器,它可以读取dag参数(当你用config触发dag时,你可以改变这些参数),以知道要等待多长时间。

from airflow.decorators import dag, task, task_group
from datetime import date, datetime, timedelta
import re

params = {
    "time":"8h"
}

def parse_time(time_str):
    regex = re.compile(r'^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$')
    parts = regex.match(time_str)
    if parts is None: return timedelta()
    time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
    return timedelta(**time_params)

@dag(
    dag_id="test",
    start_date=datetime(2023, 5, 1),
    schedule_interval =  None,
    catchup=False,
    default_args={"retries":0},
    params=params,
    tags=["test","debug"],
)
def test():
    @task.sensor(
        task_id=f"run_after",
        poke_interval=60 * 5,
        timeout=60 * 60 * 24 * 3,
        mode="reschedule"
    )
    def run_after(**context):
        run_after = context["params"].get("time","0h")
        print(run_after)
        target_time = parse_time(run_after)
        time_since_midnight = datetime.now() - datetime.strptime(context["data_interval_end"].strftime("%Y%m%d"),"%Y%m%d")
        return time_since_midnight > target_time
    
    t=run_after()
test()

似乎无法在传感器中访问上下文变量(空dict)...我在正常任务中访问它没有问题。我做错了吗?是否有变通办法?(我想我可以在另一个任务中访问参数,并通过XComs将数据发送到传感器,但这会使dag更加复杂,而且似乎不是正确的方法)。
谢谢你的帮助

jyztefdp

jyztefdp1#

好的,我可以通过

from airflow.operators.python import get_current_context

然后在任务内部

def run_after():
    context=get_current_context()

所以问题解决了,无论如何,如果上下文也在kwargs中可用,那就太好了。
标记问题已解决。

相关问题