从源码看ConcurrentHashMap

谁借莪1个温暖的怀抱¢ 2023-07-18 12:54 12阅读 0赞

简介

ConcurrentHashMap是线程安全的HashMap实现,这里主要研究JDK8后的ConcurrentHashMap,下面是ConcurrentHashMap的简单结构:在这里插入图片描述
ConcurrentHashMap基于HashMap的基本逻辑,通过CAS + synchronized 来保证并发安全性。ConcurrentHashMap使用的数组及数组的每个节点都为volatile类型,通过CAS进行更新删除操作,而所有的链表操作都需要通过synchronized锁定链表的头节点,然后进行操作。

  1. /**
  2. * The array of bins. Lazily initialized upon first insertion.
  3. * Size is always a power of two. Accessed directly by iterators.
  4. */
  5. transient volatile Node<K,V>[] table;
  6. /**
  7. * Key-value entry. This class is never exported out as a
  8. * user-mutable Map.Entry (i.e., one supporting setValue; see
  9. * MapEntry below), but can be used for read-only traversals used
  10. * in bulk tasks. Subclasses of Node with a negative hash field
  11. * are special, and contain null keys and values (but are never
  12. * exported). Otherwise, keys and vals are never null.
  13. */
  14. static class Node<K,V> implements Map.Entry<K,V> {
  15. final int hash;
  16. final K key;
  17. volatile V val;
  18. volatile Node<K,V> next;
  19. }

核心方法

1.put方法

  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. if (key == null || value == null) throw new NullPointerException();
  3. int hash = spread(key.hashCode());
  4. int binCount = 0;
  5. for (Node<K,V>[] tab = table;;) {
  6. Node<K,V> f; int n, i, fh;
  7. if (tab == null || (n = tab.length) == 0)
  8. tab = initTable();
  9. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  10. if (casTabAt(tab, i, null,
  11. new Node<K,V>(hash, key, value, null)))
  12. break; // no lock when adding to empty bin
  13. }
  14. else if ((fh = f.hash) == MOVED)
  15. tab = helpTransfer(tab, f);
  16. else {
  17. V oldVal = null;
  18. synchronized (f) {
  19. if (tabAt(tab, i) == f) {
  20. if (fh >= 0) {
  21. binCount = 1;
  22. for (Node<K,V> e = f;; ++binCount) {
  23. K ek;
  24. if (e.hash == hash &&
  25. ((ek = e.key) == key ||
  26. (ek != null && key.equals(ek)))) {
  27. oldVal = e.val;
  28. if (!onlyIfAbsent)
  29. e.val = value;
  30. break;
  31. }
  32. Node<K,V> pred = e;
  33. if ((e = e.next) == null) {
  34. pred.next = new Node<K,V>(hash, key,
  35. value, null);
  36. break;
  37. }
  38. }
  39. }
  40. else if (f instanceof TreeBin) {
  41. Node<K,V> p;
  42. binCount = 2;
  43. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  44. value)) != null) {
  45. oldVal = p.val;
  46. if (!onlyIfAbsent)
  47. p.val = value;
  48. }
  49. }
  50. }
  51. }
  52. if (binCount != 0) {
  53. if (binCount >= TREEIFY_THRESHOLD)
  54. treeifyBin(tab, i);
  55. if (oldVal != null)
  56. return oldVal;
  57. break;
  58. }
  59. }
  60. }
  61. addCount(1L, binCount);
  62. return null;
  63. }

我们可以看到put是通过一个死循环来实现,在循环逻辑内:

  1. 首先检查核心的Node[] table是否已经初始化,如果没有初始化,则利用CAS将sizeCtl的值置为-1进行初始化。
  2. 通过CAS查询key相应的槽位是否为 null,若为null直接通过CAS将键值对放入槽位。
  3. 如果相应的槽位已经有节点,并且其hash值为-1,则表示正在进行扩容,则当前线程帮忙进行扩容。
  4. 否则通过synchronized锁住槽位内的节点即链表的头结点,然后遍历链表,寻找是否有hash值及key值相同的节点,若有则将value设置进去,否者创建新的节点加入链表。
  5. 通过addCount函数更新ConcurrentHashMap键值对的数量。

2.get方法

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. int h = spread(key.hashCode());
  4. if ((tab = table) != null && (n = tab.length) > 0 &&
  5. (e = tabAt(tab, (n - 1) & h)) != null) {
  6. if ((eh = e.hash) == h) {
  7. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  8. return e.val;
  9. }
  10. else if (eh < 0)
  11. return (p = e.find(h, key)) != null ? p.val : null;
  12. while ((e = e.next) != null) {
  13. if (e.hash == h &&
  14. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  15. return e.val;
  16. }
  17. }
  18. return null;
  19. }

get方法实现的逻辑比较简单:

  1. 利用key通过cas的方式获取其对应槽位的节点,若该节点就是想要查询的节点,那就直接返回value。
  2. 如果槽位内节点的hash值小于0则说明正在进行扩容,则通过ForwardingNode的find函数去新的数组nextTable中进行查找。
  3. 以上都不符合的话,就直接遍历节点,匹配就返回,否则最后就返回null。

3.扩容方法

  1. /**
  2. * Helps transfer if a resize is in progress.
  3. */
  4. final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
  5. Node<K,V>[] nextTab; int sc;
  6. if (tab != null && (f instanceof ForwardingNode) &&
  7. (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  8. int rs = resizeStamp(tab.length);
  9. while (nextTab == nextTable && table == tab &&
  10. (sc = sizeCtl) < 0) {
  11. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  12. sc == rs + MAX_RESIZERS || transferIndex <= 0)
  13. break;
  14. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
  15. transfer(tab, nextTab);
  16. break;
  17. }
  18. }
  19. return nextTab;
  20. }
  21. return table;
  22. }
  23. /**
  24. * Moves and/or copies the nodes in each bin to new table. See
  25. * above for explanation.
  26. */
  27. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  28. int n = tab.length, stride;
  29. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  30. stride = MIN_TRANSFER_STRIDE; // subdivide range
  31. if (nextTab == null) {
  32. // initiating
  33. try {
  34. @SuppressWarnings("unchecked")
  35. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
  36. nextTab = nt;
  37. } catch (Throwable ex) {
  38. // try to cope with OOME
  39. sizeCtl = Integer.MAX_VALUE;
  40. return;
  41. }
  42. nextTable = nextTab;
  43. transferIndex = n;
  44. }
  45. int nextn = nextTab.length;
  46. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  47. boolean advance = true;
  48. boolean finishing = false; // to ensure sweep before committing nextTab
  49. for (int i = 0, bound = 0;;) {
  50. Node<K,V> f; int fh;
  51. while (advance) {
  52. int nextIndex, nextBound;
  53. if (--i >= bound || finishing)
  54. advance = false;
  55. else if ((nextIndex = transferIndex) <= 0) {
  56. i = -1;
  57. advance = false;
  58. }
  59. else if (U.compareAndSwapInt
  60. (this, TRANSFERINDEX, nextIndex,
  61. nextBound = (nextIndex > stride ?
  62. nextIndex - stride : 0))) {
  63. bound = nextBound;
  64. i = nextIndex - 1;
  65. advance = false;
  66. }
  67. }
  68. if (i < 0 || i >= n || i + n >= nextn) {
  69. int sc;
  70. if (finishing) {
  71. nextTable = null;
  72. table = nextTab;
  73. sizeCtl = (n << 1) - (n >>> 1);
  74. return;
  75. }
  76. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  77. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  78. return;
  79. finishing = advance = true;
  80. i = n; // recheck before commit
  81. }
  82. }
  83. else if ((f = tabAt(tab, i)) == null)
  84. advance = casTabAt(tab, i, null, fwd);
  85. else if ((fh = f.hash) == MOVED)
  86. advance = true; // already processed
  87. else {
  88. synchronized (f) {
  89. if (tabAt(tab, i) == f) {
  90. Node<K,V> ln, hn;
  91. if (fh >= 0) {
  92. int runBit = fh & n;
  93. Node<K,V> lastRun = f;
  94. for (Node<K,V> p = f.next; p != null; p = p.next) {
  95. int b = p.hash & n;
  96. if (b != runBit) {
  97. runBit = b;
  98. lastRun = p;
  99. }
  100. }
  101. if (runBit == 0) {
  102. ln = lastRun;
  103. hn = null;
  104. }
  105. else {
  106. hn = lastRun;
  107. ln = null;
  108. }
  109. for (Node<K,V> p = f; p != lastRun; p = p.next) {
  110. int ph = p.hash; K pk = p.key; V pv = p.val;
  111. if ((ph & n) == 0)
  112. ln = new Node<K,V>(ph, pk, pv, ln);
  113. else
  114. hn = new Node<K,V>(ph, pk, pv, hn);
  115. }
  116. setTabAt(nextTab, i, ln);
  117. setTabAt(nextTab, i + n, hn);
  118. setTabAt(tab, i, fwd);
  119. advance = true;
  120. }
  121. else if (f instanceof TreeBin) {
  122. TreeBin<K,V> t = (TreeBin<K,V>)f;
  123. TreeNode<K,V> lo = null, loTail = null;
  124. TreeNode<K,V> hi = null, hiTail = null;
  125. int lc = 0, hc = 0;
  126. for (Node<K,V> e = t.first; e != null; e = e.next) {
  127. int h = e.hash;
  128. TreeNode<K,V> p = new TreeNode<K,V>
  129. (h, e.key, e.val, null, null);
  130. if ((h & n) == 0) {
  131. if ((p.prev = loTail) == null)
  132. lo = p;
  133. else
  134. loTail.next = p;
  135. loTail = p;
  136. ++lc;
  137. }
  138. else {
  139. if ((p.prev = hiTail) == null)
  140. hi = p;
  141. else
  142. hiTail.next = p;
  143. hiTail = p;
  144. ++hc;
  145. }
  146. }
  147. ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
  148. (hc != 0) ? new TreeBin<K,V>(lo) : t;
  149. hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  150. (lc != 0) ? new TreeBin<K,V>(hi) : t;
  151. setTabAt(nextTab, i, ln);
  152. setTabAt(nextTab, i + n, hn);
  153. setTabAt(tab, i, fwd);
  154. advance = true;
  155. }
  156. }
  157. }
  158. }
  159. }
  160. }

扩容的逻辑比较复杂,如果扩容时有多个线程,那么每个线程都可以通过helpTransfer函数帮忙进行扩容:

  1. 首先新建一个两倍长度的数组nextTable。
  2. 初始化ForwardingNode节点,其中保存了新数组nextTable的引用,在处理完每个槽位节点后当做占位节点,表示该槽位已经处理过了。
  3. 通过for循环处理每个槽位中的链表元素,处理完后在这个槽位内通过CAS插入初始化的ForwardingNode节点,用于告诉其它线程该槽位已经处理过了。
  4. 如果某个槽位已经被线程A处理了,那么线程B处理到这个节点时,取到该节点的hash值应该为MOVED,值为-1,则直接跳过,继续处理下一个槽位内的节点。

4.计数方法

  1. /**
  2. * {@inheritDoc}
  3. */
  4. public int size() {
  5. long n = sumCount();
  6. return ((n < 0L) ? 0 :
  7. (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
  8. (int)n);
  9. }
  10. /**
  11. * A padded cell for distributing counts. Adapted from LongAdder
  12. * and Striped64. See their internal docs for explanation.
  13. */
  14. @sun.misc.Contended static final class CounterCell {
  15. volatile long value;
  16. CounterCell(long x) {
  17. value = x; }
  18. }
  19. final long sumCount() {
  20. CounterCell[] as = counterCells; CounterCell a;
  21. long sum = baseCount;
  22. if (as != null) {
  23. for (int i = 0; i < as.length; ++i) {
  24. if ((a = as[i]) != null)
  25. sum += a.value;
  26. }
  27. }
  28. return sum;
  29. }

代码里的变量baseCount用于在无竞争环境下记录元素的个数,每当插入元素或删除元素时都会利用CAS更新键值对个数。
当有线程竞争时,会使用CounterCell数组来计数,每个ConuterCell都是一个独立的计数单元。线程可以通过ThreadLocalRandom.getProbe() & m找到属于它的CounterCell进行计数。这种方法能够降低线程的竞争,相比所有线程对一个共享变量不停进行CAS操作性能上要好很多。这里的CounterCell数组初始容量为2,最大容量是机器的CPU数。
注意这里有个@sun.misc.Contended,这个注解用于解决伪共享问题。所谓伪共享,就是在同一缓存行cache line(CPU缓存的基本单位)中连续存储了多个变量,当其中一个变量被修改时,会导致其他变量也失效,会降低计算机cache的缓存命中率并且导致内存总线流量大增。

发表评论

表情:
评论列表 (有 0 条评论,12人围观)

还没有评论,来说两句吧...

相关阅读