我使用flink从定期附加数据的文件中连续读取数据。我试图在flink中使用readfile方法,但对如何在这个方法的参数中提到fileinputformat感到困惑。我的文件格式是json。有人能帮我吗?谢谢
sc4hvdpw1#
flink的inputformat不适合从并发写入的文件中读取。考虑到这个需求,我假设您正在寻找一种将文件作为流使用并使用flink的datastreamapi处理它的方法。在这种情况下,您需要实现一个sourcefunction来跟踪文件的大小和进度,并连续读取文件。但是,我不推荐这种设计。我宁愿定期启动一个新文件,并将其移动到一个专门的文件夹消费一旦它已完成和下一个文件已启动。
1条答案
按热度按时间sc4hvdpw1#
flink的inputformat不适合从并发写入的文件中读取。
考虑到这个需求,我假设您正在寻找一种将文件作为流使用并使用flink的datastreamapi处理它的方法。在这种情况下,您需要实现一个sourcefunction来跟踪文件的大小和进度,并连续读取文件。
但是,我不推荐这种设计。我宁愿定期启动一个新文件,并将其移动到一个专门的文件夹消费一旦它已完成和下一个文件已启动。