如何使用ast在python中找到变量的第一次赋值

nlejzf6q  于 2023-04-13  发布在  Python
关注(0)|答案(1)|浏览(99)

我试图使用ast找到变量的第一个赋值。例如

import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.bash_operator import BashOperator

# The connection id reaches the operator through two assignments:
# pg_connection0 holds the literal, pg_connection merely aliases it.
pg_connection0="airflow_db1"
pg_connection=pg_connection0

with DAG("demo_processing_dag",
            start_date=datetime.datetime(2021, 1, 1),
            schedule_interval=None) as dag:

        # Dumps the DAG report to a JSON file via the airflow CLI.
        task1 = BashOperator(
                task_id="dag_report",
                bash_command="airflow dags report --output json >> /home/airflow/gcs/data/parse.json"
            )
        # `postgres_conn_id` is passed the *variable* pg_connection, not the
        # literal "airflow_db1" -- resolving that chain is the question's goal.
        postgres_to_gcs_task = PostgresToGCSOperator(
            task_id=f'postgres_to_gcs',
            postgres_conn_id=pg_connection,
            sql=f'SELECT * FROM public.dag_code;',
            bucket="mybucket",
            filename=f'data/dag_code.csv',
            export_format='csv',
            gzip=False,
            use_server_side_cursor=False,
        )

我需要输出为postgres_conn_id="airflow_db1"
我尝试探索ast库,但我得到的输出为pg_connection="pg_connection0"
下面是我尝试的代码。

import ast

input_code = """
import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.bash_operator import BashOperator

pg_connection0="airflow_db1"
pg_connection=pg_connection0

with DAG("demo_processing_dag",
            start_date=datetime.datetime(2021, 1, 1),
            schedule_interval=None) as dag:

        task1 = BashOperator(
                task_id="dag_report",
                bash_command="airflow dags report --output json >> /home/airflow/gcs/data/parse.json"
            )
        postgres_to_gcs_task = PostgresToGCSOperator(
            task_id=f'postgres_to_gcs',
            postgres_conn_id=pg_connection,
            sql=f'SELECT * FROM public.dag_code;',
            bucket="mybucket",
            filename=f'data/dag_code.csv',
            export_format='csv',
            gzip=False,
            use_server_side_cursor=False,
        )
"""

ast_tree = ast.parse(input_code)

# Locate the first `pg_connection = ...` assignment and render it as
# name="value".  Note: this follows only ONE level of indirection -- when
# the right-hand side is another variable, its *name* is reported rather
# than its value (which is exactly the limitation described above).
for node in ast.walk(ast_tree):
    if (isinstance(node, ast.Assign)
            # Guard: targets may be Tuple/Attribute/Subscript, which have
            # no `.id` and would raise AttributeError.
            and isinstance(node.targets[0], ast.Name)
            and node.targets[0].id == 'pg_connection'):
        if isinstance(node.value, ast.Name):
            # RHS is another variable: report its name, not its value.
            postgres_conn_id = f'{node.targets[0].id}="{node.value.id}"'
        else:
            # RHS is a literal: read `.value` -- the `.s` alias belonged to
            # the deprecated ast.Str node, removed in Python 3.12.
            postgres_conn_id = f'{node.targets[0].id}="{node.value.value}"'
        break

# print the output
print(postgres_conn_id)
g6ll5ycj

g6ll5ycj1#

遍历树中的节点,直到找到对常数进行的赋值。

# Resolve the value bound to the `postgres_conn_id=` keyword argument by
# following Name -> assignment chains in `ast_tree` until a constant is
# reached, then print it as `postgres_conn_id = "value"`.
for node in ast.walk(ast_tree):
    if isinstance(node, ast.keyword) and node.arg == 'postgres_conn_id':
        value = node.value
        # Track names already followed: a self/cyclic assignment
        # (e.g. `x = x`) would otherwise loop forever.
        seen = set()
        # Only keep resolving while we still hold a plain Name.  The
        # original `while True` loop raised AttributeError when `value`
        # was already a Constant (no `.id`), and hung when the chain
        # never reached one.
        while isinstance(value, ast.Name) and value.id not in seen:
            seen.add(value.id)
            for node2 in ast.walk(ast_tree):
                if (isinstance(node2, ast.Assign)
                        # Skip tuple/attribute targets, which have no `.id`.
                        and isinstance(node2.targets[0], ast.Name)
                        and node2.targets[0].id == value.id):
                    value = node2.value
                    break
            else:
                # No assignment found for this name: stop resolving
                # instead of spinning forever.
                break

        if isinstance(value, ast.Constant):
            print(f'{node.arg} = "{value.value}"')

相关问题