我在Spring Integration中使用org.eclipse.paho.mqttv5.client
,并尝试在mqtt中设置no local
选项,如下所示:
@Bean
public MessageProducer inbound(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(
clientManager,
"test"
);
adapter.setCompletionTimeout(5000);
adapter.setQos(2);
adapter.connectComplete(true);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
字符串
但是Mqttv5PahoMessageDrivenChannelAdapter
没有办法设置MqttSubscription
(它有mqtt的no-local配置)
在Mqttv5PahoMessageDrivenChannelAdapter
类中,它有一个方法subscribe
:
private void subscribe() {
var clientManager = getClientManager();
if (clientManager != null && this.mqttClient == null) {
this.mqttClient = clientManager.getClient();
}
String[] topics = getTopic();
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
this.topicLock.lock();
try {
if (topics.length == 0) {
return;
}
int[] requestedQos = getQos();
MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
.mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i]))
.toArray(MqttSubscription[]::new);
IMqttMessageListener listener = this::messageArrived;
IMqttMessageListener[] listeners = IntStream.range(0, topics.length)
.mapToObj(t -> listener)
.toArray(IMqttMessageListener[]::new);
this.mqttClient.subscribe(subscriptions, null, null, listeners, null)
.waitForCompletion(getCompletionTimeout());
String message = "Connected and subscribed to " + Arrays.toString(topics);
logger.debug(message);
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
}
}
catch (MqttException ex) {
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
}
logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics));
}
finally {
this.topicLock.unlock();
}
}
型
但它只使用参数topic
和qos
创建MqttSubscription
:MqttSubscription[] subscriptions = IntStream.range(0, topics.length).mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i])).toArray(MqttSubscription[]::new);
1条答案
按热度按时间ni65a41a1#
这是我们在引入MQTT v5支持时所忽略的。
看起来我们必须引入类似于基于
MqttSubscription
的构造函数的东西,作为普通topic
及其qos
的替代选项。这样您就可以为每个订阅进行细粒度配置。请提出GH问题,我们将在下一个Spring Integration版本中解决它。
作为一种变通方法,我只能建议直接使用Paho API。可以使用自定义
MessageProducerSupport
impl将其与项目中的其余集成流连接起来。