node.js kafka节点分区器类型用法

wydwbb8l  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(356)

我正在尝试使用kafka节点通过节点js将数据插入kafka主题。
如果尝试使用partitionertype,则不会插入数据。如果我删除partitionertype(即不使用选项),代码就可以工作,但在这种情况下,所有的数据都只进入一个分区。
你能帮我解决这个问题吗。提前谢谢。

var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    KeyedMessage = kafka.KeyedMessage,
    client = new kafka.Client("zookeeper-host:2181","node-id"),
    options = {
    requireAcks: 1,
    ackTimeoutMs: 100,
    partitionerType: 3
        },
    producer = new Producer(client, options);
    //producer = new Producer(client);  -- This works

    var km = new KeyedMessage('key1', 'message6'),
    kn = new KeyedMessage('key2', 'message5'),
    kv = new KeyedMessage('key3', 'message4'),
    kx = new KeyedMessage('key4', 'message3'),
    ky = new KeyedMessage('key5', 'message2'),
    kz = new KeyedMessage('key6', 'message1'),
    payloads = [
        { topic: 'test-topic3', messages: [km,kn,kv,kx,ky,kz] }
    ];
    console.log("Ready to send data");
    producer.on('ready', function () {
    producer.send(payloads, function (err, data) {
        console.log(data);
    });
    });

console.log("DATA SENT");

非常感谢
巴黎

ebdffaop

ebdffaop1#

使用refreshmetadata方法。。。
就我而言,它是有效的。
请参阅以下链接。。。
https://github.com/sohu-co/kafka-node/issues/436 client.refreshMetadata([topic1, topic2], callback)

lx0bsm1f

lx0bsm1f2#

上面的代码在highlevelconsumer中运行良好。
谢谢,帕里

相关问题