常见MQ(消息中间件):
有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。(类似于Hibernate)
Cloud Stream是什么?屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
什么是Spring Cloud Stream?
官方定义Spring Cloud Stream
是一个构建消息驱动微服务的框架。
应用程序通过inputs
或者 outputs
来与Spring Cloud Stream
中binder
对象交互。
通过我们配置来binding
(绑定),而Spring Cloud Stream
的binder
对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream
交互就可以方便使用消息驱动的方式。
通过使用Spring Integration
来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream
为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、 Kafka
。
标准MQ
Message Channel
MessageChannel
的子接口SubscribableChannel
,由MessageHandler
消息处理器所订阅。为什么用Cloud Stream?
比方说我们用到了RabbitMQ
和Kafka
,由于这两个消息中间件的架构上的不同,像RabbitMQ
有exchange
,kafka
有Topic
和Partitions
分区。
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候Spring Cloud Stream
给我们提供了—种解耦合的方式。
Stream凭什么可以统一底层差异?
在没有绑定器这个概念的情况下,我们的SpringBoot
应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离
。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现
通过定义绑定器Binder
作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder:
INPUT
对应于消费者OUTPUT
对应于生产者Stream中的消息通信方式遵循了发布-订阅模式
Topic
主题进行广播
RabbitMQ
就是Exchange
Kakfa
中就是Topic
Spring Cloud Stream标准流程套路
Binder
- 很方便的连接中间件,屏蔽差异。Channel
- 通道,是队列Queue
的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel
对队列进行配置Source
和Sink
- 简单的可理解为参照对象是Spring Cloud Stream
自身,从Stream
发布消息就是输出,接受消息就是输入。编码API和常用注解
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder | Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
准备RabbitMQ环境
工程中新建三个子模块
新建Module:cloud-stream-rabbitmq-provider8801
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud_Parent</artifactId>
<groupId>dhy.xpy</groupId>
<version>520.521.finally</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
YML
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.168.112.128
port: 5672
username: admin
password: 123
bindings: # 服务的整合处理
#生产者
output: # 这个名字是一个通道的名称
#在MQ中相当于声明一个交换机
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
主启动
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class,args);
}
}
业务类
1.发送消息接口
public interface IMessageProvider {
public String send();
}
2.发送消息接口实现类
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{
@Resource
private MessageChannel output; // 消息发送管道
@Override
public String send()
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}
3.Controller
@RestController
public class SendMessageController
{
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage()
{
return messageProvider.send();
}
}
测试
启动 7001eureka
启动 RabpitMq
rabbitmq-plugins enable rabbitmq_management:开启图形化web管理界面
启动 8801
访问 - http://localhost:8801/sendMessage
后台将打印serial: UUID字符串
新建Module:cloud-stream-rabbitmq-consumer8802
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud_Parent</artifactId>
<groupId>dhy.xpy</groupId>
<version>520.521.finally</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
YML
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.168.112.128
port: 5672
username: admin
password: 123
bindings: # 服务的整合处理
#消费者
input: # 这个名字是一个通道的名称
#通过指定交换机完成消息的消费
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
主启动类StreamMQMain8802
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class,args);
}
}
业务类
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
{
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message)
{
System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t 当前微服务的port: "+serverPort);
}
}
测试
启动EurekaMain7001
启动StreamMQMain8801
启动StreamMQMain8802
8801发送8802接收消息
依照8802,克隆出来一份运行8803 - cloud-stream-rabbitmq-consumer8803。
启动
RabbitMQ
服务注册 - 7001
消息生产 - 8801
消息消费 - 8802
消息消费 - 8802
此时studyexchange交换机会把消息路由到两个与其绑定的队列上
运行后有两个问题
消费
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决
注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。
原理
微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。
不同的组是可以重复消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
8802/8803都变成不同组,group两个不同
group: A_Group、B_Group
8802修改YML
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.168.112.128
port: 5672
username: admin
password: 123
bindings: # 服务的整合处理
#消费者
input: # 这个名字是一个通道的名称
#通过指定交换机完成消息的消费
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: A_Group #8802变成A组
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
8803修改YML(与8802的类似位置 group: B_Group)
结论:还是重复消费
如果属于不同的组,那么当我们使用MQ作为中间件时,会创建出两个不同的队列,并且路由key是/#,可以匹配所有key,因此此时的交换机就相当于扇出交换机,即广播消息给所有的队列
8802/8803都变成相同组,group两个相同
group: A_Group
8802修改YML group: A_Group
8803修改YML group: A_Group
结论:同一个组的多个微服务实例,每次只会有一个拿到,因为此时他们处于竞争关系
8802/8803实现了轮询分组,每次只有一个消费者,8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。
结论:同一个组的多个微服务实例,每次只会有一个拿到
通过上述,解决了重复消费问题,再看看持久化。
去除掉8802
的分组group: A_Group
,8803
的分组group: A_Group
没有去掉。
启动7001,8801,8802,8803
交换机有了,是持久化的
创建了两个队列
每个队列对应一个消费者
交换机和队列的绑定关系
先关闭8002,8003
8002的临时队列,一断开连接后,就被立马删除了
8801
先发送4条消息到RabbitMq
。
此时8803的持久化队列里面已经积压了四条待消费的消息
先启动8802
,无分组属性配置,后台没有打出来消息。
再启动8803
,有分组属性配置,后台打出来了MQ
上的消息。(消息持久化体现)
8803消费掉了积压在队列的四条消息,而8802则不会收到任何消息
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/m0_53157173/article/details/120875547
内容来源于网络,如有侵权,请联系作者删除!