我有一个假脱机目录,其中所有的json文件都存在,传入的文件将被添加到这个目录的每一秒钟,我必须反序列化传入的json文件,并获取需要字段,并附加到hdfs目录。
我所做的是创建了一个flume conf文件,其中以假脱机目录中的文件作为源,并使用1个sink将json文件直接放入hdfs中。
我必须在下沉之前将这个json转换成结构格式,并将其放入hdfs中。最重要的是,它不是一个twitter数据。我必须实现纯粹的Flume。
我使用以下Flume配置来完成这项工作:
agent_slave_1.channels.fileChannel1_1.type = file
agent_slave_1.channels.fileChannel1_1.capacity = 200000
agent_slave_1.channels.fileChannel1_1.transactionCapacity = 1000
agent_slave_1.sources.source1_1.type = spooldir
agent_slave_1.sources.source1_1.spoolDir = /home/cloudera/runs/
agent_slave_1.sources.source1_1.fileHeader = false
agent_slave_1.sources.source1_1.fileSuffix = .COMPLETED
agent_slave_1.sinks.hdfs-sink1_1.type = hdfs
agent_slave_1.sinks.hdfs-sink1_1.hdfs.path =hdfs://localhost.localdomain:8020/user/cloudera/runs_scored/
agent_slave_1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollSize = 268435456
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollCount = 50000000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text
agent_slave_1.sinks.hdfs-sink1_1.hdfsfileType = DataStream
agent_slave_1.sources.source1_1.channels = fileChannel1_1
agent_slave_1.sinks.hdfs-sink1_1.channel = fileChannel1_1
agent_slave_1.sinks = hdfs-sink1_1
agent_slave_1.sources = source1_1
agent_slave_1.channels = fileChannel1_1
但我不知道如何使用反序列化程序。
有人能帮我一个想法如何反序列化输入的json文件吗?如果我需要用java编写代码,请帮助我,我需要使用什么接口?如果可能的话,给点提示。
1条答案
按热度按时间4uqofj5v1#
最好的猜测是编写一个自定义拦截器,将json转换为所需的hdfs格式。它还有一个好处,就是填充可以在hdfs路径中使用的头文件。
下面是如何配置拦截器:
课程将如下所示:
别忘了把这个类打包到一个jar中,并把它放到flume的lib目录中。