很久之前我写了这么一个并发异步的工具方法,传入的 suppliers 是具体的业务方法函数引用,考虑这个一个业务场景,我们需要去数据库统计某个表最近一个月在某个条件下的数量,那么为了提高响应效率,可以通过按天切分日期,然后组装成相应的 supplier,再通过多线程异步处理,最后统一收集完结果返回。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static <R> Collection<R> supplyAsyncWithExpDeal(Collection<Supplier<R>> suppliers,
                                                       boolean distinct, Executor executor, 
                                                       Function<Throwable, R> expHandler) {
    Collection<R> res = distinct ? new ConcurrentHashSet<>() : new CopyOnWriteArrayList<>();
    if (CollectionUtils.isNotEmpty(suppliers) && Objects.nonNull(executor)) {
        log.info("FutureUtils.supplyAsync ==> size of suppliers: {}", suppliers.size());
        // wait for all tasks to finish
        CompletableFuture.allOf(
            suppliers.stream().filter(Objects::nonNull).map(supplier ->
          		CompletableFuture.supplyAsync(supplier, executor).thenApply(r -> {
                	if (Objects.nonNull(r)) {
						// concurrent contending point
                    	res.add(r);
					}
                    return r;
				}).exceptionally(throwable -> {
                	R r = expHandler.apply(throwable);
					res.add(r);
					return r;
				}
            )
		).toArray(CompletableFuture[]::new)).join();
    }
    return res;
}

最近有同事提醒我,收集结果的地方存在潜在的并发问题,原来我的考虑是在一个方法里创建的变量是在栈里面的,所以多线程并发时,创建的 res 对象是不一样的,所以没有线程安全问题,但是 CompletableFuture 处理是基于线程池的,即后面在方法内部再分发任务到 executor 中的工作线程(我没睡醒),由于此处使用的是线程不安全的集合类,所以 add 方法存在线程安全问题,解决方案是将问题行替换为:

1
Collection<R> res = distinct ? new ConcurrentHashSet<>() : new CopyOnWriteArrayList<>();

其中因为 JDK 没有提供 ConcurrentHashSet 实现,此处的 ConcurrentHashSet 来自 hutool 工具包。

然而改完之后,合代码到生产环境时,我还是有点心虚,因为我不确定为什么这样改就能高枕无忧了,先不管 List,在千千万万的博客文章中,人人都说 HashMap、HashSet、ArrayList 等等不是线程安全的,要使用相应的 JUC 包下的工具类。周遭那些整天大喊 “进大厂必背的面试题” 中必然出现的一点是,被问到 “HashMap 和 ConcurrentHashMao 的区别” 时一定要答 “HashMap 不是线程安全的,ConcurrentHashMap 是线程安全的”,那么,先不说 ConcurrentHashMap 是怎么保证线程安全的,HashMap 在多线程并发的场景下会有什么问题呢?有人说可能会出现死循环,可能会导致 CPU 100%,可能会导致丢数据,那么真的是那样吗?如果是,又是怎么出现的呢?所谓实践出真知,我们不妨自己试着一一验证,而不是人云亦云。

一、环境准备

为了模拟多线程并发的场景,我需要在源码中进行判断和控制,所以我需要能够随意地修改 HashMap 的源码,但是 JDK 是只读的,那么怎么办呢?简单,把源码及相关的依赖复制出来就行,因为人们都说 JDK 8 的 HashMap 对大部分问题进行了修复,尽管它还不是线程安全的。那我们就一步步来,先看看 1.7 版本中会有什么问题,所以我搞了个 jdk-7u80 中的哈希表源码抠了出来,并进行了修改,它和一些测试代码组成的完整的工程可见 这里

二、HashMap 的运作

JDK 1.7 中的 HashMap 采用数组作为哈希表,使用键值对节点头插成链的拉链法解决哈希冲突,当整个 HashMap 中的键值对节点超过一定阈值时,就会进行扩容,扩容的基本运作过程是,新建一个原哈希表数组双倍大小的新数组,然后(可选地)重新计算哈希值,再根据新的哈希值将节点复制到新的哈希表中,当然冲突链中的数据也通过头插法进行重新排布。

