postgresql 如何在Airflow PostgresOperator中添加模板数据库

4smxwvx5  于 2024-01-07  发布在  PostgreSQL
关注(0)|答案(1)|浏览(130)

我有一个带有PostgresOperator的Airflow dag来执行SQL查询。我想切换到不同的数据库(使用相同的连接)与配置(运行w/config)。但它给出了一个错误database "{{ dag_run.conf['database'] }}" does not exist
我尝试创建一个自定义的postgresql操作符与类似的代码here
我在下面添加了代码,但它给了我同样的错误(不是渲染模板)

from airflow.providers.postgres.operators.postgres import PostgresOperator as _PostgresOperator
class PostgresOperator(_PostgresOperator):
    template_fields = [*_PostgresOperator.template_fields, "database"]

字符串

kxkpmulp

kxkpmulp1#

PostgresOperator已弃用。您应该迁移到SQLExecuteQueryOperator
PostgresOperator中的database的等价物是SQLExecuteQueryOperator中的schema,如下所示。
由于SQLExecuteQueryOperator是泛型操作符,它允许通过hook_params传递不同的钩子参数。然而,hook_params不是模板字段,因此您应该子类SQLExecuteQueryOperator将其添加为模板字段。

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

class MySQLExecuteQueryOperator(SQLExecuteQueryOperator):
    template_fields: Sequence[str] = tuple({"hook_params"} | set(SQLExecuteQueryOperator.template_fields))

with DAG('sql_example', start_date=datetime(2023, 01, 1), schedule=None):

    MySQLExecuteQueryOperator(task_id='postgres',
                              conn_id='postgres_default',
                              sql="SELECT 1",
                              hook_params={"schema": "{{ ds }}"} # ds is not a valid schema just to show render works
                              )

字符串
这将给予给予:


的数据
显然,2023-07-28不是一个有效的模式,但这只是为了表明渲染是有效的。

相关问题