python-3.x Apache_beam[gcp] - Write GroupByKey results to separate folders (GCP)

Posted by lf5gs5x2 on 2023-02-26 in Python

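The input data needs to be split by a specific column, and each group stored in its own folder in GCP. In the snippet below, the plan is to do the write to GCP from the writeEachGroupToGCP class. I tried both creating a sub-pipeline from this class and writing to GCP directly, and both failed. Please suggest a better approach if there is one.
InputData: employee data with the columns (ID, Ename, HireDate, ManagerName)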

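import apache_beam as beam
from apache_beam import DoFn

class writeEachGroupToGCP(DoFn):
  def process(self, data, gcp_out_prefix):
    (partition, tble) = data
    #TODO: Write to GCP, gcp_out_prefix+"/Manager={}".format(partition)

    # yield (not return) so the (partition, tble) pair is emitted as one element
    yield data

p1 = beam.Pipeline()

(
    p1
     | beam.io.ReadFromText("indata/dept_data.txt")
     | beam.Map(lambda x: str(x).split(","))
     | beam.Map(lambda x: (x[3], x))  # key each row by ManagerName
     | beam.GroupByKey()
     # extra ParDo arguments are passed on to process(), here as gcp_out_prefix
     | beam.ParDo(writeEachGroupToGCP(), "gs://<your-bucket>/<your-output-path>")
)
p1.run()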
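The TODO in process() only has to write one (partition, tble) group into its own folder; a DoFn is not the place to build another pipeline, so the write should happen directly inside process(). A minimal local, plain-Python sketch of that step follows. The function name write_group, the data.json file name and the sample row are hypothetical. On GCS there is no need to create folders, because GCS "folders" are just object-name prefixes: writing an object named <prefix>/Manager=<name>/data.json produces the same layout.

```python
import json
import os


def write_group(out_prefix, manager, rows):
    """Hypothetical helper: write one (manager, rows) group as JSON lines to
    <out_prefix>/Manager=<manager>/data.json and return the file path."""
    # One folder per ManagerName value, Hive-style: Manager=<value>
    folder = os.path.join(out_prefix, "Manager={}".format(manager))
    os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, "data.json")
    with open(path, "w", encoding="utf-8") as f:
        for row in rows:
            # Each row (a list or tuple of fields) becomes one JSON array per line
            f.write(json.dumps(row) + "\n")
    return path
```

For a local output directory, the TODO line could then be replaced by write_group(gcp_out_prefix, partition, tble); for a gs:// prefix, the same naming scheme is used for the GCS object name instead.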
Answer #1 (des4xlb0)

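It looks like you want to write the data to GCS, with one folder per ManagerName value. You can do this by writing each group from inside the DoFn with the Google Cloud Storage (GCS) client library (google-cloud-storage). Try the following: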

import json

import apache_beam as beam
from apache_beam import DoFn
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
from google.cloud import storage  # pip install google-cloud-storage


class writeEachGroupToGCP(DoFn):
  def __init__(self, bucket_name, prefix):
    # Bucket name without "gs://", plus the folder (object-name prefix) inside it
    self.bucket_name = bucket_name
    self.prefix = prefix.strip("/")

  def setup(self):
    # Create one storage client per DoFn instance instead of one per element
    self.client = storage.Client()

  def process(self, data):
    (manager, rows) = data
    folder = "Manager={}".format(manager)
    blob_name = "/".join(p for p in (self.prefix, folder, "data.json") if p)
    blob = self.client.bucket(self.bucket_name).blob(blob_name)
    # Blob.open("w") needs a recent google-cloud-storage release
    with blob.open("w") as f:
      for row in rows:
        f.write(json.dumps(row) + "\n")
    # yield, not return: returning the tuple would emit manager and rows as two elements
    yield data

options = PipelineOptions()
gcp_project = options.view_as(GoogleCloudOptions).project
gcp_staging_location = options.view_as(GoogleCloudOptions).staging_location
gcp_temp_location = options.view_as(GoogleCloudOptions).temp_location
gcp_input_path = "gs://<your-bucket>/<your-input-path>"
gcp_output_bucket = "<your-bucket>"
gcp_output_prefix = "<your-output-path>"

with beam.Pipeline(options=options) as p1:
  (
    p1
    | "Read Data" >> beam.io.ReadFromText(gcp_input_path)
    | "Parse CSV" >> beam.Map(lambda x: tuple(x.split(",")))
    # GroupByKey needs (key, value) pairs; ManagerName is the 4th column
    | "Key By Manager" >> beam.Map(lambda fields: (fields[3], fields))
    | "Group By Manager" >> beam.GroupByKey()
    | "Write To GCS" >> beam.ParDo(writeEachGroupToGCP(gcp_output_bucket, gcp_output_prefix))
  )
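In google-cloud-storage, storage.Client().bucket() expects a bare bucket name rather than a gs:// URL, and blob names are relative to that bucket. If the output location is kept as a single string such as gs://<your-bucket>/<your-output-path>, a small helper can split it first. This is a sketch; split_gcs_path is a hypothetical name, not part of Beam or the GCS library.

```python
def split_gcs_path(gcs_path):
    """Hypothetical helper: split "gs://bucket/some/prefix" into
    ("bucket", "some/prefix"); the prefix is "" if there is none."""
    if not gcs_path.startswith("gs://"):
        raise ValueError("expected a gs:// path, got {!r}".format(gcs_path))
    # Everything up to the first "/" after the scheme is the bucket name
    bucket, _, prefix = gcs_path[len("gs://"):].partition("/")
    # Drop leading/trailing slashes so the prefix can be joined with "/"
    return bucket, prefix.strip("/")
```

For example, bucket_name, prefix = split_gcs_path("gs://my-bucket/out/employees/") gives ("my-bucket", "out/employees"), which can be passed to bucket() and used as the start of the blob name.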
