程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> c#中關於udp實現可靠地傳輸(數據包的分組發送)

c#中關於udp實現可靠地傳輸(數據包的分組發送)

編輯:C#入門知識

在做c#中面向無連接的傳輸時用到了UDP,雖然沒有TCP穩定可靠。但是效率是要高些,優勢也有,缺點也有

就是有的時候要丟包,有的時候不得不用UDP,但是如何才能比較穩定的實現可靠傳輸呢,這是一個問題。

TCP傳輸數據的時候沒有大小限制,但是UDP傳輸的時候是有大小限制的,我們怎麼才能夠實現大數據的穩定傳輸呢。我們想到了,把數據包分包。

把一個大數據分割為一系列的小數據包然後分開發送,然後服務端收到了就拼湊起完整數據。

如果遇到中途丟包就重發。

UDP線程類,實現數據的分包發送和重發。具體的接收操作需要實現其中的事件

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using Model;
using System.Net;
using Tool;
using System.Threading;

namespace ZZUdp.Core
{
    //udp的類  
    public class UDPThread
    {
        #region 私有變量

        UdpClient client;//UDP客戶端

        List sendlist;// 用於輪詢是否發送成功的記錄

        Dictionary RecListDic = new Dictionary();//數據接收列表,每一個sequence對應一個

        IPEndPoint remotIpEnd = null;//用來在接收數據的時候對遠程主機的信息存放

        int port=6666;//定義服務器的端口號
        #endregion

        #region 屬性
        public int CheckQueueTimeInterval { get; set; }//檢查發送隊列間隔

        public int MaxResendTimes { get; set; }//沒有收到確認包時,最大重新發送的數目,超過此數目會丟棄並觸發PackageSendFailture事件
        #endregion

        #region 事件
        
        /// 
        /// 當數據包收到時觸發
        /// 
        public event EventHandler PackageReceived;
        /// 
        /// 當數據包收到事件觸發時,被調用
        /// 
        /// 包含事件的參數
        protected virtual void OnPackageReceived(PackageEventArgs e)
        {
            if (PackageReceived != null) 
                PackageReceived(this, e);
        }

        /// 
        /// 數據包發送失敗
        /// 
        public event EventHandler PackageSendFailure;
        /// 
        /// 當數據發送失敗時調用
        /// 
        /// 包含事件的參數
        protected virtual void OnPackageSendFailure(PackageEventArgs e)
        {
            if (PackageSendFailure != null) 
                PackageSendFailure(this, e);
        }

        /// 
        /// 數據包未接收到確認,重新發送
        /// 
        public event EventHandler PackageResend;
        /// 
        /// 觸發重新發送事件
        /// 
        /// 包含事件的參數
        protected virtual void OnPackageResend(PackageEventArgs e)
        {
            if (PackageResend != null) 
                PackageResend(this, e);
        }
        #endregion


        //無參構造函數
        public UDPThread()
        { 
        }
        //構造函數
        public UDPThread(string ipaddress, int port)
        {
            IPAddress ipA = IPAddress.Parse(ipaddress);//構造遠程連接的參數
            IPEndPoint ipEnd = new IPEndPoint(ipA, port);
            client = new UdpClient();// client = new UdpClient(ipEnd)這樣的話就沒有創建遠程連接
            client.Connect(ipEnd);//使用指定的遠程主機信息建立默認遠程主機連接
            sendlist = new List();

            CheckQueueTimeInterval = 2000;//輪詢間隔時間
            MaxResendTimes = 5;//最大發送次數
            
            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();//啟動輪詢線程
            //開始監聽數據
            AsyncReceiveData();
        }
        /// 
        /// 同步數據接收方法
        /// 
        public void ReceiveData()
        {
            while (true)
            {
                IPEndPoint retip = null;
                UdpPacket udpp = null;
                try
                {
                    byte[] data = client.Receive(ref retip);//接收數據,當Client端連接主機的時候,retip就變成Cilent端的IP了
                    udpp = (UdpPacket)SerializationUnit.DeserializeObject(data);
                }
                catch (Exception ex)
                {
                    //異常處理操作
                }
                if (udpp != null)
                {
                    PackageEventArgs arg = new PackageEventArgs(udpp, retip);
                    OnPackageReceived(arg);//數據包收到觸發事件
                }
            }
        }

        //異步接受數據
        public void AsyncReceiveData()
        {
            try
            {
                client.BeginReceive(new AsyncCallback(ReceiveCallback), null);
            }
            catch (SocketException ex)
            {
                throw ex;
            }
        }
        //接收數據的回調函數
        public void ReceiveCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                UdpPacket udpp = null;
                try
                {
                    byte[] data = client.EndReceive(param, ref remotIpEnd);//接收數據,當Client端連接主機的時候,test就變成Cilent端的IP了
                    udpp = (UdpPacket)SerializationUnit.DeserializeObject(data);
                }
                catch (Exception ex)
                {
                    //異常處理操作
                }
                finally
                {
                    AsyncReceiveData();
                }
                if (udpp != null)//觸發數據包收到事件
                {
                    PackageEventArgs arg = new PackageEventArgs(udpp, null);
                    OnPackageReceived(arg);
                }
            }
        }


        /// 
        /// 同步發送分包數據
        /// 
        /// 
        public void SendData(Msg message)
        {
           
            ICollection udpPackets = UdpPacketSplitter.Split(message);
            foreach (UdpPacket udpPacket in udpPackets)
            {
                byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket);
                //使用同步發送
               client.Send(udpPacketDatagram, udpPacketDatagram.Length,udpPacket.remoteip);
               if (udpPacket.IsRequireReceiveCheck)
                   PushSendItemToList(udpPacket);//將該消息壓入列表

            }
        }
        
        /// 
        /// 異步分包發送數組的方法
        /// 
        /// 
        public void AsyncSendData(Msg message)
        {
            
            ICollection udpPackets = UdpPacketSplitter.Split(message);
            foreach (UdpPacket udpPacket in udpPackets)
            {
                byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket);
                //使用同步發送
                //client.Send(udpPacketDatagram, udpPacketDatagram.Length);

                //使用異步的方法發送數據
                this.client.BeginSend(udpPacketDatagram, udpPacketDatagram.Length, new AsyncCallback(SendCallback), null);

            }
        }
        //發送完成後的回調方法
        public void SendCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                try
                {
                    client.EndSend(param);//這句話必須得寫,BeginSend()和EndSend()是成對出現的 
                }
                catch (Exception e)
                {
                    //其他處理異常的操作
                }
            }

        }
        static object lockObj = new object();
        /// 
        /// 自由線程,檢測未發送的數據並發出,存在其中的就是沒有收到確認包的數據包
        /// 
        void CheckUnConfirmedQueue()
        {
            do
            {
                if (sendlist.Count > 0)
                {
                    UdpPacket[] array = null;

                    lock (sendlist)
                    {
                        array = sendlist.ToArray();
                    }
                    //挨個重新發送並計數
                    Array.ForEach(array, s =>
                    {
                        s.sendtimes++;
                        if (s.sendtimes >= MaxResendTimes)
                        {
                            //sOnPackageSendFailure//出發發送失敗事件
                            sendlist.Remove(s);//移除該包
                        }
                        else
                        {
                            //重新發送
                            byte[] udpPacketDatagram = SerializationUnit.SerializeObject(s);
                            client.Send(udpPacketDatagram, udpPacketDatagram.Length, s.remoteip);
                        }
                    });
                }

               Thread.Sleep(CheckQueueTimeInterval);//間隔一定時間重發數據
            } while (true);
        }
        /// 
        /// 將數據信息壓入列表
        /// 
        /// 
        void PushSendItemToList(UdpPacket item)
        {
            sendlist.Add(item);
        }
        /// 
        /// 將數據包從列表中移除
        /// 
        /// 數據包編號
        /// 數據包分包索引
        public void PopSendItemFromList(long packageNo, int packageIndex)
        {
            lock (lockObj)
            {
                Array.ForEach(sendlist.Where(s => s.sequence == packageNo && s.index == packageIndex).ToArray(), s => sendlist.Remove(s));
            }
        }
        /// 
        /// 關閉客戶端並釋放資源
        /// 
        public void Dispose()
        {
            if (client != null)
            {
                client.Close();
                client = null;
            }
        }



    }
}



首先是數據信息實體類

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;

namespace Model
{
    //封裝消息類
    [Serializable]
   public class Msg
    {
        //所屬用戶的用戶名
        public string name { get; set; }

        //所屬用戶的ip
        public string host { get; set; }

        //命令的名稱
        public string command { get; set; }

        //收信人的姓名
        public string desname { get; set; }

        //你所發送的消息的目的地ip,應該是對應在服務器的列表裡的主鍵值
        public string destinationIP { get; set; }

        //端口號
        public int port { get; set; }

        //文本消息
        public string msg { get; set; }

        //二進制消息
        public byte[] byte_msg { get; set; }

        //附加數據
        public string extend_msg { get; set; }

        //時間戳
        public DateTime time { get; set; }

        //構造函數
        public Msg(string command,string desip,string msg,string host)
        {
            this.command = command;
            this.destinationIP = desip;
            this.msg = msg;
            this.time = DateTime.Now;
            this.host = host;
        }
        override
        public string ToString()
        {
            return name + "說:" + msg;
        }
    }
}

MSG數據分割後生成分包數據

分包實體類

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
using System.Net;

namespace Model
{
    [Serializable]
    public class UdpPacket
    {
        public long sequence{get;set;}//所屬組的唯一序列號 包編號
        public int total { get; set; }//分包總數
        public int index { get; set; }//消息包的索引
        public byte[] data { get; set; }//包的內容數組
        public int dataLength { get; set; }//分割的數組包大小
        public int remainder { get; set; }//最後剩余的數組的數據長度
        public int sendtimes { get; set; }//發送次數
        public IPEndPoint remoteip { get; set; }//接受該包的遠程地址
        public bool IsRequireReceiveCheck { get; set; }//獲得或設置包收到時是否需要返回確認包
        public static int HeaderSize = 30000;
        public UdpPacket(long sequence, int total, int index, byte[] data, int dataLength, int remainder,string desip,int port)
        {
            this.sequence = sequence;
            this.total = total;
            this.index = index;
            this.data = data;
            this.dataLength = dataLength;
            this.remainder = remainder;
            this.IsRequireReceiveCheck = true;//默認都需要確認包
            //構造遠程地址
            IPAddress ipA = IPAddress.Parse(desip);
            this.remoteip = new IPEndPoint(ipA, port);
        }
        //把這個對象生成byte[]
        public byte[] ToArray()
        {
            return SerializationUnit.SerializeObject(this);
        }
    }
}

數據包分割工具類

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;

namespace Model
{
        /// 
        /// UDP數據包分割器
        /// 
        public static class UdpPacketSplitter
        {


            public static ICollection Split(Msg message)
            {
                byte[] datagram = null;
                try
                {
                    datagram = SerializationUnit.SerializeObject(message);
                }
                catch (Exception e)
                {
                    //AddTalkMessage("數據轉型異常");
                }
                //產生一個序列號,用來標識包數據屬於哪一組
                Random Rd = new Random();
                long SequenceNumber = Rd.Next(88888, 999999);
                ICollection udpPackets = UdpPacketSplitter.Split(SequenceNumber, datagram, 10240, message.destinationIP, message.port);

                return udpPackets;
            }
            /// 
            /// 分割UDP數據包
            /// 
            /// UDP數據包所持有的序號
            /// 被分割的UDP數據包
            /// 分割塊的長度
            /// 
            /// 分割後的UDP數據包列表
            /// 
            public static ICollection Split(long sequence, byte[] datagram, int chunkLength,string desip,int port)
            {
                if (datagram == null)
                    throw new ArgumentNullException("datagram");

                List packets = new List();

                int chunks = datagram.Length / chunkLength;
                int remainder = datagram.Length % chunkLength;
                int total = chunks;
                if (remainder > 0) total++;

                for (int i = 1; i <= chunks; i++)
                {
                    byte[] chunk = new byte[chunkLength];
                    Buffer.BlockCopy(datagram, (i - 1) * chunkLength, chunk, 0, chunkLength);
                    packets.Add(new UdpPacket(sequence, total, i, chunk, chunkLength, remainder, desip, port));
                }
                if (remainder > 0)
                {
                    int length = datagram.Length - (chunkLength * chunks);
                    byte[] chunk = new byte[length];
                    Buffer.BlockCopy(datagram, chunkLength * chunks, chunk, 0, length);
                    packets.Add(new UdpPacket(sequence, total, total, chunk, chunkLength, remainder, desip, port));
                }

                return packets;
            }
        }
}

服務端存儲數據的數據結構

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
using Model;

namespace Model
{
    //一個sequence對應一組的數據包的數據結構
    public class RecDataList
    {
        public long sequence { get; set; }//序列號
        //對應的存儲包的List
        List RecudpPackets = new List();
        public int total { get; set; }
        public int dataLength { get; set; }
        public int remainder { get; set; }
        public byte[] DataBuffer = null;
        public RecDataList(UdpPacket udp)
        {

            this.sequence = udp.sequence;
            this.total = udp.total;
            this.dataLength = udp.dataLength;
            this.remainder = udp.remainder;
            if (DataBuffer == null)
            {
                DataBuffer = new byte[dataLength * (total - 1) + remainder];
            }
        }
        public RecDataList(long sequence, int total, int chunkLength, int remainder)
        {
            
            this.sequence = sequence;
            this.total = total;
            this.dataLength = chunkLength;
            this.remainder = remainder;
            if (DataBuffer == null)
            {
                DataBuffer = new byte[this.dataLength * (this.total - 1) + this.remainder];
            }
        }
        public void addPacket(UdpPacket p)
        {
            RecudpPackets.Add(p);
        }
        public Msg show() 
        {
            if (RecudpPackets.Count == total)//表示已經收集滿了
            {
                //重組數據
                foreach (UdpPacket udpPacket in RecudpPackets)
                {
                    //偏移量
                    int offset = (udpPacket.index - 1) * udpPacket.dataLength;
                    Buffer.BlockCopy(udpPacket.data, 0, DataBuffer, offset, udpPacket.data.Length);
                }
                Msg rmsg = (Msg)SerializationUnit.DeserializeObject(DataBuffer);
                DataBuffer = null;
                RecudpPackets.Clear();
                return rmsg;
            }
            else
            {
                return null;
            }
        }
        public bool containskey(UdpPacket udp)
        {
            foreach (UdpPacket udpPacket in RecudpPackets)
            {
                if (udpPacket.index == udp.index)
                    return true;
            }
            return false;
        }
    }
}

編碼工具類

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;

namespace Tool
{
    public class EncodingTool
    {
        //編碼
        public static byte[] EncodingASCII(string buf)
        {
            byte[] data = Encoding.Unicode.GetBytes(buf);
            return data;
        }
        //解碼
        public static string DecodingASCII(byte[] bt)
        {
            string st = Encoding.Unicode.GetString(bt);
            return st;
        }



        //編碼
        public static byte[] EncodingUTF_8(string buf)
        {
            byte[] data = Encoding.UTF8.GetBytes(buf);
            return data;
        }
        //編碼
        public static string DecodingUTF_8(byte[] bt)
        {
            string st = Encoding.UTF8.GetString(bt);
            return st;
        }
       
    }
}

序列化和反序列化的工具類

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
namespace Tool
{

    public class SerializationUnit
    {
        ///   
        /// 把對象序列化為字節數組  
        ///   
        public static byte[] SerializeObject(object obj)
        {
            if (obj == null)
                return null;
            //內存實例
            MemoryStream ms = new MemoryStream();
            //創建序列化的實例
            BinaryFormatter formatter = new BinaryFormatter();
            formatter.Serialize(ms, obj);//序列化對象,寫入ms流中  
            ms.Position = 0;
            //byte[] bytes = new byte[ms.Length];//這個有錯誤
            byte[] bytes = ms.GetBuffer();
            ms.Read(bytes, 0, bytes.Length);
            ms.Close();
            return bytes;
        }

        ///   
        /// 把字節數組反序列化成對象  
        ///   
        public static object DeserializeObject(byte[] bytes)
        {
            object obj = null;
            if (bytes == null)
                return obj;
            //利用傳來的byte[]創建一個內存流
            MemoryStream ms = new MemoryStream(bytes);
            ms.Position = 0;
            BinaryFormatter formatter = new BinaryFormatter();
            obj = formatter.Deserialize(ms);//把內存流反序列成對象  
            ms.Close();
            return obj;
        }
        /// 
        /// 把字典序列化
        /// 
        /// 
        /// 
        public static byte[] SerializeDic(Dictionary dic)
        {
            if (dic.Count == 0)
                return null;
            MemoryStream ms = new MemoryStream();
            BinaryFormatter formatter = new BinaryFormatter();
            formatter.Serialize(ms, dic);//把字典序列化成流

            byte[] bytes = new byte[ms.Length];//從流中讀出byte[]
            ms.Read(bytes, 0, bytes.Length);

            return bytes;
        }
        /// 
        /// 反序列化返回字典
        /// 
        /// 
        /// 
        public static Dictionary DeserializeDic(byte[] bytes)
        {
            Dictionary dic = null;
            if (bytes == null)
                return dic;
            //利用傳來的byte[]創建一個內存流
            MemoryStream ms = new MemoryStream(bytes);
            ms.Position = 0;
            BinaryFormatter formatter = new BinaryFormatter();
            //把流中轉換為Dictionary
            dic = (Dictionary)formatter.Deserialize(ms);
            return dic;
        }
    }

}

通用的數據包事件類

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Model;

namespace ZZUdp.Core
{
	/// 
	/// 數據包事件數據
	/// 
	public class PackageEventArgs : EventArgs
	{
		/// 
		/// 網絡消息包
		/// 
		public UdpPacket udpPackage { get; set; }

		/// 
		/// 網絡消息包組
		/// 
        public UdpPacket[] udpPackages { get; set; }

        /// 
        /// 遠程IP
        /// 
        public IPEndPoint RemoteIP { get; set; }

		/// 
		/// 是否已經處理
		/// 
		public bool IsHandled { get; set; }

		/// 
		/// 創建一個新的 PackageEventArgs 對象.
		/// 
        public PackageEventArgs(UdpPacket package, IPEndPoint RemoteIP)
		{
            this.udpPackage = package;
            this.RemoteIP = RemoteIP;
			this.IsHandled = false;
		}
	}
}


  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved