我正在GCP数据流上运行apache beam pipeline。数据流pipeline建议以下项目
可以在以下转换之后插入融合中断以增加并行性:ReadFromGCS/匹配全部/匹配文件模式/ParMultiDo(匹配)。转换分别具有以下输出与输入元素计数比率:1006307.
我的管道看起来像这样
PCollection<String> records = p.apply("ReadFromGCS", TextIO.read().from(options.getInput())
.withHintMatchesManyFiles());
PCollection<Document> documents = records.apply("ConvertToDocument", ParDo.of(new ProcessJSON(options.getBatch())));
// Write to MongoDB using ParDo transform sink
documents.apply("WriteToMongoDB", MongoDbIO.write()
.withUri("mongodb+srv://"+options.getMongo())
.withDatabase(options.getDatabase())
.withCollection(options.getCollection())
.withBatchSize(options.getBatchSize()));
我的输入是模式“gs://test-bucket/test/*.json”的gcs桶,它包含百万个json文件。我想了解建议的含义以及如何按照建议增加并行性。在我的情况下,我的数据流。
我尝试了此文档,但无法找到解决此问题的方法https://cloud.google.com/dataflow/docs/guides/using-dataflow-insights?&_ga=2.162468924.-1096227812.1671933466#high-fan-out
随附屏幕截图image
1条答案
按热度按时间tct7dpnv1#
有关如何强制执行/防止融合的一些背景信息,请查看融合优化。
一种非常常见的方法是,如果有一些自然的方式来分组,则使用GroupByKey,或者如果您只想均匀地分布,则使用Reshuffle.ViaRandomKey之类的操作。