程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> Active Object並發模式在Java中的應用

Active Object並發模式在Java中的應用

編輯:關於JAVA

簡介:Active Object 是並發編程實踐中典型的設計模式,Active Object 模式的核心是通過解耦合 方法的調用與執行來提高程序的並發度。本文將從典型 Active Object 設計模式入手,從一個新的視角 來探討 Active Object 並發模式在 Java 中的應用。

本文主要從以下兩個方面進行闡述:

使用 C++ 語言,來描述 Active Object 設計模式。

Java 類庫對於這樣一個典型的模式做了很好的類庫層面的封裝,因此對於 Java 的開發者來說,很多 關於該設計模式本身的東西被屏蔽掉了。本文試圖使用 Native C++ 語言,幫助讀者從本質上對 Active Object 設計模式有一個更全面的認識。

結合 C++ 版本的 Active Object 設計模式,引領讀者對於 Active Object 設計模式在 Java 類庫中 的支持,有一個更深刻的認識,幫助讀者正確並且有效地使用這些類庫。

預備知識

並發對象 (Concurrent Object)

在這裡,我們先定義一下,什麼是並發對象。不同於一般的對象,並發對象指的是該對象方法的調用 與方法的執行不在同一個線程內,也即:該對象方法被異步執行。這其實是在多線程編程環境下典型的計 算特征,線程引入了異步。從另一個角度來看,並發對象其實是面向對象的概念應用於多線程計算環境下 的產物。

Active Object 設計模式 C++ 描述

我們將從以下幾個方面來討論 Active Object 模式。

問題描述

我們都知道,基於多線程機制的並發編程模式可以極大提高應用的 QoS(Quality of Service)。典型 的例子如,我們在開發服務器端的應用時,通行的做法就是通過多線程機制並發地服務客戶端提交上來的 請求,以期提高服務器對客戶端的反應度 (Responsiveness)。同時,相比於單線程的應用,並發的多線 程應用相對要復雜得多。在多線程的計算環境裡,並發對象被所有的調用者線程所共享。一般來說,並發 對象的設計實現需要考慮下面的幾個重要因素:

並發對象的任何一次的方法執行,不允許無限地或者長時間阻止其它方法的調用執行,從而影響應用 的 QoS。

由於並發對象被調用者線程所共享,其內部狀態必須保證是線程安全的,必須受限於某些線程同步約 束,並且這些約束對調用者來說是透明的,不可見的。從調用者的角度來看,並發對象與普通對象沒有多 大區別。

並發對象的設計實現,應該能夠透明地利用底層平台所提供的並發機制。這樣做的好處是,當並發對 象運行在諸如多核處理器這類底層硬件平台上時,我們的應用能夠充分挖掘底層平台帶來的並發優勢,以 獲得足夠好的應用性能。

我們使用 Active Object 設計模式來解決這些問題。

Active Object 設計模式的本質是解耦合方法的調用 (Method invocation) 與方法的執行 (Method execution),方法調用發生在調用者線程上下文中,而方法的執行發生在獨立於調用者線程的 Active Object 線程上下文中。並且重要的一點是,該方法與其它普通的對象成員方法對於調用者來說,沒有什 麼特別的不同。從運行時的角度來看,這裡涉及到兩類線程,一個是調用者線程,另外一個是 Active Object 線程,我們會在下面更詳細地談到。

結構

在 Active Object 模式中,主要有以下幾種類型的參與者:

代理 (Proxy) :代理是 Active Object 所定義的對於調用者的公共接口。運行時,代理運行在調用 者線程的上下文中,負責把調用者的方法調用轉換成相應的方法請求 (Method Request),並將其插入相 應的 Activation List,最後返回給調用者 Future 對象。

方法請求:方法請求定義了方法執行所需要的上下文信息,諸如調用參數等。

Activation List:負責存儲所有由代理創建的,等待執行的方法請求。從運行時來看,Activation List 會被包括調用者線程及其 Active Object 線程並發存取訪問,所以,Activation List 實現應當是 線程安全的。

調度者 (Scheduler):調度者運行在 Active Object 線程中,調度者來決定下一個執行的方法請求, 而調度策略可以基於很多種標准,比如根據方法請求被插入的順序 FIFO 或者 LIFO,比如根據方法請求 的優先級等等。

Servant: Servant 定義了 Active Object 的行為和狀態,它是 Proxy 所定義的接口的事實實現。

Future: 調用者調用 Proxy 所定義的方法,獲得 Future 對象。調用者可以從該 Future 對象獲得方 法執行的最終結果。在真實的實現裡,Future 對象會保留一個私有的空間,用來存放 Servant 方法執行 的結果。

執行序列圖

在 Active Object 設計模式中,在參與者之間將發生如下的協作過程:

方法請求的構造與調度。調用者調用 Proxy 的方法 method(),Proxy 創建相應的方法請求,把它傳 給調度者 (Scheduler),調度者負責把該方法請求放入 Activation List 中。如果 method() 需要返回 執行結果,Proxy 返回一個 Future 對象給調用者(圖 1 中步驟 1 到步驟 6)。

方法請求執行。調度者負責從 Activation List 隊列裡按照預先定義的規則拿出下一個可執行的方法 請求,並把該請求綁定到相應 Servant 所定義的方法(圖 1 中步驟 7 到步驟 11)。

完成階段。保存任何 Servant 方法執行的結果到 Future 對象中去(圖 1 中步驟 12)。重復第二步 ,調度者繼續輪詢 Activation List 隊列,看是否有下一個可執行的方法請求。

圖 1. Active Object Sequence Diagram.

從圖 1 我們可以看到,步驟 1 到步驟 6 運行在調用者線程中,而步驟 7 到步驟 12 運行在 Active Object 的線程中。

實現

在本節中,我們給出 Active Object 的 C++ 示例實現。

調用者調用 Proxy 的 get() 方法,從 Active Object 獲得 Message。我們可以假定,在真實的應用 中, get() 方法的實現受制於某些慢速的 IO 操作,比如需要通過 TCP Socket 從遠端的機器獲得 Message, 然後返回給調用者。所以我們使用 Active Object 來實現該應用,通過線程的並發達到提高應 用的 QoS。

實現 Servant,如清單 1 所示:

清單 1. MQ_Servant

class MQ_Servant {
public:
   // Constructor and destructor.
   MQ_Servant (size_t mq_size);
   virtual ~MQ_Servant ();

   // Message queue implementation operations.
   void put (const Message &msg);
   Message get ();

    // Predicates.
   bool empty () const;
   bool full () const;
private:
   // Internal queue representation, e.g., a circular
   // array or a linked list, that does not use any
   // internal synchronization mechanism.
};

MQ_Servant 是真正的服務提供者,實現了 Proxy 中定義的方法。put() 和 get() 方法用來操作底層 的隊列。另外,Servant 的實現是純粹的應用邏輯實現,或者稱為商業邏輯實現,沒有混合任何的線程同 步機制 , 這有利於我們進行應用邏輯的重用,而不需要考慮不同的線程同步機制。

實現 Proxy,如清單 2 所示:

清單 2. MQ_Proxy

class MQ_Proxy {
public:
   // Bound the message queue size.
   enum { MQ_MAX_SIZE = 100 };
   MQ_Proxy (size_t size = MQ_MAX_SIZE)
   :scheduler_ (size),
   servant_ (size) {
   }

   // Schedule <put> to execute on the active object.
   void put (const Message &msg) {
     Method_Request *mr = new Put(servant_,msg);
     scheduler_.insert (mr);
   }

   // Return a <Message_Future> as the "future" result of 
   // an asynchronous <get> method on the active object.
   Message_Future get () {
     Message_Future result;
     Method_Request *mr = new Get (&servant_,result);
     scheduler_.insert (mr);
     return result;
   }

   // empty() and full() predicate implementations ...
private:
   // The servant that implements the active object
   // methods and a scheduler for the message queue.
   MQ_Servant servant_;
   MQ_Scheduler scheduler_;
};

同一個進程中的多個調用者線程可以共享同一個 Proxy。

實現 Method Request,如清單 3 所示:

清單 3. Method_Request

class Method_Request {
public:
   // Evaluate the synchronization constraint.
   virtual bool can_run () const = 0
   // Execute the method.
   virtual void call () = 0;
};
// Inherites from Method_Request
class Get : public Method_Request {
public:
   Get (MQ_Servant *rep, const Message_Future &f)
   :servant_ (rep),
   result_ (f)
   {
   }
   virtual bool can_run () const {
     // Synchronization constraint: cannot call a
     // <get> method until queue is not empty.
     return !servant_->empty ();
   }

   virtual void call () {
     // Bind dequeued message to the future result.
     result_ = servant_->get ();
   }
private:
   MQ_Servant *servant_;
   Message_Future result_;
};

實現 Activation List,如清單 4 所示:

清單 4. Activation_List

class Activation_List {
public:
   // Block for an "infinite" amount of time waiting
   // for <insert> and <remove> methods to complete.
   enum { INFINITE = -1 };

   // Define a "trait".
   typedef Activation_List_Iterator iterator;

   Activation_List ();

   // Insert <method_request> into the list, waiting up
   // to <timeout> amount of time for space to become
   // available in the queue. Throws the <System_Ex>
   // exception if <timeout> expires.
   void insert (Method_Request *method_request,Time_Value *timeout = 0);

