您当前的位置:首页 > 计算机 > 编程开发 > Java

ConcurrentHashMap 源码解析

时间:12-14来源:作者:点击数:
城东书院 www.cdsy.xyz

之前我们学习了 HashMap 类,知道了它的基本工作原理,JDK7 中 hashmap 是数组+链表的方式存储数据,JDK8 中是数组+链表+红黑树的模式。

当然上篇文章也存在一些错漏,这里也统一改正和补充一下。

1、JDK8 中链表转为红黑树的时机

前篇我写的是链表数据达到 8 个的时候转为红黑树,这里数量有错误,看源码

if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
    treeifyBin(tab, hash);
break;
这里的 TREEIFY_THRESHOLD = 8

也就是说当插入数据的时候,之前已经有7个数据了,那么第8个数据来了,就转为红黑树,也就是红黑树最初有8个数据。

2、为什么选择红黑树,不是其他二叉树

我们知道JDK8是尾插法,当链表数据达到8个时,会转为红黑树,那么为什么不选择其他二叉树呢?假设是平衡二叉树呢?其实也可以选择平衡二叉树,但是因为这个红黑树还要转为链表的,综合查询和删除新增操作后,才选择的红黑树。

好了,之前的结束,正文开始

在不特别注明的情况下,下文JDK7默认指的是JDK7以及之前的版本,JDK8指的是JDK8以及之后的版本。

在正式讲解 ConcurrentHashMap 之前,我们先要了解一些其他的基础知识

  • JMM 基本原理
  • CAS 基本原理
  • ClassLoader 基本知识
  • UNSAFE 基本用法
  • ReentrantLock 一点点

下面我简单的介绍一下这几点知识,大家要深入研究再去自行查看源码。

准备工作

JMM 基本原理

JMM 不是 JVM,JMM 是内存模型,我尽量简单的讲解一下他的原理。

在 jdk1.2 之前,java 的内存模型实现总是从主内存(共享内存)读取变量,不需要特别注意。但是之后的内存模型中,线程可以把变量保存到本地内存中(工作内存),而不是直接在主内存读写,那么就可能出现主内存修改了某个变量的值,另外一个线程还继续使用它在工作内存中此变量的拷贝,那么数据就不一致。

从硬件的角度看,就是原来 java 总是将数据保存在内存中,读写都是直接针对的内存,所以数据可以实时拿到最新的。但是之后的设计改变为,内存中的数据被称为主内存,然后每个线程都有自己的一小块内存区域,称为工作内存。每个线程只在自己的工作内存中进行数据操作,但是线程又要操作主内存中的数据怎么办。那么就有一个工作内存从主内存读写数据的步骤,正是这个步骤导致了数据不一致的问题。

也就是说,线程的工作内存拿到的某个数据不一定是最新的。为了解决这个问题,就用到了关键字Volatile,告诉jvm,此变量直接从内存读取,保证可见性(但是不保证一致性)。

好了,就先了解这么多,我们知道了线程读到的不一定是真的实际数据。

CAS 原理

我们说ConcurrentHashMap是线程安全的(数据一致性),那么显然它必然用到了锁的机制。那么我们第一个想到的应该是 Synchronized 的锁,但是我们也知道这个是一个重量级锁,虽然之后的jdk对它进行了优化,已经变成了轻量级。但是,这个不是我们的最优解,使用了 Synchronized 关键字的 map,可以查看 HashTable。HashTable 是线程安全的,因为他在关键方法上都加了 Synchronized 关键字。

那么我们就要使用一个轻量级的乐观锁,就自然用到了 CAS。

CAS(compare and swap)是一个乐观锁,简单的概括乐观锁的概念:

乐观锁:假设最好的情况,每次拿数据的时候认为别人不会修改,所以不上锁。但是在更新的时候会判断一下其他人是否更改了数据。使用版本号机制和CAS算法实现。适用于多读的应用类型

CAS 算法

compare and swap 比较交换,无锁算法,在不使用锁的情况下实现多线程之间的变量同步,(以 i++ 为例)

  • 读写的内存值V(主内存中 i 的值)
  • 进行比较的值A(工作内存中 i 的值)
  • 拟写入的新值B(工作内存中被修改后的 i 的值)

当且仅当V == A时,CAS通过原子方式用B来更新V的值,否则不执行任何操作。自旋,不断重试。

简单白话解释下就是:假设有2个线程1,和线程2,他们同时修改内存中的数据 i,进行 i++操作(注意i++不是一个原子操作,它可以分解为 tmp = i,tmp = tmp + 1, i = tmp,取值,计算i+1,结果赋值给i)。

  • 程序一开始主内存 i = 0;
  • 线程1,线程2,同时从主线程读取 i = 0;2个线程的工作内存中 i = 0,A = 0;
  • 线程1先执行,i++,执行完了,准备将 i++ 的结果写回到主内存,此时线程1的工作内存中i = 1,也就是B = 1
  • 然后线程1从主内存读取i的值,i = 0,也就是 V = 0,对比下 V == A == 0,那么线程1就可以将 B赋值给i,主内存中的 i = 1;
  • 此时线程2得到了执行,线程2 的 A = 0,然后计算一下i++,B = 1;
  • 线程2 执行完操作,准备将i写入到主内存,从主内存读取一下 i,也就是 V,但是此时主内存中的i==1了,也就是 V = 1,拿回来对比A,发现 V !== A,失败。不赋值。
  • 然后线程2自旋,再次从主内存得到i,V = 1,线程2的A = 1,然后计算i++,B = 2;
  • 然后线程2再去将i写入到主内存,此时 v == a == 1,可以写入,结果 i = B = 2;
  • 整个流程完毕。

缺点:

  • ABA 问题:假设主内存中的i = 0,然后3个线程来操作i,线程1将它改为1,线程2又将它改为了0,此时线程3再次去读的时候,i 还是0,实际上他不知道这个i的值已经经过了很多修改了。
  • 开销问题:因为要自旋,不断的重试,相当于有一个 while(true)

好了CAS了解以上就够了。

ClassLoader 基本知识

有3个 classloader

Bootstrap ClassLoader

系统级的,加载 jre 目录下的 jar,比如 rt.jar,resources.jar,charsets.jar 和一个 classes

System.out.println(System.getProperty("sun.boot.class.path"));

Extention ClassLoader

加载拓展的jar,默认在 jre\lib\ext 目录下的文件

System.out.println(System.getProperty("java.ext.dirs"));

可以修改 dirs 的路径改变加载的 jar

AppClassLoader

当前开发的应用的classpath的所有类

System.out.println(System.getProperty("java.class.path"));

既然是目录,那么就说明我们是可以去修改的,使用一些启动参数,可以修改这些目录。或者加入我们自己写的一些 jar 和 class 文件到目录中,从而进行默认加载。

classloader 有一个默认的前提,每个类加载器都有一个父加载器,appclassloader 的父加载器是 extclassloader,ext 的父加载器是 Bootstrap,测试类:

@Test
public void testStart() {
    // blue就是个简单的class
    System.out.println(Blue.class.getClassLoader().toString());
    System.out.println(Blue.class.getClassLoader().getParent().toString());
}
打印:
sun.misc.Launcher$AppClassLoader@18b4aac2
sun.misc.Launcher$ExtClassLoader@1be6f5c3

可以看到他们的 classloader,但是如果我们再加一级的 getParent时,也就是希望获取 boostrap 时,会报错。因为 BootstrapClassLoader 是 c++ 编写的,它本身是虚拟机的一部分,所以它并不是 java 类,没法在java代码中获取它的引用。jvm 启动的时候通过 boostrapclassloader 类加载器加载rt.jar等核心包中的 class,int.class、String.class 都是在这个时候加载的。

拓展知识

这个就是想到了多说几句

双亲委托

一个类加载器在查找 class 和 resource 的时候,通过委托的模式进行,并不是立刻就从自己的管辖地去查询这个 class。比如程序要加载 Blue.class 文件,那么顺序是:

  • 从 appclassloader 的缓存中找有没有 blue.class 文件
  • 没有找到,交给extclassloader去缓存查询
  • 还没有找到,交给bootstrapclassloader去缓存查询。
  • 还没有找到,就从boostrap的路径去找,
  • 任然没有找到,就交给下一级ext去他的路径找,
  • 还没有,就交给appclassloader的路径去找。
  • 最后都没有就报错。
  • 当上面任何一步找到了,就直接返回。

这就是双亲委托,那么我们说这么多 ClassLoader 有什么意义呢,这就影响到了我们下面的UNSAFE这个工具类。

UNSAFE 基本用法

Unsafe 是位于 sun.misc 包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。但由于 Unsafe 类使 Java 语言拥有了类似 C语言指针 一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得 Java 这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。

参考:https://www.cdsy.xyz/computer/programme/java/250913/cd74727.html

我们查看 UNSAFE 的源码,知道他是 rt.jar 包下的,也就是说它的 ClassLoader 是 BootstrapClassLoader。那么知道这有什么用呢?我们看看源码

public static Unsafe getUnsafe() {
    Class<?> caller = Reflection.getCallerClass();
    if (!VM.isSystemDomainLoader(caller.getClassLoader()))
        throw new SecurityException("Unsafe");
    return theUnsafe;
}

也就是说,如果unsafe对象不是经过系统级的 ClassLoader 加载的,那么就报错??

@Test
public void testStart() {
    Unsafe unsafe = Unsafe.getUnsafe();
    System.out.println(unsafe);
}

你直接这样使用是会报错的。不巧 ConcurrentHashMap 也是 rt.jar 包下的,所以它可以使用这个 UNSAFE。

正确的姿势:

Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
Unsafe unsafe = field.get(null);

到这里你就只需要了解,unsafe为我们提供了一个cas的操作命令,可以用来保证操作原子性。

好了说了这么多,下面开始源码部分,其实如果你了解hashmap,这里ConcurrentHashMap就很简单了。

源码解析

我们知道 hashmap 的底层是一个数组(先忽略链表和红黑树)。我们为了达到线程安全的目的,肯定是需要加锁的,如果在方法上加锁,那就是 HashTAble 了,显然 ConcurrentHashMap 不是。

原理分析

我们将数组的 put 和 get 操作分解一下,因为我们在插入数据的时候,代码是:

public static void main(String[] args) {
    ConcurrentHashMap<String, Float> map = new ConcurrentHashMap();
    map.put("路飞", 1f);
    map.put("路飞", 10f);
    map.put("乔巴", 0.5f);
    map.put("索隆", 9f);
    System.out.println(map);
}

我们在 put 乔巴、索隆的时候,这2个数据的key明显不一样,他们的hash值也很可能不一样(但是不保证真的不一样)。所以如果key的hash值我们已经明确他们都不一样了,那么他们就是在数组的不同位置进行插入操作,其实相互之间是没有任何影响的。

假设索隆插入到 index = 4,乔巴插入到 index = 7,我们有必要将整个数组都锁住嘛?显然没必要,我们只需要保证在这个数组的这个位置上的操作是线程安全的就可以了。也就是只锁住数组的这个位置,保证只有一个线程来put,不就实现了put操作的线程安全嘛。

JDK7 方法分析

为了达到上面的目的,jdk7 提出了一个 segment 的概念,就是将原来的基本数据数组 table,划分成一个个的小块

看看segment的源码:

final Segment<K,V>[] segments;
...
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile HashEntry<K,V>[] table;
    ...

多么熟悉的感觉,segment 中的 table 不就相当于是一个 hashmap 嘛,ConcurrentHashMap 就是将hashmap变成一个数组segment,用来存储数据。

既然我们知道了这个原理,再来看源码就简单了。我们看到 segment 的 class 定义中,它extends了ReentrantLock,典型的锁机制,我们在源码中也能够看到lock方法。

关键参数
public class ConcurrentHashMap<K, V> {
    //用于根据给定的key的hash值定位到一个Segment
    final int segmentMask;

    //用于根据给定的key的hash值定位到一个Segment
    final int segmentShift;

    //HashEntry[]初始容量:决定了HashEntry数组的初始容量和初始阀值大小
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    //Segment对象下HashEntry[]的初始加载因子:
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    //Segment对象下HashEntry[]最大容量:
    static final int MAXIMUM_CAPACITY = 1 << 30;

    //Segment[]初始并发等级:决定了Segment[]的长度
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    //最小Segment[]容量:
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    //最大Segement[]容量
    static final int MAX_SEGMENTS = 1 << 16;

    static final int RETRIES_BEFORE_LOCK = 2;

    //Segment[]
    final Segment<K,V>[] segments;
}

初始数组长度是16,也就是上图中的第二行的数组个数(其实他们是分成一个一个的小数组了的。)加载因子不解释了。

ConcurrentHashMap 的构造函数

我就结合默认的参数进行讲解源码,将参数代入到源码,看的更清晰。注意参数后面的=号看成是她的值,而不是运算符

先看完整参数的构造函数

//通过指定的容量,加载因子和并发等级创建一个新的ConcurrentHashMap
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    //对容量,加载因子和并发等级做限制
    if (!(loadFactor=0.75 > 0) || initialCapacity=16 < 0 || concurrencyLevel=16 <= 0)
        throw new IllegalArgumentException();
    //限制并发等级不可以大于最大等级,如果定义的concurrencyLevel数大于MAX_SEGMENTS = 1 << 16,那么就直接=1 << 16;
    if (concurrencyLevel=16 > MAX_SEGMENTS)
        concurrencyLevel=16 = MAX_SEGMENTS;
    // 下面即通过并发等级来确定Segment的大小
    //sshift 用来记录 ssize这个数据 向左按位移动的次数
    int sshift = 0;
    //ssize 用来记录 Segment 数组的大小,ConcurrentHashMap是由segment数组作为数据存储的。
    int ssize = 1;
    //Segment 的大小为 大于等于 concurrencyLevel 的第一个2的n次方的数
    // 不断的循环,
    // 第1次循环:ssize = 1 < 16,sshift++ = 1,ssize 左移1位 ssize = 2
    // 第2次循环:ssize = 2 < 16,sshift++ = 2,ssize 左移1位 ssize = 4
    // 第3次循环:ssize = 4 < 16,sshift++ = 3,ssize 左移1位 ssize = 8
    // 第4次循环:ssize = 8 < 16,sshift++ = 4,ssize 左移1位 ssize = 16
    // 第5次循环:ssize = 16 跳出
    while (ssize < concurrencyLevel=16) {
        ++sshift;
        ssize <<= 1;
    }
    // 经过上面循环:sshift = 4,ssize = 16
    // segmentShift、segmentMask用于元素在Segment[]数组的定位
    // segmentShift 其实就是得到 ssize 的高位有多少个0,
    // 比如 sshift = 4,ssize = 8 因为int有32位,就是:0000 0000 0000 0000 0000 0000 0000 1000,前面有28个0
    // 假设 sshift = 5,ssize = 16 就是:0000 0000 0000 0000 0000 0000 0001 0000,前面有27个0
    this.segmentShift = 32 - sshift;// segmentShift = 32 - 4 = 28,
    //segmentMask的值等于ssize - 1(这个值很重要)
    //segmentMask = ssize - 1 = 15,还记得hashmap中的hash&(length-1)嘛?
    this.segmentMask = ssize - 1;
    if (initialCapacity=16 > MAXIMUM_CAPACITY=1<<30)
        // 目前没有进入
        initialCapacity = MAXIMUM_CAPACITY;
    //c记录每个Segment上要放置多少个元素,c = 16 / 16 = 1,也就是说segment中的tables数组的长度是1.但是还没完。
    int c = initialCapacity / ssize;
    //假如有余数,则Segment数量加1,1 * 16 < 16不成立,所以没有执行++c
    if (c * ssize < initialCapacity)
        ++c;
    // cap = 2,默认segment中的table长度最小值又是2,
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 2 < 1 不成立,
    while (cap < c)
        cap <<= 1;
    // 创建第一个Segment,并放入Segment[]数组中,作为第一个Segment,指定segment的table属性的长度初始化为2
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

