数据流中的valueerror:无效的gcs位置:无

pprl5pva  于 2021-09-08  发布在  Java
关注(0)|答案(0)|浏览(274)

我正在尝试从gcs bucket加载数据,并将内容发布到pubsub和bigquery。以下是我的管道选项:

  1. options = PipelineOptions(
  2. project = project,
  3. temp_location = "gs://dataflow-example-bucket6721/temp21/",
  4. region = 'us-east1',
  5. job_name = "dataflow2-pubsub-09072021",
  6. machine_type = 'e2-standard-2',
  7. )

这是我的管道

  1. data = p | 'CreateData' >> beam.Create(sum([fileName()], []))
  2. jsonFile = data | "filterJson" >> beam.Filter(filterJsonfile)
  3. JsonData = jsonFile | "JsonData" >> beam.Map(readFromJson)
  4. split_data = JsonData | 'Split Data' >> ParDo(CheckForValidData()).with_outputs("ValidData", "InvalidData")
  5. ValidData = split_data.ValidData
  6. InvalidData = split_data.InvalidData
  7. data_ = split_data[None]
  8. publish_data = ValidData | "Publish msg" >> ParDo(publishMsg())
  9. ToBQ = ValidData | "To BQ" >> beam.io.WriteToBigQuery(
  10. table_spec,
  11. #schema=table_schema,
  12. create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
  13. write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

数据在interactiverunner中流动良好,但在dataflowrunner中显示错误,如
valueerror:无效的gcs位置:无。使用文件加载方法写入bigquery需要提供一个gcs位置来写入要加载到bigquery中的文件。请通过writetobigquery构造函数中的自定义\u gcs\u temp\u位置或回退选项--temp\u location提供gcs存储桶,或将method=“streaming\u inserts”传递给writetobigquery[运行“[15]:到bq/bigquerybatchfileloads/generatefileprefix”]
显示地面军事系统位置错误,建议添加临时位置。但我已经添加了临时位置。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题