nfs(netapp服务器)->flink->s3

uajslkp6  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(733)

我是flink(java)的新手,正在尝试将netapp文件服务器上的xml文件作为文件路径移动到flink安装的服务器上。
如何用s3实时进行批处理或流处理,以获取到达文件夹的文件并将其接收。
我在flink starter中找不到任何从本地文件系统读取文件的示例,flink至少是这个用例的正确选择吗?如果是,我在哪里可以找到资源来监听文件夹和管理检查点/保存点?

yzckvree

yzckvree1#

这个问题的完整工作代码在下面的链接中。您需要启用检查点才能将.inprogress文件移动到实际文件
//每1000毫秒启动一个检查点env.enablecheckpointing(1000);
streamingfilesink未将数据摄取到s3

mepcadol

mepcadol2#

如果您的目标只是将文件复制到s3,那么有更简单、更合适的工具可以实现。也许同步是合适的。
假设使用flink是有意义的(例如,因为您希望对数据执行一些有状态的转换),那么您的所有任务管理器(worker)都需要使用相同的uri访问要处理的文件。为此,可以使用file://uri。
您可以执行以下操作来监视目录并在新文件出现时接收它们:

StreamExecutionEnvironment env =    
  StreamExecutionEnvironment.getExecutionEnvironment();

// monitor directory, checking for new files
// every 100 milliseconds

TextInputFormat format = new TextInputFormat(
  new org.apache.flink.core.fs.Path("file:///tmp/dir/"));

DataStream<String> inputStream = env.readFile(
  format, 
  "file:///tmp/dir/",
  FileProcessingMode.PROCESS_CONTINUOUSLY, 
  100, 
  FilePathFilter.createDefaultFilter());

请注意文档中的警告:
如果watchtype连续设置为fileprocessingmode.process\u,则在修改文件时,将完全重新处理其内容。这可能会打破“恰好一次”的语义,因为在文件末尾附加数据将导致其所有内容被重新处理。
这意味着您应该以原子方式将准备接收的文件移动到监视的文件夹中。
您可以使用流文件接收器写入s3。flink的写操作,例如 writeUsingOutputFormat() ,不参与检查点,因此在这种情况下,这不是一个好的选择。

相关问题