将avro消息推送到kafka主题

5q4ezhmt  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(573)

我在试着用Kafka节点
库将数据推送到现有主题。我已经用curl将schema添加到schemaregistry中,并且还添加了主题。我得到以下错误:

  1. Error: Topic name or id is needed to build Schema
  2. at new Shema (/work/node_modules/kafka-node-avro/lib/schema.js:7:41)
  3. at Object.Pool.getByName (/work/node_modules/kafka-node-avro/lib/schemaPool.js:70:10)
  4. at new Promise (<anonymous>)

我的代码片段如下:

  1. const Settings = {
  2. "kafka" : {
  3. "kafkaHost" : config.KafkaHost
  4. },
  5. "schema": {
  6. "registry" : config.KafkaRegistry
  7. }
  8. };
  9. console.log("settings registry: ", config.KafkaRegistry);
  10. console.log("settings kafkaHost: ", config.KafkaHost)
  11. KafkaAvro.init(Settings).then( kafka => {
  12. const producer = kafka.addProducer();
  13. let payloads = [
  14. {
  15. topic: 'classifier-response-test',
  16. messages: JSON.stringify(kafkaData)
  17. }
  18. ];
  19. producer.send(payloads).then( success => {
  20. // Message was sent encoded with Avro Schema
  21. console.log("message sent ! Awesome ! :) ")
  22. }, error => {
  23. // Something wrong happen
  24. console.log("There seems that there is a mistake ! try Again ;) ")
  25. console.log(error)
  26. });
  27. } , error => {
  28. // something wrong happen
  29. console.log("There seems that there is a global mistake ! try Again ;) ")
  30. console.log(error)
  31. });
vdgimpew

vdgimpew1#

问题是我们需要将主题列表放在模式设置中,以便在主题和模式注册表之间建立链接。我们可以将schema id或topic name。

  1. const kafkaSettings = {
  2. "kafka" : {
  3. "kafkaHost" : config.KafkaHost
  4. },
  5. "schema": {
  6. "registry" : config.KafkaRegistry,
  7. "topics": [
  8. {"name": "classifier-response-test" }
  9. ]
  10. }
  11. };
tsm1rwdh

tsm1rwdh2#

图书馆还没有准备好 send 消息列表,当尝试在发送机制中将架构动态添加到架构池中时,它抱怨的原因。最简单的解决方案是一次发送一条消息,在您的代码示例中可能是这样的

  1. const Settings = {
  2. "kafka" : {
  3. "kafkaHost" : config.KafkaHost
  4. },
  5. "schema": {
  6. "registry" : config.KafkaRegistry
  7. }
  8. };
  9. KafkaAvro.init(Settings).then( kafka => {
  10. kafka.send({
  11. topic: 'classifier-response-test',
  12. messages: JSON.stringify(kafkaData)
  13. }).then( success => {
  14. // Message was sent encoded with Avro Schema
  15. console.log("message sent ! Awesome ! :) ")
  16. }, error => {
  17. // Something wrong happen
  18. console.log("There seems that there is a mistake ! try Again ;) ")
  19. console.log(error)
  20. });
  21. } , error => {
  22. // something wrong happen
  23. console.log("There seems that there is a global mistake ! try Again ;) ")
  24. console.log(error)
  25. });

感谢您使用Kafka节点avro

展开查看全部

相关问题