目录

ThasBlog

学无止境

X

Java8 ConcurrentHashMap

ConcurrentHashMap是HashMap的线程安全版本, 使用了CAS, volatile 和 synchronize技术.

  1. 初始化时使用volatile修饰的sizeCtl作为互斥条件, 如果有其他线程正在操作则Thread.yeild().
  2. 在插入元素时, 如果元素不存在, 则使用CAS, 在插入新元素时, 如果元素不存在才能插入成功, 否则失败重试, 如果是 MOVED 状态, 则先辅助扩容, 完成后再尝试插入.
  3. 如果元素存在, 需要进行修改或者Hash冲突, 则加synchronize锁进行处理(因为情况比较复杂, 可能是链表, 可能还需要树化), synchronize修饰bin上的第一个元素
  4. 元素数量修改时, 并发高的情况容易 CAS 失败, 引入 counterCells 数组减小 CAS 失败的概率
  5. 数组扩容时, 分段迁移, 每段最少16, 迁移前 CAS 设置 transferIndex, 设置成功则得到该小段的迁移的权利. 迁移时, 如果该 bin 上有值, 则 synchronize 锁住, 如果无值, 则 CAS 填充 MOVED; 扩容后的另一段不用锁住, 因为跟扩容前的位置是一对, 锁住一个即可.

关键源码

sizeCtl为0 代表数组没有初始化
sizeCtl 为-1 代表数组正在初始化
sizeCtl为正数 数组已经初始化,记录的是数组的扩容阈值,如果没有初始化,记录的是数组初始容量
sizeCtl 为负数,并且不是-1,表示数组正在扩容

构造方法

不会初始化数组, 只有第一次put值得时候才会

public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
 // jdk1.8初始化大小是(设定值+ 设定值/2 +1) 的最大最近的2的幂次方   
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));  
//tableSizeFor:输入低于最大容量的数c,返回大于等于且最接近c的2的幂次数。
        this.sizeCtl = cap;
    }

初始化数组

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0) //sizeCtl小于0,代表 数组正在初始化或者正在扩容
                Thread.yield(); // 会把CPU时间让掉,让其他或者自己的线程执行(也就是谁先抢到谁执行)
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {   
                try {
                    if ((tab = table) == null || tab.length == 0) {  //双重判断是为了不重复创建 因为一个线程创建完后,会更改sizeCtl值, 变成扩容阈值
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);    //可以换算为 n*0.75   n >>> 2= 四分之一  ,此时sizeCtl=扩容阈值
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

设置值

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());  //算出hash值   这个值一定是个正数      0x7fffffff; //f的二进制为:1111,7的二进制位0111  换算完之后最高位为0,所以最终值肯定为正数
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)  //第一个线程进来的第一次添加必然为true
                tab = initTable();   //创建数组
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {   //拿出次key hash位置的value,如果为空则去添加
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)   //如果此判断成立  说明此数组正在扩容,则去协助扩容
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {     //锁住此位置
                    if (tabAt(tab, i) == f) {  //再次判断是因为  此位置的value有可能变为红黑树,锁就没用了
                        if (fh >= 0) {     //如果成立说明是链表
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {    //如果key和value都一样
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {            //如果没找到一样的,则加在末尾
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {  //红黑树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)   //如果链表的长度>=8  则变为黑红树
                        treeifyBin(tab, i);    //但是如果数组的长度小于64,那么则不变为红黑树,则扩容
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);  //维护集合长度,判断是否达到扩容阈值,如果是,则扩容
        return null;
    }

计数

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {   //加成功不会进入  加失败则进入
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();   //算出当前map中有多少元素,判断是否达到扩容阈值,如果是,则扩容
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);    //创建一个标识值
            if (sc < 0) {            //第一个线程进来不可能小于0
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);   //协助扩容
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,    //将SIZECTL 改为一个负数
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);   //发起扩容(只有可能是第一个线程执行)
            s = sumCount();
        }
    }
}
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {     //以此线程生成一个随机数,作为数组下标
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {    //第一次进来肯定为false(一旦有线程进入,则为true,那么数组已经创建)
            if ((a = as[(n - 1) & h]) == null) {       //随机取值,如果为空,则赋值
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))   //此数组已经有值,那么直接累加
                break;
            else if (counterCells != as || n >= NCPU)   //避免数组扩容,当目前的数组长度>=CPU核数时
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {     //数组扩容
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {      //创建数组,并且赋值(第一次进来的线程)
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))   //数组正在创建  对baseCount++
            break;                          // Fall back on using base
    }
}

扩容


标题:Java8 ConcurrentHashMap
作者:thas
地址:https://thas.cc/articles/2020/12/05/1607178489851.html