如何使用Apache Beam Python pipeline读取Excel数据?我得到一个不支持的操作数错误

g6ll5ycj  于 2023-05-30  发布在  Apache
关注(0)|答案(1)|浏览(183)

我尝试使用Apache Beam Python管道读取云存储桶中的Excel文件,但它不起作用。我试着用Pandas阅读,但我不能使用Pcollection中的数据。
你知道怎么做吗?

def read_data_from_excel_file():
  bucket_name = "nidec-ga-transient"
  blob_name = "ConcessoesERestricoes.xlsx"

  storage_client = storage.Client()
  bucket = storage_client.bucket(bucket_name)
  blob = bucket.blob(blob_name)

  data_bytes = blob.download_as_bytes()

  df = pd.read_excel(data_bytes, 'Lista de Gargalos')
  return df

Pipeline = (
  pipeline_load_data
    | "Importar Dados CloudStorage" >> read_data_from_excel_file()
  # | "Write_to_BQ" >> beam.io.WriteToBigQuery(
  #                            tabela,
  #                            schema=table_schema,
  #                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
  #                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
  #                            custom_gcs_temp_location = 'gs://ddc-test-262213-staging/henrique.klock@dojo.technology/temp' )
 )

当我运行代码时,我得到了这个错误:
TypeError:>>不支持的操作数类型:'str'和'NoneType'

jxct1oxe

jxct1oxe1#

在Apache Beam Python管道中阅读Excel数据可以使用apache_beam.io.fileio模块来实现。此模块提供以并行和分布式方式读取文件的功能。

def read_data_from_excel_file(file_pattern):
    def process_excel_file(element):
        file_name = element.metadata.path
        df = pd.read_excel(file_name, 'Lista de Gargalos')
        return df.values.tolist()

    return (
        beam
        | "MatchFiles" >> beam.io.fileio.MatchFiles(file_pattern)
        | "ReadMatches" >> beam.io.fileio.ReadMatches()
        | "ProcessExcelFile" >> beam.Map(process_excel_file)
    )

file_pattern = "gs://nidec-ga-transient/ConcessoesERestricoes.xlsx"
pipeline = beam.Pipeline()

(
    pipeline
    | "Importar Dados CloudStorage" >> read_data_from_excel_file(file_pattern)
    | "DoSomething" >> beam.Map(print)
    # Add more transformations as needed
)

pipeline.run().wait_until_finish()

相关问题