程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> 關於C# >> C#實現的多線程異步Socket數據包接收器框架

C#實現的多線程異步Socket數據包接收器框架

編輯:關於C#

幾天前在博問中看到一個C# Socket問題,就想到筆者2004年做的一個省級交通流量接收服務器項目,當時的基本求如下:

接收自動觀測設備通過無線網卡、Internet和Socket上報的交通量數據包 全年365*24運行的自動觀測設備5分鐘上報一次觀測數據,每筆記錄約2K大小 規劃全省將有100個左右的自動觀測設備(截止2008年10月還只有30個)

當時,VS2003才發布年多,筆者也是接觸C#不久。於是Google了國內國外網,希望找點應用C#解決Socket通信問題的思路和代碼。最後,找到了兩篇幫助最大的文章:一篇是國人寫的Socket接收器框架,應用了獨立的客戶端Socket會話(Session)概念,給筆者提供了一個接收服務器的總體框架思路;另一篇是美國人寫的,提出了多線程、分段接收數據包的技術方案,描述了多線程、異步Socket的許多實現細節,該文堅定了筆者采用多線程和異步方式處理Socket接收器的技術路線。

具體實現和測試時筆者還發現,在Internet環境下的Socket應用中,需要系統有極強的容錯能力:沒有辦法控制異常,就必須允許它們存在(附加源代碼中可以看到,try{}catch{}語句較多)。對此,筆者設計了一個專門的檢查和清理線程,完成無效或超時會話的清除和資源釋放工作。

依稀記得,國內框架作者的名稱空間有ibm,認為是IBM公司職員,通過郵件後才知道其人在深圳。筆者向他請教了幾個問題,相互探討了幾個技術關鍵點。可惜,現在再去找,已經查不到原文和郵件了。只好借此機會,將本文獻給這兩個素未謀面的技術高人和同行,也盼望拙文或源碼能給讀者一點有用的啟發和幫助。

1、主要技術思路

整個系統由三個核心線程組成,並由.NET線程池統一管理:

偵聽客戶端連接請求線程:ListenClientRequest(),循環偵聽客戶端連接請求。如果有,檢測該客戶端IP,看是否是同一觀測設備,然後建立一個客戶端TSession對象,並通過Socket異步調用方法BeginReceive()接收數據包、EndReceive()處理數據包 數據包處理線程:HandleDatagrams(),循環檢測數據包隊列_datagramQueue,完成數據包解析、判斷類型、存儲等工作 客戶端狀態檢測線程:CheckClientState(),循環檢查客戶端會話表_sessionTable,判斷會話對象是否有效,設置超時會話關閉標志,清楚無效會話對象及釋放其資源 2、主要類簡介

系統主要由3個類組成:

TDatagramReceiver(數據包接收服務器):系統的核心進程類,建立Socket連接、處理與存儲數據包、清理系統資源,該類提供全部的public屬性和方法 TSession(客戶端會話):由每個客戶端的Socket對象組成,有自己的數據緩沖區,清理線程根據該對象的最近會話時間判斷是否超時 TDatagram(數據包類):判

斷數據包類別、解析數據包

3、關鍵函數和代碼

下面簡介核心類TDatagramReceiver的關鍵實現代碼。

3.1  系統啟動

系統啟動方法StartReceiver()首先清理資源、創建數據庫連接、初始化若干計數值,然後創建服務器端偵聽Socket對象,最後調用靜態方法ThreadPool.QueueUserWorkItem()在線程池中創建3個核心處理線程。

/// <summary>
/// 啟動接收器
/// </summary>
public bool StartReceiver()
{
try
{
_stopReceiver = true;
this.Close();
if (!this.ConnectDatabase()) return false;
_clientCount = 0;
_datagramQueueCount = 0;
_datagramCount = 0;
_errorDatagramCount = 0;
_exceptionCount = 0;
_sessionTable = new Hashtable(_maxAllowClientCount);
_datagramQueue = new Queue<TDatagram>(_maxAllowDatagramQueueCount);
_stopReceiver = false; // 循環中均要該標志
if (!this.CreateReceiverSocket()) //建立服務器端 Socket 對象
{
return false;
}
// 偵聽客戶端連接請求線程, 使用委托推斷, 不建 CallBack 對象
if (!ThreadPool.QueueUserWorkItem(ListenClientRequest))
{
return false;
}
// 處理數據包隊列線程
if (!ThreadPool.QueueUserWorkItem(HandleDatagrams))
{
return false;
}
// 檢查客戶會話狀態, 長時間未通信則清除該對象
if (!ThreadPool.QueueUserWorkItem(CheckClientState))
{
return false;
}
_stopConnectRequest = false; // 啟動接收器,則自動允許連接
}
catch
{
this.OnReceiverException();
_stopReceiver = true;
}
return !_stopReceiver;
}

