python 使用Apache Beam构造具有唯一ID的新列

qnakjoqk  于 2023-02-28  发布在  Python
关注(0)|答案(2)|浏览(136)

我是Apache Beam的新手,我尝试在索引0处创建一个新列,在多个列上使用groupbykey,并构造一个新的唯一ID。
我还想将新数据写入一个换行符分隔的JSON格式文件(其中每行是一个unique_id,以及属于该unique_id的对象数组)。
我现在写的是:

import apache_beam as beam

pipe = beam.Pipeline()

id = (pipe
            |beam.io.ReadFromText('data.csv')
            |beam.Map(lambda x:x.split(","))
            |beam.Map(print))

它基本上将每一行转换为字符串列表。
这篇文章有样本数据输入和解决方案使用Pandas这样做,但我如何实现同样的管道使用梁?
谢谢大家!

yi0zb3m4

yi0zb3m41#

你试过这样的CombinePerKey吗?

import apache_beam as beam

p = beam.Pipeline()

test = (
    p
    | beam.Create([(0, "ttt"), (0, "ttt1"), (0, "ttt2"), (1, "xxx"), (1, "xxx2"), (2, "yyy")])
    | beam.CombinePerKey(lambda v: ",".join(v))
    | beam.Map(print)
)
6pp0gazn

6pp0gazn2#

像链接示例中那样,唯一ID是从0到n_groups的整数对您来说重要吗?
如果没有,那么我认为这里没有任何使用分组操作的必要。

import apache_beam as beam

def make_unique_id(row):
  """
  Example function for extracting a unique ID from the row.

  You could wrap the value in uuid.UUID to make a more standard format for the ID.
  """
  return ",".join([row[0], row[1]])

pipe = beam.Pipeline()

id = (pipe
            | beam.io.ReadFromText('data.csv')
            | beam.Map(lambda x: x.split(","))
            | beam.Map(lambda x: [make_unique_id(x)] + x)
            | beam.Map(print))

相关问题