我想让Kafka同时读信息,
我正在用两个分区运行我的主题:
Topic:Tokens_Activity_Sandbox PartitionCount:2 ReplicationFactor:1 Configs:segment.bytes=1073741824
Topic: Tokens_Activity_Sandbox Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: Tokens_Activity_Sandbox Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
做了个小测试客户。
平行发送时 sleep
对于主题的命令(每个分区1个),客户机逐个读取它们,顺序地而不是并行地(一旦进入睡眠状态)。
我做错什么了?
const topic = 'Tokens_Activity_Sandbox';
const { Kafka } = require('kafkajs');
const util = require('util');
const sleep = util.promisify(setTimeout);
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['192.168.0.135:9092'],
});
const groupId = 'test-group2333';
const admin = kafka.admin({
retry: { retries: 0 },
});
const producer = kafka.producer({ allowAutoTopicCreation: false });
const consumer = kafka.consumer({
allowAutoTopicCreation: false,
groupId: groupId,
retry: { retries: 1, restartOnFailure: async () => false },
});
if (process.env.CREATE === 'true') {
(async () => {
await admin.createTopics({
topics: [
{
topic: topic,
numPartitions: 2, // default: 1
},
],
});
})();
}
process.on('SIGINT', async () => {
// eslint-disable-next-line no-undef
await consumer.disconnect();
});
const eachMessageF = async ({ topic, partition, message }) => {
console.log('incoming message', {
topic,
partition,
offset: message.offset,
value: message.value.toString(),
});
if (message.value.toString() === 'sleep') {
console.log('going to sleep');
await sleep(5000);
console.log('woke up from sleep');
}
console.log(`done processing ${message.value.toString()}`);
};
const run = async () => {
// admin
await admin.connect();
await producer.connect();
// Consuming
await consumer.connect();
await consumer.subscribe({ topic: topic, fromBeginning: false });
await consumer
.run({
partitionsConsumedConcurrently: 2,
eachMessage: async ({ topic, partition, message }) => {
try {
await eachMessageF({ topic, partition, message });
} catch (e) {
console.log('caught th error <3');
}
},
})
.catch((err) => {
console.log('caught err in catch of consumer.run');
throw err;
});
};
run()
这将是运行的输出
incoming message {
topic: 'Tokens_Activity_Sandbox',
partition: 1,
offset: '4',
value: 'sleep'
}
going to sleep
woke up from sleep
done processing sleep
incoming message {
topic: 'Tokens_Activity_Sandbox',
partition: 0,
offset: '6',
value: 'sleep'
}
going to sleep
woke up from sleep
done processing sleep
暂无答案!
目前还没有任何答案,快来回答吧!