flume avro接收器是否可以连接到node.js服务器?

1bqhqjot  于 2021-06-03  发布在  Flume
关注(0)|答案(1)|浏览(410)

第一次堆栈溢出问题。。。将尽量包含更多细节。
我试图通过avro接收器将apacheflume日志数据传输到node.js服务器,监听特定端口。我打算使用collective media的node avro库来帮助实现avro二进制格式和json之间的序列化,这样我就可以处理node.js中的数据(我通过socket.io pub/sub将数据传递给客户机)。
我确信我已经正确地配置了flume,因为我看到数据流经通道并输出到控制台(仅用于调试,我也将数据下沉到控制台)。不过,当我启用avro接收器并启动侦听同一端口的node.js服务器时,flume在尝试执行avro传输时会抛出一个异常:

2013-02-15 22:06:09,858 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
    at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4242 }: Failed to send batch
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
    at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
    ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4242 }: Exception thrown from remote handler
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:318)
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
    ... 4 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: NettyTransceiver closed
    at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310)
    ... 6 more
Caused by: java.io.IOException: NettyTransceiver closed
    at org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:338)
    at org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:59)
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:496)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:236)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:93)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:476)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:623)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:101)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
2013-02-15 22:06:14,895 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink k1: Building RpcClient with hostname: 127.0.0.1, port: 4242

我不确定的是,如何确定node.js服务是否至少收到了消息。我对node.js很陌生,所以这没什么帮助,但下面是设置侦听器的代码片段:

var flumeSink = require('http').createServer(flumeHandler);
flumeSink.listen(8000);
function flumeHandler (req, res) {
    console.log("Got it!");
    //var schema = avro.prepareSchema("string");
    //var buffer = schema.encode("foo");
    //var value = schema.decode(buffer);
}

我想我把node.js端设置错了。我使用的是http模块,它可能不是正确的模块。也许我需要考虑在node.js中编写一个自定义接收器?感谢指点/帮助!

dced5bon

dced5bon1#

在这种情况下,avro接收器可能不是您所需要的,因为它被设计用于在Flume之间进行通信(这是构建Flume连接拓扑的方式)。
如果要创建不在标准列表中的接收器,则需要构建自定义接收器并使用中定义的自定义配置https://flume.apache.org/flumeuserguide.html#custom-我试过了,效果很好。
或者使用现有的东西:
https://github.com/josealvarezmuguerza/flume-http-sink
我从没用过这个模块。刚用谷歌搜索过来。
对于avro部分,只需使用morphlines将源代码转换为avro,然后将每个事件发布到node.js服务器。
希望这能给你一点启发。
开始密码!

相关问题