下面是創建偵聽Socket對象的方法代碼。

/// <summary>
/// 創建接收服務器的 Socket, 並偵聽客戶端連接請求
/// </summary>
private bool CreateReceiverSocket()
{
try
{
_receiverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_receiverSocket.Bind(new IPEndPoint(IPAddress.Any, _tcpSocketPort)); // 綁定端口
_receiverSocket.Listen(_maxAllowListenQueueLength); // 開始監聽
return true;
}
catch
{
this.OnReceiverException();
return false;
}
}

3.2 偵聽客戶端連接請求

服務器端循環等待客戶端連接請求。一旦有請求,先判斷客戶端連接數是否超限,接著檢測該客戶端IP地址,一切正常後建立TSession對象,並調用異步方法接收客戶端Socket數據包。

代碼中,Socket讀到數據時的回調AsyncCallback委托方法EndReceiveData()完成數據接收工作,正常情況下啟動另一個異步BeginReceive()調用。

.NET中,每個異步方法都有自己的獨立線程,異步處理其實也基於多線程機制的。下面代碼中的異步套異步調用,既占用較大的系統資源,也給處理帶來意想不到的結果,更是出現異常時難以控制和處理的關鍵所在。

/// <summary>
/// 循環偵聽客戶端請求,由於要用線程池,故帶一個參數
/// </summary>
private void ListenClientRequest(object state)
{
Socket client = null;
while (!_stopReceiver)
{
if (_stopConnectRequest) // 停止客戶端連接請求
{
if (_receiverSocket != null)
{
try
{
_receiverSocket.Close(); // 強制關閉接收器
}
catch
{
this.OnReceiverException();
}
finally
{
// 必須為 null,否則 disposed 對象仍然存在,將引發下面的錯誤
_receiverSocket = null;
}
}
continue;
}
else
{
if (_receiverSocket == null)
{
if (!this.CreateReceiverSocket())
{
continue;
}
}
}
try
{
if (_receiverSocket.Poll(_loopWaitTime, SelectMode.SelectRead))
{
// 頻繁關閉、啟動時,這裡容易產生錯誤(提示套接字只能有一個)
client = _receiverSocket.Accept();
if (client != null && client.Connected)
{
if (this._clientCount >= this._maxAllowClientCount)
{
this.OnReceiverException();
try
{
client.Shutdown(SocketShutdown.Both);
client.Close();
}
catch { }
}
else if (CheckSameClientIP(client)) // 已存在該 IP 地址
{
try
{
client.Shutdown(SocketShutdown.Both);
client.Close();
}
catch { }
}
else
{
TSession session = new TSession(client);
session.LoginTime = DateTime.Now;
lock (_sessionTable)
{
int preSessionID = session.ID;
while (true)
{
if (_sessionTable.ContainsKey(session.ID)) // 有可能重復該編號
{
session.ID = 100000 + preSessionID;
}
else
{
break;
}
}
_sessionTable.Add(session.ID, session); // 登記該會話客戶端
Interlocked.Increment(ref _clientCount);
}
this.OnClientRequest();
try // 客戶端連續連接或連接後立即斷開,易在該處產生錯誤,系統忽略之
{
// 開始接受來自該客戶端的數據
session.ClientSocket.BeginReceive(session.ReceiveBuffer, 0,
session.ReceiveBufferLength, SocketFlags.None, EndReceiveData, session);
}
catch
{
session.DisconnectType = TDisconnectType.Exception;
session.State = TSessionState.NoReply;
}
}
}
else if (client != null) // 非空,但沒有連接(connected is false)
{
try
{
client.Shutdown(SocketShutdown.Both);
client.Close();
}
catch { }
}
}
}
catch
{
this.OnReceiverException();
if (client != null)
{
try
{
client.Shutdown(SocketShutdown.Both);
client.Close();
}
catch { }
}
}
// 該處可以適當暫停若干毫秒
}
// 該處可以適當暫停若干毫秒
}

3.3 處理數據包

該線程循環查看數據包隊列,完成數據包的解析與存儲等工作。具體實現時,如果隊列中沒有數據包,可以考慮等待若干毫秒,提高CPU利用率。

