程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> 異步代理 - 使用異步代理庫進行基於角色的編程

異步代理 - 使用異步代理庫進行基於角色的編程

編輯:關於.NET

隨著多核處理器在市場上的日益普及,它已廣泛用於服務器、台式機以及便攜 式計算機,代碼並行化的重要性也前所未有地凸顯出來。為了滿足這一關鍵需求 ,Visual Studio 2010 引入了若干新的方法,幫助 C++ 開發人員利用新的並行 運行時和新的並行編程模型帶來的這些功能。然而,開發人員面臨的一個主要障 礙是確定哪種編程模型適合於他們的應用程序。正確的模型可以充分利用底層並 行性,不過也需要重新考慮程序結構和實際的執行方式。

目前,最常見的並行編程模型涉及到通用的並發感知容器以及並行循環迭代等 算法。雖然這些傳統技術功能強大,可擴展應用程序來配合多核計算機使用,但 它們並未解決影響並行性能的其他主要因素之一,那就是不斷加深的延遲影響。 由於並行技術加快計算速度並將計算分布在多個內核之間,因此,Amdahl 定律 (wikipedia.org/wiki/Amdahl's_law) 告訴我們性能改進受到執行速度最慢的那 一部分制約。在許多情況下,等待來自 I/O(例如磁盤或網絡)的數據所花的時 間比例越來越大。

基於角色的編程模型能夠很好地處理延遲等問題,這些模型最初是在二十世紀 七十年代初引入的,目的是利用具有成百上千個獨立處理器的高度並行計算機資 源。角色模型背後的基本概念是將應用程序的各個組件視為單獨的角色,這些角 色可以通過發送、接收和處理消息與外界交互。

最近,隨著大量多核處理器的運用,角色模型已作為一種減少延遲、實現高效 並行執行的有效方法重新露面。Visual Studio 2010 引入了異步代理庫 (AAL), 這是一個令人激動的基於角色的新模型,它具有消息傳遞接口,在該模型中代理 就是角色。AAL 使開發人員可以通過更加以數據流為中心的方式設計自己的應用 程序。這樣的設計通常有利於在等待數據時有效使用延遲。

在本文中,我們將概述 AAL 並介紹如何在應用程序中使用它。

並發運行時

Visual Studio 2010 和 AAL 中並發支持的基礎是新的並發運行時,該運行時 作為 Visual Studio 2010 中 C 運行時 (CRT) 的一部分提供。並發運行時提供 協調任務計劃程序和資源管理器,後者對計算機的底層資源有深入了解。這就允 許運行時以負載平衡的方式在整個多核計算機中執行任務。

圖 1 簡要地展示了 Visual Studio 2010 中對本機代碼並發的支持。計劃程 序是確定何時何地執行任務的主要組件。它借助資源管理器收集的信息來充分地 利用執行資源。盡管應用程序和庫也可以直接與運行時交互,但它們本身主要還 是通過兩個位於計劃程序之上的編程模型(即 AAL 和並行模式庫 (PPL))與並發 運行時交互。

圖 1 並發運行時

PPL 提供更為傳統的並行技術(例如 parallel_for 和 parallel_for_each constructs)、可識別運行時的鎖和並發數據結構(例如隊列和向量)。雖然 PPL 不是本文介紹的重點,但它也是一種功能強大的工具,開發人員可以將其與 AAL 中引入的所有新方法配合使用。有關 PPL 的詳細信息,請參閱 2009 年 2 月刊載的《使用 C++ 的 Windows》專欄 (msdn.microsoft.com/magazine/dd434652)。

相比之下,AAL 能夠在更高級別以不同於傳統技術的角度來並行化應用程序。 開發人員需要從待處理數據的角度思考應用程序,並思考如何將數據處理分隔到 可並行執行的組件或階段中。

AAL 提供兩個主要組件:消息傳遞框架和異步代理。

消息傳遞框架包括一組消息塊,用於接收、處理和傳播消息。通過將消息塊串 連起來,可創建能夠同時執行的工作管道。

異步代理是通過接收消息、在自己維護的狀態下執行本地工作和發送消息,以 此與外界交互的角色。

這兩個組件結合在一起,使開發人員能夠在數據流而不是控制流方面利用並行 性,並通過更高效地使用並行資源來改善對延遲的容忍度。

消息傳遞框架

AAL 的第一個重要組件是消息傳遞框架,該框架是協助開發數據流網絡以便將 工作管道化的一組構造。將工作管道化是數據流模型的基本部分,因為它允許將 工作分解為多個獨立的階段,只要數據就緒便可對流數據進行並行處理。當一個 階段的數據處理結束時,該階段可將數據傳遞到下一階段,同時第一個階段尋找 要處理的新數據。

我們以設置傳出消息格式並審查消息中是否存在不當內容的電子郵件應用程序 為例。這種類型操作的代碼顯示如下:

std::foreach(reader.begin(); reader.end();
  [](const string& word) {
    auto w1 = censor(word);
    auto w2 = format(w1);
    writer.write_word(w2);
  });

對於電子郵件中的每個詞,該應用程序都需要檢查它是否存在於審查詞的字典 中,如果存在則予以替換。然後,代碼根據一組指導原則設置每個詞的格式。

這種方案中存在大量固有的並行性。但是,傳統並行技術還不能滿足要求。例 如,一種簡單的方法是對文本中的字符串使用 parallel_for_each 算法,審查這 些字符串並設置格式。

這種解決方案的第一個主要阻礙是必須讀取整個文件,以便迭代器能夠正確地 劃分工作。

強制讀取整個文件會導致進程受到 I/O 的限制,並且會降低並行 效率。當然,您可以使用智能迭代器將詞的處理與讀取輸入的操作重疊進行。

傳統並行方法的第二個主要問題是排序。顯然,對於電子郵件來說,對文本的 並行處理必須保持文本順序,否則會完全無法理解郵件的含義。為了保持文本順 序,parallel_for_each 技術會產生同步和緩沖方面的大量開銷,而這一過程可 由 AAL 自動處理。

通過采用管道技術處理郵件,您可以避免上述兩個問題,同時還能利用並行能 力。請看一下圖 2,其中創建了一個簡單管道。在此示例中,應用程序的主要任 務(審查和設置格式)被分為兩個階段。第一個階段接收字符串並在審查詞的字 典中查找該字符串。如果找到匹配項,審查塊會使用字典中的另一個詞替換該字 符串。否則,它會輸出已輸入的同一封郵件。同樣,在第二個階段中,格式設置 塊接收每個詞並將其恰當地設置為特定樣式。

圖 2 電子郵件處理管道

此示例可在以下幾個方面從數據流方法獲益。首先,由於它不需要在處理前讀 取整封郵件,郵件中的字符串可以通過審查和設置格式階段立即開始流處理。其 次,管道處理允許一個字符串由設置格式塊進行處理,同時下一個字符串由審查 塊進行處理。最後,由於字符串的處理順序是它們在原文中出現的順序,因此不 需要執行額外的同步。

消息塊

消息塊接收、處理、存儲和傳播消息。消息塊有三種形式:源、目標和傳播器 。源只能傳播消息,而目標能夠接收、存儲和處理消息。大多數塊都是傳播器, 既是源又是目標。換句話說,它們能夠接收、存儲和處理消息,也可以轉而將這 些消息發送出去。

AAL 包含一組消息塊基元,能夠滿足開發人員的大部分使用需求。圖 3 簡要 概述了 AAL 中包括的所有消息塊。不過該模型仍然是開放式的,因此,如果您的 應用程序需要具有特定行為的消息塊,可以自己編寫可與所有預定義塊交互的自 定義塊。每個塊都有各自處理、存儲和傳播消息的獨有特征。

圖 3 AAL 消息塊

消息塊 用途 unbounded_buffer<Type> 存儲不限數量的消息並將其傳播到目標。 overwrite_buffer<Type> 存儲一條消息,每次有新消息傳播進來時都會覆蓋該消息 ,然後將其廣播到目標。 single_assignment<Type> 存儲一條一次寫入的消息,然後將其廣播到目標。 transformer<Input,Output> 接收一條類型為 Input 的消息,然後運行用戶提供的函 數將其轉換為類型為 Output 的消息。

將這條轉換後的消息傳播到 目標。 call<Type> 接收一條消息,然後使用該消息的負載作為參數來運行用 戶提供的函數。

這種塊是純粹的消息目標。 timer<Type> 在用戶定義的時間量之後將消息傳播到目標。

可以是重復或非重復的塊。

這種塊是純粹的消息源。 choice<Type1,Type2,...> 接收來自多種類型的多個源的消息,但只接受來自傳播到 所選類型的第一個塊的消息。 join<Type> 接收來自多個源的消息,將它們組合起來輸出單條消息。

異步等待從各個源輸入的消息准備就緒。 multitype_join<Type1,Type2,...> 接收來自多種類型的多個源的消息,將它們組合起來。

異步等待從各個源輸入的消息准備就緒。

AAL 提供的消息塊基元的一個主要優勢是它們的可組合性。因此,您可以根據 所需行為進行組合。例如,您可以輕松創建將多個輸入添加到一起的塊,方法是 將轉換器塊附加到聯接塊的末尾。當聯接塊成功檢索到來自它的各個源的消息時 ,可將消息傳遞給轉換器,而轉換器將匯總消息負載。

您也可以將重復的計時器塊連接為聯接塊的源。這會形成一個限制消息的塊, 只在計時器塊觸發其消息時允許消息通過。圖 4 中說明了這兩種可組合塊。

圖 4 組合來自基元的加法器塊與消息限制塊
創建消息傳遞管道

現在,我們來看看創建上文所示的消息塊管道的代碼。我們可以用兩個轉換器 消息塊替換此管道,如圖 5 所示。轉換器塊的用途是接收特定類型的消息並對消 息執行用戶定義的函數,這一操作可修改消息負載甚至徹底更改消息類型。例如 ,審查塊將包含字符串的消息作為輸入接收,然後需要對其進行處理。

圖 5 消息塊管道

圖 6 中顯示了創建和連接消息塊的代碼。此代碼從實例化兩個轉換器消息塊 開始。審查塊構造函數中的 C++0x lambda 參數定義轉換函數,該轉換函數在字 典內查找消息的存儲輸入字符串,看看是否應更改為其他字符串。系統返回結果 字符串,然後在審查塊內將其封裝成單條消息並從該塊傳播出去。除非轉換器塊 的輸出是格式設置函數更改過的字符串,否則對於格式設置轉換器塊會采用類似 途徑。

圖 6 簡單消息管道

dictionary dict;

transformer<string, string>
  censor([&dict](const string& s) -> string {

  string result = s;
  auto iter = dict.find(s);

  if (iter != dict.end()) {
    result =  iter->second;
  }

  return result;
});

transformer<string, string>
  format([](const string& s) -> string {

  string result = s;
  for (string::size_type i = 0; i < s.size(); i++) {
    result[i] = (char)Format(s[i]);
  }

  return result;
});

censor.link_target(&format);

asend(&censor, "foo");
string newStr = receive(format);
printf("%s\n", newStr);

兩個塊實例化以後,下一行代碼通過對審查塊調用 link_target 方法,將兩 個塊鏈接到一起。每個源塊和傳播器塊都有 link_target 方法,用於確定源應該 將它的消息傳播到哪些消息塊。

審查塊和格式設置塊鏈接到一起後,轉換函數會處理傳播到審查塊的任何消息 ,生成的消息將隱式傳遞到格式設置塊進行處理。如果消息塊是沒有連接目標的 源或傳播器,它可以按特定於塊的方式存儲消息,直到鏈接了目標或消息被檢索 。

示例代碼的最後三行顯示將消息初始化到塊中以及從塊中檢索消息的過程。 AAL 中有兩種消息初始化 API:send 和 asend。它們分別將消息同步或異步輸入 塊中。

主要區別是,當 send 調用返回時,保證已將消息推送到塊,並且已通過塊將 消息發送到所需目標。asend 調用可以立即返回,並且允許並發運行時計劃傳播 。同樣地,AAL 中有兩種消息檢索 API:receive 和 try_receive。receive 方 法在消息到達前始終處於阻止狀態,而 try_receive 則會在無法檢索消息時立即 返回。

如圖 6 所示,字符串“foo”會異步發送到審查塊。審查塊將接收該消息,檢 查其字符串是否存在於審查詞的字典中,然後將結果字符串傳播到消息中。接著 ,結果字符串被傳遞到格式設置塊,後者接收該字符串,將每個字母變成大寫, 然後由於沒有目標而保留到消息中。當調用 receive 時,將從格式設置塊中獲取 該消息。這樣,假定字典中沒有“foo”,此示例的輸出將是“FOO”。雖然此示 例只是通過網絡推送單個字符串,但可以看到輸入字符串流是如何形成執行管道 的。

請看一下此消息示例,注意消息本身明顯缺少引用。消息只是一個信封,其中 封裝要在數據流網絡中傳遞的數據。消息傳遞本身是通過提供和接受過程來處理 的。當消息塊收到消息時,能夠以任何想要的方式存儲該消息。如果稍後要將消 息發送出去,它會將該消息提供給每個連接的目標。若要真正將消息送出,接收 方必須接受提供的消息,以完成該事務。消息在塊間傳遞的整個過程是由並發運 行時計劃和執行的任務來計劃和處理的。

消息塊傳播

現在,您已了解消息塊是如何創建和關聯在一起的,以及如何將消息初始化到 每個塊中並從中檢索消息。接下來讓我們簡單了解一下消息如何在塊間傳遞,以 及並發運行時如何成為 AAL 的核心。

使用消息塊或 AAL 不一定需要了解此信息,但它有助於加深對消息傳遞協議 工作方式及其使用方式的理解。在本節的其余部分,我將介紹傳播器塊,因為它 們既是源又是目標。顯然,純粹的源塊或純粹的目標塊只是傳播器塊實現的子集 。

在內部,每個傳播器塊都有一個消息輸入隊列和另一個特定於塊的消息存儲容 器。鏈接到此傳播器塊的其他塊會發送存儲在輸入隊列中的消息。

例如,在圖 7 中,審查轉換器塊有一個輸入隊列,該隊列當前存儲包含字符 串 str6 的消息。實際轉換器本身包含兩個消息:str4 和 str5。因為這是轉換 器,所以它的特定於塊的存儲是另一隊列。不同的塊類型可以有不同的存儲容器 。例如,overwrite_buffer 塊只存儲始終會被覆蓋的單條消息。

圖 7 消息傳遞協議

從某個鏈接的源(或 send/asend API)向塊提供消息時,此塊首先會檢查篩 選器函數,以決定是否接受消息。如果決定接受消息,則將消息放入輸入隊列。 篩選器是一個可選函數,可傳遞到返回布爾值的每個目標的構造函數或傳播器塊 中,該布爾值決定是否應接受某個源提供的消息。如果消息被拒絕,該源會繼續 向下一個目標提供消息。

一旦消息放入輸入隊列,它的源塊就不再保留此消息。不過,接受塊尚未准備 好傳播消息。因此在等待處理時,消息可以緩沖到輸入隊列中。

當消息到達某個消息塊的輸入隊列時,並發運行時計劃程序會計劃一個輕型任 務 (LWT)。此 LWT 有雙重目的。首先,它必須將消息從輸入隊列移到塊的內部存 儲中(我們稱之為消息處理)。其次,它還必須嘗試將消息傳播到任意目標(我 們稱之為消息傳播)。

例如,在圖 7 中,輸入隊列中存在提示系統計劃 LWT 的消息。接下來 LWT 會處理消息,方法是先對消息執行用戶提供的轉換器函數,在審查字符串字典中 檢查該消息,然後將它移到塊的存儲緩沖區。

將消息轉移到存儲緩沖區之後,LWT 開始執行傳播步驟,將消息發送到目標設 置格式塊。在這種情況下,由於消息 str4 位於轉換器的前端,它會先傳播到格 式設置塊,然後再傳播下一條消息 str5。同樣的整個過程會在格式設置塊中發生 。

根據消息塊的類型,消息處理方式會有所不同。例如,unbounded_buffer 只 有將消息移到存儲緩沖區的簡單處理步驟。轉換器處理消息的方式是先對消息調 用用戶定義的函數,然後再將其移到存儲緩沖區。其他塊的處理方式甚至更復雜 ,例如聯接,它必須組合來自不同源的多條消息,然後將它們存儲到緩沖區中以 備傳播。

就性能效率而言,AAL 在創建 LWT 方面是智能化的,因此每次只會為每個消 息塊計劃一個 LWT。如果處理 LWT 處於活動狀態時有更多消息到達輸入隊列, LWT 會繼續選取並處理這些消息。因此,如圖 7 所示,如果消息 str7 進入輸入 隊列時轉換器的 LWT 仍在處理,它將選取並處理此消息,而不是啟動新的處理和 傳播任務。

每個消息塊都有各自用於控制處理和傳播的 LWT,這是此設計的核心,它允許 消息傳遞框架按數據流的方式將工作管道化。因為每個消息塊在自己的 LWT 中處 理和傳播消息,所以 AAL 可以將塊彼此分離,並允許跨多個塊執行並行工作。每 個 LWT 必須只將自己的消息傳播到目標塊的輸入隊列,而每個目標僅計劃一個 LWT 來處理自己的輸入。使用單個 LWT 處理和傳播消息可確保為消息塊保持消息 次序。

異步代理

AAL 的第二個主要組件是異步代理。異步代理是粗粒度應用程序組件,專門用 於異步處理較大型的計算任務和 I/O。代理應該可以與其他代理通信,並啟動較 低級別的並行處理。這些代理是隔離的,因為它們對於外界的理解完全包含在類 中,它們可以通過消息傳遞與其他應用程序組件通信。代理本身被計劃為並發運 行時內部的任務。這允許它們配合同時執行的其他工作來阻止和運行。

異步代理有固定生命周期,如圖 8 所示。可以監視和等待此生命周期。綠色 狀態表示運行狀態,而紅色狀態表示終止狀態。開發人員可通過從代理基類派生 的方式創建自己的代理。

圖 8 異步代理生命周期

三種基類函數(start、cancel 和 done)可轉換代理的不同狀態。一旦完成 構造,代理即處於已創建狀態。啟動代理和啟動線程類似。除非對代理調用 start 方法,否則代理不會執行任何操作。此時,代理將根據計劃執行,並進入 可運行狀態。

當並發運行時選取此代理時,它會進入已啟動狀態並繼續運行,直到用戶調用 done 方法,該方法指示它的工作已經完成。已計劃但尚未啟動代理時,調用 cancel 會將代理轉換成已取消狀態,代理將不再執行。

讓我們回顧一下電子郵件篩選示例。在此示例中,管道式消息塊將數據流引入 應用程序,並提高自己並行處理詞語的能力。但是,此示例沒有顯示如何控制處 理電子郵件本身的 I/O,以及如何將它們分解成字符串流,以便管道進行處理。 此外,一旦字符串通過管道,必須進行收集,以便以新的已審查和已設置格式的 狀態重新編寫文本。這就是代理可以發揮作用的地方,目的是幫助容忍 I/O 延遲 差異。

例如,請看一下電子郵件管道的末尾。此時,字符串正由格式設置塊輸出,並 需要寫入郵箱的文件中。圖 9 顯示輸出代理如何捕獲字符串和創建輸出電子郵件 。WriterAgent 的 run 函數接收來自循環中的格式設置塊的消息。

圖 9 代理捕獲格式設置塊的輸出

此應用程序中的大部分處理工作是使用數據流完成的,而 WriterAgent 則顯 示了如何在程序中引入某些控制流。例如,當文件結尾消息到達時,根據接收的 輸入字符串,WriterAgent 必須有不同的行為;它必須知道停止操作。圖 10 中 顯示了 WriterAgent 的代碼。

圖 10 WriterAgent

class WriterAgent : public agent {
public:
  WriterAgent(ISource<string> * src) : m_source(src) {
  }

  ~WriterAgent() {
    agent::wait(this);
  }

  virtual void run() {
    FILE *stream;
    fopen_s( &stream, ...
          );

    string s;
    string eof("EOF");

    while (!feof(stream) && ((s=receive(m_source)) != eof)) {
      write_string(stream, s);
    }

    fclose(stream);
    done();
  }

private:

  ISource<string> * m_source;
};

此代碼有幾個值得關注的地方。首先是析構函數中對靜態函數 agent::wait 的調用。可以使用指向任何代理的指針調用此函數,並且此函數會一直處於阻止 狀態,直到代理進入一種終止狀態(done 或 canceled)。雖然並不是所有代理 都需要在析構函數中調用 wait,但多數情況下應讓它完成,這樣可確保析構時代 理不再執行任何代碼。

其次,此代碼的有趣部分是 run 方法本身。此方法定義代理的主執行過程。 在此代碼中,代理正在處理從源(在本例中是格式設置塊)讀取的字符串的寫出 操作。

最後,請注意 run 方法的最後一行,此行是對代理函數 done 的調用。對 done 方法的調用可將代理從運行狀態轉變成完成狀態。在大多數情況下,需在 run 方法末尾調用此方法。不過,在某些情況下,應用程序可能希望使用代理來 設置狀態。例如在數據流網絡中,該網絡在 run 方法生存期後仍應保持活動狀態 。
將所有內容整合在一起

現在,我們已經創建了消息傳遞管道對字符串進行篩選和設置格式,創建了輸 出代理對字符串進行處理,我們可以將具有相似行為的輸入代理附加到輸出代理 。圖 11 舉例說明了此應用程序如何組合到一起。

圖 11 用於處理電子郵件的代理

代理處理的一個優勢是能夠在應用程序中使用異步角色。這樣,當數據到達並 等待處理時,輸入代理將異步開始通過管道發送字符串,輸出代理同樣可以讀取 和輸出文件。這些角色可以完全獨立地開始和停止處理,並且完全由數據驅動。 此類行為在許多情況下非常有用,特別是在延遲驅動和異步 I/O 情況下,例如電 子郵件處理示例。

在此示例中,我添加了另一個代理 ReaderAgent,它與 WriterAgent 工作方 式類似,不同的是,它處理 I/O 以讀取電子郵件並向網絡發送字符串。圖 12 中 顯示了 ReaderAgent 的代碼。

圖 12 ReaderAgent

class ReaderAgent : public agent {
public:
  ReaderAgent(ITarget<string> * target) : m_target(target) {
  }

  ~ReaderAgent() {
    agent::wait(this);
  }

  virtual void run() {
    FILE *stream;      
    fopen_s( &stream, ...);

    while (!feof(stream)) {
      asend(m_target, read_word(stream));
    }

    fclose( stream );

    asend(m_target, string("eof"));
    done();
  }

private:

  ITarget<string> * m_target;
};

現在,我們已經有 ReaderAgent 和 WriterAgent 對程序 I/O 進行異步處理 ,只需將它們鏈接至網絡中的轉換器塊,便可開始處理。將兩個塊鏈接在一起之 後就可輕松完成此任務:

censor.link_target(&format);

ReaderAgent r(&censor);
r.start();

WriterAgent w(&format);
w.start();

ReaderAgent 是通過對審查的引用創建的,因此可以正確地將消息發送到該代 理,而 WriterAgent 是通過對格式設置的引用創建的,因此可以檢索消息。每個 代理都使用啟動 API 進行啟動,該 API 安排代理在並發運行時中執行。每個代 理都在自己的析構函數中調用 agent::wait(this),因此要等到兩個代理都到達 完成狀態才會開始執行。

同步

本文旨在讓讀者初步了解內置於 Visual Studio 2010 中的基於角色的編程和 數據流管道的一些新功能。我們鼓勵您試用一下。

如果深入探索的話,可以了解到許多本文無法一一詳述的其他功能:自定義消 息塊創建和篩選消息等等。MSDN 上的並行計算開發中心 (msdn.microsoft.com/concurrency) 包含更多有關這一令人興奮的新編程模型如 何幫助您以全新方式並行化程序的詳細信息和使用步驟。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved