来自两个分区的kafka消费者

fdbelqdn  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(338)

在kafka中,我需要使用java使用两个使用者(分区1到使用者1,分区2到使用者2)的两个分区来使用一个主题。

这是我的制片人代码

public class KafkaClientOperationProducer {

    KafkaClientOperationConsumer kac = new KafkaClientOperationConsumer();

    public void initiateProducer(ClientOperation clientOperation,
        ClientOperationManager activityManager,Logger logger) throws Exception {

    Properties props = new Properties();
    props.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, ClientOperation> producer = new KafkaProducer<>(props);

    try{
    ProducerRecord<String, ClientOperation> record = new ProducerRecord<String, ClientOperation>(
            topicName, key, clientOperation);

    producer.send(record);

    }
    finally{
    producer.flush();
    producer.close();
    kac.initiateConsumer(activityManager);//Calling Consumer
    }
}
}

这是我的消费代码

public class KafkaClientOperationConsumer{

    String topicName = "CA_Topic";
    String groupName = "CA_TopicGroup";
    public  void initiateConsumer(ClientOperationManager activityManager) throws Exception {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
    props.put("group.id", groupName);
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaConsumer<String, ClientOperation> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topicName));               

        ConsumerRecords<String, ClientOperation> records = consumer.poll(100);
        try{
        for (ConsumerRecord<String, ClientOperation> record : records) {    
            activityManager.save(record.value());//saves data in database

        }}
    finally{
        consumer.close();}
    }
    }

上面的代码对于单个消费者来说工作得很好,而不是对于多个消费者
clientoperation是一个对象,它保存有关客户端操作的数据。
分区号是3(您可以从代码中看到),
当我尝试使用线程调用initiateconsumer时,即…(executorservice executor),我在数据库中得到了重复的值
请更改我的代码,以便我可以使用两个使用者使用ca\ U主题,由于内存问题,我不能使用两个jvm。提前谢谢

yrdbyhpb

yrdbyhpb1#

我想您必须使用kafkaconsumer.assign方法。这里有一个小例子:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:port");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
TopicPartition topicPartition = new TopicPartition("topic", 0); // topic name and partition id to be assigned for this consumer. in other consumer configurations this value must be any value other than 0
List<TopicPartition> partitionList = new ArrayList<TopicPartition>();
partitionList.add(topicPartition);
consumer.assign(partitionList); // in this line, 0. partition assigning to this consumer

您可以在Kafka的文档中看到详细信息:https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#assign(java.util.collection)集合

相关问题