Kafka消费者不使用KafkaJS在本地工作

vuktfyat  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(136)

我正在尝试使用KafkaJS在本地运行一个Kafka消费者。它表明消费者正在运行,但它没有从生产者正在积极推动事件的主题中消费。消费者过去一周一直在工作,但发生了一些事情,之后它停止消费事件。在开发环境中运行相同的设置时,可以正常运行,甚至可以处理事件。无法调试到底发生了什么。
这些是我写的实用函数

const kafka = new Kafka({
  clientId: "integrations-queues",
  brokers: [
    process.env.KAKFA_CONNECTION_URL_1,
    process.env.KAKFA_CONNECTION_URL_2,
  ],
});

const createKafkaConsumer = (groupId: string) => {
  const consumer = kafka.consumer({
    groupId,
  });

  return consumer;
};

const connectAndSubscribeKafkaConsumer = async (
  consumer: Consumer,
  topic: string
) => {
  try {
    await consumer.connect();

    await consumer.subscribe({ topic });
  } catch (error) {
    SentryService.captureException(error);
    console.log(error);
  }
};
(async () => {
  try {
    const consumerGroupName =
      process.env.ENV === "localhost"
        ? "raw-data-consumer-local"
        : process.env.WORKFLOW_RAW_DATA_CONSUMER;

    rawDataConsumer = createKafkaConsumer(consumerGroupName);

    await connectAndSubscribeKafkaConsumer(
      rawDataConsumer,
      process.env.WORKFLOW_EXECUTIONS_PRODUCER_TOPIC
    );

    await rawDataConsumer.run({
      eachMessage: rawEventHandler,
    });
  } catch (error) {
    SentryService.captureException(error);
    console.log(error);
  }
})();

如果有人遇到类似的问题,我试着搜索这个问题,但找不到任何东西。

b5lpy0ml

b5lpy0ml1#

一周前我在
Kafka的默认保留期为7天。如果你没有活跃的生产者,那么消费者将停止看到新的事件。一个全新的消费者当然会工作,因为数据是新鲜的。
我建议使用kafka-consumer-groups命令来检查您的组ID滞后

相关问题