Rabbitmq 缓存数据库

x33g5p2x  于2021-11-19 转载在 RabbitMQ  
字(10.9k)|赞(0)|评价(0)|浏览(492)

Rabbitmq概念

消息队列、消息服务器、消息中间件Borker
常见消息服务器:Rabbitmq Activemq Rocketmq(阿里的) Kafka Tubemq(腾讯)

搭建Rabbitmq服务器

VMware

  • 16.x
  • NAT网段 192.168.64.0
  • 编辑–虚拟网络编辑器–选择vmnet8–左下角修改成192.168.64.0

虚拟机

centos-8-2105/centos-7-1908

  • 已经做了基本配置
    yun安装源、扩展源使用阿里服务器
    安装了python、pip、ansible
    添加了两个脚本文件,方便配置ip地址(ip-static固定ip/ip-dhcp自动获取ip)
  • 1.解压centos-8-2105
  • 2.双击centos-8-2105.vmx加载镜像
  • 3.启动,按提示选择已复制虚拟机
  • 4.登录用户名密码都是 root

ip设置测试

  1. ./ip-dhcp 自动获取ip
  2. 如果网卡有问题 执行以下命令
  1. # centos 8 开启 VMware 托管
  2. nmcli n on systemctl restart NetworkManager
  3. # centos 7 禁用 NetworkManager 系统服务
  4. systemctl stop NetworkManager systemctl disable NetworkManager
  5. #如果网络还有问题,可以充值vmware虚拟网络
  6. 编辑--虚拟网络篇编辑器--还原默认设置
  7. 还原默认设置会删除所有虚拟网络,重新创建,重新初始化

默认分配的ip

静态设置ip

准备docker环境

1.关闭centos-8-2105

2.克隆centos-8-2105:docker-base

3.mobaxterm连接docker-base

ip:192.168.64.3
注意:先打开虚拟机

4.上传文件到/root

/DevOPs/Docker/docker-install

5. 离线安装docker(参考CSND笔记)

  1. # 进入 docker-install 文件夹
  2. cd docker-install
  3. # 为 docker-install 添加执行权限
  4. chmod +x install.sh
  5. # 安装
  6. ./install.sh -f docker-20.10.6.tgz

6.关机

一般执行shutdown

Docker运行Rabbitmq

1.克隆docker-base :rabbitmq

2.设置ip

  1. ./ip-static
  2. ip:192.168.64.140
  3. ifconfig

3.上传rabbitmq镜像到/root

/docker/rabbit-image.gz

4.导入镜像

docker load -i rabbit-image.gz

5.启动rabbitmq

关闭防火墙

  1. systemctl stop firewalld
  2. systemctl disable firewalld
  3. # 重启 docker 系统服务
  4. systemctl restart docker

配置管理员用户名和密码

  1. mkdir /etc/rabbitmq
  2. vim /etc/rabbitmq/rabbitmq.conf
  3. # 添加两行配置:
  4. default_user = admin
  5. default_pass = admin

启动Rabbitmq

  1. docker run -d --name rabbit \
  2. -p 5672:5672 \
  3. -p 15672:15672 \
  4. -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
  5. -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
  6. --restart=always \
  7. rabbitmq:management

访问网址http://192.168.64.140:15672

用户名/密码:admin

Rabbitmq使用场景

服务解耦

MQ Message Queue
数据吞吐量很大
数据排队

异步调用

feign是同步调用,效率低
mq异步调用

Rabbitmq六种收发消息方式

准备工作

新建工程

新建module

添加依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.4.3</version>
  6. </dependency>
  7. </dependencies>

简单模式

只有一个消费者

生产者代码

  1. package m1;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /*简单模式*/
  8. public class Producer {
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. //1.连接
  11. ConnectionFactory f = new ConnectionFactory();
  12. f.setHost("192.168.64.140");
  13. f.setPort(5672);//收发消息5672
  14. f.setUsername("admin");
  15. f.setPassword("admin");
  16. Connection con = f.newConnection();//创建连接
  17. Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
  18. //2.在服务器上创建 helloworld队列
  19. //队列如果已经存在,不会重复创建
  20. channel.queueDeclare("helloworld",false,false,false,null);
  21. //3.向helloworld发送消息
  22. channel.basicPublish("", "helloworld",null,"Hello World".getBytes());
  23. }
  24. }

channel.queueDeclare()参数说明

  • 参数:
  • 1.队列名
  • 2.是否是持久队列
  • 3.是否是排他队列,可共享(独占队列)
  • 4.是否能被服务器自动删除(没有消费者的时候服务器是否自动删除)
  • 5.队列的其他参数属性(键值对)
    channel.basicPublish参数说明
  • 1.交换机,空串是默认交换机
  • 3.消息的其他参数属性(键值对)

消费者代码

  1. package m1;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*简单模式--消费者*/
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.连接
  9. ConnectionFactory f = new ConnectionFactory();
  10. f.setHost("192.168.64.140");
  11. f.setPort(5672);//收发消息5672
  12. f.setUsername("admin");
  13. f.setPassword("admin");
  14. Connection con = f.newConnection();//创建连接
  15. Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
  16. //2.在服务器上创建 helloworld队列
  17. //队列如果已经存在,不会重复创建
  18. channel.queueDeclare("helloworld",false,false,false,null);
  19. //创建回调对象
  20. //处理消息的回调函数
  21. DeliverCallback deliverCallback1=(consumerTag,message)->{
  22. byte[] a=message.getBody();
  23. String s=new String(a);
  24. System.out.println("收到"+s);
  25. };
  26. DeliverCallback deliverCallback=new DeliverCallback() {
  27. @Override
  28. public void handle(String consumerTag, Delivery message) throws IOException {
  29. byte[] a=message.getBody();
  30. String s=new String(a);
  31. System.out.println("收到"+s);
  32. }
  33. };
  34. //取消消息的回调对象
  35. CancelCallback cancelCallback=new CancelCallback() {
  36. @Override
  37. public void handle(String consumerTag) throws IOException {
  38. }
  39. };
  40. //从helloworld接收消息,把消息传递到回调对象处理
  41. // channel.basicConsume("helloworld",true,处理消息的回调对象,取消消息处理回调对象); channel.basicConsume("helloworld",true,deliverCallback1,cancelCallback);
  42. }
  43. }

channel.basicConsume参数说明
第二个参数:确认方式 ACK --Acknowledgment

  • -true 自动确认
  • -false 手动确认(手动发送回执) 保证正确处理消息

工作模式

多个消费者

生产者代码

  1. package m1;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*简单模式--消费者*/
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.连接
  9. ConnectionFactory f = new ConnectionFactory();
  10. f.setHost("192.168.64.140");
  11. f.setPort(5672);//收发消息5672
  12. f.setUsername("admin");
  13. f.setPassword("admin");
  14. Connection con = f.newConnection();//创建连接
  15. Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
  16. //2.在服务器上创建 helloworld队列
  17. //队列如果已经存在,不会重复创建
  18. channel.queueDeclare("helloworld",false,false,false,null);
  19. //创建回调对象
  20. //处理消息的回调函数
  21. DeliverCallback deliverCallback1=(consumerTag,message)->{
  22. byte[] a=message.getBody();
  23. String s=new String(a);
  24. System.out.println("收到"+s);
  25. };
  26. DeliverCallback deliverCallback=new DeliverCallback() {
  27. @Override
  28. public void handle(String consumerTag, Delivery message) throws IOException {
  29. byte[] a=message.getBody();
  30. String s=new String(a);
  31. System.out.println("收到"+s);
  32. }
  33. };
  34. //取消消息的回调对象
  35. CancelCallback cancelCallback=new CancelCallback() {
  36. @Override
  37. public void handle(String consumerTag) throws IOException {
  38. }
  39. };
  40. //从helloworld接收消息,把消息传递到回调对象处理
  41. // channel.basicConsume("helloworld",true,处理消息的回调对象,取消消息处理回调对象);
  42. /* * 第二个参数:确认方式 ACK --Acknowledgment * -true 自动确认 * -false 手动确认(手动发送回执) 保证正确处理消息 * */
  43. channel.basicConsume("helloworld",true,deliverCallback1,cancelCallback);
  44. }
  45. }

消费者代码
业务:模拟耗时消息

  1. package m2;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*工作模式--消费者*/
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.连接
  9. ConnectionFactory f = new ConnectionFactory();
  10. f.setHost("192.168.64.140");
  11. f.setPort(5672);//收发消息5672
  12. f.setUsername("admin");
  13. f.setPassword("admin");
  14. Connection con = f.newConnection();//创建连接
  15. Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
  16. //2.在服务器上创建 helloworld队列
  17. //队列如果已经存在,不会重复创建
  18. channel.queueDeclare("helloworld",false,false,false,null);
  19. //回调对象
  20. DeliverCallback deliverCallback=(consumerTag,message)->{
  21. String s=new String(message.getBody());
  22. System.out.println(s);
  23. //遍历所有字符,遇到.暂停1s
  24. for (int i = 0; i < s.length(); i++) {
  25. if(s.charAt(i)=='.'){
  26. try {
  27. Thread.sleep(1000);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. System.out.println("消息处理完成");
  34. };
  35. CancelCallback cancelCallback=(consumerTag)->{};
  36. //接收信息
  37. channel.basicConsume("helloworld",true,deliverCallback,cancelCallback);
  38. }
  39. }

如何解决合理分发??

修改Consumer代码

  1. package m2;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*工作模式--消费者*/
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.连接
  9. ConnectionFactory f = new ConnectionFactory();
  10. f.setHost("192.168.64.140");
  11. f.setPort(5672);//收发消息5672
  12. f.setUsername("admin");
  13. f.setPassword("admin");
  14. Connection con = f.newConnection();//创建连接
  15. Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
  16. //2.在服务器上创建 helloworld队列
  17. //队列如果已经存在,不会重复创建
  18. channel.queueDeclare("helloworld",false,false,false,null);
  19. //回调对象
  20. DeliverCallback deliverCallback=(consumerTag,message)->{
  21. String s=new String(message.getBody());
  22. System.out.println(s);
  23. //遍历所有字符,遇到.暂停1s
  24. for (int i = 0; i < s.length(); i++) {
  25. if(s.charAt(i)=='.'){
  26. try {
  27. Thread.sleep(1000);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. // channel.basicAck(回执,是否同时确认收到过的所有消息);
  34. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  35. System.out.println("消息处理完成");
  36. };
  37. CancelCallback cancelCallback=(consumerTag)->{};
  38. //每次只收一条,处理完之前不收下一条
  39. channel.basicQos(1);
  40. //接收信息
  41. //第二个参数为false手动确认
  42. channel.basicConsume("helloworld",false,deliverCallback,cancelCallback);
  43. }
  44. }

重启消费者模块测试

如何实现持久化??

  1. 队列持久化

修改生产者代码

  1. package m2;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.Scanner;
  7. import java.util.concurrent.TimeoutException;
  8. /*工作模式--生产者*/
  9. public class Producer {
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. //1.连接
  12. ConnectionFactory f = new ConnectionFactory();
  13. f.setHost("192.168.64.140");
  14. f.setPort(5672);//收发消息5672
  15. f.setUsername("admin");
  16. f.setPassword("admin");
  17. Connection con = f.newConnection();//创建连接
  18. Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
  19. //2.在服务器上创建 helloworld队列
  20. //队列如果已经存在,不会重复创建
  21. channel.queueDeclare("task_queue",true,false,false,null);
  22. //循环输入消息发送
  23. while (true){
  24. System.out.println("输入消息:");
  25. String s=new Scanner(System.in).nextLine();
  26. channel.basicPublish("","task_queue",null,s.getBytes());
  27. }
  28. }
  29. }

修改消费者代码

  1. package m2;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*工作模式--消费者*/
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.连接
  9. ConnectionFactory f = new ConnectionFactory();
  10. f.setHost("192.168.64.140");
  11. f.setPort(5672);//收发消息5672
  12. f.setUsername("admin");
  13. f.setPassword("admin");
  14. Connection con = f.newConnection();//创建连接
  15. Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
  16. //2.在服务器上创建 helloworld队列
  17. //队列如果已经存在,不会重复创建
  18. channel.queueDeclare("task_queue",true,false,false,null);
  19. //回调对象
  20. DeliverCallback deliverCallback=(consumerTag,message)->{
  21. String s=new String(message.getBody());
  22. System.out.println(s);
  23. //遍历所有字符,遇到.暂停1s
  24. for (int i = 0; i < s.length(); i++) {
  25. if(s.charAt(i)=='.'){
  26. try {
  27. Thread.sleep(1000);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. // channel.basicAck(回执,是否同时确认收到过的所有消息);
  34. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  35. System.out.println("消息处理完成");
  36. };
  37. CancelCallback cancelCallback=(consumerTag)->{};
  38. //每次只收一条,处理完之前不收下一条
  39. channel.basicQos(1);
  40. //接收信息
  41. //第二个参数为false手动确认
  42. channel.basicConsume("task_queue",false,deliverCallback,cancelCallback);
  43. }
  44. }
  1. 消息持久化
    ```生产者代码修改`

  1. package m2;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.MessageProperties;
  6. import java.io.IOException;
  7. import java.util.Scanner;
  8. import java.util.concurrent.TimeoutException;
  9. /*工作模式--生产者*/
  10. public class Producer {
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. //1.连接
  13. ConnectionFactory f = new ConnectionFactory();
  14. f.setHost("192.168.64.140");
  15. f.setPort(5672);//收发消息5672
  16. f.setUsername("admin");
  17. f.setPassword("admin");
  18. Connection con = f.newConnection();//创建连接
  19. Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
  20. //2.在服务器上创建 helloworld队列
  21. //队列如果已经存在,不会重复创建
  22. channel.queueDeclare("task_queue",true,false,false,null);
  23. //循环输入消息发送
  24. while (true){
  25. System.out.println("输入消息:");
  26. String s=new Scanner(System.in).nextLine();
  27. channel.basicPublish("","task_queue", MessageProperties.PERSISTENT_BASIC,s.getBytes());
  28. }
  29. }
  30. }

重启rabbit测试

相关文章