我试图使用ast找到变量的第一个赋值。例如
import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.bash_operator import BashOperator
pg_connection0="airflow_db1"
pg_connection=pg_connection0
with DAG("demo_processing_dag",
start_date=datetime.datetime(2021, 1, 1),
schedule_interval=None) as dag:
task1 = BashOperator(
task_id="dag_report",
bash_command="airflow dags report --output json >> /home/airflow/gcs/data/parse.json"
)
postgres_to_gcs_task = PostgresToGCSOperator(
task_id=f'postgres_to_gcs',
postgres_conn_id=pg_connection,
sql=f'SELECT * FROM public.dag_code;',
bucket="mybucket",
filename=f'data/dag_code.csv',
export_format='csv',
gzip=False,
use_server_side_cursor=False,
)
我需要输出为postgres_conn_id=“airflow_db1”
我尝试探索ast库,但我得到的输出为pg_connection=“pg_connection0”
下面是我尝试的代码。
import ast
input_code = """
import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.bash_operator import BashOperator
pg_connection0="airflow_db1"
pg_connection=pg_connection0
with DAG("demo_processing_dag",
start_date=datetime.datetime(2021, 1, 1),
schedule_interval=None) as dag:
task1 = BashOperator(
task_id="dag_report",
bash_command="airflow dags report --output json >> /home/airflow/gcs/data/parse.json"
)
postgres_to_gcs_task = PostgresToGCSOperator(
task_id=f'postgres_to_gcs',
postgres_conn_id=pg_connection,
sql=f'SELECT * FROM public.dag_code;',
bucket="mybucket",
filename=f'data/dag_code.csv',
export_format='csv',
gzip=False,
use_server_side_cursor=False,
)
"""
ast_tree = ast.parse(input_code)
# find the variable assignment for postgres_conn_id and extract its value
for node in ast.walk(ast_tree):
if isinstance(node, ast.Assign) and node.targets[0].id == 'pg_connection':
if isinstance(node.value, ast.Name):
postgres_conn_id = node.targets[0].id + '="' + node.value.id + '"'
else:
print( node.value.s)
postgres_conn_id = node.targets[0].id + '="' + node.value.s + '"'
break
# print the output
print(postgres_conn_id)
1条答案
按热度按时间g6ll5ycj1#
遍历树中的节点,直到找到对常数进行的赋值。