# 1. 介绍
# 1.1 使用场景
并发编程可以高效利用CPU资源,提升任务执行效率,但是多线程及线程间的切换也伴随着资源的消耗。当遇到单个任务处理时间比较短,但需要处理的任务数量很大时,线程会频繁的创建销毁,大量的时间和资源都会浪费在线程的创建和销毁上,效率很低。
这个时候就需要用的线程池了,线程作为一个工作者,线程执行完一个任务之后不销毁,而是继续执行其他的任务。
# 1.2 好处
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
# 1.3 一个简单示例
先通过一个简单的示例了解下线程池:
public class Test {
public static void main(String[] args) {
// 1. 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
for (int i = 0; i < 15; i++) {
// 2. 创建任务
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("执行任务...");
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executor.execute(task);// 3. 任务交给线程池执行
}
executor.shutdown();// 4. 关闭线程池
}
}
# 2. Executor框架接口
Executor框架提供了一种“任务提交”与“任务如何运行”分离开来的机制,实现对异步任务的控制与执行。我们先大概了解下每个类的基本情况。

# 2.1 Executor接口
Executor接口只有一个execute方法,用于提交任务。
public interface Executor {
void execute(Runnable command);
}
// 启动线程执行任务
new Thread(new Runnable() {
public void run() {
// TODO Auto-generated method stub
}
}).start();
// 使用Executor提交任务
Executor executor = newExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
# 2.2 ExecutorService接口
ExecutorService接口继承自Executor接口,提供了线程池主要功能,提交任务、异步任务执行、关闭线程池等。
public interface ExecutorService extends Executor {
// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
void shutdown();
// 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
List<Runnable> shutdownNow();
// 线程池是否已关闭
boolean isShutdown();
// 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true
boolean isTerminated();
// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable 任务,第二个参数将会放到 Future中,作为返回值,
<T> Future<T> submit(Runnable task, T result);
// 提交一个 Runnable 任务
Future<?> submit(Runnable task);
// 执行所有任务,返回 Future 类型的一个 list
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
}
# 2.3 AbstractExecutorService类
AbstractExecutorService实现了ExecutorService接口,并在其基础上实现了几个实用的方法提供给子类进行调用。
public abstract class AbstractExecutorService implements ExecutorService {
/**
* newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
* RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* 提交任务
*/
public Future<?> submit(Runnable task) {
if (task == null)
throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null)
throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}
// 将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
// 执行所有的任务,返回任务结果。
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {}
}
# 2.4 ThreadPoolExecutor
ThreadPoolExecutor就是线程池了,继承自AbstractExecutorService
# 3. 线程池状态
RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用- shutdown()方法进入该状态);
STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。进入TERMINATED的条件如下:
- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
- 设置TIDYING状态成功。

