程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> C#消息隊列運用程序 -2

C#消息隊列運用程序 -2

編輯:C#入門知識

在這個數組內部,CWorker 類創建了 CWorkerThread類的一個實現版
本。CWorkerThread 類(將在下面討論)是一個必須繼承的抽象類。導出
類定義了消息的處理方式:
aThreads = new ArrayList();
for (int idx=0; idx〈sfWorker.NumberThreads; idx++)
{
  WorkerThreadFormatter wfThread = new WorkerThreadFormatter();
  wfThread.ProcessName = sfWorker.ProcessName;
  wfThread.ProcessDesc = sfWorker.ProcessDesc;
  wfThread.ThreadNumber = idx;
  wfThread.InputQueue = sfWorker.InputQueue;
  wfThread.ErrorQueue = sfWorker.ErrorQueue;
  wfThread.OutputName = sfWorker.OutputName;
  // 定義輔助類型,並將其插入輔助線程結構
  CWorkerThread wtBase;
  switch (sfWorker.ProcessType)
  {
    case WorkerFormatter.SFProcessType.ProcessRoundRobin:
     wtBase = new CWorkerThreadRoundRobin(this, wfThread);
     break;
    case WorkerFormatter.SFProcessType.ProcessAppSpecific:
     wtBase = new CWorkerThreadAppSpecific(this, wfThread);
     break;
    case WorkerFormatter.SFProcessType.ProcessAssembly:
     wtBase = new CWorkerThreadAssembly(this, wfThread);
     break;
    default:
     throw new Exception("Unknown Processing Type");
  }
  // 添加對數組的調用
  aThreads.Insert(idx, wtBase);
}

  一旦所有的對象都已創建,就可以通過調用每個線程對象的 Start方
法來啟動它們:
foreach(CWorkerThread cThread in aThreads)
  cThread.Start();

  Stop、Pause 和 Continue 方法在 foreach循環裡執行的操作類似。
Stop方法具有如下的垃圾收集操作:
GC.SuppressFinalize(this);

  在類析構函數中將調用 Stop 方法,這樣,在沒有顯式調用 Stop 方
法的情況下也可以正確地終止對象。如果調用了 Stop 方法,將不需要析
構函數。SuppressFinalize方法能夠防止調用對象的 Finalize 方法(析
構函數的實際實現)。

CWorkerThread 抽象類

  CWorkerThread 是一個由 CWorkerThreadAppSpecifc、CWorkerThread
RoundRobin 和 CWorkerThreadAssembly繼承的抽象類。無論如何處理消
息,隊列的大部分處理是相同的,所以 CWorkerThread類提供了這一功能。
這個類提供了抽象方法(必須被實際方法替代)以管理資源和處理消息。

  類的工作再一次通過 Start、Stop、Pause 和 Continue 方法來實現。
在 Start方法中引用了輸入和錯誤隊列。在 .NET 框架中,消息由 System.
Messaging 名稱空間處理:
// 嘗試打開隊列,並設置默認的讀寫屬性
MessageQueue mqInput = new MessageQueue(sInputQueue);
mqInput.MessageReadPropertyFilter.Body = true;
mqInput.MessageReadPropertyFilter.AppSpecific = true;
MessageQueue mqError = new MessageQueue(sErrorQueue);
// 如果使用 MSMQ COM,則將格式化程序設置為 ActiveX
mqInput.Formatter = new ActiveXMessageFormatter();
mqError.Formatter = new ActiveXMessageFormatter();

  一旦定義了消息隊列引用,即會創建一個線程用於實際的處理函數
(稱為 ProcessMessages)。在 .NET 框架中,使用 System.Threading
名稱空間很容易實現線程處理:
procMessage = new Thread(new ThreadStart(ProcessMessages));
procMessage.Start();

  ProcessMessages 函數是基於 Boolean值的處理循環。當數值設為
False,處理循環將終止。因此,線程對象的 Stop 方法只設置這一Boolean
值,然後關閉打開的消息隊列,並加入帶有主線程的線程:
// 加入服務線程和處理線程
bRun = false;
procMessage.Join();
// 關閉打開的消息隊列
mqInput.Close();
mqError.Close();

Pause 方法只設置一個 Boolean 值,使處理線程休眠半秒鐘:

if (bPause)
  Thread.Sleep(500);

  最後,每一個 Start、Stop、Pause 和 Continue 方法將調用抽象的
OnStart 、OnStop、OnPause 和 OnContinue 方法。這些抽象方法為實現

[1] [2] [3] [4] 下一頁  

的類提供了掛鉤,以捕獲和釋放所需的資源。

  ProcessMessages 循環具有如下基本結構:
●接收Message。
●如果Message具有成功的Receive,則調用抽象ProcessMessage方法。
●如果Receive或ProcessMessage失敗,將Message發送至錯誤隊列中。

Message mInput;
try
{
  // 從隊列中讀取,並等候 1 秒
  mInput = mqInput.Receive(new TimeSpan(0,0,0,1));
}
catch (MessageQueueException mqe)
{
  // 將消息設置為 null
  mInput = null;
  // 查看錯誤代碼,了解是否超時
  if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B
  {
    // 如果未超時,發出一個錯誤並記錄錯誤號
    LogError("Error: " + mqe.Message);
    throw mqe;
  }
}
if (mInput != null)
{
  // 得到一個要處理的消息,調用處理消息抽象方法
  try
  {
    ProcessMessage(mInput);
  }
  // 捕獲已知異常狀態的錯誤
  catch (CWorkerThreadException ex)
  {
    ProcessError(mInput, ex.Terminate);
  }
  // 捕獲未知異常,並調用 Terminate
  catch
  {
    ProcessError(mInput, true);
  }
}

  ProcessError方法將錯誤的消息發送至錯誤隊列。另外,它也可能引
發異常來終止線程。如果ProcessMessage方法引發了終止錯誤或 CWorker
ThreadException類型,它將執行此操作。

CworkerThread 導出類

  任何從 CWorkerThread中繼承的類都必須提供 OnStart、OnStop、On
Pause、OnContinue和 ProcessMessage 方法。OnStart 和 OnStop方法獲
取並釋放處理資源。OnPause 和 OnContinue 方法允許臨時釋放和重新獲
取這些資源。ProcessMessage方法應該處理消息,並在出現失敗事件時引
發 CWorkerThreadException 異常。

  由於 CWorkerThread構造函數定義運行時參數,導出類必須調用基類
構造函數:
public CWorkerThreadDerived(CWorker v_cParent, WorkerThread
Formatter v_wfThread)
  : base (v_cParent, v_wfThread) {}

  導出類提供了兩種類型的處理:將消息發送至另一隊列,或者調用組
件方法。接收和發送消息的兩種實現使用了循環技術或應用程序偏移(保
留在消息 AppSpecific屬性中),作為使用哪一隊列的決定因素。此方案
中的配置文件應該包括隊列路徑的列表。實現的 OnStart和 OnStop 方法
應該打開和關閉對這些隊列的引用:
iQueues = wfThread.OutputName.Length;
mqOutput = new MessageQueue[iQueues];
for (int idx=0; idx〈iQueues; idx++)
{
  mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]);
  mqOutput[idx].Formatter = new ActiveXMessageFormatter();
}

  在這些方案中,消息的處理很簡單:將消息發送必要的輸出隊列。在
循環情況下,這個進程為:
try
{
  mqOutput[iNextQueue].Send(v_mInput);
}
catch (Exception ex)
{
  // 如果錯誤強制終止異常
  throw new CWorkerThreadException(ex.Message, true);
}
// 計算下一個隊列號
iNextQueue++;
iNextQueue %= iQueues;

  後一種調用帶消息參數的組件的實現方法比較有趣。ProcessMessage
方法使用 IWebMessage接口調入一個 .NET 組件。OnStart 和 OnStop 方
法獲取和釋放此組件的引用。

  此方案中的配置文件應該包含兩個項目:完整的類名和類所在文件的
