我是Apache Beam的新手,我尝试在索引0处创建一个新列,在多个列上使用groupbykey,并构造一个新的唯一ID。
我还想将新数据写入一个换行符分隔的JSON格式文件(其中每行是一个unique_id,以及属于该unique_id的对象数组)。
我现在写的是:
import apache_beam as beam
pipe = beam.Pipeline()
id = (pipe
|beam.io.ReadFromText('data.csv')
|beam.Map(lambda x:x.split(","))
|beam.Map(print))
它基本上将每一行转换为字符串列表。
这篇文章有样本数据输入和解决方案使用Pandas这样做,但我如何实现同样的管道使用梁?
谢谢大家!
2条答案
按热度按时间yi0zb3m41#
你试过这样的CombinePerKey吗?
6pp0gazn2#
像链接示例中那样,唯一ID是从0到n_groups的整数对您来说重要吗?
如果没有,那么我认为这里没有任何使用分组操作的必要。