等待Kafka领导人选举

5fjcxozz  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(502)

形势

我用Kafka写一些动态生成的Kafka主题。
我发现在我的制作人注册后立即写这些主题通常会导致一个错误: There is no leader for this topic-partition as we are in the middle of a leadership election .
完全错误是:

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}

代码

以下是导致问题的代码:

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  await producer.connect()
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

问题

两个问题:
在连接到生产者(或发送)时,我应该做什么特别的事情来确保逻辑阻塞,直到生产者真正准备好将数据发送到Kafka主题?
在向生产者发送数据以确保消息不会被丢弃时,我应该做些什么特别的事情吗?

0aydgbwb

0aydgbwb1#

解决方案

Kafka提供了一个 createTopics 方法通过具有可选 waitForLeaders 标志:

admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'myRandomTopicString123' },
  ],
}

使用此方法可以解决问题。

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  const admin = kafka.admin()
  await admin.connect()
  await producer.connect()
  await admin.createTopics({
    waitForLeaders: true,
    topics: [
      { topic: 'myRandomTopicString123' },
    ],
  })
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

不幸的是,如果主题已经存在,这将导致另一个错误,但这是一个单独的问题,我怀疑错误比破坏更具信息性。

{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}

相关问题