node.js到flume ng

hs1ihplo  于 2021-06-03  发布在  Flume
关注(0)|答案(1)|浏览(625)

是否有一个功能项目可以将数据从node.js移动到flume ng中,而无需中间文件。
也许我遗漏了一些东西,我本以为将数据从node.js移动到flume是一种更常见的需求,但事实似乎并非如此。
我已经发现了一些项目,似乎已经尝试了这一点,但所有似乎已经放弃了大约3年前,并没有与当前版本的功能。似乎有一些将与旧版本的flume一起工作,但是api与flume ng发生了实质性的变化,它们不再适用。
我找到了node.js的avro和thrift模块,thrift现在支持node.js,这似乎表明这应该是直截了当的,但这不起作用,可能没有足够的信息来说明flume ng使用哪种传输/协议,或者我只是不够了解。
在我重新发明轮子之前,有人能给我指出正确的方向吗?
这是我的当前节点代码。它产生了一个经济效益。


# !/usr/local/bin/node

var thrift = require('thrift');
//var ThriftTransports = require('thrift/transport');
//var ThriftProtocols = require('thrift/protocol');

var Flume = require('./gen-nodejs/ThriftSourceProtocol');
var ttypes = require('./gen-nodejs/flume_types');

transport = thrift.TBufferedTransport();
protocol = thrift.TBinaryProtocol();
//transport = ThriftTransports.TBufferedTransport();
//protocol = ThriftProtocols.TBinaryProtocol();
var connection = thrift.createConnection("127.0.0.1", 51515,{
        transport: transport,
        protocol: protocol
});

connection.on('error', function(err) {
  console.error(err);
});

var client = thrift.createClient(Flume, connection);

var myEvent = new ttypes.ThriftFlumeEvent();
myEvent.headers = {};
myEvent.body = "body";

client.append(myEvent, function(err,data) {
  if (err) {
    // handle err
  } else {
    // data == [ttypes.ColumnOrSuperColumn, ...]
  }
  connection.end();
});
jdzmm42g

jdzmm42g1#

这是flume-1894中实现的thrift服务器的设置,文件名为thriftsource.java:

args.protocolFactory(new TCompactProtocol.Factory());
  args.inputTransportFactory(new TFastFramedTransport.Factory());
  args.outputTransportFactory(new TFastFramedTransport.Factory());
  args.processor(new ThriftSourceProtocol.Processor<ThriftSourceHandler>(new ThriftSourceHandler()));

要使其工作,您需要在客户端使用兼容堆栈:
紧凑协议
快速帧传输
thriftrpcclient.java中已经有一个客户机实现,所以您不需要再发明轮子了。

相关问题