对于 put 方法,主要思路是:计算哈希值得到索引,判断是否发生哈希冲突,是则遍历链,如果能找到 key 对应的值,则进行更新,否则进行插入。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public V put(K key, V value) {
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key);
    int i = indexFor(hash, table.length);
    // 如果 key 已经存在,则更新 value,并返回旧值
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    // 如果 key不存在,则插入新的元素
    addEntry(hash, key, value, i);
    return null;
}

对于插入键值对,需要检查此时(不算上将要插入的节点)的节点数是否超过阈值,是则先进行扩容,再将新节点插到新的哈希表中。

1
2
3
4
5
6
7
8
void addEntry(int hash, K key, V value, int bucketIndex) {
    if ((size >= threshold) && (null != table[bucketIndex])) {
        resize(2 * table.length);
        hash = (null != key) ? hash(key) : 0;
        bucketIndex = indexFor(hash, table.length);
    }
    createEntry(hash, key, value, bucketIndex);
}

对于扩容,先计算出新的数组的长度(原表长度的两倍)并创建新数组,然后将数据从原数组搬运过去。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    ...
    Entry[] newTable = new Entry[newCapacity];
    ...
    transfer(newTable, rehash);
    table = newTable;
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

对于搬运数据,需要遍历原数组及冲突链,进行重新哈希(如果需要的话)得到新的下标,然后进行搬运,一样地使用头插法排布节点。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    int cnt = 0;
    for (Entry<K,V> e : table) {
        while(null != e) {
            Entry<K,V> next = e.next;
            if (rehash) {
                e.hash = null == e.key ? 0 : hash(e.key);
            }
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i];
            newTable[i] = e;
            e = next;
        }
    }
}

三、场景模拟

很多人说多线程环境下哈希表的扩容操作并发时,会出现节点成环的情况,从而在 get 某个节点的时候出现死循环,但他们没有给出实际的 debug 过程,我们不妨来尝试模拟这种场景。

HashMap 的实时容量记为 size,扩容阈值为 threshold,如果 size >= threshold,则进行扩容,其中 threshold = capacity * loadFactor,capacity 为用户指定的表大小,如果不指定则默认为 16,loadFactor 可理解为一个比例,即达到百分之多少进行扩容,默认地 loadFactor = 0.8,为了便于测试,我们将 capacity 设为 4,loadFactor 设为 1.0,并进行数据初始化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
final HashMap<CustomKey, String> map = new HashMap<>(4, 1);
final CustomKey k1 = new CustomKey(1, "a");
CustomKey k2 = new CustomKey(1, "b");
CustomKey k3 = new CustomKey(1, "c");
final CustomKey k4 = new CustomKey(1, "d");
final CustomKey k5 = new CustomKey(1, "e");
map.put(k1, "aa");
map.put(k2, "bb");
map.put(k3, "cc");
map.put(k4, "dd");

final HashMap.Entry<CustomKey, String>[] table = map.table;
for (HashMap.Entry<CustomKey, String> customKeyStringEntry : table) {
    System.out.println(customKeyStringEntry);
}

其中,CustomKey 是我自定义的一个类,作为键值对的键对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CustomKey {
    private int hashCode;
    private String uid;
    
	// constructors...
    
    @Override
    public int hashCode() {
        return hashCode;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        CustomKey customKey = (CustomKey) o;
        return Objects.equals(uid, customKey.uid);
    }

    @Override
    public String toString() {
        return uid;
    }
}

不得不提的是,HashMap 的哈希函数如下所示,很显然这个算法是跟对象的 hashCode 有关的,所以为了模拟哈希冲突,我需要能够控制每个测试对象的 hashCode,所以将之作为类的属性之一。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
final int hash(Object k) {
    int h = hashSeed;
    if (0 != h && k instanceof String) {
        return sun.misc.Hashing.stringHash32((String) k);
    }

    h ^= k.hashCode();

    // This function ensures that hashCodes that differ only by
    // constant multiples at each bit position have a bounded
    // number of collisions (approximately 8 at default load factor).
    h ^= (h >>> 20) ^ (h >>> 12);
    return h ^ (h >>> 7) ^ (h >>> 4);
}

