程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> 基於ReentrantLock的AQS的源碼分析(獨占、非中斷、不超時部分),reentrantlockaqs

基於ReentrantLock的AQS的源碼分析(獨占、非中斷、不超時部分),reentrantlockaqs

編輯:JAVA綜合教程

基於ReentrantLock的AQS的源碼分析(獨占、非中斷、不超時部分),reentrantlockaqs


剛剛看完了並發實踐這本書,算是理論具備了,看到了AQS的介紹,再看看源碼,發現要想把並發理解透還是很難得,花了幾個小時細分析了一下把可能出現的場景盡可能的往代碼中去套,還是有些收獲,但是真的很費腦,還是對多線程的理解太淺了,不多說了,直接上代碼吧。

這段代碼不是為跑通,只是把AQS,ReentrantLock中的部分源碼合並到了一起,便於理解。

  1 package com.yb.interview.concurrent;
  2 
  3 
  4 import java.util.concurrent.locks.LockSupport;
  5 
  6 public class AQSSourceStudy {
  7 
  8     abstract static class AQS {
  9         /**
 10          * 這個狀態是有子類來維護的,AQS不會用這個狀態做什麼
 11          */
 12         private volatile int state;
 13         /**
 14          * 隊尾節點
 15          */
 16         private volatile Node tail;
 17         /**
 18          * 可能情況
 19          */
 20         private volatile Node head;
 21         /**
 22          * 獨占線程
 23          */
 24         private Thread exclusiveOwnerThread;
 25 
 26 
 27         /**
 28          * 由子類實現
 29          * 判斷當前線程是否需要排隊
 30          */
 31         abstract boolean tryAcquire(int i);
 32 
 33         public int getState() {
 34             return state;
 35         }
 36 
 37         public void setState(int state) {
 38             this.state = state;
 39         }
 40 
 41         /**
 42          * 主方法
 43          * 可能的情況
 44          * 當前狀態可以直接運行
 45          * 當前狀態要放入隊列裡等待
 46          * 狀態->子類獲取
 47          * 過程,盡可能的不要去阻塞,循環多次,競爭多次
 48          * 創建節點
 49          * 節點入隊,隊尾
 50          * 判斷新節點的前一個節點的狀態,更新,前一個節點,因為在入隊的過程中每個節點的狀態是動的
 51          * 最後,阻塞當前線程
 52          */
 53         public final void acquire(int arg) {
 54             if (!tryAcquire(arg) &&
 55                     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 56                 // 中斷狀態傳播
 57                 // 實時或者將來阻塞,拋中斷異常
 58                 selfInterrupt();
 59         }
 60 
 61         /**
 62          * 當有新節點入隊時,循環的把新節點關聯到一個有效節點的後面
 63          * 然後,阻塞這個節點的線程(當前線程)
 64          */
 65         private boolean acquireQueued(Node node, int arg) {
 66             boolean failed = true;
 67             try {
 68                 boolean interrupted = false;
 69                 for (; ; ) {
 70                     final Node p = node.predecessor();
 71                     // 新節點的前個節點是頭結點,如果頭結點的線程釋放,新節點接可以直接執行
 72                     // 所有不要著急阻塞,在判斷一次,頭結點釋放沒有,如果頭結點釋放,新節點不阻塞,把新節點設為頭結點
 73                     // 當新節點沒有排隊直接運行了,之後要將節點標記為無效 cancelAcquire
 74                     if (p == head && tryAcquire(arg)) {
 75                         // 想了很久這段代碼發生的情況
 76                         // 這段代碼發生的情況
 77                         // 1.node在入隊列時,有不同的線程在獲得了鎖,且隊列中沒有節點
 78                         // 2.當執行到這裡再次tryAcquire之前,之前釋放了鎖
 79                         // 3.這時hasQueuedPredecessors中的判斷,頭結點的後一個節點,是新建的這個節點,滿足s.thread==Thread.currentThread(不考慮這時有其他線程進入,或者進入無效)
 80                         // 滿足了tryAcquire返回true的情況
 81                         // 將頭結點改為新節點
 82                         /****
 83                          * head          tail
 84                          * |               |
 85                          * |               |
 86                          * ----------    ---------
 87                          * nullNode      newNode
 88                          * ---------     ----------
 89                          * next=newNode  prev=nullNode
 90                          * prev=null     next=null
 91                          * -------       ----------
 92                          *
 93                          * 改完後
 94                          *
 95                          *             head tail
 96                          *               |    |
 97                          *               |    |
 98                          * ---------    ---------
 99                          * nullNode      newNode
100                          * ---------     ---------
101                          * next=newNode  prev=nullNode
102                          * prev=null     next=null
103                          * ---------     ----------
104                          * */
105 
106                         setHead(node);
107                         p.next = null;
108                         failed = false;
109                         return interrupted;
110                     }
111                     // 之前的節點不是正在執行線程的節點,調整位置和狀態再阻塞
112                     // 在線程解除阻塞後,使者節點失效
113                     if (shouldParkAfterFailedAcquire(p, node) &&
114                             parkAndCheckInterrupt())
115                         interrupted = true;
116                 }
117             } finally {
118                 if (failed)
119                     // 節點解除阻塞後,可能是中斷或者超時
120                     // 非unlock的解鎖
121                     cancelAcquire(node);
122             }
123         }
124 
125         private void cancelAcquire(Node node) {
126             if (node == null)
127                 return;
128             node.thread = null;
129             Node pred = node.prev;
130             // 那個空的節點會保證終止
131             while (pred.waitStatus > 0)
132                 // 將節點的prev關聯到最近的有效節點
133                 node.prev = pred = pred.prev;
134             Node predNext = pred.next;
135             // 任何情況都執行的
136             node.waitStatus = Node.CANCELLED;
137 
138             // 如果取消的節點是隊尾節點,並且將前節點設為隊尾節點
139             if (node == tail && compareAndSetTail(node, pred)) {
140                 // cancel的節點和cancel之前的無效節點會移出隊列
141                 compareAndSetNext(pred, predNext, null);
142             } else {
143                 // 如果不是隊尾節點
144                 int ws;
145                 if (pred != head &&
146                         ((ws = pred.waitStatus) == Node.SIGNAL ||
147                                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
148                         pred.thread != null) {
149                     Node next = node.next;
150                     if (next != null && next.waitStatus <= 0)
151                         // prev->node->next  改為 prev->next
152                         compareAndSetNext(pred, predNext, next);
153                 } else {
154                     // 判斷鎖定的狀態
155                     // 如果前節點是頭結點,或者不是SIGNAL狀態並且無法設置為SIGNAL狀態
156                     // 總結,取消一個節點是,要保證這個節點能被釋放,要不通過前節點通知,在鎖鎖,對應release
157                     unparkSuccessor(node);
158                 }
159 
160                 node.next = node; // help GC
161             }
162         }
163 
164         private void unparkSuccessor(Node node) {
165             // 解鎖節點的線程
166             // 當node時頭節點時,是當前獲取線程釋放的炒作
167             // 不是偷節點
168             int ws = node.waitStatus;
169             if (ws < 0)
170                 // 不用再去通知下個節點了,即將釋放node了
171                 compareAndSetWaitStatus(node, ws, 0);
172             Node s = node.next;
173             if (s == null || s.waitStatus > 0) {
174                 s = null;
175                 // 從隊尾向前找到最前有效的節點
176                 for (Node t = tail; t != null && t != node; t = t.prev)
177                     if (t.waitStatus <= 0)
178                         s = t;
179             }
180             if (s != null)
181                 LockSupport.unpark(s.thread);
182 
183         }
184 
185         private void compareAndSetNext(Node pred, Node predNext, Object o) {
186 
187         }
188 
189         private boolean parkAndCheckInterrupt() {
190             // 阻塞
191             LockSupport.park(this);
192             // 當前前程標記中斷
193             return Thread.interrupted();
194         }
195 
196         private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
197             int ws = pred.waitStatus;
198             // 如果前節點是需要被通知的,前節點正在被阻塞,阻塞當先線程
199             if (ws == Node.SIGNAL)
200                 return true;
201             // 如果前節點是無效的,找到最近的一個有效節點,並關聯,返回,在外部調用方法中會再次調用這個方法
202             if (ws > 0) {
203                 do {
204                     node.prev = pred = pred.prev;
205                 } while (pred.waitStatus > 0);
206                 // 這是個切斷調用鏈的過程
207                 pred.next = node;
208             } else {
209                 // 更新前節點的狀態,釋放時通知新節點
210                 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
211             }
212             return false;
213         }
214 
215         /**
216          * 創建節點
217          * 節點入隊
218          *
219          * @return 新節點
220          */
221         private Node addWaiter(Node mode) {
222             Node node = new Node(Thread.currentThread(), mode);
223             Node pred = tail;
224             // 之前有節點在隊列中
225             if (pred != null) {
226                 node.prev = pred;
227                 // 直接修改隊尾,不成功要進入接下類的循環,循環中也有類型的判斷,這裡添加會減少一些邏輯(這樣說可能是理解的有偏差)
228                 if (compareAndSetTail(pred, node)) {
229                     pred.next = node;
230                     return node;
231                 }
232             }
233             enq(node);
234             return node;
235         }
236 
237         /**
238          * 節點入隊
239          * 循環,直到把新節點放到隊尾,在多線程中這個過程是不確定的
240          */
241         private Node enq(Node node) {
242             for (; ; ) {
243                 Node t = tail;
244                 // Must initialize
245                 // 隊尾沒值,新節點是第一個入隊的節點,創建一個空的節點,頭尾都指向這個空節點
246                 if (t == null) {
247                     if (compareAndSetHead(new Node()))
248                         tail = head;
249                 } else {
250                     node.prev = t;
251                     if (compareAndSetTail(t, node)) {
252                         t.next = node;
253                         return t;
254                     }
255                 }
256             }
257         }
258 
259         /**
260          * 字面理解,是否有已經排隊的線程
261          * 實際意義,有重入鎖的情況,在這裡要考慮到
262          * 沒有節點在排隊的情況,頭結點與未節點是相同的
263          * 判斷重入,當前線程是頭結點的線程.
264          */
265         protected boolean hasQueuedPredecessors() {
266             Node t = tail;
267             Node h = head;
268             Node s;
269             //為什麼是頭結點的線程,而不是exclusiveOwnerThread,因為只有在
270             // 當前隊列裡沒有值得時候才回設置獨占線程,如果是通過節點釋放的線
271             // 程還會和節點綁定,不會映射到exclusiveOwnerThread
272             return h != t &&
273                     ((s = h.next) == null || s.thread != Thread.currentThread());
274         }
275 
276         public final boolean release(int arg) {
277             if (tryRelease(arg)) {
278                 Node h = head;
279                 // 在獨占鎖的時候,waitStatus只能為0 -1 -2 -3
280                 // 這個裡不為0代表頭節點是空節點
281                 // 空節點不需要釋放
282                 // 頭節點是釋放鎖的時候,最先被考慮的
283                 if (h != null && h.waitStatus != 0)
284                     unparkSuccessor(h);
285                 return true;
286             }
287             return false;
288         }
289 
290         protected abstract boolean tryRelease(int arg);
291 
292 
293         public void setHead(Node head) {
294             this.head = head;
295         }
296 
297         private boolean compareAndSetHead(Node node) {
298             return (true || false);
299         }
300 
301         private boolean compareAndSetTail(Node pred, Node node) {
302             return (true || false);
303         }
304 
305         protected void selfInterrupt() {
306             Thread.currentThread().interrupt();
307         }
308 
309 
310         /**
311          * CAS更新隊列狀態,CAS的問題在其他的機會介紹
312          */
313         boolean compareAndSetState(int o, int n) {
314             return (false || true);
315         }
316 
317         /**
318          * 獨占線程標記改為指定線程
319          */
320         void setExclusiveOwnerThread(Thread t) {
321             exclusiveOwnerThread = t;
322         }
323 
324         /**
325          * 返回獨占線程
326          */
327         Thread getExclusiveOwnerThread() {
328             return exclusiveOwnerThread;
329         }
330 
331         // 修改節點的狀態
332         private boolean compareAndSetWaitStatus(Node pred, int ws, int signal) {
333             return (true || false);
334         }
335 
336         static class Node {
337 
338             public int waitStatus;
339 
340             Node() {
341             }
342 
343             /**
344              * @param thread
345              * @param mode   SHARED or  EXCLUSIVE
346              */
347             Node(Thread thread, Node mode) {
348                 this.thread = Thread.currentThread();
349                 this.mode = mode;
350             }
351 
352             // 共享模式標記
353             static final Node SHARED = new Node();
354             // 獨占模式標記
355             static final Node EXCLUSIVE = null;
356 
357             // 節點被取消,因為超時或者中斷
358             static final int CANCELLED = 1;
359             // next被阻塞,當節點釋放時,notice next
360             static final int SIGNAL = -1;
361             // 在條件隊列中,等待某個條件被阻塞
362             static final int CONDITION = -2;
363             // 節點在共享模式下,可以傳播鎖
364             static final int PROPAGATE = -3;
365 
366             volatile Node next;
367             volatile Node prev;
368             Node mode;
369 
370             public Thread thread;
371 
372             public Node predecessor() {
373                 Node p = prev;
374                 if (p == null)
375                     throw new NullPointerException();
376                 else
377                     return p;
378             }
379         }
380 
381 
382     }
383 
384     /**
385      * 這是一個獨占鎖的實現,從ReentrantLock中粘貼出來的部分代碼
386      */
387     class SYC extends AQS {
388 
389         public void lock() {
390             acquire(1);
391         }
392 
393         public void unlock() {
394             release(1);
395         }
396 
397         protected final boolean tryAcquire(int acquires) {
398             final Thread current = Thread.currentThread();
399             int c = getState();
400             // 如果當前的狀態
401             if (c == 0) {
402                 if (!hasQueuedPredecessors() &&
403                         compareAndSetState(0, acquires)) {
404                     setExclusiveOwnerThread(current);
405                     return true;
406                 }
407             } else if (current == getExclusiveOwnerThread()) {
408                 int nextc = c + acquires;
409                 if (nextc < 0)
410                     throw new Error("Maximum lock count exceeded");
411                 setState(nextc);
412                 return true;
413             }
414             return false;
415         }
416 
417         protected final boolean tryRelease(int releases) {
418             int c = getState() - releases;
419             if (Thread.currentThread() != getExclusiveOwnerThread())
420                 throw new IllegalMonitorStateException();
421             boolean free = false;
422             if (c == 0) {
423                 free = true;
424                 setExclusiveOwnerThread(null);
425             }
426             setState(c);
427             return free;
428         }
429 
430 
431     }
432 }

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved