程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> Java多線程基礎總結九:Mina窺探(1)

Java多線程基礎總結九:Mina窺探(1)

編輯:關於JAVA

一直以來的多線程的基礎總結都是脫離應用的,但是要說多線程的應用就不能不說Mina。Apache Mina作為一個高性能的Java異步並發網 絡通訊框架,其內部的多線程的設計和實現可謂是學習多線程的良藥。手上的Mina源碼是svn剪下來的最新的代碼,mvn轉化成eclipse項目 後導入mina-core的源碼看看多線程的應用吧。

首先簡單的介紹在org.apache.mina.core.service包裡的核心接口之一:IoService。這個接口是對於服務器端接收連接和客戶端發起連 接這兩種服務的頂層抽象,所以就有了IoAcceptor和IoConnector兩個子接口的繼承與隔離。很拍馬屁的說,從這個小細節可以看到mina架 構的精細。這種程度的接口的隔離最重要的就是對接口內抽象行為的准確劃分,錯誤的劃分接口的職責將使得後面得實現顯得不合理甚至是 錯誤。只是不知道負責抽象的設計人員是否是一次性的抽象成功,如果是只能說牛x。至於交互會話IoSession和實際的I/O操作處理器 IoProcessor 以及底層處理I/O事件的IoHandle這些接口就不廢話了,今天看的是與IoServiceListener有關的多線程應用。 IoServiceListener主要是用來監聽IoService相關的事件,而今日主角--IoServiceListenerSupport則是用來把IoService和對應的 IoServiceListener包裝在一起進行管理的輔助類。先看看其源碼:

Java代碼

public class IoServiceListenerSupport {
   /** The {@link IoService} that this instance manages. */
   private final IoService service;

   /** A list of {@link IoServiceListener}s. */
   private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener> ();

   /** Tracks managed sessions. */
   private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long,  IoSession>();

   /** Read only version of {@link #managedSessions}. */
   private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap (managedSessions);

   private final AtomicBoolean activated = new AtomicBoolean();

   /** Time this listenerSupport has been activated */
   private volatile long activationTime;

   /** A counter used to store the maximum sessions we managed since the listenerSupport has been  activated */
   private volatile int largestManagedSessionCount = 0;

   /** A global counter to count the number of sessions managed since the start */
   private volatile long cumulativeManagedSessionCount = 0;

   /**
    * Adds a new listener.
    *
    * @param listener The added listener
    */
   public void add(IoServiceListener listener) {
     if (listener != null) {
       listeners.add(listener);
     }
   }

   /**
    * @return true if the instance is active
    */
   public boolean isActive() {
     return activated.get();
   }

   /**
    * Calls {@link IoServiceListener#serviceActivated(IoService)}
    * for all registered listeners.
    */
   public void fireServiceActivated() {
     if (!activated.compareAndSet(false, true)) {
       // The instance is already active
       return;
     }

     activationTime = System.currentTimeMillis();

     // Activate all the listeners now
     for (IoServiceListener listener : listeners) {
       try {
         listener.serviceActivated(service);
       } catch (Throwable e) {
         ExceptionMonitor.getInstance().exceptionCaught(e);
       }
     }
   }

   /**
    * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
    *
    * @param session The session which has been created
    */
   public void fireSessionCreated(IoSession session) {
     boolean firstSession = false;

     if (session.getService() instanceof IoConnector) {
       synchronized (managedSessions) {
         firstSession = managedSessions.isEmpty();
       }
     }
   ...

     cumulativeManagedSessionCount ++;
   ...
   }
}

這裡為了說明多線程的應用,我僅僅節選相關的方法和方法片段。從這個類中我們還是能看到豐富的多線程的應用的。下面就開始這盤 菜吧!

首先從全局變量開始看看:CopyOnWriteArrayList,ConcurrentMap,AtomicBoolean,volatile堪稱多線程小動員了。後兩者之前都有 介紹暫時省略,後面的方法分析時會看到使用情景。前兩者都是java.util.concurrent包下的並發集合框架成員。ConcurrentMap是高效的 並發Map實現,主要是采取分段加鎖的機制,默認是16段鎖,所以多線程的競爭大大的降低。只有key的hash 分布在同一段位上線程之間存 在競爭。

而CopyOnWriteArrayList是ArrayList的一個線程安全的變體,變態之處在於對其的所有的可變操作都是對底層的數組的進行的一次新的 復制。看看其實現原來是使用ReentrantLock來保證可變時的線程安全,又多了一個多線程的成員啊。當然使用這個並發集合實現是需要特 殊的情況的(要知道它的每次的可變都是全盤復制,這意味著很大的性能成本):如果遍歷集合的操作次數大大的超過了可變操作時,這時 候它的性能優勢就體現出來了。因為內部的數組使用的是volatile的,所以遍歷查找時都不用同步而能保證可見性,這是鎖同步無法比擬的 。要說mina重視性能從這兒可見一斑,畢竟每個全局變量都是為了性能去選擇相應的同步機制,而不是synchronized通吃天下。

IoServiceListenerSupport的 fireServiceActivated(),fireServiceDeactivated(),fireSessionCreated(), fireSessionDestroyed() 方法均有遍歷listeners的操作,如果這些方法在線程之間頻繁的使用的話,那麼無疑使用CopyOnWriteArrayList 是個很好的解決方案。managedSessions主要是管理IoSession的,需要使用並發Map的數據結構,那麼ConcurrentMap無疑已被證明是相對出 色的並發Map。activated類似一個開關的設計,看看為什麼使用無鎖得AtomicBoolean?

Java代碼

fireServiceActivated() {
     if (!activated.compareAndSet(false, true)) {
       // The instance is already active
       return;
     }
     ...
}

這裡可以看到activated的狀態改變是要依賴其原來的值得,也就是如果使用volatile的話,要判斷之前的是否為false,不滿足則置為 true。這樣的操作對於volatile只能使用鎖同步實現。而AtomicBoolean的“CAS”輕松的使用無鎖同步原語解決了這個問題。 compareAndSet(...)不管有多少個線程執行,只有取得activated最新值得線程才能返回true,其余的都會是false。這就看到mina的 committer還是很糾結性能的。

然後看看那幾個volatile的變量吧。首先是activationTime,這個變量除了有自身的get()可以隨時取得最新的值之外,就在 fireServiceActivated()內出現了:activationTime = System.currentTimeMillis();可以看到僅僅是一次性的賦值而且不依賴其自身的值 。所以完全的滿足線程安全的條件。 largestManagedSessionCount的使用和其類似。

再來看看cumulativeManagedSessionCount,它是一個全局的計數器,負責記錄啟動後所管理得會話數量。除了同樣有get()方法之外就 是出現在fireSessionCreated(...)方法中:cumulativeManagedSessionCount ++;這引起了我的注意,因為這樣的寫法是不能保證線程同步 的!因為volatile的變量根本無法完成“++”的原子操作。“++”是需要依賴其自身的值而進行更改的操作。為此我簡單的寫了個驗證的例 子證明了這個鐵律。

Java代碼

/**
  *
  * @author: yanxuxin 
  * @date: 2010-1-16 
  */
public class VolatileTest {

  public static void main(String[] args) {
  final VolatileSample sample = new VolatileSample();

  Runnable runnable = new Runnable() {
   public void run() {
   for(int i = 0; i < 500; i++) {
    System.out.println(sample.incrementAndGet());
   }
   }
  };

  for(int i = 0; i < 50; i++) {
   new Thread(runnable).start();
  }
  }
}

class VolatileSample {
  private volatile long counter = 0;

  public long incrementAndGet() {
  counter++;
  return get();
  }

  public long get() {
  return counter;
  }
}

使用50個線程每個線程執行500次對counter進行疊加。正確的答案應該是25000,但是很不幸的是僅僅測試幾次就出現了24999和其他的 小於25000的最大值。所以很遺憾的發現mina的這個變量的方案的選擇是有問題的!

這裡應該是使用的是AtomicLong,而對應的fireServiceActivated(...)內的代碼如果不使用API提供的話應該是:

Java代碼

for (;;) {
      long current = cumulativeManagedSessionCount.get();
      long next = current + 1;
      if (compareAndSet(current, next))
        break;
   }

雖然是for循環+CAS的atomic殺手锏,但是不要怕,一般for循環cpu都是一次搞定,極少情況是多次,性能不會有什麼明顯影響。這個 bug可能在數量較少的線程的情況下很難顯現,或者是mina的開發者考慮使用情景故意的寬松線程機制?從嚴謹性來看是我個人認為是不對 的。

最後就是看看一個有趣的synchronized的代碼片段:

Java代碼

/**
    * Close all the sessions
    * TODO disconnectSessions.
    *
    */
   private void disconnectSessions() {
     Object lock = new Object();
     ...
     try {
       synchronized (lock) {
         while (!managedSessions.isEmpty()) {
           lock.wait(500);
         }
       }
     } catch (InterruptedException ie) {
       // Ignored
     }
   }

這個方法是關閉所有的會話,但是為了減少線程鎖的競爭使用了一個很可愛的方式。這裡的lock是個局部變量,所以每個進入這個方法 的線程都會擁有一個lock對象,而synchronized(lock)是迷惑的重點。這個synchronized不會使得進入方法的線程們產生任何的競爭,因為 每個線程都能獲得屬於自己的lock。synchronized的作用就在於lock.wait(500)的調用,就因為wait方法必須要 synchronized的配合,所 以就出現了這個可愛的代碼。這段代碼的主要意圖就是不管幾個線程去關閉所有會話,每個線程都是間隔500ms去檢查 managedSessions是 否為空。這裡沒有對managedSessions使用synchronized的原因就是為了減少線程鎖對 managedSessions的資源獨占,而改用while循環的機 制寧願等待也不為了檢查managedSessions而影響其他線程的工作。很精巧的一個實現!

寫到這裡,算是這個小菜的結尾了,總感覺意猶未盡。mina對高性能並發的目標還是從代碼上得到了一些的體現。當然它的高性能主要 是對NIO的封裝使用,不過這一切都是建立在多線程的並發基礎上的。至於之前無意發現的bug(個人認為)如果mina的committer也有相同的 認知的話,我想以後會有相應的修改的。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved