博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
jdk源码(二):你知道ConcurrentHashMap的具体实现细节吗?
阅读量:6948 次
发布时间:2019-06-27

本文共 11313 字,大约阅读时间需要 37 分钟。

  hot3.png

1、首先抛出几个问题(文章最后有答案):

a、ConcurrentHashMap在put的时候,key经过几次hash计算?

b、segment 会增大吗?

c、新的值是放在链表的表头还是表尾?

2、ConcurrentHashMap是如何存储数据的?

先看图:

从图中我们可以看出ConcurrentHashMap有两个种数据结构:数组和单向链表

那ConcurrentHashMap和如何存放一对key和value呢?

put的具体过程:

a、根据key计算hash值

b、根据hash值找到segment数组的下标

c、根据上面的下标获取tab数组,

d、根据hash值,获取tab数组的下标

c、如果tab当前下标位置上没有值,就直接把存储有key和value的HashEntry存放在tab的当前下标下,否则就是形成一个链表(解决了Hash值冲突)

这就是整个put的大概过程。

是不是有小伙伴说,裤子都脱了,你给我看这个?哈哈哈哈哈,好,上代码

public V put(K key, V value) {        Segment
s; if (value == null) throw new NullPointerException(); int hash = hash(key); // 根据key获取hash值 int j = (hash >>> segmentShift) & segmentMask; // 定位segment数组的下标j if ((s = (Segment
)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) // 根据下标j获取数组该下标的元素s s = ensureSegment(j); return s.put(key, hash, value, false); }

看代码,大家是不是也有些不太明白呢?那我就一行一行的解释吧

