我的消费者就是这样初始化的:
const client = new kafka.Client(config.ZK_HOST)
const consumer = new kafka.Consumer(client, [{ topic: config.KAFKA_TOPIC, offset: 0}],
{
autoCommit: false
})
现在是消费者 consumer.on('message', message => applyMessage(message))
问题是 applyMessage
使用 knex
,代码类似于:
async function applyMessage(message: kafka.Message) {
const usersCount = await db('users').count()
// just assume we ABSOLUTELY need to calculate a number of users,
// so we need previous state
await db('users').insert(inferUserFromMessage(message))
}
上面的代码使 applyMessage
对kafka中的所有消息并行执行,因此在上面的代码中,假设数据库中还没有用户, usersCount
即使是来自Kafka的第二条消息,也将始终为0,自第一次调用后,该消息应为1 applyMessage
插入用户。
我如何“同步”的方式,所有的代码 applyMessage
函数是否按顺序运行?
1条答案
按热度按时间xzv2uavs1#
您需要实现某种互斥。基本上是一个将要同步执行的事情排队的类。例子
为了让这个工作,你的
applyMessage
函数(处理kafka消息的函数)需要返回Promise
-另请注意,async已从父函数移到返回的promise函数:最后,每次调用
applyMessage
需要添加到互斥队列,而不是直接调用: