我正在尝试从gcs bucket加载数据,并将内容发布到pubsub和bigquery。以下是我的管道选项:
options = PipelineOptions(
project = project,
temp_location = "gs://dataflow-example-bucket6721/temp21/",
region = 'us-east1',
job_name = "dataflow2-pubsub-09072021",
machine_type = 'e2-standard-2',
)
这是我的管道
data = p | 'CreateData' >> beam.Create(sum([fileName()], []))
jsonFile = data | "filterJson" >> beam.Filter(filterJsonfile)
JsonData = jsonFile | "JsonData" >> beam.Map(readFromJson)
split_data = JsonData | 'Split Data' >> ParDo(CheckForValidData()).with_outputs("ValidData", "InvalidData")
ValidData = split_data.ValidData
InvalidData = split_data.InvalidData
data_ = split_data[None]
publish_data = ValidData | "Publish msg" >> ParDo(publishMsg())
ToBQ = ValidData | "To BQ" >> beam.io.WriteToBigQuery(
table_spec,
#schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
数据在interactiverunner中流动良好,但在dataflowrunner中显示错误,如
valueerror:无效的gcs位置:无。使用文件加载方法写入bigquery需要提供一个gcs位置来写入要加载到bigquery中的文件。请通过writetobigquery构造函数中的自定义\u gcs\u temp\u位置或回退选项--temp\u location提供gcs存储桶,或将method=“streaming\u inserts”传递给writetobigquery[运行“[15]:到bq/bigquerybatchfileloads/generatefileprefix”]
显示地面军事系统位置错误,建议添加临时位置。但我已经添加了临时位置。
暂无答案!
目前还没有任何答案,快来回答吧!