RocketMQ学习

x33g5p2x  于2021-10-29 转载在 其他  
字(5.4k)|赞(0)|评价(0)|浏览(404)

一.Rocket MQ安装

1.文件下载

jdk-8-212
rocketmq4.7-rocketconsole1.0.1

2.安装

安装配置 jdk8

1. 上传jdk压缩文件

将文件jdk-8u212-linux-x64.tar.gz上传到 /root 目录

2. 解压缩

执行解压命令

  1. # 将jdk解压到 /usr/local/ 目录
  2. tar -xf jdk-8u212-linux-x64.tar.gz -C /usr/local/
  3. # 切换到 /usr/local/ 目录, 显示列表, 查看解压缩的jdk目录
  4. cd /usr/local
  5. ll

3. 配置环境变量

修改 /etc/profile 配置文件, 配置环境变量

  1. vim /etc/profile
  2. # 在文件末尾添加以下内容:
  3. export JAVA_HOME=/usr/local/jdk1.8.0_212
  4. export PATH=$JAVA_HOME/bin:$PATH

修改完后, 让环境变量立即生效

  1. source /etc/profile

4. 验证

  1. java -version
  2. ----------------------------------------------------------------
  3. java version "1.8.0_212"
  4. Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
  5. Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)
  6. javac -version
  7. ---------------
  8. javac 1.8.0_212

3.安装 RocketMQ

1. 下载 rocketmq 二进制文件

  1. wget https://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip

2. 解压缩 rocketmq

将 rocketmq 解压到 /usr/local/ 目录

  1. unzip rocketmq-all-4.7.0-bin-release.zip -d /usr/local/
  2. # 修改一下文件夹名,改成 rocketmq 方便使用
  3. mv /usr/local/rocketmq-all-4.7.0-bin-release /usr/local/rocketmq

3. 配置环境变量 ROCKETMQ_HOME 和 PATH

为了后续操作方便可以配置环境变量,之后在任意位置都可以执行rocketmq的操作命令。

  1. vim /etc/profile
  2. # 在文件末尾添加以下内容:
  3. export ROCKETMQ_HOME=/usr/local/rocketmq
  4. export PATH=$ROCKETMQ_HOME/bin:$PATH

修改完后, 让环境变量立即生效

  1. source /etc/profile

4. 减小 rocketmq 使用的内存

rocketmq需要启动两个服务: name server 和 broker, name server 默认配置JVM使用的内存是4g, broker默认配置JVM使用的内存是8g.

开发环境中如果内存不足, 服务可能会无法启动, 可以通过降低两个服务的内存, 使服务可以正常启动, 也可以节省内存.

修改 name server 内存改为 256m
  1. cd /usr/local/rocketmq/
  2. # 编辑 bin/runserver.sh
  3. vim bin/runserver.sh
  4. # 找到文件中下面这一行:
  5. JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  6. # 将 -Xms4g -Xmx4g -Xmn2g 修改为 -Xms256m -Xmx256m -Xmn128m
  7. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

修改 broker 内存改为 256m

  1. # 编辑 bin/runbroker.sh
  2. vim bin/runbroker.sh
  3. # 找到文件中下面这一行:
  4. JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
  5. # 将 -Xms8g -Xmx8g -Xmn4g 修改为 -Xms256m -Xmx256m -Xmn128m
  6. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

5. 启动 rocketmq

先启动 name server

  1. # 进入 rocketmq 目录
  2. cd /usr/local/rocketmq/
  3. # 启动 name server
  4. nohup sh bin/mqnamesrv &
  5. # 查看运行日志, 看到"The Name Server boot success."表示启动成功
  6. tail -f ~/logs/rocketmqlogs/namesrv.log

再启动 broker

  1. # 启动 broker, 连接name server: localhost:9876
  2. nohup sh bin/mqbroker -n localhost:9876 &
  3. # 查看运行日志, 看到"The broker[......:10911] boot success."表示启动成功
  4. tail -f ~/logs/rocketmqlogs/broker.log

6. 关闭防火墙

rocketmq的通信会用到多个端口, 为了方便测试我们关闭防火墙

  1. # 关闭防火墙
  2. systemctl stop firewalld.service
  3. # 禁止防火墙开机启动
  4. systemctl disable firewalld.service

7.测试

运行测试, 启动生产者发送消息, 启动消费者接收消息

  1. # 通过环境变量, 告诉客户端程序name server的地址
  2. export NAMESRV_ADDR=localhost:9876
  3. # 启动生产者来测试发送消息
  4. sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
  5. # 启动消费者来测试接收消息
  6. sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

4.RocketMQ 的关闭命令

关闭 broker

mqshutdown broker

关闭 nameserver

mqshutdown namesrv

管理界面

在开源项目 rocketmq-externals 中提供了rocketmq 的管理界面: 地址为: https://github.com/apache/rocketmq-externals

github 在国内访问缓慢, 也可以使用码云的镜像项目, 地址为: https://gitee.com/mirrors/RocketMQ-Externals

1. 克隆项目
  1. cd /usr/local/rocketmq/
  2. # 克隆 rocketmq-externals 项目
  3. git clone https://gitee.com/mirrors/RocketMQ-Externals
2. maven打包管理界面项目

如果没有安装 maven, 请先执行 maven 安装命令

  1. yum install -y maven

打包管理界面项目 rocketmq-console.
打包过程中会下载各种依赖,比较缓慢,请耐心等待

  1. # 进入管理界面项目的文件夹
  2. cd RocketMQ-Externals/rocketmq-console
  3. # 执行maven 打包命令, 执行时间较长, 请耐心等待
  4. mvn clean package -Dmaven.test.skip=true
3. 运行启动管理界面

打包的 jar 文件在 target 目录, 进入目录执行jar文件,直接运行下面第二条命令,1,2里面已有

  1. # 进入 target 目录
  2. cd target
  3. # 运行管理界面
  4. nohup java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=localhost:9876 &

访问管理界面:

收发消息出现超时问题

如果ip不是自己配置过得,运行下面命令

  1. cd /usr/local/rocketmq/
  2. vim conf/broker.conf
  3. 末尾添加
  4. brokerIP1=192.168.64.141
  5. 关闭 broker 服务
  6. mqshutdown broker
  7. 重新使用 broker.conf 配置启动 broker
  8. nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

二.Rocket MQ基本原理

在Rocketmq集群中新建 Topic1
在管理界面中新建主题Topic1,为了方便观察测试效果,这里把写队列和读队列的数量都设置成3。

这样,在 broker-a 和 broker-b 上都创建了 Topic1 主题,并各创建了3写3读队列,共6写6读,如下图所示:

你也可以修改Topic1分别配置 broker-a 和 borker-b 上的队列数量。

perm 参数的含义
perm 参数是设置队列的读写权限,下面表格列出了可配置的值及其含义:

取值含义
6同时开启读写
4禁写
2禁读

Topic 收发消息原理

生产者将消息发送到 Topic1 的其中一个写队列,消费者从对应的一个读队列接收消息。

生产者的负载均衡

生产者以轮询的方式向所有写队列发送消息,这些队列可能会分布在多个broker实例上。

消费者的负载均衡

一个 group 中的多个消费者,可以以负载均衡的方式来接收消息。

读取队列被均匀分配给这些消费者,它们从指定的队列来接收消息。队列的分配可以采用不同的策略,这里简略介绍以下三种策略:

AllocateMessageQueueAveragely 平均分配

这是默认策略,它是这样分配队列的:

AllocateMessageQueueAveragelyByCircle 环形分配

如果使用环形分配,在消费者的代码中需要设置分配策略,代码如下:

  1. consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());

这种分配策略的逻辑很简单,所有0号队列分给0号消费者,所有1号队列分给1号消费者,以此类推。

AllocateMessageQueueConsistentHash 一致性哈希

如果使用一致性哈希算法进行分配,在消费者的代码中需要设置分配策略,代码如下:

  1. consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueConsistentHash());

这种算法依靠一致性哈希算法,看当前消费者可以落到哪个虚拟节点,该虚拟节点对应哪个队列。

问题

思考一下,如果写队列比读队列多会怎样?反之会怎样?

NameServer 基本原理

NameServer 是 rocketmq 自己开发的一个轻型注册中心,他的作用相当于是 zk、eureka等。

rocketmq 为什么不使用 zk 呢?实际上 rocketmq 的早期版本使用的就是 zookeeper。

而 rocketmq 的架构设计决定了只需要一个轻量级的元数据服务器就足够了。杀鸡焉用牛刀?小区里,搞个货架就行了,建个仓库,又占地方,维护成本又高。

甚至,NameServer 都不需要有一个集群的管理者。以至于,NameServer 看起来都不像一个集群。事实上,NameServer 本质上来看,也不是一个集群。因为它的各个节点是独立的,不相关的。每个 NameServer 都是独立和 Producer、Consumer打交道。

基本认识

  1. NameServer主要用于存储Topic,Broker关系信息,功能简单,稳定性高。
  2. 各个NameServer节点之间不相关,不需要通信,单台宕机不影响其它节点。
  3. NameServer集群整体宕机不影响已建立关系的Concumer,Producer,Broker。

Broker、Producer、Consumer 与NameServer的通信

每个Borker和所有NameServer保持长连接,心跳间隔为30秒。每次心跳时还会携带当前的Topic信息。当某个Broker两分钟之内没有心跳,则认为该Broker下线,并调整内存中与该Broker相关的Topic信息。
1.
Consumer 从 NameServer 获得 Topic 的路由信息,与对应的 Broker 建立长连接。间隔30秒发送心跳至Broker。Broker检查若发现某 Consumer 两分钟内无心跳则认为该Consumer下线,并通知该Consumer所有的消费者集群中的其他实例,触发该消费者集群重新负载均衡。
1.
Producer 与消费者一样,也是从 NameServer 获得 Topic 的路由信息,与对应的 Broker 建立长连接,30秒发送一次心跳。Broker 也会认为两分钟内没有心跳的 Producer 下线。

相关文章