在kafka主题中创建多个分区,并使用kafka节点向所有分区发布消息

ej83mcc0  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(791)

我是新来Kafka和实现它在nodejs使用Kafka节点。我想在一个主题中创建3个分区,并同时向所有主题发布消息。我尝试了以下代码,但这里只创建了一个分区,所有消息都将发送到该分区。谁能告诉我哪里出错了吗。非常感谢。

  1. Abc.abcData = async() => {
  2. try
  3. {
  4. var client = new kafka.KafkaClient();
  5. var topic = 'newTopic';
  6. var topicsToCreate = [
  7. {
  8. topic: topic,
  9. partitions: 3,
  10. replicationFactor: 2,
  11. replicaAssignment: [
  12. {
  13. partition: 0,
  14. replicas: [0]
  15. },
  16. {
  17. partition: 1,
  18. replicas: [1]
  19. },
  20. {
  21. partition: 2,
  22. replicas: [2]
  23. }
  24. ]
  25. },
  26. ]
  27. client.createTopics(topicsToCreate, (error, result) => {
  28. console.log(result);
  29. });
  30. var HighLevelProducer = kafka.HighLevelProducer;
  31. var producer = new HighLevelProducer(client);
  32. var payloads = [
  33. { topic: topic, messages: 'this is partition 1!!', partitions: 0},
  34. { topic: topic, messages: 'this is partition 2!!', partitions: 1},
  35. { topic: topic, messages: 'this is partition 3!!', partitions: 2}
  36. ];
  37. producer.on('ready', function () {
  38. producer.send(payloads, function (err, result) {
  39. if (err)
  40. console.log(err);
  41. console.log(result);
  42. });
  43. });
  44. }
  45. catch (err)
  46. {
  47. console.error(err.message);
  48. }
  49. };

我得到的答复如下-

  1. [ { topic: 'newTopic', error: "Topic 'newTopic' already exists." } ]
  2. {"newTopic":{"0":6}}
6gpjuf90

6gpjuf901#

在这里,您在kafka服务器上使用了createtopics(),它仅在kafka服务器上的auto.create.topics.enable设置为true时才起作用。客户端只需向服务器发送一个元数据请求,服务器将自动创建主题。当async设置为false时,此方法在创建所有主题之前不会返回,否则会立即返回。因此,这里默认的一个主题和一个分区是创建。要创建多个分区或自定义分区,必须在server.property文件中添加以下行-

  1. auto.create.topics.enable=false

相关问题