我对何时以及多久使用一次ApacheBeam(python)管道对象本身感到困惑,希望有人能提供帮助。我正在linux虚拟机中本地使用python和docker进行开发。
我尽量避免做很多事情
# create initial pipeline with a million chained transforms
with beam.Pipeline() as p:
p | "tranform " >> | "transform ad naseum.." >>
# create another pipeline likely recreating a pcollection i already had above
with beam.Pipeline() as p:
p | "tranform " >> | "transform ad naseum.." >>
当我尝试做类似的事情时(再次使用伪代码)
with beam.Pipeline() as p:
data_pc = (
p
| "Read red blue csv files" >> beam.io.fileio.MatchFiles("./foo*.csv")
| "Match files" >> beam.io.fileio.ReadMatches()
| "Remove ids" >> beam.FlatMap(remove_id)
)
blue_list = { }
distinct_pc = data_pc | "get distinct blue rows into blue dict" >>
# more pcollection / transform operations here w/o using p object
# insert blue list in mysql
# do some things with data from mysql and blue list and data_pc pcollection
# etc etc
p | "write out as needed" >>
它将在一定程度上起作用,但我会遇到各种管道错误,这表明我对如何使用管道本身存在一些误解,下面我将更笼统地描述我的应用程序。
应用程序读取csv文件(稍后它将从云中的某些内容中读取此数据),对该数据执行许多操作,将其中一些数据放入mysql数据库,从mysql获取数据,将该数据与一些原始数据组合,然后再次将一些最终数据发送到mysql,并可能将其中一些写入输出文件。
我正试图在一条管道中完成以上所有工作 with
代码块。。。
暂无答案!
目前还没有任何答案,快来回答吧!