AbstractQueuedSynchronizer解析(1)

不以文害辞,不以辞害志;以意逆志,是为得之。——孟子

 最近在看ThreadPoolExecutor类的源码。我发现其中一个重要的子类Woker继承自AbstractQueuedSynchronizer(简称AQS)。AQS帮助Woker类轻松实现一个非重入的排它锁。不看不知道一看吓一跳,竟然有11个类继承自AQS,而且这些类都是并发包下面使用频率很高的类。这足以说明这个类的含金量,值得认真研究一下。
 

源码注释

  为实现依赖于等待队列的阻塞锁和相关同步器(信号量、事件)提供一个框架。此类的设计目标是成为依靠单个原子整形数值来表示状态的大多数同步器的基础。子类必须定义更改此状态的方法,并定义哪种状态对于此对象意味着被获取或被释放。实现这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。子类可以维护其他状态字段,但是只有原子更新整形值的应用方法(涉及到同步)被追踪,以下是这三个应用方法 getState()、setState(int) 和compareAndSetState(int, int)。

  应该将子类定义为非公共内部帮助类,可用它们来实现其内部类的同步属性。AbstractQueuedSynchronizer类 没有实现任何同步接口。而是定义了诸如acquireInterruptibly(int) 之类的方法,在适当的时候可以通过具体的锁和相关同步器调用它们,以达到同步的目的。

  此类支持默认的独占 模式和共享 模式任何一种方式,也可以同时支持两种模式。处于独占模式下,其他线程不可能再成功获取锁。在共享模式下,多个线程获可能成功获得锁。此类只是机械地感知到在共享模式下成功获取某一锁时,下一个等待线程(如果存在)也必须确定是否可以成功获取该锁。处于不同模式下的等待线程可以共享相同的 FIFO 队列。通常,子类只支持其中一种模式,但两种模式都可以在ReadWriteLock 中发挥作用。只支持独占模式或者只支持共享模式的子类不必定义未使用模式的方法。

使用

描述

AQS简核心是通过一个共享变量来同步状态,变量的状态由子类去维护,而AQS框架做的是:

  • 线程阻塞队列的维护
  • 线程阻塞和唤醒。

  共享变量的修改都是通过CAS操作完成的。AbstractQueuedSynchronizer类是一个典型的模板方法类。它的主要方法是acquire和release。 acquire方法用来获取锁,返回true说明线程获取成功继续执行,一旦返回false则线程加入到等待队列中,等待被唤醒,release方法用来释放锁。 一般来说实现的时候这两个方法被封装为lock和unlock方法。

下面这4个方法由子类去实现:

1
2
3
4
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)

ThreadPoolExecutor.Worker

下面就来看一下以下Worker的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103


/**

* Worker类为正在运行任务的线程维护中断状态控制,以及其他次要的工作。该类

* 通过继承AbstractQueuedSynchronizer类,环绕任务执行加锁,简化锁地获取与释放

* 过程。不使用中断旨在唤醒唤醒一个等待任务的线程,而不是一个正在运行的任务线程。

* 我们实现一个简单的不可重入锁,而不是直接使用互斥锁ReentrantLock。因为我们

* 不希望当他们调用线程池控制方法如setCorePoolSize时,工作者的任务的锁被再次获取。

* 为了抑制线程真正启动前中断,我们初始化锁的状态为负值,开始时变为正值。

*/

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

{

...

// Lock methods state是父类AbstractQueuedSynchronizer的字段

// The value 0 represents the unlocked state.

// The value 1 represents the locked state.

protected boolean isHeldExclusively() {

return getState() != 0;

}

//尝试获取当前的worker。重载父类方法,父类中不支持该方法。

protected boolean tryAcquire(int unused) {

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

//释放当前的worker

protected boolean tryRelease(int unused) {

setExclusiveOwnerThread(null);

setState(0);

return true;

}

//获取失败放入队列,并且线程被中断

public void lock() { acquire(1); }

public boolean tryLock() { return tryAcquire(1); }

public void unlock() { release(1); }

public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {

Thread t;

if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

try {

t.interrupt();

} catch (SecurityException ignore) {

}

}

}

}

//原子操作改变锁的状态

protected final boolean compareAndSetState(int expect, int update) {

// See below for intrinsics setup to support this

return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

}