PriorityBlockingQueue源码解析

优先阻塞队列,一定会涉及优先队列的实现,而且是平衡优先队列。一起看看Doug Lea大师是怎么做的。

PriorityBlockingQueue和

简介

  一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。此类不允许使用 null 元素。依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。

  此类及其迭代器可以实现 Collection 和 Iterator 接口的所有可选 方法。iterator() 方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。如果需要有序地进行遍历,则应考虑使用 Arrays.sort(pq.toArray())。此外,可以使用方法 drainTo 按优先级顺序移除 全部或部分元素,并将它们放在另一个 collection 中。

  在此类上进行的操作不保证具有同等优先级的元素的顺序。如果需要实施某一排序,那么可以定义自定义类或者比较器,比较器可使用修改键断开主优先级值之间的联系。例如,以下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。要使用该类,则需要插入一个新的FIFOEntry(anEntry) 来替换普通的条目对象。

##属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

//优先队列用平衡二叉堆表示:queue[n] queue[2*n+1]和queue[2*(n+1)]。最小堆。

private transient Object[] queue;

//优先队列元素个数

private transient int size;

//比较器

private transient Comparator<? super E> comparator;

//所有public方法的锁

private final ReentrantLock lock;

private final Condition notEmpty;

//分配自旋锁通过CAS获取

private transient volatile int allocationSpinLock;

//兼容以前的版本,序列化和反序列化使用

private PriorityQueue<E> q;

put 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153

public void put(E e) {

offer(e); // never need to block

}

public boolean offer(E e) {

if (e == null)

throw new NullPointerException();

final ReentrantLock lock = this.lock;

lock.lock();

int n, cap;

Object[] array;

//优先队列数组装满需要扩容,扩容可能失败,需要循环

while ((n = size) >= (cap = (array = queue).length))

tryGrow(array, cap);

try {

Comparator<? super E> cmp = comparator;

//将元素插入优先队列

if (cmp == null)

siftUpComparable(n, e, array);

else

siftUpUsingComparator(n, e, array, cmp);

size = n + 1;

notEmpty.signal();

} finally {

lock.unlock();

}

return true;

}

//优先队列扩容。考虑到有可能扩容过程中有消费线程获取元素,可以在不扩容

//的前提下继续使用队列。增加吞吐量

private void tryGrow(Object[] array, int oldCap) {

//我猜是为了消费者线程可以继续消费,增加并发量

lock.unlock(); // must release and then re-acquire main lock

Object[] newArray = null;

//获取到偏向锁标志位的才能扩容,使用CAS操作

if (allocationSpinLock == 0 &&

UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,

0, 1)) {

try { //oldCap容量小于64扩展一倍加二,其他情况扩展一半

int newCap = oldCap + ((oldCap < 64) ?

(oldCap + 2) : // grow faster if small

(oldCap >> 1));//边界检查还是很重要的防止内存泄漏

if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow

int minCap = oldCap + 1;

if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

throw new OutOfMemoryError();

newCap = MAX_ARRAY_SIZE;

}

if (newCap > oldCap && queue == array)

newArray = new Object[newCap];

} finally {

allocationSpinLock = 0;

}

}//其他线程还是有机会提前扩容新的数组

if (newArray == null) // back off if another thread is allocating

Thread.yield();

lock.lock(); //数组copy

if (newArray != null && queue == array) {

queue = newArray;

System.arraycopy(array, 0, newArray, 0, oldCap);

}

}

//优先队列(最小堆)平衡插入。k代表数组中元素个数。

//插入过程,将元素放在平衡堆的最后一个位置,父节点大于当前节点

//交换位置,递归重复这个过程。


private static <T> void siftUpComparable(int k, T x, Object[] array) {

Comparable<? super T> key = (Comparable<? super T>) x;

while (k > 0) {

int parent = (k - 1) >>> 1;//父节点下标

Object e = array[parent];

if (key.compareTo((T) e) >= 0) //大于父节点循环终止

break;

array[k] = e;

k = parent;

}

array[k] = key;

}

take 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

E result;

try {

while ( (result = dequeue()) == null)

notEmpty.await();

} finally {

lock.unlock();

}

return result;

}

//持有锁才可以访问,堆中第一个元素为最小


private E dequeue() {

int n = size - 1;

if (n < 0)

return null;

else {

Object[] array = queue;

E result = (E) array[0];

E x = (E) array[n];

array[n] = null;

Comparator<? super E> cmp = comparator;

if (cmp == null)

siftDownComparable(0, x, array, n);

else

siftDownUsingComparator(0, x, array, n, cmp);

size = n;

return result;

}

}

//堆的头元素被去掉,头元素作为Z,选取头元素较小的子节点占据头结点位置。

//将头元素较小子节点作为Z重复这个过程,直到Z小于队列最后一个元素key,

//或者Z没有子节点。将key放置到Z的位置。


private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {

if (n > 0) {

Comparable<? super T> key = (Comparable<? super T>)x;

int half = n >>> 1; // loop while a non-leaf

while (k < half) {

int child = (k << 1) + 1; // assume left child is least

Object c = array[child];

int right = child + 1;

if (right < n && //左子节点大于右子节点

((Comparable<? super T>) c).compareTo((T) array[right]) > 0)

c = array[child = right]; //c等于两个子节点中的较小值

if (key.compareTo((T) c) <= 0)//如果key小于c停止循环

break;

array[k] = c;

k = child;

}

array[k] = key;

}

}

SynchronousQueue

简介

  一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null元素。

  同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。

  对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。

源码