我使用flume代理通过flume代理收集外部数据。外部数据批处理几乎是每10秒1mb。我将flume代理配置如下。
# Flume agent configuration as /flume/conf/agent.conf
agent.sources = netcat-source
agent.channels = kafka-channel
agent.sinks = logger-sink
########################################
# Netcat Source
########################################
agent.sources.netcat-source.type = netcat
agent.sources.netcat-source.bind = 0.0.0.0
agent.sources.netcat-source.port = 4141
agent.sources.netcat-source.max-line-length = 500000
agent.sources.netcat-source.channels = kafka-channel
########################################
# Kafka Channel
########################################
agent.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafka-channel.brokerList = 10.212.136.108:9092,10.212.136.108:9092
agent.channels.kafka-channel.zookeeperConnect = 10.212.136.108:2181,10.212.136.108:2181/kafka
agent.channels.kafka-channel.topic = channel
agent.channels.kafka-channel.groupId = fcd-group
########################################
# Logger Sink
########################################
agent.sinks.logger-sink.type = logger
agent.sinks.logger-sink.channel = kafka-channel
我按以下方式激活了代理。
flume-ng agent -n agent -c /flume/conf -f /flume/conf/agent.conf
不幸的是,原来netcat的源代码工作得很好,而channel或sink出现了一些问题。从ubuntu的资源监视器中,我可以看到以下性能。网络性能。蓝色的曲线表示输入,而红色的曲线表示没有其他应用程序运行网络io时的输出,我确信这个数字说明了我的flume代理发生了什么。
当我通过console consumer查看主题“channel”中的Kafka内容时,什么也没有得到。另外,当我检查flume.log时,我只得到flume的状态输出,没有数据。
我已经用
nc -lk 4141 >> my_data_check_file
我的频道或Flume怎么了
p、 当我使用内存通道,文件通道时,事情变得同样棘手。
1条答案
按热度按时间wwwo4jvm1#
啊,终于,我自己解决了这个问题!
关键点是行delimeter'\n'。
在flume源代码netcatsource.java中,我们有一个棘手的行,如下所示
代码强制输入数据以'\n'结尾。否则,频道将不接收任何事件。我们可以根据需要更改此字符,并将自定义源代码放入$flume\u home/lib中