1. 容器初始化

https://www.bilibili.com/video/BV17i4y1x71z?t=421.7

1.1. 源码分析

在 jdk8 的 ConcurrentHashMap 中一共有 5 个构造方法,这四个构造方法中都没有对内部的数组做初始化, 只是对一些变量的初始值做了处理

jdk8 的 ConcurrentHashMap 的数组初始化是在第一次添加元素时完成

PS: 传入 32,最终大小为 64,与前面的都不同

注意,调用这个方法,得到的初始容量和我们之前讲的 HashMap 以及 jdk7 的 ConcurrentHashMap 不同,即使你传递的是一个 2 的幂次方数,该方法计算出来的初始容量依然是比这个值大一点的 2 的幂次方数

JDK1.8: 传递进来一个初始容量,ConcurrentHashMap 会基于这个值计算一个比这个值大一半的 2 的幂次方数作为初始容量

1
2
3
4
5
6
7
8
9
10
11
12
13
//没有维护任何变量的操作,如果调用该方法,数组长度默认是16  
public ConcurrentHashMap() {
}

//传递进来一个初始容量,ConcurrentHashMap会基于这个值计算一个比这个值大一半的2的幂次方数作为初始容量
public ConcurrentHashMap(int initialCapacity) {
   if (initialCapacity < 0)
       throw new IllegalArgumentException();
   int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
              MAXIMUM_CAPACITY :
              tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
   this.sizeCtl = cap;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//调用2个参数的构造  
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
   this(initialCapacity, loadFactor, 1);
}

//计算一个大于或者等于给定的容量值,该值是2的幂次方数作为初始容量
public ConcurrentHashMap(int initialCapacity,
                        float loadFactor, int concurrencyLevel) {
   if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
       throw new IllegalArgumentException();
   if (initialCapacity < concurrencyLevel)   // Use at least as many bins
       initialCapacity = concurrencyLevel;   // as estimated threads
   long size = (long)(1.0 + (long)initialCapacity / loadFactor);
   int cap = (size >= (long)MAXIMUM_CAPACITY) ?
       MAXIMUM_CAPACITY : tableSizeFor((int)size);
   this.sizeCtl = cap;
}

//基于一个Map集合,构建一个ConcurrentHashMap
//初始容量为16
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
   this.sizeCtl = DEFAULT_CAPACITY;
   putAll(m);
}

1.2. sizeCtl 含义解释

注意:以上这些构造方法中,都涉及到一个变量 sizeCtl,这个变量是一个非常重要的变量,而且具有非常丰富的含义,它的值不同,对应的含义也不一样,这里我们先对这个变量不同的值的含义做一下说明,后续源码分析过程中,进一步解释

sizeCtl 为 0,代表数组未初始化, 且数组的初始容量为 16

sizeCtl 为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值

sizeCtl 为 -1,表示数组正在进行初始化

sizeCtl 小于 0,并且不是 -1,表示数组正在扩容, -(1+n),表示此时有 n 个线程正在共同完成数组的扩容操作

2. 添加安全

2.1. 总体概述⭐️🔴

image.png

  1. 如果 tab 为空,调用 initTable() 方法进行初始化,通过 CAS+自旋 保证线程安全
  2. 如果 tab 不为空,就判断所在的桶是否为空,如果是的话,说明是第一个元素,就调用 casTabAt() 方法直接新建节点添加到 Node 数组中就可以了
  3. 如果正在扩容,就帮助扩容
  4. 如果没有扩容也不为空,就把元素插入桶中,先使用 synchronized 进行加锁,这个锁的粒度就是数组的具体的一个元素,fh 是当前索引位置的 hash 值,如果大于等于 0,说明是链表,否则是红黑树。链表插入会对 binCount 加一操作,新元素插入尾部,如果 key 相同覆盖原来的值
  5. 判断 binCount 是否大于等于 TREEIFY_THRESHOLD(值为 8) ,这时候调用 treeifyBin() 方法考虑将链表转换为红黑树,真正要转为红黑树还要求数组长度大于 64

2.2. 数组初始化,initTable 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//cas+自旋,保证线程安全,对数组进行初始化操作
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//cas修改sizeCtl的值为-1,修改成功,进行数组初始化,失败,继续自旋
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
//sizeCtl为0,取默认长度16,否则去sizeCtl的值
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//基于初始长度,构建数组对象
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算扩容阈值,并赋值给sc
sc = n - (n >>> 2);
}
} finally {
//将扩容阈值,赋值给sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}

2.2.1. 流程图

https://www.processon.com/view/link/6369aced0e3e74618c3a6872

2.3. 添加元素 put/putVal 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public V put(K key, V value) {  
   return putVal(key, value, false);
}

static final int HASH_BITS = 0x7fffffff; // 保证hash为正数
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
   //如果有空值或者空键,直接抛异常
   if (key == null || value == null) throw new NullPointerException();
   //基于key计算hash值,并进行一定的扰动
   int hash = spread(key.hashCode());
   //记录某个桶上元素的个数,如果超过8个,会转成红黑树
   int binCount = 0;
   for (Node<K,V>[] tab = table;;) {
       Node<K,V> f; int n, i, fh;
       //如果数组还未初始化,先对数组进行初始化
       if (tab == null || (n = tab.length) == 0)
           tab = initTable();
    //如果hash计算得到的桶位置没有元素,利用cas将元素添加
       else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
           //cas+自旋(和外侧的for构成自旋循环),保证元素添加安全
           if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
               break;                   // no lock when adding to empty bin
      }
       //如果hash计算得到的桶位置元素的hash值为MOVED,证明正在扩容,那么协助扩容
       else if ((fh = f.hash) == MOVED)
           tab = helpTransfer(tab, f);
       else {
           //hash计算的桶位置元素不为空,且当前没有处于扩容操作,进行元素添加
           V oldVal = null;
           //对当前桶进行加锁,保证线程安全,执行元素添加操作
           synchronized (f) {
               if (tabAt(tab, i) == f) {
                   //普通链表节点
                   if (fh >= 0) {
                       binCount = 1;
                       for (Node<K,V> e = f;; ++binCount) {
                           K ek;
                           if (e.hash == hash &&
                              ((ek = e.key) == key ||
                                (ek != null && key.equals(ek)))) {
                               oldVal = e.val;
                               if (!onlyIfAbsent)
                                   e.val = value;
                               break;
                          }
                           Node<K,V> pred = e;
                           if ((e = e.next) == null) {
                               pred.next = new Node<K,V>(hash, key,
                                                         value, null);
                               break;
                          }
                      }
                  }
                   //树节点,将元素添加到红黑树中
                   else if (f instanceof TreeBin) {
                       Node<K,V> p;
                       binCount = 2;
                       if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                      value)) != null) {
                           oldVal = p.val;
                           if (!onlyIfAbsent)
                               p.val = value;
                      }
                  }
              }
          }
          //注意下,这个判断是在同步锁外部,因为 treeifyBin内部也有同步锁,并不影响
           if (binCount != 0) {
               //链表长度大于/等于8,将链表转成红黑树
               if (binCount >= TREEIFY_THRESHOLD)
                   treeifyBin(tab, i);
               //如果是重复键,直接将旧值返回
               if (oldVal != null)
                   return oldVal;
               break;
          }
      }
  }
   //添加的是新元素,维护集合长度,并判断是否要进行扩容操作
   addCount(1L, binCount);
   return null;
}

通过以上源码,我们可以看到,当需要添加元素时,会针对当前元素所对应的桶位进行加锁操作,这样一方面保证元素添加时,多线程的安全,同时对某个桶位加锁不会影响其他桶位的操作,进一步提升多线程的并发效率

2.3.1. 图示

2.3.2. 流程图

https://www.processon.com/view/link/636a35ec5653bb5ba36c0559

3. 扩容安全

3.1. 源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//如果是扩容线程,此时新数组为null
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//两倍扩容创建新数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//记录线程开始迁移的桶位,从后往前迁移
transferIndex = n;
}
//记录新数组的末尾
int nextn = nextTab.length;
//已经迁移的桶位,会用这个节点占位(这个节点的hash值为-1--MOVED)
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//i记录当前正在迁移桶位的索引值
//bound记录下一次任务迁移的开始桶位

//--i >= bound 成立表示当前线程分配的迁移任务还没有完成
if (--i >= bound || finishing)
advance = false;
//没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//如果没有更多的需要迁移的桶位,就进入该if
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//扩容任务线程数减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判断当前所有扩容任务线程是否都执行完成
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//所有扩容线程都执行完,标识结束
finishing = advance = true;
i = n; // recheck before commit
}
}
//当前迁移的桶位没有元素,直接在该位置添加一个fwd节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//当前节点已经被迁移
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//当前节点需要迁移,加锁迁移,保证多线程安全
//此处迁移逻辑和jdk7的ConcurrentHashMap相同,不再赘述
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

3.2. 疑问

3.2.1. fh>=0⭐️🔴

%%
▶1.🏡⭐️◼️【🌈费曼无敌🌈⭐️第一步⭐️】◼️⭐️-point-20230411-1512%%
❕ ^chx2qw

因为在 spread(key.hashCode()) 方法中 (h ^ (h >>> 16)) & HASH_BITS 保证了 hash 为非负数
然后 TreeBin 中的 static final int TREEBIN = -2,所以可以如此判断链表还是红黑树

3.3. 图解

图示为方便画图为 4,实际源码中为 16

4. 多线程扩容

多线程协助扩容的操作会在两个地方被触发:

① 当添加元素时,发现添加的元素对用的桶位为 fwd 节点,就会先去协助扩容,然后再添加元素
② 当添加完元素后,判断当前元素个数达到了扩容阈值,此时发现 sizeCtl 的值小于 0,并且新数组不为空,这个时候,会去协助扩容

4.1. 源码分析

4.1.1. 元素未添加时协助扩容

元素未添加,先协助扩容,扩容完后再添加元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//发现此处为fwd节点,协助扩容,扩容结束后,再循环回来添加元素
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);

//省略代码
1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
//扩容,传递一个不是null的nextTab
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

4.1.2. 先添加元素再协助扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private final void addCount(long x, int check) {
//省略代码

if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//元素个数达到扩容阈值
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//sizeCtl小于0,说明正在执行扩容,那么协助扩容
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

4.2. 图解

5. 集合长度的累计方式

5.1. addCount 方法

5.1.1. 作用

两个作用 : 添加 (统计) 容器元素个数 、 检查是否达到了阈值而执行扩容操作.

5.1.2. 主要逻辑

① CounterCell 数组不为空,优先利用数组中的 CounterCell 记录数量

② 如果数组为空,尝试对 baseCount 进行累加,失败后,会执行 fullAddCount 逻辑

③ 如果是添加元素操作,会继续判断是否需要扩容

5.1.3. 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//当CounterCell数组不为空,则优先利用数组中的CounterCell记录数量
//或者当baseCount的累加操作失败,会利用数组中的CounterCell记录数量
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
//标识是否有多线程竞争
boolean uncontended = true;
//当as数组为空
//或者当as长度为0
//或者当前线程对应的as数组桶位的元素为空
//或者当前线程对应的as数组桶位不为空,但是累加失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//以上任何一种情况成立,都会进入该方法,传入的uncontended是false
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
//计算元素个数
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//当元素个数达到扩容阈值
//并且数组不为空
//并且数组长度小于限定的最大值
//满足以上所有条件,执行扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//这个是一个很大的正数
int rs = resizeStamp(n);
//sc小于0,说明有线程正在扩容,那么会协助扩容
if (sc < 0) {
//扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶位需要转移,结束操作
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//扩容线程加1,成功后,进行协助扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//协助扩容,newTable不为null
transfer(tab, nt);
}
//没有其他线程在进行扩容,达到扩容阈值后,给sizeCtl赋了一个很大的负数
//1+1=2 --》 代表此时有一个线程在扩容

//rs << RESIZE_STAMP_SHIFT)是一个很大的负数
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//扩容,newTable为null
transfer(tab, null);
s = sumCount();
}
}
}

5.2. fullAddCount 方法

① 当 CounterCell 数组不为空,优先对 CounterCell 数组中的 CounterCell 的 value 累加

② 当 CounterCell 数组为空,会去创建 CounterCell 数组,默认长度为 2,并对数组中的 CounterCell 的 value 累加

③ 当数组为空,并且此时有别的线程正在创建数组,那么尝试对 baseCount 做累加,成功即返回,否则自旋

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//获取当前线程的hash值
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
//标识是否有冲突,如果最后一个桶不是null,那么为true
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//数组不为空,优先对数组中CouterCell的value累加
if ((as = counterCells) != null && (n = as.length) > 0) {
//线程对应的桶位为null
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
//创建CounterCell对象
CounterCell r = new CounterCell(x); // Optimistic create
//利用CAS修改cellBusy状态为1,成功则将刚才创建的CounterCell对象放入数组中
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
//桶位为空, 将CounterCell对象放入数组
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
//表示放入成功
created = true;
}
} finally {
cellsBusy = 0;
}
if (created) //成功退出循环
break;
//桶位已经被别的线程放置了已给CounterCell对象,继续循环
continue; // Slot is now non-empty
}
}
collide = false;
}
//桶位不为空,重新计算线程hash值,然后继续循环
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//重新计算了hash值后,对应的桶位依然不为空,对value累加
//成功则结束循环
//失败则继续下面判断
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//数组被别的线程改变了,或者数组长度超过了可用cpu大小,重新计算线程hash值,否则继续下一个判断
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
//当没有冲突,修改为有冲突,并重新计算线程hash,继续循环
else if (!collide)
collide = true;
//如果CounterCell的数组长度没有超过cpu核数,对数组进行两倍扩容
//并继续循环
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
//CounterCell数组为空,并且没有线程在创建数组,修改标记,并创建数组
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//数组为空,并且有别的线程在创建数组,那么尝试对baseCount做累加,成功就退出循环,失败就继续循环
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}

5.3. 图解

6. 集合长度获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
//获取baseCount的值
long sum = baseCount;
if (as != null) {
//遍历CounterCell数组,累加每一个CounterCell的value值
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

7. 参考

7.1. 黑马

%%
▶3.🏡⭐️◼️【🌈费曼无敌🌈⭐️第一步⭐️】◼️⭐️-point-20230411-1521%%
❕ ^5h9x2g

7.1.1. 视频

https://www.bilibili.com/video/BV17i4y1x71z/?from=search&seid=3516507855592185473&spm_id_from=333.337.0.0&vd_source=c5b2d0d7bc377c0c35dbc251d95cf204

7.1.2. 资料

1
/Users/taylor/Nutstore Files/Obsidian_data/pages/002-schdule/001-Arch/001-Subject/001-基础知识专题/001-集合框架

[[ConcurrentHashMap源码分析(二)]]

[[20221108-ConcurrentHashMap - 知乎]]