我正在尝试构建一个消费者,它将阻止某些操作,直到它从头到尾读取所有当前消息。
我找不到任何方法来找出一个主题上的消息数。我现在的想法是 latest
检查位置,然后返回到 earliest
.
我当前的实现看起来像。
class KafkaReader {
...
private _connectConsumer() {
this._logger.debug('Starting consumer with ', this._config.brokerList, this._config.groupId);
const conf = {
'metadata.broker.list': this._config.brokerList,
'group.id': this._config.groupId,
};
if (this._config.debug) {
conf['debug'] = this._config.debug;
}
this._consumer = new KafkaConsumer(conf, {
'auto.offset.reset': 'earliest' // consume from the start
});
this._consumer.on('data', (message: ConsumerStreamMessage) => {
if (this._config.debug) {
this._logger.debug('Received message', message.topic, message.value.toString('utf8'));
}
this._subject.next(message.value.toString('utf8'));
});
this._consumer.on('ready', (arg) => { this._onConsumerReady(arg); });
this._consumer.connect();
}
private _onConsumerReady(arg: any) {
this._logger.debug('Kafka consumer ready.' + JSON.stringify(arg));
this._consumer.subscribe([this._topic.topicId]);
this._logger.debug('before seek end', this._consumer.position(null));
this._consumer.seek({'topic': this._topic.topicId, 'offset': 'latest', 'partition': 0}, 0, (b) => {
const end = this._consumer.position(null);
this._logger.debug('end', end, b);
this._seekToEarliestAndConsume();
});
}
private _seekToEarliestAndConsume() {
this._consumer.seek({'topic': this._topic.topicId, 'offset': 'earliest', 'partition': 0}, 0, () => {
this._consumer.consume();
this._startUpSequence.next(true);
this._startUpSequence.complete();
});
}
}
我得到输出:
[2019-08-01T13:13:15.156] [DEBUG] kafka-reader - before seek end []
[2019-08-01T13:13:15.157] [DEBUG] kafka-reader - end [] { Error: Local: Erroneous state
at Function.createLibrdkafkaError [as create] (node_modules/node-rdkafka/lib/error.js:334:10)
at node_modules/node-rdkafka/lib/kafka-consumer.js:229:26
message: 'Local: Erroneous state',
code: -172,
errno: -172,
origin: 'kafka' }
看起来第二次搜索工作如期,但第一次给我 Local: Erroneous state
.
有人知道最后一条信息的位置吗?
为什么会扔 Local: Erroneous state
当试图寻找最新的?
seek要求指定分区,是否有其他分区?
谢谢。
1条答案
按热度按时间wpx232ag1#
不确定为什么会出现此错误,但是否应该改用:
从这里开始:https://blizzard.github.io/node-rdkafka/current/kafkaconsumer.html
扬尼克