我使用nodejs在node red中重新编写了一个kafka生产者节点。我注意到kafka10增加了一个关于在发送给生产者的消息上附加时间戳的特性,但我只找到了java示例。如何使用nodejs向消息添加时间戳?
下面我将报告如何在nodejs中实现producer:
function kafkaAdvancedNode(config) {
RED.nodes.createNode(this,config);
var topic = config.topic;
var partition = config.partition;
var clusterZookeeper = config.zkquorum;
var debug = (config.debug == "debug");
var node = this;
var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
var topics = config.topics;
var client = new Client(clusterZookeeper);
try {
this.on("input", function(msg) {
var payloads = [];
// check if multiple topics
if (topics.indexOf(",") > -1){
var topicArry = topics.split(',');
for (i = 0; i < topicArry.length; i++) {
payloads.push({topic: topicArry[i], messages: msg.payload});
}
}
else {
if(partition == "" || !partition)
payloads = [{topic: topics, messages: msg.payload}];
else
payloads = [{topic: topics, messages: msg.payload, partition: partition}];
}
producer.send(payloads, function(err, data){
if (err){
node.error(err);
}
node.log("Message Sent: " + data);
});
});
}
catch(e) {
node.error(e);
}
var producer = new HighLevelProducer(client);
this.status({fill:"green",shape:"dot",text:"connected to "+clusterZookeeper});
}
在谷歌上我发现this:link
但实际上我不明白如何在我的代码中集成(什么是avro?)
1条答案
按热度按时间8yoxcaq71#
我不认为kafka节点支持0.10 kafka协议,所以它不太可能支持消息的时间戳。
出于这个(以及其他)原因,我使用node rdkafka编写了一些类似的东西,并将其发布为node red contrib rdkafka。看到了吗https://github.com/hjespers/node-red-contrib-rdkafka
更新:我刚刚发布了node red contrib rdkafka的新版本(0.2.0),它支持0.10“事件时间”或“处理时间”时间戳以及动态主题、键和分区。