kafka节点异步使用者处理程序

9w11ddsr  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(246)

我的消费者就是这样初始化的:

const client = new kafka.Client(config.ZK_HOST)
const consumer = new kafka.Consumer(client, [{ topic: config.KAFKA_TOPIC, offset: 0}],
{
    autoCommit: false
})

现在是消费者 consumer.on('message', message => applyMessage(message)) 问题是 applyMessage 使用 knex ,代码类似于:

async function applyMessage(message: kafka.Message) {
    const usersCount = await db('users').count()
    // just assume we ABSOLUTELY need to calculate a number of users,
    // so we need previous state
    await db('users').insert(inferUserFromMessage(message))
}

上面的代码使 applyMessage 对kafka中的所有消息并行执行,因此在上面的代码中,假设数据库中还没有用户, usersCount 即使是来自Kafka的第二条消息,也将始终为0,自第一次调用后,该消息应为1 applyMessage 插入用户。
我如何“同步”的方式,所有的代码 applyMessage 函数是否按顺序运行?

xzv2uavs

xzv2uavs1#

您需要实现某种互斥。基本上是一个将要同步执行的事情排队的类。例子

var Mutex = function() {
  this.queue = [];
  this.locked = false;
};

Mutex.prototype.enqueue = function(task) {
  this.queue.push(task);
  if (!this.locked) {
    this.dequeue();
  }
};

Mutex.prototype.dequeue = function() {
  this.locked = true;
  const task = this.queue.shift();
  if (task) {
    this.execute(task);
  } else {
    this.locked = false;
  }
};

Mutex.prototype.execute = async function(task) {
  try { await task(); } catch (err) { }
  this.dequeue();
}

为了让这个工作,你的 applyMessage 函数(处理kafka消息的函数)需要返回 Promise -另请注意,async已从父函数移到返回的promise函数:

function applyMessage(message: kafka.Message) {
  return new Promise(async function(resolve,reject) {
    try {
      const usersCount = await db('users').count()
      // just assume we ABSOLUTELY need to calculate a number of users,
      // so we need previous state
      await db('users').insert(inferUserFromMessage(message))
      resolve();
    } catch (err) {
      reject(err);
    }
  });
}

最后,每次调用 applyMessage 需要添加到互斥队列,而不是直接调用:

var mutex = new Mutex();
consumer.on('message', message => mutex.enqueue(function() { return applyMessage(message); }))

相关问题