程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> J2EE >> Java多線程同步設計中使用Metux

Java多線程同步設計中使用Metux

編輯:J2EE

Mutex是互斥體,廣泛地應用在多線程編程中。本文以廣為流程的Doug Lea的concurrent工具包的Mutex實現為例,進行一點探討。在Doug Lea的concurrent工具包中,Mutex實現了Sync接口,該接口是concurrent工具包中所有鎖(lock)、門(gate)和條件變量(condition)的公共接口,Sync的實現類主要有:Mutex、Semaphore及其子類、Latch、CountDown、ReentrantLock等。這也體現了面向抽象編程的思想,使我們可以在不改變代碼或者改變少量代碼的情況下,選擇使用Sync的不同實現。下面是Sync接口的定義:

public interface Sync
{
  public void acquire() throws InterruptedException;
  //獲取許可
  public boolean attempt(long msecs) throws InterruptedException;
  //嘗試獲取許可
  public void release();
  //釋放許可
}
 
通過使用Sync可以替代Java synchronized關鍵字,並提供更加靈活的同步控制。當然,並不是說 concurrent工具包是和Java synchronized獨立的技術,其實concurrent工具包也是在synchronized的基礎上搭建的,從下面對Mutex源碼的解析即可以看到這一點。synchronized關鍵字僅在方法內或者代碼塊內有效,而使用Sync卻可以跨越方法甚至通過在對象之間傳遞,跨越對象進行同步。這是Sync及concurrent工具包比直接使用synchronized更加強大的地方。

注意Sync中的acquire()和attempt()都會拋出InterruptedException,所以使用Sync及其子類時,調用這些方法一定要捕獲InterruptedException。而release()方法並不會拋出InterruptedException,這是因為在acquire()和attempt()方法中可能會調用wait()等待其它線程釋放鎖。而release()在實現上進行了簡化,直接釋放鎖,不管是否真的持有。所以,你可以對一個並沒有acquire()的線程調用release()這也不會有什麼問題。而由於release()不會拋出InterruptedException,所以我們可以在catch或finally子句中調用release()以保證獲得的鎖能夠被正確釋放。比如:

class X
{
  Sync gate; // ...
  public void m()
  {
   try
   {
    gate.acquire();
    // block until condition holds
    try
    {
     // ... method body
    }
    finally { gate.release(); }
   }
   catch (InterruptedException ex) { // ... evasive action }
  }
}
 
Mutex是一個非重入的互斥鎖。Mutex廣泛地用在需要跨越方法的before/after類型的同步環境中。下面是Doug Lea的concurrent工具包中的Mutex的實現。

public class Mutex implements Sync
{
  /** The lock status **/
  protected boolean inuse_ = false;
  public void acquire() throws InterruptedException
  {
   if (Thread.interrupted()) throw new InterruptedException();//(1)
   synchronized(this)
   {
    try
    {
     while (inuse_) wait();
     inuse_ = true;
    }
    catch (InterruptedException ex)
    {
     //(2)
     notify();
     throw ex;
    }
   }
  }
  public synchronized void release()
  {
   inuse_ = false;
   notify();
  }
  public boolean attempt(long msecs) throws InterruptedException
  {
   if (Thread.interrupted()) throw new InterruptedException();
   synchronized(this)
   {
    if (!inuse_)
    {
     inuse_ = true;
     return true;
    }
    else if (msecs <= 0)
     return false;
    else
    {
     long waitTime = msecs;
     long start = System.currentTimeMillis();
     try
     {
      for (;;)
      {
       wait(waitTime);
       if (!inuse_)
       {
        inuse_ = true;
        return true;
       }
       else
       {
        waitTime = msecs - (System.currentTimeMillis() - start);
        if (waitTime <= 0) // (3)
         return false;
       }
      }
     }
     catch (InterruptedException ex)
     {
      notify();
      throw ex;
     }
    }
   }
  }
}

為什麼要在acquire()和attempt(0方法的開始都要檢查當前線程的中斷標志呢?這是為了在當前線程已經被打斷時,可以立即返回,而不會仍然在鎖標志上等待。調用一個線程的interrupt()方法根據當前線程所處的狀態,可能產生兩種不同的結果:當線程在運行過程中被打斷,則設置當前線程的中斷標志為true;如果當前線程阻塞於wait()、sleep()、join(),則當前線程的中斷標志被清空,同時拋出InterruptedException。所以在上面代碼的位置(2)也捕獲了InterruptedException,然後再次拋出InterruptedException。

release()方法簡單地重置inuse_標志,並通知其它線程。

attempt()方法是利用Java的Object.wait(long)進行計時的,由於Object.wait(long)不是一個精確的時鐘,所以attempt(long)方法也是一個粗略的計時。注意代碼中位置(3),在超時時返回。

Mutex是Sync的一個基本實現,除了實現了Sync接口中的方法外,並沒有添加新的方法。所以,Mutex的使用和Sync的完全一樣。在concurrent包的API中Doug給出了一個精細鎖定的List的實現示例,我們這兒也給出,作為對Mutex和Sync使用的一個例子:

class Node
{
  Object item; Node next;
  Mutex lock = new Mutex();
  // 每一個節點都持有一個鎖
  Node(Object x, Node n)
  {
   item = x;
   next = n;
  }
}
class List
{
  protected Node head;
  // 指向列表的頭
  // 使用Java的synchronized保護head域
  // (我們當然可以使用Mutex,但是這兒似乎沒有這樣做的必要
 
  protected synchronized Node getHead()
  { return head; }
  boolean search(Object x) throws InterruptedException
  {
   Node p = getHead();
   if (p == null) return false;
   // (這兒可以更加緊湊,但是為了演示的清楚,各種情況都分別進行處理)
   p.lock.acquire();
   // Prime loop by acquiring first lock.
   // (If the acquire fails due to
   // interrupt, the method will throw
   // InterruptedException now,
   // so there is no need for any
   // further cleanup.)
   for (;;)
   {
    if (x.equals(p.item))
    {
     p.lock.release();
     // 釋放當前節點的鎖
     return true;
    }
    else
    {
     Node nextp = p.next;
     if (nextp == null)
     {
      p.lock.release();
      // 釋放最後持有的鎖
      return false;
     }
     else
     {
      try
      {
       nextp.lock.acquire();
       // 在釋放當前鎖之前獲取下一個節點的鎖
      }
      catch (InterruptedException ex)
      {
       p.lock.release();
       // 如果獲取失敗,也釋放當前的鎖 throw ex;
      }
      p.lock.release();
      // 釋放上個節點的鎖,現在已經持有新的鎖了
      p = nextp;
     }
    }
   }
  }
  synchronized void add(Object x)
  {
   // 使用synchronized保護head域
   head = new Node(x, head);
  }
  // ... other similar traversal and update methods ...
}

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved