SpringCloud集成Bus消息总线

x33g5p2x  于2021-10-21 转载在 Spring  
字(13.6k)|赞(0)|评价(0)|浏览(881)

Bus消息总线

Bus消息总线是什么

一言以蔽之,分布式自动刷新配置功能。

是什么

Spring Cloud Bus 配合Spring Cloud Config 使用可以实现配置的动态刷新。

Spring Cloud Bus是用来将分布式系统的节点与轻量级消息系统链接起来的框架,它整合了Java的事件处理机制和消息中间件的功能。

Spring Clud Bus目前支持RabbitMQ和Kafka。

能干嘛

Spring Cloud Bus能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改、事件推送等,也可以当作微服务间的通信通道。

为何被称为总线

什么是总线

在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称它为消息总线。在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息。

基本原理

ConfigClient实例都监听MQ中同一个topic(默认是Spring Cloud Bus)。当一个服务刷新数据的时候,它会把这个信息放入到Topic中,这样其它监听同一Topic的服务就能得到通知,然后去更新自身的配置。

Bus之RabbitMQ环境配置

Linux下安装MQ

MQ的linux安装

windows下安装MQ

  • 安装Erlang,下载地址:http://erlang.org/download/otp_win64_21.3.exe
  • 安装RabbitMQ,下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3.exe
  • 打开cmd进入RabbitMQ安装目录下的sbin目录,如:D:\devSoft\RabbitMQ Scrverk\rabbitmq_server-3.7.14\sbin
  • 输入以下命令启动管理功能
  • rabbitmq-plugins enable rabbitmq _management
  • 这样就可以添加可视化插件。
  • 访问地址查看是否安装成功:http://localhost:15672/
  • 输入账号密码并登录:guest guest

Bus动态刷新全局广播的设计思想和选型

必须先具备良好的RabbitMQ环境先

演示广播效果,增加复杂度,再以3355为模板再制作一个3366

1.新建cloud-config-client-3366

2.POM

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <parent>
  4. <artifactId>cloud_Parent</artifactId>
  5. <groupId>dhy.xpy</groupId>
  6. <version>520.521.finally</version>
  7. </parent>
  8. <modelVersion>4.0.0</modelVersion>
  9. <artifactId>cloud-config-client-3366</artifactId>
  10. <properties>
  11. <maven.compiler.source>8</maven.compiler.source>
  12. <maven.compiler.target>8</maven.compiler.target>
  13. </properties>
  14. <dependencies>
  15. <!--添加消息总线RabbitMQ支持-->
  16. <dependency>
  17. <groupId>org.springframework.cloud</groupId>
  18. <artifactId>spring-cloud-starter-bus-amqp</artifactId>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.springframework.cloud</groupId>
  22. <artifactId>spring-cloud-starter-config</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.cloud</groupId>
  26. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-web</artifactId>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-actuator</artifactId>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.springframework.boot</groupId>
  38. <artifactId>spring-boot-devtools</artifactId>
  39. <scope>runtime</scope>
  40. <optional>true</optional>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.projectlombok</groupId>
  44. <artifactId>lombok</artifactId>
  45. <optional>true</optional>
  46. </dependency>
  47. <dependency>
  48. <groupId>org.springframework.boot</groupId>
  49. <artifactId>spring-boot-starter-test</artifactId>
  50. <scope>test</scope>
  51. </dependency>
  52. </dependencies>
  53. </project>

3.YML

这里是bootstrap.yml

  1. server:
  2. port: 3366
  3. spring:
  4. application:
  5. name: config-client
  6. cloud:
  7. #Config客户端配置
  8. config:
  9. label: master #分支名称
  10. name: config #配置文件名称
  11. profile: dev #读取后缀名称 上述3个综合:master分支上config-dev.yml的配置文件被读取http://config-3344.com:3344/master/config-dev.yml
  12. uri: http://localhost:3344 #配置中心地址
  13. #rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口
  14. rabbitmq:
  15. host: localhost
  16. port: 5672
  17. username: admin
  18. password: 123
  19. #服务注册到eureka地址
  20. eureka:
  21. client:
  22. service-url:
  23. defaultZone: http://localhost:7001/eureka
  24. # 暴露监控端点
  25. management:
  26. endpoints:
  27. web:
  28. exposure:
  29. include: "*"

