我是Flink的新手,需要一些技术方面的帮助。
当前方案:
我有一个在GKE上运行并写入记录的flink应用程序(来自Kafka源代码)到BigQuery。我可以将记录写入BigQuery,没有任何问题。目前,记录被一个接一个地写入到接收器中,因此每个Kafka消息都有自己的对BigQuery的插入API调用,这并不理想,因为我们需要执行批量插入,并且单独插入每个记录将非常昂贵。我正在使用API。
新要求:
在写入BigQuery之前缓冲记录理想情况下,我希望在将记录写入接收器之前根据大小/时间缓冲记录。
我不知道如何在Flink中实现这个功能,因此我正在寻找实现这个功能的方法
1条答案
按热度按时间mctunoxg1#
您可以在数据流上使用进程窗口函数。
例如,如果您使用10秒的滚动窗口,它将收集该时间范围内的所有记录,然后您可以下沉
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/windows/#processwindowfunction