SynchronousQueue源码解析

SynchronousQueue

  SychronousQueue类的大部分代码都交给Transfer内部类的两个子类ThransferQueue和ThransferStack完成。我只写关于ThransferStack的代码。
  

这两个子类使用非阻塞无锁操作实现,理解起来有些困难。无锁是提高性能的重要手段,与此同时它让代码变得复杂。接下来要自己动手写无锁操作,增加对无锁操作的理解

http://www.tuicool.com/articles/aYrEZf

简介

  一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null元素。

  同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。

  对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。

take操作

public E take() throws InterruptedException {

    E e = transferer.transfer(null, false, 0);

    if (e != null)

        return e;

    Thread.interrupted();

    throw new InterruptedException();

}

TransferStack类

属性

1
2
3
4
5
6
7
8

static final int REQUEST = 0; //代表未完成的消费者

static final int DATA = 1; //代表未完成的生产者

static final int FULFILLING = 2; //已经完成状态

volatile SNode head; //栈的顶端

transfer操作

图片是粗略的过程,很多东西无法表示,还是看源码。无锁操作还是要看好几遍才能领会作者的意图,加油吧少年!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144

/**

* 放置或者获取一个节点

*/

E transfer(E e, boolean timed, long nanos) {



SNode s = null; // constructed/reused as needed

//节点e为null,代表这是一个消费者,否则它是一个生产者

int mode = (e == null) ? REQUEST : DATA;



for (;;) {

SNode h = head;

//同为消费者或者生产者

if (h == null || h.mode == mode) { // empty or same-mode

if (timed && nanos <= 0) { // can't wait 不允许等待

if (h != null && h.isCancelled())//栈顶节点请求已取消

casHead(h, h.next); // pop cancelled node

else

return null;

//允许等待,尝试将e设置为栈顶,失败就继续循环尝试。

//如果栈顶元素正在处理,说明栈顶元素找到自己匹配对象(看不懂跳过)

} else if (casHead(h, s = snode(s, e, h, mode))) {

//返回s的匹配元素

SNode m = awaitFulfill(s, timed, nanos);

//m == s 说明被取消,等待时间截止未出现匹配元素

if (m == s) { // wait was cancelled

clean(s);

return null;

}

//新的节点覆盖s节点,移除s

if ((h = head) != null && h.next == s)

casHead(h, s.next); // help s's fulfiller

//返回生产者的item

return (E) ((mode == REQUEST) ? m.item : s.item);

}

} else if (!isFulfilling(h.mode)) { // try to fulfill

//颜色不同且栈顶元素未被处理

//栈顶元素被取消,移除栈顶元素

if (h.isCancelled()) // already cancelled

casHead(h, h.next); // pop and retry

//将s放入栈顶,并且设置为正在处理

else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {

    //栈中元素有可能时间到达,所以要循环遍历

for (;;) { // loop until matched or waiters disappear

SNode m = s.next; // m is s's match

if (m == null) { // all waiters are gone

casHead(s, null); // pop fulfill node

s = null; // use new node next time

break; // restart main loop

}

SNode mn = m.next;

if (m.tryMatch(s)) {

casHead(s, mn); // pop both s and m

return (E) ((mode == REQUEST) ? m.item : s.item);

} else // lost match

s.casNext(m, mn); // help unlink

}

}

} else { // help a fulfiller

//栈顶元素正在处理,帮助栈顶处理

SNode m = h.next; // m is h's match

if (m == null) // waiter is gone

casHead(h, null); // pop fulfilling node

else {

SNode mn = m.next;

if (m.tryMatch(h)) // help match

casHead(h, mn); // pop both h and m

else // lost match

h.casNext(m, mn); // help unlink

}

}

}

}

awaitFulfill操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

SNode awaitFulfill(SNode s, boolean timed, long nanos) {

//计算等待终结时间,naoTime()函数更加精确

final long deadline = timed ? System.nanoTime() + nanos : 0L;

Thread w = Thread.currentThread();

int spins = (shouldSpin(s) ?

(timed ? maxTimedSpins : maxUntimedSpins) : 0);

for (;;) {

if (w.isInterrupted()) //线程中断,取消s节点

s.tryCancel();

SNode m = s.match;

if (m != null) //匹配成功返回

return m;

if (timed) {

nanos = deadline - System.nanoTime();

if (nanos <= 0L) { //截止时间到达,取消当前线程的SNode

s.tryCancel();

continue;

}

}

//线程挂起

if (spins > 0)

 spins = shouldSpin(s) ? (spins-1) : 0;

else if (s.waiter == null)

s.waiter = w; // establish waiter so can park next iter

else if (!timed) //无限期等待

LockSupport.park(this);

else if (nanos > spinForTimeoutThreshold)

LockSupport.parkNanos(this, nanos);

}

}