我想在node.js项目中使用kafkajs。让我看看我的密码。
制作人:
const producer = kafka.producer();
await producer.connect();
const items = await getItems(); // getting somehow 5k items to produce
await producer.send({
topic: "items",
messages: items.map(c => ({ value: JSON.stringify(c) })),
});
// even if I split here on chunks like this, in consumer I get batch with more than 100 items
/*
const chunked = _.chunk(items, 100);
for (var chunk of chunked) {
await producer.send({
topic: config.kafka.topics.tm.itemsToParse,
messages: chunk.map(c => ({ value: JSON.stringify(c) })),
headers: { from: "csv_parser" },
});
}
* /
消费者:
const consumer = kafka.consumer({ groupId: "groupId" });
await consumer.connect();
await consumer.subscribe({ topic: "items" });
await consumer.run({
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
for (chunk of _.chunk(batch.messages, 100)) {
if (!isRunning()) break;
// I can handle by 100 items
let items = chunk.map(m => JSON.parse(m.value));
/*
handle items somehow, max ~30 sec
*/
for (message of chunk) resolveOffset(message.offset); // took from example in kafkajs docs
await heartbeat();
}
},
});
一个示例最多可以处理100个项目。所以现在如果一个消费者拿走这批5千件商品,其他人也不会拿走(来自同一个groupid,并行处理)。问题是:
从许多消费者那里同时读取批是真的吗?
我是否可以将使用者配置为批量使用已定义的邮件计数?
生产商是否应以正确的批量大小(按100项)发送批次?=>生产者必须适应消费者?
如果我需要确定批量大小,如何从生产商处正确发送批量?
暂无答案!
目前还没有任何答案,快来回答吧!