我正在使用hdfs文件观察服务加载配置文件,一旦它在我的flink流作业中被更改。
watcher服务的源代码:hdfs file watcher
我在这里面临的问题是,watcher服务对整个hdfs中的更改做出React,而不仅仅是我传递的目录。
我的代码:
public static void main( String[] args ) throws IOException, InterruptedException, MissingEventsException
{
HdfsAdmin admin = new HdfsAdmin( URI.create("hdfs://stage.my-org.in:8020/tmp/anurag/"), new Configuration() );
DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
while( true ) {
EventBatch events = eventStream.take();
for( Event event : events.getEvents() ) {
switch( event.getEventType() ) {
case CREATE:
System.out.print( "event type = " + event.getEventType() );
CreateEvent createEvent = (CreateEvent) event;
System.out.print( " path = " + createEvent.getPath() + "\n");
break;
default:
break;
}
}
}
}
程序输出:
event type = CREATE path = /tmp/anurag/newFile.txt
event type = CREATE path = /tmp/newFile2.txt
请帮助我解决这个问题,这样我就可以在特定的目录中观看作为uri传递的文件
谢谢期待
注意:如果您尝试运行此程序,请以hdfs用户身份运行,否则您将得到org.apache.hadoop.security.accesscontrolexception
2条答案
按热度按时间uqxowvwt1#
现在,我使用hadoopapi每30秒获取一次文件,读取它的修改时间,如果它大于再次加载文件。
xriantvc2#
inotifyeventstream只不过是解析到对象中的hdfs事件日志,它会将hdfs中的所有事件发送给您,无论您在构造函数中设置的是哪个目录,这就是您需要使用超组成员运行该代码的原因之一。
解决方案是在事件发生时对其进行过滤,只从您想要的目录中获取那些事件。比如: