ThreadPoolExecutor 是 Java 的线程池实现,其中封装了对线程的管理(包括创建、调度、销毁等)及任务的分配等逻辑,作为核心并发组件,其中使用到了如阻塞队列、ReentrantLock、原子变量等并发包下的基础工具,由于前面看完了 ReentrantLock 等相关的源码,所以尝试窥探下 ThreadPoolExecutor 的工作过程。
ctl 变量的计算
ThreadPoolExecutor 使用一个原子性的整型变量 ctl 同时存储了线程池的运行状态(runState)和工作线程数(workerCount)。为了将两者压缩到一个 int 里面,需要限制最大的工作线程数为 (2^29)-1。
1
2
3
4
5
6
7
8
9
|
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
|
所谓的工作线程指的是 “被允许启动,但还没被允许停止的线程”。工作线程数不同于“存活线程数”,当线程工厂被要求创建一个线程但是创建失败时,或者在退出一个线程而它还没实际销毁时,用户可见的线程池大小是不会变化的。下面我们分析上述几个值的计算过程是怎么样的:
- COUNT_BITS:Integer.SIZE 是 32,所以 COUNT_BITS 为 29,其二进制表示为 00000000000000000000000000011101。
- CAPACITY:通过 1 左移 29 位减 1 得到。1 左移 29 位即从第 1 位移到第 30 位,得到 001 00000000000000000000000000000,然后减 1,得到 000 11111111111111111111111111111,这个数值将作为工作线程数,即上述提到的 (2^29)-1。假设
- RUNNING:通过 -1 左移 29 位得到。-1 的二进制表示为 11111111111111111111111111111111,低位补 0 得到 111 00000000000000000000000000000,即通过左移抹掉低 29 位,截取高 3 位记录运行状态,所以下面我们只关注高 3 位。
- SHUTDOWN:通过 0 左移 29 位得到。0 不管怎么移位都是 0,所以高 3 位是 000。
- STOP:通过 1 左移 29 位得到。1 左移 29 位被移到了第 30 位,得到高 3 位为 001,其实到这里可以理解为将低 3 位搬到高 3 位。
- TIDYING:通过 2 左移 29 位得到。低 3 位的 2 的二进制表示 010 被搬到高 3 位。
- TERMINATED:通过 3 左移 29 位得到。低 3 位的 3 的二进制表示 011 被搬到高 3 位。
通过下面这个方法得到整型数值的高位补 0 对齐后的二进制表示,我们不妨输出上述这些数值的二进制表示来一一验证上面的分析是否正确。
1
2
3
4
5
6
7
8
9
10
|
private String toBinaryString(int i) {
String s = Integer.toBinaryString(i);
StringBuilder builder = new StringBuilder();
int len = s.length();
for (int ii = 0; ii < 32 - len; ii++) {
builder.append("0");
}
builder.append(s);
return builder.toString();
}
|
以下是输出结果,与我们的分析符合。
COUNT_BITS: 000 00000000000000000000000011101
CAPACITY: 000 11111111111111111111111111111
RUNNING: 111 00000000000000000000000000000
SHUTDOWN: 000 00000000000000000000000000000
STOP: 001 00000000000000000000000000000
TIDYING: 010 00000000000000000000000000000
TERMINATED: 011 00000000000000000000000000000
我觉得这样设计的好处在于,通过一个 int 控制存储两个变量的意义在于,当两个变量需要同时变化时,不需要额外的加锁操作,因为操作系统保证了 int 类型数据操作的原子性,但是 long 和 double 在 32 位的操作系统下操作不是原子性的,可能存在半读半写问题。
线程池的生命周期涉及下面几个阶段:
- 运行中(RUNNING):接收新任务,同时处理等待队列中的任务。
- 关闭(SHUTDOWN):不接收新任务,但会处理等待队列中剩下的任务。
- 停止(STOP):不接收新任务,也不处理等待队列中剩下的任务,同时还会中断正在执行的任务。
- 清扫中(TIDYING):当所有任务都停止了,工作线程数也归零了,此时会进入 TIDYING 状态,触发 terminated() 钩子函数。
- 已终止(TERMINATED):当 terminated() 方法执行完,线程池进入“已终止”状态。
上述线程池的状态的转换过程如下图所示:
构造函数
ThreadPoolExecutor 有 5 个带参构造函数,其中 4 个最终都是调用下面这个构造函数创建 ThreadPoolExecutor 对象。比较简单不详述。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 非法参数校验
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 赋值
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
|
很多开发规范都不建议使用 Executors 工具类的方式创建快熟线程池,因为它们创建出来的线程池可能在执行大量任务时占满内存导致 OOM。一版都是使用手动创建的方式,自己掌控线程池的各个参数,如:
1
2
|
private static ThreadPoolExecutor executor = new ThreadPoolExecutor
(16, 32, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
|
所以理解这几个参数的意义与使用场景比较重要,我们先看涉及的参数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
/**
* Tracks largest attained pool size. Accessed only under mainLock.
*/
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
private long completedTaskCount;
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
|
以下是结合文档对它们的理解:
- allowCoreThreadTimeOut:默认是 false,如果没有任务提交到线程池,那么核心线程池将会处于空闲状态,这是浪费资源的,为此可以通过这个参数控制核心线程是否超时销毁,如果将这个参数设置为 true,那么核心线程将会在空闲等待 keepAliveTime 后销毁。
- corePoolSize:最少的存活线程数。如果设置 allowCoreThreadTimeOut 为 true,那么一定时间后这个值会变为 0。当阻塞队列未满时,线程池中的线程数将保持在这个数值。
- maximumPoolSize:最大的工作线程数,但实际上这个值不能超过 CAPACITY,即 (2^29)-1。当阻塞队列满时,线程池将在核心线程数的基础上创建新线程来处理任务,直到最大的线程数。
- largestPoolSize:记录线程池中最多有过多少个工作线程。
- completedTaskCount:线程池总共处理了多少个任务,只在工作线程被终止时更新。
- workQueue:用于存储积压任务的阻塞队列,工作线程会从中取出任务进行来执行,注意这里的泛型是 Runnable,对于 Callable 类型的任务,ThreadPoolExecutor 的父类 AbstractExecutorService 会将之转换为 Runnable 的子类 FutureTask,所以这里泛型定义成 Runnable 并无问题。
- keepAliveTime:单位是纳秒,
- threadFactory:ThreadFactory 接口实例,用于创建工作线程。
- handler:所谓的拒绝策略,即当线程池不能接收任务时(队列已满或线程池被 shutdown)的处理器。
- mainLock:ReentrantLock 类型,用于线程池中线程操作的同步控制。
- workers:包装了 Worker 的 HashSet,而 Worker 则是 ThreadPoolExecutor 的一个内部类,它对线程与一些状态信息进行了封装,所以 workers 是存储了所有工作线程的集合。
Worker 类
Worker 类是对工作线程的封装,其中包含了对线程的管理细节。
- Worker 类主要为工作线程维护中断控制状态。它是一个 AbstractQueuedSynchronizer 实现,在工作线程执行每个任务前后都需要加锁或解锁,以防止那些试图唤醒一个等待任务的线程的中断信号去打断一个正在执行任务的线程。
- 重新实现 AbstractQueuedSynchronizer 而不是使用 ReentrantLock 的原因在于 ReentrantLock 是可重入锁,而作者不希望工作任务在调用线程池控制类方法如 setCorePoolSize 时能够多次换取到锁。
- 为了抑制中断信号直到工作线程实际启动,将锁的状态初始化为 -1,期间调用 interruptIfStarted 将不会响应中断。
- Worker 还实现了 Runnable,我觉得这是线程池的设计精髓所在,在 Worker 的构造函数中创建的 Thread 的任务是 this,也就是 Worker 对象,然后 Worker 对象再处理和拉取实际的任务,从而实现了线程的复用,避免因为多任务而去创建多线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each task execution.
* This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run.
*
* We implement a simple non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize.
*
* Additionally, to suppress interrupts until the thread actually starts running tasks,
* we initialize lock state to a negative value, and clear it upon start (in runWorker).
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*
* 同步状态 state 设置为 -1,为了在初始化期间不接受中断信号,
* 直到 runWorker 方法执行才接受中断信号
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
// 将调用外部类 ThreadPoolExecutor 的 runWorker 方法
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 调用此方法进行中断,但在初始化时,state 为 -1,不响应中断信号
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) { }
}
}
}
|
runWorker
runWorker 是工作线程执行任务的主要方法,在其中工作线程会重复地从阻塞队列中获取任务然后执行它们,其详细过程如下所示:
- 首先将 AbstractQueuedSynchronizer 中的同步状态从 -1 改成 0 以能够响应中断信号。
- 轮询阻塞队列,只要拿到的任务不是 null
- 加锁。
- 如果线程池正在停止,保证当前工作线程被中断;否则保证当前工作线程不被中断。
- 接下来执行任务,并在此前后注册 beforeExecute 和 afterExecute 两个实现点,它们都是 ThreadPoolExecutor 的 protected 修饰的空方法,所以如果想自己定制化 ThreadPoolExecutor,可以覆盖这两个方法实现类似增强。
- 最后任务执行完将之置空,completedTasks 加 1,然后释放锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
|
对于 runWorker 方法,作者通过很长的一段注释备注了一些需要注意的地方:
- 可以从一个初始任务开始不停地获取任务,但这个初始任务不一定是阻塞队列的第一个节点,我们通过 getTask 方法获取任务,如果 getTask 返回 null,将会根据线程池的状态是否改变或者配置参数是否改变来决定要不要退出线程(逻辑在 processWorkerExit 方法)。
- 在执行任何任务前,需要先获取当前 Worker 的锁,以避免响应执行过程中线程池的其他中断(我的理解是
Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
这段,即拿到锁后,工作线程只响应 STOP 状态下的中断信号,线程池进入 STOP 前不会设置任何中断标记。
- 每个任务执行前都会执行前置处理方法 beforeExecute,这个方法可能会抛出异常,那样的话将导致 completedAbruptly 标记为 true,从而促使线程马上死亡而不会执行到任务。
- 假设 beforeExecute 方法正常执行完毕,将执行任务,期间会收集所有的异常,然后送进 afterExecute 方法进行处理。异常收集处分别处理 RuntimeException、ERROR 和 Throwable,因为不能再往外抛出 Throwable,所以将之封装成 Error 再往外抛。这里捕获到任何异常同样会导致线程保守性死亡(因为后面的 try 代码块后都没有 catch 住异常,所以 completedAbruptly 标记为 true)。
- 任务执行完毕后,会调用 afterExecute 后置处理方法,这个方法同样也会抛出异常,那样也会导致线程死亡,因为根据 JLS Sec 14.20(大概是 Java Language Spercification 第 14.20 节里面提到的异常处理语法),最终这个还会往外抛,跳过了
completedAbruptly = false;
这一句。
getTask
再看 getTask 方法。进入该方法的线程会阻塞或超时等待任务,这取决于当前配置的设定值,或者如果出现以下的情况它会返回 null,
- 当前线程池中存在超过 maximumPoolSize 个工作线程(可能由于调用了 setMaximumPoolSize 方法)。
- 线程池被 stop。
- 线程池被 shutdown,阻塞队列为空。
- 当前工作线程超时等待获取一个任务,然后这个超时的线程需要被终结。
通过代码看,除了设置 allowCoreThreadTimeOut 参数外,当前线程池的线程数超过核心线程数时,将触发超时等待机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
|
主要的过程在一个死循环中,
- 如果线程池处于 SHUTDOWN 以上的状态(即 STOP、TYDING、TERMINATED)并且阻塞队列为空,则将工作线程数减 1,然后返回 null,外部的 runWorker 接收到 null 后会在 processWorkerExit 中将该线程销毁。
- 如果当前工作线程数大于 maximumPoolSize,或者如果处于超时等待机制,且上一次等待超时,则判断工作线程数是否大于 1 或者阻塞队列是否为空了,是则尝试将工作线程数减 1,将成功就返回 null 否则进入下一轮迭代。
- 如果当前线程池的状态没有进入终止状态,或者工作队列大概率有任务可领取,则尝试去阻塞队列获取任务,获取的方式有两种:
- 如果处于超时等待机制,则以 keepAliveTime 作为超时等待时间获取任务,这种情况下如果过了 keepAliveTime 还是没任务可领取,则会返回 null,然后当前工作线程将被回收,这样就可以理解 keepAliveTime 的含义了,它是线程最大空闲时间(等待也是空闲的一种),为了避免线程资源浪费,超过这个时间线程就会被回收;
- 否则,一直等待,直到领取到任务才返回。
简单而言,上面过程可总结为:
- 如果从阻塞队列中获取到了任务,则直接返回该任务。
- 如果阻塞队列中无任务,那么阻塞等待直到新的任务到来。
- 如果判断到当前 worker 需被回收,那么返回 null。
processWorkerExit
该方法用于清理濒死线程,除非 completedAbruptly 设置为 true,否则都认为 workerCount 已经减掉了当前濒死线程,不用再减。这个方法会将工作线程从线程池的工作线程集合中移除,它可能会停掉整个线程池,或者用一个新的线程替换掉旧的线程,这个“替换”实际上是先销毁旧线程再新增一个线程,触发替换的前提有:
- 如果工作线程是由于执行用户任务遭遇异常而退出
- 线程池中只有少于 corePoolSize 个线程在运行。
- 阻塞队列非空,但是没有工作线程了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
|
上述方法的主要过程:
- 如果 completedAbruptly 为 true,说明是因为执行任务异常而濒死的,那么 workerCount 还没来得及减 1,需要先减 1。
- 加锁,做所谓的 bookkeeping 动作,即将濒死线程从工作线程集合中移除,并清算 completedTaskCount,然后解锁。
- 调用 tryTerminate,检查当前线程池的状态是不是 SHUTDOWN 或 STOP,是则可能使线程池吃进入 TERMINATED 状态,如果线程池被终止,此处后面的代码不会执行。
- 如果运行状态为 RUUNNING,
- 如果不是是因为执行任务异常而濒死的,则可能是等待超时需要被回收,先计算当前所需的最小的线程数 min,如果当前工作线程数大于等于 min,说明当前工作线程数满足作业需求,不用再新建线程。
- 调用 addWorker 创建新工作线程。
tryTerminate
如果满足下面条件之一,线程池将进入 TERMINATED 状态:
- SHUTDOWN 状态下线程数为 0,且阻塞任务队列为空;
- STOP 状态下,线程池线程数为 0。
如果处于 SHUTDOWN 状态但阻塞任务队列不为空或者处于 STOP 状态下,线程池线程数不为 0,这时是具备进入 TERMINATED 状态的潜力的,这时的处理方式时,打断一个空闲的在等待阻塞队列派任务的工作线程,以保证 shutdown 的信号能够传播。因为打断它后,它将进入 processWorkerExit 方法,然后进入 tryTerminate,然后它又会去中断一个在等待的线程,这个场景十分有趣:就像一个即将被炒鱿鱼的工人拍了拍另一个在睡觉的工人说,兄弟别等了,没活干了,咱们得走了,我先走哈,等下你走之前记得叫醒那个兄弟让他也去办离职手续吧。
这个方法只能被以下操作调用:
- 减少工作线程数
- shutdown 期间从阻塞队列中删除任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果满足以下任一条件,都不应该进入 TERMINATED 状态
// 1. 线程池为 RUUNNING 状态
// 2. 线程池为 TIDYING、或 TERMINATED,已经终止过了
// 3. SHUTDOWN 且 workQueue 不为空,不接受新任务,但是历史任务还得处理
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 上面没 return,说明可以 TERMINATED
// 只有 SHUTDOWN 状态且 workQueue 不为空,或者 STOP 状态能执行到这一步
// 如果 workerCount 不为 0,说明此时线程池还有线程(正在运行任务,正在等待任务)
// 则中断唤醒一个正在等任务的空闲 worker
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 能够走到这里没 return,说明当前线程池已经没有工作线程了,也没有任务了
// 可以进入 TIDYING 状态了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 线程池进入 TIDYING 状态,此时工作线程数变为 0,阻塞队列也是空的
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 空实现,等子类覆盖
} finally {
// 调用完 terminated() 方法后最后进入 TERMINATED 状态
ctl.set(ctlOf(TERMINATED, 0));
// 外部有线程等待线程池终止后进行操作的话
// 这时通知它们去干活
termination.signalAll();
}
// 退出循环,释放锁并返回调用
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
// CAS 失败则不会退出循环,继续重试
}
}
|
执行任务
当执行任务时会执行以下方法,但它不是立即执行的,且该任务可能由一个已有的线程执行或新建一个线程去执行。如果任务不能被提交执行,要么是因为线程池已经被 shutdown,要么是因为当前允许的任务容量已经达到最大,该任务将由拒绝策略处理器处理。该方法的执行分为三步:
- 如果当前线程池中正在运行的线程数小于 corePoolSize,尝试新启动一个线程,并将当前提交到线程池的任务作为这个新线程执行的第一个任务。addWorker 方法会检查 runState 和 workerCount,如果不能添加新线程会返回 false。
- 如果当前线程池处于运行状态且工作线程数大于 corePoolSize,则将任务放入阻塞队列,如果该任务可以被成功放入阻塞队列,那么会重新检查线程池的运行状态,
- 如果线程池处于非运行态,那么将该任务从阻塞队列中删除,并交由拒绝策略处理器处理。
- 如果线程池处于运行态或者删除任务失败,且当前线程池没有工作线程,那么就新建一个非核心线程。
- 如果处于运行态但是往阻塞队列已满导致添加任务失败,那么就尝试新加一个非核心线程,并将当前提交到线程池的任务作为这个新线程执行的第一个任务。如果新建工作线程失败,则触发拒绝策略。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
|
addWorker
基于当前线程池的运行状态以及最大最小线程数限制,检查是否能添加一个新的工作线程,如果可以,则当前的工作线程数会加 1。另外,如果没什么问题的话,线程会被创建然后启动,并将 firstTask 作为第一个任务运行。
如果该方法返回 false,要么是因为线程池已经 STOP,或即将被 shutdown,要么就是因为当前线程工厂创建线程失败,而线程创建失败,可能是由于线程工厂返回了 null,也可能是 Thread.start 方法中抛出 OutOfMemoryError。对于线程创建失败的情况,会调用 addWorkerFailed 对线程池进行回滚。
该方法用到了 retry,continue retry 相当于从内层循环 break,break retry 相当于直接跳出两层循环。接下来看这个方法的逻辑:
- 两个死循环,
- 外部死循环检查线程池的运行状态,如果处于 STOP、TYDING 或 TERMINATED 且 firstTask 为空但等待队列不为空的情况下,直接返回 false,表示新建工作线程失败,因为在这三种状态下,线程池不会接受新任务,甚至不会将阻塞队列中任务消费完,而是将重心放在如何销毁线程上,所以不会反其道而行之。
- 然后如果满足以下任一条件,则会根据入参 core 决定是否最大的线程数限制,如果创建的是核心线程,那么最大线程数限制是 corePoolSize,如果创建的不是核心线程,那么最大的线程数限制是 maximumPoolSize,如果超过最大线程数限制,则返回 false,表示创建 Worker 失败;如果没有超过最大线程数限制,则通过 CAS 操作将 ctl 加 1,表示工作线程数加 1,然后跳出双层循环。如果 CAS 失败,说明 ctl 发生了变化,不是预期值,那么继续校验是不是运行状态发生了变化,是的话跳出内层循环,让下一次迭代时在外层循环再次通过校验线程池的状态来决定是否能够创建线程。
- 线程池处于运行状态
- 线程池处于 SHUTDOWN 状态(线程池虽然不接收新任务,但是阻塞队列里面的任务还要消费,这时允许创建新工作线程)
- 线程池处于 STOP、TYDING 或 TERMINATED 状态,但是 firstTask 不为空(线程池虽然不接收新任务,也不处理阻塞队列里面的任务,但是允许创建新工作线程处理已提交但未进入阻塞队列的新任务)
- 线程池处于 STOP、TYDING 或 TERMINATED 状态,firstTask 为空,阻塞队列也为空(没看懂)
- 经过了双层循环,说明能够创建新的工作线程,创建新线程涉及的过程有:
- 先加锁,因为需要将新创建的线程加进 workers 里面,而 workers 是 HashSet 类型,它是线程不安全的,需要同步控制,另外 largestPoolSize 的清算也需要放在临界区内。
- 通过 firstTask 构建 Worker,其中会将 Worker 作为 Runnable 创建线程,并指定其 firstTask。
- 接下来 if 说明,主要有两种情况可以创建线程,一种是线程池处于运行状态,另一种是处于 SHUTDOWN 状态,但是 firstTask 也为空,说明阻塞队列可能还没处理完,需要新建线程来处理。如果满足这两种情况则会将工作线程添加到工作线程集合中,并清算 largestPoolSize。
- 解锁
- 如果工作线程成功添加到工作线程集合中,则启动新创建的工作线程,注意,这个工作线程启动的时候调用的是 Worker 类的 run 方法,也就是线程池的 runWorker 方法。其中会先执行 firstTask,如果 firstTask 为空或者执行完了,则会去获取阻塞队列里面的任务来执行,看到这里估计能明白为什么 Worker 类实现 Runnable 了,实际上就是为了复用线程以执行更多不同的任务。
- 如果线程创建或启动失败,那么就调用 addWorkerFailed 进行回滚。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
|
addWorkerFailed
如果线程创建或启动失败,那么就调用 addWorkerFailed 进行回滚,以下过程需要加锁
- 如果线程启动异常,因为已经将线程添加到了工作线程集合,所以要将之移除。
- 工作线程数减 1,即 ctl 减 1。
- 有可能线程池处于不接收新任务并即将清理工作线程的阶段,这时调用 tryTerminate 尝试让线程池进入 TERMINATED 状态。
1
2
3
4
5
6
7
8
9
10
11
12
|
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
|