ConcurrentHashMap 源碼分析
CocurrentHashMap 作用
HashTable通過對整張表加鎖的方式實現並發hash查找與儲存,CocurrentHashMapt通過Segment的方式可以實現相同的功能,不過效率更加高,在jdk1.6的時候,CocuentHashMap有弱一致性的問題,不過在jdk1.7的時候,這個問題已經修復了。所以是並發安全性還是性能都是非常高的。接下來我嘗試基於jdk1.7的源碼去分析CocurrentHashMap。
cocurrentHashMap 初始化預處理
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long SBASE;
private static final int SSHIFT;
private static final long TBASE;
private static final int TSHIFT;
static {
int ss, ts;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class tc = HashEntry[].class;
Class sc = Segment[].class;
TBASE = UNSAFE.arrayBaseOffset(tc);
SBASE = UNSAFE.arrayBaseOffset(sc);
ts = UNSAFE.arrayIndexScale(tc);
ss = UNSAFE.arrayIndexScale(sc);
} catch (Exception e) {
throw new Error(e);
}
if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
throw new Error(data type scale not a power of two);
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
}
代碼解析:首先獲取Unsafe提供cas操作,java底層多線程並發都是通過cas完成的,不過cas操作對於高精度的並發還是存在一定問題。【至於這個問題,以後再分析】。UNSAFE.arrayBaseOffset(tc)和UNSAFE.arrayBaseOffset(sc)這兩個都是用於計算HashEntry和Segment實體對象相對於數組對象的內存偏移值。這是cas操作必須要獲取的值。 注釋:
//獲取數組中第一個元素的偏移量(get offset of a first element in the array)
public native int arrayBaseOffset(java.lang.Class aClass);
//獲取數組中一個元素的大小(get size of an element in the array)
public native int arrayIndexScale(java.lang.Class aClass);
cocurrentHashMap 初始化
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment s0 =
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
代碼解析:以上代碼主要的作用是初始化Segment[]數組對象以及Segment對象。解析來分析最重要的put和get方法。
put方法
public V put(K key, V value) {
Segment s;
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
代碼解析:大體的意思是先計算出key的hash值,然後利用這個hash值得到Segment對象。然後Segment對象執行put方法。這樣就完成了put操作。由於這個過程非常重要,我們肯定想要知道它是如何處理並發以及 內部實現。
ensureSegment
private Segment ensureSegment(int k) {
final Segment[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment seg;
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry[] tab = (HashEntry[])new HashEntry[cap];
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment s = new Segment(lf, threshold, tab);
while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
首先計算出偏移量,然後利用UnSafe去獲取對象。在這裡有可能大家對這個偏移值的獲取有點疑惑,在這裡我也分析一下這個偏移量獲取既long u=(k<
segment->put
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry first = entryAt(tab, index);
for (HashEntry e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
get()
public V get(Object key) {
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
int h = hash(key.hashCode());
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
原理很簡單先定位到segment,然後定位到實體。並且通過getObjectVolatie保證能夠讀到最新的數據。
總結:concurrentHashMap實現涉及到很多多線程的知識和java內存模型這方面的知識,如果沒有足夠的能力,介意不要模仿,但是我們可以學習它的思想以及是如何實現的。