python Airflow -如何从BigQuery表中获取数据并将其用作列表?

hsgswve4  于 2023-01-29  发布在  Python
关注(0)|答案(1)|浏览(197)

我正在尝试获取列,然后使用值创建文件名。
我尝试了下面的方法,应该会创建一个csv文件,其名称为指定列中的第一个值,但当我尝试使用它时,它却显示列表为空

bq_data = []
get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='SK22',
    table_id='current_times',
    max_results='100',
    selected_fields='current_timestamps',
)

def process_data_from_bq(**kwargs):
    ti = kwargs['ti']
    global bq_data
    bq_data = ti.xcom_pull(task_ids='get_data_from_bq')

process_data = PythonOperator(
        task_id='process_data_from_bq',
        python_callable=process_data_from_bq,
        provide_context=True)
run_export = BigQueryToCloudStorageOperator(
        task_id=f"save_data_on_storage{str(bq_data[0])}",
        source_project_dataset_table="a-data-set",
        destination_cloud_storage_uris=[f"gs://europe-west1-airflow-bucket/data/test{bq_data[0]}.csv"],
        export_format="CSV",
        field_delimiter=",",
        print_header=False,
        dag=dag,
    )

get_data >> process_data >> run_export
g6ll5ycj

g6ll5ycj1#

我觉得在BigQueryGetDataOperatorBigQueryToCloudStorageOperator之间不需要使用PythonOperator,可以直接在BigQueryToCloudStorageOperator中使用xcom pull

get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='SK22',
    table_id='current_times',
    max_results='100',
    selected_fields='current_timestamps',
)

run_export = BigQueryToCloudStorageOperator(
        task_id="save_data_on_storage",
        source_project_dataset_table="a-data-set",
        destination_cloud_storage_uris=[f"gs://europe-west1-airflow-bucket/data/test" + "{{ ti.xcom_pull(task_ids='get_data_from_bq')[0] }}" + ".csv"],
        export_format="CSV",
        field_delimiter=",",
        print_header=False,
        dag=dag,
    )

get_data >> run_export

destination_cloud_storage_uris是一个templated参数,您可以在其中传递Jinja模板语法。
我没有测试语法,但它应该工作。
我也不建议你使用像bq_data这样的全局变量在运算符之间传递数据,因为这样做行不通,你需要想办法直接在运算符中使用xcomJinja模板或者获取当前Context的运算符)。
我还注意到您没有使用最新的Airflow运算符:

  • 大查询到云存储操作符-〉BigQueryToGCSOperator
    如果您希望使用BigQueryGetDataOperator操作符提供的所有列表,并从中计算目标URI列表,我建议您使用另一种解决方案
from typing import List, Dict

from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

class CustomBigQueryToGCSOperator(BigQueryToGCSOperator):

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

    def execute(self, context):
        task_instance = context['task_instance']
        data_from_bq: List[Dict] = task_instance.xcom_pull('get_data_from_bq')

        destination_cloud_storage_uris: List[str] = list(map(self.to_destination_cloud_storage_uris, data_from_bq))

        self.destination_cloud_storage_uris = destination_cloud_storage_uris

        super(CustomBigQueryToGCSOperator, self).execute(context)

    def to_destination_cloud_storage_uris(self, data_from_bq: Dict) -> str:
        return f"gs://europe-west1-airflow-bucket/data/test{data_from_bq['your_field']}.csv"

一些解释:

  • 我创建了一个扩展BigQueryToGCSOperator的自定义运算符
  • execute方法中,我可以访问操作符的当前上下文
  • 从上下文中,我可以从BigQueryGetDataOperator提供的BQ中检索列表。我假设它是Dict列表,但您必须确认这一点
  • 我从这个字典列表计算出一个目标GCS URI列表
  • 我将计算出的目标GCS URI分配给运算符中的相应字段

这个解决方案的优点是,您可以更灵活地应用基于xcom值的逻辑。
缺点是有点冗长。

相关问题