📒博客首页:崇尚学技术的科班人
🏇小肖来了
🍣今天给大家带来的文章是《RabbitMQ发布确认和交换机基础总结与实战》🍣
🍣这是RabbitMQ的发布确认和交换机基础总结与实战🍣
🍣希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗同时也非常感谢各位小伙伴们的支持💗
🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣源码地址🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣
一个消息的持久化需要经历的步骤:
Channel channel = RabbitmqUtil.getChannel();
//开启发布确认
channel.confirmSelect();
confirmSelect
,每当需要使用发布确认的时候,都需要调用该方法。waitForConfirms
方法实现,这个方法只有在消息被确认的时候才会返回,如果在指定时间范围内这个消息没有被确认那么它将会抛出异常。public static void ConfirmMessageIndividually() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 进行单个发布确认
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("单个确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
}
public static void ConfirmMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
// 批量处理消息的个数
int batchSize = 100;
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 进行批量发布确认
if(i % batchSize == 0){
channel.waitForConfirms();
System.out.println("批量处理消息成功");
}
}
long end = System.currentTimeMillis();
System.out.println("批量确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
}
原理
回调函数
来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。代码
public static void ConfirmMessageAsync() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
ConfirmCallback ackCallback = (var1,var2)->{
System.out.println("已确认的消息" + var1);
};
ConfirmCallback nackCallback = (var1,var2)->{
System.out.println("未确认的消息" + var1);
};
/** * 1. 确认收到消息的回调函数 * 2. 未确认收到消息的回调函数 */
channel.addConfirmListener(ackCallback,nackCallback);
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("异步确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
}
ConcurrentSkipListMap
public static void ConfirmMessageAsync() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
/** * 1. 消息标识 * 2. 是否批量处理 */
ConfirmCallback ackCallback = (var1,var2)->{
if(var2){
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
longStringConcurrentNavigableMap.clear();
}else{
map.remove(var1);
}
String message = map.get(var1);
System.out.println("已确认的消息是:" + message + " 已确认的消息tag:" + var1);
};
ConfirmCallback nackCallback = (var1,var2)->{
// 未确认的消息
String s = map.get(var1);
System.out.println(s);
System.out.println("未确认的消息" + var1);
};
/** * 1. 确认收到消息的回调函数 * 2. 未确认收到消息的回调函数 */
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 1. 将消息保存到一个线程安全地队列中
map.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("异步确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.xiao.utils.RabbitmqUtil;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class ConfirmMessage {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
// 单个发布确认
ConfirmMessageIndividually(); // 单个确认发送1000条消息所消耗的时间是680ms
// 批量发布确认
ConfirmMessageBatch(); //批量确认发送1000条消息所消耗的时间是112ms
//异步发布确认
ConfirmMessageAsync(); // 异步确认发送1000条消息所消耗的时间是41ms
// 异步确认发送1000条消息所消耗的时间是33ms
}
public static void ConfirmMessageIndividually() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 进行单个发布确认
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("单个确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
}
public static void ConfirmMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
// 批量处理消息的个数
int batchSize = 100;
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 进行批量发布确认
if(i % batchSize == 0){
channel.waitForConfirms();
System.out.println("批量处理消息成功");
}
}
long end = System.currentTimeMillis();
System.out.println("批量确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
}
public static void ConfirmMessageAsync() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
/** * 1. 消息标识 * 2. 是否批量处理 */
ConfirmCallback ackCallback = (var1,var2)->{
if(var2){
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
longStringConcurrentNavigableMap.clear();
}else{
map.remove(var1);
}
String message = map.get(var1);
System.out.println("已确认的消息是:" + message + " 已确认的消息tag:" + var1);
};
ConfirmCallback nackCallback = (var1,var2)->{
// 未确认的消息
String s = map.get(var1);
System.out.println(s);
System.out.println("未确认的消息" + var1);
};
/** * 1. 确认收到消息的回调函数 * 2. 未确认收到消息的回调函数 */
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 1. 将消息保存到一个线程安全地队列中
map.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("异步确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
}
}
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
""
表示的就是默认的无名队列。routingKey
绑定key指定的。临时队列
:一个具有随即名称的队列。一旦我们断开了连接,队列将被自动删除。String queueName = channel.queueDeclare().getQueue();
Fanout
交换机类型1. 两个消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogs01 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
/** * 1. 队列名称 * 2. 交换机名称 * 3. RoutingKey */
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (var1,var2)->{
System.out.println("ReceiveLogs01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(queueName,true,deliverCallback,var1->{});
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogs02 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
/** * 1. 队列名称 * 2. 交换机名称 * 3. RoutingKey */
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (var1,var2)->{
System.out.println("ReceiveLogs02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(queueName,true,deliverCallback,var1->{});
}
}
2. 一个生产者
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.util.Scanner;
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("成功发送消息:" + message);
}
}
}
3. 测试结果
生产者
消费者
fanout
模式下,所有队列都可以收到消息。fanout
交换机的差别在于RoutingKey
的绑定上,它绑定的的多个队列的key
一般是不同的,如果是相同的,那么它表现得就和fanout
有点类似。队列和交换机的绑定关系
1. 生产者
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.util.Scanner;
public class DirectLogs {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes("UTF-8"));
System.out.println("成功发送消息:" + message);
}
}
}
2. 两个消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogsDirect01 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
// 声明队列
channel.queueDeclare("console",false,false,false,null);
// 进行绑定
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("ReceiveLogsDirect01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume("console",true,deliverCallback,var1->{});
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogsDirect02 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
// 声明队列
channel.queueDeclare("disk",false,false,false,null);
// 进行绑定
channel.queueBind("disk",EXCHANGE_NAME,"error");
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("ReceiveLogsDirect02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume("disk",true,deliverCallback,var1->{});
}
}
3. 测试结果
RoutingKey
的比较,然后才会发送给相应的队列。之前我们Fanout
可以将所有消息发送到所有队列,direct
可以将消息发送到某个队列。但我们假设我们当前有3
个队列,我们想只发送消息到其中的两个队列,那么这就需要Topic
。
topic
的RoutingKey
不能随意写,它必须是一个单词列表,以点号隔开。*(星号)
可以代替一个单词。#(井号)
可以代替零个或多个单词quick.orange.rabbit
:被队列Q1、Q2接收到lazy.orange.elephant
:被队列Q1、Q2接收到quick.orange.fox
:被队列Q1接收到lazy.brown.fox
:被队列Q2接收到lazy.pink.rabbit
:虽然满足两个绑定但只被队列Q2接收一次。quick.brown.fox
:不匹配任何绑定不会被任何队列接收到会被丢弃。quick.orange.male.rabbit
:是四个单词不匹配任何绑定会被丢弃lazy.orange.male.rabbit
:是四个单词当匹配Q2#
,那么这个队列将接收所有数据,就有点像fanout
了。#
和*
出现,那么该队列绑定类型就是direct
了。1. 两个消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "Q1";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接受消息.....");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("ReceiveLogsTopic01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
System.out.println("接收队列:" + queueName + " 接受的键:" + var2.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,var1->{});
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogsTopic02 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "Q2";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("等待接受消息.....");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("ReceiveLogsTopic02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
System.out.println("接收队列:" + queueName + " 接受的键:" + var2.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,var1->{});
}
}
2. 生产者
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
Map<String,String> map = new HashMap<>();
map.put("quick.orange.rabbit","被队列Q1、Q2接收到");
map.put("lazy.orange.elephant","被队列Q1、Q2接收到");
map.put("quick.orange.fox","被队列Q1接收到");
map.put("lazy.brown.fox","被队列Q2接收到");
map.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次。");
map.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃。");
map.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
map.put("lazy.orange.male.rabbit","是四个单词当匹配Q2");
for(String key : map.keySet()){
String message = map.get(key);
channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes("UTF-8"));
System.out.println("发送的消息是:" + message);
}
}
}
3. 测试结果
1. 生产者
2. 消费者
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_56727438/article/details/122070671
内容来源于网络,如有侵权,请联系作者删除!