kafka节点高级生产者只对偶数个分区进行写操作

7vux5j2d  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(338)

我正在使用kafka节点库,并测试高级生产者。
我创建了一个包含10个分区的主题“hlptestinput”,并编写了一个函数,每秒生成一个。
生产者写入分区0、2、4、6和8,但不写入奇数分区。
奇怪的是,当我从这个主题消费并生成第二个主题“hlptestinputfromconsumer”时,它有5个分区,所有的分区都会写入消息。
有没有我遗漏的配置?

const kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    ConsumerGroup = kafka.ConsumerGroup,
    client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    producer = new HighLevelProducer(client),
    consumer = new ConsumerGroup(
        {
          kafkaHost: 'smc-dev.silverbolt.lab:9092',
            groupId: 'testGroup'
        },
        'HLPTestInput'
    );

let index = 0;
setInterval(() => {
    producer.send([{
        topic: 'HLPTestInput',
        messages: [index]
    }], (err, data) => {
        console.log('produced', data);
    });
    index++;
}, 1000);

consumer.on('message', (message) => {
    console.log('consumed', message);
    producer.send([{
        topic: 'HLPTestInputFromConsumer',
        messages: [message]
    }], (err, data) => {
        console.log('produced to secondary', data);
    });
});
kx5bkwkv

kx5bkwkv1#

我不太确定,但可能是因为你用同一个制片人写了两个不同的主题。因为highlevelproducer使用循环写。因此,假设您的生产者写入“hlptestinput”主题,然后您将时间间隔设置为1000,因此在此期间,您的消费者收到消息,现在您的生产者写入“hlptestinputfromconsumer”主题。
因此,您的生产者在其分区0、2、4中编写“hlptestinput”主题。。。
以及“hlptestinputfromconsumer”主题的第1、3、5部分。。。
所以我建议你再找一个制作人。那就可以了。
请尝试以下代码:

const kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    ConsumerGroup = kafka.ConsumerGroup,
    client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    client1 = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    producer = new HighLevelProducer(client),
    producer1 = new HighLevelProducer(client1),
    consumer = new ConsumerGroup(
       {
          kafkaHost: 'smc-dev.silverbolt.lab:9092',
           groupId: 'testGroup'
        },
        'HLPTestInput'
    );
let index = 0;
    setInterval(() => {
    producer.send([{
        topic: 'HLPTestInput',
        messages: [index]
    }], (err, data) => {
        console.log('produced', data);
    });
   index++;
}, 1000);

consumer.on('message', (message) => {
    console.log('consumed', message);
    producer1.send([{
        topic: 'HLPTestInputFromConsumer',
        messages: [message]
    }], (err, data) => {
        console.log('produced to secondary', data);
    });
});

相关问题