我正在使用Airflow的python操作符调用python函数。ERROR发生在try/except块中。
def python_callable_new():
print("Inside python callable ...")
import psycopg2
try:
print("attempting database connection from python method.. ")
conn = psycopg2.connect('postgres_defined_connection')
print("success. ")
except Exception as error:
print("failed: ")
print (error)
return 'End of callable. '
with dag:
start_task = DummyOperator( task_id= "start" )
stop_task = DummyOperator( task_id= "stop" )
do_python_task = PythonOperator(
task_id = 'do-py-operation',
python_callable= python_callable_new,
)
extract_source_data = PostgresOperator(
task_id='extract-cb-source-data',
postgres_conn_id='postgres_defined_connection',
sql='./sql_scripts/extract_csv_data.sql'
)
# csv_to_postgres
start_task >> do_python_task >> extract_source_data >> stop_task
字符串
基本上,我的问题是
- 我如何使用我的“postgres_defined_connection”**在我的python函数中连接到postgres?
- 当我使用PostgresOperator时,它连接得很好,正如在extract_source_data任务中所看到的那样,但我需要在可调用函数中使用它。
- 出现的错误是invalid dsn:missing“=”after“postgres_defined_connection”in connection info string
(FYI- 我将postgres_defined_connection存储在一个单独的connections.py中,该网站使用sqlalchemy引擎和PostgresHook)
1条答案
按热度按时间oxcyiej71#
psycopg2.connect
需要连接参数。如果您将连接参数格式化为空格分隔的键/值对,则可以向它们传递单个字符串。这就是为什么它会给您错误消息缺少“="。请参阅psycopg documentation以了解更多信息。
要在Airflow中连接到Postgres数据库,您可以利用PostgresHook,前提是您已经创建了连接。
字符串
你也可以用纯Python代码来实现。
型