Spring Boot Spring集成Mqtt:目标解析异常:没有可用的输出通道或回复通道标头

watbbzwu  于 2022-11-05  发布在  Spring
关注(0)|答案(1)|浏览(613)

请有人帮助我了解此配置中的问题所在:版本:

  • 软件框架。集成:spring-集成-mqtt:5.5.2
  • 引导程序:Spring引导启动程序:2.5.3
  • 请访问:www.eclipse.paho.com
@Configuration
public class MqttConfig {
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
       DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
       MqttConnectOptions options = new MqttConnectOptions();
       options.setServerURIs(new String[] { "tcp://localhost:1883" });
       return factory;
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory clientFactory) {
        return new MqttPahoMessageDrivenChannelAdapter("MyApp", clientFactory, "ReplyTopic");
    }

    @Bean
    IntegrationFlow inboundFlow(MqttPahoMessageDrivenChannelAdapter inboundAdapter) {
        return IntegrationFlows.from(inboundAdapter)
                               .bridge()
                               .channel("replyChannel")
                               .get();
    }

    @Bean
    public MessageChannel replyChannel() {       
        return MessageChannels.publishSubscribe().get();;
    }

   @Bean
   public MqttPahoMessageHandler outboundAdapter(MqttPahoClientFactory clientFactory) {
        return new MqttPahoMessageHandler("MyApp", clientFactory);
   }

   @Bean
   public IntegrationFlow outboundFlow(MqttPahoMessageHandler outboundAdapter) {
        return IntegrationFlows.from("requestChannel")
                               .handle(outboundAdapter).get()
   }

   @MessagingGateway
   public interface MyGateway {
        @Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel")
        String send(String request, @Header(MqttHeaders.TOPIC) String requestTopic);
   }
}

客户代码

@RestController
public class MyController {
    @Autowired
    private MyGateway myGateway;

    @GetMapping("/sendRequest")
    public String sendRequest() {
       var response = myGateway.send("Hello", "MyTopic");
       return response;
    }
}

用法:

curl http://localhost:8080/sendRequest

来自mqtt代理的手动响应(HiveMQ)

docker exec -it hivemq mqtt pub -t ReplyTopic -m "World" --debug
CLIENT mqttClient-MQTT_5_0-9ecded84-8416-4baa-a8f3-d593c692bc65: acknowledged PUBLISH: 'World' for PUBLISH to Topic:  ReplyTopic

但是我不知道为什么我在Spring应用程序输出中会有这样的消息

2022-10-25 18:04:33.171 ERROR 17069 --- [T Call: MyApp] .m.i.MqttPahoMessageDrivenChannelAdapter : Unhandled exception for GenericMessage [payload=World, headers={mqtt_receivedRetained=false, mqtt_id=0, mqtt_duplicate=false, id=9dbd5e14-66ed-5dc8-6cea-6d04ef19c6cc, mqtt_receivedTopic=ReplyTopic, mqtt_receivedQos=0, timestamp=1666713873170}]

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@6f63903c]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

有人能解释一下为什么我有这个吗?

no output-channel or replyChannel header available
qc6wkl3g

qc6wkl3g1#

我认为您所面临的问题与您的bridge()配置无关。
这来自于MessagingGatewaySupport及其replyMessageCorrelator功能,该功能由您的replyChannel = "replyChannel"激活。
真实的的问题是,您正在尝试做MQTT v3不可能做的事情。没有通过MQTT代理传输的头来执行网关启动器所需的相关密钥-TemporaryReplyChannel。请参阅有关网关的文档中的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway。
换句话说就是:replyChannel标头必须出现在回复消息中,这与网关上的replyChannel配置无关。这是网关将请求与回复关联的方式。
你必须寻找一个聚合器来并行发送请求消息,并保留上面提到的TemporaryReplyChannel头。然后当你收到一个回复(inboundAdapter)时,你把它发送到这个聚合器。你需要确保请求和回复有效负载中的一些相关键,这样它们就可以匹配并满足要发送回网关的回复的组。
更多信息请参阅文档:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator

相关问题