我想使用flume收集python脚本中的日志,所以我按照用户指南用netcat源代码配置flume,然后使用telnet和 nc
对于测试,它工作得很好。
我的配置代码:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
然后我使用python连接flume,并向它发送一些单词,如下所示:
import socket
def netcat(hostname, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((hostname, port))
s.send("test words 1\n")
s.send("test words 2\n")
s.send("test words 3\n")
s.send("test words 4\n")
s.shutdown(socket.SHUT_WR)
s.close()
if _name_ == "_main_":
netcat("127.0.0.1",44444)
出现问题时,Flume只能接收2行。Flume日志: 2016-12-28 16:44:32,248 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] 2016-12-28 16:44:41,814 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 31 test words 1 } 2016-12-28 16:44:41,815 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 32 test words 2 }
我在ubuntu&java1.8和centos&java1.7上得到了相同的结果,在python中的telnet模型也得到了相同的结果。
config或python脚本有什么问题吗?或者有人对这个案子有意见吗?
1条答案
按热度按时间hfyxw5xn1#
之所以会发生这种情况,是因为你没有等到回复回来。默认情况下,flume的netcat源将为每个事件发送一条“ok”消息。您在发送响应之前终止了连接,这将导致进一步消息的处理失败(因为管道已从客户端断开)。
要解决此问题,需要对flume.conf进行以下更改:
这消除了发送“ok”的要求,从而停止了故障。
或者,您可以将python更改为每次在关闭连接之前等待发送“ok”消息。人为地,在中添加一个sleep语句也可以解决这个问题,尽管您会假设处理消息可能需要多长时间。正常情况下可以,但可能有其他情况导致处理延迟。