剛剛看完了並發實踐這本書,算是理論具備了,看到了AQS的介紹,再看看源碼,發現要想把並發理解透還是很難得,花了幾個小時細分析了一下把可能出現的場景盡可能的往代碼中去套,還是有些收獲,但是真的很費腦,還是對多線程的理解太淺了,不多說了,直接上代碼吧。
這段代碼不是為跑通,只是把AQS,ReentrantLock中的部分源碼合並到了一起,便於理解。
1 package com.yb.interview.concurrent;
2
3
4 import java.util.concurrent.locks.LockSupport;
5
6 public class AQSSourceStudy {
7
8 abstract static class AQS {
9 /**
10 * 這個狀態是有子類來維護的,AQS不會用這個狀態做什麼
11 */
12 private volatile int state;
13 /**
14 * 隊尾節點
15 */
16 private volatile Node tail;
17 /**
18 * 可能情況
19 */
20 private volatile Node head;
21 /**
22 * 獨占線程
23 */
24 private Thread exclusiveOwnerThread;
25
26
27 /**
28 * 由子類實現
29 * 判斷當前線程是否需要排隊
30 */
31 abstract boolean tryAcquire(int i);
32
33 public int getState() {
34 return state;
35 }
36
37 public void setState(int state) {
38 this.state = state;
39 }
40
41 /**
42 * 主方法
43 * 可能的情況
44 * 當前狀態可以直接運行
45 * 當前狀態要放入隊列裡等待
46 * 狀態->子類獲取
47 * 過程,盡可能的不要去阻塞,循環多次,競爭多次
48 * 創建節點
49 * 節點入隊,隊尾
50 * 判斷新節點的前一個節點的狀態,更新,前一個節點,因為在入隊的過程中每個節點的狀態是動的
51 * 最後,阻塞當前線程
52 */
53 public final void acquire(int arg) {
54 if (!tryAcquire(arg) &&
55 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
56 // 中斷狀態傳播
57 // 實時或者將來阻塞,拋中斷異常
58 selfInterrupt();
59 }
60
61 /**
62 * 當有新節點入隊時,循環的把新節點關聯到一個有效節點的後面
63 * 然後,阻塞這個節點的線程(當前線程)
64 */
65 private boolean acquireQueued(Node node, int arg) {
66 boolean failed = true;
67 try {
68 boolean interrupted = false;
69 for (; ; ) {
70 final Node p = node.predecessor();
71 // 新節點的前個節點是頭結點,如果頭結點的線程釋放,新節點接可以直接執行
72 // 所有不要著急阻塞,在判斷一次,頭結點釋放沒有,如果頭結點釋放,新節點不阻塞,把新節點設為頭結點
73 // 當新節點沒有排隊直接運行了,之後要將節點標記為無效 cancelAcquire
74 if (p == head && tryAcquire(arg)) {
75 // 想了很久這段代碼發生的情況
76 // 這段代碼發生的情況
77 // 1.node在入隊列時,有不同的線程在獲得了鎖,且隊列中沒有節點
78 // 2.當執行到這裡再次tryAcquire之前,之前釋放了鎖
79 // 3.這時hasQueuedPredecessors中的判斷,頭結點的後一個節點,是新建的這個節點,滿足s.thread==Thread.currentThread(不考慮這時有其他線程進入,或者進入無效)
80 // 滿足了tryAcquire返回true的情況
81 // 將頭結點改為新節點
82 /****
83 * head tail
84 * | |
85 * | |
86 * ---------- ---------
87 * nullNode newNode
88 * --------- ----------
89 * next=newNode prev=nullNode
90 * prev=null next=null
91 * ------- ----------
92 *
93 * 改完後
94 *
95 * head tail
96 * | |
97 * | |
98 * --------- ---------
99 * nullNode newNode
100 * --------- ---------
101 * next=newNode prev=nullNode
102 * prev=null next=null
103 * --------- ----------
104 * */
105
106 setHead(node);
107 p.next = null;
108 failed = false;
109 return interrupted;
110 }
111 // 之前的節點不是正在執行線程的節點,調整位置和狀態再阻塞
112 // 在線程解除阻塞後,使者節點失效
113 if (shouldParkAfterFailedAcquire(p, node) &&
114 parkAndCheckInterrupt())
115 interrupted = true;
116 }
117 } finally {
118 if (failed)
119 // 節點解除阻塞後,可能是中斷或者超時
120 // 非unlock的解鎖
121 cancelAcquire(node);
122 }
123 }
124
125 private void cancelAcquire(Node node) {
126 if (node == null)
127 return;
128 node.thread = null;
129 Node pred = node.prev;
130 // 那個空的節點會保證終止
131 while (pred.waitStatus > 0)
132 // 將節點的prev關聯到最近的有效節點
133 node.prev = pred = pred.prev;
134 Node predNext = pred.next;
135 // 任何情況都執行的
136 node.waitStatus = Node.CANCELLED;
137
138 // 如果取消的節點是隊尾節點,並且將前節點設為隊尾節點
139 if (node == tail && compareAndSetTail(node, pred)) {
140 // cancel的節點和cancel之前的無效節點會移出隊列
141 compareAndSetNext(pred, predNext, null);
142 } else {
143 // 如果不是隊尾節點
144 int ws;
145 if (pred != head &&
146 ((ws = pred.waitStatus) == Node.SIGNAL ||
147 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
148 pred.thread != null) {
149 Node next = node.next;
150 if (next != null && next.waitStatus <= 0)
151 // prev->node->next 改為 prev->next
152 compareAndSetNext(pred, predNext, next);
153 } else {
154 // 判斷鎖定的狀態
155 // 如果前節點是頭結點,或者不是SIGNAL狀態並且無法設置為SIGNAL狀態
156 // 總結,取消一個節點是,要保證這個節點能被釋放,要不通過前節點通知,在鎖鎖,對應release
157 unparkSuccessor(node);
158 }
159
160 node.next = node; // help GC
161 }
162 }
163
164 private void unparkSuccessor(Node node) {
165 // 解鎖節點的線程
166 // 當node時頭節點時,是當前獲取線程釋放的炒作
167 // 不是偷節點
168 int ws = node.waitStatus;
169 if (ws < 0)
170 // 不用再去通知下個節點了,即將釋放node了
171 compareAndSetWaitStatus(node, ws, 0);
172 Node s = node.next;
173 if (s == null || s.waitStatus > 0) {
174 s = null;
175 // 從隊尾向前找到最前有效的節點
176 for (Node t = tail; t != null && t != node; t = t.prev)
177 if (t.waitStatus <= 0)
178 s = t;
179 }
180 if (s != null)
181 LockSupport.unpark(s.thread);
182
183 }
184
185 private void compareAndSetNext(Node pred, Node predNext, Object o) {
186
187 }
188
189 private boolean parkAndCheckInterrupt() {
190 // 阻塞
191 LockSupport.park(this);
192 // 當前前程標記中斷
193 return Thread.interrupted();
194 }
195
196 private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
197 int ws = pred.waitStatus;
198 // 如果前節點是需要被通知的,前節點正在被阻塞,阻塞當先線程
199 if (ws == Node.SIGNAL)
200 return true;
201 // 如果前節點是無效的,找到最近的一個有效節點,並關聯,返回,在外部調用方法中會再次調用這個方法
202 if (ws > 0) {
203 do {
204 node.prev = pred = pred.prev;
205 } while (pred.waitStatus > 0);
206 // 這是個切斷調用鏈的過程
207 pred.next = node;
208 } else {
209 // 更新前節點的狀態,釋放時通知新節點
210 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
211 }
212 return false;
213 }
214
215 /**
216 * 創建節點
217 * 節點入隊
218 *
219 * @return 新節點
220 */
221 private Node addWaiter(Node mode) {
222 Node node = new Node(Thread.currentThread(), mode);
223 Node pred = tail;
224 // 之前有節點在隊列中
225 if (pred != null) {
226 node.prev = pred;
227 // 直接修改隊尾,不成功要進入接下類的循環,循環中也有類型的判斷,這裡添加會減少一些邏輯(這樣說可能是理解的有偏差)
228 if (compareAndSetTail(pred, node)) {
229 pred.next = node;
230 return node;
231 }
232 }
233 enq(node);
234 return node;
235 }
236
237 /**
238 * 節點入隊
239 * 循環,直到把新節點放到隊尾,在多線程中這個過程是不確定的
240 */
241 private Node enq(Node node) {
242 for (; ; ) {
243 Node t = tail;
244 // Must initialize
245 // 隊尾沒值,新節點是第一個入隊的節點,創建一個空的節點,頭尾都指向這個空節點
246 if (t == null) {
247 if (compareAndSetHead(new Node()))
248 tail = head;
249 } else {
250 node.prev = t;
251 if (compareAndSetTail(t, node)) {
252 t.next = node;
253 return t;
254 }
255 }
256 }
257 }
258
259 /**
260 * 字面理解,是否有已經排隊的線程
261 * 實際意義,有重入鎖的情況,在這裡要考慮到
262 * 沒有節點在排隊的情況,頭結點與未節點是相同的
263 * 判斷重入,當前線程是頭結點的線程.
264 */
265 protected boolean hasQueuedPredecessors() {
266 Node t = tail;
267 Node h = head;
268 Node s;
269 //為什麼是頭結點的線程,而不是exclusiveOwnerThread,因為只有在
270 // 當前隊列裡沒有值得時候才回設置獨占線程,如果是通過節點釋放的線
271 // 程還會和節點綁定,不會映射到exclusiveOwnerThread
272 return h != t &&
273 ((s = h.next) == null || s.thread != Thread.currentThread());
274 }
275
276 public final boolean release(int arg) {
277 if (tryRelease(arg)) {
278 Node h = head;
279 // 在獨占鎖的時候,waitStatus只能為0 -1 -2 -3
280 // 這個裡不為0代表頭節點是空節點
281 // 空節點不需要釋放
282 // 頭節點是釋放鎖的時候,最先被考慮的
283 if (h != null && h.waitStatus != 0)
284 unparkSuccessor(h);
285 return true;
286 }
287 return false;
288 }
289
290 protected abstract boolean tryRelease(int arg);
291
292
293 public void setHead(Node head) {
294 this.head = head;
295 }
296
297 private boolean compareAndSetHead(Node node) {
298 return (true || false);
299 }
300
301 private boolean compareAndSetTail(Node pred, Node node) {
302 return (true || false);
303 }
304
305 protected void selfInterrupt() {
306 Thread.currentThread().interrupt();
307 }
308
309
310 /**
311 * CAS更新隊列狀態,CAS的問題在其他的機會介紹
312 */
313 boolean compareAndSetState(int o, int n) {
314 return (false || true);
315 }
316
317 /**
318 * 獨占線程標記改為指定線程
319 */
320 void setExclusiveOwnerThread(Thread t) {
321 exclusiveOwnerThread = t;
322 }
323
324 /**
325 * 返回獨占線程
326 */
327 Thread getExclusiveOwnerThread() {
328 return exclusiveOwnerThread;
329 }
330
331 // 修改節點的狀態
332 private boolean compareAndSetWaitStatus(Node pred, int ws, int signal) {
333 return (true || false);
334 }
335
336 static class Node {
337
338 public int waitStatus;
339
340 Node() {
341 }
342
343 /**
344 * @param thread
345 * @param mode SHARED or EXCLUSIVE
346 */
347 Node(Thread thread, Node mode) {
348 this.thread = Thread.currentThread();
349 this.mode = mode;
350 }
351
352 // 共享模式標記
353 static final Node SHARED = new Node();
354 // 獨占模式標記
355 static final Node EXCLUSIVE = null;
356
357 // 節點被取消,因為超時或者中斷
358 static final int CANCELLED = 1;
359 // next被阻塞,當節點釋放時,notice next
360 static final int SIGNAL = -1;
361 // 在條件隊列中,等待某個條件被阻塞
362 static final int CONDITION = -2;
363 // 節點在共享模式下,可以傳播鎖
364 static final int PROPAGATE = -3;
365
366 volatile Node next;
367 volatile Node prev;
368 Node mode;
369
370 public Thread thread;
371
372 public Node predecessor() {
373 Node p = prev;
374 if (p == null)
375 throw new NullPointerException();
376 else
377 return p;
378 }
379 }
380
381
382 }
383
384 /**
385 * 這是一個獨占鎖的實現,從ReentrantLock中粘貼出來的部分代碼
386 */
387 class SYC extends AQS {
388
389 public void lock() {
390 acquire(1);
391 }
392
393 public void unlock() {
394 release(1);
395 }
396
397 protected final boolean tryAcquire(int acquires) {
398 final Thread current = Thread.currentThread();
399 int c = getState();
400 // 如果當前的狀態
401 if (c == 0) {
402 if (!hasQueuedPredecessors() &&
403 compareAndSetState(0, acquires)) {
404 setExclusiveOwnerThread(current);
405 return true;
406 }
407 } else if (current == getExclusiveOwnerThread()) {
408 int nextc = c + acquires;
409 if (nextc < 0)
410 throw new Error("Maximum lock count exceeded");
411 setState(nextc);
412 return true;
413 }
414 return false;
415 }
416
417 protected final boolean tryRelease(int releases) {
418 int c = getState() - releases;
419 if (Thread.currentThread() != getExclusiveOwnerThread())
420 throw new IllegalMonitorStateException();
421 boolean free = false;
422 if (c == 0) {
423 free = true;
424 setExclusiveOwnerThread(null);
425 }
426 setState(c);
427 return free;
428 }
429
430
431 }
432 }