领先的免费Web技术教程,涵盖HTML到ASP.NET

网站首页 > 知识剖析 正文

十年之重修ConcurrentHashMap原理

nixiaole 2025-04-08 17:10:25 知识剖析 16 ℃

弱小和无知并不是生存的障碍,傲慢才是。

---- ---- 面试者

总结

ConcurrentHashMap就是一个线程安全的HashMap,基本数据结构和HashMap一样,都是数组+链表/红黑树结构,但是在数据读写时,运用了大量的CAS操作以及synchronized锁,锁的级别是桶,这里与JDK1.7有了进一步优化,在JDK1.7时是基于分段锁,锁的是Segment,一个segment里包括好几个桶,在JDK1.8之后,就直接细化到桶级别了。

可以先了解HashMap:HashMap原理 下一篇:十年之重修synchronized原理

基本属性

// 最大罩杯
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认罩杯
private static final int DEFAULT_CAPACITY = 16;
// 最大的数组长度,这里-8预留长度,是针对不同的
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阈值
static final int TREEIFY_THRESHOLD = 8;
// 红黑树转链表阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 表数组长度未达到64之前,如果链表长度达到了8,先扩容,不进行红黑树转换
static final int MIN_TREEIFY_CAPACITY = 64;
// 表示参与扩容的线程,最小处理的桶数量
private static final int MIN_TRANSFER_STRIDE = 16;
// 存储桶的数组,注意 volatile修饰
transient volatile Node[] table;
// -1:表示初始化中,其他负数表示正在迁移中,正数:表示扩容的临界值
private transient volatile int sizeCtl;
// 节点信息类
static class Node implements Map.Entry {
        // key的hash值
        final int hash;
        // key
        final K key;
        // val值,注意这里使用了volatile,保证线程之间透明
        volatile V val;
        // 下一个节点
        volatile Node next;
}
// 空的构造函数
public ConcurrentHashMap() {
}

put一个元素

为了保证线程安全,put元素时直接开启一个死循环,如果Map为空则先进行初始化默认16容量,如果不为空则则根据(table.length - 1) & hash值进行选桶,如果当前桶是空的直接复制,这里的判定tabAt方法基于volatile逻辑保证拿到最新数据,这里赋值基于cas逻辑,这样 一开始的死循环 + cas 组成乐观锁逻辑,如果桶不为空,且当前桶头结点的hash是-1,则表示当前桶在扩容迁移,于是当前线程则参与加入扩容操作,如果非扩容流程,则对桶的头结点进行synchronized加锁,如果是链表则进行判定存在则覆盖,不存在则进行尾插,如果是树节点则进行红黑树节点操作,最后如果链表长度超过阈值则转成红黑树。最后检测是否需要扩容,如果需要则扩大到2倍,进行数据复制扩容处理。

这里边细节有很多吊炸天的写法,可以详细研究。

		// put方法
		public V put(K key, V value) {
        return putVal(key, value, false);
    }
		// 包含onlyIfAbsent的put方法
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
      	// 计算hash,跟HashMap的还不太一样
        int hash = spread(key.hashCode());
        int binCount = 0;
      	// 开启死循环,保证数据最后更新成功
        for (Node[] tab = table;;) {
            Node f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
              	// 如果是空map,直接进行初始化
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
              	// 不为空,则进行 (n - 1) & hash 选择桶的位置,通过tabAt(tab, index)获取值
               // tabAt(tab, index) 底层调用getReferenceVolatile获取桶的值,volatile逻辑保证拿到最新的
              // 如果是空的,直接基于casTabAt进行cas赋值
                if (casTabAt(tab, i, null, new Node(hash, key, value)))
                  	// 成功,则直接结束
                    break;                   
            }
          	// 当发生扩容时,如果当前桶正在迁移,则会把hash值改成 -1
            else if ((fh = f.hash) == MOVED)
              	// 如果是在扩容,则加入帮助扩容,略吊还没细看
                tab = helpTransfer(tab, f);
            else if (onlyIfAbsent // check first node without acquiring lock
                     && fh == hash
                     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                     && (fv = f.val) != null)
                return fv;
            else {
              	// 进入正常的put流程
                V oldVal = null;
                // 对头结点进行加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                          	// key的hash大于等于0
                            binCount = 1;
                            for (Node e = f;; ++binCount) {
                              	// 进行链表遍历
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                  	// 如果存在,则覆盖
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node pred = e;
                                if ((e = e.next) == null) {
                                  	// 如果遍历到最后,直接尾插
                                    pred.next = new Node(hash, key, value);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                          	// 如果是树结构,按照树结构处理
                            Node p;
                            binCount = 2;
                            if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                      	// 如果链表超过阈值长度,则转成红黑树
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
				// 里边包含扩容逻辑
        addCount(1L, binCount);
        return null;
    }
    
    // 初始化方法,保证线程安全
    private final Node[] initTable() {
        Node[] tab; int sc;
      	// 直接死循环,多个线程可能都在初始化
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0 mapcpu thread.yield else if u.compareandsetintthis sizectl sc -1 compareandsetint casintsizectl-1 thread.yield try if tab='table)' tab.length='= 0)' 16 int n='(sc'> 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node[] nt = (Node[])new Node[n];
                      	// 由于table是volatile,非空之后,其他线程就跳出循环了
                        table = tab = nt;
                      	// sc为n的0.75倍
                        sc = n - (n >>> 2);
                    }
                } finally {
                  // sizeCtl 为容量的0.75倍
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
		// 协助扩容逻辑
    final Node[] helpTransfer(Node[] tab, Node f) {
        Node[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode)f).nextTable) != null) {
            // 先判定当前桶,确实处于扩容迁移状态
            // 获取
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0 if sc>>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                  	// 这里的判断逻辑比较多,还没看明白,就是超过迁移的限制条件,直接退出协助迁移了
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                  	// 修改sizeCTL + 1,表示参与迁移的线程+1
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

扩容

ConcurrentHashMap的扩容是一个支持多线程并发处理的数据迁移操作,方法无状态,任何线程进入该方法,会根据CPU个数拆分迁移桶的任务,一个线程至少处理16个桶,如果要迁移的nextTable是空的,则先进行初始化默认扩容为原有大小的两倍,然后开始进入死循环,每一个线程都在此遍历获取16个桶进行迁移处理,每迁移完一个桶,基于cas操作更新到新的map中,旧的map这个桶的根节点hash会设置成-1,表示已经迁移完了。

在迁移的过程中,由于基于hash & oldTable.length 选择桶,故链表中的节点要么保留在当前桶index位置,要么迁移到index + oldTable.length的位置,同时会先遍历一遍链表,找到lastRun节点(lastRun节点和之后的连续的节点都在一个桶里),这样再遍历迁移进行高低位拆迁,当碰到lastRun节点时,把这个节点迁移,后边的节点就全部迁移了,由于存在lastRun逻辑,故ConcurrentHashMap是头插,这里与HashMap迁移尾插不一样;如果是红黑树,得遍历每一个节点进行高位桶选择,然后进行迁移,同样红黑树迁移之后,如果节点个数小于6了,需要转换成链表。

		// 迁移方法 tab:原来的数组,nextTab:迁移的目标数组
		private final void transfer(Node[] tab, Node[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
          	// 这里基于cpu个数运算,就是每一个线程如果要参与迁移,最少处理16个桶
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
          	// 如果目标数组为空,则进行初始化
            try {
                @SuppressWarnings("unchecked")
              	// new一个两倍大的数字
                Node[] nt = (Node[])new Node[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
          	// 这里transferIndex表示接下来需要迁移的桶的index,从大到小
            transferIndex = n;
        }
        int nextn = nextTab.length;
      	// 这里构建一个ForwardingNode 继承于Node ,会把hash设置成-1,表示当前桶在迁移
        ForwardingNode fwd = new ForwardingNode(nextTab);
      	// 是否前进继续获取迁移任务
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            // 死循环直到迁移完成,
          	// i表示需要迁移的桶index,bound表示一次迁移中最小桶的index,bound = i - stride
          	Node f; int fh;
          	
            while (advance) {
              	// true死循环,直到获取到迁移任务,或是迁移任务已经完成,退出
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                  	// 迁移结束了,或者迁移桶的index达到了当前任务的最小index
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0 transferindexindex0 i='-1;' advance='false;' else if u.compareandswapint this transferindex nextindex nextbound='(nextIndex'> stride ?
                                       nextIndex - stride : 0))) {
                  	// 这里基于cas,去尝试获取迁移任务,一次16个桶,然后设置idnex和最小index
                    bound = nextBound;
                    i = nextIndex - 1;
                  	// 获取到迁移任务,跳出循环
                    advance = false;
                }
            }
            if (i < 0 i>= n || i + n >= nextn) {
              	// 获取迁移任务,进行index边界判定
                int sc;
                if (finishing) {
                  	// 任务结束,
                    nextTable = null;
                  	// 将迁移的nextTab赋值给当前map的table
                    table = nextTab;
                  	// 下次迁移的容量,n*2 - n/2 即0.75倍的新容量大小
                    sizeCtl = (n << 1 - n>>> 1);
                    return;
                }
              	// 基于cas将sizeCtl修改-1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                      	// 判定是否是最后一个扩容线程,因为第一个扩容线程库容时,会把 SIZECTL =  resizeStamp(n) << RESIZE_STAMP_SHIFT + 2
                      	// 如果不是最后的线程,并且迁移已经结束了,直接退出
                        return;
                    finishing = advance = true;
                  	// 最后一个线程检测所有节点,避免有遗漏的
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
              	// 如果当前桶是null的,理论上不应该是null的,要么直接结束,要么继续获取迁移任务
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
              	// 已经在迁移中了,继续获取迁移任务
                advance = true; // already processed
            else {
              	// 对桶的头结点加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node ln, hn;
                        if (fh >= 0) {
                          	// 头结点hash大于0,非迁移状态
                          	// 这里通过fh&n 计算当前节点迁移,是需要桶位不变,还是迁移到高位index + n
                            int runBit = fh & n;
                            Node lastRun = f;
                            for (Node p = f.next; p != null; p = p.next) {
                              	// 这里遍历一遍,找到lastRun,不关心是高位还是地位,
                              	// 只关注从lastRun开始,之后元素的高低位跟lastRun是一样的
                              	// 这样就只需要把lastRun迁移,那么后边的节点就全部迁移了
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                              	// 如果是地位
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                              	// 如果是高位
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node p = f; p != lastRun; p = p.next) {
                              	// 重新开始遍历迁移,到lastRun停止,因为链表结构,lastRun迁移后边的就同步迁移了
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                              	// 这里采用的是头插法,因为lastRun机制
                                if ((ph & n) == 0)
                                    ln = new Node(ph, pk, pv, ln);
                                else
                                    hn = new Node(ph, pk, pv, hn);
                            }
                          	// 基于cas更新桶
                            setTabAt(nextTab, i, ln);
                          	// 高位的桶index = i + n ,n就是之前桶的长度
                            setTabAt(nextTab, i + n, hn);
                          	// 这里把旧的桶设置为fwd,其中hash = -1 ,表示已经迁移完了
                            setTabAt(tab, i, fwd);
                          	// 设置为true,继续获取迁移任务
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin t = (TreeBin)f;
                            TreeNode lo = null, loTail = null;
                            TreeNode hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node e = t.first; e != null; e = e.next) {
                              	// 这里同样将红黑树拆成高位和地位两棵树
                                int h = e.hash;
                                TreeNode p = new TreeNode
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                  	// 低位树计数
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                  	// 高位树计数
                                    ++hc;
                                }
                            }
                          	// 当数的树节点低于6时,将数转成链表
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin(hi) : t;
                          	// 更新节点值
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

不积跬步,无以至千里

Tags:

最近发表
标签列表