程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> JDK容器與並發—Map—ConcurrentSkipListMap

JDK容器與並發—Map—ConcurrentSkipListMap

編輯:JAVA綜合教程

JDK容器與並發—Map—ConcurrentSkipListMap


概述

基於跳表實現的ConcurrentNavigableMap。

1)containsKey、get、put、remove等操作的平均時間復雜度為log(n);size非固定時間操作,因異步特性,需要遍歷所有節點才能確定size,且可能不是正確的值如果遍歷過程中有修改;批量操作:putAll、equals、toArray、containsValue、clear非原子性。

2)增刪改查操作並發線程安全;

3)迭代器是弱一致性的:map創建時或之後某個時間點的,不拋出ConcurrentModificationException,可能與其他操作並發執行。升序視圖及迭代器比降序的要快;

數據結構

基於鏈表,有三種類型節點Node、Index、HeadIndex,底層為Node單鏈表有序層(升序),其他層為基於Node層的Index層即索引層,可能有多個索引層。

相對於單鏈表,跳表查找節點很快是在於:通過HeadIndex維護索引層次,通過Index從最上層開始往下查找元素,一步步縮小查找范圍,到了最底層Node單鏈表層,就只需要比較很少的元素就可以找到待查找的元素節點。

// Node節點,擁有key-value對
// 單鏈表,有序,第一個節點為head.node標記節點,中間可能會穿插些刪除標記節點(即marker節點)
static final class Node {
	final K key;
	volatile Object value; // value為Object類型,便於區分刪除標記節點、head.node標記節點
	volatile Node next;

	/**
	 * Creates a new regular node.
	 */
	Node(K key, Object value, Node next) {
		this.key = key;
		this.value = value;
		this.next = next;
	}

	// 創建一個刪除標記節點
	// 刪除標記節點的value為this,以區分其他Node節點;
	// key為null,其他場合會用到,但不能用作區分,因為head.node的key也為null
	Node(Node next) {
		this.key = null;
		this.value = this;
		this.next = next;
	}

	// CAS value屬性
	boolean casValue(Object cmp, Object val) {
		return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
	}

	// CAS next屬性
	boolean casNext(Node cmp, Node val) {
		return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
	}

	// 是否為刪除標記節點
	boolean isMarker() {
		return value == this;
	}

	// 是否為header node節點
	boolean isBaseHeader() {
		return value == BASE_HEADER;
	}

	// 在當前Node節點後增加刪除標記節點(采用CAS next方式實現)
	boolean appendMarker(Node f) {
		return casNext(f, new Node(f));
	}

	 // 推進刪除Node節點
	 // 一般在遍歷過程中,如果遇到當前Node的value為null,會調用該方法
	void helpDelete(Node b, Node f) {
		// 首先檢查當前的鏈接是否為b——>this——>f;
		// 再進行推進刪除,過程分兩步,每一步都采用CAS實現
		// 每次只推進一步,以減小推進線程間的干擾
		if (f == next && this == b.next) { // 檢測是否為b——>this——>f,其中b為前驅節點,f為後繼節點
			if (f == null || f.value != f) // 待刪除節點未進行標記
				appendMarker(f);// 鏈接刪除標記節點
			else
				b.casNext(this, f.next); // 刪除當前節點及其刪除標記節點,完成刪除
		}
	}

	// 獲取節點的有效value
	V getValidValue() {
		Object v = value;
		if (v == this || v == BASE_HEADER) // 若為刪除標記節點、header node節點,則返回null
			return null;
		return (V)v;
	}

	// 對有效鍵值對封裝到不可變的Entry
	AbstractMap.SimpleImmutableEntry createSnapshot() {
		V v = getValidValue();
		if (v == null)
			return null;
		return new AbstractMap.SimpleImmutableEntry(key, v);
	}

	// UNSAFE mechanics

	private static final sun.misc.Unsafe UNSAFE;
	private static final long valueOffset;
	private static final long nextOffset;

	static {
		try {
			UNSAFE = sun.misc.Unsafe.getUnsafe();
			Class k = Node.class;
			valueOffset = UNSAFE.objectFieldOffset
				(k.getDeclaredField("value"));
			nextOffset = UNSAFE.objectFieldOffset
				(k.getDeclaredField("next"));
		} catch (Exception e) {
			throw new Error(e);
		}
	}
}

// 索引節點,用於從最上層往下縮小查找范圍
// 邏輯上的雙鏈表,非前後鏈接,而是上下鏈接
static class Index {
	final Node node; // 索引節點是基於Node節點的
	final Index down;// down為final的,簡化並發
	volatile Index right;

	/**
	 * Creates index node with given values.
	 */
	Index(Node node, Index down, Index right) {
		this.node = node;
		this.down = down;
		this.right = right;
	}

	// CAS right屬性
	final boolean casRight(Index cmp, Index val) {
		return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
	}

	// 其Node節點是否刪除
	final boolean indexesDeletedNode() {
		return node.value == null;
	}

	// 鏈接Index節點
	// 如果鏈接過程中,其Node節點已刪除,則不鏈接,以減小與解鏈接的CAS競爭
	/*
	 * @param succ the expected current successor
	 * @param newSucc the new successor
	 * @return true if successful
	 */
	final boolean link(Index succ, Index newSucc) {
		Node n = node;
		newSucc.right = succ; // 將newSucc鏈接進來
		return n.value != null && casRight(succ, newSucc); // CAS right
	}

	// 解鏈接index節點,如果其Node已刪除,則解鏈接失敗
	/**
	 * @param succ the expected current successor
	 * @return true if successful
	 */
	final boolean unlink(Index succ) {
		return !indexesDeletedNode() && casRight(succ, succ.right);// CAS right
	}

	// Unsafe mechanics
	private static final sun.misc.Unsafe UNSAFE;
	private static final long rightOffset;
	static {
		try {
			UNSAFE = sun.misc.Unsafe.getUnsafe();
			Class k = Index.class;
			rightOffset = UNSAFE.objectFieldOffset
				(k.getDeclaredField("right"));
		} catch (Exception e) {
			throw new Error(e);
		}
	}
}

// HeadIndex節點,跟蹤索引層次
static final class HeadIndex extends Index {
	final int level;// 索引層,從1開始,Node單鏈表層為0
	HeadIndex(Node node, Index down, Index right, int level) {
		super(node, down, right);
		this.level = level;
	}
}

JDK中一個實例結構圖:

\

WIKI上關於跳表的解釋圖:<喎?http://www.Bkjia.com/kf/ware/vc/" target="_blank" class="keylink">vcD4KPHA+PGltZyBzcmM9"http://www.2cto.com/uploadfile/Collfiles/20160421/201604210924001082.gif" alt="\">

構造器

無參構造,空Map
public ConcurrentSkipListMap() {
	this.comparator = null; // 用Key的Comparable接口排序
	initialize();
}

// 帶comparator參數構造
public ConcurrentSkipListMap(Comparator comparator) {
	this.comparator = comparator;
	initialize();
}

// 帶Map參數構造,采用Key的Comparable接口排序
public ConcurrentSkipListMap(Map m) {
	this.comparator = null;
	initialize();
	putAll(m);
}

// 帶SortedMap參數構造,采用SortedMap的comparator排序
public ConcurrentSkipListMap(SortedMap m) {
	this.comparator = m.comparator();
	initialize();
	buildFromSorted(m);
}

增刪改查

初始化

final void initialize() {
	keySet = null;
	entrySet = null;
	values = null;
	descendingMap = null;
	randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
	head = new HeadIndex(new Node(null, BASE_HEADER, null),
							  null, null, 1); // 初始化head節點,空Map
}

增、改

步驟:
1)查找比key小的前驅節點,查找過程中刪除待刪除Node節點的索引節點;
2)從前驅節點開始,遍歷底層Node單鏈表,若已存在相關的key-value對,則CAS替換新的value,返回舊value;若不存在,則確定插入位置;遍歷過程中,推進刪除待刪除的Node節點;
3)用key、value創建新的Node節點,用CAS next方式鏈接進來;
4)給新的Node節點隨機生成一個索引層次,若層次大於0,則給其增加索引節點,返回null。

public V put(K key, V value) {
	if (value == null)
		throw new NullPointerException();
	return doPut(key, value, false);
}

private V doPut(K kkey, V value, boolean onlyIfAbsent) {
	Comparable key = comparable(kkey);
	for (;;) {
		Node b = findPredecessor(key); // 找到前驅節點後,接下來就是在Node單鏈表層精確找到插入位置
		Node n = b.next;
		for (;;) {
			// 遍歷清除Node節點操作同findNode
			if (n != null) {
				Node f = n.next;
				if (n != b.next) 
					break;
				Object v = n.value;
				if (v == null) { 
					n.helpDelete(b, f); 
					break;
				}
				if (v == n || b.value == null)
					break;
				int c = key.compareTo(n.key);
				if (c > 0) { // key大於n,繼續往後找
					b = n;
					n = f;
					continue;
				}
				if (c == 0) { // 已有相關的key-value對
					if (onlyIfAbsent || n.casValue(v, value))
						return (V)v;
					else
						break; // CAS value失敗,則重新開始,失敗原因可能是n變成了待刪除節點或有其他修改線程修改過
				}
				// else c < 0; fall through:說明新增的key-value對需要插到b和n之間
			}

			Node z = new Node(kkey, value, n);
			if (!b.casNext(n, z)) // 將新節點z插入b、n之間
				break;         // 失敗了,原因同n != b.next,重來
			int level = randomLevel(); // 給新增的Node節點隨機生成一個索引層次
			if (level > 0)
				insertIndex(z, level); // 給z增加索引節點
			return null;
		}
	}
}

// 查找比key小的前驅節點,若沒有,則返回head.node
// 一些操作依賴該方法刪除索引節點
private Node findPredecessor(Comparable key) {
	if (key == null)
		throw new NullPointerException(); // don't postpone errors
	for (;;) {
		Index q = head;
		Index r = q.right;
		// 從索引層最上層開始,往右往下,
		// 一直找到最下層索引層(即第一層),從而確定查找范圍,以在底層Node單鏈表遍歷精確找到
		for (;;) {
			if (r != null) { // 在索引層,往右找
				Node n = r.node;
				K k = n.key;
				if (n.value == null) { // 遍歷到待刪除節點n的索引節點
					if (!q.unlink(r))// 刪除其索引節點(采用CAS right屬性)
						//刪除失敗原因:q被標記為待刪除節點或在q後增加新索引節點或已刪除了其right節點
						break;   // 重新開始
					r = q.right;// 若刪除成功,則獲取新的right索引節點,繼續找
					continue;
				}
				if (key.compareTo(k) > 0) { // 若key大,說明可能還有小於key的更大的,繼續找
					q = r;
					r = r.right;
					continue;
				}
			}
			Index d = q.down;// 當層索引層沒有,則往下一層找,進一步縮小查找范圍
			if (d != null) {// 在下一層索引層,繼續找
				q = d;
				r = d.right;
			} else
				return q.node;// 確定前驅節點,如果沒有則為head.node標記節點
		}
	}
}

// 給新增的Node節點隨機生成一個索引層次
/**
     * Returns a random level for inserting a new node.
     * Hardwired to k=1, p=0.5, max 31 (see above and
     * Pugh's "Skip List Cookbook", sec 3.4).
     *
     * This uses the simplest of the generators described in George
     * Marsaglia's "Xorshift RNGs" paper.  This is not a high-quality
     * generator but is acceptable here.
     */
private int randomLevel() {
	int x = randomSeed;
	x ^= x << 13;
	x ^= x >>> 17;
	randomSeed = x ^= x << 5;
	if ((x & 0x80000001) != 0) // test highest and lowest bits
		return 0;
	int level = 1;
	while (((x >>>= 1) & 1) != 0) ++level;
	return level;
}