线程状态如何保存呢?
ThreadPoolExecutor采用一个 32 位的整数(int变量ctl)来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数。
// 高 3 位用于存放线程池状态,低 29 位表示线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** 后29位用于存放线程数 */
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000 11111111111111111111111111111
// 最大线程数:这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1=536870911
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/** 高 3 位表示线程池的状态 */
// 111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 将整数 c 的低 29 位修改为 0,获取线程池的状态
return c & ~CAPACITY;
}
// 将整数 c 的高 3 为修改为 0,获取线程池中的线程数
private static int workerCountOf(int c) {
return c & CAPACITY;
}
# 4. 线程池参数
线程池ThreadPoolExecutor类有四个构造方法,我们通过这个参数最全的构造方法来看下线程池参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize: 核心线程数量;
maximumPoolSize: 最大线程数量;
workQueue: 等待队列,当线程池中的线程数量大于等于corePoolSize时,把该任务放入等待队列;
keepAliveTime: 线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize时,核心线程外的线程空闲时间超过keepAliveTime就会销毁;
unit: keepAliveTime的时间单位;
threadFactory: 用于创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。Executors.defaultThreadFactory() 创建的线程优先级都是NORM_PRIORITY;
handler: RejectedExecutionHandler类型的变量,表示线程池的拒绝策略。当阻塞队列满了并且没有空闲的线程时,如果继续提交任务,就需要采取一种策略处理该任务。线程池提供以下拒绝策略:
- AbortPolicy:直接抛出异常,默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
- 实现自己的拒绝策略,实现RejectedExecutionHandler接口重写rejectedExecution方法即可。
线程池任务提交过程:
任务提交的顺序为 corePoolSize –> workQueue –> maximumPoolSize -> handler。
- 如果运行的线程数少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
- 如果运行的线程数大于等于 corePoolSize,则将任务放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
- 当workQueue已经满时,如果运行的线程数小于maximumPoolSize,则创建新的线程去处理提交的任务;
- 当workQueue已经满时,如果运行的线程数大于等于maximumPoolSize且没有空闲线程,则通过handler所指定的拒绝策略来处理任务。
线程池中的线程执行完当前任务后,会循环到任务队列中取任务继续执行;线程获取队列中任务时会阻塞,直到获取到任务返回;当线程数大于corePoolSize且线程阻塞时间超时,线程就会被销毁。
# 5. 线程池创建
介绍四种创建线程池的方式:通过 ThreadPoolExecutor 的方式创建线程池及Executors工具类提供的三种创建方式。
# 5.1 ThreadPoolExecutor方式
直接调用 ThreadPoolExecutor 的构造方法,自己手动设置每一个参数,这是阿里推荐的方法。
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 ——《阿里巴巴Java开发手册》
# 5.2 FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- corePoolSize 和 maximumPoolSize都设置为指定nThreads,表示核心线程数等于最大线程数,当达到核心线程数且阻塞队列也已经满时,如果继续提交任务,则会直接走拒绝策略。
- FixedThreadPool使用的是默认的拒绝策略,即AbortPolicy,则直接抛出异常。
- keepAliveTime 表示线程数量大于corePoolSize时空闲的线程的存活时间,而FixedThreadPool的corePoolSize 和 maximumPoolSize相等,不可能有多余corePoolSize的线程,所以这里的keepAliveTime本来就无效。
- workQueue使用LinkedBlockingQueue,没有设置范围,默认是最大值(Integer.MAX_VALUE),相当于一个无界队列。当线程池中的线程数量等于corePoolSize 时,如果继续提交任务,该任务会被添加到阻塞队列workQueue中,因为workQueue是无界队列,所以maximumPoolSize和参数都无效。
# 5.3 SingleThreadExecutor
newSingleThreadExecutor与FixedThreadPool类似,不过是将线程数设置为1。
corePoolSize 和 maximumPoolSize都指定为1,表示该线程池中最多有一个线程,其他同FixedThreadPool。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
# 5.4 CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- CachedThreadPool的corePool为0,maximumPoolSize为Integer.MAX_VALUE,线程池中的所有线程都不是核心线程。
- keepAliveTime为60L,unit设置为TimeUnit.SECONDS,空闲线程超过60秒后将会被终止。
- 阻塞队列采用的SynchronousQueue
- SynchronousQueue 不存储元素,数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。
- SynchronousQueue 执行put/take操作时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程),则将当前线程加入到等待队列。如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然),则匹配等待队列的队头,出队,返回相应数据。
理解CachedThreadPool提交任务的过程:
- 第一次提交任务,因为SynchronousQueue需要有读操作与写操作匹配才能写入数据,所以任务不能进入SynchronousQueue队列,而是直接创建一个线程执行任务;
- 之后提交任务,如果线程池里因为空闲线程超时被销毁而没有线程,同样不能进入SynchronousQueue队列,需要创建一个线程执行任务;
- 之后提交任务,如果线程池里的线程都正在执行任务,同样不能进入SynchronousQueue队列,需要创建一个线程执行任务;
- 之后提交任务,如果线程池有线程处于空闲状态(处于空闲状态的线程都会在SynchronousQueue的take()方法上阻塞),那么SynchronousQueue通过offer()方法将任务交给take()执行,不需要创建线程;
CachedThreadPool的问题:如果主线程提交任务的速度远远大于CachedThreadPool的处理速度,则CachedThreadPool会不断地创建新线程来执行任务,这样有可能会导致系统耗尽CPU和内存资源,所以在使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。
# 6. 执行过程
这一节详细分析任务提交到线程池之后,线程池是如何处理和执行任务的。
# 6.1 execute()方法
execute()方法执行过程如下:
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务,即使有空闲线程,也要创建一个新线程 ;
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
- 如果workerCount >= maximumPoolSize,且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/*
* 当前核心线程数小于corePoolSize,则新建一个线程放入线程池中。
* 注意这里不管核心线程有没有空闲,都会创建线程
*/
if (workerCountOf(c) < corePoolSize) {
// 创建线程,并执行command。addWorker方法后面详细讲解。
if (addWorker(command, true))
return;
c = ctl.get();// 如果添加失败,则重新获取ctl值
}
// 当前核心线程数大于等于corePoolSize,将任务添加到队列
if (isRunning(c) && workQueue.offer(command)) {
/*
* 再次检查线程池的运行状态
* 如果不是运行状态,将command从workQueue中移除,使用拒绝策略处理command
*/
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果有效线程数为0,创建一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 当前核心线程数大于等于corePoolSize,且workQueue队列添加任务失败,尝试创建maximumPoolSize中的线程来执行任务
*/
else if (!addWorker(command, false))
reject(command);
}
# 6.2 addWorker方法
addWorkerr(Runnable firstTask, boolean core)方法的主要工作是在线程池中创建一个新的线程并执行:
- 增加线程数量ctl;
- 创建Worker对象来执行任务,每一个Worker对象都会创建一个线程;
- worker添加成功后,启动这个worker中的线程
- 参数firstTask:这个新创建的线程需要第一个执行的任务;firstTask==null,表示创建线程,到workQueue中取任务执行;
- 参数core:true代表使用corePoolSize作为创建线程的界限;false代表使用maximumPoolSize作为界限
/**
* 在线程池中创建一个新的线程并执行
* @param firstTask 这个新创建的线程需要第一个执行的任务;firstTask==null,表示创建线程,到workQueue中取任务执行
* @param core true代表使用corePoolSize作为创建线程的界限;false代表使用maximumPoolSize作为界限
* @return
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// 1. 增加线程数量ctl
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);// 获取运行状态
/*
* 不能创建线程的几种情况:
* 1. 线程池已关闭且rs == SHUTDOWN,不允许提交任务,且中断正在执行的任务
* 2. 线程池已关闭且firstTask!=null,
* 3. 线程池已关闭且workQueue为空
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);// 获取线程数
// 判断线程数上限
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出外层for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS失败,循环尝试
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/*
* 2. 创建Worker对象来执行任务,每一个Worker对象都会创建一个线程
* Worker类下文详细讲解
*/
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
/*
* 判断状态:
* 小于 SHUTTDOWN 那就是 RUNNING,最正常的情况
* 等于 SHUTDOWN,不接受新的任务但是会继续执行等待队列中的任务,所以要求firstTask == null
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);// 添加worker
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 3. worker添加成功,启动这个worker中的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
# 6.3 Worker类
线程池中的每一个线程被封装成一个Worker对象,线程池维护的其实就是一组Worker对象。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 线程被封装成Worker
final Thread thread;
/*
* 在创建线程的时候,如果同时指定的需要执行的第一个任务。
* 可以为 null,线程自己到任务队列中取任务执行
*/
Runnable firstTask;
// 线程完成的任务数
volatile long completedTasks;
// Worker 只有这一个构造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);// 调用 ThreadFactory 来创建一个新的线程
}
/**
* worker工作,调用外部类的 runWorker 方法,循环等待队列中获取任务并执行,下文详细介绍
*/
public void run() {
runWorker(this);
}
// ... 其他几个方法用AQS及锁的操作,不关注
}
# 6.4 runWorker方法
循环从等待队列中获取任务并执行:
- 获取到新任务就执行;
- 获取不到就阻塞等待新任务;
- 队列中没有任务或空闲线程超时,销毁线程。
/**
* 循环从等待队列中获取任务并执行
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/*
* 循环调用 getTask() 获取任务,getTask()下文详细讲解
* 获取到任务就执行,
* 获取不到就阻塞等待新任务,
* 返回null任务就销毁当前线程
*/
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池状态大于等于 STOP,中断
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(
ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);// 钩子方法,留给需要的子类实现
Throwable thrown = null;
try {
task.run();// 真正执行任务,执行execute()中传入任务的run方法
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);// 钩子方法,留给需要的子类实现
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 如果到这里,需要销毁线程:
// 1. getTask 返回 null退出while循环,队列中没有任务或空闲线程超时
// 2. 任务执行过程中发生了异常
processWorkerExit(w, completedAbruptly);
}
}
# 6.5 getTask方法
获取workQueue中的任务
- 正常情况,直接workQueue.take()获取到任务返回;
- workQueue中没有任务,当前线程阻塞直到获取到任务;
- getTask()返回 null, runWorker()方法会销毁当前线程,如下情况返回null:
- 状态为SHUTDOWN && workQueue.isEmpty(),任务队列没有任务,且即将关闭线程池,销毁当前线程
- 状态 >= STOP,关闭线程池,销毁当前线程
- 当前线程数超过最大maximumPoolSize,销毁当前线程
- 空闲线程超时keepAliveTime,需要销毁线程
/**
* 获取workQueue中的任务
* 1. 正常情况,直接workQueue.take()获取到任务返回;
* 2. workQueue中没有任务,当前线程阻塞直达获取到任务;
* 3. getTask()返回 null, runWorker()方法会销毁当前线程,如下情况返回null:
* 状态为SHUTDOWN && workQueue.isEmpty()
* 状态 >= STOP
* 当前线程数 wc > maximumPoolSize
* 空闲线程超时keepAliveTime
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/*
* 两种返回null的情况:
* 1. rs == SHUTDOWN && workQueue.isEmpty()
* 2. rs >= STOP
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();// CAS 操作,减少工作线程数
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* 两种返回null的情况:
* 1. 当前线程数 wc > maximumPoolSize,return null
* 2. 空闲线程超时,return null
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
/*
* 到 workQueue 中获取任务并返回
*/
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
# 6.6 总结
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
- 如果workerCount >= maximumPoolSize,且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
- 线程池中的线程执行完当前任务后,会循环到任务队列中取任务继续执行;线程获取队列中任务时会阻塞,直到获取到任务返回;

# 7. 关闭线程池
关闭线程池使用shutdown方法或shutdownNow方法,最终目的是将线程池状态设置成TERMINATED。
# 7.1 shutdown方法
shutdown方法过程:
- 将线程池切换到SHUTDOWN状态;
- 调用interruptIdleWorkers方法请求中断所有空闲的worker;
- 调用tryTerminate尝试结束线程池。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();// 安全策略判断
advanceRunState(SHUTDOWN);// CAS设置线程池状态为SHUTDOWN
interruptIdleWorkers();// 中断空闲线程
onShutdown(); // 钩子方法,用于ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试结束线程池,下文详细讲解
tryTerminate();
}
# 7.2 tryTerminate方法
结束线程池,最终将线程池状态设置为TERMINATED。
/**
* 结束线程池,最终将线程池状态设置为TERMINATED
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
* 当前线程池的状态为以下几种情况时,直接返回:
* 1. RUNNING,因为还在运行中,不能停止;
* 2. TIDYING或TERMINATED,已经关闭了;
* 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果线程数量不为0,则中断一个空闲的工作线程,并返回
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 尝试设置状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();// 钩子方法,留给子类实现
} finally {
// 设置状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
# 7.3 shutdownNow方法
shutdownNow方法过程:
- 将线程池切换到STOP状态;
- 中断所有工作线程,无论是否空闲;
- 取出阻塞队列中没有被执行的任务并返回;
- 调用tryTerminate尝试结束线程池。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();// 安全策略判断
advanceRunState(STOP);// CAS设置线程池状态为STOP
interruptWorkers();// 中断所有工作线程,无论是否空闲
tasks = drainQueue();// 取出阻塞队列中没有被执行的任务并返回
} finally {
mainLock.unlock();
}
tryTerminate();// 结束线程池,最终将线程池状态设置为TERMINATED
return tasks;
}
shutdown方法 VS shutdownNow方法
- shutdown方法设置线程池状态为SHUTDOWN,SHUTDOWN状态不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
- shutdownNow方法设置线程池状态为STOP,STOP状态不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
# 8. 其他问题
一些不需要长篇大论介绍的知识点,这里简单说下。
# 8.1 任务拒绝策略
构造线程时传入的 RejectedExecutionHandler 类型参数 handler 就是拒绝策略。
RejectedExecutionHandler只有一个钩子方法,执行拒绝策略。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
ThreadPoolExecutor 中有四个已经定义好的RejectedExecutionHandler实现类可供我们直接使用。(我们也可以实现自己的策略)
/**
* 由提交任务的线程自己来执行这个任务
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* 默认的策略:接抛出 RejectedExecutionException 异常
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* 不做任何处理,直接忽略掉这个任务
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
# 8.2 线程池的监控
我们可以通过线程池提供的参数和方法对线程池进行监控:
- getTaskCount:线程池已经执行的和未执行的任务总数;
- getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
- getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
- getPoolSize:线程池当前的线程数量;
- getActiveCount:当前线程池中正在执行任务的线程数量;
- 实现钩子方法beforeExecute方法,afterExecute方法和terminated方法,增加一些新操作。
# 8.3 线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。如果需要线程池创建之后立即创建线程,可以通过以下两个方法实现:
- prestartCoreThread():初始化一个核心线程;
- prestartAllCoreThreads():初始化所有核心线程。
# 8.4 线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:
- setCorePoolSize:设置核心池大小
- setMaximumPoolSize:设置线程池最大能创建的线程数目大小
← 并发编程三大核心问题 线程池 →