Java集合 - ConcurrentHashMap

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

介绍 ConcurrentHashMap

技术是为了解决问题而生的ConcurrentHashMap 解决了多个线程同时操作一个 HashMap 时可能出现的内部问题。当多个线程同时操作一个 HashMap 时有可能会出现多线程同时修改一个共享变量HashMap 类的成员变量导致数据被覆盖产生意想不到的错误。


ConcurrentHashMap 内部使用了锁和各种数据结构来保证访问 Map 是线程安全的。

ConcurrentHashMap 和 HashMap 底层的数组、链表结构几乎相同底层对数据结构的操作思路是相同的。ConcurrentHashMap 除了数组 + 链表 + 红黑树的基本结构外新增了转移节点结构ForwardingNode。


介绍转移节点ForwardingNode

转移节点是 ForwardingNode 结构 ForwardingNode 继承了 Node。ForwardingNode 节点的 hash 值固定为 -1。ForwardingNode 比 Node 多了一个 nextTable 成员变量nextTable 成员变量的类型是 Node 数组。nextTable 成员变量是扩容之后的新数组。

如果数组在索引 i 上的结构是 ForwardingNode那么表示这个哈希桶内的全部节点都已经转移到扩容之后的新数组旧的哈希桶内的数据不能发生改变。转移节点ForwardingNode是为了保证 ConcurrentHashMap 扩容时的线程安全。保证了当一个哈希桶内的全部节点都已经转移到扩容之后的新数组后、扩容操作完成之前旧的哈希桶内的数据不发生变化。

当一个哈希桶内的全部节点都已经转移到扩容之后的新数组后、扩容操作完成之前如果有其他的线程执行 put 操作需要将新增的节点 put 到旧的哈希桶内那么这个线程会调用 helpTransfer() 方法帮助扩容。扩容完成之后这个线程再将要新增的节点 put 到新的哈希桶内。

ConcurrentHashMap 的新增操作

ConcurrentHashMap 在 put 方法上对数据结构的操作思路和 HashMap 相同但 ConcurrentHashMap 的 put() 方法写了很多保障线程安全的代码。当调用 ConcurrentHashMap 的 put() 方法时put() 方法的处理逻辑如下

  • 首先如果 CHM 的成员变量 table 数组为空null 或者 length 为 0则调用 initTable() 方法初始化 table 数组。由于 initTable() 方法操作了共享变量因此 initTable() 方法采用了一些手段来保证线程安全。
  • 接下来它会调用 spread() 方法根据 key 计算出 hash 值然后根据计算出的 hash 值计算出 key 对应的数组索引 i
  • 计算出 key 对应的数组索引 i 之后它调用 tabAt() 方法tabAt() 方法返回数组在索引 i 上的值。然后它根据数组在索引 i 上的值进行处理。由于 tabAt() 方法读取了共享变量 table 数组在索引 i 上的值因此 tabAt() 方法调用 Unsafe 类的 get 方法保证数据的可见性
    • 如果数组在索引 i 上的值为 null则直接生成一个新的节点并调用 casTabAt() 方法让 tab[i] 指向该节点。由于 casTabAt() 方法操作了共享变量 tab 数组因此 casTabAt() 方法调用 Unsafe 类的 compareAndSwap 方法保证数据不被覆盖
    • 如果数组在索引 i 上的值不为 null则意味着需要解决 hash 冲突问题、扩容冲突问题。
  • 接上一步骤如果数组在索引 i 上的值不为 null。
    • 如果索引 i 上的结构是转移节点ForwardingNode 结构节点的 hash 值为 -1表明这个哈希桶内的全部节点都已经转移到扩容之后的新数组旧的哈希桶内的数据不能发生改变它就会调用 helpTransfer() 方法helpTransfer() 方法会帮助扩容。扩容完成之后它再将要新增的节点 put 到扩容之后的新数组中。
    • 如果索引 i 上的结构不是转移节点那么它会使用 synchronized 关键字给索引 i 上的结构加锁保证同时最多只有一个线程操作索引 i 上的结构。给索引 i 上的结构加锁之后它会判断数组在索引 i 上的结构是链表 还是 红黑树然后调用相应的新增代码。
      • 如果索引 i 上的结构是链表则把新生成的节点加到链表的末尾
      • 如果索引 i 上的结构是红黑树那么使用红黑树方式新增。
  • 接上一步骤如果索引 i 上的结构是普通链表则把新生成的节点加到链表的末尾之后需要判断是否需要将链表转为红黑树
    • 如果链表的长度大于等于 8并且数组的长度大于等于 64则调用 treeifyBin() 将链表转为红黑树
    • 如果链表的长度大于等于 8但是数组的长度小于 64则调用 tryPresize() 方法执行扩容操作
    • 当红黑树中的节点个数小于等于 6 时红黑树会转为链表。
  • 将节点加入 CHM 集合之后put() 方法的最后一步是调用 addCount() 方法增加 ConcurrentHashMap 中元素个数的计数值。addCount() 方法的任务是增加 ConcurrentHashMap 中元素的计数值。如果元素的数量超过了 ConcurrentHashMap 扩容的阈值sizeCtl那么就会调用 transfer() 方法执行扩容操作。如果此时有其他的线程已经在执行扩容操作那么当前线程就协助扩容。

当调用 CHM 的 put() 方法时如果 CHM 中已经存在要新增的 key并且方法的入参 onlyIfAbsent 为 false则替换旧值并返回旧值。

ConcurrentHashMap 的扩容机制

ConcurrentHashMap 的扩容时机和 HashMap 相同都是在 put() 方法的最后一步检查是否需要扩容。ConcurrentHashMap 扩容的方法叫做 transfer()从 put() 方法的 addCount() 方法进去就能找到 transfer() 方法。

如果 ConcurrentHashMap 中元素的数量超过了扩容的阈值sizeCtl那么它会调用 transfer() 方法执行扩容操作。ConcurrentHashMap 的扩容机制是扩容为原来容量的 2 倍。ConcurrentHashMap 扩容的处理逻辑和 HashMap 完全不同。


ConcurrentHashMap 扩容的大体思路如下扩容需要把旧数组上的全部节点转移到扩容之后的新数组上节点的转移是从数组的最后一个索引位置开始一个索引一个索引进行的。每个线程一轮处理有限个数的哈希桶。当旧数组上的全部节点转移到扩容之后的新数组后ConcurrentHashMap 的 table 成员变量指向扩容之后的新数组扩容操作完成。transfer() 方法的处理逻辑如下

  • 首先根据 CPU 核数和 table 数组的长度计算当前线程一轮处理哈希桶的个数。ConcurrentHashMap 的 transferIndex 成员变量会记录下一轮 或者是 下一个线程要处理的哈希桶的索引值 + 1

    • 如果 CPU 核数为 1那么当前线程一轮处理哈希桶的个数为 table 数组的长度
    • 如果 CPU 核数大于 1那么先计算 num = (tab.length >>> 3) / NCPU的值
      • 如果 num 值大于等于 16那么 num 值就是当前线程一轮处理哈希桶的个数
      • 如果 num 值小于 16那么当前线程一轮处理哈希桶的个数为 16。也就是说线程一轮处理哈希桶的个数最小值为 16。
  • 领取完任务之后线程就开始处理哈希桶内的节点。节点的转移是从数组的最后一个索引位置开始一个索引一个索引进行的。在转移索引 i 上的节点之前它会使用 synchronized 关键字给索引 i 上的结构加锁保证同时最多只有一个线程操作索引 i 上的结构。

  • 给索引 i 上的结构加锁之后它会判断数组在索引 i 上的结构是链表 还是 红黑树然后调用相应的节点转移代码。

    • 如果索引 i 上的结构是链表它通过将节点 key 的 hash 值 和 数组的长度 n 做与运算获得 n 对应的二进制表示中的 1 这一位在 hash 值中是 0 还是 1即 b = p.hash & n。获取到 b 之后用 b 来判断要转移的节点是要挂到低位哈希桶里还是挂到高位哈希桶里。遍历完链表形成两个链表低位链表、高位链表之后将链表的头节点赋值给对应的 tab[i]

      • 如果 b 的值为 0则要转移的节点挂到位哈希桶里
      • 如果 b 的值非 0则要转移的节点挂到位哈希桶里
    • 如果索引 i 上的结构是红黑树那么使用红黑树方式转移节点。

  • 在将索引 i 上的全部节点转移到扩容之后的新数组后它让旧数组 tab[i] 指向转移节点ForwardingNode。

  • 当旧数组上的全部节点转移到扩容之后的新数组后ConcurrentHashMap 的 table 成员变量指向扩容之后的新数组扩容操作完成。

介绍低位哈希桶、高位哈希桶如果 ConcurrentHashMap 当前的数组长度为 n 时一个节点的 key 对应的哈希桶索引为 i。那么 ConcurrentHashMap 扩容之后数组长度为 2n 时这个节点的 key 对应的低位哈希桶的索引为 i对应的高位哈希桶的索引为 i + n。


ConcurrentHashMap 支持多线程扩容

  • 如果在扩容的过程中有其他的线程执行新增操作新增操作完成后这个线程会调用 transfer() 方法协助扩容。
  • 如果一个线程在扩容时有其他的线程执行新增操作需要把节点 put 到索引为 i 的哈希桶内。其他的线程它发现索引 i 上的结构是转移节点ForwardingNode 结构 节点的 hash 值为 -1表明这个哈希桶内的元素已经扩容迁移完成那么这个线程它就会调用 helpTransfer() 方法helpTransfer() 方法会调用 transfer() 帮助扩容。扩容完成之后它再将要新增的节点 put 到扩容之后的新数组中。

ConcurrentHashMap 的查找操作

当调用 ConcurrentHashMap 的 get() 方法时get() 方法的处理逻辑如下

  • 首先它会根据传入的 key 计算出 hash 值然后根据计算出的 hash 值计算出 key 对应的数组索引 i
  • 计算出 key 对应的数组索引 i 之后根据存储位置从数组中取出对应的 Entry然后通过 key 对象的 equals() 方法判断传入的 key 和 Entry 中的 key 是否相等
    • 如果传入的 key 和 Entry 中的 key 相等则查找操作完成返回 Entry 中的 value
    • 如果传入的 key 和 Entry 中的 key 不相等则再判断数组在索引 i 上的结构是链表 还是 红黑树然后调用相应的查找数据的方法。直到找到相等的 Entry 或者没有下一个 Entry 为止。

ConcurrentHashMap 的容量大小问题

ConcurrentHashMap 的数组长度总是为 2 的幂次方。不论传入的初始容量是否为 2 的幂次方最终都会转化为 2 的幂次方。

ConcurrentHashMap 中根据 key 计算出 hash 值然后根据计算出的 hash 值计算出 key 对应的数组索引 i

  • 根据 key 计算处 hash 值在计算 hash 值时它先将 key 的 hashCode 值无符号右移 16 位然后再和 key 的 hashCode 值做 异或 运算即 hash = (hashCode >>> 16) ^ hashCode
  • 根据 hash 值计算出 key 对应的数组索引 i在计算 key 对应的数组索引 i 时它将 hash 值 和 数组的长度 - 1 做与运算获得 key 对应的数组索引 i即 i = hash & (n - 1)

ConcurrentHashMap 的数组长度总是为 2 的幂次方设计的非常巧妙

  • 在计算 hash 值时它先将 key 的 hashCode 值无符号右移 16 位然后再和 key 的 hashCode 值做 异或 运算即 hash = (hashCode >>> 16) ^ hashCode。使 key 的 hashCode 值高 16 位的变化映射到低 16 位中使 hashCode 值高 16 位也参与后续索引 i 的计算减少了碰撞的可能性。
  • 在计算 key 对应的数组索引 i 时它将 hash 值 和 数组的长度 - 1 做与运算获得 key 对应的数组索引 i即 i = hash & (n - 1)。由于数组的长度 n 是 2 的幂次方n - 1 可以保证它的二进制表示中的后几位都是 1n 对应的二进制位及之前的位都是 0。因此计算出的数组索引 i 和 hash 值的二进制表示中后几位有关而与前面的二进制位无关
  • 当 b 是 2 的幂次方时a % b == a & (b - 1)。CPU 处理位运算比处理数学运算的速度更快效率更高。
  • 在 ConcurrentHashMap 扩容时它通过将 key 的 hash 值 和 数组的长度 n 做与运算获得 n 对应的二进制表示中的 1 这一位在 hash 值中是 0 还是 1即 b = p.hash & n。获取到 b 之后用 b 来判断要转移的节点是要挂到低位哈希桶里还是挂到高位哈希桶里
    • 如果 b 的值为 0则要转移的节点挂到位哈希桶里
    • 如果 b 的值非 0则要转移的节点挂到位哈希桶里

ConcurrentHashMap 的计数

当调用 ConcurrentHashMap 的 put() 方法时put() 方法的最后一步是调用 addCount() 方法。

addCount() 方法的任务是增加 ConcurrentHashMap 中元素的计数值。如果元素的数量超过了 ConcurrentHashMap 扩容的阈值sizeCtl那么就会调用 transfer() 方法执行扩容操作。如果此时有其他的线程已经在执行扩容操作那么当前线程就协助扩容。


ConcurrentHashMap 采用了一些数据结构和手段来支持高效的并发计数。ConcurrentHashMap 使用 long 类型的 baseCount 成员变量和 CounterCell 类型的 counterCells 数组来支持高效的并发计数。

  • baseCount 是基础的计数值。主要通过调用 Unsafe 类的 compareAndSwap 方法更新 baseCount 的值
  • counterCells 数组的使用如果有多个线程调用 addCount() 方法增加元素的计数值那么每个线程将要增加的计数值保存在 counterCells 数组中。当调用 ConcurrentHashMap 的 size() 方法获取元素个数时size() 方法将循环遍历 counterCells 数组累加计数值得到当时元素。

ConcurrentHashMap 的计数将线程竞争分散到 counterCells 数组的每一个元素提高了并发计数的性能。

private transient volatile long baseCount;

// 如果 counterCells 数组不为空则数组的长度为 2 的幂次方。
private transient volatile CounterCell[] counterCells;

@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Java