3 answers
qkf9rpyu1#
Old thread... but just in case someone wants to do this in Scala:
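```scala
import java.net.URI
import java.util.concurrent.TimeUnit
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.client.HdfsAdmin
import org.apache.hadoop.hdfs.inotify.Event.{AppendEvent, CreateEvent, RenameEvent}

object HDFSTest extends App {
  // getInotifyEventStream() is an admin call, so run this as the HDFS superuser
  val admin = new HdfsAdmin(URI.create("hdfs://namenode:port"), new Configuration())
  val eventStream = admin.getInotifyEventStream()
  while (true) {
    // poll() waits up to 2 seconds and returns null if no event batch arrived
    val events = eventStream.poll(2L, TimeUnit.SECONDS)
    if (events != null) {
      events.getEvents.foreach { event =>
        println(s"event type = ${event.getEventType}")
        event match {
          case create: CreateEvent =>
            println("CREATE: " + create.getPath)
          case rename: RenameEvent =>
            println("RENAME: " + rename.getSrcPath + " => " + rename.getDstPath)
          case append: AppendEvent =>
            println("APPEND: " + append.getPath)
          case other =>
            println("other: " + other)
        }
      }
    }
  }
}
```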
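If the aim is to kick something off once enough files have landed in a directory, one option is to count finished files rather than CREATE events, since CREATE fires when a file is opened for writing, before its data is there. The helper below is a hypothetical, Hadoop-free sketch: the name `ThresholdTrigger`, the directory `/data/incoming` and the threshold are illustrative assumptions. In the polling loop above you could feed it `close.getPath` from a `case close: CloseEvent` branch (importing `Event.CloseEvent`), or `rename.getDstPath` for writers such as `hdfs dfs -put` that write under a temporary `._COPYING_` name and rename the file at the end.

```scala
import scala.collection.mutable

/** Hypothetical helper: counts distinct files that have finished landing anywhere under
  * `watchDir` and reports, exactly once, when `threshold` of them have been seen. */
final class ThresholdTrigger(watchDir: String, threshold: Int) {
  private val prefix = if (watchDir.endsWith("/")) watchDir else watchDir + "/"
  private val seen = mutable.Set.empty[String]
  private var fired = false

  /** Returns true only on the call that first brings the count up to `threshold`. */
  def onFileLanded(path: String): Boolean =
    if (fired || !path.startsWith(prefix)) false
    else {
      seen += path // a Set, so repeated events for the same file count once
      fired = seen.size >= threshold
      fired
    }
}

object ThresholdTriggerDemo extends App {
  val trigger = new ThresholdTrigger("/data/incoming", 2)
  println(trigger.onFileLanded("/data/incoming/a.csv")) // false
  println(trigger.onFileLanded("/data/other/b.csv"))    // false: outside the watched directory
  println(trigger.onFileLanded("/data/incoming/a.csv")) // false: same file again
  println(trigger.onFileLanded("/data/incoming/c.csv")) // true
  println(trigger.onFileLanded("/data/incoming/d.csv")) // false: already fired
}
```

Keeping the counting logic free of Hadoop types makes it easy to test on its own and to reuse whichever event type (CLOSE or RENAME) marks a file as complete in your pipeline.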
nhjlsmyf2#
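An Oozie coordinator can do this: coordinator actions can be triggered by data availability. Write a data-triggered coordinator whose action fires on a done flag. The done flag is just an empty file, so once your threshold is reached, write an empty file into the directory.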
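The writer side of this can be sketched in Scala with Hadoop's `FileSystem` API. This is a minimal sketch under assumptions: the directory `/data/incoming` and the threshold of 10 are hypothetical, "threshold" is taken to mean a count of data files, and the flag is named `_SUCCESS`, which is the file Oozie looks for when a dataset declares no `<done-flag>`. If your dataset sets its own `<done-flag>`, use that name instead. Files whose names start with `_` or `.` are skipped, following Hadoop's convention for hidden and marker files.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DoneFlagWriter {
  // Hypothetical values: the directory the coordinator dataset points at, and the file-count threshold
  val InputDir = new Path("/data/incoming")
  val Threshold = 10

  // Oozie's default done-flag name when the dataset omits <done-flag>
  val DoneFlag = "_SUCCESS"

  /** Creates an empty done-flag file once `dir` holds at least `threshold` data files.
    * Returns true if the flag exists afterwards. Throws if `dir` does not exist. */
  def signalIfReady(fs: FileSystem, dir: Path, threshold: Int): Boolean = {
    val flag = new Path(dir, DoneFlag)
    if (fs.exists(flag)) true
    else {
      // Skip hidden/marker files (names starting with "_" or "."), per Hadoop convention
      val dataFiles = fs.listStatus(dir).count { status =>
        val name = status.getPath.getName
        status.isFile && !name.startsWith("_") && !name.startsWith(".")
      }
      if (dataFiles >= threshold) {
        // overwrite = false: throws rather than replacing a flag another writer just created
        fs.create(flag, false).close()
        true
      } else false
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration()) // uses fs.defaultFS from the cluster config
    println(s"done flag present: ${signalIfReady(fs, InputDir, Threshold)}")
  }
}
```

You would run this after each batch of files is written (or on a schedule); the coordinator only needs to see the empty flag file appear.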
tzcvj98z3#
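Hadoop 2.6 introduced `DFSInotifyEventInputStream`, which you can use for this. You get an instance from `HdfsAdmin` (via `getInotifyEventStream()`) and then call `.take()`, which blocks until events are available, or `.poll()`, which returns `null` when there is nothing new (optionally after waiting up to a timeout), to get all the events. Event types include delete, append and create, which should cover what you're looking for. Be sure to run it as the `hdfs` user, since the admin interface requires HDFS superuser (root) privileges. Here is a blog post that goes into more detail: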
http://johnjianfang.blogspot.com/2015/03/hdfs-6634-inotify-in-hdfs.html?m=1
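A minimal Scala sketch of the blocking `take()` variant, run as the `hdfs` user. It assumes Hadoop 2.7 or later (where `take()` returns an `EventBatch`) and simple, non-Kerberos authentication, under which `UserGroupInformation.createRemoteUser("hdfs")` lets the process act as the `hdfs` superuser; on a Kerberized cluster you would log in with the `hdfs` principal's keytab instead. The NameNode URI is the same placeholder used in the first answer, and the object name `InotifyAsHdfs` is hypothetical. Note that HDFS reports deletes as `UNLINK` events.

```scala
import java.net.URI
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.client.HdfsAdmin
import org.apache.hadoop.hdfs.inotify.Event
import org.apache.hadoop.hdfs.inotify.Event.{AppendEvent, CreateEvent, UnlinkEvent}
import org.apache.hadoop.security.UserGroupInformation

object InotifyAsHdfs {
  /** Turns one inotify event into a log line; UNLINK is HDFS's delete event. */
  def describe(event: Event): String = event match {
    case create: CreateEvent => s"CREATE ${create.getPath}"
    case append: AppendEvent => s"APPEND ${append.getPath}"
    case unlink: UnlinkEvent => s"DELETE ${unlink.getPath}"
    case other               => s"OTHER ${other.getEventType}"
  }

  def main(args: Array[String]): Unit = {
    // Assumes simple auth: the process acts as the "hdfs" superuser by name
    val hdfsUser = UserGroupInformation.createRemoteUser("hdfs")
    hdfsUser.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        val admin = new HdfsAdmin(URI.create("hdfs://namenode:port"), new Configuration())
        val stream = admin.getInotifyEventStream()
        while (true) {
          // take() blocks until the next batch arrives, so it never returns null
          val batch = stream.take()
          batch.getEvents.foreach(event => println(describe(event)))
        }
      }
    })
  }
}
```

Compared with `poll()`, `take()` avoids a busy loop of timeouts, at the cost of tying up the thread until the next event.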