kafka出站适配器没有将主题值公开为springbean

pod7payv  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(303)

我已经成功地将kafka出站通道适配器与固定主题名集成。现在,我想使主题名可配置,因此,希望通过应用程序属性公开它。
application.properties包含以下条目之一:

kafkaTopic:testNewTopic

我的配置类如下所示:

@Configuration
@Component
public class KafkaConfig {

    @Value("${kafkaTopic}")
    private String kafkaTopicName;

    @Bean
    public String getTopic(){
    return kafkaTopicName;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//this.brokerAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // set more properties
        return new DefaultKafkaProducerFactory<>(props);
    }
}

在si-config.xml中,我使用了以下内容(例如:topic=“gettopic”):

<int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapter" kafka-template="kafkaTemplate"
        auto-startup="true" sync="true" channel="inputToKafka" topic="getTopic">
    </int-kafka:outbound-channel-adapter>

但是,当通过bean公开时,配置无法获取主题名称。但是当我硬编码主题名的值时,它工作得很好。
有人能告诉我我做错了什么吗?
topic 在kafka出站通道内接受被称为bean的值?
每个使用我的实用程序的应用程序都会提供不同的kafka主题名,因此如何将其外部化

cwxwcias

cwxwcias1#

这个 topic 属性表示字符串值。
但是,它支持属性占位符解析:

topic="${kafkaTopic}"

以及上述bean的spel评估:

topic="#{getTopic}"

因为xml解析器配置允许这样做。
但是你要注意 KafkaTemplate ,注入 <int-kafka:outbound-channel-adapter>defaultTopic 财产。因此,您不必担心xml。
另外一个可用的选项是springintegrationannotations配置。您可以在其中定义 @ServiceActivator 对于
KafkaProducerMessageHandler @Bean :

@ServiceActivator(inputChannel = "inputToKafka")
@Bean
KafkaProducerMessageHandler kafkaOutboundChannelAdapter() {
     kafkaOutboundChannelAdapter adapter = new kafkaOutboundChannelAdapter( kafkaTemplate());
     adapter.setSync(true);
     adapter.setTopicExpression(new LiteralExpression(this.kafkaTopicName));
     return adapter;
}

相关问题