private void HandleDatagrams(object state)
{
while (!_stopReceiver)
{
this.HandleOneDatagram(); // 處理一個數據包
if (!_stopReceiver)
{
// 如果連接關閉,則重新建立,可容許幾個連接錯誤出現
if (_sqlConnection.State == ConnectionState.Closed)
{
this.OnReceiverWork();
try
{
_sqlConnection.Open();
}
catch
{
this.OnReceiverException();
}
}
}
}
}
/// <summary>
/// 處理一個包數據,包括:驗證、存儲
/// </summary>
private void HandleOneDatagram()
{
TDatagram datagram = null;
lock (_datagramQueue)
{
if (_datagramQueue.Count > 0)
{
datagram = _datagramQueue.Dequeue(); // 取隊列數據
Interlocked.Decrement(ref _datagramQueueCount);
}
}
if (datagram == null) return;
datagram.Clear();
datagram = null; // 釋放對象
}

3.4 檢查與清理會話

本線程負責處理建立連接後的客戶端會話TSession或Socket對象的關閉與資源清理工作,其它方法中出現異常等情況,盡可能標記相關TSession對象的屬性NoReply=true,表示該會話已經無效、需要清理。

檢查會話隊列並清理資源分3步:第一步,Shutdown()客戶端Socket,此時可能立即觸發某些Socket的異步方法EndReceive();第二步,Close()客戶端Socket,釋放占用資源;第三步,從會話表中清除該會話對象。其中,第一步完成後,某個TSession也許不會立即到第二步,因為可能需要處理其異步結束方法。

需要指出, 由於涉及多線程處理,需要頻繁加解鎖操作,清理工作前先建立一個會話隊列列副本sessionTable2,檢查與清理該隊副本列列的TSession對象。

/// <summary>
/// 檢查客戶端狀態(掃描方式,若長時間無數據,則斷開)
/// </summary>
private void CheckClientState(object state)
{
while (!_stopReceiver)
{
DateTime thisTime = DateTime.Now;
// 建立一個副本 ,然後對副本進行操作
Hashtable sessionTable2 = new Hashtable();
lock (_sessionTable)
{
foreach (TSession session in _sessionTable.Values)
{
if (session != null)
{
sessionTable2.Add(session.ID, session);
}
}
}
foreach (TSession session in sessionTable2.Values) // 對副本進行操作
{
Monitor.Enter(session);
try
{
if (session.State == TSessionState.NoReply) // 分三步清除一個 Session
{
session.State = TSessionState.Closing;
if (session.ClientSocket != null)
{
try
{
// 第一步:shutdown
session.ClientSocket.Shutdown(SocketShutdown.Both);
}
catch { }
}
}
else if (session.State == TSessionState.Closing)
{
session.State = TSessionState.Closed;
if (session.ClientSocket != null)
{
try
{
// 第二步: Close
session.ClientSocket.Close();
}
catch { }
}
}
else if (session.State == TSessionState.Closed)
{
lock (_sessionTable)
{
// 第三步:remove from table
_sessionTable.Remove(session.ID);
Interlocked.Decrement(ref _clientCount);
}
this.OnClientRequest();
session.Clear(); // 清空緩沖區
}
else if (session.State == TSessionState.Normal) // 正常的會話
{
TimeSpan ts = thisTime.Subtract(session.LastDataReceivedTime);
if (Math.Abs(ts.TotalSeconds) > _maxSocketDataTimeout) // 超時,則准備斷開連接
{
session.DisconnectType = TDisconnectType.Timeout;
session.State = TSessionState.NoReply; // 標記為將關閉、准備斷開
}
}
}
finally
{
Monitor.Exit(session);
}
} // end foreach
sessionTable2.Clear();
} // end while
}

4 、結語

基於多線程處理的系統代價是比較大的,需要經常調用加/解鎖方法lock()或Monitor.Enter(),需要經常創建處理線程等。從實際運行效果看,筆者的實現方案有較好的穩定性:2005年4月到5月間,在一個普通PC機器上連續運行30多天不出一點故障。同時,筆者采用了時序區間判重等算法,有效地提高了系統處理與響應速度。測試表明,在普通的PC機器(P4 2.0)上,可以做到0.5秒處理一個數據包,如果優化代碼和服務器,還有較大的性能提升空間。

上面的代碼是筆者實現的省級公路交通流量數據服務中心(DSC)項目中的接收服務器框架部分,整個系統還包括:數據轉發交通部的轉發服務器、數據遠程查詢客戶端、綜合報表數據處理系統、數據在線發布系統、系統運行監控系統等。

實際的接收服務器類及其輔助類超過3K行,整個系統則超過了60K。因為是早期實現的程序,難免有代碼粗糙、方法欠妥的感覺,只有留待下個版本完善擴充了。由於與甲方有保密合同和版權保護等,不可能公開全部源代碼,刪減也有不當之處,讀者發現時請不吝指正。下面是帶詳細注釋的代碼下載URL。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved