我正在尝试使用KafkaJS在本地运行一个Kafka消费者。它表明消费者正在运行,但它没有从生产者正在积极推动事件的主题中消费。消费者过去一周一直在工作,但发生了一些事情,之后它停止消费事件。在开发环境中运行相同的设置时,可以正常运行,甚至可以处理事件。无法调试到底发生了什么。
这些是我写的实用函数
const kafka = new Kafka({
clientId: "integrations-queues",
brokers: [
process.env.KAKFA_CONNECTION_URL_1,
process.env.KAKFA_CONNECTION_URL_2,
],
});
const createKafkaConsumer = (groupId: string) => {
const consumer = kafka.consumer({
groupId,
});
return consumer;
};
const connectAndSubscribeKafkaConsumer = async (
consumer: Consumer,
topic: string
) => {
try {
await consumer.connect();
await consumer.subscribe({ topic });
} catch (error) {
SentryService.captureException(error);
console.log(error);
}
};
(async () => {
try {
const consumerGroupName =
process.env.ENV === "localhost"
? "raw-data-consumer-local"
: process.env.WORKFLOW_RAW_DATA_CONSUMER;
rawDataConsumer = createKafkaConsumer(consumerGroupName);
await connectAndSubscribeKafkaConsumer(
rawDataConsumer,
process.env.WORKFLOW_EXECUTIONS_PRODUCER_TOPIC
);
await rawDataConsumer.run({
eachMessage: rawEventHandler,
});
} catch (error) {
SentryService.captureException(error);
console.log(error);
}
})();
如果有人遇到类似的问题,我试着搜索这个问题,但找不到任何东西。
1条答案
按热度按时间b5lpy0ml1#
一周前我在
Kafka的默认保留期为7天。如果你没有活跃的生产者,那么消费者将停止看到新的事件。一个全新的消费者当然会工作,因为数据是新鲜的。
我建议使用
kafka-consumer-groups
命令来检查您的组ID滞后