ArrayBlockingQueue和LinkedBlockingQueue

两点疑问没有解决:

1、为什么ArrayBlockingQueue不使用takeLock、putLock的双锁机制?使用这种锁分离的策略明显效率更高。

2、为什么判断是否存在剩余空间,使用while 而不是if?难道作者怕出现意外,但是什么时候会出现意外呢?

ArrayBlockingQueue

  一个数组组成的有界阻塞队列,保持队列基本特性先进先出。
  此类对等待的生产者线程和使用者线程进行排序的公平策略选项提供支持。默认情况下,不保证是这种排序策略。通过将fairness设置为true构造的队列允许按照FIFO顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。公平性的保证是通过ReentrantLock的公平锁实现。阻塞超时通过Condition的awaitNanos实现。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

//保存队列的数组,循环使用数组。泛型是不能使用数组

final Object[] items;

int takeIndex;//下一次移除元素所在下标

int putIndex; //下一次添加元素所在下标

int count; //队列中元素数量

final ReentrantLock lock; //重入锁保证线程安全

//Condition的await相当于Object提供的wait

//signals相当于notify方法

private final Condition notEmpty; //获取元素线程使用

private final Condition notFull; //放置元素线程使用

put操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

public void put(E e) throws InterruptedException {

checkNotNull(e); //不能放入null

final ReentrantLock lock = this.lock;

lock.lockInterruptibly(); //中断异常交给调用者处理

try {

//整个数组被填满,线程阻塞。为什么用while而不用if语句?

//难道是为了方式被唤醒时队列再次被填满。但是当前线程占用锁,

//不可能有其他线程调用put方法并且成功添加元素。

//想想肯定有不使用lock的方法添加元素成功。

//如果真有这样的方法它一定不是线程安全的。往下看吧- -!

while (count == items.length)

notFull.await();

enqueue(e);

} finally {

lock.unlock();

}

//插入元素到指定位置,放置下标前移,发出非空信号量 。持有锁才能调用

private void enqueue(E x) {

final Object[] items = this.items;

items[putIndex] = x;

if (++putIndex == items.length)//循环使用数组

putIndex = 0;

count++;

notEmpty.signal();//发出非空信号

}

take操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51


public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {//队列为空等待

while (count == 0)

notEmpty.await();

return dequeue();

} finally {

lock.unlock();

}

}



private E dequeue() {

final Object[] items = this.items;

@SuppressWarnings("unchecked")

E x = (E) items[takeIndex]; //Object转泛型

items[takeIndex] = null;

if (++takeIndex == items.length)//循环数组

takeIndex = 0;

count--;

if (itrs != null)

itrs.elementDequeued();

notFull.signal();//发出非空通知

return x;

}

drainTo操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87


public int drainTo(Collection<? super E> c, int maxElements) {

checkNotNull(c);

if (c == this)

throw new IllegalArgumentException();

if (maxElements <= 0)

return 0;

final Object[] items = this.items;

final ReentrantLock lock = this.lock;

lock.lock();

try {

int n = Math.min(maxElements, count);

int take = takeIndex;

int i = 0;

try {

while (i < n) { //有add存在不能保证n的正确性

@SuppressWarnings("unchecked")

E x = (E) items[take];

c.add(x);

items[take] = null;

if (++take == items.length)//循环添加

take = 0;

i++;

}

return n;

} finally {

// Restore invariants even if c.add() threw

if (i > 0) {

count -= i;

takeIndex = take;//下一次获取元素的位置

if (itrs != null) {

if (count == 0)

itrs.queueIsEmpty();

else if (i > take)

itrs.takeIndexWrapped();

}

for (; i > 0 && lock.hasWaiters(notFull); i--)

notFull.signal();//取走多个元素可以发出多个未满信号

}

}

} finally {

lock.unlock();

}

}

LinkedBlockingQueue

  一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。链表动态分配空间可以处理更多的节点,但是动态分配空间会两份一些时间。
  可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
与ArrayBlockingQueue的不同点1)一个使用数组,固定大小,一个使用链表,非固定大小,最大是int最大值2)LinkedBlockingQueue使用锁分离策略,正类包含takeLock和putLock两把锁,提高了并发效率。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

private final int capacity;//记录队列容量

//使用原子整数记录当前队列中元素的个数

private final AtomicInteger count = new AtomicInteger();

transient Node<E> head;//头部元素

private transient Node<E> last;//尾部元素

//获取头部元素锁和

private final ReentrantLock takeLock = new ReentrantLock();

private final Condition notEmpty = takeLock.newCondition();

//插入尾部元素锁

private final ReentrantLock putLock = new ReentrantLock();

private final Condition notFull = putLock.newCondition();

构造函数

1
2
3
4
5
6
7
8
9
10

public LinkedBlockingQueue(int capacity) {

if (capacity <= 0) throw new IllegalArgumentException();

this.capacity = capacity;

last = head = new Node<E>(null);

}

我们可以看出初始化的时候LBQ含有一个Node节点,头指针和尾指针都指向它,count=0。也就是说LBQ一直包含一个多余的Node节点。

put操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

public void put(E e) throws InterruptedException {

if (e == null) throw new NullPointerException();

int c = -1;

Node<E> node = new Node<E>(e);

final ReentrantLock putLock = this.putLock;

//putLock锁住以后队列中元素个数只会减小不会增大

final AtomicInteger count = this.count;

putLock.lockInterruptibly();//锁住put锁

try {

while (count.get() == capacity) {

notFull.await();

}

enqueue(node);

//原子操作保证take与put不会导致线程安全问题

c = count.getAndIncrement();

if (c + 1 < capacity)

notFull.signal();

} finally {

putLock.unlock();

}

if (c == 0)

signalNotEmpty();

}



private void enqueue(Node<E> node) {

last = last.next = node;

}

总结

  • 需要公平机制来避免消费者、生产者饥饿,这时使用ABQ再好不过了。

  • 如果队列的大小是有界的话,首选还是ABQ,ABQ的性能更好,不需要申请空间。否则还是LBQ吧!LBQ可以接受突入起来的大量生产者。

  • 阻塞队列的效率某些场景的效率会低于非阻塞,下一次再写关于非阻塞队列CourrentLinkedQueue