聊一聊SpringBoot中的异步编程

x33g5p2x  于2021-12-06 转载在 Spring  
字(5.1k)|赞(0)|评价(0)|浏览(440)

从WEB服务器的响应模式来看,Web服务器为用户的每一次请求都分配了一个处理线程,可以称之为用户线程,也就是说本次请求的所有内容都交由这个线程来处理。如果用户单次请求提交的并行任务较多且业务复杂,单一线程的处理速度就会变慢。

为了提升服务器的响应速度,可以采取异步的模式,在用户线程中新开一个或多个任务线程,让程序并行的处理用户请求,最后将处理结果交由用户线程整合后返回。

一、启用异步线程

1.1 Callable接口实现异步处理

Callable接口可以开启新线程执行,并将执行的结果返回主线程,那么可以尝试直接使用Callable做任务的并行处理。

@Slf4j
@RestController
@RequestMapping("/message")
public class MessageAction {

    @RequestMapping("/echo")
    public Object echo(String message) {
        log.info("【用户线程】" + Thread.currentThread().getName());
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("【任务线程】" + Thread.currentThread().getName());
                return "[ECHO]" + message;
            }
        };
    }
}

访问:http://localhost:8080/message/echo,发现用户线程和使用Callable接口的任务线程都是来自线程池中。

配置任务线程的线程池。

/** * 异步线程配置类 */
@Configuration
public class AsyncPoolConfig implements WebMvcConfigurer {

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setDefaultTimeout(1000); //超时时间
        configurer.registerCallableInterceptors(timeoutInterceptor()); // timeout拦截器
        configurer.setTaskExecutor(getThreadPoolTaskExecutor());
    }

    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor getThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20); // 核心线程数
        executor.setMaxPoolSize(100); // 最大线程数
        executor.setQueueCapacity(25); // 工作队列大小
        executor.setKeepAliveSeconds(200); // 线程无任务存活时间
        executor.setThreadNamePrefix("async-pool-task-"); // 线程名前缀
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
        return executor;
    }

    @Bean
    public TimeoutCallableProcessingInterceptor timeoutInterceptor() {
        return new TimeoutCallableProcessingInterceptor();
    }
}

再次访问接口,发现配置生效。

1.2 WebAsyncTask超时处理

WebAsyncTask是一个由Spring提供的异步任务管理类,可以在此类中配置要执行的请求处理异步线程,也可以配置一个与之相关的超时管理线程,当程序超时后,可以启动超时处理线程进行结果的返回。

修改MessageAction接口,配置超时。

@Slf4j
@RestController
@RequestMapping("/message")
public class MessageAction {

    @RequestMapping("/echo")
    public Object echo(String message) {
        log.info("【用户线程】" + Thread.currentThread().getName());
        Callable task = new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("【任务线程】" + Thread.currentThread().getName());
                TimeUnit.SECONDS.sleep(2); // 休眠2s
                return "[ECHO]" + message;
            }
        };
        WebAsyncTask webAsyncTask = new WebAsyncTask(200, task); // 设置task超时时间为200ms
        // 设置超时处理线程
        webAsyncTask.onTimeout(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("【超时线程】" + Thread.currentThread().getName());
                return "timeout";
            }
        });
        return webAsyncTask;
    }
}

再次访问接口,发现超时线程执行。

1.3 DeferredResult

DeferredResult用于实现Runnable接口的异步处理,与Callable接口不同,Runnable接口的run方法是没有返回值的,需要将执行结果保存在DeferredResult对象实例中。DeferredResult中也提供了状态监听方法处理超时。

修改MessageAction接口,使用Runnable接口处理任务。

@Slf4j
@RestController
@RequestMapping("/message")
public class MessageAction {

    private final ThreadPoolTaskExecutor threadPoolExecutor; // 线程池

    @Autowired
    public MessageAction(ThreadPoolTaskExecutor threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    @RequestMapping("/echo")
    public Object echo(HttpServletRequest request, String message) {
        log.info("【用户线程】" + Thread.currentThread().getName());

        DeferredResult result = new DeferredResult(6000L);

        // 监听超时
        result.onTimeout(() -> {
            log.info("【超时线程】" + Thread.currentThread().getName());
            result.setResult("超时路径: " + request.getRequestURI());
        });

        // 监听完成
        result.onCompletion(() -> {
            log.info("【完成线程】" + Thread.currentThread().getName());
        });

        threadPoolExecutor.execute(() -> {
            log.info("【任务线程】" + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(2); // 休眠2s
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            result.setResult("[ECHO]" + message); // 设置Runnable处理后的返回值
        });

        return result;
    }
}

访问接口,程序正常执行。

二、SpringBoot异步任务

SpringBoot提供有异步任务的概念,在请求接收到之后,响应返回之前开启一个异步任务,该任务与相应的内容无关,并且可能在相应结束后继续执行。

配置异步任务线程池管理类。

@Configuration
@EnableAsync // 启用异步处理
public class DefaultThreadPoolConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() { // 异步执行者
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // 核心线程数
        executor.setMaxPoolSize(20); // 最大线程数
        executor.setQueueCapacity(100); //任务队列大小
        executor.setThreadNamePrefix("NicholasGUB - "); // 前缀
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { // 异常处理
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

定义异步任务处理类。

@Component
@Slf4j
public class ThreadTask {

    @Async
    public void startTaskHandle() {
        log.info("【异步线程】开启:" + Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("【异步线程】结束:" + Thread.currentThread().getName());
    }
}

开启异步任务。

@Slf4j
@RestController
@RequestMapping("/task")
public class TaskAction {

    private final ThreadTask threadTask; // 异步任务

    @Autowired
    public TaskAction(ThreadTask threadTask) {
        this.threadTask = threadTask;
    }

    @RequestMapping("/start")
    public Object echo() {
        log.info("【用户线程】" + Thread.currentThread().getName());
        threadTask.startTaskHandle(); // 异步任务执行
        return "async task start success";
    }
}

浏览器访问:http://localhost:8080/task/start。

相关文章