我是新来Kafka和实现它在nodejs使用Kafka节点。我想在一个主题中创建3个分区,并同时向所有主题发布消息。我尝试了以下代码,但这里只创建了一个分区,所有消息都将发送到该分区。谁能告诉我哪里出错了吗。非常感谢。
Abc.abcData = async() => {
try
{
var client = new kafka.KafkaClient();
var topic = 'newTopic';
var topicsToCreate = [
{
topic: topic,
partitions: 3,
replicationFactor: 2,
replicaAssignment: [
{
partition: 0,
replicas: [0]
},
{
partition: 1,
replicas: [1]
},
{
partition: 2,
replicas: [2]
}
]
},
]
client.createTopics(topicsToCreate, (error, result) => {
console.log(result);
});
var HighLevelProducer = kafka.HighLevelProducer;
var producer = new HighLevelProducer(client);
var payloads = [
{ topic: topic, messages: 'this is partition 1!!', partitions: 0},
{ topic: topic, messages: 'this is partition 2!!', partitions: 1},
{ topic: topic, messages: 'this is partition 3!!', partitions: 2}
];
producer.on('ready', function () {
producer.send(payloads, function (err, result) {
if (err)
console.log(err);
console.log(result);
});
});
}
catch (err)
{
console.error(err.message);
}
};
我得到的答复如下-
[ { topic: 'newTopic', error: "Topic 'newTopic' already exists." } ]
{"newTopic":{"0":6}}
1条答案
按热度按时间6gpjuf901#
在这里,您在kafka服务器上使用了createtopics(),它仅在kafka服务器上的auto.create.topics.enable设置为true时才起作用。客户端只需向服务器发送一个元数据请求,服务器将自动创建主题。当async设置为false时,此方法在创建所有主题之前不会返回,否则会立即返回。因此,这里默认的一个主题和一个分区是创建。要创建多个分区或自定义分区,必须在server.property文件中添加以下行-