如何从消息头获取主题名并发布

ztmd8pv5  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(356)

我对spring集成还不熟悉。我正在尝试构建一个springrest服务,在这里它将获得带有json消息的任何http请求,并发布到kafka主题。
我的json消息将通过requestbody发布,requestbody将在消息头中包含主题名称。
我可以将来自控制器的消息发布到kafka频道,但是我很难从json消息头中获取主题名。
有没有人能建议一种方法,从我的消息头中获取主题名(通常一个http请求包含一个带有主题名的json消息)并使用该主题发布消息。
我的json:

{"resourceType": "MessageHeader",
"topicName": "testToptic",
"messagePayload":{
    "location": "chennai",
    "messageDetail": {
        "department-id": 123,
        "department-name": "SSS",
        "pincode": 600009
    }
}
}}

这是我的豆子和处理程序

@Bean
public IntegrationFlow hanldeGenericKafka() {
    return IntegrationFlows.from(sendToKafkaChannel)

            .handle(
                    kafkaGenericMessageHandler(producerFactory),
                    e -> e.id("kafkaProducer2"))
            .get();
}

public KafkaProducerMessageHandlerTemplateSpec<String, String> kafkaGenericMessageHandler(
        ProducerFactory<String, String> producer) {

    return Kafka
            .outboundChannelAdapter(producer)
            .sync(true)
            .headerMapper(kafkaDefaultHeaderMapper())
            .messageKey(m -> m.getHeaders()
                    .get("topicname"))
            .configureKafkaTemplate(t -> t.id("kafkaTemplate"));
}
j91ykkif

j91ykkif1#

可以使用带有内置jsonpath spel函数的表达式从json负载中提取字段值。
在适配器的 .topicExpression() .

相关问题