How do I pass an XCom value as an argument to PythonVirtualenvOperator?

ev7lccsx  posted on 2023-01-04 in Python

Airflow 2.2.4
I want to create a PythonVirtualenvOperator that pulls its list of requirements from another task.

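from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonVirtualenvOperator

with DAG(dag_id="crawler") as dag: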

    @task(task_id="read_requirements")
    def read_requirements():
        requirements_file_path = "requirements.txt"
        requirements = []
        with open(requirements_file_path, "r") as f:
            requirements = f.readlines()
        return requirements

    def crawler():
        print("hello_world")

    crawler = PythonVirtualenvOperator(
        task_id="crawler",
        python_callable=crawler,
        requirements="{{ ti.xcom_pull(task_ids='read_requirements') }}",
    )

    read_requirements() >> crawler

Unfortunately, the requirements field of PythonVirtualenvOperator does not recognize templates, and it cannot read the value as a plain string either.

c3frrgcw  #1

I managed to work around this by wrapping the PythonVirtualenvOperator in a decorated function that manages the context.

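from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.operators.python import PythonVirtualenvOperator, get_current_context

with DAG(dag_id="crawler") as dag:

    @task(task_id="read_requirements")
    def read_requirements(scraper_path):
        requirements_file_path = f"{scraper_path}/requirements.txt"
        requirements = []
        with open(requirements_file_path, "r") as f:
            requirements = f.readlines()
        # The returned list is pushed to XCom automatically.
        return requirements

    scraper_dir = Variable.get("workspace_dir")

    requirements = read_requirements(scraper_dir)

    def crawler():
        print("hello_world")

    @task(task_id="crawler_task")
    def crawler_task():
        # Grab the runtime context so the XCom value can be pulled by hand.
        context = get_current_context()
        ti = context["ti"]
        requirements = ti.xcom_pull(task_ids="read_requirements")
        # Build the operator at run time, once the requirements are known.
        venv_crawler = PythonVirtualenvOperator(
            task_id="venv_crawler",
            requirements=requirements,
            python_callable=crawler,
            dag=dag,
        )
        # Run the inner operator directly inside this task.
        venv_crawler.execute(context=context)

    requirements >> crawler_task()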

This feels unnatural, and thankfully newer Airflow versions let you pass the requirements as a .txt file, but it does work this way.
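
One detail in the read_requirements task above: f.readlines() keeps the trailing newline on every entry, and it also returns blank lines and comment lines from the file. A minimal sketch of a stricter reader follows. The name load_requirements is hypothetical and not part of Airflow; it assumes a simple requirements file and does not handle inline comments, so option lines such as -r or --index-url are passed through unchanged.

```python
from pathlib import Path
from typing import List


def load_requirements(path: str) -> List[str]:
    """Return the requirement lines of a file, stripped of surrounding whitespace.

    Hypothetical helper for illustration only; it is not an Airflow API.
    """
    requirements = []
    for raw_line in Path(path).read_text().splitlines():
        line = raw_line.strip()
        # Skip blank lines and full-line comments.
        if not line or line.startswith("#"):
            continue
        requirements.append(line)
    return requirements
```

Inside the task, the body could then shrink to return load_requirements(f"{scraper_path}/requirements.txt"). The result is still a plain list of strings, so it is pushed to XCom the same way as before.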
