你好,我正在努力寻找这是否可行。我有一个从存储中加载数据并应用一些转换的管道,转换结束后,我将数据上传到bigquery上,但我总是需要处理我所有的数据,但这个过程超时开始有点辛辣的gcp计费方面。
我试图找出一些方法,我只能截短一些分区或删除一些数据范围来插入一个新的分区。
我创建了这个伪代码实际上我在做什么。
//Create My Pipeline
Pipeline p = ....;
//Do some transformations
PCollection<SomeModel> processed_data = p.apply(ParDo....);
//Format the results to Write on BQ
PCollection<TableRow> rows = processed_data.apply("Format to BQ",Map...);
//Save on Bq with partion by Date
rows.apply("Save it",
BigQueryIO.writeTableRows().to(tableName).withSchema(TableSchema)
.withTimePartitioning(new TimePartitioning().setField("date"))
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
p.run();
最后,我使用bigqueryio编写bigquery,但我总是需要截断表
obs:至少我需要每天重新处理过去的30天,这样我就不能每天增加数据了。
ob2:我已经搜索了很多,但找不到本地方法,我看到有人建议将数据加载到表中,并创建一个计划作业将其导入生产表或某种类型,或者在dofn内部调用bigquery api删除数据,然后再启动生产管道。
暂无答案!
目前还没有任何答案,快来回答吧!