我可以限制kafka节点消费者的消费吗?

0s0u357o  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(567)

好像我的Kafka节点消费者:

var kafka = require('kafka-node');
var consumer = new Consumer(client, [], {
     ...
    });

在某些情况下,我无法处理太多的消息。有没有办法限制它(例如每秒接受不超过1000条消息,可能使用pause api?)
我使用的是kafka节点,与java版本相比,它的api似乎有限

h5qlskok

h5qlskok1#

据我所知,api没有任何类型的节流。但是两个使用者(consumer和highlevelconsumer)都有一个'pause()'函数。所以你可以停止消费,如果你有太多的信息。也许这已经提供了你所需要的。
请记住发生了什么事。您向代理发送一个fetch请求并获得一批消息。您可以配置要获取的消息的最小和最大大小(根据文档而不是消息的数量):

{
    ....
    // This is the minimum number of bytes of messages that must be available to give a response, default 1 byte 
    fetchMinBytes: 1,

    // The maximum bytes to include in the message set for this partition. This helps bound the size of the response. 
     fetchMaxBytes: 1024 * 1024,
 }
iqih9akk

iqih9akk2#

在Kafka,投票和过程应该以协调/同步的方式进行。也就是说,在每次投票之后,您应该先处理所有接收到的数据,然后再进行下一次投票。此模式将自动将消息数限制到客户端可以处理的最大吞吐量。
类似这样的内容(伪代码):

while(isRunning) {
  messages = poll(...)
  for(m : messages) {
    process(m);
  }
}

(这就是为什么没有参数“fetch.max.messages”——您不需要它的原因。)

mrphzbgm

mrphzbgm3#

来自自述中的常见问题
创建 async.queue 具有消息处理器和并发性(消息处理器本身用 setImmediate 使其不会冻结事件循环)
设置 queue.drainresume() 消费者
消费者的消息事件的处理程序 pause() 消费者并将消息推送到队列。

kiz8lqtg

kiz8lqtg4#

我也遇到过类似的情况,我正在使用来自kafka的消息,并且不得不限制使用,因为我的消费服务依赖于第三方api,而第三方api有自己的限制。
我曾经 async/queue 还有一包 async/cargo 打电话 asyncTimedCargo 用于配料。cargo从kafka消费者那里获取所有消息,并在达到大小限制时将其发送到队列 batch_config.batch_size 或超时 batch_config.batch_timeout . async/queue 提供 saturated 以及 unsaturated 如果队列任务工作线程正忙,则可以使用回调来停止使用。这将阻止货物从填补和你的应用程序不会用尽内存。一旦不饱和,消费就会恢复。

//cargo-service.js
module.exports = function(key){
    return new asyncTimedCargo(function(tasks, callback) {
        var length = tasks.length;
        var postBody = [];
        for(var i=0;i<length;i++){
            var message ={};
            var task = JSON.parse(tasks[i].value);
            message = task;
            postBody.push(message);
        }
        var postJson = {
            "json": {"request":postBody}
        };
        sms_queue.push(postJson);
        callback();
    }, batch_config.batch_size, batch_config.batch_timeout)
};

//kafka-consumer.js
cargo = cargo-service()
consumer.on('message', function (message) {
    if(message && message.value && utils.isValidJsonString(message.value)) {
        var msgObject = JSON.parse(message.value);        
        cargo.push(message);
    }
    else {
        logger.error('Invalid JSON Message');
    }
});

// sms-queue.js
var sms_queue = queue(
retryable({
    times: queue_config.num_retries,
    errorFilter: function (err) {
        logger.info("inside retry");
        console.log(err);
        if (err) {
            return true;
        }
        else {
            return false;
        }
    }
}, function (task, callback) {
// your worker task for queue
  callback()
}), queue_config.queue_worker_threads);

sms_queue.saturated = function() {
    consumer.pause();
    logger.warn('Queue saturated Consumption paused: ' + sms_queue.running());
};
sms_queue.unsaturated = function() {
    consumer.resume();
    logger.info('Queue unsaturated Consumption resumed: ' + sms_queue.running());
};

相关问题