当主题不可用时,我需要禁用Kafka制作者的自动主题创建。我使用的是Sping Boot 2.7.6,这意味着spring-kafka版本2.8.11
我为Kafka消费者找到了类似的属性-ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,它工作正常。
如果主题不可用,我希望Spring在消息发送过程中抛出异常。目前,主题是自动创建的。
消费者:
@Bean
fun kafkaConsumerFactory(): ConsumerFactory<String?, String?> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
props[ConsumerConfig.GROUP_ID_CONFIG] = groupId
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = false
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = StringDeserializer::class.java
return DefaultKafkaConsumerFactory(props)
}
制作人:
@Bean
fun kafkaProducerFactory(): ProducerFactory<String?, String?> {
val configProps: MutableMap<String, Any> = HashMap()
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(configProps)
}
这是不可能的,还是我错过了什么?谢谢
1条答案
按热度按时间ryevplcw1#
Kafka producer没有这样的属性。您必须使用Broker属性:来源:https://kafka.apache.org/documentation/#brokerconfigs