9 SpringBoot整合RocketMQ实现顺序消息

x33g5p2x  于2021-09-19 转载在 Spring  
字(1.7k)|赞(0)|评价(0)|浏览(1041)

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;

有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完成等操作,需要顺序执行;

RocketMQTemplate给我们提供了SendOrderly方法(有多个重载),来实现发送顺序消息;包括以下:

syncSendOrderly,发送同步顺序消息;

asyncSendOrderly,发送异步顺序消息;

sendOneWayOrderly,发送单向顺序消息;

一般我们用第一种发送同步顺序消息;

第三个参数hashKey,方法点进去:

因为broker会管理多个消息队列,这个hashKey参数,主要用来计算选择队列的,一般可以把订单ID,产品ID作为参数值;

发送到一个队列,这样方便搞顺序队列;

以及消费端接收的时候,默认是并发多线程去接收消息。RocketMQMessageListener有个属性consumeMode,默认是ConsumeMode.CONCURRENTLY ,我们要改成ConsumeMode.ORDERLY,单线程顺序接收消息;

下面给具体事例,模拟两个订单发送消息;

消息生产者端:

  1. /** * 发送同步顺序消息 */
  2. public void sendOrderlyMessage(){
  3. // hashKey用来计算决定消息发送到哪个消息队列 一般是订单ID,产品ID等
  4. rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,创建", "98456231");
  5. rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,支付", "98456231");
  6. rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,完成", "98456231");
  7. rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,创建", "98456232");
  8. rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,支付", "98456232");
  9. rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,完成", "98456232");
  10. }

消费者端:

  1. /** * 消息消费者 * @author java1234_小锋 * @site www.java1234.com * @company 南通小锋网络科技有限公司 * @create 2021-08-22 22:40 */
  2. @RocketMQMessageListener(topic = "java1234-rocketmq-orderly",consumerGroup ="${rocketmq.consumer.group}",consumeMode =ConsumeMode.ORDERLY )
  3. @Component
  4. public class ConsumerService implements RocketMQListener<String> {
  5. @Override
  6. public void onMessage(String s) {
  7. System.out.println("消费者:收到消息内容:"+s);
  8. }
  9. }

运行测试:

相关文章