java 如何在Spring集成中自定义MqttSubscription?

v09wglhw  于 2024-01-05  发布在  Java
关注(0)|答案(1)|浏览(102)

我在Spring Integration中使用org.eclipse.paho.mqttv5.client,并尝试在mqtt中设置no local选项,如下所示:

@Bean
    public MessageProducer inbound(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
        Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(
                clientManager,
                "test"
        );
        adapter.setCompletionTimeout(5000);
        adapter.setQos(2);
        adapter.connectComplete(true);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

字符串
但是Mqttv5PahoMessageDrivenChannelAdapter没有办法设置MqttSubscription(它有mqtt的no-local配置)
Mqttv5PahoMessageDrivenChannelAdapter类中,它有一个方法subscribe

private void subscribe() {
        var clientManager = getClientManager();
        if (clientManager != null && this.mqttClient == null) {
            this.mqttClient = clientManager.getClient();
        }

        String[] topics = getTopic();
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        this.topicLock.lock();
        try {
            if (topics.length == 0) {
                return;
            }

            int[] requestedQos = getQos();
            MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
                    .mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i]))
                    .toArray(MqttSubscription[]::new);
            IMqttMessageListener listener = this::messageArrived;
            IMqttMessageListener[] listeners = IntStream.range(0, topics.length)
                    .mapToObj(t -> listener)
                    .toArray(IMqttMessageListener[]::new);
            this.mqttClient.subscribe(subscriptions, null, null, listeners, null)
                    .waitForCompletion(getCompletionTimeout());
            String message = "Connected and subscribed to " + Arrays.toString(topics);
            logger.debug(message);
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
            }
        }
        catch (MqttException ex) {
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
            }
            logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics));
        }
        finally {
            this.topicLock.unlock();
        }
    }


但它只使用参数topicqos创建MqttSubscriptionMqttSubscription[] subscriptions = IntStream.range(0, topics.length).mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i])).toArray(MqttSubscription[]::new);

ni65a41a

ni65a41a1#

这是我们在引入MQTT v5支持时所忽略的。
看起来我们必须引入类似于基于MqttSubscription的构造函数的东西,作为普通topic及其qos的替代选项。这样您就可以为每个订阅进行细粒度配置。
请提出GH问题,我们将在下一个Spring Integration版本中解决它。
作为一种变通方法,我只能建议直接使用Paho API。可以使用自定义MessageProducerSupport impl将其与项目中的其余集成流连接起来。

相关问题