java/kotlin kafka消费者在web应用程序中使用多个示例(容器)

vsikbqxv  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(341)

对于我的springbootweb应用程序,在应用程序启动之后,我想调用一个类的方法来保持它运行直到应用程序关闭。
但是,如果我的应用程序部署到kubernetes集群,那么它通常会有多个web应用程序示例,比如说4个示例(因此我假设将有4个web应用程序示例) KafkaConsumer 以及)。我想知道下面的代码是线程安全的还是可以水平伸缩的?
当我的应用程序停止时,我得到如下警告: kafka consumer is not safe for multithreaded access ```
@Component
class KafkaConsumerService : ApplicationRunner {
private lateinit var kafkaConsumer: KafkaConsumer<String, String>

init {
val props = Properties()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "127.0.0.1:9092"
props[ConsumerConfig.GROUP_ID_CONFIG] = "AnotherDemoConsumer"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name

  1. kafkaConsumer = KafkaConsumer(props)

}

override fun run(args: ApplicationArguments?) {
receiveFromKafka()
}

fun receiveFromKafka() {
kafkaConsumer.subscribe(listOf("test-topic"))

  1. while (true) {
  2. val consumerRecords = kafkaConsumer.poll(3000)
  3. consumerRecords.forEach { record ->
  4. logger.info("Receive Kafka message having key: ${record.key()}, value: ${record.value()}, " +
  5. "partition: ${record.partition()}, offset: ${record.offset()}")
  6. }
  7. }

}
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题