我目前正在编写我的第一个flink应用程序,并希望监视新文件的文件夹。不幸的是,我找不到很多关于这个主题的例子。
我找到了 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
函数来监视目录。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inputStream = env.readFile(new TextInputFormat(new Path(filePath)),filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
inputStream.print();
env.execute("Flink streaming test");
}
我认为这个函数监视文件夹并在文件添加到文件夹时读取文件。但是,只读取添加的第一个文件。我想我不太明白它的工作原理。有人能给我解释一下为什么不能这样做,正确的方法是什么吗?
暂无答案!
目前还没有任何答案,快来回答吧!