深入理解线程与线程池

线程

什么是线程

线程,程序执行流的最小执行单位,是行程中的实际运作单位

线程的生命周期

新建 New、就绪 Runnable、运行 Running、阻塞 Blocked、死亡 Dead

image

新建(new)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ThreadTest {
public static void main(String[] args) {
//新建一个线程(实现Runnable),只是一个普通的java对象
MyTheard myTheard = new MyTheard();
//线程进入就绪状态
thread.start();
}

static class MyTheard implements Runnable {
@Override
public void run() {
System.out.println("我的线程");
}
}
}

就绪(Runnable)

当线程对象调用了Thread.start()方法之后,该线程处于就绪状态,JVM会为其创建方法调用栈和程序计数器,处于这个状态的线程并没有开始运行,它只是表示该线程可以运行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Thread implements Runnable {
...
private ThreadGroup group;
public synchronized void start() {
/**
* 新建的线程threadStatus为0(new),当start之后,JVM会去修改threadStatus的值
* 此处防止多次start
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/**
* 通知组该线程即将启动,以便可以将其添加到组的线程列表中,并且可以减少组的未启动计数
*/
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
/**
* 底层通知JVM该线程已经就绪
*/
private native void start0();
}

运行(Running)

如果处于就绪状态的线程获得了CPU资源,就开始执行run()的线程执行体,则该线程处于运行状态

1
2
3
4
5
6
7
8
9
10
class Thread implements Runnable {
...
private Runnable target;
@Override
public void run() {
if (target != null) {
target.run();
}
}
}

阻塞(Blocked)

阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况大概三种:

  • 等待阻塞

运行的线程执行wait()方法,JVM会把该线程放入等待池中。(wait会释放持有的锁)

  • 同步阻塞

运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池中

  • 其他阻塞

运行的线程执行sleep()join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。(注意,sleep()是不会释放持有的锁)

具体阻塞情况有如下情况:

线程睡眠Thread.sleep(long millis)方法,使线程转到阻塞状态。millis参数设定睡眠的时间,以毫秒为单位。当睡眠结束后,就转为就绪(Runnable)状态。sleep()平台移植性好。

线程等待:Object类中的wait()方法,导致当前的线程等待,直到其他线程调用此对象的notify()方法或 notifyAll()唤醒方法。这个两个唤醒方法也是Object类中的方法,行为等价于调用wait(0)一样。唤醒线程后,就转为就绪(Runnable)状态。

线程让步Thread.yield()方法,暂停当前正在执行的线程对象,把执行机会让给相同或者更高优先级的线程。

线程加入join()方法,等待其他线程终止。在当前线程中调用另一个线程的join()方法,则当前线程转入阻塞状态,直到另一个进程运行结束,当前线程再由阻塞转为就绪状态。

线程I/O:线程执行某些IO操作,因为等待相关的资源而进入了阻塞状态。比如说监听system.in,但是尚且没有收到键盘的输入,则进入阻塞状态。

线程唤醒:Object类中的notify()方法,唤醒在此对象监视器上等待的单个线程。如果所有线程都在此对象上等待,则会选择唤醒其中一个线程,选择是任意性的,并在对实现做出决定时发生。类似的方法还有一个notifyAll(),唤醒在此对象监视器上等待的所有线程

死亡(Dead)

线程会以以下三种方式之一结束,结束后就处于死亡状态:

  • run()方法执行完成,线程正常结束
  • 线程抛出一个未捕获的ExceptionError
  • 直接调用该线程的stop()方法来结束该线程,该方法容易导致死锁,通常不推荐使用

创建线程的三种方式(推荐实现Runnable接口的方式)

  • 继承于java.lang.Thread类,覆盖run方法,调用start()方法启动线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//1):定义一个类A继承于java.lang.Thread类.
class MusicThread extends Thread{
//2):在A类中覆盖Thread类中的run方法.
public void run() {
//3):在run方法中编写需要执行的操作
for(int i = 0; i < 50; i ++){
System.out.println("播放音乐"+i);
}
}
}
public class ExtendsThreadDemo {
public static void main(String[] args) {
for(int j = 0; j < 50; j ++){
System.out.println("运行游戏"+j);
if(j == 10){
//4):在main方法(线程)中,创建线程对象,并启动线程.
MusicThread music = new MusicThread();
music.start();
}
}
}
}
  • 实现Runnable接口,覆盖run方法,传入到线程类中,调用start()方法启动线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//1):定义一个类A实现于java.lang.Runnable接口,注意A类不是线程类.
class MusicImplements implements Runnable{
//2):在A类中覆盖Runnable接口中的run方法.
public void run() {
//3):在run方法中编写需要执行的操作
for(int i = 0; i < 50; i ++){
System.out.println("播放音乐"+i);
}
}
}
public class ImplementsRunnableDemo {
public static void main(String[] args) {
for(int j = 0; j < 50; j ++){
System.out.println("运行游戏"+j);
if(j == 10){
//4):在main方法(线程)中,创建线程对象,并启动线程
MusicImplements mi = new MusicImplements();
Thread t = new Thread(mi);
t.start();
}
}
}
}
  • 直接在函数体使用(匿名内部类,其实也是属于第二种实现方式的特例)
1
2
3
4
5
6
7
void java_thread(){
Thread t = new Thread(new Runnable(){
public void run(){
//xx
}});
t.start();
}

线程的并行与并发

  • 并发的关键是你有处理多个任务的能力,不一定要同时

  • 并行的关键是你有同时处理多个任务的能力

Java虚拟机的多线程是通过线程轮流切换并分配处理器执行时间的方式来实现,一个CPU可以多线程,但是一个单核CPU任何时间点,都只能在做一个任务。而程序时间大多花在读取数据(IO)上,真正的计算工作花时间还是相对少的,因此CPU很大时间表现都很闲。现实中的CPU在大部分时候的闲置状态的。因此,开多线程能提高效率不如说成是充分利用了CPU执行时间。多核CPU才是真正意义的并行处理

线程池

在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,我们就提出了线程池的概念,线程池可以用来管理线程,减少资源的消耗

java并发编程框架:Executor

Executor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制

ThreadPoolExecutor构造函数的各个参数说明

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) //后两个参数为可选参数
  • corePoolSize

    核心线程数,如果运行的线程少于corePoolSize,则创建新线程来执行新任务,即使线程池中的其他线程是空闲的

  • maximumPoolSize

    最大线程数,可允许创建的线程数,corePoolSizemaximumPoolSize设置的边界自动调整池大小:

    corePoolSize < 运行的线程数 < maximumPoolSize:仅当队列满时才创建新线程
    corePoolSize = 运行的线程数 = maximumPoolSize:创建固定大小的线程池
    
  • keepAliveTime

    如果线程数多于corePoolSize,则这些多余的线程的空闲时间超过keepAliveTime时将被终止

  • unit

    keepAliveTime参数的时间单位

    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    
  • workQueue【重点】

    一个阻塞队列,用来存储等待执行的任务,它的使用规则如下:

    当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列
    当运行的线程数等于或多于corePoolSize,在有新任务添加时则选加入队列,不直接创建线程
    当队列满时,在有新任务时就创建新线程
    

    最常用的阻塞队列有如下三种,前提核心线程数已经用完:

    ArrayBlockingQueue【不推荐】:有界队列,当新任务进来时,如果队列满了并且线程数小于maximumPoolSize,会去创建新线程,如果线程数等于maximumPoolSize,则会通过拒接策略拒接任务
    LinkedBlockingQueue【推荐】:无界队列,当新任务进来时,会加到队列中,此时maximumPoolSize是无效的
    SynchronousQueue【特定场景】:在某次添加线程后必须等待其他线程取走后才能继续添加,否则会创建新的线程去执行,由此推断在A、B线程有依赖的情况下使用SynchronousQueue
    
  • threadFactory

    使用ThreadFactory创建新线程,默认使用DefaultThreadFactory创建线程

  • handle

    定义处理被拒绝任务的策略,默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
    