另外,结合前面的 put 函数,如果 e.hash == hash && ((k = e.key) == key || key.equals(k)),则认为该键对象已经存在,而不是认为发生哈希冲突,所以我还需够重写 equals 方法,

因为默认地,所有对象的 equals 都继承自 Object 中的 equals 方法(比对堆地址),所以,如果一个类重写了 equals 方法,通过类的属性组合而不是地址作为对象的唯一性特征(即如果两个对象的各项特性一样,那么我们认为它们是 equals 的)时,那么新增元素会按照各个属性维度判断键值对是否已经存在,从而不往其中插入键一样的键值对,我们重写了 equals,那么往往还需要重写 hashCode 方法,结合对象的各个属性来计算 hashCode,因为前面的 e.hash == hash && ((k = e.key) == key || key.equals(k)) 判断才会认为两个属性相同的对象可认为同一对象。总有人说,如果重写了类的 equals 方法,一定要重写 hashCode 方法,但是倘若这个类的对象不作为哈希表的键,那么这个所谓的 “准测” 看起来让人觉得十二分莫名其妙。

1
2
3
public boolean equals(Object obj) {
    return (this == obj);
}

按照上面的理解,我们的 hashCode 方法应该由对象的各个属性决定的,但是为了模拟哈希冲突,我故意为多个对象设置了一样的 hashCode,让它们排成一条链表。为了便于观察,我重写了 HashMap.Entry 的 toString 方法,对冲突链进行格式化。

1
2
3
4
5
6
7
8
9
@Override
public String toString() {
    StringBuilder res =  new StringBuilder("{" + key + ", " + value + ", " + hash + "}");
    if (next != null) {
        res.append(" -> ");
        res.append(next.toString());
    }
    return res.toString();
}

看下上面我们构建的哈希表输出:

1
2
3
4
null
{d, dd, 1} -> {c, cc, 1} -> {b, bb, 1} -> {a, aa, 1}
null
null

现在整个哈希表的数据状态大致如下所示:

Snipaste_2021-06-03_11-35-42.png

由于扩容阈值是 4,所以当我们再尝试往其中添加一个 hash 到 1 号桶下标的键值对 {e, ee} 的时候,将会触发扩容,倘若正巧有多个线程在扩容操作中并发,那么会发生什么呢?我们假设现在有两条线程并发插入一个键值对到同一位置,触发扩容。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
Thread t1 = new Thread(new Runnable() {
    @Override
    public void run() {
        ThreadUtils.sleep(1);
        map.put(k5, "ee");
    }
});
t1.setName("T1");
Thread t2 = new Thread(new Runnable() {
    @Override
    public void run() {
        map.put(k5, "ee");
    }
});
t2.setName("T2");
t2.start();
t1.start();

四、分析与测试

对于上面的场景,因为线程 T1 启动后即进行了一秒休眠,记其后的某个 t1 时间点,线程 T2 先进入扩容方法,它创建了一个新的数组,并尝试将旧数组中的数据搬运到新数组中,此时其时间片用完,上下文切换到线程 T1 上。

Snipaste_2021-06-03_11-47-37.png

为了模拟线程 T2 用完时间片,我们在它遍历 {d, dd} 这个节点的时候让它陷入休眠,注意此时它的栈空间中持有了 e 和 next 句柄指向。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    int cnt = 0;
    for (Entry<K,V> e : table) {
        while(null != e) {
            Entry<K,V> next = e.next;
            // 线程 2 陷入此处,e = dd,next = cc
            if (cnt == 0  && "T2".equals(Thread.currentThread().getName()) && e.hash == 1) {
                System.out.println("T2进入休眠");
                ThreadUtils.sleep(2);
            }
            if (rehash) {
                e.hash = null == e.key ? 0 : hash(e.key);
            }
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i];
            newTable[i] = e;
            e = next;
        }
    }
}

接下来在 t2 时间段内,线程 T1 尝试放入 {e, ee},同样地也会触发扩容,在 resize 方法中创建自己的新数组,然后进行数据搬运,让我们来看看数据搬运的过程:

