从mongodb转换一组特定的记录

xwmevbvl  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(244)

我有一个周期性触发的批处理作业,它将数据写入mongodb。这项工作需要大约10分钟,然后我想接收这些数据,并用apacheflink做一些转换(Map、过滤、清理…)。记录之间有一些依赖关系,这意味着我必须一起处理它们。例如,我喜欢转换客户id为45666的最新批处理作业中的所有记录。结果将是一个聚合记录。
有没有什么最佳实践或方法可以做到这一点而不必自己去实现(从最新的工作中获取客户ID,为每个客户选择记录和转换,标记转换后的客户等等)?
我不能流式传输它,因为我必须转换多个记录在一起,而不是一个接一个。
目前我正在使用springbatch、mongodb、kafka和apacheflink。

eqqqjvef

eqqqjvef1#

可以想象,您可以将mongodb变更流连接到flink,并将其用作所描述任务的基础。涉及10-35gb数据的事实并不排除使用flink流的可能性,因为您可以将flink配置为在其状态无法放入堆时溢出到磁盘。
不过,在得出结论认为这是一种明智的做法之前,我想更好地了解情况。

相关问题