AbstractQueuedSynchronizer解析(2)

针对性重复练习。持续做你不会做的事。

Node类

  Node节点组成的队列是CLH(Craig, Landin, and Hagersten)的变种。CLH通常作为自旋锁使用。我们将它作为阻塞同步器使用,但是前驱节点持有线程的控制信息的策略是相同的。每个节点中的状态字段保存线程的阻塞信息。前驱节点释放时,当前节点将得到通知信号。每个队列中的节点作为specific-notification-style监视器,持有唯一的等待线程。队列中的第一个线程有机会尝试获取锁,但是不保证一定成功获得(非公平锁),它只是给了线程竞争的权利,所以被释放的竞争者可能再次等待。

1
2
3
     +------+  prev +-----+       +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+

  进入到CLH队列,你只需要一个原子操作将它拼接在队列尾部。出队,你只需要重置头部节点。
插入CLH队列需要一个放入尾部的原子操作,因此存在一个指针指向队中和非队中的分界点。出队需要更新“head”指针。CLH队列需要花费大量的工作决定队首能否成功获取锁,还需要花费一部分时间处理超时和中断导致的任务取消。
  pre指针链接的链表主要保存取消节点。如果一个节点已经取消,它的后继指针被连接到
一个未取消的前驱节点。
  我们使用next指针的链接的链表实现阻塞队列机制。每个节点内部保存线程的id,所以一个前驱节点通过遍历next指针查找下一个应该被唤醒的节点。确认后继者过程必须避免新入队节点设置next指针的并发竞争。

Node源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

static final class Node {

//需要注意的是下面两个节点都是静态属性,用来表示当前共享还是独占模式
static final Node SHARED = new Node();

static final Node EXCLUSIVE = null;

/**
* 状态字段 ,只有一下几种值
* SIGNAL:当前节点后继者被阻塞(通过park方法)。节点被释放或者取消时,
* 后继节点必须被唤醒。为了避免竞争,获取方法,acquire 方法的过程必须是:
* 首先获取一个锁释放信号量;然后使用原子操作获取锁;最后获取失败阻塞线程。
* CANCELLED: 当前节点因为超市或者中断被取消。节点永远不会离开
* 这个状态。尤其取消的线程节点不会再次被阻塞。
* CONDITION:节点处于Condition队列。状态值改变前,它不会被用作同步队列
* 的节点,这个时候状态值被设置为0
* PROPAGATE:共享锁被释放时,把这个信号传递给其他节点。及时其他操作被
* 干扰doReleaseShared操作仍然能够传递。
* 0: None of the above
* 为负数时,表示当前节点不需要被唤醒。这个字段的初始值是0。
*/
volatile int waitStatus;


/**
* 参考waitStatus字段,连接到一个前驱节点。入队时赋值,出队设置为null。
* 在前驱节点取消时,我们将当前节点链接到一个永远不会取消的节,
* head节点永远不会取消。只有被成功获取锁的节点才能变为head节点。
* 取消的节点不可能成功
*/
volatile Node prev;

/**
* 链接到后继节点,在它释放的时候唤醒后继节点。入队时复制,
* 绕过取消的前驱节点时被修改,出队时被设置为null。直到节点成功加入
* 到对位,enq操作才会为它的前驱节点赋值。说以看到next字段为null
* 的节点不意味着他是队尾。 However, if a next field appears
* to be null, we can scan prev's from the tail to double-check. 、
* 取消节点的nex字段被设置为它自身 而不是null,这样有利于isOnSyncQueue
* 操作。
*/
volatile Node next;

//拥有当前结点的线程。
volatile Thread thread;

/**
* 链接等待condition的下一个节点,或者特定值 SHARED。
* 因为condition队列只有在独占模式下才会持有。再次获取锁的时候转变。
*/
Node nextWaiter;

}

addWaiter入队操作

  利用当前线程和给定的模式创建Node节点并且入队。head和tail字段的初始化在这个操作中完成。初始化过程:

  • 如果tail字段为null,生成一个新的Node节点。
  • head和tail指向这个新节点。
  • 新的节点waitStatus为0。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

Node pred = tail;

//tail为null,说明tail第一次被初始化
if (pred != null) {

node.prev = pred;

//cas操作设置node为tail,失败进入enq
if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);

return node;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private Node enq(final Node node) {

//多个线程加入到队尾,或者tail未被初始化。
//cas操作可能失败,必须使用循环保证最终成功。
for (;;) {

Node t = tail;

if (t == null) { //尾部为null,初始化tail

if (compareAndSetHead(new Node()))

tail = head;

} else {

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

acquire独占模式获取锁操作

  子类通过调用这个方法获取锁。取得锁的规则有tryAcquire方法持有,子类重写这个方法而且不会阻塞线程。

1
2
3
4
5
6
7
8
9
10
11
12

public final void acquire(int arg) {

// tryAcquire返回false,那么就加入阻塞队列阻塞线程,并等待前继结点释放锁。
// acquireQueued返回true说明线程应该被中断
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

// acquireQueued返回true,说明当前线程被中断唤醒后获取到锁,
// 重置其interrupt,status为true。
selfInterrupt();

}

  tryAcquire失败线程会加入队列 线程可能会反复的被阻塞和唤醒直到tryAcquire成功,这是因为线程可能被中断, 而acquireQueued方法中会保证忽视中断,只有tryAcquire成功了才返回。
  中断版本的独占获取是acquireInterruptibly方法,doAcquireInterruptibly这个方法中如果线程被中断则acquireInterruptibly会抛出InterruptedException异常。addWaiter方法只是入队操作,acquireQueued方法是主要逻辑,需要重点理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

// 自旋re-check
for (;;) {

final Node p = node.predecessor();

// 前继是head,说明next就是node了,则尝试获取锁。
if (p == head && tryAcquire(arg)) {

// 前继出队,node成为head
setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

// p != head 或者 p == head但是tryAcquire失败了,那么
// 应该阻塞当前线程等待前继唤醒。需要设置前继的waitStaus为SIGNAL
// parkAndCheckInterrupt返回可能是前继unpark或线程被中断。
// parkAndCheckInterrupt返回值代表线程是否被中断
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

// 说明当前线程是被中断唤醒的。
//注意:线程被中断之后会继续走到if处去判断,也就是会忽视中断。
// 除非碰巧线程中断后acquire成功了,那么根据Java的最佳实践,
// 需要重新设置线程的中断状态(acquire.selfInterrupt)。
interrupted = true;

}

}

finally {

// 出现异常

if (failed)

cancelAcquire(node);

}

}

shouldParkAfterFailedAcquire方法的作用是:

  • 确定后继是否需要park;
  • 跳过被取消的结点;
  • 设置前继的waitStatus为SIGNAL.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;

// 前继结点已经准备好unpark其后继了,所以后继可以安全的park
if (ws == Node.SIGNAL)

/*
 * This node has already set status asking a release to signal it,
 * so it can safely park.
 */

return true;

if (ws > 0) {// CANCELLED

 // 跳过被取消的结点。
 do {

 node.prev = pred = pred.prev;

 } while (pred.waitStatus > 0);

 pred.next = node; 

} else {// 0 或 PROPAGATE (CONDITION用在ConditonObject,这里不会是这个值)

 /**
* waitStatus 等于0(初始化)或PROPAGATE。说明线程还没有park,会先重试
 * 确定无法acquire到再park。
 */
 // 更新pred结点waitStatus为SIGNAL 
 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

线程被唤醒只可能是:被unpark,被中断或伪唤醒。被中断会设置interrupted,acquire方法返回前会 selfInterrupt重置下线程的中断状态,如果是伪唤醒的话会for循环re-check。

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

独占模式释放

比较简单只要直接唤醒后续结点就可以了,后续结点会从parkAndCheckInterrupt方法中返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

public final boolean release(int arg) {

// tryReease由子类实现,通过设置state值来达到同步的效果。
if (tryRelease(arg)) {

Node h = head;

// waitStatus为0说明是初始化的空队列
if (h != null && h.waitStatus != 0)

// 唤醒后续的结点
unparkSuccessor(h);

return true;

}

return false;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

private void unparkSuccessor(Node node) {

/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

acquireShared共享获取模式

获取共享锁过程与独占模式基本相同。
- 尝试获取加入队列失败加入队列
- 如果是队首再次获取锁
- 再次失败挂起当前线程,等待被唤醒。
与独占模式不同的是成功获取共享锁之后调用setHeadAndPropagate,继续向后遍历队列,寻找相邻的共享节点给予锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final void acquireShared(int arg) {
//如果没有许可了则入队等待
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
// 添加队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 等待前继释放并传递
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);// 尝试获取
if (r >= 0) {
// 获取成功则前继出队,跟独占不同的是
// 会往后面结点传播唤醒的操作,保证剩下等待的线程能够尽快 获取到剩下的许可。
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}

// p != head || r < 0
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
}
finally {
if (failed)
cancelAcquire(node);
}
}

setHeadAndPropagate方法会将node设置为head。如果当前结点acquire到了之后发现还有许可可以被获取,则继续释放自己的后继, 后继会将这个操作传递下去。这就是PROPAGATE状态的含义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 以下情况尝试唤醒后继的结点:
* propagate > 0说明许可还有能够继续被线程acquire;
* 或者之前的head被设置为PROPAGATE(PROPAGATE可以被转换为SIGNAL)说明需要往后传递;
* 或者为null,我们还不确定什么情况。
* 并且后继结点是共享模式或者为如上为null。
* 上面的检查有点保守,在有多个线程竞争获取/释放的时候可能会导致不必要的唤醒。
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
// 后继结是共享模式或者s == null(不知道什么情况)
// 如果后继是独占模式,那么即使剩下的许可大于0也不会继续往后传递唤醒操作
// 即使后面有结点是共享模式。
if (s == null || s.isShared())
// 唤醒后继结点
doReleaseShared();
}
}

private void doReleaseShared() {
for (;;) {
Node h = head;
// 队列不为空且有后继结点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 不管是共享还是独占只有结点状态为SIGNAL才尝试唤醒后继结点
if (ws == Node.SIGNAL) {
// 将waitStatus设置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);// 唤醒后继结点
// 如果状态为0则更新状态为PROPAGATE,更新失败则重试
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果过程中head被修改了则重试。
if (h == head) // loop if head changed
break;
}
}

参考文章:http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-AbstractQueuedSynchronizer.html