Nodejs Kafka错误连接拒绝Windows 10机器

zy1mlcev  于 2023-06-24  发布在  Windows
关注(0)|答案(1)|浏览(129)

我在我的windows 10机器上运行Kafka代理:

[2023-06-19 19:12:33,360] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
3.4.1 (Commit:8a516edc2755df89)

我运行zookeeper和Kafka服务器都使用bin\windows\可执行文件。当我跑的时候

kafka-console-producer.bat --topic test --bootstrap-server localhost:9092

然后呢

kafka-console-consumer.bat --topic test --bootstrap-server localhost:9092 --from-beginning

两者都工作正常,我能够从生产者向消费者发送消息。
但是当我尝试使用nodejs生产者时,它给了我下面的错误。

{"level":"WARN","timestamp":"2023-06-19T13:30:50.265Z","logger":"kafkajs","message":"KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option \"createPartitioner: Partitioners.LegacyPartitioner\". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable \"KAFKAJS_NO_PARTITIONER_WARNING=1\""}
{"level":"ERROR","timestamp":"2023-06-19T13:30:50.304Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED ::1:9092","broker":"localhost:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED ::1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1494:16)"}

代码为:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const producer = kafka.producer()

const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'test',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  })

  // Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error);

我尝试使用kafka-node和kafka npm包,但得到相同的错误。
我已经在server.properties文件listeners=PLAINTEXT://localhost:9092中添加了以下行,但仍然没有成功。
我期待一个解决方案,它将与最好的Kafka节点包一起工作,以实现高可扩展性。
我正在使用这个链接:[https://www.npmjs.com/package/kafkajs]

m3eecexj

m3eecexj1#

我禁用了IPV6适配器,并将server.properties文件恢复为原始文件。我有类似的问题与redis我认为这可能是同样的问题。

相关问题