程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 適用於 Java 程序員的 CSP ,第 2 部分

適用於 Java 程序員的 CSP ,第 2 部分

編輯:關於JAVA

CSP 是對並發對象之間的復雜交互進行建模的范式。使用 CSP 的主要優勢之一是:對程序每一階段所包含對象的行為進行精確地指定和驗證。CSP 的理論和實踐對於並發設計和編程領域有深遠的影響。它是 occam 這樣的編程語言的基礎,對其他語言(例如 Ada)的設計也有影響。就像在本文 第 1 部分 簡要討論過的,由於適合在 Java 平台上進行安全、優雅的多線程編程,所以 CSP 對 Java 開發人員也是無價的。

在我的這篇由三部分組成的 Java 平台 CSP 編程介紹的第 2 部分中,我把重點放在 CSP 理論和實踐上,特別是它在 Java 語言中多線程程序設計的應用。我將從 CSP 理論的概述開始介紹,然後介紹基於 Java 的 JCSP 庫實現,JCSP 的核心是 CSP 。

CSP 基礎
CSP 的基本構造是進程和進程之間各種形式的通信。CSP 中的每件事都是進程,甚至(子)進程網絡也是進程。但是,在進程之間沒有直接交互 —— 所有交互都通過 CSP 的同步對象(例如各級進程訂閱的通信通道和事件邊界)實現的。

CSP 進程 與典型的 Java 對象不同:封裝在進程組件中的數據 操縱數據的算法都是私有的。也就是說,進程沒有對外可以調用的方法(除了啟動進程必須調用的方法之外),算法只能在進程自己的控制線程內執行。如果把這種方法與 Java 語言中的方法調用進行對比,就可以立即看出 CSP 是如何消除顯式鎖定的需求的:

不要錯過本系列的其余部分!
“適用於 Java 程序員的 CSP”是對通信順序進程(Communicating Sequential Processes —— CSP)進行介紹的由三部分組成的一個系列。CSP 是並發編程的一個范式,它承認了並發編程的復雜性,卻並沒有把復雜性留給開發人員。請參閱本系列的其他部分:

第 2 部分:用 JCSP 進行並發編程

第 3 部分: JCSP 的高級主題

在 Java 語言中,在對象上調用的方法總是在調用者的線程中運行。但也有一個特殊的控制線程是通過系統中的多個對象進行工作的。對於大部分情況來說,對象沒有自己的生命 —— 它們只是在運行線程調用它們的方法時才存在。因此,不同的執行線程可以在同一時間試圖調用同一對象的同一方法,就像 第 1 部分所討論的那樣。顯然,這種情況在 CSP 中永遠不會發生。

通信通道和進程網絡
進程間通信最簡單的機制就是通過通道讀寫數據。CSP 中基本的通道構造是同步的(synchronous)點對點的(point-to-point);也就是說,它不包含內部緩沖,並且把一個進程連接到另外一個進程。從這個基本通道開始,有可能構建多個閱讀器/寫入器通道(即一對多、多對一和多對多)。 http://www.mscto.com

CSP 中的進程構成了復雜系統的基本構造塊 —— 一個進程可以同一個或多個其他進程連接起來(全都設置成並行的),從而構成一個進程網絡。可以把這個網絡本身想像成一個進程,這個進程還可以遞歸地與其他進程、它們自己的網絡或者其他類似東西組合在一起,形成一個為了最好地解決手上問題而設計的復雜排列的金字塔。 軟件開發網

如果單獨考慮,那麼進程僅僅是一個獨立的串行程序,它只與外部 I/O 設備交互。這個程序本身並不需要考慮在 I/O 通道另一端的進程是否存在或對方的性質。

CSP 理論已經在許多基於 Java 的框架中實現了,包括面向 Java 的通信順序進程(Communicating Sequential Processes for Java,JCSP) 庫。

關於 CSP 的更多內容
本文提供了對 CSP 復雜主題的一般性介紹。如果對於深入理論底層的數學機制有興趣,那麼請參閱 C.A.R. Hoare 的原文章以及他針對這一主題撰寫的書。要想獲得 CSP 理論的最新發展(這些年已經做了更新),請參閱 Bill Roscoe 撰寫的書。要想獲得廣泛的參考來源,請參考牛津大學計算機實驗室和 WoTUG 主頁的 CSP 歸檔。還請參閱 參考資料,以獲取所有這些參考和更多內容的鏈接。

軟件開發網

JCSP 庫
JCSP 庫由英國坎特伯雷市肯特大學的 Peter Welch 教授和 Paul Austin 開發(請參閱 參考資料)。對於本文余下的大部分內容來說,我會把重點放在 CSP 概念在 JCSP 中的實現方式上。因為 Java 語言沒有提供對 CSP 構造的自帶支持,所以 JCSP 庫內部使用 Java 語言 實際 支持的、自帶的並發構造,例如 synchronizedwaitnotify。為了幫助您正確地理解 JCSP 的工作方式,我將從這些 Java 構造的角度對 JCSP 庫中某些類的內部實現進行了解釋。

注意,後續章節中的示例基於或來自 JCSP 庫的 Javadoc 文檔,或者基於可以在 JCSP 主頁上得到的演示文稿。

JCSP 中的進程
在 JCSP 中,進程實際上就是實現了 CSProcess 接口的類。清單 1 顯示了這個接口:

清單 1. CSProcess 接口


package jcsp.lang;

public interface CSProcess
{
    public void run();
}
 http://www.mscto.com 

注意,CSProcess 接口看起來就像 Java 語言的 Runnable 接口,而且它也充當著類似的角色。雖然 JCSP 目前是用標准 Java API 實現的,但是並不需要這樣,而且在未來可能真的不需要這樣。出於這個原因,在 JCSP 中沒有直接使用 Runnable 接口。

驗證 JCSP 程序
Peter Welch 教授和其他人構建了一個正式的 CSP 模型,從而可以用 CSP 術語對多線程 Java 程序進行分析,並驗證程序是否會造成導致爭用風險、死鎖和資源耗盡的 bug。因為 JCSP 庫使用模型底部的監視器機制 (即 synchronized()wait()notify()notifyAll()) ,所以基於 JCSP 的應用程序可以用各種軟件工程工具進行驗證,其中包括一些商業化支持的工具。請參閱 參考資料,學習關於 FDR2 的內容,這是一個針對基於 CSP 的程序的模型檢測工具。

JCSP 定義了兩個接口用於從通道讀取對象和向通道寫入對象。從通道讀取對象的接口叫作 ChannelInput ,它只有一個方法,叫作 read()。如果進程調用一個實現 ChannelInput 接口的對象的這個方法,那麼進程會阻塞,直到在通道另一端的進程實際向通道寫入了一個對象。 一旦在通道上有對象可用,對象就被返回給調用進程。類似地,ChannelOutput 接口也只有一個方法,叫作 write(Object o)。如果進程調用 一個實現 ChannelOutput 接口的對象的這個方法,進程也會阻塞,直到通道接受對象。正如前面提到過的,最簡單的通道類型沒有緩沖,所以它在另一端(讀取)的進程調用 read() 之前不會接受對象。

從現在開始,我將使用代碼示例來演示這些和其他 JCSP 構造如何工作。在清單 2 中,可以看到一個非常簡單的進程,它輸出 1 到 100 之間的所有偶數:

清單 2. 生成 1 到 100 之間偶數的進程
import jcsp.lang.*;

public class SendEvenIntsProcess implements CSProcess
{
    private ChannelOutput out;

    public SendEvenIntsProcess(ChannelOutput out)
    {
      this.out = out;
    }

    public void run()
    {
      for (int i = 2; i <= 100; i = i   2)
      {
        out.write (new Integer (i));
      }
    }
}

與每一個寫進程對應,必須有一個讀進程。如果不存在這樣的進程,則會造成 SendEvenIntsProcessChannelOutput 對象的 out 進行第一次寫操作之後立即無限期阻塞。清單 3 演示了一個簡單的讀進程,該進程與清單 2 介紹的寫進程對應:

清單 3. 對應的消費者進程


import jcsp.lang.*;

public class ReadEvenIntsProcess implements CSProcess
{
    private ChannelInput in;
    public ReadEvenIntsProcess(ChannelInput in)
    {
      this.in = in;
    }

    public void run()
    {
      while (true)
      {
        Integer d = (Integer)in.read();
        System.out.println("Read: "   d.intValue());
      }
    }
}
 軟件開發網 

JCSP 中的通道
到目前為止,我只有兩個獨立的進程。下一步就是使用一個用作共享同步機制的公共通道把它們聯系在一起,然後從中剔除一個進程。channel 接口是 JCSP 的 ChannelInputChannelOutput 接口的子接口,是讀取和寫入對象的公共接口。這個接口有許多可能的實現,就像下面描述的一樣:

    One2OneChannel,顧名思義,實現了“單一寫入器/單一閱讀器”類型的通道。

    One2AnyChannel 實現了“單一寫入器/多閱讀器”對象通道。(注意,這不是廣播機制,實際上,為了從通道讀取對象,多個閱讀器要進行相互競爭;在指定時間只有一個閱讀器能使用通道和寫入器進行溝通。)

    Any2OneChannel 實現了 “多寫入器/單一閱讀器”對象通道。同上面的情況一樣,寫入進程彼此競爭使用通道。在指定時間,只有閱讀器和眾多寫入器中的一個在實際使用通道。

    Any2AnyChannel 實現了“多寫入器/多閱讀器”對象通道。讀取進程彼此競爭使用的通道,寫入進程也一樣。在指定時間只有一個閱讀器和一個寫入器在實際使用通道。

    在清單 3 的示例中,我只有一個寫入器進程和一個閱讀器進程,所以 One2OneChannel 類就足夠了。驅動器程序的示例代碼如清單 4 所示:

    清單 4. 驅動器程序
    
    import jcsp.lang.*;
    
    public class DriverProgram
    {
        public static void main(String[] args)
        {
          One2OneChannel chan = new One2OneChannel();
          new Parallel
          (
            new CSProcess[]
    	    {
    	      new SendEvenIntsProcess (chan),
    	      new ReadEvenIntsProcess (chan)
    	    }
          ).run ();
        }
    }
      

    正如代碼表示的,我首先實例化一個新的 One2OneChannel 對象,然後把它傳遞給 SendEvenIntsProcessReadEventIntsProcess 進程的構造函數。這樣做是因為 One2OneChannel 同時實現了兩個接口 —— ChannelInputChannelOutput

    通道內部
    因為通道在 JCSP 中是重要的概念,所以在進行下一步之前,要確定您確實理解了它們的工作方式。正如我在前面提到的,通道在默認情況下是非緩沖的,但是也可以把它們變成緩沖的。實現方式是:通道本身並不處理緩沖特性,而是把這個責任委托給其他類,其他類必須實現叫作
    ChannelDataStore 的接口。JCSP 為這個接口提供了多個內置實現,其中包括以下幾個實現:

      ZeroBuffer,對應默認的非緩沖特性。

      Buffer,為與之相關聯的通道提供了一個阻塞的先進先出的緩沖語義。

      InfiniteBuffer,也提供先進先出語義,但是如果緩沖為空,那麼可以將閱讀器阻塞。寫入器永遠不會阻塞,因為緩沖可以無限擴展,或者至少到了底層內存系統設置的限制為止。

      通道實戰
      考慮一個實際使用的通道示例。當我創建了如清單 4 所示的 One2OneChannel 實例時,我把它內部的 ChannelDatasource 設置成 ZeroBuffer 的一個新實例。ZeroBuffer 只能保存一個對象(或整數)。它有一個內部狀態變量,該變量的起始值為 EMPTY,只要放進一個對象,該變量的值就變成 FULL 了。

      SendEvenIntsProcess 進程在它的 out 通道上進行 write 操作時,會發生什麼呢?One2OneChannel 類的 write() 方法是一個 synchronized() 方法。因此,發送方進程運行所在的線程(很快就會看到發送方進程和閱讀器進程運行在獨立的線程中)就會得到與這個通道實例相關聯的監視器鎖,並繼續處理方法。在該方法中,業務的第一個順序就是調用內部持有的 ZeroBuffer 實例的 put 方法,把對象(或者在這個示例中是整數)寫到 ZeroBuffer 實例。這樣就把緩沖的狀態變成 FULL。這時,調用線程調用 wait,造成線程進入監視器的 等候集,後面進行的操作是釋放監視器鎖和阻塞線程。

      稍後,閱讀器線程調用通道上的 read 操作(這也是一個同步的方法,所以閱讀器線程在繼續處理之前必須得到監視器鎖)。因為內部緩沖的狀態是 FULL,所以可用數據將被返回,並發出一個 notify()notify() 喚醒發送方線程,然後發送方線程退出監視器等候集,並重新申請監視器鎖。

      http://www.mscto.com

      在反過來的場景中,如果閱讀器線程調用通道上的 read 方法時,通道的內部緩沖狀態是 EMPTY,那麼閱讀器線程就不得不 wait,在這種情況下,發送方線程要在把數據對象寫入內部緩沖之後通知閱讀器線程。

      Parallel 構造
      在 清單 4 中,您可能已經注意到驅動器程序引入了一個新類,叫作 ParallelParallel 類是由 JCSP 以預定義 CSProcess 的形式提供的,它接受一組獨立的 CSProcess 實例,並“平行地”運行它們 (除了最後一個之外,所有進程都在獨立的線程中運行;最後一個進程由 Parallel 對象在自己的控制線程中運行)。 Parallel 進程的 run 方法只有在所有的部件進程終止的時候才終止。所以 Parallel 進程是一種把多個獨立進程組織起來的機制,它用通道(在驅動器程序中示例中)作為“線”把進程連在一起。

      了解 Parallel 構造的另一個途徑是說:它可以把小的、簡單的組件組合成更高層次的進程。實際上,Parallel 允許通過迭代把前面迭代中創建的組件與新的組件連接起來,創建出任意復雜程度的完全連接的進程網絡。生成的進程網絡可以像一個 CSProcess
      對象一樣公開和使用。

      Parallel 示例
      JCSP 庫提供了一組即插即用的組件,不過僅僅是出於教育的目的,正好適合我的目的:進入其中幾個的內部實現,可以很好的表現如何在 JCSP 中組合成網絡化的並發進程。我用下面的示例進程來表現 JCSP 中 Parallel 構造的內部工作方式:

        PlusInt 在兩個輸入流中都接受整數,把整數加在一起,然後把結果輸出到輸出流。

        Delta2Int 平行地把到達它的輸入流的每個整數廣播到它的兩個輸出通道。

        PrefixInt 在它的整數輸入流之前加上一個(用戶配置的)整數。(也就是說,在這個進程的輸出通道上有整數可用之前,第一個輸出是預先配置的整數。後面的輸出才是從輸入流得到的整數。)

        IntegrateInt 是一個用 Parallel 構造組合了前三個進程的進程。它的功能是輸出來自它的輸入通道的整數的中間匯總值。

        IntegrateInt 類的 run 方法如清單 5 所示:

        清單 5. IntegrateInt 進程
        
        import jcsp.lang.*;
        
        public class IntegrateInt implements CSProcess 
        {
          private final ChannelInputInt in;
          private final ChannelOutputInt out;
        
          public IntegrateInt (ChannelInputInt in, ChannelOutputInt out)
          {
            this.in = in;
            this.out = out;
          }
        
          public void run()
          {
              One2OneChannelInt a = new One2OneChannelInt ();
              One2OneChannelInt b = new One2OneChannelInt ();
              One2OneChannelInt c = new One2OneChannelInt ();
        
              new Parallel 
              (
                new CSProcess[]
                {
                  new PlusInt (in, c, a),
                  new Delta2Int (a, out, b),
                  new PrefixInt (0, b, c)
                }
              ).run ();
          }
        }
         軟件開發網 

        注意,與 請單 4 中使用的通道相比,這個示例中使用了不同種類的通道。 IntegrateInt 類使用 ChannelInputIntChannelOutputInt 通道,顧名思義,可以用它們傳遞 int 類型的整數。相比之下,清單 4 中的驅動器程序使用了 ChannelInputChannelOutput,它們是 對象 通道,可以用來在通道中從發送方給接收方發送任意對象。出於這個原因,在清單 4 中傳遞 int 值之前,我不得不把 int 值包裝成 Integer 對象。 http://www.mscto.com

        在清單 5 中,還需要注意觀察什麼呢?實際上,PrefixInt 進程的第一個輸出是 0,它是通過 PlusInt 進程添加到輸入通道到達的第一個整數上的。這個結果被寫入通道 a,它構成了 Delta2Int 進程的輸入通道。Delta2Int 進程把整數結果寫到 out (進程的整體輸出通道)並把它發送到 PrefixInt 進程。然後 PrefixInt 進程把整數作為輸入發送給 PlusInt 進程,並添加到流中的第二個整數,如此類推。

        軟件開發網

        IntegrateInt 進程組成的圖示如圖 1 所示:

        圖 1. IntegrateInt 進程
        IntegrateInt 進程 

        網絡中的網絡
        IntegrateInt
        進程就是這樣由三個小進程組成,它本身可以當作一個復合進程來用。JCSP 庫提供了一個叫作 SquaresInt 的進程,顧名思義,它生成一個整數流,整數流是自然數 (1、2、3、4,等等)的平方。這個進程的代碼如清單 6 所示:

        http://www.mscto.com

        清單 6. SquaresInt 進程

        
        public class SquaresInt implements CSProcess 
        {
          private final ChannelOutputInt out;
        
          public SquaresInt (ChannelOutputInt out)
          {
            this.out = out;
          }
        
          public void run()
          {
              One2OneChannelInt a = new One2OneChannelInt ();
              One2OneChannelInt b = new One2OneChannelInt ();
        
              new Parallel 
              (
                new CSProcess[]
                {
                  new NumbersInt (a),
                  new IntegrateInt (a, b),
                  new PairsInt (b, out)
                }
              ).run ();
          }
        }
         

        我可以肯定您已經注意到清單 6 顯示的兩個新進程。NumbersInt 是一個內置進程,它只是在其輸出通道中輸出從 0 開始的自然數。PairsInt 進程則把連續的一對輸入值相加並輸出結果。這兩個新進程和 IntegrateInt 一起構成了 SquaresInt 進程,如圖 2 中的圖表所示:

        圖 2. SquaresInt 進程
        SquaresInt 進程

        SquaresInt 的工作方式
        在進入下一部分之前,先來考慮 SquaresInt 進程的內部工作方式。在下面可以看到 SquaresInt 內部每個通道上的交通流向:

        
        Channel "a":	[0, 1, 2, 3, 4, 5, 6, 7, 8, ...ad infinitum]
        Channel "b":	[0, 1, 3, 6, 10, 15, 21, 28, 36, ...ad infinitum]
        Channel "out":	[1, 4, 9, 16, 25, 36, 49, 64, 81 ...ad infinitum]
          

        您有沒有看這樣的模式:寫入通道 a 的整數造成它們也被寫入通道 b,因此也寫到通道 out?在第一次“滴答”當中,NumbersInt 進程把整數 0 寫入通道 aIntegrateInt 進程也把整數 0 (是當前匯總的值)寫入通道 bPairsInt 進程在這次滴答中什麼都不產生,因為它需要處理兩個輸入。在第二次滴答中,NumbersInt 進程在它的輸出通道上寫入整數 1。這造成 IntegrateInt 進程把匯總值修改成 0 1=1,所以把整數 1 寫入通道 b

        這時, PairsInt 有了兩個整數輸入可以處理 —— 整數 0 來自前一次滴答,整數 1 來自當前滴答。它把它們加在一起,並把輸出 0 1=1 寫到通道 out。請注意 1 是 1 的平方,所以我們現在可能是在正確的軌道上。繼續把示例前進到下一個(第三個)滴答,NumbersInt 進程把把整數 2 寫入通道 a。這使 IntegrateInt 進程把匯總值更新為 1 (前一個匯總值) 2 (新值) = 3 並把這個整數寫入通道 b

        PairsInt 進程看到最後兩個整數是什麼?它們是 1 (在前一次滴答期間) 和 3 (在當前滴答期間)。所以,進程把這兩個整數加在一起,並把 1 3=4 寫入通道 out。您會注意到 4 是 2 的平方,這意味著 SquaresInt 工作起來就像它應當工作的那樣。實際上,應當繼續運行這個程序到任意數量的滴答,這樣就可以驗證寫入通道 out 的整數總是在序列中的下一個整數的平方。我在下一節精確地這一操作。

        數學問題
        就在您納悶的時候,我想解釋一下生成平方值的數學基礎。假設在 NumbersInt 進程已經把整數輸出到某個 n-1 的時候,您偷看到了箱子內部。IntegrateInt 進程最後生成(而且通過共享通道 b 放到 PairsInt 進程)的中間匯總會是 [1 2 3 ... (n-1)] = (n-1)(n-2)/2

        在下一次滴答期間,NumbersInt 會輸出 n,這造成 IntegrateInt 進程的中間匯總增長為 (1 2 3 ... n) = n(n-1)/2。然後這個匯總會通過共享通道 b 傳給 PairsInt 進程。 PairsInt 會把這兩個數加在一起,生成 [(n-1)(n-2)/2 n(n-1)/2] = [(n-2) n](n-1)/2 = (2n-2)(n-1)/2 = (n-1)exp2

        接下來,NumbersInt 進程會產生(n 1)。與之對應,IntegrateInt 進程會把 n(n 1)/2 送到 PairsInt 進程。然後 PairsInt 會生成 [n(n-1)/2 n(n 1)/2] = nexp2。針對所有的 n 對這進行通用化,就會按照期望的那樣產生全部平方。

        JCSP 中的確定性
        以上示例演示了 CSP 的復合語言 —— 即如何用 Parallel 構造把細致的無狀態的組件組成分層的網絡。所有這類相互通信的平行進程的分層網絡的賣點就是:它們是完全確定的。在這個上下文環境中 確定 意味著什麼呢?它意味著這類分層網絡的輸出只取決於提供給它的輸入,而不用考慮網絡運行的運行時環境(JVM)的特性。也就是說,進程網絡獨立於 JVM 的調度策略,也獨立於它所分布的多處理器。(我在這裡假設的是個單一節點,但是,沒有什麼固有的東西會防礙把這個討論引入物理上分布在多個節點上而在進程之間通過線路進行通信的進程網絡上。)

        確定性會是工具包中的強大工具,因為它可以讓您清晰地推斷出程序的行為,不必擔心運行時環境對它可能產生的影響。同時,確定性不是並發性編程惟一可能的技術或必需的技術。因為下一個(也是最後一個)實例將顯示,非確定性在 JSP 中也是同樣強大的實用概念。 軟件開發網

        JCSP 中的非確定性
        非確定是許多真實的應用程序的因子,在這些應用程序,可見的輸出是某個功能或者事件發生的順序。換句話說,當結果取決於設計的調度,而 不是 取決於事件時,就是非確定性在並發應用程序中發揮作用的地方了。您將會看到,JCSP 顯式地處理這類問題。

        例如,假設一個進程對於下面要做什麼 有許多備選項,每個備選項都有一個與之關聯的 警衛(guard),警衛必須處於“就緒(ready)”狀態,這樣才能讓備選項得以考慮。進程可以從可用的備選項(也就是就緒的)中選擇一個選項;選擇本身可能基於不同的策略,可能是任意選擇、最高優先級選擇或者公平選擇。

        事件選擇策略
        在 JCSP 的特定上下文中,提供了一個叫作 Guard 的抽象類,競爭進程選擇的事件必須繼續它。進程本身使用另一個預先提供的類,叫作 Alternative,這些警衛對象必須以對象數組的形式傳遞給它的構造函數。Alternative 類為三種事件選擇策略提供了方法。

        Alternative 類的 select() 方法對應著 任意選擇 策略。select() 方法調用一直受阻塞,直到一個或多個警衛就緒為止(請記住,所有競爭的警衛對於 Alternative 類來說都是已知的)。其中一個就緒的警衛被隨機選中,它的索引(在傳遞進去的警衛數組中)也被返回。

        priSelect() 方法對應著 最高優先級 策略。也就是說,如果不止一個警衛就緒,則返回索引值最低的那個;這裡面的假設是:在數組中傳遞給 Alternative 構造函數的警衛已經按照優先級順序進行降序排序了。

        最後,方法 fairSelect 是在多個就緒警衛中進行 公平 選擇:在這個方法的連續調用中,在其他就緒而且可用的警衛沒被選中之前,不會有某個就緒的警衛被選中兩次。所以,如果警衛的總數是 n,那麼在最壞的情況下,就緒的警衛沒獲得選中的次數不會連續超過 n 次。

        如果進程不關心如何選擇多個就緒警衛,那麼任意選擇策略最合適;如果進程想保證沒有資源耗盡或者最差服務次數,例如在實時系統中,那麼任意選擇就不太適用了。在前面的情況下,推薦使用 fairSelect 方法,而在後面的情況下,用 priSelect() 方法最好。

        警衛類型
        大體來說,JCSP 提供了三類警衛:

        http://www.mscto.com

          通道警衛 總是對應著進程等候從中讀取數據的通道。也就是說,只有在通道另一端的進程已經輸出數據,而該數據還沒有被進程輸入的時候,警衛才就緒。

          計時器警衛 總是和設置(絕對)超時對應。也就是說,如果超時,則計時器警衛就會就緒。

          跳過警衛 總是就緒。

          JCSP 中的通道警衛 可以是以下類型:AltingChannelInput/AltingChannelInputInt,只要在對應的通道中有了對象或整數數據,則這兩個通道將就緒;或者 AltingChannelAccept,如果在通道中出現不可接受的“CALL”(這一點後面有更多介紹),則通道就會就緒。這些都是抽象類,它們擁有 One2OneAny2One 類型通道形式的具體實現。JCSP 中的計時器 警衛屬於 CSTimer 類型,而 跳過警衛 則是以 Skip 類的形式提供的。

          運作中的警衛
          我用一個簡單的示例,演示如何用 JCSP 警衛實現並發應用程序中的非確定性,借此總結對 JCSP 的介紹。假設您必須開發一個乘法(或者 倍增) 設計,讀取的整數在輸出通道以固定速率到達,可以用某個乘數乘以它們,然後把它們寫入其輸出通道。設備可以用一個初始乘數開始,但是這個乘數每 5 秒鐘自動加倍。 http://www.mscto.com

          這個故事中介紹的方法是這樣的:系統中存在著第二個控制器進程,它能通過專用通道向設備發送 suspend Operation 信號。這使設備中止自身,並把乘數的當前值通過第二個通道發送給控制器。

          在中止的時候,設備只應當允許全部進入的整數不經變化地通過它的輸出通道。控制器進程 —— 可能在用設備發送給它的乘數做了某些計算中的一種 —— 通過專用通道把一個新乘數發送回設備。(請注意:只要設備處於 中止 狀態,就會被迫接受這個乘數。)

          更新過的乘數插入到設備,並充當設備的喚醒信號。設備現在繼續執行它的放大操作,用新更新的乘數乘上輸入的整數。計時器這時也重置,所以新的乘數也在 5 秒之後被設置成加倍數值,如此類推。

          圖 3 中的圖表說明了這個放大設備:

          圖 3. 放大設備
          ScaleInt 進程

          ScaleInt 進程
          放大設備的源代碼在清單 7 中顯示。這個示例中的非確定性是因為:output 的值基於 ininject 流的值(同時還基於這些值到達的順序)。

          清單 7. ScaleInt 進程

          
          import jcsp.lang.*;
          import jcsp.plugNplay.ints.*;
          
          public class ScaleInt implements CSProcess
          {
            private int s;
            private final ChannelOutputInt out, factor;
            private final AltingChannelInputInt in, suspend, inject;
          
            public ScaleInt (int s, AltingChannelInputInt suspend, AltingChannelInputInt in, 
              ChannelOutputInt factor, AltingChannelInputInt inject, ChannelOutputInt out)
            {
              this.s = s;
          	this.in = in;
          	this.out = out;
          	this.suspend = suspend;
          	this.factor = factor;
          	this.inject = inject;
            }
          
            public void run()
            {
          	final long second = 1000;               // Java timings are in millisecs
          	final long doubleInterval = 5*second;
          	final CSTimer timer = new CSTimer ();
          
          	final Alternative normalAlt = new Alternative (new Guard[] {suspend, timer, in});
          	
          	final int NORMAL_SUSPEND=0, NORMAL_TIMER=1, NORMAL_IN = 2;
          
          	final Alternative suspendedAlt = new Alternative (new Guard[] {inject, in});
          	
          	final int SUSPENDED_INJECT=0, SUSPENDED_IN = 1;
          	
          	long timeout = timer.read ()   doubleInterval;
          	timer.setAlarm (timeout);
          
          	while (true)
          	{
          	  switch (normalAlt.priSelect ())
          	  {
          		case NORMAL_SUSPEND:
          		  suspend.read ();              // don't care what's sent
          		  factor.write (s);             // reply with the crucial information
          		  boolean suspended = true;
          		  while (suspended)
          		  {
          		    switch (suspendedAlt.priSelect ())
          			{
          			  case SUSPENDED_INJECT:    // this is the resume signal as well
          			    s = inject.read ();     // get the new scaling factor
          				suspended = false;      // and resume normal Operations
          				timeout = timer.read ()   doubleInterval;
          				timer.setAlarm (timeout);
          				break;
          			  case SUSPENDED_IN:
          			    out.write (in.read ());
          				break;
          			}
          		  }
          		  break;
          		case NORMAL_TIMER:
          		  timeout = timer.read ()   doubleInterval;
          		  timer.setAlarm (timeout);
          		  s = s*2;
          		  break;
          		case NORMAL_IN:
          		  out.write (s * in.read ());
          		  break;
          	  }
              }
            }
          }
          
          import jcsp.lang.*;
          import jcsp.plugNplay.ints.*;
          
          public class Controller implements CSProcess
          {
            private long interval;
            private final ChannelOutputInt suspend, inject;
            private final ChannelInputInt factor;
          
            public Controller (long interval, ChannelOutputInt suspend, ChannelOutputInt inject, 
              ChannelInputInt factor)
            { 
              this.interval = interval;
              this.suspend = suspend;
              this.inject = inject;
              this.factor = factor;
            }
          
            public void run ()
            {
          	int currFactor = 0;
          	final CSTimer tim = new CSTimer ();
          	long timeout = tim.read ();
          	while (true)
          	{
          	  timeout  = interval;
          	  tim.after (timeout);        // blocks until timeout reached
          	  suspend.write (0);          // suspend signal (value irrelevant)
          	  currFactor = factor.read ();			
          	  currFactor   ;              // compute new factor
          	  inject.write (currFactor);  // inject new factor
          	}
            }
          }
          
          import jcsp.lang.*;
          import jcsp.plugNplay.ints.*;
          
          public class DriverProgram
          {
            public static void main(String args[])
            {
          	try
          	{
          	  final One2OneChannelInt temp = new One2OneChannelInt ();
          	  final One2OneChannelInt in = new One2OneChannelInt ();
          	  final One2OneChannelInt suspend = new One2OneChannelInt ();
          	  final One2OneChannelInt factor = new One2OneChannelInt ();
          	  final One2OneChannelInt inject = new One2OneChannelInt ();
          	  final One2OneChannelInt out = new One2OneChannelInt ();
          		
          	  new Parallel
          	  (
          		new CSProcess[]
          		{
          		  new NumbersInt (temp),
          		  new FixedDelayInt (1000, temp, in),
          		  new ScaleInt (2, suspend, in, factor, inject, out),
          		  new Controller (6000, suspend, inject, factor),
          		  new PrinterInt (out, "--> ", "\n")
          		}
          	  ).run ();
          	}
          	catch (Exception e)
          	{
          		e.printStackTrace();
          	}
            }
          }
          

          上面的類 ScaleInt 對應著放大設備。正如前面提到的,這個類必須實現 CSProcess 接口。因為上面的代碼演示了許多概念,所以我將逐個討論它的不同方面。

          兩個備選項
          ScaleInt 類中,我們感興趣的第一個方法是 run()。在 run() 方法中,要做的第一件事是創建 Alternative 類的兩個實例,每個都有一組不同的 Guard 對象。 軟件開發網

          第一個 Alternative 實例由變量 normalAlt 表示,它是為設備正常操作的時候使用的。與之關聯的警衛列表如下所示:

            suspendOne2OneChannelInt 的實例。正如前面提到過的,One2OneChannelInt 實現了單一閱讀器/寫入器整數通道,通道是零緩沖、完全同步的。這是控制器進程向設備發送中止信號的通道。

            timerCSTimer 的實例,它被設置成每 5 秒觸發一次,每次觸發時,設備會把乘數的當前值加倍。

            inOne2OneChannelInt 的實例,設備通過它接收輸入的整數。

            第二個 Alternative 實例由 suspendedAlt 表示,它是供設備在已經被 Controller 中止的情況下使用的。與之關聯的警衛如下如示: 軟件開發網

              injectOne2OneChannelInt 的實例,由控制器進程使用,用來向設備發送新的乘數(也充當喚醒信號)。

              in 是前面已經看到的 One2OneChannelInt 相同的實例;設備通過這個通道接收輸入整數。

              兩個 Alternative 實例被用在不同的情況下等候警衛就緒,列表的順序是隱式的優先級順序。例如,如果 normalAltsuspendtimer 警衛恰好同時就緒,那麼和 suspend 警衛對應的事件首先被處理。

              警衛就緒
              下一個我們感興趣的是在每個警衛就緒的時候,發生了什麼。我首先研究 normalSelect,假設設備操作正常(也就是說,還沒有被中止):

                如果控制器向設備發送了 suspend 信號,那麼這個事件以最高優先級得到處理。作為響應,設備把乘數的當前值通過叫作 factor 的通道發送給控制器。然後將叫作 suspended 的內部標志設置為 true,然後進入循環,等候別人發送信號,以繼續其操作。在循環內部,設備調用第二個 Alternative 實例上的 priSelect() 方法 (suspendedAlt)。

                這個 Alternative 實例包含兩個警衛:第一個表示控制器向設備發送乘數的事件,第二個表示整數到達設備的輸入通道。在前一種情況下,設備用從 inject 通道讀取的值來更新乘數(保存在變量 s 中),並將 suspended 標志設置回 false (這樣就保證了在下一次迭代時可以退出內部循環),用當前計時器的值作為基值重新設置鬧鐘。在後一種情況下,設備只是從它的輸入通道讀取整數,並把整數寫入輸出通道(也就是說,在設備中止時,不許使用乘數的要求)。

                具有下一個優先級得到處理的事件是鬧鐘到期事件。這造成設備把當前乘數加倍,用當前計時器的值作為基值重新設置鬧鐘,然後返回,繼續等候下一個事件。 軟件開發網

                第三個可能是事件是從設備的輸入通道接收整數的事件。與之對應的是,設備讀取整數,用當前乘數 s
                乘上它,並將結果寫入設備的輸出通道。

                Controller 類
                下一個要考慮的類是 Controller 類。請記住,控制器類的任務是周期性地(大概是基於復雜的計算)向設備進程插入乘數值。在這個示例中,周期基礎只是一個計時器,該計時器按照規律的、配置好的間隔到期。每次到期時,控制器就在 suspend 上寫一個 0(也就是說,它將中止設備),並在叫作 factor 的輸入通道上讀取當前的乘數。 http://www.mscto.com

                這時,控制器只是把這個值加一,然後通過一對一通道 (叫作 inject,專門用於為這個目的) 將它插回設備。這就通知設備繼續工作的方式,這時計時器被重新設置成在適當間隔後到期。

                DriverProgram 類
                最後剩下的類是驅動器類 DriverProgram。這個類創建適當的通道和 CSProcess 實例數組。它用 JCSP 提供的類 NumbersInt 生成一系列自然數,通過temp 通道傳遞給另一個叫作 FixedDelayInt 的內置類。顧名思義,FixedDelayInt 將來自其輸入通道的值在固定延遲(在示例代碼中,該延遲是 1 秒)之後發送到它的輸出通道。

                這個自然數的流每隔一秒就被發送到 ScaleInt 進程的 in 通道。ScaleInt 進程的 out 通道的輸出傳遞給 JCSP 提供的 PrinterInt 進程,然後該進程再接著把整數值輸出到 System.out

                http://www.mscto.com

                第 2 部分的結束語
                在這個由三部分組成的介紹適用於 Java 程序員的 CSP 的系列文章的第 2 部分中,我解釋並演示了並發編程中的 CSP 理論。然後是對 CSP 構造的概述,其中介紹了最流行的基於 Java 的 CSP 庫 —— JCSP。由於 Java 語言沒有對 CSP 構造提供自帶的支持,所以 JCSP 庫內部采 Java 支持 的自帶構造,例如 synchronized()wait()notify()。為了幫助您正確地理解 JCSP 是如何工作的,我從 Java 構造的角度解釋了一些 JCSP 類庫的內部實現,然後在幾個實際示例中演示了它們的用法。

                這裡所進行的討論可以作為本系列最後一篇文章的絕好基礎。在最後一篇文章中,我將解釋 CSP 和 AOP 的相似性,並簡要地對 CSP 解決並發性的方法和新的 Java.util.concurrent 包解決並發性的方法進行比較,還將介紹許多用 JCSP 進行高級同步 的技術。

                致謝
                非常感謝 Peter Welch 教授在我編寫這個文章系列期間給予的鼓勵。他在百忙之中抽出時間非常細致地審閱了草稿,並提供了許多寶貴的提高系列質量和准確性的建議。文章中如果還存在錯誤的話,那都是由於我的原因!我在文章中使用的示例基於或來自 JCSP 庫的 Javadoc 中提供的示例,以及 JCSP Web 站點上提供的 PowerPoint 演示文稿。這兩個來源提供了需要探索的大量信息。

                軟件開發網

                參考資料 您可以參閱本文在 developerWorks 全球站點上的 英文原文。

                Brian Goetz 編寫的由三部分組成的 “Threading lightly”是解決 Java 平台上同步問題的巧妙而系統的方法。(developerWorks,2001 年 7 月)

                Allen Holub 撰寫的“如果我是國王:關於解決 Java編程語言線程問題的建議” (developerWorks,2000 年 10 月)是一篇啟蒙性的、至今仍有意義的、關於 Java 平台多線程編程錯誤的概述。

                C.A.R. Hoare 開創性的論文“Communicating Sequential Processes”把通信順序進程的並行組成作為一種基本的編程結構化方法提了出來(
                Communications of the ACM Archive,1978)。

                可以免費獲得 PDF 格式的 C.A.R. Hoare 撰寫的 關於 CSP 的書籍。

                Bill Roscoe 撰寫的 Theory and Practice of Concurrency (Prentice Hall, 1997) 是最關於並發性和 CS 主題的最新書籍。

                牛津大學計算機實驗室負責的 CSP Archive 是學習更多關於 CSP 內容的好地方,除此之外,還有 WoTUG homepage。

                Peter Welch 教授和 Jeremy Martin 合著的“Formal Analysis of Concurrent Java Systems” (iOS Press, 2000) 是在 Java 語言中實踐 CSP 的良好起點。

                JCSP homepage 由英國坎特伯雷市肯特大學負責。

                FDR2 (故障偏差求精,Failures-Divergence Refinement) 是面向基於 CSP 的程序的幾個商業化模型檢測工具之一。

                CSP 的實現可用於 Java 語言之外的其他語言:C CSP 是針對 C 的實現,而 J#.Net 是針對 .Net 的實現。

                Occam-pi 是一個語言平台,它期望用 pi-calculus 的移動特性擴展 occam 語言的 CSP 想法。請從 occam-pi homepage 學習這個尖端的研究。

                在學習 occam 時,您可能還想調查 occam 編程器的各種擴展。

                軟件開發網



                在 developerWorks Java 技術專區 可以找到 Java 編程各方面的文章。

                請參閱 Developer Bookstore,以獲得技術書籍的完整清單,其中包括數百本 Java 相關主題的書籍。

                還請參閱 Java 技術專區教程頁,以獲得 developerWorks 上免費的、以 Java 為重點的教程。

                關於作者
                Abhijit Belapurkar 擁有位於印度德裡市的印度理工學院(IIT)計算機科學的理工學士學位。在過去的 11 年中,他一直工作在分布式應用程序的架構和信息安全領域,他在使用 Java 平台構建 n 層應用程序方面也已經有大約 6 年的工作經驗。他目前作為高級技術架構師在 J2EE 領域工作,服務於印度班加羅爾的 Infosys 科技有限公司。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved