上一篇博文,我们惊心动魄的讲解了 RocketMQ 的安装,接下来我们讲解它的使用。
此篇博文读者可参考文档 https://github.com/apache/rocketmq/tree/master/docs/cn
首先我们需要一个生产者和消费者 ,最好都注册在 nacos 上,如果读者有看过博主的博文,那么可以直接拿以前测试的项目进行测试。
消费者有一个方法能调用控制者。
无论是生产者还是消费者,我们都要添加以下依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
生产者需要配置 rocketMQ服务的地址 , 以及 生产者组名
rocketmq:
name-server: 127.0.0.1:9876 #rocketMQ服务的地址
producer:
group: scz_group # 生产者组
编写测试代码
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("testNotParamFunction")
public String testNotParamFunction(){
rocketMQTemplate.convertAndSend("scz-topic","testmessage");
return "success";
}
rocketMQTemplate 是 rocketmq 提供的调用接口
scz-topic 是发送到 rocketmq 上对应存放的主题名字
testmessage 是测试用的信息,类型是Object
发送一下请求进行测试:
可以观察到消息有进来。
消费者 需要配置 rocketMQ服务的地址
rocketmq:
name-server: 127.0.0.1:9876 #rocketMQ服务的地址
启动一个监听者监听 rocketMQ 进行消费
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "scz_group", topic = "scz-topic")
public class XfzService implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("成功受到消费信息: "+s);
}
}
此类必须实现 RocketMQListener ,里面的泛型对应的是传入 rocketMQ 的消息。
需要填写 consumerGroup 和 topic 。
启动消费者后就会实时监听这个主题,进行消费、
RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。
调用同步的方法:
@RequestMapping("testNotParamFunctionSync")
public String testNotParamFunctionSync(){
//arg[0] 主题 arg[1]信息 arg[2] 超时时间
SendResult sendResult = rocketMQTemplate.syncSend("scz_topic", "testmessage", 10000);
log.info(sendResult.toString());
return "success";
}
测试:
成功打印出返回的信息。
代码编写
@RequestMapping("testNotParamFunctionAsSync")
public String testNotParamFunctionAsSync(){
//arg[0] 主题 arg[1]信息 arg[2] 回调类
rocketMQTemplate.asyncSend("scz_topic", "testmessage", new SendCallback() {
//成功响应回调
@Override
public void onSuccess(SendResult sendResult) {
log.info("成功响应回调");
}
@Override
public void onException(Throwable throwable) {
log.info("失败响应回调");
}
});
return "success";
}
测试结果
主要用于发送日志,因为无论是否成功,都不会再去管理它。
@RequestMapping("testNotParamFunctionSendOneWay")
public String testNotParamFunctionSendOneWay(){
//arg[0] 主题 arg[1]信息
rocketMQTemplate.sendOneWay("scz_topic", "testmessage");
return "success";
}
发送方式 | 发送 TPS | 发送结果反馈 | 可靠性 |
---|---|---|---|
同步发送 | 快 | 有 | 不丢失 |
异步发送 | 快 | 有 | 不丢失 |
单向发送 | 最快 | 无 | 可能丢失 |
顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。
我们之前的普通消息,并不能保证消息的一致性,因为虽然rocketMq是先进先出的,但是它一个主题有多个队列,发送消息的时候是分散到多个队列中,所以消费者消费的时候会从多个队列取数据,导致并不是顺序的。
@RequestMapping("testNotParamFunctionSyncSendOrderly")
public String testNotParamFunctionSyncSendOrderly(){
//arg[0] 主题 arg[1]信息 arg[2] 放置一个随机的key就成
rocketMQTemplate.syncSendOrderly("scz_topic", "testmessage","random");
return "success";
}
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。
事务消息交互流程:
两个概念:
事务消息发送步骤:
事务消息回查步骤:
代码编写如下:
首先编写发送事务消息:
@RequestMapping("testNotParamFunctionTransaction")
public String testNotParamFunctionTransaction(){
//arg[0] 组 arg[1]主题 arg[2] 信息 arg[3]参数
rocketMQTemplate.sendMessageInTransaction("scz_transaction_group","transaction_topic", MessageBuilder.withPayload("testmessage").build(),"testmessage");
return "success";
}
继续编写接收到 半事务消息发送成功 ,执行本地事务,以及检查本地事务状态的类
@Service
@RocketMQTransactionListener(txProducerGroup = "scz_transaction_group")
public class TransactionListener implements RocketMQLocalTransactionListener {
//执行本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
//执行本地事务成功 return RocketMQLocalTransactionState.COMMIT;
//执行本地事务失败 return RocketMQLocalTransactionState.ROLLBACK;
return null;
}
//消息回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
//回查消息成功 return RocketMQLocalTransactionState.COMMIT;
//回查消息失败 return RocketMQLocalTransactionState.ROLLBACK;
return null;
}
}
ps:这边的组名会和发送消息时填写的组名相同。
end:RocketMQ 讲解到此结束,过程中发先了一些控制台查询时会报的异常,暂时还没解决,若是解决了会对博文进行相对应的补充。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_29064815/article/details/107444320
内容来源于网络,如有侵权,请联系作者删除!