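When I create consumers with Kafka, I get the following exception:
Exception:
KafkaConsumer is not safe for multi-threaded access
More than 100 consumers subscribe to the topic at the same time and fetch records at the same time.
I added the following code: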
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// KafkaConsumer configuration
KafkaConsumer<String, Object> kafkaConsumer;
Properties properties = new Properties();
properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
properties.put("group.id", GROUP_ID);
properties.put("enable.auto.commit", "false");
// Has no effect while enable.auto.commit is "false"
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("key.deserializer", JsonDeserializer.class);
properties.put("value.deserializer", JsonDeserializer.class);
kafkaConsumer = new KafkaConsumer<>(properties);

// Subscribe the consumer
kafkaConsumer.subscribe(Arrays.asList("Topic_Name"));

// Get records from the beginning
while (true) {
    // Note: this rewinds every assigned partition on each iteration,
    // so the same records are read again on every pass.
    kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
    ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, Object> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
                record.key(), record.value());
    }
}

// Get records using WebSocket; this section runs continuously
ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, Object> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
            record.key(), record.value());
}
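
The exception is raised when a second thread calls into a KafkaConsumer instance while another thread is still inside it, which is what happens if many WebSocket sessions call poll() on one shared kafkaConsumer. One possible way around it, sketched below, is to let a single thread own the consumer and hand records to the sessions through per-session queues. This is only an illustration under assumptions: SingleOwnerPoller, subscribe(), stop() and runDemo() are hypothetical names, and the Supplier is a stand-in for kafkaConsumer.poll(...) so that the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Sketch: one thread owns the non-thread-safe source; other threads only read their queue. */
class SingleOwnerPoller implements Runnable {
    // Hypothetical stand-in for kafkaConsumer.poll(...): returns the next batch, possibly empty.
    private final Supplier<List<String>> source;
    // One queue per subscriber (for example, per WebSocket session).
    private final List<BlockingQueue<String>> subscribers = new CopyOnWriteArrayList<>();
    private final AtomicBoolean running = new AtomicBoolean(true);

    SingleOwnerPoller(Supplier<List<String>> source) {
        this.source = source;
    }

    /** Safe to call from any thread; never touches the source. */
    BlockingQueue<String> subscribe() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        subscribers.add(queue);
        return queue;
    }

    void stop() {
        running.set(false);
    }

    @Override
    public void run() {
        // Only this thread ever calls source.get(), so the source sees a single thread.
        while (running.get()) {
            for (String record : source.get()) {
                for (BlockingQueue<String> queue : subscribers) {
                    queue.offer(record);
                }
            }
        }
    }

    /** Small demo with a fake source: one batch of three records, then empty batches. */
    static List<String> runDemo() {
        AtomicBoolean delivered = new AtomicBoolean(false);
        Supplier<List<String>> fakePoll = () -> {
            if (delivered.compareAndSet(false, true)) {
                return Arrays.asList("a", "b", "c");
            }
            try {
                Thread.sleep(10); // imitate a poll that times out with no records
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Collections.<String>emptyList();
        };
        SingleOwnerPoller poller = new SingleOwnerPoller(fakePoll);
        BlockingQueue<String> session1 = poller.subscribe();
        BlockingQueue<String> session2 = poller.subscribe();
        Thread pollerThread = new Thread(poller, "kafka-poller");
        pollerThread.start();
        List<String> seen = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) seen.add(session1.poll(2, TimeUnit.SECONDS));
            for (int i = 0; i < 3; i++) seen.add(session2.poll(2, TimeUnit.SECONDS));
            poller.stop();
            pollerThread.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

With the real client, the supplier would wrap kafkaConsumer.poll(Duration.ofMillis(1000)) and be called only from the poller thread, while each WebSocket session reads from its own queue. KafkaConsumer.wakeup() is the one method documented as safe to call from another thread, so it could be used to interrupt a blocking poll on shutdown. The other common option is one KafkaConsumer instance per thread.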