線程安全的HashMap版本。
1)基本思想:將整個大的hash table進一步細分成小的hash table,即Segment;
2)讀不用加鎖;寫操作在所在的Segmenet上加鎖,而不是整個HashMap,Hashtable就是所有方法競爭Hashtable上的鎖,導致並發效率低;
3)采用懶構造segment(除了segments[0]),以減少初始化內存。Unsafe類實現了AtomicReferenceArrays的功能,但減少了間接引用的程度。對Segment.table元素、HashEntry.next屬性采用更輕量級的“lazySet”寫(用Unsafe.putOrderedObject實現,確保用在lock中,其後有unlock釋放),以保證table更新的順序一致性。之前的ConcurrentHashMap類過度依賴final關鍵字,雖然能避免一些volatile讀,但是初始化時需要較大的內存。
4)並發級別由concurrencyLevel參數確定,太大會浪費內存空間和遍歷時間,太小則加強競爭;當只有一個線程寫,其他線程都是讀,則設置為1;
5)迭代器為其創建時或之後的某個時刻ConcurrentHashMap狀態,不會拋出ConcurrentModificationException,用來在一個線程中一次使用;
6)盡量避免rehash。
Segment
final Segment[] segments; // final方式 static final class Segment extends ReentrantLock implements Serializable { // 在預掃描中,segment阻塞前tryLock嘗試的最大數 // 在多處理器中,采用有限的嘗試次數使得在查找節點時緩存 static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; // segment的table數組 // 采用entryAt/setEntryAt提供對其元素的volatile讀寫 transient volatile HashEntry [] table; // segment中元素數目,獲取方式采用lock或其他volatile方式 transient int count; // 所在segment的寫總數 // 即使其overflows,也可以保證在isEmpty()、size()使用中的正確性 // 獲取方式采用lock或其他volatile方式 transient int modCount; // segment中table的負載阈值,超過則其table rehash transient int threshold; // 負載因子,對所有segment都一樣。 // 這個單獨作為segment的一個屬性是為了避免鏈接到外部對象 final float loadFactor; Segment(float lf, int threshold, HashEntry [] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } } static final class HashEntry { // hash、key為final;value、next為volatile final int hash; final K key; volatile V value; volatile HashEntry next; HashEntry(int hash, K key, V value, HashEntry next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } // 用UNSAFE.putOrderedObject進行next的volatile寫 final void setNext(HashEntry n) { UNSAFE.putOrderedObject(this, nextOffset, n); } // Unsafe mechanics static final sun.misc.Unsafe UNSAFE; static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class; nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
segments.length = 16
segmentShift = 28
segmentMask = 15
Segment.table.length = 2
Segment.loadFactor = 0.75
Segment.threshold = 1
// 帶初始容量、負載因子、並發級別參數構造
// initialCapacity參數:用來確定Segment容量
// loadFactor參數:Segment的負載因子
// concurrencyLevel參數:用來確定segments長度,即並發級別
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift; // 用來確定segment的移位
this.segmentMask = ssize - 1; // 用來確定segment的掩碼,即Segment數組長度-1,key hashCode的高位確定segment index
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize; // 初始容量/Segment數組長度,即用來確定Segment容量
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1; // Segment容量
// create segments and segments[0]
Segment s0 =
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]); // 保證序列化與之前版本的兼容
Segment[] ss = (Segment[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
// 帶初始容量、負載因子參數構造,默認並發級別為16
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
// 帶初始容量參數構造,默認負載因子0.75
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
// 無參構造,默認初始容量16
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
/**
* Creates a new map with the same mappings as the given map.
* The map is created with a capacity of 1.5 times the number
* of mappings in the given map or 16 (whichever is greater),
* and a default load factor (0.75) and concurrencyLevel (16).
*
* @param m the map
*/
public ConcurrentHashMap(Map extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
1)當Segment中鍵值對數超過負載阈值且table長度小於MAXIMUM_CAPACITY = 1 << 30,則對其table進行容量調整,table容量翻倍;
2)table最大容量為MAXIMUM_CAPACITY = 1 << 30;
3)table容量達到MAXIMUM_CAPACITY 後,如果有put請求,則直接在相應的bucket中鏈接進來,不會控制鍵值對的添加;
4)若進行了table的容量調整,需要將舊table關聯的鍵值對重新在新table中確定bucket,再添加進來,也就是所說的hash table rehash,這裡可以重用一些舊table的節點,因為next屬性是不變的。
// rehash
@SuppressWarnings("unchecked")
private void rehash(HashEntry node) {
// table長度翻倍
// 由於table長度為2的冪次方,在從就table到新table的rehash過程中,
// 元素的索引要麼不變,要麼移位2的冪次方;
// 一些老的節點因其next屬性不變可以重用,在默認的負載阈值下,
// 只有1/6的元素需要復制。Entry的讀取值采用普通的數組索引,這是
// 因為其後有table的volatile寫
HashEntry[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1; // table長度翻倍,長度依然為2的冪次方
threshold = (int)(newCapacity * loadFactor); // 負載阈值
HashEntry[] newTable =
(HashEntry[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry e = oldTable[i]; // 采用普通的數組索引
if (e != null) {
HashEntry next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else {
HashEntry lastRun = e;
int lastIdx = idx;
for (HashEntry last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun; // 在新的table中,重用索引相同的老的節點,其next屬性不變
// Clone remaining nodes
for (HashEntry p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry n = newTable[k];
newTable[k] = new HashEntry(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
確定segments數組中Segment索引、Segment中table數組bucket索引
基於key的hashCode再哈希產生hash碼,用其高位確定Segment索引,用其低位確定bucket索引:
// 對key的hashCode進行再次哈希,對其高位、低位進一步散列
// 由於segments、Segment.table length為均2的冪次方,所以對那些高位、低位相同的hashCode,容易產生hash碰撞;
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
// 用hash碼的高位在segments中確定index
int j = (hash >>> segmentShift) & segmentMask;
// 用hash碼的低位確定bucket index
int index = (tab.length - 1) & hash;
步驟:
1)根據key的hashCode獲取hash碼;
2)用hash碼的高位確定Segment;
3)將put操作委托給確定的Segment進行put;
4)先采用自旋獲取該Segment的鎖;
5)用hash碼的低位確定該Segment中table的bucket;
6)先遍歷該bucket中鍵值對,確定是否已有相同或hash碼相等且key相等的鍵值對,有則替換新value後返回;否則用key、value、hash創建Entry,將其鏈接到bucket首位;
7)釋放該Segment的鎖。
public V put(K key, V value) {
Segment s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false); // 委托給確定的Segment進行put
}
// Segment put
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value); // 采用自旋,獲取鎖與遍歷bucket熱身
V oldValue;
try {
HashEntry[] tab = table;
int index = (tab.length - 1) & hash; // bucketIndex
HashEntry first = entryAt(tab, index); // table元素 volatile讀
for (HashEntry e = first;;) {
if (e != null) { // 先遍歷看是否已有鍵值對
K k;
// key相同或hash碼相等且key相等
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node); // table元素 volatile寫
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
// segments元素volatile讀,若不存在,則創建再用CAS寫入segments
@SuppressWarnings("unchecked")
private Segment ensureSegment(int k) {
final Segment[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment seg;
// 對segments中元素進行volatile讀,若為null則
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry[] tab = (HashEntry[])new HashEntry[cap];
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) { // 再次檢查是否為null
Segment s = new Segment(lf, threshold, tab);
while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))// 用CAS方式寫
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
// 在嘗試獲取Segment鎖的過程中,遍歷hash碼所確定的bucket中的節點,
// 如果沒有,則創建返回,返回時確保獲取到當前Segment鎖。這裡只采用equals
// 來比較key,這不重要,主要是為了遍歷熱身。
private HashEntry scanAndLockForPut(K key, int hash, V value) {
HashEntry first = entryForHash(this, hash);
HashEntry e = first;
HashEntry node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key)) // 不是真的比較,僅僅簡單遍歷
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
// table元素volatile讀
@SuppressWarnings("unchecked")
static final HashEntry entryAt(HashEntry[] tab, int i) {
return (tab == null) ? null :
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
// table元素volatile寫
static final void setEntryAt(HashEntry[] tab, int i,
HashEntry e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
步驟:
1)根據key的hashCode獲取hash碼;
2)用hash碼的高位確定Segment;
3)將put操作委托給確定的Segment進行remove;
4)先采用自旋獲取該Segment的鎖;
5)用hash碼的低位確定該Segment中table的bucket;
6)遍歷該bucket中鍵值對,確定是否已有相同或hash碼相等且key相等的鍵值對,有則刪除;
7)釋放該Segment的鎖。
public V remove(Object key) {
int hash = hash(key);
Segment s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry e = entryAt(tab, index);
HashEntry pred = null;
while (e != null) {
K k;
HashEntry next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next); // 刪除的是bucket的第一個節點,volatile寫
else
pred.setNext(next); // 通過改變next刪除節點,next的volatile寫
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
步驟:
1)根據key的hashCode獲取hash碼;
2)用hash碼的高位確定Segment;
3)用hash碼的低位確定該Segment中table的bucket;
4)遍歷該bucket中鍵值對,確定是否已有相同或hash碼相等且key相等的鍵值對,有則返回關聯的value;否則返回null。
public V get(Object key) {
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
// 判斷ConcurrentHashMap是否為空
// 用segment的modCount來確定調用時間段
public boolean isEmpty() {
long sum = 0L;
final Segment[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) {
Segment seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum += seg.modCount;
}
}
if (sum != 0L) { // recheck unless no modifications
for (int j = 0; j < segments.length; ++j) {
Segment seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum -= seg.modCount;
}
}
if (sum != 0L) // modCount溢不溢出都沒有關系,只要sum為0L就表明沒有修改
return false;
}
return true;
}
// 獲取ConcurrentHashMap的鍵值對總數
// 用segment的modCount來確定調用時間段
// 先嘗試RETRIES_BEFORE_LOCK次數獲取size,獲取失敗則加全鎖
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
HashIterator為迭代器基類,從後往前對ConcurrentHashMap進行全遍歷,性能一般,KeyIterator、ValueIterator、EntryIterator都繼承於該類:
abstract class HashIterator {
int nextSegmentIndex;
int nextTableIndex;
HashEntry[] currentTable;
HashEntry nextEntry;
HashEntry lastReturned;
HashIterator() {
nextSegmentIndex = segments.length - 1;
nextTableIndex = -1;
advance();
}
/**
* Set nextEntry to first node of next non-empty table
* (in backwards order, to simplify checks).
*/
final void advance() {
for (;;) {
if (nextTableIndex >= 0) {
if ((nextEntry = entryAt(currentTable,
nextTableIndex--)) != null) // 往前找到不為null的bucket
break;
}
else if (nextSegmentIndex >= 0) {
Segment seg = segmentAt(segments, nextSegmentIndex--);// 往前找到不為null的Segment
if (seg != null && (currentTable = seg.table) != null)
nextTableIndex = currentTable.length - 1;
}
else
break;
}
}
final HashEntry nextEntry() {
HashEntry e = nextEntry;
if (e == null)
throw new NoSuchElementException();
lastReturned = e; // cannot assign until after null check
if ((nextEntry = e.next) == null)
advance();
return e;
}
public final boolean hasNext() { return nextEntry != null; }
public final boolean hasMoreElements() { return nextEntry != null; }
public final void remove() {
if (lastReturned == null)
throw new IllegalStateException();
ConcurrentHashMap.this.remove(lastReturned.key);
lastReturned = null;
}
}
1)segments、Segment.table的長度均為2的冪次方;
2)用hash函數,基於key的hashCode再次哈希,對其高位、低位進一步散列 ;
3)用hash碼的高位在segments中確定index,低位確定bucket index;
// 用hash碼的高位在segments中確定index int j = (hash >>> segmentShift) & segmentMask; // 用hash碼的低位確定bucket index int index = (tab.length - 1) & hash;
其並發體現在兩點:
1)Segment之間的並發;
2)Segment內部的讀寫並發;
1)並發讀取segments中的元素;2)將所有操作委托給Segment。
int h = hash(key); // hash碼 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;// segment index s = (Segment)UNSAFE.getObjectVolatile(segments, u)// 對segment index元素volatile讀
1)寫線程之間競爭Segment同一把鎖,讀線程不加鎖;
2)利用volatile、final語義保證並發寫線程之間、並發讀寫線程之間的可見性。
public V get(Object key) {
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) { // table volatile讀
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);// table元素volatile讀
e != null; e = e.next) { // next volatile讀
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k))) // key、hash為final
return e.value;// value volatile讀
}
}
return null;
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry[] tab = table; // // table volatile讀
int index = (tab.length - 1) & hash;
HashEntry first = entryAt(tab, index); // table元素volatile讀
for (HashEntry e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {// key、hash為final
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;// value volatile寫
++modCount;
}
break;
}
e = e.next;// next volatile讀
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);// table元素volatile寫,弱一致性
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
在put時,如果獲取的segment為null,則進行懶構造
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j);
這樣可以減少初始化內存空間。
final字段所在的類實例初始化完成後,在未在構造完成前暴露實例的this前提下,final字段對所有並發線程可見,可以部分減少volatile讀,但是會增加初始化內存空間。