在做c#中面向無連接的傳輸時用到了UDP,雖然沒有TCP穩定可靠。但是效率是要高些,優勢也有,缺點也有
就是有的時候要丟包,有的時候不得不用UDP,但是如何才能比較穩定的實現可靠傳輸呢,這是一個問題。
TCP傳輸數據的時候沒有大小限制,但是UDP傳輸的時候是有大小限制的,我們怎麼才能夠實現大數據的穩定傳輸呢。我們想到了,把數據包分包。
把一個大數據分割為一系列的小數據包然後分開發送,然後服務端收到了就拼湊起完整數據。
如果遇到中途丟包就重發。
UDP線程類,實現數據的分包發送和重發。具體的接收操作需要實現其中的事件
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using Model;
using System.Net;
using Tool;
using System.Threading;
namespace ZZUdp.Core
{
//udp的類
public class UDPThread
{
#region 私有變量
UdpClient client;//UDP客戶端
List sendlist;// 用於輪詢是否發送成功的記錄
Dictionary RecListDic = new Dictionary();//數據接收列表,每一個sequence對應一個
IPEndPoint remotIpEnd = null;//用來在接收數據的時候對遠程主機的信息存放
int port=6666;//定義服務器的端口號
#endregion
#region 屬性
public int CheckQueueTimeInterval { get; set; }//檢查發送隊列間隔
public int MaxResendTimes { get; set; }//沒有收到確認包時,最大重新發送的數目,超過此數目會丟棄並觸發PackageSendFailture事件
#endregion
#region 事件
///
/// 當數據包收到時觸發
///
public event EventHandler PackageReceived;
///
/// 當數據包收到事件觸發時,被調用
///
/// 包含事件的參數
protected virtual void OnPackageReceived(PackageEventArgs e)
{
if (PackageReceived != null)
PackageReceived(this, e);
}
///
/// 數據包發送失敗
///
public event EventHandler PackageSendFailure;
///
/// 當數據發送失敗時調用
///
/// 包含事件的參數
protected virtual void OnPackageSendFailure(PackageEventArgs e)
{
if (PackageSendFailure != null)
PackageSendFailure(this, e);
}
///
/// 數據包未接收到確認,重新發送
///
public event EventHandler PackageResend;
///
/// 觸發重新發送事件
///
/// 包含事件的參數
protected virtual void OnPackageResend(PackageEventArgs e)
{
if (PackageResend != null)
PackageResend(this, e);
}
#endregion
//無參構造函數
public UDPThread()
{
}
//構造函數
public UDPThread(string ipaddress, int port)
{
IPAddress ipA = IPAddress.Parse(ipaddress);//構造遠程連接的參數
IPEndPoint ipEnd = new IPEndPoint(ipA, port);
client = new UdpClient();// client = new UdpClient(ipEnd)這樣的話就沒有創建遠程連接
client.Connect(ipEnd);//使用指定的遠程主機信息建立默認遠程主機連接
sendlist = new List();
CheckQueueTimeInterval = 2000;//輪詢間隔時間
MaxResendTimes = 5;//最大發送次數
new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();//啟動輪詢線程
//開始監聽數據
AsyncReceiveData();
}
///
/// 同步數據接收方法
///
public void ReceiveData()
{
while (true)
{
IPEndPoint retip = null;
UdpPacket udpp = null;
try
{
byte[] data = client.Receive(ref retip);//接收數據,當Client端連接主機的時候,retip就變成Cilent端的IP了
udpp = (UdpPacket)SerializationUnit.DeserializeObject(data);
}
catch (Exception ex)
{
//異常處理操作
}
if (udpp != null)
{
PackageEventArgs arg = new PackageEventArgs(udpp, retip);
OnPackageReceived(arg);//數據包收到觸發事件
}
}
}
//異步接受數據
public void AsyncReceiveData()
{
try
{
client.BeginReceive(new AsyncCallback(ReceiveCallback), null);
}
catch (SocketException ex)
{
throw ex;
}
}
//接收數據的回調函數
public void ReceiveCallback(IAsyncResult param)
{
if (param.IsCompleted)
{
UdpPacket udpp = null;
try
{
byte[] data = client.EndReceive(param, ref remotIpEnd);//接收數據,當Client端連接主機的時候,test就變成Cilent端的IP了
udpp = (UdpPacket)SerializationUnit.DeserializeObject(data);
}
catch (Exception ex)
{
//異常處理操作
}
finally
{
AsyncReceiveData();
}
if (udpp != null)//觸發數據包收到事件
{
PackageEventArgs arg = new PackageEventArgs(udpp, null);
OnPackageReceived(arg);
}
}
}
///
/// 同步發送分包數據
///
///
public void SendData(Msg message)
{
ICollection udpPackets = UdpPacketSplitter.Split(message);
foreach (UdpPacket udpPacket in udpPackets)
{
byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket);
//使用同步發送
client.Send(udpPacketDatagram, udpPacketDatagram.Length,udpPacket.remoteip);
if (udpPacket.IsRequireReceiveCheck)
PushSendItemToList(udpPacket);//將該消息壓入列表
}
}
///
/// 異步分包發送數組的方法
///
///
public void AsyncSendData(Msg message)
{
ICollection udpPackets = UdpPacketSplitter.Split(message);
foreach (UdpPacket udpPacket in udpPackets)
{
byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket);
//使用同步發送
//client.Send(udpPacketDatagram, udpPacketDatagram.Length);
//使用異步的方法發送數據
this.client.BeginSend(udpPacketDatagram, udpPacketDatagram.Length, new AsyncCallback(SendCallback), null);
}
}
//發送完成後的回調方法
public void SendCallback(IAsyncResult param)
{
if (param.IsCompleted)
{
try
{
client.EndSend(param);//這句話必須得寫,BeginSend()和EndSend()是成對出現的
}
catch (Exception e)
{
//其他處理異常的操作
}
}
}
static object lockObj = new object();
///
/// 自由線程,檢測未發送的數據並發出,存在其中的就是沒有收到確認包的數據包
///
void CheckUnConfirmedQueue()
{
do
{
if (sendlist.Count > 0)
{
UdpPacket[] array = null;
lock (sendlist)
{
array = sendlist.ToArray();
}
//挨個重新發送並計數
Array.ForEach(array, s =>
{
s.sendtimes++;
if (s.sendtimes >= MaxResendTimes)
{
//sOnPackageSendFailure//出發發送失敗事件
sendlist.Remove(s);//移除該包
}
else
{
//重新發送
byte[] udpPacketDatagram = SerializationUnit.SerializeObject(s);
client.Send(udpPacketDatagram, udpPacketDatagram.Length, s.remoteip);
}
});
}
Thread.Sleep(CheckQueueTimeInterval);//間隔一定時間重發數據
} while (true);
}
///
/// 將數據信息壓入列表
///
///
void PushSendItemToList(UdpPacket item)
{
sendlist.Add(item);
}
///
/// 將數據包從列表中移除
///
/// 數據包編號
/// 數據包分包索引
public void PopSendItemFromList(long packageNo, int packageIndex)
{
lock (lockObj)
{
Array.ForEach(sendlist.Where(s => s.sequence == packageNo && s.index == packageIndex).ToArray(), s => sendlist.Remove(s));
}
}
///
/// 關閉客戶端並釋放資源
///
public void Dispose()
{
if (client != null)
{
client.Close();
client = null;
}
}
}
}
首先是數據信息實體類
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
namespace Model
{
//封裝消息類
[Serializable]
public class Msg
{
//所屬用戶的用戶名
public string name { get; set; }
//所屬用戶的ip
public string host { get; set; }
//命令的名稱
public string command { get; set; }
//收信人的姓名
public string desname { get; set; }
//你所發送的消息的目的地ip,應該是對應在服務器的列表裡的主鍵值
public string destinationIP { get; set; }
//端口號
public int port { get; set; }
//文本消息
public string msg { get; set; }
//二進制消息
public byte[] byte_msg { get; set; }
//附加數據
public string extend_msg { get; set; }
//時間戳
public DateTime time { get; set; }
//構造函數
public Msg(string command,string desip,string msg,string host)
{
this.command = command;
this.destinationIP = desip;
this.msg = msg;
this.time = DateTime.Now;
this.host = host;
}
override
public string ToString()
{
return name + "說:" + msg;
}
}
}
分包實體類
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
using System.Net;
namespace Model
{
[Serializable]
public class UdpPacket
{
public long sequence{get;set;}//所屬組的唯一序列號 包編號
public int total { get; set; }//分包總數
public int index { get; set; }//消息包的索引
public byte[] data { get; set; }//包的內容數組
public int dataLength { get; set; }//分割的數組包大小
public int remainder { get; set; }//最後剩余的數組的數據長度
public int sendtimes { get; set; }//發送次數
public IPEndPoint remoteip { get; set; }//接受該包的遠程地址
public bool IsRequireReceiveCheck { get; set; }//獲得或設置包收到時是否需要返回確認包
public static int HeaderSize = 30000;
public UdpPacket(long sequence, int total, int index, byte[] data, int dataLength, int remainder,string desip,int port)
{
this.sequence = sequence;
this.total = total;
this.index = index;
this.data = data;
this.dataLength = dataLength;
this.remainder = remainder;
this.IsRequireReceiveCheck = true;//默認都需要確認包
//構造遠程地址
IPAddress ipA = IPAddress.Parse(desip);
this.remoteip = new IPEndPoint(ipA, port);
}
//把這個對象生成byte[]
public byte[] ToArray()
{
return SerializationUnit.SerializeObject(this);
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
namespace Model
{
///
/// UDP數據包分割器
///
public static class UdpPacketSplitter
{
public static ICollection Split(Msg message)
{
byte[] datagram = null;
try
{
datagram = SerializationUnit.SerializeObject(message);
}
catch (Exception e)
{
//AddTalkMessage("數據轉型異常");
}
//產生一個序列號,用來標識包數據屬於哪一組
Random Rd = new Random();
long SequenceNumber = Rd.Next(88888, 999999);
ICollection udpPackets = UdpPacketSplitter.Split(SequenceNumber, datagram, 10240, message.destinationIP, message.port);
return udpPackets;
}
///
/// 分割UDP數據包
///
/// UDP數據包所持有的序號
/// 被分割的UDP數據包
/// 分割塊的長度
///
/// 分割後的UDP數據包列表
///
public static ICollection Split(long sequence, byte[] datagram, int chunkLength,string desip,int port)
{
if (datagram == null)
throw new ArgumentNullException("datagram");
List packets = new List();
int chunks = datagram.Length / chunkLength;
int remainder = datagram.Length % chunkLength;
int total = chunks;
if (remainder > 0) total++;
for (int i = 1; i <= chunks; i++)
{
byte[] chunk = new byte[chunkLength];
Buffer.BlockCopy(datagram, (i - 1) * chunkLength, chunk, 0, chunkLength);
packets.Add(new UdpPacket(sequence, total, i, chunk, chunkLength, remainder, desip, port));
}
if (remainder > 0)
{
int length = datagram.Length - (chunkLength * chunks);
byte[] chunk = new byte[length];
Buffer.BlockCopy(datagram, chunkLength * chunks, chunk, 0, length);
packets.Add(new UdpPacket(sequence, total, total, chunk, chunkLength, remainder, desip, port));
}
return packets;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
using Model;
namespace Model
{
//一個sequence對應一組的數據包的數據結構
public class RecDataList
{
public long sequence { get; set; }//序列號
//對應的存儲包的List
List RecudpPackets = new List();
public int total { get; set; }
public int dataLength { get; set; }
public int remainder { get; set; }
public byte[] DataBuffer = null;
public RecDataList(UdpPacket udp)
{
this.sequence = udp.sequence;
this.total = udp.total;
this.dataLength = udp.dataLength;
this.remainder = udp.remainder;
if (DataBuffer == null)
{
DataBuffer = new byte[dataLength * (total - 1) + remainder];
}
}
public RecDataList(long sequence, int total, int chunkLength, int remainder)
{
this.sequence = sequence;
this.total = total;
this.dataLength = chunkLength;
this.remainder = remainder;
if (DataBuffer == null)
{
DataBuffer = new byte[this.dataLength * (this.total - 1) + this.remainder];
}
}
public void addPacket(UdpPacket p)
{
RecudpPackets.Add(p);
}
public Msg show()
{
if (RecudpPackets.Count == total)//表示已經收集滿了
{
//重組數據
foreach (UdpPacket udpPacket in RecudpPackets)
{
//偏移量
int offset = (udpPacket.index - 1) * udpPacket.dataLength;
Buffer.BlockCopy(udpPacket.data, 0, DataBuffer, offset, udpPacket.data.Length);
}
Msg rmsg = (Msg)SerializationUnit.DeserializeObject(DataBuffer);
DataBuffer = null;
RecudpPackets.Clear();
return rmsg;
}
else
{
return null;
}
}
public bool containskey(UdpPacket udp)
{
foreach (UdpPacket udpPacket in RecudpPackets)
{
if (udpPacket.index == udp.index)
return true;
}
return false;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
namespace Tool
{
public class EncodingTool
{
//編碼
public static byte[] EncodingASCII(string buf)
{
byte[] data = Encoding.Unicode.GetBytes(buf);
return data;
}
//解碼
public static string DecodingASCII(byte[] bt)
{
string st = Encoding.Unicode.GetString(bt);
return st;
}
//編碼
public static byte[] EncodingUTF_8(string buf)
{
byte[] data = Encoding.UTF8.GetBytes(buf);
return data;
}
//編碼
public static string DecodingUTF_8(byte[] bt)
{
string st = Encoding.UTF8.GetString(bt);
return st;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
namespace Tool
{
public class SerializationUnit
{
///
/// 把對象序列化為字節數組
///
public static byte[] SerializeObject(object obj)
{
if (obj == null)
return null;
//內存實例
MemoryStream ms = new MemoryStream();
//創建序列化的實例
BinaryFormatter formatter = new BinaryFormatter();
formatter.Serialize(ms, obj);//序列化對象,寫入ms流中
ms.Position = 0;
//byte[] bytes = new byte[ms.Length];//這個有錯誤
byte[] bytes = ms.GetBuffer();
ms.Read(bytes, 0, bytes.Length);
ms.Close();
return bytes;
}
///
/// 把字節數組反序列化成對象
///
public static object DeserializeObject(byte[] bytes)
{
object obj = null;
if (bytes == null)
return obj;
//利用傳來的byte[]創建一個內存流
MemoryStream ms = new MemoryStream(bytes);
ms.Position = 0;
BinaryFormatter formatter = new BinaryFormatter();
obj = formatter.Deserialize(ms);//把內存流反序列成對象
ms.Close();
return obj;
}
///
/// 把字典序列化
///
///
///
public static byte[] SerializeDic(Dictionary dic)
{
if (dic.Count == 0)
return null;
MemoryStream ms = new MemoryStream();
BinaryFormatter formatter = new BinaryFormatter();
formatter.Serialize(ms, dic);//把字典序列化成流
byte[] bytes = new byte[ms.Length];//從流中讀出byte[]
ms.Read(bytes, 0, bytes.Length);
return bytes;
}
///
/// 反序列化返回字典
///
///
///
public static Dictionary DeserializeDic(byte[] bytes)
{
Dictionary dic = null;
if (bytes == null)
return dic;
//利用傳來的byte[]創建一個內存流
MemoryStream ms = new MemoryStream(bytes);
ms.Position = 0;
BinaryFormatter formatter = new BinaryFormatter();
//把流中轉換為Dictionary
dic = (Dictionary)formatter.Deserialize(ms);
return dic;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Model;
namespace ZZUdp.Core
{
///
/// 數據包事件數據
///
public class PackageEventArgs : EventArgs
{
///
/// 網絡消息包
///
public UdpPacket udpPackage { get; set; }
///
/// 網絡消息包組
///
public UdpPacket[] udpPackages { get; set; }
///
/// 遠程IP
///
public IPEndPoint RemoteIP { get; set; }
///
/// 是否已經處理
///
public bool IsHandled { get; set; }
///
/// 創建一個新的 PackageEventArgs 對象.
///
public PackageEventArgs(UdpPacket package, IPEndPoint RemoteIP)
{
this.udpPackage = package;
this.RemoteIP = RemoteIP;
this.IsHandled = false;
}
}
}