   // Remove <method_request> from the list, waiting up
   // to <timeout> amount of time for a <method_request>
   // to be inserted into the list. Throws the
   // <System_Ex> exception if <timeout> expires.
   void remove (Method_Request *&method_request, Time_Value *timeout =  0);

private:
   // Synchronization mechanisms, e.g., condition
   // variables and mutexes, and the queue implementation,
      // e.g., an array or a linked list, go here.
};

Activation List 的實際上就是一個線程同步機制保護下的 Method Request 隊列,對該隊列的所有 操作 (insert/remove) 都應該是線程安全的。從本質上講,Activation List 所基於的就是典型的生產 者 / 消費者並發編程模型,調用者線程作為生產者把 Method Request 放入該隊列,Active Object 線 程作為消費者從該隊列拿出 Method Request, 並執行。

實現 Scheduler,如清單 5 所示:

清單 5. MQ_Scheduler

class MQ_Scheduler {
public:
   // Initialize the <Activation_List> and make <MQ_Scheduler>
   // run in its own thread of control.
      // we call this thread as Active Object thread.
   MQ_Scheduler ()
   : act_list_() {
     // Spawn separate thread to dispatch method requests.
     // The following call is leveraging the parallelism available on native  OS 
     // transparently
     Thread_Manager::instance ()->spawn (&svc_run,this);
   }
   // ... Other constructors/destructors, etc.

   // Put <Method_Request> into <Activation_List>. This
   // method runs in the thread of its client,i.e.
   // in the proxy's thread.
   void insert (Method_Request *mr) {
     act_list_.insert (mr);
   }

   // Dispatch the method requests on their servant
   // in its scheduler's thread of control.
   virtual void dispatch () {
     // Iterate continuously in a separate thread(Active Object thread).
     for (;;) {
       Activation_List::iterator request;
       // The iterator's <begin> method blocks
       // when the <Activation_List> is empty.
       for(request = act_list_.begin (); request != act_list_.end  ();++request){
         // Select a method request whose
         // guard evaluates to true.
         if ((*request).can_run ()) {
           // Take <request> off the list.
           act_list_.remove (*request);
           (*request).call () ;
           delete *request;
         }

         // Other scheduling activities can go here,
         // e.g., to handle when no <Method_Request>s
         // in the <Activation_List> have <can_run>
         // methods that evaluate to true.

       }

     }
   }

private:
   // List of pending Method_Requests.
   Activation_List act_list_;

   // Entry point into the new thread.
   static void *svc_run (void *arg) {
     MQ_Scheduler *this_obj =  static_cast<MQ_Scheduler *> (args);
     this_obj->dispatch ();
   }
};

實現 Future,如清單 6 所示:

清單 6. Message_Future

class Message_Future {
public:
   // Initializes <Message_Future> to
   // point to <message> immediately.
   Message_Future (const Message &message);

   //Other implementatio……

   // Block upto <timeout> time waiting to obtain result
   // of an asynchronous method invocation. Throws
   // <System_Ex> exception if <timeout> expires.
   Message result (Time_Value *timeout = 0) const;
private:
   //members definition here……
};

事實上,對於調用者來說,可以通過以下的方式從 Future 對象獲得真實的執行結果 Message:

同步等待。調用者調用 Future 對象的 result() 方法同步等待,直到後端的 Servant 相應方法執行 結束,並把結果存儲到了 Future 對象中來,result 返回,調用者獲得 Message。

同步超時等待。調用者調用 Future 對象的 result(timeout) 方法。如果過了 timeout 時間之後, 後端的 Servant 相應方法執行仍未結束,則調用失敗,否則,調用者線程被喚醒,result 方法返回,調 用者獲得 Message。

異步查詢。調用者可以通過調用 Future 對象定義的查詢方法 ( 清單 6 沒有提供相應的定義 ),查 看真實的結果是否准備好了,如果准備好了,調用 result 方法,直接獲得 Message。

清單 7 是使用該 Active Object 的示例。

清單 7. Active Object 使用

MQ_Proxy message_queue;

//Optioin 1. Obtain future and block thread until message arrives.
Message_Future future = message_queue.get();
Message msg = future.result();
//Handle received message here
handle(msg);

//2. Obtain a future (does not block the client).
Message_Future future = message_queue.get ();

//The current thread is not blocked, do something else here...
//Evaluate future and block if result is not available.
Message msg = future.result ();
//Handle received message here
handle(msg);

從清單 7 可以看到,MQ_Proxy 對於調用者而言,和一個普通的 C++ 定義的對象並沒有區別,並發的 實現細節已經被隱藏。

Java 對 Active Object 支持

Java JDK 1.3 引入了 java.util.Timer 和 java.util.TimerTask,提供了對 timer-based 並發任務 支持,Timer 和 TimerTask 可以看作是 Active Object 設計模式在 Java 中的實現。不過,在這裡我們 不打算過多討論 Timer 及其 TimerTask。由於 Timer 和 TimerTask 的缺陷性,例如 Timer 使用單線程 執行 TimerTask 導致的 Task 調度時間的不精確性等問題。從 Java1.5 開始,Java 建議使用 ScheduledThreadPoolExecutor 作為 Timer 的替代。

在這裡,我們討論一下自 Java1.5 引入的 Executor Framework。Java1.5 的 Executor Framework 可以看作是 Active Object 設計模式在 Java 中的體現。不過 Java 的 Executor Framework 極大地簡 化了我們前面所討論的 Active Object 所定義的模式。

Java 的 Executor Framework 是一套靈活強大的異步任務執行框架,它提供了標准的方式解耦合任務 的提交與任務的執行。Java Executor 框架中的任務指的是實現了 Runnable 或者 Callable 接口的對象 。Executor 的示例用法如清單 8 所示:

清單 8. Java Executor 示例代碼

public class TaskExecutionTcpServer {
   private static final int NTHREADS = 100;
   private static final Executor exec = Executors.newFixedThreadPool (NTHREADS);

   public static void main(String[] args) throws IOException {
     ServerSocket socket = new ServerSocket(80);
     while (true) {
       final Socket connection = socket.accept();
       Runnable task = new Runnable() {
         public void run() {
           handleRequest(connection);
         }
         public void handleRequest(Socket connection) {
           // Handle the incoming socket connection from
           //individual client.
         }
       };
       exec.execute(task);
     }
   }
} 

在示例 8 中,我們創建了一個基於線程池的 Java Executor, 每當新的 TCP 連接進來的時候,我們 就分配一個獨立的實現了 Runnable 任務來處理該連接,所有這些任務運行在我們創建的有 100 個線程 的線程池上。

我們可以從 Active Object 設計模式的角度來審視一下 Java Executor 框架。Java Executor 框架 以任務 (Task) 為中心,簡化了 Active Object 中的角色分工。可以看到,實現 Runnable 或者 Callable 接口的 Java Executor 任務整合了 Method Request 和 Servant 的角色 , 通過實現 run() 或者 call() 方法實現應用邏輯。Java Executor 框架並沒有顯式地定義 Proxy 接口,而是直接調用 Executor 提交任務,這裡的 Executor 相當於 Active Object 中調度者角色。從調用者的角度來看,這 看起來並不像是在調用一個普通對象方法,而是向 Executor 提交了一個任務。所以,在這個層面上說, 並發的底層細節已經暴露給了調用者。對於 Java 的開發者來說,如果你不擔心這樣的底層並發細節直接 暴露給調用者,或者說你的應用並不需要像對待普通對象一樣對待並發對象,Java 的 Executor 框架是 一個很好的選擇。相反,如果你希望隱藏這樣的並發細節,希望像操縱普通對象一樣操縱並發對象,那你 就需要如本文上節所描述的那樣,遵循 Active Object 設計原則 , 清晰地定義各個角色,實現自己的 Active Object 模式。

總而言之,Java Executor 框架簡化了 Active Object 所定義的模式,模糊了 Active Object 中角 色的分工,其基於生產者 / 消費者模式,生產者和消費者基於任務相互協作。

總結

最後,我們討論一下 Active Object 設計模式的優缺點。

Active Object 給我們的應用帶來的好處:

極大提高了應用的並發性以及簡化了線程同步帶來的復雜性。並發性的提高得益於調用者線程與 Active Object 線程的並發執行。簡化的線程同步復雜性主要表現在所有線程同步細節封裝在調度者內 ( 也就是 Java 的 Executor 對象 ),Active Object 調用者並不需要關心。

在 Active Object 中,方法的執行順序可以不同於方法的調用順序。用 Java 的話說,也就是任務執 行的順序可以不同於任務提交的順序。在一定情況下,這可以幫助優化我們應用的性能,提高應用的 QoS 及其 Responsiveness。在 Java Executor 框架下,你可以根據當前的計算資源,確定優化的執行策略 (Execution Policy),該執行策略的內容包括:任務將分配在多少線程上執行,以什麼順序執行,多少任 務可以同時執行等等。

當然,Active Object 也有缺點:

額外的性能開銷。這涉及到從調用者線程到 Active Object 線程的上下文切換,線程同步,額外的內 存拷貝等。

難於調試。Active Object 引入了方法的異步執行,從調試者的角度看,調試這樣的方法調用不像普 通方法那樣直截了當,並且這其中涉及到了線程的調度,同步等。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved