StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// monitor directory, checking for new files
// every 100 milliseconds
TextInputFormat format = new TextInputFormat(
new org.apache.flink.core.fs.Path("file:///tmp/dir/"));
DataStream<String> inputStream = env.readFile(
format,
"file:///tmp/dir/",
FileProcessingMode.PROCESS_CONTINUOUSLY,
100,
FilePathFilter.createDefaultFilter());
2条答案
按热度按时间yzckvree1#
这个问题的完整工作代码在下面的链接中。您需要启用检查点才能将.inprogress文件移动到实际文件
//每1000毫秒启动一个检查点env.enablecheckpointing(1000);
streamingfilesink未将数据摄取到s3
mepcadol2#
如果您的目标只是将文件复制到s3,那么有更简单、更合适的工具可以实现。也许同步是合适的。
假设使用flink是有意义的(例如,因为您希望对数据执行一些有状态的转换),那么您的所有任务管理器(worker)都需要使用相同的uri访问要处理的文件。为此,可以使用file://uri。
您可以执行以下操作来监视目录并在新文件出现时接收它们:
请注意文档中的警告:
如果watchtype连续设置为fileprocessingmode.process\u,则在修改文件时,将完全重新处理其内容。这可能会打破“恰好一次”的语义,因为在文件末尾附加数据将导致其所有内容被重新处理。
这意味着您应该以原子方式将准备接收的文件移动到监视的文件夹中。
您可以使用流文件接收器写入s3。flink的写操作,例如
writeUsingOutputFormat()
,不参与检查点,因此在这种情况下,这不是一个好的选择。