kafka消费集群环境偏移量

ih99xse1  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(368)

我正在尝试让x个用户访问Kafka中的某个特定主题,但不使用相同的消息。我想举个例子。。。
耗电元件1拾取偏移1耗电元件2拾取偏移2耗电元件1拾取偏移3耗电元件2拾取偏移4
我想让Kafka为这两个消费者排队。我注意到group.id配置,我假设您可以使用同一个组,它会相应地处理它,但它似乎没有按我想象的方式工作。
这是我正在使用的代码。。。

public void init(){
            Properties props = new Properties();
            props.put("bootstrap.servers", kafkaUrl);
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("enable.auto.commit", "true");
            props.put("group.id", "group1");
            props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());

            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("event1", "event2"));

            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
     }

     public void pollTopics() {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

            for (ConsumerRecord<String, String> record : records) {
                AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
                if(processor != null) {
                    kafkaThreadPool.execute(processor);
                }
            }
        }catch (Exception e){
            LOG.error("Polling exception occurred", e);
        }
    }

我希望能够在集群环境中运行此代码,并让kafka成为队列。我希望它拉信息,并前往下一个偏移量在同一时间,然后下一个Kafka民意测验将抓住下一个偏移量。这可能吗?如果是这样,我做错了什么?

rur96b6h

rur96b6h1#

这在Kafka是不可能的。
如果使用使用者组,则单个分区只能由单个使用者读取。因此,kafka确实是按分区扩展的,也就是说,如果您希望有多个使用者(读取不同的数据),那么每个使用者至少需要一个分区。如果分区多于使用者,则某些(或所有)使用者将同时读取多个分区。
您的解决方案是,创建一个具有多个分区的主题(或者使用多个主题并让组中的所有使用者订阅一个主题)。

相关问题