hdfs文件监视程序

frebpwbc  于 2021-06-02  发布在  Hadoop
关注(0)|答案(3)|浏览(362)

我可以在hdfs上使用文件监视程序吗?
场景:文件连续登陆到hdfs上。我想在文件数达到阈值(可以是文件数或文件大小)后启动spark作业。
是否可以在hdfs上实现文件观察程序来实现这一点。如果是的话,那么有人能提出建议吗?有什么不同的选择吗?Zookeeper或Zookeeper能做到吗?
任何帮助都将不胜感激。谢谢。

qkf9rpyu

qkf9rpyu1#

旧线程。。。万一,如果有人想在 Scala ```
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.client.HdfsAdmin
import org.apache.hadoop.hdfs.inotify.Event.{AppendEvent, CreateEvent, RenameEvent}

object HDFSTest extends App {
val admin = new HdfsAdmin( URI.create( "hdfs://namenode:port" ), new Configuration() )
val eventStream = admin.getInotifyEventStream()

while( true ) {
val events = eventStream.poll(2l, java.util.concurrent.TimeUnit.SECONDS)
events.getEvents.toList.foreach { event ⇒
println(s"event type = ${event.getEventType}")
event match {
case create: CreateEvent ⇒
println("CREATE: " + create.getPath)

    case rename: RenameEvent ⇒
      println("RENAME: " + rename.getSrcPath + " => " + rename.getDstPath)

    case append: AppendEvent ⇒
      println("APPEND: " + append.getPath)

    case other ⇒
      println("other: " + other)
  }
}

}
}

万一,如果你想使用一个冒充的用户。。。设置环境变量: `HADOOP_USER_NAME=user-name` 
nhjlsmyf

nhjlsmyf2#

oozie协调员可以做到这一点。可以根据数据可用性触发oozie协调器操作。编写数据触发协调器。根据done标志触发协调器操作。done标志只是一个空文件。因此,当达到阈值时,在目录中写入一个空文件。

tzcvj98z

tzcvj98z3#

hadoop 2.6简介 DFSInotifyEventInputStream 你可以用来做这个。你可以从 HdfsAdmin 然后打电话给我 .take() 或者 .poll() 得到所有的事件。事件类型包括delete、append和create,它们应该包含您要查找的内容。
下面是一个基本的例子。一定要把它当作 hdfs 作为管理接口的用户需要hdfs root。

public static void main( String[] args ) throws IOException, InterruptedException, MissingEventsException
{
    HdfsAdmin admin = new HdfsAdmin( URI.create( args[0] ), new Configuration() );
    DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
    while( true ) {
        EventBatch events = eventStream.take();
        for( Event event : events.getEvents() ) {
            System.out.println( "event type = " + event.getEventType() );
            switch( event.getEventType() ) {
                case CREATE:
                    CreateEvent createEvent = (CreateEvent) event;
                    System.out.println( "  path = " + createEvent.getPath() );
                    break;
                default:
                    break;
            }
        }
    }
}

以下是一篇博文,内容更为详细:
http://johnjianfang.blogspot.com/2015/03/hdfs-6634-inotify-in-hdfs.html?m=1

相关问题