我想在Apache Camel中的单个路由中有效地使用来自多个MQTT主题的事件。Paho MQTT组件目前似乎只支持一个主题,而我希望你可以像Kafka组件一样包含多个主题,用逗号分隔。
Kafka组件路径参数
例如,假设您有以下主题:
- a/B/c0
- a/B/c1
- a/B/..
- a/B/c40
但你对除a/b/c40以外的所有主题都感兴趣。
一种选择是使用MQTT主题通配符并从主题a/B/c40中删除事件。但问题是c40占用了大量的带宽(95+ %),应该避免这种额外的负载。
另一种选择是为每个主题创建一个消费者路由,并将这些路由连接到同一个目的地路由。然而,创建和管理40个mqtt客户机对我来说似乎并不理想,或者开销可以忽略不计?
第三个选项是在启动路由之前使用client.subscribe(topiclist)初始化MQTT客户端,然后通过高级选项'client'传递MqttClient。Paho组件客户端选项但是我不确定何时应该执行client.connect(connectOptions)方法。如果在启动路由之前没有执行这个connect方法,我会收到一个“client not connected”错误。但是,当我在启动路由之前执行它时,路由似乎不再接收所有消息。
你认为哪一个选择是最好的?在第三个选项的情况下,什么时候应该调用client.connect方法?在开始路线之前/之后,在路线中...?
1条答案
按热度按时间jdgnovmf1#
在我的情况下,我选择了第二个选择。不幸的是,Camel上的Paho MQTT客户端不支持同一路由中的多个主题。几个月前我遇到过这个问题,我有来自多个(10多个)生产线的MQTT数据,每行大约5-6个主题,消息大小适中(不记得确切的大小,但每条消息大约2 - 3 KB)。
我创建了一个配置CSV和一个小gawk脚本来生成camel蓝图,这使得路由的管理非常简单。我在每条生产线上使用了一个camel上下文。此数据稍后在一个目标主题中处理。性能不是问题,应用程序能够处理每10 - 15毫秒传入的消息。
在处理高频大有效载荷时,我通常会记住两个设置:
1.根据您的需要,将maxInflight属性设置为较高的值,例如500、1000。这确保不会由于太多的飞行中消息而丢失消息。
1.使用较低的qos,如0或1。这确保了代理不会备份太多,并且消息丢失/未传递的情况非常罕见。
只有当有这么多的路线和上下文时,它们才需要一些时间来启动,但这不是你必须经常做的事情,所以这是可以接受的。