// 為Node節點添加索引節點
/**
     * @param z the node
     * @param level the level of the index
     */
private void insertIndex(Node z, int level) {
	HeadIndex h = head;
	int max = h.level; // head的索引層次是最大的

	if (level <= max) { // 待添加索引節點的索引層次在head索引層次內,創建索引節點添加進來即可
		Index idx = null;
		for (int i = 1; i <= level; ++i)
			idx = new Index(z, idx, null);// 索引節點,從下往上鏈接
		addIndex(idx, h, level);               // 將索引節點鏈接進來

	} else { // 增加一層索引層,需要new新層次的HeadIndex
		/*
		 * To reduce interference by other threads checking for
		 * empty levels in tryReduceLevel, new levels are added
		 * with initialized right pointers. Which in turn requires
		 * keeping levels in an array to access them while
		 * creating new head index nodes from the opposite
		 * direction.
		 */
		level = max + 1;
		Index[] idxs = (Index[])new Index[level+1];
		Index idx = null;
		for (int i = 1; i <= level; ++i)
			idxs[i] = idx = new Index(z, idx, null);

		HeadIndex oldh;
		int k;
		for (;;) {
			oldh = head;
			int oldLevel = oldh.level;
			if (level <= oldLevel) { // 其他線程增加過索引層
				k = level;
				break;               // 同上面的level <= max情況處理
			}
			HeadIndex newh = oldh;
			Node oldbase = oldh.node;
			for (int j = oldLevel+1; j <= level; ++j) // 有可能其他線程刪除過索引層,所以從oldLevel至level增加HeadIndex
				newh = new HeadIndex(oldbase, newh, idxs[j], j); // 創建新層次的HeadIndex且將索引節點idxs相應層次鏈接進來
			if (casHead(oldh, newh)) { // CAS head HeadIndex節點
				k = oldLevel;
				break;
			}
		}
		addIndex(idxs[k], oldh, k);                  // 需要將idxs的舊oldLevel層次及下面的索引鏈接進來
	}
}

/**
 * 從第indexLevel層往下到第1層,將索引節點鏈接進來
 * @param idx the topmost index node being inserted
 * @param h the value of head to use to insert. This must be
 * snapshotted by callers to provide correct insertion level
 * @param indexLevel the level of the index
 */
private void addIndex(Index idx, HeadIndex h, int indexLevel) {
	// Track next level to insert in case of retries
	int insertionLevel = indexLevel;
	Comparable key = comparable(idx.node.key);
	if (key == null) throw new NullPointerException();

	// 過程與findPredecessor類似, 只是多了增加索引節點
	for (;;) {
		int j = h.level;
		Index q = h;
		Index r = q.right;
		Index t = idx;
		for (;;) {
			if (r != null) {// 在索引層,往右遍歷
				Node n = r.node;
				// compare before deletion check avoids needing recheck
				int c = key.compareTo(n.key);
				if (n.value == null) { 
					if (!q.unlink(r))
						break;
					r = q.right;
					continue;
				}
				if (c > 0) {
					q = r;
					r = r.right;
					continue;
				}
			}

			if (j == insertionLevel) {        // 可以鏈接索引節點idx
				if (t.indexesDeletedNode()) { // 索引節點的Node節點被標記為待刪除節點
					findNode(key);            // 推進刪除索引節點及其Node節點
					return;                   // 不用增加索引節點了
				}
				if (!q.link(r, t))            // 將第insertionLevel層索引節點鏈接進來
					//刪除失敗原因:同findPredecessor種的q.unlink(r)
					break; 				 	  // 鏈接失敗,重新開始
				if (--insertionLevel == 0) {  // 准備鏈接索引節點idx的下一層索引
					if (t.indexesDeletedNode()) // 返回前再次檢查索引節點t是否被標記為待刪除節點,以進行清理工作
						findNode(key);
					return;                   // insertionLevel==0表明已經完成索引節點idx的鏈接
				}
			}

			if (--j >= insertionLevel && j < indexLevel) // 已鏈接過索引節點idx的第insertionLevel+1層
				t = t.down;                              // 准備鏈接索引節點idx的第insertionLevel層
			q = q.down;						  // 准備鏈接索引節點idx的下一層索引
			r = q.right;
		}
	}
}

