📒博客首页:崇尚学技术的科班人
🏇小肖来了
🍣今天给大家带来的文章是《消息队列的 Work-Queues 篇》🍣
🍣这是RabbitMQ的另一种模式🍣
🍣希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗同时也非常感谢各位小伙伴们的支持💗
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitmqUtil {
/** * 连接工厂获取信道的工具类 * @return * @throws IOException * @throws TimeoutException */
public static Channel getChannel() throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置IP地址
factory.setHost("192.168.123.129");
// 用户名
factory.setUsername("admin");
// 密码
factory.setPassword("123");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
return channel;
}
}
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Worker {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtil.getChannel();
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println(new String(var2.getBody()));
};
CancelCallback cancelCallback = var1->{
System.out.println(var1 + "消费消息被中断了");
};
/** * 接收消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 */
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
IDEA的配置
进行启动多个工作线程。import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Task01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtil.getChannel();
/** * 声明队列 * 1.队列名称 * 2.队列里面的消息是否持久化(磁盘)默认情况下消息存储在内存中 * 3.该队列是否提供一个消费者进行消费,就是是否进行消息共享 * 4.就是当最后一个消费者断开连接之后,该队列是否自动删除消息 * 5.其他参数 */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String message = sc.next();
/** * 发送一个消息 * 1.发送到哪个交换机 * 2.路由的Key值,也就是本次队列的名称 * 3.其他参数信息 * 4.发送消息的消息内容 */
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息成功:" + message);
}
}
}
1. 生产者发送消息
AA
发送消息成功:AA
BB
发送消息成功:BB
CC
发送消息成功:CC
DD
发送消息成功:DD
2. 测试结果
W1的控制台输出
W2的控制台输出
rabbitmq
引入了消息应答机制。rabbitmq
它已经处理了,rabbitmq
可以把消息删除了。消息发送后立即被认为已经发送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡。这种模式仅适合于在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
// 用于肯定确认,RabbitMQ已知道消息并且成功的处理消息,可以将其丢弃了。
Channel.basicAck()
// 用于否定确认
Channel.basicNack()
Channel.basicReject()
// 后者比前者多一个参数,不处理消息了直接拒绝,可以将其对其
Multiple = true
表示可以批量应答,就好像到8
了,可以对5,6,7
都进行应答。Multiple = true
表示不可以批量应答,就好像到8
了,只可以对8
进行应答,不可以对5,6,7
都进行应答。TCP
连接丢失),导致消息未发送ACK
确认,RabbitMQ将了解到消息未完全处理,并将其重新入对。如果此时其他消费者可以处理,它将很快重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。C1
收到消息1之后为发送ack
就断开链接了,其消息1会重新入队,并被C2
给处理。import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Task2 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtil.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String next = sc.next();
channel.basicPublish("",TASK_QUEUE_NAME,null,next.getBytes("UTF-8"));
System.out.println("成功发送的消息是:" + next);
}
}
}
1. 工作线程1
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import com.xiao.utils.SleepUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Work03 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtil.getChannel();
System.out.println("C1正准备接受消息处理时间较短");
DeliverCallback deliverCallback = (var1,var2)->{
SleepUtils.sleep(1);
System.out.println("接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
/** * 1. 消息的标签 * 2. 是否进行批量应答 */
channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
};
channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,(var1->{
System.out.println("消息处理失败进行函数回调");
}));
}
}
2. 工作线程2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import com.xiao.utils.SleepUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Work04 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtil.getChannel();
System.out.println("C2正准备接受消息处理时间较长");
DeliverCallback deliverCallback = (var1,var2)->{
SleepUtils.sleep(30);
System.out.println("接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
/** * 1. 消息的标签 * 2. 是否进行批量应答 */
channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
};
channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,(var1->{
System.out.println("消息处理失败进行函数回调");
}));
}
}
3、工具类
public class SleepUtils {
public static void sleep(int seconds){
try {
seconds *= 1000;
Thread.sleep(seconds);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
aa、bb、cc、dd
。2
接受dd
消息的时候宕机了,那么消息不会丢失,而是转发给工作进程1
。boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
重启之后
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
channel.basicQos(1);
channel.basicQos()
中传入的参数就是我们设置的预取值。如果预取值是5
,表示的是队列里面最多可以堆积5
条消息。版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_56727438/article/details/122025327
内容来源于网络,如有侵权,请联系作者删除!