我正在尝试使用kafka节点通过节点js将数据插入kafka主题。
如果尝试使用partitionertype,则不会插入数据。如果我删除partitionertype(即不使用选项),代码就可以工作,但在这种情况下,所有的数据都只进入一个分区。
你能帮我解决这个问题吗。提前谢谢。
var kafka = require('kafka-node'),
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
client = new kafka.Client("zookeeper-host:2181","node-id"),
options = {
requireAcks: 1,
ackTimeoutMs: 100,
partitionerType: 3
},
producer = new Producer(client, options);
//producer = new Producer(client); -- This works
var km = new KeyedMessage('key1', 'message6'),
kn = new KeyedMessage('key2', 'message5'),
kv = new KeyedMessage('key3', 'message4'),
kx = new KeyedMessage('key4', 'message3'),
ky = new KeyedMessage('key5', 'message2'),
kz = new KeyedMessage('key6', 'message1'),
payloads = [
{ topic: 'test-topic3', messages: [km,kn,kv,kx,ky,kz] }
];
console.log("Ready to send data");
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
console.log("DATA SENT");
非常感谢
巴黎
2条答案
按热度按时间ebdffaop1#
使用refreshmetadata方法。。。
就我而言,它是有效的。
请参阅以下链接。。。
https://github.com/sohu-co/kafka-node/issues/436
client.refreshMetadata([topic1, topic2], callback)
lx0bsm1f2#
上面的代码在highlevelconsumer中运行良好。
谢谢,帕里