ConcurrentHashMap源码解析

   JDK1.8中,ConcurrentHashMap的源码竟然有6300行,作者是大名鼎鼎Doug Lea(感谢开源让我离大神可以这么近,不过代码风格明显没打算交给他人维护)。本文只讲述扩容相关的内容和get操作的并发安全实现,其他操作就放弃了。文章参考一下别人的文章,主要是以下两篇文章:
  

http://blog.csdn.net/u010887744/article/details/51002358
http://www.cnblogs.com/huaizuo/p/5413069.html
  友情提示:看ConcurrentHashMap源码还是需要一些Java并发编程的了解,比如内存可见性、原子性、顺序性、无锁。另一点就是先阅读HashMap的源码。

JDK8 HashMap数据结构

图1

1
2
3
4
5
6
7
8
 static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
//省略省略
}
transient volatile Node<K,V>[] table;

  HashMap底层由数组、链表和红黑树三者组成。我们从图1和源码大改也可以推断出来。首先是HashMap使用Node组成的数组table装载所有的键值对,再者HashMap发生Hash值碰撞使用链表将它们连起来,最后是某个链表长度大于指定值链表转化为红黑树。

重要的常量

基本常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 transient volatile Node<K,V>[] table;

//扩容时使用,其他时候为null
private transient volatile Node<K,V>[] nextTable;

//基础计数器,主要用于无争用,同时也作为table数组初始化的回退记录
private transient volatile long baseCount;

//table数组初始化和扩容的控制标志。为负数时,table数组正在扩容或初始化,
//-1代表初始化,其他负数表示 -(1+参数扩容的线程数量)。当table为null时,
//sizeCtl持有扩容的阈值,0为默认值。
private transient volatile int sizeCtl;

//table扩容时 每个线程最少需要处理的元素数量。看不懂就跳过去,下同
private static final int MIN_TRANSFER_STRIDE = 16;

//当table扩容时,transferIndex持有下一个线程或者下次循环可以处理的数组界限
private transient volatile int transferIndex;

//table数组的默认长度
private static final int DEFAULT_CAPACITY = 16;

//table数组的负载因子. 小数形式不经常使用,它更常见的表示形式是 n - (n >>> 2)
private static final float LOAD_FACTOR = 0.75f;

Unsafe相关常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL; //sizeCtl的偏移量
private static final long TRANSFERINDEX; //transferIndex的偏移量
private static final long BASECOUNT; //
private static final long CELLSBUSY;
private static final long CELLVALUE;
//数组中第一个元素在数组中的位置
private static final long ABASE;
//Node数组元素大小scale的二进制去除前导零后的位数
private static final int ASHIFT;

static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
  //获取sizeCtl的偏移量
SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));
  //省略代码,和上面一样获取各个变量的偏移量
...
Class<?> ak = Node[].class;
//Arrays和Java别的对象一样,都有一个对象头,它是存储在实际的数据前面的。
//这个头的长度可以通过unsafe.arrayBaseOffset(T[].class) 方法来获取到,
   //这里T是数组元素的类型
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak); //获取数组元素的大小
if ((scale & (scale - 1)) != 0) //元素大小必须是2的倍数,不知道为什么
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}

重要的方法

无锁方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    //内存中直接获取tab数组张第i个元素,保证内存可见性
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//为tab数组的第i个元素复制,使用CAS,保证操作的原子性

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
/**设置obj对象中offset偏移地址对应的object型field的值为指定值。这是一个有序或者
有延迟的putObjectVolatile方法,并且不保证值的改变被其他线程立
即看到。只有在field被<code>volatile</code>修饰并且期望被意外修改的时候
使用才有用。*/
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

扩容方法transfer

扩容过程解析

  ConcurrentHashMap无锁多线程扩容,减少扩容时的时间消耗。单线程构建两倍容量的nextTable;允许多线程复制原table元素到nextTable。为每个内核均分任务,并保证其不小于16(常量MIN_TRANSFER_STRIDE)。
1)若nextTab为null,则初始化其为原table的2倍;
2)死循环遍历,直到finishing。

  • 节点为空,则插入ForwardingNode;
  • 链表节点(fh>=0),分别插入nextTable的i和i+n的位置;
  • TreeBin节点(fh<0),判断是否需要untreefi,分别插入nextTable的i和i+n的位置;
  • finishing时,nextTab赋给table,更新sizeCtl为新容量的0.75倍 ,完成扩容。
    以上说的都是单线程,多线程又是如何实现的呢?
    遍历到ForwardingNode节点((fh = f.hash) == MOVED),说明此节点被处理过了,直接跳过。这是控制并发扩容的核心 。由于给节点上了锁,只允许当前线程完成此节点的操作,处理完毕后,将对应值设为ForwardingNode(fwd),其他线程看到forward,直接向后遍历。如此便完成了多线程的复制工作,也解决了线程安全问题。
    

源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//n表示原数组的大小 ,stride表示每个线程或者单次循环负责节点的个数
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // 为nextTable申请空间
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//构建一个连节点的指针,用于标识位
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
//循环的关键变量,判断是否已经扩容完成
boolean finishing = false; // to ensure sweep before committing nextTab
//i--操作保证了倒序遍历数组,bound表示新的线程或者单次循环的下边界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;

while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing) //循环没有到达下边界可以继续for循环
advance = false;
else if ((nextIndex = transferIndex) <= 0) { //整个for循环过程结束
i = -1;
advance = false;
}
  //只有通过CAS成功更新ransferIndex才能获取到扩容的资格
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//i<0说明已经遍历完旧的数组tab;i>=n什么时候有可能呢?
 //在下面看到i=n,所以目前i最大应该是n吧,导致的原因可能是多线程
//i+n>=nextn,nextn=nextTab.length,所以如果满足i+n>=nextn说明已经扩容完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//a
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,
  //说明新加入一个线程参与到扩容操作,参考sizeCtl的注释
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//如果有多个线程进行扩容,那么这个值在第二个线程以后就不会相等,
  //因为sizeCtl已经被减1了,所以后面的线程就只能直接返回,
   //始终保证只有一个线程执行了 a(上面注释a)
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
  //finishing和advance保证线程已经扩容完成了可以退出循环
finishing = advance = true;
i = n; // recheck before commit
}
}
 //如果tab[i]为null,那么就把fwd插入到tab[i],表明这个节点已经处理过了
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
 //那么如果f.hash=-1的话说明该节点为ForwardingNode,说明该节点已经处理过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
//把链表分表拆分为,hash&n等于0和不等于0的,
    //然后分别放在新表的i和i+n位置。n一定是2的幂,这个很重要
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//把已经替换的节点的旧tab的i的位置用fwd替换,fwd包含nextTab
setTabAt(tab, i, fwd);
advance = true;
}//下面红黑树基本和链表差不多
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//判断扩容后是否还需要红黑树结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

get操作

get操作没有使用锁,怎么保证并发安全的?
线程安全基本概念可以参考这篇笔记:GYM2-Java内存模型和三个概念
线程安全:

  • 内存可见性
  • 操作原子性
  • 线程内代码执行顺序性

分析get在其他操作发生的线程安全保障:

  • table数组使用volatile修饰,保证table数组的内存可见性。
  • 线程内代码执行顺序性保证,能力有限不知道怎么分析。
  • put操作将Node节点放入table[i]时(table[i]==null、table[i]是一个链表或者是一个红黑树):查找放置位置是不改变table数组,找到安放位置,使用CAS更新tab[i]保证操作原子性,没有
  • remove操作与put操作类似,remove操作过程不改变table[i],改变table[i]使用CAS保证原子性。
  • put操作结束时链表转换为红黑树:转化过程不会直接改变链表,而是用链表中的Node节点生成新的TreeBin节点。最后,将红黑树的根节点更新table[i],当然更新操作使用CAS以保证原子性。
  • 扩容操作中table[i]移到nextTable时,这个过程使用CAS操作,get操作会在nextTable中查找。没有移到到nextTable时,不会修改table[i],不存在线程安全问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}