ThreadPoolExecutor源码解析

  线程池的使用范围实在是太广了,很多组件的底层使用线程池创建、管理线程。之前有一篇关于线程池使用,增强的文章:GYM4-线程池

重要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

//保存线程运行状态和线程数量两个重要信息,采用一个数字保存多个信息的目的
//是保证多个信息的并发安全
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//这个队列持有所有还为运行的任务。使用isEmpty判断队列是否为空。
private final BlockingQueue<Runnable> workQueue;

//保存所有的工作线程,只有持有mainLock锁的才能有操作
private final HashSet<Worker> workers = new HashSet<Worker>();


//如果为false,空闲的核心线程仍然可以存活
private volatile boolean allowCoreThreadTimeOut;

//除非allowCoreThreadTimeOut为真,否则这个值代表最小的一直存活的
//工作线程的数量.
private volatile int corePoolSize;

//最大可获得线程数量,只有持有mainLock锁才能操作
private int largestPoolSize;

//空闲线程的超时时间以纳秒计时。线程数量大于corePoolSize时并且
//allowCoreThreadTimeOut为真,线程超时时间才能工作
private volatile long keepAliveTime;

//工作线程集合以及相关记录变量的锁。使用锁而不是并发set是因为,
//可以避免一些不必要的中断。简化了相关的记录变量 例如largestPoolSize。
// shutdown 和shutdownNow方法都需要持有锁,目的是去报分别检查
//中断权限和是否正在中断工作线程仍能平稳运行
private final ReentrantLock mainLock = new ReentrantLock();

//终结使用的condition
private final Condition termination = mainLock.newCondition();

//完成任务的数量,持有mainLock可以修改
private long completedTaskCount;

//所有的线程由threadFactory创建,设置线程name,Deamon等
private volatile ThreadFactory threadFactory;

//线程池饱和时的拒绝策略一共四种 ,可以参看《GYM4-线程池》
private volatile RejectedExecutionHandler handler;

execute操作

  Executor接口定义的方法,执行提交的任务。执行任务的具体线程可能是新创建的也可能是已经存在的。如果线程池关闭或者容量达到上限,RejectedExecutionHandler持有拒绝策略。  
任务提交分为三个步骤:
  1.正在运行的线程数量小于corePoolSize,尝试创建一个新的线程执行当前任务。调用addWorker方法,它会使用原子操作检查runState和workerCount。当无法执行提交任务时,返回false。
  2.如果成功的入队,我们仍然需要检查我们可以让它入队(因为存在最后一个线程结束的可能),或者进入这个方法时线程池关闭。
  3.如果进入任务队列失败,我们尝试创建一个新的线程。如果失败,一定是线程池是关闭或者饱和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

//正在线程数量小于小于核心线程数
if (workerCountOf(c) < corePoolSize) {

//添加新的线程不一定成功,没有使用锁导致可能性比较多
if (addWorker(command, true))

return;

c = ctl.get();

}

//创建新的线程失败,加入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}//入队失败或线程池关闭,创建新的线程,创建失败拒绝当前任务

else if (!addWorker(command, false))

reject(command);

}

addWroker操作

  根据当前线程池的装填和给定的边界(核心、最大线程数)检查时候能够创建新的工作线程。如果创建成功调整工作线程的数量,创建一个新的线程运行当前任务。因为线程池关闭或者正在关闭导致的线程创建失败返回false。不管是因为线程工厂返回null还是因为异常(典型是线程启动时的OutOfMemoryError),我们都要回滚。
java中的标签:

  • break lable 跳转到标签所在的循环,并且跳过循环
  • continue lable 跳转到标签所在的循环,并且继续执行循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145


private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);



// 阻塞队列是否为空

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

//一直尝试将工作线程数量加一

for (;;) {

int wc = workerCountOf(c);

//如果工作线程数量大于允许的最大线程数量返回false

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

//尝试工作线程数量加一。成功跳出最外层循环,失败往下走

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

//状态发生改变返回第一次循环

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}



boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

//创建新的工作线程

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

//获取锁并将Worker加入集合

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int rs = runStateOf(ctl.get());



if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w);

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

//添加成功启动线程

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

Worker子类

持有运行任务的线程。继承自AbstractQueuedSynchronizer,实现了非重入的锁,保证工作线程的线程安全。参考AbstractQueuedSynchronizer解析(1)

同时Worker实现Runnable接口,也就是说他会作为一个任务被执行,而执行这个任务的线程就是Woker自身包含的Thread字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

//工作线程

final Thread thread;

//初始的任务可能为空

Runnable firstTask;

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

//线程运行的runnable就是Woker自身定义的run方法

this.thread = getThreadFactory().newThread(this);

}

//thread属性将要运行的任务


public void run() {

runWorker(this);

}

runWoker方法

工作线程不断从队列中获取任务并且执行他们,下面是对应的几个问题:
1.一开始执行firstTask,之后从阻塞队列中获取。如果获得的任务为null说明线程池状态或者参数发生改变,当前工作者应该被清理。
2.任务执行前持有锁是为了阻止其他线程中断正在执行的任务,除非线程器停止,线程不会被中断。
3.调用beforeExecute可能产生异常,这种情况下我们会杀死线程而不是继续执行任务。
4.如果任务报错,我们执行afterExecute方法。
5.afterExecute方法异常,线程被杀死。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

//初始任务为空的话,从队列中获取任务

while (task != null || (task = getTask()) != null) {

w.lock();

//线程池停止,确保线程中断,否则相反。recheck的目的

//是处理shutdownNow方法清理中断的竞争问题。

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

//调用beforeExecute、run、afterExecute三个方法

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}