位置。按照 IWebMessage接口中的定義,在組件上調用 Process方法。

  要獲取對象引用,需要使用 Activator.CreateInstance 方法。此函
數需要一個程序集類型。在這裡,它是從程序集文件路徑和類名中導出的。
一旦獲取對象引用,它將被放入合適的接口:
private IWebMessage iwmSample;
private string sFilePath, sTypeName;
// 保存程序集路徑和類型名稱
sFilePath = wfThread.OutputName[0];
sTypeName = wfThre

上一頁  [1] [2] [3] [4] 下一頁  

ad.OutputName[1];
// 獲取對必要對象的引用
Assembly asmSample = Assembly.LoadFrom(sFilePath);
Type typSample = asmSample.GetType(sTypeName);
object objSample = Activator.CreateInstance(typSample);
// 定義給對象的必要接口
iwmSample = (IWebMessage)objSample;

  獲取對象引用後,ProcessMessage方法將在 IWebMessage接口上調用
Process 方法:
WebMessageReturn wbrSample;
try
{
  // 定義方法調用的參數
  string sLabel = v_mInput.Label;
  string sBody = (string)v_mInput.Body;
  int iAppSpecific = v_mInput.AppSpecific;
  // 調用方法並捕捉返回代碼
  wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific);
}
catch (InvalidCastException ex)
{
  // 如果在消息內容中發生錯誤,則強制發出一個非終止異常
  throw new CWorkerThreadException(ex.Message, false);
}
catch (Exception ex)
{
  // 如果錯誤調用程序集,則強制發出終止異常
  throw new CWorkerThreadException(ex.Message, true);
}
// 如果沒有錯誤,則檢查對象調用的返回狀態
switch (wbrSample)
{
  case WebMessageReturn.ReturnBad:
    throw new CWorkerThreadException
     ("Unable to process message: Message marked bad", false);
  case WebMessageReturn.ReturnAbort:
    throw new CWorkerThreadException
     ("Unable to process message: Process terminating", true);
  default:
    break;
}

  提供的示例組件將消息正文寫入數據庫表。如果捕獲到嚴重數據庫錯
誤,您可能希望終止處理過程,但是在這裡,僅僅將消息標記為錯誤的消
息。

  由於此示例中創建的類實例可能會獲取並保留昂貴的數據庫資源,所
以用 OnPause和 OnContinue 方法釋放和重新獲取對象引用。

檢測設備

  就象在所有優秀的應用程序中一樣,檢測設備用於監測應用程序的狀
態。。NET 框架大大簡化了將事件日志、性能計數器和 Windows管理檢測
設備(WMI )納入應用程序的過程。消息應用程序使用時間日志和性能計
數器,二者都是來自 System.Diagnostics 程序集。

  在 ServiceBase類中,您可以自動啟用事件日志。另外,ServiceBase
EventLog成員支持寫入應用程序事件日志:
EventLog.WriteEntry(sMyMessage, EventLogEntryType.Information);

  對於寫入事件日志而不是應用程序日志的應用程序,它能夠很容易地
創建和獲取 EventLog 資源的引用(正如在 CWorker類中所做的一樣),
並能夠使用 WriteEntry 方法記錄日志項:
private EventLog cLog;
string sSource = ServiceControl.ServiceControlName;
string sLog = "Application";
// 查看源是否存在,如果不存在,則創建源
if (!EventLog.SourceExists(sSource))
  EventLog.CreateEventSource(sSource, sLog);
// 創建日志對象,並引用現在定義的源
cLog = new EventLog();
cLog.Source = sSource;
// 在日志中寫入條目,表明創建成功
cLog.WriteEntry("已成功創建", EventLogEntryType.Information);

  .NET 框架大大簡化了性能計數器。對於每一個處理線程、線程導出
的用戶和整個應用程序,這一消息應用程序都能提供計數器,用於跟蹤消
息數量和每秒鐘處理消息的數量。要提供此功能,您需要定義性能計數器
的類別,然後增加相應的計數器實例。

  性能計數器的類別在服務 OnStart方法中定義。這些類別代表兩種計
數器——消息總數和每秒鐘處理的消息數:
CounterCreationData[] cdMessage = new CounterCreationData[2];
cdMessage[0] = new CounterCreationData("Messages/Total", "Total
Messages Processed",
PerformanceCounterType.NumberOfItems64);
cdMessage[1] = new CounterCreationData("Messages/Second",
"Messages Processed a Second",
PerformanceCounterType.RateOfChangePerSecond32);
PerformanceCounterCategory.

上一頁  [1] [2] [3] [4] 下一頁  

Create("MSDN Message Service", "MSDN
Message Service Counters", cdMessage);

  一旦定義了性能計數器類別,將創建 PerformanceCounter 對象以訪
問計數器實例功能。PerformanceCounter對象需要類別、計數器名稱和一
個可選的實例名稱。對於輔助進程,將使用來自 XML文件的進程名稱,代
碼如下:
pcMsgTotWorker = new PerformanceCounter("MSDN Message Service",
"Messages/Total", sProcessName);
pcMsgSecWorker = new PerformanceCounter("MSDN Message Service",
"Messages/Second", sProcessName);
pcMsgTotWorker.RawValue = 0;
pcMsgSecWorker.RawValue = 0;

要增加計數器的值,僅僅需要調用適當的方法:

pcMsgTotWorker.IncrementBy(1);
pcMsgSecWorker.IncrementBy(1);

最後說明一點,服務終止時,安裝的性能計數器類別應該從系統中刪除:

PerformanceCounterCategory.Delete("MSDN Message Service");

  由於性能計數器在 .NET 框架中工作,因此需要運行一項特殊的服務。
此服務(PerfCounterService)提供了共享內存。計數器信息將寫入共享
內存,並被性能計數器系統讀取。

安裝

  在結束以前,我們來簡要介紹一下安裝以及稱為 installutil.exe的
安裝工具。由於此應用程序是 Windows服務,它必須使用installutil.exe
來安裝。因此,需要使用一個從 System.Configuration.Install 程序集
中繼承的 Installer類:
public class ServiceRegister: Installer
{
  private ServiceInstaller serviceInstaller;
  private ServiceProcessInstaller processInstaller;
  public ServiceRegister()
  {
    // 創建服務安裝程序
    serviceInstaller = new ServiceInstaller();
    serviceInstaller.StartType = ServiceStart.Manual;
    serviceInstaller.ServiceName = ServiceControl.ServiceControl
    Name;
    serviceInstaller.DisplayName = ServiceControl.ServiceControl
    Desc;
    Installers.Add(serviceInstaller);
    // 創建進程安裝程序
    processInstaller = new ServiceProcessInstaller();
    processInstaller.RunUnderSystemAccount = true;
    Installers.Add(processInstaller);
  }
}

  如此示例類所示,對於一個 Windows服務,服務和服務進程各需要一
個安裝程序,以定義運行服務的帳戶。其他安裝程序允許注冊事件日志和
性能計數器等資源。

總結

  從這個 .NET 框架應用程序示例中可以看出,以前只有 Visual C++
程序員能夠編寫的應用程序,現在使用簡單的面向對象程序即可實現。盡
管我們的重點是 C# ,但本文所述的內容也同樣適用於 Visual Basic 和
Managed C++.新的 .NET 框架使開發人員能夠使用任何編程語言來創建功
能強大、可伸縮的 Windows應用程序和服務。

  新的 .NET 框架不僅簡化和擴展了編程的種種可能,還能夠輕松地將
人們經常遺忘的應用程序檢測設備(例如性能監測計數器和事件日志通知)
合並到應用程序中。盡管這裡的應用程序沒有使用 Windows管理檢測設備
(WMI ),但 .NET 框架同樣也可以應用它。

 

上一頁  [1] [2] [3] [4] 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved