ActiveMQ 的独占消费模式

x33g5p2x  于2021-12-19 转载在 其他  
字(1.3k)|赞(0)|评价(0)|浏览(578)

本文主要介绍一下ActiveMQ消息独占模式

1、消息独占模式(Exclusive Consumer)

Queue中的消息是按照顺序被分发到consumers的。然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。例如,你可能不希望在插入订单操作结束之前执行更新这个订单的操作。 ActiveMQ从4.x版本起开始支持Exclusive Consumer (或者说Exclusive Queues)。 Broker会从多个consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其它的consumer。 可以通过Destination Options 来创建一个Exclusive Consumer,如下:
Java代码

  1. queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
  2. consumer = session.createConsumer(queue);
    顺便说一下,可以给consumer设置优先级,以便针对网络情况(如network hops)进行优化,如下:
    Java代码
  3. queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true &consumer.priority=10");

2、支持负载均衡的消息的顺序消费思路

       (1)以一个订单产生为例,用户进行下单时会产生订单,订单生成成功,则需要减库存等一系列操作。类似上述场景都需要消息顺序高效的执行。

       (2)如果使用独占消费者模式可以解决顺序消费的问题,但是如果当订单量非常庞大时,每次消息中间上的数据只能交给一个消费者去处理,这样效率就会很低。

       (3)解决这个问题的思路是,可以在消息中间件(Broker)与Consumer之间添加一个集群(cluster,代理服务器角色)该集群主要目的用于将broker上的消息进行排队,该cluster的大致设计是可以创建一个ConcurrentHashMap的队列,队列的key可以使用String类型的订单ID,value可以使用一个ConcurrentLinkedQueue的队列。这样每次通过一些业务逻辑处理,将代理服务器的消息推送给每一个Consumer(这里的Consumer也可以是一个集群),对于每一个Consumer而言,在处理一个订单数据时,每处理完一个订单里的一条消息时都要将消息的返回结果通知代理服务器,代理服务器得知订单中消息的处理情况后,就可以将同一个订单中的第二条数据推送给另外一个Consumer,第二个Consumer的处理逻辑与第一个的相同。这样处理方式可以保证每一个订单的数据消息都是顺序执行的。在这个期间对于第一个Cosumer而言,还可以并发处理第二个订单中的消息。对于整个系统而言,顺序处理消息的时间可能没有特别大的变化,但是对于整个系统而言,系统的吞吐量变大了。

相关文章