输入数据需要根据特定列进行隔离,并存储在GCP中的单独文件夹中。在下面的代码片段中,计划从writeEachGroupToGCP类写入GCP。尝试从此类创建子管道或直接写入gcp,均失败。如果有更好的选择,请提供建议
InputData:包含列(ID、Ename、HireDate、ManagerName)的员工数据
class writeEachGroupToGCP(DoFn):
def process(self, data,gcp_out_prefix):
(partition, tble)=data
#TODO: Write to GCP, gcp_out_prefix+"/Manager={}".format(partition)
return data
p1 = beam.Pipeline()
(
p1
| beam.io.ReadFromText("indata/dept_data.txt")
| beam.Map(lambda x: str(x).split(","))
| beam.Map(lambda x: (x[3], x))
| beam.GroupByKey()
| beam.ParDo(writeEachGroupToGCP())
)
p1.run()
1条答案
按热度按时间des4xlb01#
您似乎希望根据ManagerName列将数据写入GCP。您可以使用Apache Beam中的Google Cloud Storage(GCS)API实现此操作。请尝试以下操作:
第四章