ConcurrentHashMap

概述

顾名思义,ConcurrentHashMap,是支持并发的HashMap。作为散列表,它采用的数据结构与HashMap基本一致,都是采取array+list(linkedlist)/tree(red black tree)的形式。作为支持并发的集合,与Hashtable简单的采取synchronized关键字实现同步,ConcurrentHashMap使用了更为复杂的机制,包括volatile变量、原子操作CAS、synchronized等。借此,ConcurrentHashMap在get()时不需要加锁,put()时也只是对应bin加锁,比Hashtable更快。由于ConcurrentHashMap的数据结构及其实现与HashMap相似,所以本文不再多述,而关注于并发实现。

导航

构造方法

ConcurrentHashMap的构造方法很简单,设置capacity(容量)、loadFactor(装载因子)、concurrencyLevel(并发数量,保留参数,实际上没有使用)等属性。同HashMap,ConcurrentHashMap也选择了延迟初始化:在第一次put的时候进行初始化。
与HashMap相比,ConcurrentHashMap拥有一个控制并发的关键变量:sizeCtl。当map未初始化时,sizeCtl=初始化容量;初始化后,sizeCtl>0时,则代表着下次再散列的门槛容量;sizeCtl<0时,则代表map正在进行初始化或者再散列(-1表示正在初始化)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // Reject NaN or non-positive load factor, negative capacity, and
    // non-positive concurrency level. (The negated comparison, rather
    // than loadFactor <= 0.0f, also traps NaN.)
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // Use at least as many bins as estimated threads.
    if (initialCapacity < concurrencyLevel)
        initialCapacity = concurrencyLevel;
    // Estimated table size that keeps occupancy below loadFactor.
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    // Round up to a power of two, capped at MAXIMUM_CAPACITY (1 << 30),
    // as required by the power-of-two index-masking scheme.
    int cap;
    if (size >= (long)MAXIMUM_CAPACITY)
        cap = MAXIMUM_CAPACITY;
    else
        cap = tableSizeFor((int)size);
    // Before initialization, sizeCtl caches the initial table capacity.
    this.sizeCtl = cap;
}

/**
 * Returns the smallest power of two >= c, capped at MAXIMUM_CAPACITY.
 * Smearing: if 2^(i-1) < c <= 2^i, then n = 2^i - 1 and n + 1 = 2^i.
 */
private static final int tableSizeFor(int c) {
    int n = -1 >>> Integer.numberOfLeadingZeros(c - 1);
    if (n < 0)
        return 1;
    if (n >= MAXIMUM_CAPACITY)
        return MAXIMUM_CAPACITY;
    return n + 1;
}

get

get操作相对简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
 * Returns the value mapped to key, or null if none. Performed without
 * locking: only reads via tabAt and node field accesses.
 */
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// With reasonable hashing most bins hold a single node, so the
// head node is checked first
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// static final int MOVED = -1; // hash for forwarding nodes
// static final int TREEBIN = -2; // hash for roots of trees
// static final int RESERVED = -3; // hash for transient reservations
//     (placeholders used by computeIfAbsent / compute)
// Negative hash marks a special node: delegate to its own find()
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// Plain list bin: linear scan of the remaining nodes
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

// 此处只含Forwarding Node、ReservationNode,treebin见下文

/**
 * Virtualized support for map.get(); overridden in subclasses.
 * Scans this bin's linked list for a node with the given hash and key.
 */
Node<K,V> find(int h, Object k) {
    if (k != null) {
        // Walk the list starting at this node.
        for (Node<K,V> e = this; e != null; e = e.next) {
            K ek;
            if (e.hash == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
        }
    }
    return null;
}

// ForwardingNode.find: a ForwardingNode sits at the head of a bin whose
// contents have been moved to nextTable, so the search continues there.
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
// null key, missing table, or empty slot: nothing to find
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
// hit another forwarding node: restart in its table
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}

// ReservationNode.find: a ReservationNode is a placeholder (used by
// computeIfAbsent/compute per the RESERVED hash) holding no mapping,
// so a lookup through it always misses.
Node<K,V> find(int h, Object k) {
return null;
}

put

put时,如果未进行过初始化,将会initTable。如果当前的bin正在进行resize,则会帮助resize,直到整个resize完成,插入新值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/**
 * Maps the specified key to the specified value. Neither the key nor
 * the value may be null. Returns the previous value, or null if none.
 */
public V put(K key, V value) {
return putVal(key, value, false);
}

/**
 * Implementation for put and putIfAbsent: inserts (key, value),
 * returning the previous value if one existed, else null.
 */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// Null keys and null values are both rejected
if (key == null || value == null) throw new NullPointerException();
// Spread the hash bits to reduce collisions
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
// Lazy initialization on first insert
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // empty bin
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) // atomic CAS insert
break; // no lock when adding to empty bin
}
// A head node with hash == MOVED is a ForwardingNode: this bin is
// being transferred from the old table to the new one (resize)
else if ((fh = f.hash) == MOVED)
// Join the ongoing resize before retrying the insert
tab = helpTransfer(tab, f);
// putIfAbsent with a matching first node: answer without locking
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
// Lock only this bin's head node
synchronized (f) {
// Re-check the head did not change before the lock was taken
if (tabAt(tab, i) == f) {
if (fh >= 0) { // list bin
// binCount counts the nodes visited in this bin;
// it is passed to addCount below
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// Reached the tail: append the new node
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
// Any value >= 1 satisfies the addCount check below
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
// Convert the list bin to a tree if it grew too long
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// Update the element count and possibly trigger a resize (see below)
addCount(1L, binCount);
return null;
}

// Initializes the table. Only the thread that wins the CAS on sizeCtl
// performs the allocation; other threads spin-yield until it is published.
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// Next resize threshold: n - n/4 = 0.75 * n
sc = n - (n >>> 2);
}
} finally {
// Publish the threshold (or restore sc if the recheck failed)
sizeCtl = sc;
}
break;
}
}
return tab;
}

size

在高并发的情况下,只有一个线程能完成CAS,其他线程会不断的循环等待。如果简单的设置count字段来统计,无疑会产生较大的资源浪费。为此,ConcurrentHashMap使用了baseCount与CounterCell[] counterCells来进行count统计与处理。每个counterCell对应着线程的计数,并使用了@Contended注解来避免false sharing的发生。线程只有在尝试更新baseCount失败时,才会尝试去更新counterCell[current]。因此,size()的值由baseCount与counterCells共同决定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// A padded cell for distributing counts across threads;
// @Contended isolates the field on its own cache line to avoid
// false sharing between adjacent cells.
@jdk.internal.vm.annotation.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}

/**
 * Returns the number of mappings, clamped to the int range
 * [0, Integer.MAX_VALUE]; the sum may be transiently negative or
 * overflow under concurrent updates.
 */
public int size() {
    long n = sumCount();
    if (n < 0L)
        return 0;
    if (n > (long)Integer.MAX_VALUE)
        return Integer.MAX_VALUE;
    return (int)n;
}

/**
 * Sums baseCount and all non-null CounterCell values; the total a
 * size()/mappingCount() query reports.
 */
final long sumCount() {
    CounterCell[] cells = counterCells;
    long total = baseCount;
    if (cells != null) {
        for (int i = 0; i < cells.length; ++i) {
            CounterCell cell = cells[i];
            if (cell != null)
                total += cell.value;
        }
    }
    return total;
}

在put时,会调用addCount方法,对baseCount与counterCells进行操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
 * Adds x to the element count and, if check >= 0, checks whether a
 * resize is needed, helping an in-progress one if so.
 */
private final void addCount(long x, int check) {
CounterCell[] cs; long b, s;
if ((cs = counterCells) != null ||
!U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) { // cells exist, or CAS on baseCount failed
CounterCell c; long v; int m;
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[ThreadLocalRandom.getProbe() & m]) == null || // this thread's cell is absent
!(uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) { // CAS on the cell failed
fullAddCount(x, uncontended); // contended slow path
return;
}
// check <= 1: skip the resize check and return immediately
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) // size reached the resize threshold
&& (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
// Stamp identifying a resize of a table of length n
int rs = resizeStamp(n);
// sc < 0: a resize is in progress; try to help (cf. helpTransfer)
if (sc < 0) {
// Stamp mismatch (table length changed), resize finishing,
// resizer limit reached, or no work left: give up
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// sc is a large-magnitude negative value during a resize;
// sc + 1 registers one more resizing thread
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// Start a resize: (rs << RESIZE_STAMP_SHIFT) + 2 is negative
else if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

/**
 * Slow path for addCount under contention: initializes or grows the
 * counterCells table, retries the per-thread cell CAS, or falls back
 * to baseCount.
 */
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] cs; CounterCell c; int n; long v;
// Case 1: counterCells already initialized
if ((cs = counterCells) != null && (n = cs.length) > 0) {
// This thread's slot is empty: try to install a new cell
if ((c = cs[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) { // lock
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// unlock
cellsBusy = 0;
}
if (created)
break; // success: cell installed
// recheck failed: loop again
continue; // Slot is now non-empty
}
}
collide = false;
}
// The caller's CAS already failed: rehash and retry before
// treating this as a collision
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// Try to CAS this thread's cell
else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))
break;
else if (counterCells != cs || n >= NCPU)
collide = false; // At max size or stale
// First failure at this probe: advance h once more before
// considering an expansion
else if (!collide)
collide = true;
// Repeated collisions: double the cell table
else if (cellsBusy == 0 &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == cs) // Expand table unless stale
counterCells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
// Case 2: cells uninitialized and lock acquired: create a 2-slot table
else if (cellsBusy == 0 && counterCells == cs &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == cs) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// Case 3: could not lock the cells; try baseCount directly
else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}

再散列

再散列的入口在addCount方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
/**
 * Resize-triggering tail of addCount (the counting part is elided;
 * see the full listing above).
 */
private final void addCount(long x, int check) {
// ... (element-count update elided)
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// Stamp identifying a resize of a table of length n
int rs = resizeStamp(n);
// sc < 0: a resize is in progress; try to help (cf. helpTransfer)
if (sc < 0) {
// Stamp mismatch, resize finishing, resizer limit reached,
// or no work left: give up
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// sc is a large-magnitude negative value during a resize;
// sc + 1 registers one more resizing thread
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// Start a resize: (rs << RESIZE_STAMP_SHIFT) + 2 is negative
else if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

/**
 * Helps transfer bins if a resize is in progress; returns the table
 * the caller should retry its operation against.
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
// a null nextTable means the transfer has already finished
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// Stamp identifying a resize of a table of this length
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
// sc < 0: the resize is still in progress
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// register as one more resizing thread and help
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

/**
 * Returns the stamp bits for resizing a table of size n.
 * The leading-zero count encodes the table length; forcing bit
 * (RESIZE_STAMP_BITS - 1) on guarantees that when a resize starts,
 * sizeCtl = (stamp << RESIZE_STAMP_SHIFT) + 2 is negative.
 */
static final int resizeStamp(int n) {
    int zeros = Integer.numberOfLeadingZeros(n);
    return zeros | (1 << (RESIZE_STAMP_BITS - 1));
}

/**
 * Moves and/or copies the nodes in each bin to the new table.
 * Threads claim stride-sized ranges of bins by CASing transferIndex.
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME (Out of Memory Error)
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// The next table index (plus one) to split while resizing
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// whether to advance to the next bin
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {

// --- claim a range of bins to transfer ---

Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// still within our claimed range, or already finishing
if (--i >= bound || finishing)
advance = false;
// all ranges have been handed out
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// claim bins [nextBound, nextIndex - 1] for this thread,
// where nextBound = nextIndex - stride (or 0)
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}

// --- perform the transfer ---

// out of range: this thread's share of the work is done
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
// new threshold: 2n - n/2 = 1.5n = 0.75 * (2n)
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// deregister this resizing thread (sizeCtl - 1)
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// not the last resizer: simply exit
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// last resizer: sweep the whole table once more, then commit
finishing = advance = true;
i = n; // recheck before commit
}
}
// empty bin: mark it transferred by installing the ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// non-empty bin: transfer its nodes
else {
// lock the bin's head node
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
// With n = 2^k, (hash & n) == 0 means the node keeps
// index i in nextTab (hash & (2n-1) == hash & (n-1));
// otherwise it moves to index i + n.
int runBit = fh & n;
Node<K,V> lastRun = f;
// find the trailing run of nodes sharing the same bit,
// which can be reused without copying
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
// list that stays at index i
ln = lastRun;
hn = null;
}
else {
// list that moves to index i + n
hn = lastRun;
ln = null;
}
// copy the nodes before lastRun into the two lists
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// mark the old bin with the ForwardingNode
setTabAt(tab, i, fwd);
// move on to the next bin
advance = true;
}
else if (f instanceof TreeBin) {
// ... (tree bin split elided)
}
}
}
}
}
}

TreeBin

相对于HashMap的table[]直接存储tree的root结点,ConcurrentHashMap的table[]存的则是一个特殊的TreeBin结点,其first字段(顾名思义)指向TreeBin中第一个插入的结点。

1
2
3
4
5
// TreeBin head node: constructed with hash TREEBIN (-2) and null
// key/value; 'first' is set to the head of the bin's node list.
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null);
this.first = b;
// ... (red-black tree construction elided)
}

由于first结点与next字段的存在,TreeBin可以在写锁被持有或等待(树正在重构)时,按链表顺序遍历来寻找元素。而root只有在真正需要结构调整(树重构)时才会被加写锁,提高了根据root遍历的效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
// low two bits: write lock held / writer waiting;
// from the third bit up: count of active readers
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock

// Acquires the write lock needed for tree restructuring.
private final void lockRoot() {
// fast-path CAS failed: fall through to the contended path
if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER))
contendedLock(); // offload to separate method
}
// Clears all lock state, releasing the write lock.
private final void unlockRoot() {
lockState = 0;
}

// Possibly blocks awaiting the root (write) lock.
private final void contendedLock() {
boolean waiting = false;
for (int s;;) {
// (s & ~WAITER) == 0: no reader or writer holds the lock
// (only the WAITER bit may be set) -- readers and writers
// are mutually exclusive
if (((s = lockState) & ~WAITER) == 0) {
// try to take the write lock directly
if (U.compareAndSetInt(this, LOCKSTATE, s, WRITER)) {
if (waiting)
// clear the waiter (this thread)
waiter = null;
return;
}
}
// WAITER bit not yet set by anyone
else if ((s & WAITER) == 0) {
// try to register as the waiting writer
if (U.compareAndSetInt(this, LOCKSTATE, s, s | WAITER)) {
waiting = true;
// registered: record this thread so readers can unpark it
waiter = Thread.currentThread();
}
}
else if (waiting)
// already the registered waiter: block until unparked
LockSupport.park(this);
}
}

/**
 * Returns the matching node, or null. Readers take a shared read
 * lock; if a writer holds or awaits the lock, falls back to a
 * linear scan over the 'next' links instead of descending the tree.
 */
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
// writer active or waiting (tree being or about to be
// restructured): linear search via next pointers
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
// read lock acquired: safe to search the tree from root
else if (U.compareAndSetInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
// release the read lock; if this was the last reader
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
// and a writer is registered as waiting
(READER|WAITER) && (w = waiter) != null)
// wake the waiting writer
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
}

遍历

key、value、entry迭代器的实现类似,这里以key为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * Base of key, value, and entry Iterators. Adds fields to
 * Traverser to support iterator.remove.
 */
static class BaseIterator<K,V> extends Traverser<K,V> {
final ConcurrentHashMap<K,V> map;
Node<K,V> lastReturned;
BaseIterator(Node<K,V>[] tab, int size, int index, int limit,
ConcurrentHashMap<K,V> map) {
super(tab, size, index, limit);
this.map = map;
// position on the first node up front so hasNext() works
advance();
}

public final boolean hasNext() { return next != null; }
public final boolean hasMoreElements() { return next != null; }

public final void remove() {
Node<K,V> p;
if ((p = lastReturned) == null)
throw new IllegalStateException();
lastReturned = null;
// Replace the value with null: replaceNode removes the mapping
// (and decrements the count) if a non-null value was present.
map.replaceNode(p.key, null, null);
}
}

/** Iterator over the map's keys; also usable as an Enumeration. */
static final class KeyIterator<K,V> extends BaseIterator<K,V>
    implements Iterator<K>, Enumeration<K> {
    KeyIterator(Node<K,V>[] tab, int size, int index, int limit,
                ConcurrentHashMap<K,V> map) {
        super(tab, size, index, limit, map);
    }

    /** Returns the next key and advances the underlying traversal. */
    public final K next() {
        Node<K,V> current = next;
        if (current == null)
            throw new NoSuchElementException();
        // remember the node so remove() can target it
        lastReturned = current;
        K result = current.key;
        advance();
        return result;
    }

    public final K nextElement() { return next(); }
}

可见KeyIterator的实现是扩展了BaseIterator,而BaseIterator又扩展了Traverser。其中hasNext()和next()所依赖的关键遍历方法advance(),来自Traverser的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
static class Traverser<K,V> {
Node<K,V>[] tab; // current table; updated if resized
Node<K,V> next; // the next entry to use
// stack of saved traversal states; behaves as a linked list of TableStack
TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
int index; // index of bin to use next
int baseIndex; // current index of initial table
int baseLimit; // index bound for initial table
final int baseSize; // initial table size

Traverser(Node<K,V>[] tab, int size, int index, int limit) {
this.tab = tab;
this.baseSize = size;
this.baseIndex = this.index = index;
this.baseLimit = limit;
this.next = null;
}

/**
 * Advances if possible, returning next valid node, or null if none.
 */
final Node<K,V> advance() {
Node<K,V> e;
// continue within the current bin if it has more nodes
if ((e = next) != null)
e = e.next;
for (;;) {
Node<K,V>[] t; int i, n; // must use locals in checks
if (e != null)
return next = e;
// traversal range exhausted or table unusable: done
if (baseIndex >= baseLimit || (t = tab) == null ||
(n = t.length) <= (i = index) || i < 0)
return next = null;
// special (negative-hash) head node: resolve before returning;
// a plain list bin head is returned on the next iteration
if ((e = tabAt(t, i)) != null && e.hash < 0) {
if (e instanceof ForwardingNode) {
// bin was moved: descend into the new table
tab = ((ForwardingNode<K,V>)e).nextTable;
e = null;
// remember where we were in the outer table
pushState(t, i, n);
continue;
}
// tree bin: iterate its node list starting at 'first'
else if (e instanceof TreeBin)
e = ((TreeBin<K,V>)e).first;
else
e = null;
}
// a non-empty stack means saved outer-table bins remain
if (stack != null)
recoverState(n);
// after doubling, a bin's nodes land at index i or i + baseSize,
// so visit the upper slot before moving to the next base bin
else if ((index = i + baseSize) >= n)
index = ++baseIndex; // visit upper slots if present
}
}

/**
 * Saves traversal state upon encountering a forwarding node.
 */
private void pushState(Node<K,V>[] t, int i, int n) {
TableStack<K,V> s = spare; // reuse if possible
if (s != null)
spare = s.next;
else
s = new TableStack<K,V>();
s.tab = t;
s.length = n;
s.index = i;
s.next = stack;
stack = s;
}

/**
 * Possibly pops traversal state.
 *
 * @param n length of current table
 */
private void recoverState(int n) {
TableStack<K,V> s; int len;
// pop every frame whose split slots are exhausted
while ((s = stack) != null && (index += (len = s.length)) >= n) {
n = len;
index = s.index;
tab = s.tab;
s.tab = null;
TableStack<K,V> next = s.next;
s.next = spare; // save for reuse
stack = next;
spare = s;
}
// stack fully popped: step to the upper slot / next base bin
if (s == null && (index += baseSize) >= n)
index = ++baseIndex;
}
}

// One saved traversal frame: records a table, its length, and the bin
// index at the moment a Traverser descends through a ForwardingNode,
// so the outer table can be resumed later (see pushState/recoverState).
static final class TableStack<K,V> {
int length;
int index;
Node<K,V>[] tab;
TableStack<K,V> next;
}