带有pm2群集的nodejs kafka消费者

7uzetpgm  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(542)

我已经实现了kafka消费者应用程序,我只是想知道如果我在pm2集群模式下运行这个应用程序,所有的核心会使用相同的消息还是不同的消息?有什么办法可以证明吗?在集群模式下运行这个应用程序理想吗?我之所以在集群模式下运行它,是因为我们的kafka产生了大量的消息。
另外,目前如果我在pm2集群模式下运行,我们所有的核心都达到了100%的cpu使用率。是这样吗?
仅供参考:我正在使用https://www.npmjs.com/package/no-kafka

qyyhg6bp

qyyhg6bp1#

基于pm2的集群只适用于网络服务器,因为集群进程共享传入的网络端口并分发请求。
在您的例子中,数据源是一个消息订阅,必须手动将其分发到集群的工作进程。
因此,为了安全起见,主进程应该与数据源进行交互,并将消息均匀地分配给工作进程,这样在外部,它看起来是一个单一的使用者,但仍然可以处理所有cpu核上的消息。
下面的例子演示了这样一种不依赖于基于pm2的聚类的设置:

const cluster = require('cluster');
const _ = require('lodash');
const os = require('os');

// dispatch index
let dispatchIndex = 0;

/**
 * Dispatches data to workers in a cyclic fashion
 * @param {*} data - data to process
 */
function dispatch(data) {

    // ensure master
    if (!cluster.isMaster) {
        throw new Error('Only master can dispatch');
    }

    // get worker ids, sorted
    const workersIds = _.sortBy(_.keys(cluster.workers), _.identity);

    // ensure at least one worker is available
    if (workersIds.length < 1) {
        throw new Error('No worker process alive');
    }

    // select next worker
    dispatchIndex = dispatchIndex >= workersIds.length ? 0 : dispatchIndex;
    const worker = cluster.workers[workersIds[dispatchIndex]];
    dispatchIndex++;

    // send data to worker
    worker.send(data);
}

// Main Script
if (cluster.isMaster) {

    // Setup master process
    console.info(`Master ${process.pid} started.`);

    // fork worker processes to match available CPUs
    const numCpu = os.cpus().length;
    for (let i = 0; i < numCpu; i++) {
        cluster.fork();
    }

    //***Get/Subscribe data from external source and dispatch to workers***
    setInterval(() => dispatch({ a: 'value' }), 1000);

} else if (cluster.isWorker) {

    // Setup worker process
    console.info(`Worker ${process.pid} started.`);

    //***handle dispatched data***
    process.on('message', (data) => {
        console.info(`Data processed by ${process.pid}`);
    });
}

阅读集群模块文档也很好。

dced5bon

dced5bon2#

所有内核使用相同的消息还是不同的消息?有什么办法可以证明吗?
这取决于主题配置+使用者配置。举个例子。
假设我们有一个有3个分区的主题。
现在我们开始一个消费过程,消费群体是“某个消费群体”。有关消费群的详细信息,请看这里https://www.npmjs.com/package/no-kafka#groupconsumer-新的统一消费者api。
现在您的一个消费者正在收听3个分区。
因为kafka维护每个主题的偏移量,所以每个消费者组的每个分区您的消费者将从3个不同的分区接收3条消息。因此,没有重复的信息。
现在,让我们在混合中再添加一个消费者流程。
现在,消费者组“some\u consumer\u group”的消费者1正在侦听分区0和1,而消费者组“some\u consumer\u group”的消费者2正在侦听分区2(可能也会反过来)。
最后,如果我们向组中再添加一个消费者,那么现在我们就让每个消费者听一个分区
如果这是设置,您将不会遇到重复的消息。
另外,目前如果我在pm2集群模式下运行,我们所有的核心都达到了100%的cpu使用率。是这样吗?
我不太熟悉Kafka,也不知道这些信息是如何处理的。
但是,请检查库是否在获取下一批消息之前等待提交发生。
如果没有,则可能是您的进程为消息创建了太多处理程序。

相关问题