解释之前我们得先了解几个参数:(注:涉及到unsafe的相关用法参考:https://my.oschina.net/huangy/blog/1620321)

segmentShift、segmentMask、SSHIFT,SBASE,segments
public ConcurrentHashMap(int initialCapacity,                             float loadFactor, int concurrencyLevel) {        // 默认值initialCapacity=16 loadFactor=0.75 concurrencyLevel=16        // initialCapacity 决定tab数组的初始化长度,concurrencyLevel决定segment数组的长度        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)            throw new IllegalArgumentException();        if (concurrencyLevel > MAX_SEGMENTS)            concurrencyLevel = MAX_SEGMENTS;        // Find power-of-two sizes best matching arguments        // 找到一个ssize不小于concurrencyLevel且必须是2的n次幂,为什么呢?下面解释        int sshift = 0;        int ssize = 1;                while (ssize < concurrencyLevel) {            ++sshift;            ssize <<= 1;        }        this.segmentShift = 32 - sshift; // sshift=4,segmentShift=28        this.segmentMask = ssize - 1; // segment数组的长度=ssize=16,segmentMask=15        if (initialCapacity > MAXIMUM_CAPACITY)            initialCapacity = MAXIMUM_CAPACITY;        int c = initialCapacity / ssize;        if (c * ssize < initialCapacity)            ++c;        int cap = MIN_SEGMENT_TABLE_CAPACITY;        while (cap < c)            cap <<= 1;        // create segments and segments[0]        Segment
s0 = new Segment
(loadFactor, (int)(cap * loadFactor), (HashEntry
[])new HashEntry[cap]); Segment
[] ss = (Segment
[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; // segments长度=16 }
static {        int ss, ts;        try {            UNSAFE = sun.misc.Unsafe.getUnsafe();            Class tc = HashEntry[].class;            Class sc = Segment[].class;            TBASE = UNSAFE.arrayBaseOffset(tc);// 获取HashEntry[]的基本偏移量=6            SBASE = UNSAFE.arrayBaseOffset(sc);// 获取Segment[]的基本偏移量=6            ts = UNSAFE.arrayIndexScale(tc);// 获取HashEntry[]单位偏移量=4            ss = UNSAFE.arrayIndexScale(sc);//获取Segment[]单位偏移量=4            HASHSEED_OFFSET = UNSAFE.objectFieldOffset(                ConcurrentHashMap.class.getDeclaredField("hashSeed"));            SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(                ConcurrentHashMap.class.getDeclaredField("segmentShift"));            SEGMASK_OFFSET = UNSAFE.objectFieldOffset(                ConcurrentHashMap.class.getDeclaredField("segmentMask"));            SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(                ConcurrentHashMap.class.getDeclaredField("segments"));        } catch (Exception e) {            throw new Error(e);        }        if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)            throw new Error("data type scale not a power of two");        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);//把Segment[]单位偏移量转成移位的次数=2        TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);//把HashEntry[]单位偏移量转成移位的次数=2    }

开始解释代码: 

public V put(K key, V value) {        Segment
s; if (value == null) throw new NullPointerException(); int hash = hash(key); // 根据key获取hash值 int j = (hash >>> segmentShift) & segmentMask; // 定位segment数组的下标j // 上面知道segmentShift=28,segmentMask=15(二进制:00000000000000000000000000001111) // 假设hash=994162679 二进制:00111011010000011011011111110111 // (hash >>> segmentShift) // 这句话的意思就是hash右移28 结果=3 (二级制:00000000000000000000000000000011) // int j = (hash >>> segmentShift) & segmentMask; // 就变成 // int j = 3 & segmentMask; // 00000000000000000000000000000011 & 00000000000000000000000000001111 // 的结果=00000000000000000000000000000011 // 所以 j = 3 // 这就根据hash值定位segments的数组下标 j=3 // 我们回想一下:segments数组长度ssize=16 // segmentMask = ssize-1 = 15 // 然后每个经过右移28位的hash值和segmentMask进行与操作, // 就能保证j一定落在数组内,保证了不越界,同时效率也非常高 // 这就是要找到一个ssize不小于concurrencyLevel且必须是2的n次幂的原因 // HashMap 也是这么干的,可以关注我查看我的相关文章。 if ((s = (Segment
)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) // 根据下标j获取数组该下标的元素s // UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) // 根据上面知道:SSHIFT=2 SBASE=6 // 同时刚刚求出 j=3 // (j << SSHIFT) + SBASE 的结果是12+6=20 // UNSAFE.getObject()会根据segments和偏移量得到数组下标=3的元素 // UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) 可以简单认为是 segments[3] // 只不过这是直接内存操作非常高效而已,这样的操作ConcurrentHashMap用的非常多 // 不明白可以查看我的相关文章 s = ensureSegment(j); return s.put(key, hash, value, false); }
private Segment
ensureSegment(int k) { // 这里一系列的unsafe操作请查看我的相关文章 final Segment
[] ss = this.segments; long u = (k << SSHIFT) + SBASE; //还是获取偏移量 Segment
seg; if ((seg = (Segment
)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment
proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; // cap=16 float lf = proto.loadFactor; // lf =0.75 int threshold = (int)(cap * lf); HashEntry
[] tab = (HashEntry
[])new HashEntry[cap]; if ((seg = (Segment
)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment
s = new Segment
(lf, threshold, tab); while ((seg = (Segment
)UNSAFE.getObjectVolatile(ss, u)) == null) { // UNSAFE.compareAndSwapObject 保证了原子性,可以思考一下没有原子保证,会有什么后果 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }

分析:s.put(key, hash, value, false);

分析之前我们了解Segment,它继承了ReentrantLock,我们知道ConcurrentHashMap 是线程安全的,这就是关键点了

s.put()执行过程

a、尝试获取锁

b、根据hash值获取tab数组下标

c、tab数组当前下标,是否有HashEntry,有则遍历,没有则创建一个HashEntry

d、释放锁

大概就是这么一个过程

看代码

final V put(K key, int hash, V value, boolean onlyIfAbsent) {            // 尝试获取锁            HashEntry
node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry
[] tab = table; int index = (tab.length - 1) & hash; // tab.length=16 16-1=15(二进制00000000000000000000000000001111) // 看到这个是不是很熟悉,不解释了 HashEntry
first = entryAt(tab, index);// 根据数组和下标获取元素,还是unsafe操作 //遍历一次就搞定了所有事情,如果是你写,你会怎么写? for (HashEntry
e = first;;) { if (e != null) { // e,如果不为空,则表示tab数组当前这个下标,已经有值,很可能形成一个链表, // K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { // 如果key和hash都相同,表示同一个,按要求看是否要更新value oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first);// 从这里可以看出新进来的,会做链表头 else node = new HashEntry
(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }

其实最让人膜拜的代码是:

HashEntry
node = tryLock() ? null :scanAndLockForPut(key, hash, value);

tryLock()尝试获取锁,获取不到就执行scanAndLockForPut,我们看看scanAndLockForPut都干嘛了

private HashEntry
scanAndLockForPut(K key, int hash, V value) { // entryForHash根据hash值获取tab中对应的元素,看不懂可以参考之前的分析 HashEntry
first = entryForHash(this, hash); HashEntry
e = first; HashEntry
node = null; int retries = -1; // negative while locating node // 不断的在尝试获取锁 while (!tryLock()) { HashEntry
f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry
(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { // 为了防止无限次尝试,做了个限制,一般MAX_SCAN_RETRIES=64 lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; } // 这段代码主要做两件事: // 1、获取锁,执行完这个方法肯定能得到锁 // 2、在获取锁等待的过程中,有必要的创建新HashEntry // 这段代码主要优化的是: // 利用在获取锁等待的时间,如果发现tab当前这个下标的值为空 // 那么创建HashEntry,然后继续获取锁,知道超过MAX_SCAN_RETRIES的次数 // 执行到lock(),然后整个线程就会进入等待 // 如果不是使用tryLock(),而是上来就是lock(),那么整个线程就会进入等待,什么都干不了 // 这是一个非常小的优化,但是绝大部分应用场景都是新创建HashEntry这样的情况的, // 所以这个优化还是非常值得肯定的 // 大神写的代码,是不是有种眼界提高的感觉,哈哈哈哈哈哈

到此,ConcurrentHashMap的最关键的代码就是这些了,只要你能看懂这些,其他的都不在话下。

Unsafe的相关操作参考:

https://my.oschina.net/huangy/blog/1620321

3、总结

开头的问题有答案了吗?

好,揭晓答案

a、ConcurrentHashMap在put的时候,key经过几次hash计算?一次

b、segment 会增大吗?不会

c、新的值是放在链表的表头还是表尾?表头

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)

initialCapacity:控制tab数组的大小(默认16)

loadFactor:tab进行rehash阈值百分数(默认0.75)

concurrencyLevel:控制segment的大小 (默认16)

所以,一旦concurrencyLevel指定了就不能改变了

那么ConcurrentHashMap里为什么分segment呢?

这就是ConcurrentHashMap高明之处,通过之前的分析我们都知道锁只在segment中存在,这样就把锁的粒度变小,提高并发,同时还是线程安全的,

所以,如果我们使用ConcurrentHashMap存放数据的时候,数据非常大的时候,concurrencyLevel的指定就尤为重要了,合适concurrencyLevel的可以让ConcurrentHashMap性能最佳。

最后留个问题:

hash值相同的两个对象是同一个吗?欢迎大家评论里留言

欢迎关注,转发

持续更新有意思的代码

转载于:https://my.oschina.net/huangy/blog/1620779

你可能感兴趣的文章
疑似微信企业版曝光 网友留言称“心疼阿里”
查看>>
高通每天提供超过一百万颗芯片 助力物联网发展
查看>>
Python Selenium的js扩展实现
查看>>
全球最大规模窄带物联网智慧水务商用项目在福州启动
查看>>
iDTRONIC推出RFID平板和手持移动设备
查看>>
圆通速递率先推出隐形面单 为个人信息安全再添安全锁
查看>>
当200亿个物联网设备同时产生数据 是时候重新认识传感器了
查看>>
深度专访:深谈的故事 (LinuxDeepin)
查看>>
《构建高可用Linux服务器 第3版》—— 1.2 全面了解Linux服务器
查看>>
《开放复杂智能系统——基础、概念、分析、设计与实施》—第1章1.5节 小结...
查看>>
《测试驱动数据库开发》——2.4 增量构建
查看>>
网站图片优化你需要知道的地方
查看>>
《计算机科学概论》—第3章3.4节音频数据表示法
查看>>
深入理解Java内存模型(一)——基础
查看>>
C++程序设计:原理与实践(进阶篇)15.3 序列和迭代器
查看>>
后台开发:核心技术与应用实践2.3 类的多态
查看>>
智能哲学:在AI前沿上人类伦理学的挑战与应战
查看>>
简单5步隐藏Ubuntu13.04 Unity 启动器
查看>>
《Redis入门指南》一5.4 Node.js与Redis
查看>>
《精通Python网络爬虫:核心技术、框架与项目实战》——2.3 用户爬虫的那些事儿...
查看>>