// 查找相關key的Node,沒有則返回null。
// 遍歷Node單鏈表中,清除待刪除節點
// 在doPut、doRemove、findNear等都包含這樣的遍歷清除操作
// 不能共享這樣的清除代碼,因為增刪改查需要獲取Node鏈表順序的快照暫存到自身的局部變量,用於並發
// 一些操作依賴此方法刪除Node節點
private Node findNode(Comparable key) {
	for (;;) {
		Node b = findPredecessor(key); // 獲取key的前驅節點
		Node n = b.next;
		for (;;) {
			if (n == null)
				return null;
			Node f = n.next;
			// 不是連續的b——>n——>f快照,不能進行後續解鏈接待刪除節點
			//變化情況:在b後增加了新節點或刪除了其next節點或增加了刪除標記節點以刪除b,
			if (n != b.next)              
				break; // 重新開始
			Object v = n.value;
			if (v == null) {           // n為待刪除節點
				n.helpDelete(b, f);    // 推進刪除節點n
				break;                 // 重新開始
			}
			// 返回的前驅節點b為待刪除節點
			// 這裡不能直接刪除b,因為不知道b的前驅節點,只能重新開始,調用findPredecessor返回更前的節點
			if (v == n || b.value == null)  // b is deleted
				break;                 // 重新開始
			int c = key.compareTo(n.key);
			if (c == 0)
				return n;
			if (c < 0)
				return null;
			b = n;
			n = f;
		}
	}
}

步驟:
1)查找比key小的前驅節點;
2)從前驅節點開始,遍歷底層Node單鏈表,若不存在相關的key-value對,則返回null;否則確定Node節點位置;假設確定刪除的節點為n,b為其前驅節點,f為其後繼節點:

\
3)用CAS方式將其value置為null;

作用:

a)其他增刪改查線程遍歷到該節點時,都知其為待刪除節點;

b)其他增刪改查線程可通過CAS修改n的next,推進n的刪除。

該步失敗,只需要重試即可。
4)在其後添加刪除標記節點marker;

\
作用:

a)新增節點不能插入到n的後面;

b)基於CAS方式的刪除,可以避免刪除上的錯誤。

5)刪除該節點及其刪除標記節點;

\
第4)5)步可能會失敗,原因在於其他操作線程在遍歷過程中知n的value為null後,會幫助推進刪除n,這些幫助操作可以保證沒有一個線程因為刪除線程的刪除操作而阻塞。

另外,刪除需要確保b——>n——>marker——>f鏈接關系才能進行。

6)用findPredecessor刪除其索引節點;
7)若最上層索引層無Node索引節點,則嘗試降低索引層次。
8)返回其舊value

public V remove(Object key) {
	return doRemove(key, null);
}

// 主要的刪除Node節點及其索引節點方法
final V doRemove(Object okey, Object value) {
	Comparable key = comparable(okey);
	for (;;) {
		Node b = findPredecessor(key);
		Node n = b.next;
		for (;;) {
			if (n == null)
				return null;
			Node f = n.next;
			if (n != b.next)                    // inconsistent read
				break;
			Object v = n.value;
			if (v == null) {                    // n is deleted
				n.helpDelete(b, f);
				break;
			}
			if (v == n || b.value == null)      // b is deleted
				break;
			int c = key.compareTo(n.key);
			if (c < 0)
				return null;
			if (c > 0) {
				b = n;
				n = f;
				continue;
			}
			if (value != null && !value.equals(v))
				return null;
			if (!n.casValue(v, null))           		// 將value置為null
				break;
			if (!n.appendMarker(f) || !b.casNext(n, f)) // 添加刪除標記節點,刪除該節點與其刪除標記節點
				findNode(key);                  		// 失敗則用findNode繼續刪除
			else {
				findPredecessor(key);           		// 用findPredecessor刪除其索引節點
				if (head.right == null)
					tryReduceLevel();
			}
			return (V)v;
		}
	}
}

