弱小和无知并不是生存的障碍,傲慢才是。
---- ---- 面试者
总结
ConcurrentHashMap就是一个线程安全的HashMap,基本数据结构和HashMap一样,都是数组+链表/红黑树结构,但是在数据读写时,运用了大量的CAS操作以及synchronized锁,锁的级别是桶,这里与JDK1.7有了进一步优化,在JDK1.7时是基于分段锁,锁的是Segment,一个segment里包括好几个桶,在JDK1.8之后,就直接细化到桶级别了。
可以先了解HashMap:HashMap原理 下一篇:十年之重修synchronized原理
基本属性
// 最大罩杯
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认罩杯
private static final int DEFAULT_CAPACITY = 16;
// 最大的数组长度,这里-8预留长度,是针对不同的
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阈值
static final int TREEIFY_THRESHOLD = 8;
// 红黑树转链表阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 表数组长度未达到64之前,如果链表长度达到了8,先扩容,不进行红黑树转换
static final int MIN_TREEIFY_CAPACITY = 64;
// 表示参与扩容的线程,最小处理的桶数量
private static final int MIN_TRANSFER_STRIDE = 16;
// 存储桶的数组,注意 volatile修饰
transient volatile Node[] table;
// -1:表示初始化中,其他负数表示正在迁移中,正数:表示扩容的临界值
private transient volatile int sizeCtl;
// 节点信息类
static class Node implements Map.Entry {
// key的hash值
final int hash;
// key
final K key;
// val值,注意这里使用了volatile,保证线程之间透明
volatile V val;
// 下一个节点
volatile Node next;
}
// 空的构造函数
public ConcurrentHashMap() {
}
put一个元素
为了保证线程安全,put元素时直接开启一个死循环,如果Map为空则先进行初始化默认16容量,如果不为空则则根据(table.length - 1) & hash值进行选桶,如果当前桶是空的直接复制,这里的判定tabAt方法基于volatile逻辑保证拿到最新数据,这里赋值基于cas逻辑,这样 一开始的死循环 + cas 组成乐观锁逻辑,如果桶不为空,且当前桶头结点的hash是-1,则表示当前桶在扩容迁移,于是当前线程则参与加入扩容操作,如果非扩容流程,则对桶的头结点进行synchronized加锁,如果是链表则进行判定存在则覆盖,不存在则进行尾插,如果是树节点则进行红黑树节点操作,最后如果链表长度超过阈值则转成红黑树。最后检测是否需要扩容,如果需要则扩大到2倍,进行数据复制扩容处理。
这里边细节有很多吊炸天的写法,可以详细研究。
// put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
// 包含onlyIfAbsent的put方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算hash,跟HashMap的还不太一样
int hash = spread(key.hashCode());
int binCount = 0;
// 开启死循环,保证数据最后更新成功
for (Node[] tab = table;;) {
Node f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
// 如果是空map,直接进行初始化
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 不为空,则进行 (n - 1) & hash 选择桶的位置,通过tabAt(tab, index)获取值
// tabAt(tab, index) 底层调用getReferenceVolatile获取桶的值,volatile逻辑保证拿到最新的
// 如果是空的,直接基于casTabAt进行cas赋值
if (casTabAt(tab, i, null, new Node(hash, key, value)))
// 成功,则直接结束
break;
}
// 当发生扩容时,如果当前桶正在迁移,则会把hash值改成 -1
else if ((fh = f.hash) == MOVED)
// 如果是在扩容,则加入帮助扩容,略吊还没细看
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
// 进入正常的put流程
V oldVal = null;
// 对头结点进行加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// key的hash大于等于0
binCount = 1;
for (Node e = f;; ++binCount) {
// 进行链表遍历
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 如果存在,则覆盖
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
if ((e = e.next) == null) {
// 如果遍历到最后,直接尾插
pred.next = new Node(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
// 如果是树结构,按照树结构处理
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 如果链表超过阈值长度,则转成红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 里边包含扩容逻辑
addCount(1L, binCount);
return null;
}
// 初始化方法,保证线程安全
private final Node[] initTable() {
Node[] tab; int sc;
// 直接死循环,多个线程可能都在初始化
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0 mapcpu thread.yield else if u.compareandsetintthis sizectl sc -1 compareandsetint casintsizectl-1 thread.yield try if tab='table)' tab.length='= 0)' 16 int n='(sc'> 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node,?>[n];
// 由于table是volatile,非空之后,其他线程就跳出循环了
table = tab = nt;
// sc为n的0.75倍
sc = n - (n >>> 2);
}
} finally {
// sizeCtl 为容量的0.75倍
sizeCtl = sc;
}
break;
}
}
return tab;
}
// 协助扩容逻辑
final Node[] helpTransfer(Node[] tab, Node f) {
Node[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode)f).nextTable) != null) {
// 先判定当前桶,确实处于扩容迁移状态
// 获取
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0 if sc>>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
// 这里的判断逻辑比较多,还没看明白,就是超过迁移的限制条件,直接退出协助迁移了
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 修改sizeCTL + 1,表示参与迁移的线程+1
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
扩容
ConcurrentHashMap的扩容是一个支持多线程并发处理的数据迁移操作,方法无状态,任何线程进入该方法,会根据CPU个数拆分迁移桶的任务,一个线程至少处理16个桶,如果要迁移的nextTable是空的,则先进行初始化默认扩容为原有大小的两倍,然后开始进入死循环,每一个线程都在此遍历获取16个桶进行迁移处理,每迁移完一个桶,基于cas操作更新到新的map中,旧的map这个桶的根节点hash会设置成-1,表示已经迁移完了。
在迁移的过程中,由于基于hash & oldTable.length 选择桶,故链表中的节点要么保留在当前桶index位置,要么迁移到index + oldTable.length的位置,同时会先遍历一遍链表,找到lastRun节点(lastRun节点和之后的连续的节点都在一个桶里),这样再遍历迁移进行高低位拆迁,当碰到lastRun节点时,把这个节点迁移,后边的节点就全部迁移了,由于存在lastRun逻辑,故ConcurrentHashMap是头插,这里与HashMap迁移尾插不一样;如果是红黑树,得遍历每一个节点进行高位桶选择,然后进行迁移,同样红黑树迁移之后,如果节点个数小于6了,需要转换成链表。
// 迁移方法 tab:原来的数组,nextTab:迁移的目标数组
private final void transfer(Node[] tab, Node[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// 这里基于cpu个数运算,就是每一个线程如果要参与迁移,最少处理16个桶
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
// 如果目标数组为空,则进行初始化
try {
@SuppressWarnings("unchecked")
// new一个两倍大的数字
Node[] nt = (Node[])new Node,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 这里transferIndex表示接下来需要迁移的桶的index,从大到小
transferIndex = n;
}
int nextn = nextTab.length;
// 这里构建一个ForwardingNode 继承于Node ,会把hash设置成-1,表示当前桶在迁移
ForwardingNode fwd = new ForwardingNode(nextTab);
// 是否前进继续获取迁移任务
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
// 死循环直到迁移完成,
// i表示需要迁移的桶index,bound表示一次迁移中最小桶的index,bound = i - stride
Node f; int fh;
while (advance) {
// true死循环,直到获取到迁移任务,或是迁移任务已经完成,退出
int nextIndex, nextBound;
if (--i >= bound || finishing)
// 迁移结束了,或者迁移桶的index达到了当前任务的最小index
advance = false;
else if ((nextIndex = transferIndex) <= 0 transferindexindex0 i='-1;' advance='false;' else if u.compareandswapint this transferindex nextindex nextbound='(nextIndex'> stride ?
nextIndex - stride : 0))) {
// 这里基于cas,去尝试获取迁移任务,一次16个桶,然后设置idnex和最小index
bound = nextBound;
i = nextIndex - 1;
// 获取到迁移任务,跳出循环
advance = false;
}
}
if (i < 0 i>= n || i + n >= nextn) {
// 获取迁移任务,进行index边界判定
int sc;
if (finishing) {
// 任务结束,
nextTable = null;
// 将迁移的nextTab赋值给当前map的table
table = nextTab;
// 下次迁移的容量,n*2 - n/2 即0.75倍的新容量大小
sizeCtl = (n << 1 - n>>> 1);
return;
}
// 基于cas将sizeCtl修改-1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 判定是否是最后一个扩容线程,因为第一个扩容线程库容时,会把 SIZECTL = resizeStamp(n) << RESIZE_STAMP_SHIFT + 2
// 如果不是最后的线程,并且迁移已经结束了,直接退出
return;
finishing = advance = true;
// 最后一个线程检测所有节点,避免有遗漏的
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
// 如果当前桶是null的,理论上不应该是null的,要么直接结束,要么继续获取迁移任务
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 已经在迁移中了,继续获取迁移任务
advance = true; // already processed
else {
// 对桶的头结点加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node ln, hn;
if (fh >= 0) {
// 头结点hash大于0,非迁移状态
// 这里通过fh&n 计算当前节点迁移,是需要桶位不变,还是迁移到高位index + n
int runBit = fh & n;
Node lastRun = f;
for (Node p = f.next; p != null; p = p.next) {
// 这里遍历一遍,找到lastRun,不关心是高位还是地位,
// 只关注从lastRun开始,之后元素的高低位跟lastRun是一样的
// 这样就只需要把lastRun迁移,那么后边的节点就全部迁移了
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
// 如果是地位
ln = lastRun;
hn = null;
}
else {
// 如果是高位
hn = lastRun;
ln = null;
}
for (Node p = f; p != lastRun; p = p.next) {
// 重新开始遍历迁移,到lastRun停止,因为链表结构,lastRun迁移后边的就同步迁移了
int ph = p.hash; K pk = p.key; V pv = p.val;
// 这里采用的是头插法,因为lastRun机制
if ((ph & n) == 0)
ln = new Node(ph, pk, pv, ln);
else
hn = new Node(ph, pk, pv, hn);
}
// 基于cas更新桶
setTabAt(nextTab, i, ln);
// 高位的桶index = i + n ,n就是之前桶的长度
setTabAt(nextTab, i + n, hn);
// 这里把旧的桶设置为fwd,其中hash = -1 ,表示已经迁移完了
setTabAt(tab, i, fwd);
// 设置为true,继续获取迁移任务
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin t = (TreeBin)f;
TreeNode lo = null, loTail = null;
TreeNode hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node e = t.first; e != null; e = e.next) {
// 这里同样将红黑树拆成高位和地位两棵树
int h = e.hash;
TreeNode p = new TreeNode
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
// 低位树计数
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
// 高位树计数
++hc;
}
}
// 当数的树节点低于6时,将数转成链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin(hi) : t;
// 更新节点值
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}