4.主启动

  1. import org.springframework.boot.SpringApplication;
  2. import org.springframework.boot.autoconfigure.SpringBootApplication;
  3. import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
  4. @EnableEurekaClient
  5. @SpringBootApplication
  6. public class ConfigClientMain3366
  7. {
  8. public static void main(String[] args)
  9. {
  10. SpringApplication.run(ConfigClientMain3366.class,args);
  11. }
  12. }

5.controller

  1. @RestController
  2. @RefreshScope
  3. public class ConfigClientController
  4. {
  5. @Value("${server.port}")
  6. private String serverPort;
  7. @Value("${config.info}")
  8. private String configInfo;
  9. @GetMapping("/configInfo")
  10. public String configInfo()
  11. {
  12. return "serverPort: "+serverPort+"\t\n\n configInfo: "+configInfo;
  13. }
  14. }

设计思想

1.利用消息总线触发一个客户端/bus/refresh,而刷新所有客户端的配置

2.利用消息总线触发一个服务端ConfigServer的/bus/refresh端点,而刷新所有客户端的配置

图二的架构显然更加适合,图—不适合的原因如下:

  • 打破了微服务的职责单一性,因为微服务本身是业务模块,它本不应该承担配置刷新的职责。
  • 破坏了微服务各节点的对等性。
  • 有一定的局限性。例如,微服务在迁移时,它的网络地址常常会发生变化,此时如果想要做到自动刷新,那就会增加更多的修改。

Bus动态刷新全局广播配置实现

给cloud-config-center-3344配置中心服务端添加消息总线支持

POM

  1. <!--添加消息总线RabbitNQ支持-->
  2. <dependency>
  3. <groupId>org.springframework.cloud</groupId>
  4. <artifactId>spring-cloud-starter-bus-amap</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org-springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-actuator</artifactId>
  9. </dependency>

YML

  1. server:
  2. port: 3344
  3. spring:
  4. application:
  5. name: cloud-config-center #注册进Eureka服务器的微服务名
  6. cloud:
  7. config:
  8. server:
  9. git:
  10. uri: https://gitee.com/DaHuYuXiXi/springcloud-config.git #GitHub上面的git仓库名字
  11. ####搜索目录
  12. search-paths:
  13. - springcloud-config
  14. ####读取分支
  15. label: master
  16. #rabbitmq相关配置
  17. rabbitmq:
  18. host: 192.168.112.128
  19. port: 5672
  20. username: admin
  21. password: 123
  22. #服务注册到eureka地址
  23. eureka:
  24. client:
  25. service-url:
  26. defaultZone: http://localhost:7001/eureka
  27. ##rabbitmq相关配置,暴露bus刷新配置的端点
  28. management:
  29. endpoints: #暴露bus刷新配置的端点
  30. web:
  31. exposure:
  32. include: 'bus-refresh'

给cloud-config-client-3355客户端添加消息总线支持

POM

  1. <!--添加消息总线RabbitNQ支持-->
  2. <dependency>
  3. <groupId>org.springframework.cloud</groupId>
  4. <artifactId>spring-cloud-starter-bus-amap</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org-springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-actuator</artifactId>
  9. </dependency>

YML

  1. server:
  2. port: 3355
  3. spring:
  4. application:
  5. name: config-client
  6. cloud:
  7. #Config客户端配置
  8. config:
  9. label: master #分支名称
  10. name: config #配置文件名称
  11. profile: dev #读取后缀名称 上述3个综合:master分支上config-dev.yml的配置文件被读取http://config-3344.com:3344/master/config-dev.yml
  12. uri: http://localhost:3344 #配置中心地址k
  13. #rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口
  14. rabbitmq:
  15. host: 192.168.112.128
  16. port: 5672
  17. username: admin
  18. password: 123
  19. #服务注册到eureka地址
  20. eureka:
  21. client:
  22. service-url:
  23. defaultZone: http://localhost:7001/eureka
  24. # 暴露监控端点
  25. management:
  26. endpoints:
  27. web:
  28. exposure:
  29. include: "*"

给cloud-config-client-3366客户端添加消息总线支持
POM

  1. <!--添加消息总线RabbitNQ支持-->
  2. <dependency>
  3. <groupId>org.springframework.cloud</groupId>
  4. <artifactId>spring-cloud-starter-bus-amap</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org-springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-actuator</artifactId>
  9. </dependency>

YML

  1. server:
  2. port: 3366
  3. spring:
  4. application:
  5. name: config-client
  6. cloud:
  7. #Config客户端配置
  8. config:
  9. label: master #分支名称
  10. name: config #配置文件名称
  11. profile: dev #读取后缀名称 上述3个综合:master分支上config-dev.yml的配置文件被读取http://config-3344.com:3344/master/config-dev.yml
  12. uri: http://localhost:3344 #配置中心地址
  13. #rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口
  14. rabbitmq:
  15. host: 192.168.112.128
  16. port: 5672
  17. username: admin
  18. password: 123
  19. #服务注册到eureka地址
  20. eureka:
  21. client:
  22. service-url:
  23. defaultZone: http://localhost:7001/eureka
  24. # 暴露监控端点
  25. management:
  26. endpoints:
  27. web:
  28. exposure:
  29. include: "*"

测试

启动

  • EurekaMain7001
  • ConfigcenterMain3344
  • ConfigclientMain3355
  • ConfigclicntMain3366

运维工程师

修改Github上配置文件内容,增加版本号

发送POST请求

  1. curl -X POST "http://localhost:3344/actuator/bus-refresh"

—次发送,处处生效

配置中心

http://localhost:3344/config-dev.yml

客户端

  • http://localhost:3355/configInfo
  • http://localhost:3366/configInfo
  • 获取配置信息,发现都已经刷新了

Bus动态刷新定点通知

不想全部通知,只想定点通知

  • 只通知3355
  • 不通知3366

简单一句话 - 指定具体某一个实例生效而不是全部

  1. 公式:http://localhost:3344/actuator/bus-refresh/{destination}--->微服务名称+端口号
  2. /bus/refresh请求不再发送到具体的服务实例上,而是发给config server通过destination参数类指定需要更新配置的服务或实例

案例

  • 我们这里以刷新运行在3355端口上的config-client(配置文件中设定的应用名称)为例,只通知3355,不通知3366
  1. curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355

通知总结

原理探究

spring cloud bus与spring cloud config的整合,并以RabbitMq作为消息代理,实现了应用配置的动态更新。

首先之前我们是对每一个微服务都单独发送一个post请求,刷新对应微服务的端口最新信息,完成手动刷新,现在我们有两种方案,可以完成广播刷新:

向service A的实例3发送post请求,访问/bus/refresh接口,此时,service A的实例3就会将刷新请求发送到消息总线上,该消息事件会被service A的实例1和实例2从总线中获取到,并重新从config server中获取它们的配置信息,从而实现配置信息的动态更新。

1.在config server中引入 spring cloud bus,将配置服务端也加入到消息总线中来;

2./bus/refresh请求不再发送到具体服务实例上,而是发送给Config Server,并通过destination参数指定需要更新配置的服务或实例。

我们采用了第二种方案,那么第二种方案的原理又是什么呢?

核心流程

Spring Cloud 默认实现了配置中心动态刷新的功能,在公共模块 spring-cloud-context 包中。目前比较流行的配置中心 Spring Cloud Config 动态刷新便是依赖此模块,而Nacos动态刷新机制是在此模块上做了扩展,比Spring Cloud Config功能更强大丰富。

首先 Spring Cloud Config 动态刷新需要依赖 Spring Cloud Bus,而 Nacos 则是在后台修改后直接推送到各服务。

其次,Spring Cloud Config的刷新机制针对所有修改的变量,只有有改动,后台就会获取。

而Nacos则是支持粒度更细的方式,只有 refresh 属性为 true 的配置项,才会在运行的过程中变更为新的值。这时Nacos特有的方式。

相同点:两种配置中心动态刷新的范围都是以下两种:

  • @ConfigurationProperties 注解的配置类
  • @RefreshScope 注解的bean

大致的核心流程如下:

分别看一下这两点的实现原理。

首先 spring cloud config 动态刷新功能通过以下变量来确定是否开启,默认为true。

  1. 显然默认spring cloud config动态刷新功能是默认开启的
  2. @ConditionalOnProperty(value = endpoints.refresh.enabled”, matchIfMissing = true)

RefreshEndpoint 端点暴露方式:

  1. @Configuration(
  2. proxyBeanMethods = false
  3. )
  4. @AutoConfigureAfter({WebMvcAutoConfiguration.class})
  5. public class LifecycleMvcEndpointAutoConfiguration {
  6. public LifecycleMvcEndpointAutoConfiguration() {
  7. }
  8. @Bean
  9. @ConditionalOnMissingBean
  10. public EnvironmentManager environmentManager(ConfigurableEnvironment environment) {
  11. return new EnvironmentManager(environment);
  12. }
  13. }
  14. // Mvc适配器
  15. public class GenericPostableMvcEndpoint extends EndpointMvcAdapter {
  16. //代理类为RefreshEndpoint
  17. public GenericPostableMvcEndpoint(Endpoint<?> delegate) {
  18. super(delegate);
  19. }
  20. //当我们访问refresh端点时,必须发送post请求
  21. @RequestMapping(method = RequestMethod.POST)
  22. @ResponseBody
  23. @Override
  24. public Object invoke() {
  25. //如果当前RefreshEndpoint功能未开启,则向页面输出端点不可用的信息
  26. //http状态为未找到
  27. if (!getDelegate().isEnabled()) {
  28. return new ResponseEntity<>(Collections.singletonMap(
  29. "message", "This endpoint is disabled"), HttpStatus.NOT_FOUND);
  30. }
  31. //否则调用EndpointMvcAdapter的invoke方法
  32. //其实调用的就是下面RefreshEndpoint 的invoke方法
  33. return super.invoke();
  34. }
  35. }

这里的实现方式同 springboot actuator endpoint原理一样,都是通过 EndpointMvcAdapter 适配器来代理实现。

RefreshEndpoint 端点:

一起的版本:

  1. public class RefreshEndpoint extends AbstractEndpoint<Collection<String>> {
  2. private ContextRefresher contextRefresher;
  3. public String[] refresh() {
  4. Set<String> keys = contextRefresher.refresh();
  5. return keys.toArray(new String[keys.size()]);
  6. }
  7. @Override
  8. public Collection<String> invoke() {
  9. return Arrays.asList(refresh());
  10. }
  11. }

现在的版本:

  1. @Endpoint(
  2. id = "refresh"
  3. )
  4. public class RefreshEndpoint {
  5. private ContextRefresher contextRefresher;
  6. public RefreshEndpoint(ContextRefresher contextRefresher) {
  7. this.contextRefresher = contextRefresher;
  8. }
  9. //向当前端点写入最新的信息
  10. @WriteOperation
  11. public Collection<String> refresh() {
  12. //刷新环境,获取最新的信息
  13. Set<String> keys = this.contextRefresher.refresh();
  14. return keys;
  15. }
  16. }

具体的刷新逻辑在 ContextRefresher 中。

配置ContextRefresher 刷新类:

  1. public class ContextRefresher {
  2. //......
  3. private ConfigurableApplicationContext context;
  4. private RefreshScope scope;
  5. public synchronized Set<String> refresh() {
  6. //提取之前的属性配置
  7. Map<String, Object> before = extract(
  8. this.context.getEnvironment().getPropertySources());
  9. //获取最新的属性配置
  10. addConfigFilesToEnvironment();
  11. //获取发生变化的属性
  12. Set<String> keys = changes(before,
  13. extract(this.context.getEnvironment().getPropertySources())).keySet();
  14. //发布EnvironmentChangeEvent事件
  15. this.context.publishEvent(new EnvironmentChangeEvent(keys));
  16. //刷新 RefreshScope Bean
  17. this.scope.refreshAll();
  18. return keys;
  19. }
  20. //......
  21. }