看了源码还是一脸懵的,简单的总结一下:

  • 初始化的时候,确定了segment数组的长度,默认是16
  • segment 数组存储的是一个segment对象,这个对象有一个数组属性transient volatile HashEntry<K,V>[] table;设定了她的初始长度为2;
ConcurrentHashMap 的 put 方法

为什么要强调是 ConcurrentHashMap 的put方法呢?因为segment中也有一个put方法,为了区分,我这里强调一下。

第一层 put 方法。

public V put(K key, V value) {
    // 创建了一个segment的引用,还没有初始化
    Segment<K,V> s;
    if (value == null)
        // value不允许为null,这里就看到了ConcurrentHashMap的特性之一。
        throw new NullPointerException();
    // 熟悉的hash方法,高位加入算法,均匀hash分布
    int hash = hash(key.hashCode());
    // segmentShift 还记得构造函数中的这个值么,ssize高位0的个数,这里就将hash右移,然后和 (length - 1)进行&操作,
    // 多么熟悉啊, hash & (length - 1)
    int j = (hash >>> segmentShift) & segmentMask;
    // 下面就是进行 UNSAFE操作,关心这个getObject的请再去看源码,这里意思就是判断 segment数组的index位置是否是null的 
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 如果找到的位置为空就创建一个,新创建的segment的中的table长度从segment[0]获取;这里就用到了UNSAFE.CAS
        s = ensureSegment(j);
    // 然后才是调用segment的put方法
    return s.put(key, hash, value, false);
}
Segment 的 put 方法

这个就简单了,参考 hashmap 的算法即可。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // ReentrantLock的方法tryLock尝试去获取锁,拿到锁返回true,
    HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

JDK8 的方法分析

jdk8 中的 ConcurrentHashMap 摒弃了 Segment(分锁段)的概念,而是启用了一种全新的方式实现,利用 CAS 算法。和 jdk8 的 HashMap 基本思想一样,底层依然由 数组 + 链表 + 红黑树的方式,但是为了做到并发,又增加了很多辅助的类,例如 TreeBin,Traverser 等对象内部类。

jdk7 在数组的基础上又增加了一层 segment,但是 jdk8 之后,干脆直接就在数组元素上一个一个的来锁,也不需要分段了。

重要成员
    /**
     * 盛装Node元素的数组 它的大小是2的整数次幂
     * Size is always a power of two. Accessed directly by iterators.
     */
    transient volatile Node<K,V>[] table;
		
    /**
     hash表初始化或扩容时的一个控制位标识量。
     负数代表正在进行初始化或扩容操作
     -1代表正在初始化
     -N 表示有N-1个线程正在进行扩容操作
     正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
     */
    private transient volatile int sizeCtl; 
    // 以下两个是用来控制扩容的时候 单线程进入的变量
    private static int RESIZE_STAMP_BITS = 16;
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    // hash值是-1,表示这是一个forwardNode节点
    static final int MOVED     = -1;
构造函数

有参构造函数

public ConcurrentHashMap(int initialCapacity) {
    // tables的初始容量
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    // 初始容量限定一个大小,不能太大了。
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    // 假设 initialCapacity = 12 ,sizeCtl = 32, 标识table还没有初始化,没有创建出来
    this.sizeCtl = cap;
}
put 方法

和 hashmap 一样,在第一次 put 数据的时候,才初始化 table

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // key和value不允许为空
        if (key == null || value == null) throw new NullPointerException();
        // key的hash
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 无限循环,使用break退出
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 刚开始的时候,tab的空的,需要初始化
            if (tab == null || (n = tab.length) == 0)
                // 调用初始化方法,初始化之后,默认table的长度是16了。
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 如果要插入的数组的位置没有数据,直接插入不需要锁。
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                // 如果正在扩容,帮个忙
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 如果数组位置上有数据了,锁住f对象,开始数据插入
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // hash值一样,判定下是否equals,值不相等就覆盖,
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // hash值不一样,就直接链表的next指向新的这个数据。
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            // 当前数组已经是红黑树,就增加树节点
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // 转为红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 方法解析看后面
        addCount(1L, binCount);
        return null;
    }
初始化 table
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // 这里相当于一个自旋,当tab没有创建的时候,进入方法
    while ((tab = table) == null || tab.length == 0) {
        // 判断一下 sizeCtl,还记得吗,标识在干什么的,初始化 sizeCtl = 0,
        if ((sc = sizeCtl) < 0)
            // 假设 sizeCtl 小于 0 标识正在扩容或者初始化,当先前程就停一下让出运行中状态,进入运行阶段【还记得线程的几个状态么】。因为是运行状态,所以会很快再次获取到执行权限。又进入了while判断
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 初始化状态,当前线程就可以进入初始化的工作。
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 刚开始的时候 sc = 0,所以 n = DEFAULT_CAPACITY = 16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 初始化table了
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 长度为16
                    table = tab = nt;
                    // 然后修改状态,16 - (16 右移 2 位 = 4) = 12
                    sc = n - (n >>> 2);
                }
            } finally {
                // sizeCtl = 12
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
addCount 方法
// 从 putVal 传入的参数是 1, binCount,binCount 默认是0,只有 hash 冲突了才会大于 1.且他的大小是链表的长度(如果不是红黑数结构的话)。
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 如果计数盒子不是空 或者
    // 如果修改 baseCount 失败
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // 如果计数盒子是空(尚未出现并发)
        // 如果随机取余一个数组位置为空 或者
        // 修改这个槽位的变量失败(出现并发了)
        // 执行 fullAddCount 方法。并结束
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    // 如果需要检查,检查是否需要扩容,在 putVal 方法调用时,默认就是要检查的。
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 如果map.size() 大于 sizeCtl(达到扩容阈值需要扩容) 且
        // table 不是空;且 table 的长度小于 1 << 30。(可以扩容)
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // 根据 length 得到一个标识
            int rs = resizeStamp(n);
            // 如果正在扩容
            if (sc < 0) {
                // 如果 sc 的低 16 位不等于 标识符(校验异常 sizeCtl 变化了)
                // 如果 sc == 标识符 + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
                // 如果 sc == 标识符 + 65535(帮助线程数已经达到最大)
                // 如果 nextTable == null(结束扩容了)
                // 如果 transferIndex <= 0 (转移状态变化了)
                // 结束循环 
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 扩容
                    transfer(tab, nt);
            }
            // 如果不在扩容,将 sc 更新:标识符左移 16 位 然后 + 2. 也就是变成一个负数。高 16 位是标识符,低 16 位初始是 2.
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 更新 sizeCtl 为负数后,开始扩容。
                transfer(tab, null);
            s = sumCount();
        }
    }
}

这个方法做了2个事情

  1. 对 table 的长度加一。无论是通过修改 baseCount,还是通过使用 CounterCell。当 CounterCell 被初始化了,就优先使用他,不再使用 baseCount。
  2. 检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。
get 方法

get 方法就相对简单啦。

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算下key的hash值。
    int h = spread(key.hashCode());
    // tab为空,直接返回null。然后计算得到数组的index,如果不为空,就继续
    if ((tab = table) != null && (n = tab.length) > 0 &&
        // tabAt方法,从数组tab拿到index = (n - 1) & h)的node
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 对比node的hash值是否和当前传入的key的hash值相等
        if ((eh = e.hash) == h) {
            // hash相等也要判定是否equals,多严谨,
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                // 返回val
                return e.val;
        }
        else if (eh < 0)
            // 不相等,是否红黑树
            return (p = e.find(h, key)) != null ? p.val : null;
        // 都不是,就是链表了,next查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

JDK8 总结

通过 put 方法,我们知道了 jdk8 时,ConcurrentHashMap 通过在 put 数据的时候,在数组的一个一个位置加 Synchronized 锁保证的线程安全。而且去掉了 jdk7 中的ReentrantLock,说明 Synchronized 优化的已经很好了(貌似 jdk9 之后更加牛了)。

城东书院 www.cdsy.xyz
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