image

Executor的类图详解

继承关系:Executor -> ExecutorService -> AbstractExecutorService -> ThreadPoolExecutor

  • Executor

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,即在将来的某个时间执行给定的可运行程序。可运行程序会被线程池操作线程去执行,由线程池控制

  • ExecutorService

ExecutorService接口是继承了Executor的接口,并声明了一些方法:submit、invokeAll、invokeAny、shutDown

  • AbstractExecutorService

AbstractExecutorService抽象类实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法

  • ThreadPoolExecutor

ThreadPoolExecutor继承AbstractExecutorService抽象类

几个重要的方法

  • execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行

  • submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果

  • shutdown()shutdownNow()是用来关闭线程池的

ThreadPoolExecutor源码分析

类内部结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* ctl是线程池的主要控制状态,它包含了两个字段:
* workerCount:当前有效线程数
* runState:运行状态
*
* 为了用一个int来表示两个字段,我们规定workerCount用 (2^29)-1 表示,runState用 (2^31)-1 表示
*
* workerCount是已被允许启动但未被允许停止的workers数量,该数可能与实际运行的线程数不相同
*
* runState提供主要的生命周期控制:
* RUNNING: 接受新任务并处理排队的任务
* SHUTDOWN: 不接受新任务,而是处理排队的任务
* STOP: 不接受新任务,不处理排队任务,中断进行中的任务
* TIDYING: 所有任务都已终止,workerCount为0,线程转换为状态TIDYING 将运行Terminated()挂钩方法
* TERMINATED: 调用Terminated()完成
*
* runState随时间而改变,转换顺序如下:
*
* RUNNING -> SHUTDOWN
* 在调用shutdown()时,可能隐式在finalize()中
* (RUNNING or SHUTDOWN) -> STOP
* 在调用shutdownNow()时
* SHUTDOWN -> TIDYING
* 当等待队列和工作者集合为空时
* STOP -> TIDYING
* 当工作者集合为空时
* TIDYING -> TERMINATED
* 当Terminate()挂钩方法完成时
*
* 检测到从SHUTDOWN到TIDYING的转换比您想要的要简单得多,
* 因为在SHUTDOWN状态期间队列在非空之后可能变为空,反之亦然
* 但是我们只有在看到队列为空后才能终止,看到workerCount为0.
*
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState 存在高位,所以左移
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// 打包ctl、解析ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 通过CAS增加workerCount
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 通过CAS减少workerCount
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
* 用于保留任务并移交给工作线程的队列
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 线程池的可重入锁,用来控制线程的执行与中断
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 线程池中的线程工作者的集合,持有mainLock才可以访问
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* 线程池中达到的最大线程数,持有mainLock才可以访问
*/
private int largestPoolSize;
/**
* 计数完成的任务数。仅在工作线程终止时更新。持有mainLock才可以访问
*/
private long completedTaskCount;
/*
* 多线程共用的变量,要声明为volatile
*/
/**
* 用于创建线程的工厂
*/
private volatile ThreadFactory threadFactory;
/**
* 拒绝处理器,用于队列满了或者shutdown之后,会拒绝新的线程工作者
*/
private volatile RejectedExecutionHandler handler;
/**
* 超出corePoolSize,并且小于maximumPoolSize的线程启动后,如果空闲时间超过keepAliveTime,则会被回收
*/
private volatile long keepAliveTime;
/**
* 是否允许核心线程也有存活时间,默认false
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心线程数是线程池会保持线程存活的最大数
*/
private volatile int corePoolSize;
/**
* 线程最大容量数是线程池最多拥有的线程数
*/
private volatile int maximumPoolSize;
/**
* 线程池 默认拒绝处理器
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/**
* 调用shutdown和shutdownNow所需的权限
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");

/* 执行终结器时使用的上下文,或者为null */
private final AccessControlContext acc;

private static final boolean ONLY_ONE = true;

/**
* Class Worker主要维护线程运行任务的中断控制状态,以及其他一些记录
* 该类通过继承AQS简化了对锁的操作
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;

/** 当前工作者的线程 */
final Thread thread;
/** 工作者的第一个任务,可以为空 */
Runnable firstTask;
/** 当前工作者完成的任务计数器 */
volatile long completedTasks;

Worker(Runnable firstTask) {
// new出来后,设置state=-1禁止中断,直到runWorker()
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** 将主运行循环委托给外部runWorker */
public void run() {
runWorker(this);
}

// state=0代表解锁状态
// state=0代表加锁状态

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
/**
* 如果state>=0,则强制中断正在执行的线程
*/
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
/**
* 主工作者运行循环。反复从队列中获取任务并执行它们
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 开始之前会先解锁,开始的时候再加锁,这中间就可以被中断
w.unlock();
// 是否异常退出循环
boolean completedAbruptly = true;
try {
// 开始循环执行任务,如果有firstTask会先执行,没有则getTask()
while (task != null || (task = getTask()) != null) {
// 获得一个任务后,给内部AQS加锁
w.lock();
// 如果线程池正在停止,并且当前线程未被打断,则中断当前线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 开始执行之前需要做的事(目前为空,留给子类实现)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行task的内容
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 完成执行之前需要做的事(目前为空,留给子类实现)
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 完成后给内部AQS解锁
w.unlock();
}
}
// 没有异常
completedAbruptly = false;
} finally {
// 执行到这里说明getTask()返回null,说明当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的工作线程干掉
processWorkerExit(w, completedAbruptly);
}
}

/**
* 循环到等待队列中获取任务
*/
private Runnable getTask() {
// 拉取任务是否超时
boolean timedOut = false;

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 如果runStatus=shutdown并且等待队列为空,则退出循环
// 如果runStatus=STOP,则退出循环
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 当前工作者数量
int wc = workerCountOf(c);

// 是否做线程超时存活判断
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果工作者数量大于maximumPoolSize,则退出循环
// 如果当前线程做超时判断并且超时了,workCount>1,且等待队列为空
// 则减少workCount,并返回空任务,退出循环
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 如果当前工作者做会做线程超时判断,则去等待队列拉取任务,等待keepAliveTime时间
// 如果当前工作者做不做线程超时判断,则去等待队列拉取任务,无限期等待,直到有任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

/**
* 处理工作者的退出
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果工作者由于异常退出,则手动减少workCount
if (completedAbruptly)
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
// 操作工作者集合,需要锁
mainLock.lock();
try {
// 计算线程池完成任务数,并移除工作者
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试转换状态到TERMINATED
tryTerminate();

int c = ctl.get();
// 线程池状态处于运行或shutdown时【todo】
if (runStateLessThan(c, STOP)) {
// 工作者正常完成任务
// 如果允许核心线程回收的话,当workQueue非空时,保留一个worker
// 如果不允许核心线程回收的话,如果workerCount>corePoolSize,则返回,否则,保留corePoolSize个worker
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
...
}

execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
/**
* 执行一个线程任务,该任务可能被新线程执行或者已经存在的线程执行
* 如果任务无法提交执行,可能因为线程池已经shutdown或者线程池已经满了
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 若workCount小于corePoolSize,则添加该任务的工作者(若成功则返回,失败进入下一步)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 若线程正在执行,则把任务加到等待队列中
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取当前状态进行判断
// 如果线程池非运行状态了,则移除等待队列中的任务,并tryTerminate,移除成功后调用拒策略
// 如果线程池在运行,并且workerCount=0,则创建一个空的工作者
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果加入队列失败,则创建工作者去执行,用非核心线程,否则拒绝
else if (!addWorker(command, false))
reject(command);
}

/**
* 检查是否可以针对当前池状态和给定的界限(核心或最大值)添加新的工作者
* 如果是这样,则将调整workCount计数,并在可能的情况下创建并启动一个新的工作者,并将firstTas作为其第一个任务运行。
* 如果线程池池已停止或可以关闭,则此方法返回false。
* 如果在询问时线程工厂无法创建线程,它也会返回false。
* 如果线程创建失败(由于线程工厂返回null或由于异常(通常是Thread.start()中OutOfMemoryError)),
* 我们将进行干净的回滚
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断线程池状态是否SHUTDOWN,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
// 判断线程池工作线程数是否超过容量,返回false
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加workCount,成功跳出最外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS失败,重读ctl,如果线程池状态改变则,重复外层循环,直到CAS成功
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 获得线程池可重入锁,加锁,操作工作集合需要加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新校验线程池状态,避免获取锁后线程池被shutdown
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 预检查t是否可启动
if (t.isAlive())
throw new IllegalThreadStateException();
// 加入线程池工作集合,更新largestPoolSize,workerAdded=true
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果工作者成功加入工作集合,则启动该工作者的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果工作者未启动,则执行失败处理
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

/**
* 添加工作者失败,执行回滚:
* 移除工作集合中的该工作者,CAS来减少workCount,重新检查是否终止,以防此工作者的存在阻止了终止
* 尝试转换状态到TERMINATED
*/
private void addWorkerFailed(Worker w) {
// 获得线程池可重入锁,加锁,操作工作集合需要加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

/**
* 如果执行器存在该任务,则将其从执行器的内部队列中移除
*/
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}

final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

/**
* 尝试转换状态到TERMINATED,当线程池SHUTDOWN,则中断可能正在等待任务的线程(一个)
* 必须在可能导致终止的任何操作之后调用此方法,以减少关闭状态下的工作人员计数或从队列中删除任务
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 线程池还在运行 或 线程池已经终止 或 线程池SHUTDOWN,工作列表不为空时,放弃转换
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果workerCount不为0,则中断可能正在等待任务的线程(一个)
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
// 如果workerCount=0,获取锁,CAS更新ctl为终止
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 线程池终止时执行的动作,现在为空
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
// 释放所有锁
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// CAS失败则重复循环
}
}

/**
* 中断可能正在等待任务的线程,以便检查终止或配置更改
* 注意,此处忽略无法获得到锁的线程,这些线程后续完成后会自己退出
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 尝试获取工作者内部AQS锁,如果获取到则可以中断,否则忽略等它执行完
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdown()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 启动有序关闭,在该关闭中执行先前提交的任务,但不接受任何新任务
*/
public void shutdown() {
// 获取线程池的锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 如果有安全管理器,请确保调用者具有权限关闭线程
checkShutdownAccess();
// 通过CAS更新runState为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断所有工作者
interruptIdleWorkers();
// 【todo】
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试转换状态到TERMINATED
tryTerminate();
}

/**
* 如果有安全管理器,请确保调用者具有权限,通常可以关闭线程
*/
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}

/**
* 将runState转换为给定目标,如果已经至少为给定目标,则将其保留
*/
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}

/**
* 中断所有工作者
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

shutdownNow()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 尝试停止所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务的列表
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 通过CAS更新runState为STOP
advanceRunState(STOP);
// 强制中断所有线程,即使处于活动状态也是如此
interruptWorkers();
// 获取等待队列的所有任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试转换状态到TERMINATED
tryTerminate();
return tasks;
}

/**
* 强制中断所有线程,即使处于活动状态也是如此
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

/**
* 将任务队列排到一个新列表中,通常使用drainTo。
* 但是,如果队列是一个延迟队列或任何其他类型的队列,
* poll或drainTo可能无法删除某些元素,那么它将逐个删除这些元素。
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

思考与总结

线程池中包含如下参数:

  • AtomicInteger声明的ctl,它包含了两个含义:workerCount(当前有效线程数)、runState(运行状态)
  • ReentrantLock声明的可重入锁mainLock,主要用在操作工作者集合时
  • HashSet<Worker>声明的工作者集合workers,持有mainLock才可以访问
  • BlockingQueue<Runnable>声明的等待队列workQueue(包含还没执行的任务列表)

runState控制线程池的生命周期:

  • RUNNING(-1):接受新任务并处理排队的任务
  • SHUTDOWN(0):不接受新任务,而是处理排队的任务
  • STOP(1):不接受新任务,不处理排队任务,中断进行中的任务
  • TIDYING(2):所有任务都已终止,workerCount=0,线程转换为状态TIDYING将运行Terminated()挂钩方法
  • TERMINATED(3):调用Terminated()完成

Worker工作者是继承AQS实现的,能够控制内部线程对任务的执行,内部包含:

  • thread:工作者内部拥有一个线程
  • firstTask:工作者的第一个任务,可以为空
  • 工作者内部执行一个任务,需要获得AQS的独占锁,防止内部错乱,执行任务和尝试终止时都需要获得独占锁
  • 执行完一个任务后,再去workQueue拉取任务,如果空闲的工作者太多,则会根据corePoolSizemaximumPoolSize来销毁工作者

往线程池中加入一个任务的逻辑如下:

  • 如果线程池中工作者数量小于corePoolSize,则增加工作者(附加当前任务)
  • 如果线程池中工作者数量大于等于corePoolSize,则把任务加入到workQueue等待队列中
  • 如果线程池中工作者数量大于等于corePoolSize,并且加入workQueue队列失败,则尝试开新的工作者去执行(这些工作者数量大于corePoolSize,空闲后会被销毁)
  • 如果线程池中工作者数量大于等于corePoolSize,并且加入workQueue队列失败,并且开新的工作者也失败,则根据线程池的拒绝策略,拒绝任务

增加工作者,并开始工作的逻辑如下:

  • 判断线程池状态是否SHUTDOWN
  • 判断线程池工作线程数是否超过容量
  • CAS增加workCount
  • 新建一个工作者,并把任务作为工作者的第一个任务,获取线程池可重入锁,加锁,把工作者加入到工作集合中,解锁
  • 启动工作者,开始执行任务(内部线程执行任务)
  • 如果工作者启动失败,则执行回滚(把工作者移出工作集合)

源码中多次执行tryTerminate()方法,尝试终止线程池,逻辑如下:

  • 如果线程池处于RUNNINGTIDYINGSHUTDOWN且工作列表不为空时,放弃转换
  • 在可能导致终止的任何操作之后调用此方法,可能是最后一个线程退出所以需要尝试终止线程池
  • 如果线程池处于SHUTDOWN且工作列表为空,则尝试中断一个工作者(中断时,会先获取线程池锁,再尝试获取每一个工作者的AQS独占锁),此处仅中断一个(因为调用tryTerminate都是因为某个工作者退出)
  • 如果线程池处于SHUTDOWN且工作列表为空,且工作者数量为空,则终止线程池

为什么即需要ReentrantLock还需要volatileAQS独占锁?

  • ReentrantLock主要用在操作工作集合,防止工作集合在多线程环境下出问题
  • volatile声明的workCountrunState,变化用CAS去做,这块的变化在ReentrantLock锁之外做,提升效率
  • Worker中用AQS独占锁来控制工作者的任务执行和任务中断

线程池工厂类:Executors

Executors:提供了一系列静态工厂方法用于创建各种线程池,实际上也是间接调用了ThreadPoolExocutor,不过是传的不同的构造参数

  • Executors.newFixedThreadPool:创建可重用且固定线程数的线程池,corePoolSize=maximumPoolSize,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程

  • Executors.newCachedThreadPool:创建可缓存的线程池,如果线程池中的线程在60秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一条线程

  • Executors.newScheduledThreadPool:创建一个可延迟执行或定期执行的线程池,内部使用无界延迟阻塞队列DelayedWorkQueue

  • Executors.newSingleThreadExecutor:创建一个单线程的线程池,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务,适用于有顺序的任务的应用场景

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

线程的join

join的作用是转并为串行,注意join要在线程start之后执行

  • join()

在A线程中调用了B线程的join()方法时,表示只有当B线程执行完毕时,A线程才能继续执行

join的意思是使得放弃当前线程的执行,并返回对应的线程,例如下面代码的意思就是:
程序在main线程中调用t1线程的join方法,则main线程放弃cpu控制权,并返回t1线程继续执行直到线程t1执行完毕,所以结果是t1线程执行完后,才到主线程执行,相当于在main线程中同步t1线程,t1执行完了,main线程才有执行的机会

1
2
3
4
5
Thread t1 = new Thread(new ThreadA("线程一"));
Thread t2 = new Thread(new ThreadA("线程二"));
t1.start();
t1.join();
t2.start();
  • join(10)

如果A线程中调用B线程的join(10),则表示A线程会等待B线程执行10毫秒,10毫秒过后,A、B线程并行执行

1
2
3
4
5
Thread t1 = new Thread(new ThreadA("线程一"));
Thread t2 = new Thread(new ThreadA("线程二"));
t1.start();
t1.join(10);
t2.start();

参考文献