kubernetes Airflow -导入包含依赖项的DAG

pkln4tw6  于 2023-11-17  发布在  Kubernetes
关注(0)|答案(1)|浏览(153)

我使用官方的Helm Chart在kubernetes上部署了airflow。我使用的是KubernetesExecutor和git-sync。
我正在为我的Web服务器和我的工人使用单独的Docker映像-每个DAG都有自己的Docker映像。我在airflow主页上遇到DAG导入错误。例如,如果我的一个DAG使用pandas,那么我将得到

Broken DAG: [/opt/airflow/dags/repo/dags/airflow_demo/ieso.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/repo/dags/project1/dag1.py", line 7, in <module>
    from pandas import read_parquet
ModuleNotFoundError: No module named 'pandas'

字符串
我没有在webserver或scheduler docker镜像上安装pandas,因为如果我理解正确的话,你不应该在这些镜像上安装单独的依赖项。当我在scheduler pod上运行airflow dags list-import-errors时,我得到了同样的错误。我确实在worker pod上安装了pandas,但它没有运行,因为通过这些错误无法发现DAG。
如何在不将pandas安装到调度程序或Web服务器的情况下使airflow发现此DAG?我知道在两者上安装它将解决此问题,但我对这种方式不感兴趣。

wfveoks0

wfveoks01#

这是伟大的制作一个答案奠定了OP的评论😜
在评论中,@user430953提供了Airflow文档的链接,其中指出:
影响DAG加载时间的一个重要因素可能被Python开发人员忽略,那就是顶级导入可能会花费大量时间,并且会产生大量开销,这可以通过将它们转换为Python调用内的本地导入来轻松避免。
这是有道理的:调度程序将监视Dag文件夹,并尝试通过执行在该文件夹中找到的.py文件来加载DAG类的示例,就好像这些文件是常规的"可导入" Python模块一样(因为他们是)然后是“钓鱼”实体(对象?示例?)的类型DAG的模块的变量。这发生,更具体地说,在这里,然后在这里(非常广泛地说,如果你好奇的话)
与任何常规Python模块一样,Python解释器将在导入时"运行"代码,因此将尝试任何顶级导入。如果模块导入调度器的Python解释器没有的包,则将抛出ImportError
由于OP表示...
我确实在工人舱里安装了Pandas
.这意味着实际的工作是可行的。我们只需要避免调度程序运行此导入。
最快的方法是将import语句移动到函数本身,因此pandas模块仅在实际工作执行时导入(毫不奇怪,在 * worker * 机器中.
所以从:

import pandas as pd

with DAG(...) as dag:
    @task
    def some_pandas_work(**context):
        # ...

字符串
去...

with DAG(...) as dag:
    @task
    def some_pandas_work(**context):
        import pandas as pd
        # ...


。应该可以

相关问题