addConfigFilesToEnvironment();上述代码通过该方法重新获取配置:

  1. //获取最新的属性配置
  2. private void addConfigFilesToEnvironment() {
  3. ConfigurableApplicationContext capture = null;
  4. try {
  5. StandardEnvironment environment = copyEnvironment(
  6. this.context.getEnvironment());
  7. //这里重新创建 springboot启动类,重新启动时,通过配置中心会就会重新获取配置了
  8. capture = new SpringApplicationBuilder(Empty.class).bannerMode(Mode.OFF)
  9. .web(false).environment(environment).run();
  10. if (environment.getPropertySources().contains(REFRESH_ARGS_PROPERTY_SOURCE)) {
  11. environment.getPropertySources().remove(REFRESH_ARGS_PROPERTY_SOURCE);
  12. }
  13. MutablePropertySources target = this.context.getEnvironment()
  14. .getPropertySources();
  15. String targetName = null;
  16. }
  17. }

通过SpringApplicationBuilder重新创建启动类,启动时就会重新拉取最新配置,然后发布 EnvironmentChangeEvent事件,通过对应的监听器重新加载带有@ConfigurationProperties 的配置类和作用域为 @RefreshScopebean

@RefreshScope

该注解是 Spring Cloud 对bean 作用域做的扩展类型,这种类型的bean生命周期和单例不同,每一次调用调用/refresh方法都会清除所有该类型的bean。下次使用时,就会重新创建,然后注入最新属性变量。

具体来看一下代码。

  1. public class RefreshScope extends GenericScope
  2. implements ApplicationContextAware, BeanDefinitionRegistryPostProcessor, Ordered {
  3. //.....
  4. @ManagedOperation(description = "Dispose of the current instance of all beans in this scope and force a refresh on next method execution.")
  5. public void refreshAll() {
  6. //调用清除缓存方法
  7. super.destroy();
  8. this.context.publishEvent(new RefreshScopeRefreshedEvent());
  9. }
  10. //......
  11. }
  12. public class GenericScope implements Scope, BeanFactoryPostProcessor, DisposableBean {
  13. @Override
  14. public void destroy() {
  15. List<Throwable> errors = new ArrayList<Throwable>();
  16. //清除缓存
  17. Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();
  18. for (BeanLifecycleWrapper wrapper : wrappers) {
  19. try {
  20. //销毁所有 @RefreshScope 类型的bean
  21. wrapper.destroy();
  22. }
  23. catch (RuntimeException e) {
  24. errors.add(e);
  25. }
  26. }
  27. if (!errors.isEmpty()) {
  28. throw wrapIfNecessary(errors.get(0));
  29. }
  30. this.errors.clear();
  31. }
  32. }

被销毁的bean 再下次使用时,会重新创建,这样已满足配置动态刷新的需求了。但是有些时候,在清除这些bean之后,想执行一些自定义的监听逻辑,怎么做呢?

Spring Cloud同样提供了相应的事件:RefreshScopeRefreshedEvent。在refreshAll方法在清除缓存之后,会发布该事件:

  1. this.context.publishEvent(new RefreshScopeRefreshedEvent());

这里是留的扩展,如果有需要可以做一些扩展。目前在源码中看到 Zuul,Nacos都监听了该事件,具体细节有兴趣的可以去研究。

整合bus后的原理分析

老规矩:两种方案,我们选择第二种

方案一:

方案二:

Bus消息总线会提供一个/bus/refresh服务来实现应用的热刷新。不再使用actuator来提供人刷新逻辑。/bus/refresh服务要求请求必须是post请求,可用任意技术实现。

ConfigClient 实例都监听RabbitMQ中同一个topic【默认是SpringCloudBus】。当一个服务刷新数据的时候,它会把这个消息放入Topic中,这样其他监听同一Topic的服务就能够得到通知,然后去更新自身的配置。

相关文章