spring集成kafka支持动态主题创建吗

nhjlsmyf  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(444)

我是一个新手,以Spring集成Kafka和我了解Kafka上界通道适配器。但是,有没有一种方法可以不必在上下文xml中设置而以编程方式创建主题呢?
ie:基于我给transformer的消息,我想将消息发布到为该消息类型创建的kafka主题。
更新:
下面是我最后做的。我们欢迎任何更好的解决方案。

<int:channel id="inputForSolrPublish"></int:channel>

<int:service-activator input-channel="inputForSolrPublish"
    ref="solrMasterListRouter" >

-->

private void postMessageToMasterSpecifcTopics(final List<String> topicNames,
                                              final String brokerList,
                                              final Message<?> message) throws Exception {

    for (String topicName : topicNames) {
        createProducerContext(topicName,
                              brokerList).send(topicName,
                                               message.getHeaders()
                                                      .get(KafkaHeaders.MESSAGE_KEY),
                                               message);

    }

}

private KafkaProducerContext<String, String> createProducerContext(final String topicName,
                                                                   final String brokerList) throws Exception {
    KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
    AvroReflectDatumBackedKafkaEncoder<String> kafkaReflectionEncoder = new AvroReflectDatumBackedKafkaEncoder<>(String.class);
    AvroSpecificDatumBackedKafkaEncoder<String> kafkaSpecificEncoder = new AvroSpecificDatumBackedKafkaEncoder<>(String.class);
    // Encoder<String> encoder = new
    // org.springframework.integration.kafka.serializer.common.StringEncoder<String>();

    ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(topicName);
    producerMetadata.setValueClassType(String.class);
    producerMetadata.setKeyClassType(String.class);
    producerMetadata.setValueEncoder(kafkaSpecificEncoder);
    producerMetadata.setKeyEncoder(kafkaReflectionEncoder);
    producerMetadata.setAsync(true);

    Properties props = buildProducerConfigProperties();
    ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata,
                                                                                           brokerList,
                                                                                           props);
    ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata,
                                                                                             producer.getObject());
    kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(topicName,
                                                                            config));
    return kafkaProducerContext;
}

private Properties buildProducerConfigProperties() {
    Properties props = new Properties();
    props.put("topic.metadata.refresh.interval.ms",
              "3600000");
    props.put("message.send.max.retries",
              "5");
    props.put("tsend.buffer.bytes",
              "5242880");
    return props;

}
ozxc1zmp

ozxc1zmp1#

是的,您可以在运行时这样做。看到了吗 TopicUtils.ensureTopicCreated .
你可以像一个 <service-activator> 作为一个以上的订户(第一个)到 <publish-subscribe-channel> 用于发送消息。像这样:

<publish-subscribe-channel id="sendMessageToKafkaChannel"/>

<service-activator input-channel="sendMessageToKafkaChannel" output-channel="nullChannel" order="1"
   ref="creatTopicService" method="creatTopic"/>

<int-kafka:outbound-channel-adapter channel="sendMessageToKafkaChannel" order="2"/>

接受这个事实 creatTopic 整个消息,并从消息中或在注入阶段提取所有必需的参数,例如inject ZookeeperConnect 提取 getZkConnect() 第一次
zkAddress ensureTopicCreated 争论。
但你应该明白你不能 <int-kafka:message-driven-channel-adapter> 没有关于Kafka的现有主题。所以,我不确定将来如何处理这些动态创建的主题中的消息。尽管 <int-kafka:inbound-channel-adapter> 可能对那个案子有用。。。

相关问题