我正在尝试获取列,然后使用值创建文件名。
我尝试了下面的方法,应该会创建一个csv文件,其名称为指定列中的第一个值,但当我尝试使用它时,它却显示列表为空
bq_data = []
get_data = BigQueryGetDataOperator(
task_id='get_data_from_bq',
dataset_id='SK22',
table_id='current_times',
max_results='100',
selected_fields='current_timestamps',
)
def process_data_from_bq(**kwargs):
ti = kwargs['ti']
global bq_data
bq_data = ti.xcom_pull(task_ids='get_data_from_bq')
process_data = PythonOperator(
task_id='process_data_from_bq',
python_callable=process_data_from_bq,
provide_context=True)
run_export = BigQueryToCloudStorageOperator(
task_id=f"save_data_on_storage{str(bq_data[0])}",
source_project_dataset_table="a-data-set",
destination_cloud_storage_uris=[f"gs://europe-west1-airflow-bucket/data/test{bq_data[0]}.csv"],
export_format="CSV",
field_delimiter=",",
print_header=False,
dag=dag,
)
get_data >> process_data >> run_export
1条答案
按热度按时间g6ll5ycj1#
我觉得在
BigQueryGetDataOperator
和BigQueryToCloudStorageOperator
之间不需要使用PythonOperator
,可以直接在BigQueryToCloudStorageOperator
中使用xcom pull
:destination_cloud_storage_uris
是一个templated参数,您可以在其中传递Jinja
模板语法。我没有测试语法,但它应该工作。
我也不建议你使用像
bq_data
这样的全局变量在运算符之间传递数据,因为这样做行不通,你需要想办法直接在运算符中使用xcom
(Jinja
模板或者获取当前Context
的运算符)。我还注意到您没有使用最新的
Airflow
运算符:如果您希望使用BigQueryGetDataOperator操作符提供的所有列表,并从中计算目标URI列表,我建议您使用另一种解决方案:
一些解释:
BigQueryToGCSOperator
的自定义运算符execute
方法中,我可以访问操作符的当前上下文BigQueryGetDataOperator
提供的BQ
中检索列表。我假设它是Dict列表,但您必须确认这一点GCS
URI列表GCS
URI分配给运算符中的相应字段这个解决方案的优点是,您可以更灵活地应用基于xcom值的逻辑。
缺点是有点冗长。