// 當最上三層索引層無Node索引節點,則將最上層索引層去掉。
// 采用CAS方式去掉後,如果其又擁有Node索引節點,則嘗試將其恢復。
private void tryReduceLevel() {
	HeadIndex h = head;
	HeadIndex d;
	HeadIndex e;
	if (h.level > 3 &&
		(d = (HeadIndex)h.down) != null &&
		(e = (HeadIndex)d.down) != null &&
		e.right == null &&
		d.right == null &&
		h.right == null &&
		casHead(h, d) && // try to set
		h.right != null) // recheck
		casHead(d, h);   // try to backout
}

步驟:
1)查找比key小的前驅節點;
2)從前驅節點開始,遍歷底層Node單鏈表,若不存在相關的key-value對,則返回null;否則獲取Node節點;
3)判斷其value是否為null,如果不為null,則直接返回value;否則重試,原因是其被標記為待刪除節點。

public V get(Object key) {
	return doGet(key);
}

private V doGet(Object okey) {
	Comparable key = comparable(okey);
	/*
	 * Loop needed here and elsewhere in case value field goes
	 * null just as it is about to be returned, in which case we
	 * lost a race with a deletion, so must retry.
	 */
	for (;;) {
		Node n = findNode(key);
		if (n == null)
			return null;
		Object v = n.value;
		if (v != null)
			return (V)v;
	}
}

迭代器

基礎迭代器為Iter,從first節點開始遍歷Node單鏈表層:

abstract class Iter implements Iterator {
	/** the last node returned by next() */
	Node lastReturned;
	/** the next node to return from next(); */
	Node next;
	/** Cache of next value field to maintain weak consistency */
	V nextValue;

	/** Initializes ascending iterator for entire range. */
	Iter() {
		for (;;) {
			next = findFirst();
			if (next == null)
				break;
			Object x = next.value;
			if (x != null && x != next) {
				nextValue = (V) x;
				break;
			}
		}
	}

	public final boolean hasNext() {
		return next != null;
	}

	/** Advances next to higher entry. */
	final void advance() {
		if (next == null)
			throw new NoSuchElementException();
		lastReturned = next;
		for (;;) {
			next = next.next; // next
			if (next == null)
				break;
			Object x = next.value;
			if (x != null && x != next) {
				nextValue = (V) x;
				break;
			}
		}
	}

	public void remove() {
		Node l = lastReturned;
		if (l == null)
			throw new IllegalStateException();
		// It would not be worth all of the overhead to directly
		// unlink from here. Using remove is fast enough.
		ConcurrentSkipListMap.this.remove(l.key);
		lastReturned = null;
	}

}

特性

如何實現增刪改查並發線程安全?

1. 采用無鎖並發方式;

2.基於final、volatile、CAS方式助於並發;

3. 刪除Node節點方式:將該節點的value置為null,且在其後插入一個刪除標記節點,即:b——>n——>marker——>f(假設n為待刪除Node節點,marker為其刪除標記節點,b為n的前驅節點,f為n的刪除前的後繼節點)這種方式解決了4個問題:

1)與Node插入可並發進行,因為n後為marker標記節點,肯定不會在n後插入新的Node節點;

2)與Node修改可並發進行,因為n的value為null,修改線程對n的CAS修改肯定是失敗的;

3)與Node讀可並發進行,因為n的value為null,即使讀線程匹配到n,也返回的value為null,而在Map中返回null即代表key-value對不存在,n正在刪除,所以也表明不存在,盡管不是嚴格意義上的;

4)所有操作在遍歷Node單鏈表時,可根據以上鏈接關系,推進刪除n和marker。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved