java Spring Boot Kafka制作人禁用主题自动创建

w8rqjzmb  于 2022-12-10  发布在  Java
关注(0)|答案(1)|浏览(262)

当主题不可用时,我需要禁用Kafka制作者的自动主题创建。我使用的是Sping Boot 2.7.6,这意味着spring-kafka版本2.8.11
我为Kafka消费者找到了类似的属性-ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,它工作正常。
如果主题不可用,我希望Spring在消息发送过程中抛出异常。目前,主题是自动创建的。
消费者:

@Bean
fun kafkaConsumerFactory(): ConsumerFactory<String?, String?> {
    val props: MutableMap<String, Any> = HashMap()
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
    props[ConsumerConfig.GROUP_ID_CONFIG] = groupId
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
    props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = false
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
    props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
    props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = StringDeserializer::class.java
    return DefaultKafkaConsumerFactory(props)
}

制作人:

@Bean
fun kafkaProducerFactory(): ProducerFactory<String?, String?> {
    val configProps: MutableMap<String, Any> = HashMap()
    configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
    configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    return DefaultKafkaProducerFactory(configProps)
}

这是不可能的,还是我错过了什么?谢谢

ryevplcw

ryevplcw1#

Kafka producer没有这样的属性。您必须使用Broker属性:来源:https://kafka.apache.org/documentation/#brokerconfigs

相关问题