请有人帮助我了解此配置中的问题所在:版本:
- 软件框架。集成:spring-集成-mqtt:5.5.2
- 引导程序:Spring引导启动程序:2.5.3
- 请访问:www.eclipse.paho.com
@Configuration
public class MqttConfig {
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://localhost:1883" });
return factory;
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory clientFactory) {
return new MqttPahoMessageDrivenChannelAdapter("MyApp", clientFactory, "ReplyTopic");
}
@Bean
IntegrationFlow inboundFlow(MqttPahoMessageDrivenChannelAdapter inboundAdapter) {
return IntegrationFlows.from(inboundAdapter)
.bridge()
.channel("replyChannel")
.get();
}
@Bean
public MessageChannel replyChannel() {
return MessageChannels.publishSubscribe().get();;
}
@Bean
public MqttPahoMessageHandler outboundAdapter(MqttPahoClientFactory clientFactory) {
return new MqttPahoMessageHandler("MyApp", clientFactory);
}
@Bean
public IntegrationFlow outboundFlow(MqttPahoMessageHandler outboundAdapter) {
return IntegrationFlows.from("requestChannel")
.handle(outboundAdapter).get()
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel")
String send(String request, @Header(MqttHeaders.TOPIC) String requestTopic);
}
}
客户代码
@RestController
public class MyController {
@Autowired
private MyGateway myGateway;
@GetMapping("/sendRequest")
public String sendRequest() {
var response = myGateway.send("Hello", "MyTopic");
return response;
}
}
用法:
curl http://localhost:8080/sendRequest
来自mqtt代理的手动响应(HiveMQ)
docker exec -it hivemq mqtt pub -t ReplyTopic -m "World" --debug
CLIENT mqttClient-MQTT_5_0-9ecded84-8416-4baa-a8f3-d593c692bc65: acknowledged PUBLISH: 'World' for PUBLISH to Topic: ReplyTopic
但是我不知道为什么我在Spring应用程序输出中会有这样的消息
2022-10-25 18:04:33.171 ERROR 17069 --- [T Call: MyApp] .m.i.MqttPahoMessageDrivenChannelAdapter : Unhandled exception for GenericMessage [payload=World, headers={mqtt_receivedRetained=false, mqtt_id=0, mqtt_duplicate=false, id=9dbd5e14-66ed-5dc8-6cea-6d04ef19c6cc, mqtt_receivedTopic=ReplyTopic, mqtt_receivedQos=0, timestamp=1666713873170}]
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@6f63903c]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
有人能解释一下为什么我有这个吗?
no output-channel or replyChannel header available
1条答案
按热度按时间qc6wkl3g1#
我认为您所面临的问题与您的
bridge()
配置无关。这来自于
MessagingGatewaySupport
及其replyMessageCorrelator
功能,该功能由您的replyChannel = "replyChannel"
激活。真实的的问题是,您正在尝试做MQTT v3不可能做的事情。没有通过MQTT代理传输的头来执行网关启动器所需的相关密钥-
TemporaryReplyChannel
。请参阅有关网关的文档中的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway。换句话说就是:
replyChannel
标头必须出现在回复消息中,这与网关上的replyChannel
配置无关。这是网关将请求与回复关联的方式。你必须寻找一个聚合器来并行发送请求消息,并保留上面提到的
TemporaryReplyChannel
头。然后当你收到一个回复(inboundAdapter
)时,你把它发送到这个聚合器。你需要确保请求和回复有效负载中的一些相关键,这样它们就可以匹配并满足要发送回网关的回复的组。更多信息请参阅文档:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator