我尝试用Apache Flume创建一个代理,但我是新手,不太了解。代理必须从Netcat接收数据并将其保存在HDFS文件系统中。代理将接收的数据将是,例如,以下这些:
1、E1、埃内科、多诺斯蒂亚
1、E2、安内、比尔博
2、E3、于伦、拜奥纳
2,E4,杰克,伦敦
在netcat中,我可以一行一行地写,这不成问题,但是如果一行以数字1开头,则必须将该行保存在名为manager的目录(位于HDFS中)中,如果不是,则保存在名为developer的目录(位于HDFS中)中。
我已经完成了下面的配置文件,代理可以正常启动。我还可以从netcat发送数据,代理似乎可以正确侦听,因为OK返回。但是netcat发送的行没有到达HDFS,我创建的目录(manager和developer)总是空的。
我已经使用以下命令在HDFS根目录中创建了目录:Hadoop文件系统-mkdir../../<directory_name>
在日志文件(/var/log/flume-ng/flume.log)中未显示错误。
请帮帮我。我已经检查了很多东西,我不知道我还能做什么。
下面是Apache Flume配置文件:
a1.sources=r1
a1.channels=c1 c2
a1.sinks = k1 k2
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
a1.sources.r1.interceptors.i1.type=regex_extractor
a1.sources.r1.interceptors.i1.regex= ^(\\d)
a1.sources.r1.interceptors.i1.serializers=s1
a1.sources.r1.interceptors.i1.serializers.s1.name=Rola
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=Rola
a1.sources.r1.selector.mapping.1=c1
a1.sources.r1.selector.mapping.2=c2
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://localhost:8020/manager
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.fileStream=DataStream
a1.sinks.k2.type=hdfs
a1.sinks.k2.hdfs.path=hdfs://localhost:8020/developer
a1.sinks.k2.hdfs.writeFormat=Text
a1.sinks.k2.hdfs.fileStream=DataStream
a1.sources.r1.channels = c2 c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
1条答案
按热度按时间niknxzdl1#
问题是拦截器没有定义,一旦定义了,一切都能正常工作。
拦截器必须在使用拦截器的块之前定义。