在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。
Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable
和Callable
,而执行机制由Executor
框架提供。
在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上:
任务: 包括被执行任务需要实现的接口:Runnable
接口或Callable
接口。
任务的执行: 包括任务执行机制的核心接口Executor
,以及继承自Executor
的ExecutorService
接口。ExecutorService
接口有两个关键的实现类:ThreadPoolExecutor
和ScheduledThreadPoolExecutor
。
异步计算的结果: 包括接口Future
和实现Future接口的FutureTask
类。
Executor
是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。ThreadPoolExecutor
是线程池的核心实现类,用来执行被提交的任务。ScheduledThreadPoolExecutor
是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor
比Timer
更灵活,功能更强大。Future
接口和实现Future接口的FutureTask
类,代表异步计算的结果。Runnable
接口和Callable
接口的实现类,都可以被ThreadPoolExecutor
或ScheduledThreadPoolExecutor
执行。Runnable
或者Callable
接口的任务对象。工具类Executors
可以把一个Runnable
对象封装为一个Callable
对象(Executors.callable(Runnable task)
或Executors.callable(Runnable task, Object resule)
)。Runnable
对象直接交给ExecutorService
执行(ExecutorService.execute(Runnable command)
),或者也可以把Runnable
对象或Callable
对象提交给ExecutorService
执行(ExecutorService.submit(Runnable task)
或ExecutorService.submit(Callable<T>task)
)。ExecutorService
的submit()
方法,ExecutorService
将返回一个实现Future
接口的对象(FutureTask
对象)。由于FutureTask
实现了Runnable
,程序员也可以创建FutureTask
,然后直接交给ExecutorService
执行。FutureTask.get()
方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。Executor
框架最核心的类是ThreadPoolExecutor
,它是线程池的实现类。
三大方法ThreadPoolExecutor
通常使用工厂类Executors
来创建。Executors
可以创建3种类型的ThreadPoolExecutor
:SingleThreadExecutor
、FixedThreadPool
和CachedThreadPool
。
Executors.newSingleThreadExecutor()
Executors.newFixedThreadPool()
Executors.newCachedThreadPool()
七大参数
int corePoolSize:核心线程数。(要保留在池中的线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
)
int maximumPoolSize:最大线程数。(如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务;如果使用了无界的任务队列这个参数是无效的。)
对于CPU密集型任务,可以将该参数设置为 CPU核数 + 1
对于IO密集型任务,可以将该参数设置为 CPU核数 /* 2
long keepAliveTime:超时释放时间。(当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间)
TimeUnit unit:超时释放时间的单位。(枚举类TimeUnit
的常量)
BlockingQueue<Runnable> workQueue :阻塞队列。(用于在执行任务之前保存任务的队列, 这个队列将只保存execute
方法提交的Runnable任务)
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,是一个容量为Integer.MAX_VALUE
的队列(无界队列)。此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列。
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
ThreadFactory threadFactory:线程工厂。(执行程序创建新线程时使用的工厂)
RejectedExecutionHandler handler:拒绝策略。(当提交任务数超过maxmumPoolSize
+ workQueue
之和时,任务会交给handler来处理)
四种拒绝策略
AbortPolicy:抛出RejectedExecutionException
异常
DiscardPolicy:丢掉任务,不抛异常
DiscardOldestPolicy:不抛异常,尝试去和最早的去竞争,竞争失败再丢掉任务
CallerRunsPolicy:哪来的回哪里(将被拒绝的任务任务返回给execute()
方法的调用线程中运行)
execute()
和submit()
的区别public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
}
execute()
方法是Executor
接口的唯一方法,submit()
方法是ExecutorService
的,一共有三个重载。execute()
方法只能接收Runnable
接口的实现类作为参数,submit()
方法可以接收Runnable
和Callable
接口的实现类作为参数。execute()
方法没有返回值,submit()
方法用于提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个对象可以判断任务是否执行成功,并且可以通过get()
方法来获取返回值,get()
方法会阻塞当前线程直到任务完成,而使用get(long timeout, TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。execute()
方法无法处理异常,只会抛出,submit()
方法可以通过返回结果的Future对象的get()
方法对异常进行处理。shutdown()
和shitdownNow()
的区别可以通过调用线程池的shutdown()
或shutdownNow()
方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt()
方法来中断线程,所以无法响应中断的任务可能永远无法终止。
shutdown()
只是将线程池的状态设置为SHUTWDOWN
状态,正在执行的任务会继续执行下去,没有被执行的则中断。而shutdownNow()
则是将线程池的状态设置为STOP
,正在执行的任务则被停止,没被执行任务的则返回。(shutdown()
不着急,shutdownNow()
很着急)
// 返回正在积极执行任务的线程的大致数量
executor.getActiveCount();
// 返回已安排执行的大致任务总数
// 由于任务和线程的状态在计算过程中可能会动态变化,因此返回值只是一个近似值
executor.getTaskCount();
// 返回已完成执行的大致任务总数
// 由于任务和线程的状态在计算过程中可能会动态变化,因此返回值只是一个近似值,但在连续调用中永远不会减少
executor.getCompletedTaskCount();
// 返回曾经同时进入池中的最大线程数
// 通过这个数据可以知道线程池是否曾经满过:如该数值等于线程池的最大大小,则表示线程池曾经满过。
executor.getLargestPoolSize();
// 返回当前池中的线程数
executor.getPoolSize();
可以通过继承线程池来自定义线程池,重写线程池的beforeExecute()
、afterExecute()
和terminated()
方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
FixedThreadPool
被称为可重用固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool
的核心线程数corePoolSize
和最大线程数maximumPoolSize
都被设置为创建FixedThreadPool
时指定的参数nThreads。
当线程池中的线程数大于核心线程数corePoolSize
时,keepAliveTime
为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime
设置为0L,意味着多余的空闲线程会被立即终止。
corePoolSize
,则创建新线程来执行任务corePoolSize
),将任务加入LinkedBlockingQueue
LinkedBlockingQueue
获取任务来执行SingleThreadExecutor
是使用单个worker线程的Executor(corePoolSize
和maximumPoolSize
被设置为1)。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
corePoolSize
(即线程池中无运行的线程),则创建一个新线程来执行任务LinkedBlockingQueue
LinkedBlockingQueue
获取任务来执行CachedThreadPool
是一个会根据需要创建新线程的线程池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool
的corePoolSize
被设置为0,即corePool为空;maximumPoolSize
被设置为Integer.MAX_VALUE
,即 maximumPool
是无界的。这里把keepAliveTime
设置为60L,意味着CachedThreadPool
中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
SynchronousQueue
是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool
使用SynchronousQueue
,把主线程提交的任务传递给空闲线程执行:
FixedThreadPool
和SignalThreadExecutor
FixedThreadPool
和SignalThreadExecutor
的阻塞队列都为LinkedBlockingQueue
,LinkedBlockingQueue
的容量为Integer.MAX_VALUE
(无界队列),由于无界队列的存在,会导致maximumPoolSize
和keepAliveTime
是无效的,也就是说新的任务会一直添加到无界队列中去,虽然这个“无界队列”其实是有界的,但是还没等这个队列被填满,就OOM了。同时,由于无界队列不可能被正常填满,因此拒绝策略handler
也是形同虚设。CachedThreadPool
CachedThreadPool
的maximumPoolSize
被设置为Integer.MAX_VALUE
,即 maximumPool
是无界的;同时CachedThreadPool
的阻塞队列为没有容量的SynchronousQueue
。这意味着,如果主线程提交任务的速度高于maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新线程。极端情况下,CachedThreadPool
会因为创建过多线程而耗尽CPU和内存资源(OOM)。ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor
。它主要用来在给定的延迟之后运行任务,或者定期执行任务。
它有三种调度任务的方式:
schedule():延迟多长时间之后只执行一次;
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10);
System.out.println(LocalDateTime.now());
scheduled.schedule(new Runnable() {
@Override
public void run() {
System.out.println(LocalDateTime.now());
}
}, 4, TimeUnit.SECONDS);
scheduledAtFixedRate():延迟指定时间后执行一次,之后按照固定的时长周期执行;
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10);
scheduled.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(LocalDateTime.now());
}
}, 0, 4, TimeUnit.SECONDS);
scheduleWithFixedDelay():延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行;
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10);
scheduled.scheduleWithFixedDelay(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(1000);
System.out.println(LocalDateTime.now());
}
}, 0,4, TimeUnit.SECONDS);
ScheduledThreadPoolExecutor
有四种构造器,用来指定核心线程数、线程工厂、拒绝策略:
因为ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor
,所以它的构造器都是通过super
调用的父类的构造器:
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
DelayedWorkQueue
是一个无界队列,因此这里把maximumPoolSize
也设为了算是无穷大吧(没什么意义)…
ScheduledThreadPoolExecutor
会把待调度的任务封装为一个ScheduledFutureTask
(ScheduledThreadPoolExecutor
的私有内部类)对象放到优先队列DelayedWorkQueue
(ScheduledThreadPoolExecutor
的静态内部类)中。
ScheduledFutureTask
主要包含3个成员变量:
time
:表示这个任务将要被执行的具体时间sequenceNumber
:表示这个任务被添加到ScheduledThreadPoolExecutor中的序号period
:表示任务执行的间隔周期DelayedWorkQueue
会对队列中的ScheduledFutureTask
进行排序。排序时,time
小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask
的time
相同,就比较sequenceNumber
,sequenceNumber
小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
执行流程
DelayedWorkQueue
的take()
方法获取一个已到期(ScheduledFutureTask
的time
大于等于当前时间)的ScheduledFutureTask
对象ScheduledFutureTask
对象的run()
方法执行ScheduledFutureTask
对象ScheduledFutureTask
对象的setNextRunTime()
方法修改ScheduledFutureTask
的time
变量为下次将要被执行的时间ScheduledFutureTask
对象的reExecutePeriodic()
方法将修改time
之后的ScheduledFutureTask
放回DelayedWorkQueue
中Future
接口主要提供了异步返回任务执行结果,取消任务执行,获取任务执行状态的功能,接口定义如下:
public interface Future<V> {
// 取消任务执行
// mayInterruptIfRunning用于控制如果任务正在执行,是否中断对应的执行线程来取消该任务
// 成功cancel,则isCancelled和isDoned都返回true。
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已取消
boolean isCancelled();
// 正常执行,被取消,异常退出都返回true
boolean isDone();
// 阻塞等待执行结果
// CancellationException:任务被取消
// ExecutionException:任务执行异常
// InterruptedException:该等待结果线程被中断
V get() throws InterruptedException, ExecutionException;
// 阻塞等待执行结果指定时间,除了以上异常,
// TimeoutException:等待超时
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future
接口和实现Future接口的FutureTask
类,代表异步计算的结果。
FutureTask
除了实现Future
接口外,还实现了Runnable
接口。因此,FutureTask
可以交给Executor
执行,也可以由调用线程直接执行(FutureTask.run()
)。
可以把FutureTask
交给Executor
执行;也可以通过ExecutorService.submit()
方法返回一个FutureTask
,然后执行FutureTask.get()
方法或FutureTask.cancel()
方法。除此以外,还可以单独使用FutureTask
。
根据FutureTask.run()
方法被执行的时机,FutureTask
可以处于下面3种状态:
FutureTask
且没有执行FutureTask.run()
方法之前,这个FutureTask
处于未启动状态。FutureTask.run()
方法被执行的过程中,FutureTask
处于已启动状态。FutureTask.run()
方法执行完后正常结束,或被取消(FutureTask.cancel()
),或执行FutureTask.run()
方法时抛出异常而异常结束,FutureTask
处于已完成状态当FutureTask
处于未启动或已启动状态时,执行FutureTask.get()
方法将导致调用线程阻塞;当FutureTask
处于已完成状态时,执行FutureTask.get()
方法将导致调用线程立即返回结果或抛出异常。
当FutureTask
处于未启动状态时,执行FutureTask.cancel()
方法将导致此任务永远不会被执行;当FutureTask
处于已启动状态时,执行FutureTask.cancel(
true
)
方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask
处于已启动状态时,执行FutureTask.cancel(
false
)
方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask
处于已完成状态时,执行FutureTask.cancel()
方法将返回false。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_43613793/article/details/120583263
内容来源于网络,如有侵权,请联系作者删除!