Hystrix线程池技术实现资源隔离

x33g5p2x  于2021-12-21 转载在 其他  
字(7.1k)|赞(0)|评价(0)|浏览(389)

电商网站的商品详情页系统架构遇到的场景问题

1 线程池隔离

这里有100个线程

然后不考虑线程隔离,线程重试资源就会耗尽

通过线程隔离,个干个的

出错也会只是一块微服务出错

2.信号量隔离

一个线程去重试40次给一个服务

小型电商网站的商品详情页系统架构

线程池技术实现资源隔离:分三种情况

小型电商网站的页面展示采用页面全量静态化的思想。数据库中存放了所有的商品信息,页面静态化系统,将数据填充进静态模板中,形成静态化页面,推入 Nginx 服务器。用户浏览网站页面时,取用一个已经静态化好的 html 页面,直接返回回去,不涉及任何的业务逻辑处理。

下面是页面模板的简单 Demo 。

<html>
    <body>
        商品名称:#{productName}<br>
        商品价格:#{productPrice}<br>
        商品描述:#{productDesc}
    </body>
</html>

这样做,好处在于,用户每次浏览一个页面,不需要进行任何的跟数据库的交互逻辑,也不需要执行任何的代码,直接返回一个 html 页面就可以了,速度和性能非常高。

对于小网站,页面很少,很实用,非常简单,Java 中可以使用 velocity、freemarker、thymeleaf 等等,然后做个 cms 页面内容管理系统,模板变更的时候,点击按钮或者系统自动化重新进行全量渲染。

坏处在于,仅仅适用于一些小型的网站,比如页面的规模在几十到几万不等。对于一些大型的电商网站,亿级数量的页面,你说你每次页面模板修改了,都需要将这么多页面全量静态化,靠谱吗?每次渲染花个好几天时间,那你整个网站就废掉了。

大型电商网站的商品详情页系统架构

大型电商网站商品详情页的系统设计中,当商品数据发生变更时,会将变更消息压入 MQ 消息队列中。缓存服务从消息队列中消费这条消息时,感知到有数据发生变更,便通过调用数据服务接口,获取变更后的数据,然后将整合好的数据推送至 redis 中。Nginx 本地缓存的数据是有一定的时间期限的,比如说 10 分钟,当数据过期之后,它就会从 redis 获取到最新的缓存数据,并且缓存到自己本地。

用户浏览网页时,动态将 Nginx 本地数据渲染到本地 html 模板并返回给用户。

虽然没有直接返回 html 页面那么快,但是因为数据在本地缓存,所以也很快,其实耗费的也就是动态渲染一个 html 页面的性能。如果 html 模板发生了变更,不需要将所有的页面重新静态化,也不需要发送请求,没有网络请求的开销,直接将数据渲染进最新的 html 页面模板后响应即可。

在这种架构下,我们需要保证系统的高可用性

如果系统访问量很高,Nginx 本地缓存过期失效了,redis 中的缓存也被 LRU 算法给清理掉了,那么会有较高的访问量,从缓存服务调用商品服务。但如果此时商品服务的接口发生故障,调用出现了延时,缓存服务全部的线程都被这个调用商品服务接口给耗尽了,每个线程去调用商品服务接口的时候,都会卡住很长时间,后面大量的请求过来都会卡在那儿,此时缓存服务没有足够的线程去调用其它一些服务的接口,从而导致整个大量的商品详情页无法正常显示。

这其实就是一个商品接口服务故障导致缓存服务资源耗尽的现象。

如果从 Nginx 开始,缓存都失效了,Nginx 会直接通过缓存服务调用商品服务获取最新商品数据(我们基于电商项目做个讨论),有可能出现调用延时而把缓存服务资源耗尽的情况。这里,我们就来说说,怎么通过 Hystrix 线程池技术实现资源隔离。

资源隔离,就是说,你如果要把对某一个依赖服务的所有调用请求,全部隔离在同一份资源池内,不会去用其它资源了,这就叫资源隔离。哪怕对这个依赖服务,比如说商品服务,现在同时发起的调用量已经到了 1000,但是线程池内就 10 个线程,最多就只会用这 10 个线程去执行,不会说,对商品服务的请求,因为接口调用延时,将 tomcat 内部所有的线程资源全部耗尽。

Hystrix 进行资源隔离,其实是提供了一个抽象,叫做 command。这也是 Hystrix 最最基本的资源隔离技术。

利用 HystrixCommand 获取单条数据

我们通过将调用商品服务的操作封装在 HystrixCommand 中,限定一个 key,比如下面的 GetProductInfoCommandGroup,在这里我们可以简单认为这是一个线程池,每次调用商品服务,就只会用该线程池中的资源,不会再去用其它线程资源了。

public class GetProductInfoCommand extends HystrixCommand<ProductInfo> {

    private Long productId;

    public GetProductInfoCommand(Long productId) {
        super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoCommandGroup"));
        this.productId = productId;
    }

    @Override
    protected ProductInfo run() {
        String url = "http://localhost:8081/getProductInfo?productId=" + productId;
        // 调用商品服务接口
        String response = HttpClientUtils.sendGetRequest(url);
        return JSONObject.parseObject(response, ProductInfo.class);
    }
}

我们在缓存服务接口中,根据 productId 创建 command 并执行,获取到商品数据。

@RequestMapping("/getProductInfo")
@ResponseBody
public String getProductInfo(Long productId) {
    HystrixCommand<ProductInfo> getProductInfoCommand = new GetProductInfoCommand(productId);
    
    // 通过command执行,获取最新商品数据
    ProductInfo productInfo = getProductInfoCommand.execute();
    System.out.println(productInfo);
    return "success";
}

上面执行的是 execute() 方法,其实是同步的。也可以对 command 调用 queue() 方法,它仅仅是将 command 放入线程池的一个等待队列,就立即返回,拿到一个 Future 对象,后面可以继续做其它一些事情,然后过一段时间对 Future 调用 get() 方法获取数据。这是异步的。

利用 HystrixObservableCommand 批量获取数据

只要是获取商品数据,全部都绑定到同一个线程池里面去,我们通过 HystrixObservableCommand 的一个线程去执行,而在这个线程里面,批量把多个 productId 的 productInfo 拉回来。

public class GetProductInfosCommand extends HystrixObservableCommand<ProductInfo> {

    private String[] productIds;

    public GetProductInfosCommand(String[] productIds) {
        // 还是绑定在同一个线程池
        super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoGroup"));
        this.productIds = productIds;
    }

    @Override
    protected Observable<ProductInfo> construct() {
        return Observable.unsafeCreate((Observable.OnSubscribe<ProductInfo>) subscriber -> {

            for (String productId : productIds) {
                // 批量获取商品数据
                String url = "http://localhost:8081/getProductInfo?productId=" + productId;
                String response = HttpClientUtils.sendGetRequest(url);
                ProductInfo productInfo = JSONObject.parseObject(response, ProductInfo.class);
                subscriber.onNext(productInfo);
            }
            subscriber.onCompleted();

        }).subscribeOn(Schedulers.io());
    }
}

在缓存服务接口中,根据传来的 id 列表,比如是以 , 分隔的 id 串,通过上面的 HystrixObservableCommand,执行 Hystrix 的一些 API 方法,获取到所有商品数据。

public String getProductInfos(String productIds) {
    String[] productIdArray = productIds.split(",");
    HystrixObservableCommand<ProductInfo> getProductInfosCommand = new GetProductInfosCommand(productIdArray);
    Observable<ProductInfo> observable = getProductInfosCommand.observe();

    observable.subscribe(new Observer<ProductInfo>() {
        @Override
        public void onCompleted() {
            System.out.println("获取完了所有的商品数据");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        /**
         * 获取完一条数据,就回调一次这个方法
         * @param productInfo
         */
        @Override
        public void onNext(ProductInfo productInfo) {
            System.out.println(productInfo);
        }
    });
    return "success";
}

我们回过头来,看看 Hystrix 线程池技术是如何实现资源隔离的。

从 Nginx 开始,缓存都失效了,那么 Nginx 通过缓存服务去调用商品服务。缓存服务默认的线程大小是 10 个,最多就只有 10 个线程去调用商品服务的接口。即使商品服务接口故障了,最多就只有 10 个线程会 hang 死在调用商品服务接口的路上,缓存服务的 tomcat 内其它的线程还是可以用来调用其它的服务,干其它的事情。
Hystrix 里面核心的一项功能,其实就是所谓的资源隔离,要解决的最最核心的问题,就是将多个依赖服务的调用分别隔离到各自的资源池内。避免说对某一个依赖服务的调用,因为依赖服务的接口调用的延迟或者失败,导致服务所有的线程资源全部耗费在这个服务的接口调用上。一旦说某个服务的线程资源全部耗尽的话,就可能导致服务崩溃,甚至说这种故障会不断蔓延。

Hystrix 实现资源隔离,主要有两种技术:

  • 线程池
  • 信号量

默认情况下,Hystrix 使用线程池模式。

前面已经说过线程池技术了,这一小节就来说说信号量机制实现资源隔离,以及这两种技术的区别与具体应用场景。

信号量机制

信号量的资源隔离只是起到一个开关的作用,比如,服务 A 的信号量大小为 10,那么就是说它同时只允许有 10 个 tomcat 线程来访问服务 A,其它的请求都会被拒绝,从而达到资源隔离和限流保护的作用。

线程池与信号量区别

线程池隔离技术,并不是说去控制类似 tomcat 这种 web 容器的线程。更加严格的意义上来说,Hystrix 的线程池隔离技术,控制的是 tomcat 线程的执行。Hystrix 线程池满后,会确保说,tomcat 的线程不会因为依赖服务的接口调用延迟或故障而被 hang 住,tomcat 其它的线程不会卡死,可以快速返回,然后支撑其它的事情。

线程池隔离技术,是用 Hystrix 自己的线程去执行调用;而信号量隔离技术,是直接让 tomcat 线程去调用依赖服务。信号量隔离,只是一道关卡,信号量有多少,就允许多少个 tomcat 线程通过它,然后去执行。

适用场景

  • 线程池技术,适合绝大多数场景,比如说我们对依赖服务的网络请求的调用和访问、需要对调用的 timeout 进行控制(捕捉 timeout 超时异常)。
  • 信号量技术,适合说你的访问不是对外部依赖的访问,而是对内部的一些比较复杂的业务逻辑的访问,并且系统内部的代码,其实不涉及任何的网络请求,那么只要做信号量的普通限流就可以了,因为不需要去捕获 timeout 类似的问题。

信号量简单 Demo

业务背景里,比较适合信号量的是什么场景呢?

比如说,我们一般来说,缓存服务,可能会将一些量特别少、访问又特别频繁的数据,放在自己的纯内存中。

举个栗子。一般我们在获取到商品数据之后,都要去获取商品是属于哪个地理位置、省、市、卖家等,可能在自己的纯内存中,比如就一个 Map 去获取。对于这种直接访问本地内存的逻辑,比较适合用信号量做一下简单的隔离。

优点在于,不用自己管理线程池啦,不用 care timeout 超时啦,也不需要进行线程的上下文切换啦。信号量做隔离的话,性能相对来说会高一些。

假如这是本地缓存,我们可以通过 cityId,拿到 cityName。

public class LocationCache {
    private static Map<Long, String> cityMap = new HashMap<>();

    static {
        cityMap.put(1L, "北京");
    }

    /**
     * 通过cityId 获取 cityName
     *
     * @param cityId 城市id
     * @return 城市名
     */
    public static String getCityName(Long cityId) {
        return cityMap.get(cityId);
    }
}

写一个 GetCityNameCommand,策略设置为信号量。run() 方法中获取本地缓存。我们目的就是对获取本地缓存的代码进行资源隔离。

public class GetCityNameCommand extends HystrixCommand<String> {

    private Long cityId;

    public GetCityNameCommand(Long cityId) {
        // 设置信号量隔离策略
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetCityNameGroup"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)));

        this.cityId = cityId;
    }

    @Override
    protected String run() {
        // 需要进行信号量隔离的代码
        return LocationCache.getCityName(cityId);
    }
}

在接口层,通过创建 GetCityNameCommand,传入 cityId,执行 execute() 方法,那么获取本地 cityName 缓存的代码将会进行信号量的资源隔离。

@RequestMapping("/getProductInfo")
@ResponseBody
public String getProductInfo(Long productId) {
    HystrixCommand<ProductInfo> getProductInfoCommand = new GetProductInfoCommand(productId);

    // 通过command执行,获取最新商品数据
    ProductInfo productInfo = getProductInfoCommand.execute();

    Long cityId = productInfo.getCityId();

    GetCityNameCommand getCityNameCommand = new GetCityNameCommand(cityId);
    // 获取本地内存(cityName)的代码会被信号量进行资源隔离
    String cityName = getCityNameCommand.execute();

    productInfo.setCityName(cityName);

    System.out.println(productInfo);
    return "success";
}

相关文章