java多线程——线程池源码分析(一)

通常应用多线程技术时,我们并不会直接创建一个线程,因为系统启动一个新线程的成本是比较高的,涉及与操作系统的交互,而是使用线程池来对线程进行管理,尤其是有很多生命周期很短的线程,线程池会显著提升多线程程序的性能。

本文主要对线程池的源码进行分析,了解了源码,我们才能够更高效的使用线程池,同时出现异常时也能更容易的进行排查。

image-20190407105748909

1.阅读本文时,务必开启IDE
2.本文篇幅较大,可根据需要跳转到需要的章节阅读

一、线程池的继承关系及接口方法

image-20190331184421413

Executor:是最基础的执行接口;
1
void execute(Runnable command);

仅提供一个执行线程使用的方法。这个线程会在未来一段时间进行执行,这个任务可能会执行in a new thread,in a pooled thread,or in the calling thread,和具体的实现相关。

ExecutorService:继承了Executor,并提供了shutdown()、submit()等方法,可以说是真正的线程池接口;

与shutdown相关的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 对之前提交的任务进行一次有顺序的关闭,并且不会接受新的任务,如果已经关闭继续执行不会有额外影响。
* 这个方法不会等待之前提交的任务执行完毕。
*/
void shutdown();

/**
* 尝试停止所有正在执行的任务,暂停处理正在等待的任务,返回等待执行的任务集合
* 这个方法不会等待正在执行的任务终止
* 本方法不提供担保,任务的fail response都可能不会终止
*/
List<Runnable> shutdownNow();

/**
* 判断executor是否被关闭
* 如果已经被shutdown,返回true
*/
boolean isShutdown();

/**
* 判断用用shutdown/shutdownNow后是否所有的任务都被结束。
* 如果所有任务都已经被终止,返回true
* 是否为终止状态
*/
boolean isTerminated();

/**
* 在一个shutdown请求后,阻塞等待所有任务执行完毕
* 或者到达超时时间,或者当前线程被中断
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

与submit相关的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 提交一个可执行的(Runnable)任务,返回一个Future代表这个任务执行状态
* 等到任务成功执行,Future#get()方法会返回null
*/
Future<?> submit(Runnable task);

/**
* 提交一个可以执行的任务,返回一个Future代表这个任务执行状态
* 等到任务执行结束,Future#get()方法会返回这个给定的result
*/
<T> Future<T> submit(Runnable task, T result);

/**
* 提交一个有返回值的任务,并返回一个Future代表等待的任务执行的结果
* 等到任务成功执行,Future#get()方法会返回任务执行的结果
*/
<T> Future<T> submit(Callable<T> task);

与invoke相关的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 执行给定任务,当所有任务完成后返回一个List<Futrue<T>>列表,持有任务执行的结果与状态
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 执行给定任务,当所有任务完成或超时后返回一个List<Futrue<T>>列表,持有任务执行的结果与状态
* 如果超时会取消其他未执行完成的任务
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 执行给定任务,有一个任务完成后,无论异常还是正常
* 返回一个Futrue<T>,持有任务执行的结果与状态
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 执行给定任务,有一个任务完成或超时,无论异常还是正常
* 返回一个Futrue<T>,持有任务执行的结果与状态
* 超时取消正在执行的任务
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
AbstractExecutorService:对ExcutorService中的大多数方法做了基本实现;
TheadPoolExecutor:这个是线程池的具体实现,也是我们代码分析的主要部分;
ScheduledExecutorService:继承了ExecutorService接口,提供与执行周期性任务相关的功能;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 在给定延时后,创建并执行一个一次性的Runnable任务
* 任务执行完毕后,ScheduledFuture#get()方法会返回null
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

/**
* 在给定延时后,创建并执行一个ScheduledFutureTask
* 返回ScheduledFuture 可以获取结果或取消任务
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, ong delay, TimeUnit unit);

/**
* 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期
* 也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay
* + 2 * period 后执行,依此类推
* 如果执行任务发生异常,随后的任务将被禁止,否则任务只会在被取消或者Executor被终止后停止
* 如果任何执行的任务超过了周期,随后的执行会延时,不会并发执行
* 例如延时为3s,第2s开始执行任务,下一次执行就会是第5s,下下次就是8s
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

/**
* 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给
* 定的延迟
* 如果执行任务发生异常,随后的任务将被禁止,否则任务只会在被取消或者Executor被终止后停止
* 例如延时为3s,任务执行4s,第2s开始执行任务,下一次执行任务就是9s,下下次就是16s
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
ScheduledThreadPoolExecutor:是可以执行周期性任务的线程池的具体实现;

二、ThreadPoolExecutor分析

构造函数及参数含义

ThreadPoolExecutor提供四个构造函数,但其他三个都是基于下面这构造函数

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}

参数含义:

corePoolSize

  • 线程池中的核心线程数量,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize,即使有其他空闲线程能执行新来的任务,也会继续创建新的线程;
  • 如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;

maximumPoolSize

  • 线程池允许线程的最大数量,在阻塞队列被填满后,会创建新的线程执行任务,前提是当前线程数小于maximumPoolSize

  • 当workQueue为无界队列时,maxiumPoolSize则不会起作用,因为新的任务会一直放入到workQueue中

workQueue

  • 线程存活时间,当线程没有任务执行时,继续存活的时间,默认情况只对线程数大于corePoolSize是有用。
  • 阻塞队列的选择
    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序,指定队列的最大长度,使用有界队列可以防止资源耗尽,但会出现任务过多时的拒绝问题,需要进行协调。
    • LinkedBlockingQueue:一个基于链表结构的有界阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

keepAliveTime

线程空闲时的存活时间,默认情况下,该参数只在线程数大于corePoolSize时才有用。

TimeUnit

  • keepAliveTime的单位

  • TimeUnit静态类提供常量

threadFactory

  • 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactory,自定义可以实现ThreadFactory接口

handler

  • 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务
  • 线程池提供如下四种策略
    • AbortPolicy:直接抛出异常,默认策略
    • CallerRunsPolicy:用调用者所在的线程来执行任务
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务
    • DiscardPolicy:直接丢弃任务
  • 可以根据具体使用场景,实现RejectedExecutionHandler接口,自定义拒绝策略

线程池的执行流程

  1. 如果当前worker数量小于corePoolSize,则新建一个woker并把当前任务分配给该woker线程

  2. 如果当前worker数量大于corePoolSize,则会将任务加入到workerQueue中

  3. 如果wokerQueue是有界队列,并且已经填满,则会判断当前woker数量小于maximumPoolSize,如果小于,则新建一个woker并把当前任务分配给该woker线程,成功则返回。

  4. 当worker的数量已经等于maximumPoolSize则调用拒绝策略处理该任务。

image-20190402225324214

另外,当线程池中线程大于corePoolSize,并且空闲时间超过keepAliveTime时,将会被移出线程池。

源码分析

线程池的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// ct1是一个原子整数型,其中打包了两个概念概念
// 其中高3位是维护线程池的运行状态,低29位是用来位置线程池中线程的数量
// 3位的原因是因为线程有五种状态,向上取2次方数是8也就是3位
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 这个是29,做位运算用的
private static final int COUNT_BITS = Integer.SIZE - 3;
// 这个是线程的容量,也就是低29位的最大值
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 高3位分别是100->000->001->010->011
// 注意高1位是符号位,低29位都是0
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
// 当与~Capcity做且运算时获得的是高3位的值,也就是线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 这里可以看下图的情况,当与Capacity做且运算时获得的是低29位的值,也就是运行的线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 将线程池状态和运行的线程数量进行打包
private static int ctlOf(int rs, int wc) { return rs | wc; }

image-20190405072232285

这张图我只做了一个16位的情况,前两个是补码和非运算后的数据,下面四个是一次简单的计算过程。

关于原码、反码、补码强烈建议下面这片文章:原码, 反码, 补码 详解

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 参数出校验,出现异常抛出参数非法异常
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 几个引用类型的非空检验,空了抛出空指针异常
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 在执行finalize()方法时使用,并不影响对线程池的理解
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 将传递的参数复值给实例变量
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 本方法就是上述线程池执行流程的代码形式
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取控制变量的值c
int c = ctl.get();
// 如果活跃线程数量小于corePoolSize数量,调用addWorker方法,创建线程执行任务
if (workerCountOf(c) < corePoolSize) {
// 如果成功了,直接返回,ture是
if (addWorker(command, true))
return;
// 否则获取控制变量c,进行下一步操作,凡是需要使用ctl进行判断都要重新获取c
c = ctl.get();
}
// 判断线程池是否处于Running状态并且顺利将任务加入到阻塞队列中
// 如果加入顺利仍要进行double-check是否需要加入一个新的线程或任务进入这个方式时线程池已经关闭
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/**
* 如果线程池不是运行状态并且将任务移除成功,拒绝任务
* 如果线程池是运行状态或任务移除失败,判断是否有活跃线程,如果没有就创建一个
*/
if (! isRunning(recheck) && remove(command))
// 拒绝命令
reject(command);
//这里是添加了一个任务为null的线程,只要有一个线程就可出执行workQueue中的命令
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果向队列添加失败,尝试扩充线程池,将任务分配给新的线程,只要不大于maximumPoolSize就可以。
else if (!addWorker(command, false))
// 拒绝命令
reject(command);
}

addWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
private boolean addWorker(Runnable firstTask, boolean core) {
// 外层循环用于判断线程池状态
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 这个判断条件比较复杂,看下面的分析
// 概括一下就是当前线程池是否是可添加新线程的状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内存循环对线程数量进行cas+1处理
for (;;) {
int wc = workerCountOf(c);
// 这里是判断一下工作线程数量是不是已经大于等于corePoolSize/maximumPoolSize
// addWorked参数里的true/false就是决定比corePoolSize还是maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 这里添加一个线程,成功就跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 添加失败就重试
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

// ------上面的循环是调整线程池状态,下面是真正的添加线程--------
// work启动标志
boolean workerStarted = false;
// work是否被添加到线程池中的标志
boolean workerAdded = false;
Worker w = null;
try {
// 新建worker将传入的任务赋值给worker的
w = new Worker(firstTask);
final Thread t = w.thread;
// 如果线程创建成功
if (t != null) {
// 获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查是否为可以添加线程的状态,如果是运行状态
// 或者shutdown,但队列中有任务(注意这里是创建线程,不是添加任务)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程已运行,抛出异常并执行addWorkerFailed
if (t.isAlive())
throw new IllegalThreadStateException();
// 这里将新建的work加入到set中,workers是维护线程池中所有线程的hashSet
// 并更新相关状态
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 处理完成后释放锁
mainLock.unlock();
}
// 添加成功线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果处理启动失败就会将失败的线程从worker中移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

return false的条件分析


rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())如何成立

  1. 线程池至少是SHUTDOWN状态,rs>=SHUTDOWN
  2. 下面三个条件至少有一个是不成立的
    rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()

    • rs == SHUTDOWN 前提条件(rs>=SHUTDOWN) –> false rs>SHUTDOWN
      ​ 这种情况是线程池已经处于STOP、TYDING、TERMINATED状态

    • firstTask == null 前提条件 rs== SHUTDOWN –>false firstTask!=null

      ​ 这种情况是线程池处于SHUTDOWN状态,线程池已经不接收新的任务了

    • ! workQueue.isEmpty() 前提条件 rs==SHUTDOWN && firstTask==null –>false

      ​ 这种情况是线程池处于SHUTDWON状态,并且新来的任务为null,没必要新开线程


本方法的几种形式

1、addWorker(command, true)

2、addWorker(command, false)

3、addWorker(null, false)

4、addWorker(null, true)

在execute方法中就使用了前3种
​ 第一个:线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false
​ 第二个:当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false
​ 第三个:放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务
​ 第四个:这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行

Worker类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* Worker类主要是有两个作用,启动线程、管理线程的中断状态
* Worker类通过继承AQS来实现了一个不可重入的锁,为了确保Worker在执行后在被中断
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;

final Thread thread;
Runnable firstTask;
// 之前完成的Task数量
volatile long completedTasks;

// 构造方法,重点是将state设置为-1,避免了在runWorker执行之前线程被interrupt
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 启动线程执行任务使用
public void run() {
runWorker(this);
}
// 是否持有独占锁,如果state=0是未加锁状态,其他为加锁状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试获取锁,是对AQS中的方法的实现,这里在获取锁时与0对比,所以在runWorker之前是无法获取锁的
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放锁,对AQS中方法的实现
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 这里就能看出在state=-1的状态是不能interrput,只有调用runWorker方法后会将状态置为1
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

之所以要在Runnable外面包一层Worker是为了通过Worker来控制中断,而Runnable只需要执行业务逻辑就可以了。

这里我也有一疑问,为什么不允许Worker在runWorker前就被中止呢?为什么呢?

runWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 确保线程在stopping时被设置中断标志,否则清除中断标志
// 1.如果线程池处于STOP状态,并且当前线程并不是中断状态,调用wt.interrupt确保线程中断
// 2.如果线程池不是STOP状态,但Thread.interrupted()返回是true
// [表示当前线程是中断状态,并且清除了中断标志] 然后再次判断线程池状态是否是STOP状态
// 如果是就再次调用wt.interrupt确保线程中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 本类中是空实现,子类有需要可依据情况实现,Tomcat中的线程池就重写了该方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成任务+1
w.completedTasks++;
// 释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

本方法先是使线程进入执行状态,并且可以进行中断,然后循环执行任务,直到getTask()获取不到任务则进行退出。

getTask方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 两种可能减少工作线程的数量
// 1.线程池处于STOP以上的状态
// 2.线程池处于SHUTDOWN状态,但时阻塞队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// allowCoreThreadTimeOut默认为false
// 如果allowCoreThreadTimeOut为true,说明后面的执行的任务一定要需要定时
// 活跃线程数量是否已经大于corePoolSize,也需要定时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1. 活跃线程>线程池最大线程的情况下
// 或者上一次提交任务超时并且timed为true时
// 2. wc>1或阻塞队列为空,减少线程,返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
// 减少线程失败
continue;
}

try {
// 获取阻塞队列中的任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

本方法主要是从阻塞队列中获取任务,在以下状态时会返回null:

  1. 超过了maximumPoolSize设置的线程数量;
  2. 线程池被stop
  3. 线程池被shutdown,并且workQueue空了
  4. 线程等待任务超时

processWorkerExit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 1、worker数量-1
* 如果是突然终止,说明是task执行时异常情况导致
* 即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
* 如果不是突然终止,说明是worker线程没有task可执行了
* 不用-1,因为已经在getTask()方法中-1了
*/
if (completedAbruptly)
decrementWorkerCount();
/**
* 2、从Workers Set中移除worker
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; //把worker的完成任务数加到线程池的完成任务数
workers.remove(w); //从HashSet<Worker>中移除
} finally {
mainLock.unlock();
}

/**
* 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池
* 主要是判断线程池是否满足终止的状态
* 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
* 没有线程了,更新状态为tidying->terminated
*/
tryTerminate();

/**
* 4、是否需要增加worker线程
* 线程池状态是running 或 shutdown
* 如果当前线程是突然终止的,addWorker()
* 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
* 故如果调用线程池shutdown(),直到workQueue为空前,
* 线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
*/
int c = ctl.get();
// 如果状态是running、shutdown
// 即tryTerminate()没有成功终止线程池,尝试再添加一个worker
if (runStateLessThan(c, STOP)) {
// 不是突然完成的,即没有task任务可以获取而完成的,
// 计算min,并根据当前worker数量判断是否需要addWorker()
if (!completedAbruptly) {
//allowCoreThreadTimeOut默认为false,即min默认为corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;


//如果min为0,即不需要维持核心线程数量,且workQueue不为空,至少保持一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;

//如果线程数量大于最少数量,直接返回,否则下面至少要addWorker一个
if (workerCountOf(c) >= min)
return; // replacement not needed
}

// 添加一个没有firstTask的worker
// 只要worker是completedAbruptly突然终止的
// 或者线程数量小于要维护的数量,就新添一个worker线程,即使是shutdown状态
addWorker(null, false);
}
}

三、总结

线程池需要说的东西很多,本文分为两个部分,第一部分是描述线程池的继承关系,第二部分ThreadPoolExecutor源码分析,原理上线程池并没有像HashMap源码那么复杂,重要的是在方法中不断的进行判断线池状态容易让人产生疑惑,本文根据情况选择需要的阅读部分~


参考资料:

  1. 吃透线程池源码
  2. 线程池的工作原理与源码解读
  3. Java线程池ThreadPoolExecutor使用和分析(一)

注:这里强烈推荐参考资料3,非常完成的线程池分析

0%