node rdkafka:查找结束并获取位置

cgh8pdjw  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(519)

我正在尝试构建一个消费者,它将阻止某些操作,直到它从头到尾读取所有当前消息。
我找不到任何方法来找出一个主题上的消息数。我现在的想法是 latest 检查位置,然后返回到 earliest .
我当前的实现看起来像。

class KafkaReader {
...
private _connectConsumer() {
        this._logger.debug('Starting consumer with ', this._config.brokerList, this._config.groupId);
        const conf = {
            'metadata.broker.list': this._config.brokerList,
            'group.id': this._config.groupId,
        };

        if (this._config.debug) {
            conf['debug'] = this._config.debug;
        }

        this._consumer = new KafkaConsumer(conf, {
            'auto.offset.reset': 'earliest' // consume from the start
        });

        this._consumer.on('data', (message: ConsumerStreamMessage) => {
            if (this._config.debug) {
                this._logger.debug('Received message', message.topic, message.value.toString('utf8'));
            }

            this._subject.next(message.value.toString('utf8'));
        });

        this._consumer.on('ready', (arg) => { this._onConsumerReady(arg); });

        this._consumer.connect();
    }

    private _onConsumerReady(arg: any) {
        this._logger.debug('Kafka consumer ready.' + JSON.stringify(arg));

        this._consumer.subscribe([this._topic.topicId]);

        this._logger.debug('before seek end', this._consumer.position(null));
        this._consumer.seek({'topic': this._topic.topicId, 'offset': 'latest', 'partition': 0}, 0, (b) => {
            const end = this._consumer.position(null);
            this._logger.debug('end', end, b);
            this._seekToEarliestAndConsume();
        });
    }

    private _seekToEarliestAndConsume() {
        this._consumer.seek({'topic': this._topic.topicId, 'offset': 'earliest', 'partition': 0}, 0, () => {
            this._consumer.consume();
            this._startUpSequence.next(true);
            this._startUpSequence.complete();
        });
    }
}

我得到输出:

[2019-08-01T13:13:15.156] [DEBUG] kafka-reader - before seek end []
[2019-08-01T13:13:15.157] [DEBUG] kafka-reader - end [] { Error: Local: Erroneous state
    at Function.createLibrdkafkaError [as create] (node_modules/node-rdkafka/lib/error.js:334:10)
    at node_modules/node-rdkafka/lib/kafka-consumer.js:229:26
  message: 'Local: Erroneous state',
  code: -172,
  errno: -172,
  origin: 'kafka' }

看起来第二次搜索工作如期,但第一次给我 Local: Erroneous state .
有人知道最后一条信息的位置吗?
为什么会扔 Local: Erroneous state 当试图寻找最新的?
seek要求指定分区,是否有其他分区?
谢谢。

wpx232ag

wpx232ag1#

不确定为什么会出现此错误,但是否应该改用:

queryWatermarkOffsets(topic, partition, timeout, cb)
Query offsets from the broker. This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.

从这里开始:https://blizzard.github.io/node-rdkafka/current/kafkaconsumer.html
扬尼克

相关问题