在遍历过程中,首先 e 和 next 分别指向 {d, dd} 和 {c, cc} 节点,然后 {d, dd} 和 {c, cc} 间的指针断开,{d, dd} 被拷贝到了新数组上,e 指针向前移动。

Snipaste_2021-06-03_12-43-36.png

经过多轮迭代,最后两个数组的情况如下所示:

Snipaste_2021-06-03_12-50-03.png

假设在此时开始的 t3 时间内,线程 T1 失去时间片,线程 T2 苏醒并继续它未完成的任务,我们在 table 赋值的地方对线程 T1 进行休眠处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return;
    }
    Entry[] newTable = new Entry[newCapacity];
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    if ("T1".equals(Thread.currentThread().getName())) {
        System.out.println("T1进入休眠");
        ThreadUtils.sleep(6);
    }
    table = newTable;
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

前面我们提到,线程 T2 已经持有了自己 e 和 next 句柄,分别指向 {d, dd} 和 {c, cc} 节点,但是由于线程 T1 在数据搬运过程中颠覆了整条链的指针指向情况,尽管数据搬运是一个拷贝过程,但是拷贝也分为深拷贝和浅拷贝,此处的拷贝属于浅拷贝!所以此时对于线程 T2 来说,它看到的情况是这样的:

Snipaste_2021-06-03_13-00-49.png

然后它尝试进行搬运,{d, dd} 被浅拷贝到新数组的下标为 1 的桶口,e 指向 next 记录的 {c, cc} 节点,而在下一个循环的开始时,next 指向原 {c, cc} 的下一个节点,因为是引用指向,所以 next 指向了线程 T1 中的新数组中的冲突链表记录的 {d, dd} 节点。

img

于是 {c, cc} 节点理所当然地被头插到 1 号桶口,其 next 指针指向了 {d, dd} 节点,接下来,e 指向了原局部变量 next 指向的 {d, dd},下一次迭代进来会尝试将 {d, dd} 头插到 {c, cc} 前面,而 {c, cc} 的 next 指针已经指向了 {d, dd},从而出现了循环引用的情况,试想如果此时线程 T2 调用 get 方法,那么将会触发死循环。

Snipaste_2021-06-03_14-02-34.png

我们尝试在线程 T2 中调用 get 方法进行测试:

1
2
3
4
5
6
7
Thread t2 = new Thread(new Runnable() {
    @Override
    public void run() {
        map.put(k5, "ee");
        map.get(k1);
    }
});

为了结果直观化,我尝试将运行过程做成了 gif,看着还挺好玩。结果也很明显,的确出现了死循环。

deadloop.gif

与此同时,我们打开任务管理器查看 CPU 运行情况,可以看到 CPU 利用率已经去到了 100%,把程序停掉后恢复正常。

Snipaste_2021-06-03_15-00-43.png

如果线上环境出现 CPU 利用飙升的情况,对于 Windows服务器,通过 tasklist 命令拿到 JVM 的 PID,

img

然后使用 jstack 查看线程运行情况,然后定位到具体的代码位置。

Snipaste_2021-06-03_17-47-49.png

对于 Linux 服务器,可以使用 top -p 265 -H 命令查看 CPU 占用率高的进程,如果是 JVM 进程,拿到其 10 进制的 pid,然后通过 printf "%x" xx 转成十六进制,再使用 jstack 查看线程运行状况。

试想如果极端一点,某个系统的并发量极高,同一时刻有 100 条像 T2 这样的线程在成环的冲突链表上调用 get 方法,那么服务的可用性将会受到极大的威胁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
for (int i = 2; i < 102; i++) {
    Thread t2 = new Thread(new Runnable() {
        @Override
        public void run() {
            map.put(k5, "ee");
            map.get(k1);
        }
    });
    t2.setName("T" + i);
    t2.start();
}

五、总结

JDK 1.7 版本的 HashMap 由于没有做同步控制(事实上,它的设计初衷本身不是用于并发场景),所以需要避免在多线程环境下使用它来存储数据,因为在多线程并发扩容的时候,由于采用头插法及浅拷贝,会导致数据丢失及节点循环引用的情况,从而在后续调用 get 方法时触发死循环。