程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> 用winsock和iocp api打造一個echo server

用winsock和iocp api打造一個echo server

編輯:關於.NET

這裡用到了一些技術點,比如平台調用、反射,多線程等,當然還有iocp和winsock的api,及 GCHandle,SafeHandle,Marshal類的使用等,不過相當多的東西,我上篇帖子講的都很細了,如果對 winsock api不了解可以查閱MSDN。也沒什麼技術難點,說幾個細節的地方吧。

1、.net自帶的System.Threading.NativeOverlapped類型是完全按照win32的Overlapped結構實現的, 因為我們在WSASend和WSAReceive的時候想要傳遞更多的數據,而不只是一個重疊結構,所以我自己定義 了一個WaOverlapped,在原有結構的末尾加了一個指針,指向一個自定義類的GC句柄,這樣在工作線程裡 就可以拿到自定義的單IO數據了,這個是我想了N種辦法不行後的一個可行的辦法。

2、注意GCHandle在取到數據後不用的話記著Free掉,否則就有可能造成內存洩漏。

3、如果調用WSASend或者WSAReceive返回6的話,多半是你准備的單IO數據不對,6表示無效的句柄。

4、如果傳遞給WSASend或者WSAReceive的Overlapped沒pin住,會拋異常的,等不到 GetLastWin32Error,所以用GCHandle.Alloc(PerIoData.Overlapped, GCHandleType.Pinned)把它pin住 。

5、這個類還沒有進行各方面的優化,其中的單IO數據,socket等都可以做成對象池來重用,Accept還 可以替換成AcceptEx來用一個現成的Socket來接受新的連接,而不是自動創建一個新的,還有緩沖區可以 做成環狀的,關於性能方面的優化,下次有機會再給大家做實驗。

完整代碼如下,windows2008打開不安全代碼進行編譯,然後可以用telnet進行測試。

using System;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Runtime.ConstrainedExecution;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;

namespace WawaSocket.Net.Iocp
{
    用IOCP和winsock api實現一個echo服務器#region 用IOCP和winsock api實現一個echo服務器
    class IocpTest
    {
        private static readonly IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1); //無 效句柄
        const int PORT = 5150; //要監聽的端口
        const int DATA_BUFSIZE = 8192;  //默認緩沖區
        const int ERROR_IO_PENDING = 997; //表示數據正在接受或者發送中
        const uint INIFINITE = 0xffffffff; //表示等待無限時長
        private static readonly Logger _logger = Logger.GetLogger(typeof (IocpTest));

        單IO數據#region 單IO數據
        [StructLayout(LayoutKind.Sequential)]
        class PerIoOperationData
        {
            public WaOverlapped Overlapped;
            public WSABuffer DataBuf;
            public readonly byte[] Buffer = new byte[DATA_BUFSIZE];
            public uint BytesSEND;
            public uint BytesRECV;
        }
        #endregion

        單句柄數據#region 單句柄數據
        [StructLayout(LayoutKind.Sequential)]
        class PerHandleData
        {
            public SafeSocketHandle Socket;
        }
        #endregion

        public static void Run()
        {
            WSAData wsaData;
            SocketError Ret;

            初始化套接字#region 初始化套接字
            _logger.Log("初始化socket");
            if ((Ret = Win32Api.WSAStartup(0x0202, out wsaData)) != SocketError.Success)
            {
                _logger.Error("WSAStartup failed with error {0}\n", Ret);
                return;
            }
            #endregion

            創建一個完成端口內核對象#region 創建一個完成端口內核對象
            _logger.Log("創建完成端口");
            // Setup an I/O completion port.
            SafeFileHandle CompletionPort = Win32Api.CreateIoCompletionPort (INVALID_HANDLE_VALUE, IntPtr.Zero, IntPtr.Zero, 0);
            if (CompletionPort.IsInvalid)
            {
                _logger.Error("CreateIoCompletionPort failed with error: {0}\n", Marshal.GetLastWin32Error());
                Marshal.ThrowExceptionForHR(Marshal.GetLastWin32Error ());
                return;
            }
            #endregion

            創建工作線程#region 創建工作線程
            int processorCount = Environment.ProcessorCount;
            _logger.Log("創建{0}個工作線程", processorCount);
            for (int i = 0; i < processorCount; i++)
            {
                // Create a server worker thread and pass the completion port to the thread.
                var thread = new Thread(ThreadProc);
                thread.Start(CompletionPort);
            }
            #endregion

            創建監聽用的套接字#region 創建監聽用的套接字
            _logger.Log("創建監聽套接字");
            // Create a listening socket
            SafeSocketHandle Listen = Win32Api.WSASocket (AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp, IntPtr.Zero, 0, SocketConstructorFlags.WSA_FLAG_OVERLAPPED);
            if (Listen.IsInvalid)
            {
                Listen.SetHandleAsInvalid();
                _logger.Error("WSASocket() failed with error {0}\n", Win32Api.WSAGetLastError());
                Marshal.ThrowExceptionForHR(Win32Api.WSAGetLastError ());
                return;
            }
            #endregion

            將套接字與本地端口綁定#region 將套接字與本地端口綁定
            IPEndPoint InternetAddr = new IPEndPoint(IPAddress.Any, PORT);
            SocketAddress socketAddress = InternetAddr.Serialize();
            byte[] adress_buffer;
            int adress_size;
            _logger.Log("進行套接字綁定");
            if (!DoBind(Listen, socketAddress, out adress_buffer, out adress_size))
            {
                _logger.Error("bind() failed with error {0}\n", Win32Api.WSAGetLastError());
                Marshal.ThrowExceptionForHR(Win32Api.WSAGetLastError ());
                return;
            }
            #endregion

            開始監聽端口#region 開始監聽端口
            _logger.Log("開始監聽:{0}-{1}", InternetAddr.Address, InternetAddr.Port);
            // Prepare socket for listening
            if (Win32Api.listen(Listen, 5) == SocketError.SocketError)
            {
                _logger.Error("listen() failed with error {0}\n", Win32Api.WSAGetLastError());
                Marshal.ThrowExceptionForHR(Win32Api.WSAGetLastError ());
                return;
            }
            #endregion

            起一個循環來接受新連接#region 起一個循環來接受新連接
            // Accept connections and assign to the completion port.
            while (true)
                unsafe
                {
                    接受新連接#region 接受新連接
                    _logger.Log("開始接受入站連接");
                    SafeSocketHandle Accept = Win32Api.accept (Listen.DangerousGetHandle(), adress_buffer, ref adress_size);
                    if (Accept.IsInvalid)
                    {
                        _logger.Error("WSAAccept() failed with error {0}\n", Win32Api.WSAGetLastError());
                        Marshal.ThrowExceptionForHR (Win32Api.WSAGetLastError());
                    }
                    _logger.Log("有新連接進入:{0}", Accept.GetHashCode ());
                    #endregion

                    創建單句柄數據#region 創建單句柄數據
                    // Create a socket information structure to associate with the socket
                    PerHandleData PerHandleData = new PerHandleData ();
                    GCHandle gch_PerHandleData = GCHandle.Alloc (PerHandleData);
                    // Associate the accepted socket with the original completion port.
                    PerHandleData.Socket = Accept;
                    #endregion

                    把新接受的套接字與完成端口綁定#region 把新接受的套 接字與完成端口綁定
                    SafeFileHandle iocp = Win32Api.CreateIoCompletionPort(Accept.DangerousGetHandle(),
                                                                  CompletionPort.DangerousGetHandle(),
                                                                 GCHandle.ToIntPtr(gch_PerHandleData), 0);
                    if (iocp == null)
                    {
                        _logger.Error("CreateIoCompletionPort failed with error {0}\n", Marshal.GetLastWin32Error());
                        Marshal.ThrowExceptionForHR (Marshal.GetLastWin32Error());
                        return;
                    }
                    #endregion

                    准備單IO數據#region 准備單IO數據
                    // Create per I/O socket information structure to associate with the
                    // WSARecv call below.
                    PerIoOperationData PerIoData = new PerIoOperationData();
                    GCHandle gchPerIoData = GCHandle.Alloc (PerIoData);
                    PerIoData.Overlapped = new WaOverlapped { State = ((IntPtr)gchPerIoData) };
                    GCHandle gcHandle = GCHandle.Alloc (PerIoData.Overlapped, GCHandleType.Pinned);
                    PerIoData.BytesSEND = 0;
                    PerIoData.BytesRECV = 0;
                    PerIoData.DataBuf.Length = DATA_BUFSIZE;
                    PerIoData.DataBuf.Pointer = Marshal.UnsafeAddrOfPinnedArrayElement(PerIoData.Buffer, 0);
                    #endregion

                    開始投遞異步接受數據的請求#region 開始投遞異步接受 數據的請求
                    SocketFlags Flags = SocketFlags.None;
                    _logger.Log("開始異步接受數據");
                    int RecvBytes;
                    SocketError error = Win32Api.WSARecv(Accept, ref PerIoData.DataBuf,
                        1, out RecvBytes, ref Flags, gcHandle.AddrOfPinnedObject(),
                        IntPtr.Zero);
                    if (error == SocketError.SocketError)
                    {
                        if (Win32Api.WSAGetLastError() != ERROR_IO_PENDING)
                        {
                            _logger.Error("WSARecv() failed with error {0}\n", Win32Api.WSAGetLastError());
                            Marshal.ThrowExceptionForHR (Win32Api.WSAGetLastError());
                            //其實在主線程退出之前都應該用 PostQueuedCompletionStatus通知工作線程退出
                            return;
                        }
                    }
                    #endregion
                }
            #endregion
        }

把一個套接字綁定在一個端口上的工具方法#region 把一個套接字綁定在一個端口上的工具方法

private static bool DoBind(SafeSocketHandle Listen, SocketAddress address, out byte[] buffer, out int size)
        {
            FieldInfo socketAddress_m_Buffer = typeof(SocketAddress).GetField ("m_Buffer",
                                                                               BindingFlags.Instance | BindingFlags.NonPublic);
            FieldInfo socketAddress_m_Size = typeof(SocketAddress).GetField ("m_Size",
                                                                             BindingFlags.Instance | BindingFlags.NonPublic);
            var m_buffer = (byte[])socketAddress_m_Buffer.GetValue (address);
            var m_Size = (int)socketAddress_m_Size.GetValue(address);
            buffer = m_buffer;
            size = m_Size;

            if (Win32Api.bind(Listen, m_buffer, m_Size) != SocketError.Success)
            {
                return false;
            }
            return true;
        }
        #endregion

工作線程#region 工作線程

static unsafe void ThreadProc(object CompletionPortID)
        {
            var CompletionPort = (SafeFileHandle)CompletionPortID; //接受通知的 完成端口
            SocketFlags Flags;
            IntPtr intptr_per_io_data, intptr_per_handle_data; //單句柄數據,單 實例數據的指針
            GCHandle gcHandle_per_io_data, gcHandle_per_handle_data;//單句柄數 據,單實例數據的gc句柄
            uint BytesTransferred; //接受或發送的數據
            PerHandleData PerHandleData; //單據並數據
            PerIoOperationData PerIoData; //單IO數據
            int SendBytes; //發送出的字節
            int RecvBytes; //接受到的字節
            在循環裡接受和發送數據#region 在循環裡接受和發送數據
            while (true)
            {

                在完成端口上等消息#region 在完成端口上等消息
                if (!Win32Api.GetQueuedCompletionStatus(CompletionPort, out BytesTransferred,
             out intptr_per_handle_data, out intptr_per_io_data, INIFINITE))
                {
                    _logger.Error("GetQueuedCompletionStatus failed with error {0}\n",
                        Marshal.GetLastWin32Error());
                    return;
                }
                #endregion

                拿到單據並數據#region 拿到單據並數據
                gcHandle_per_handle_data = GCHandle.FromIntPtr (intptr_per_handle_data);
                PerHandleData = (PerHandleData) gcHandle_per_handle_data.Target;
                #endregion

                拿到單IO數據#region 拿到單IO數據
                WaOverlapped o = new WaOverlapped();
                Marshal.PtrToStructure(intptr_per_io_data, o);
                gcHandle_per_io_data = GCHandle.FromIntPtr(o.State);
                PerIoData = (PerIoOperationData) gcHandle_per_io_data.Target;
                #endregion

                判斷是否為斷開請求#region 判斷是否為斷開請求
                if (BytesTransferred == 0)
                {
                    _logger.Log("斷開連接 {0}", PerHandleData.Socket.GetHashCode());
                    PerHandleData.Socket.Close();
                    gcHandle_per_handle_data.Free();
                    gcHandle_per_io_data.Free();
                    continue;
                }
                #endregion

                根據異步操作的類型來更新單IO數據#region 根據異步操作的類型 來更新單IO數據
                // Check to see if the BytesRECV field equals zero. If this is so, then
                // this means a WSARecv call just completed so update the BytesRECV field
                // with the BytesTransferred value from the completed WSARecv() call.
                if (PerIoData.BytesRECV == 0)
                {
                    PerIoData.BytesRECV = BytesTransferred;
                    PerIoData.BytesSEND = 0;
                }
                else
                {
                    PerIoData.BytesSEND += BytesTransferred;
                }
                #endregion

                try
                {
                    if (PerIoData.BytesRECV > PerIoData.BytesSEND)
                    {
                        如果收到消息就原封不動發給發送者#region 如 果收到消息就原封不動發給發送者
                        _logger.Log("開始異步發送數據:{0}-{1}", PerHandleData.Socket.GetHashCode(),
                            PerIoData.Overlapped.GetHashCode ());

                        更新單IO數據#region 更新單IO數據
                        // Post another WSASend() request.
                        // Since WSASend() is not gauranteed to send all of the bytes requested,
                        // continue posting WSASend() calls until all received bytes are sent.
                        GCHandle gchPerIoData = GCHandle.Alloc (PerIoData);
                        PerIoData.Overlapped = new WaOverlapped { State = ((IntPtr)gchPerIoData) };
                        GCHandle gchOverlapped = GCHandle.Alloc (PerIoData.Overlapped, GCHandleType.Pinned);
                        PerIoData.DataBuf.Pointer = Marshal.UnsafeAddrOfPinnedArrayElement(
                            PerIoData.Buffer, (int) PerIoData.BytesSEND);
                        PerIoData.DataBuf.Length = PerIoData.BytesRECV - PerIoData.BytesSEND;
                        #endregion

                        投遞異步發送數據請求#region 投遞異步發送數 據請求
                        SocketError error = Win32Api.WSASend (PerHandleData.Socket, ref PerIoData.DataBuf,
                    1, out SendBytes, 0, gchOverlapped.AddrOfPinnedObject(), IntPtr.Zero);
                        if (error == SocketError.SocketError)
                        {
                            if (Win32Api.WSAGetLastError() != ERROR_IO_PENDING)
                            {
                                _logger.Error("WSASend() failed with error {0}", Win32Api.WSAGetLastError());
                                return;
                            }
                        }
                        #endregion
                        #endregion
                    }
                    else
                    {
                        如果沒有需要發送的數據,就投遞一個異步接受 數據請求#region 如果沒有需要發送的數據,就投遞一個異步接受數據請求
                        _logger.Log("開始異步接受數據:{0}-{1}", PerHandleData.Socket.GetHashCode(),
    PerIoData.Overlapped.GetHashCode());
                        更新單IO數據#region 更新單IO數據

                        PerIoData.BytesRECV = 0;
                        // Now that there are no more bytes to send post another WSARecv() request.
                        Flags = SocketFlags.None;
                        GCHandle gchPerIoData = GCHandle.Alloc (PerIoData);
                        PerIoData.Overlapped = new WaOverlapped { State = ((IntPtr)gchPerIoData) };
                        GCHandle gchOverlapped = GCHandle.Alloc (PerIoData.Overlapped, GCHandleType.Pinned);
                        PerIoData.DataBuf.Length = DATA_BUFSIZE;
                        PerIoData.DataBuf.Pointer = Marshal.UnsafeAddrOfPinnedArrayElement(PerIoData.Buffer, 0);
                        #endregion

                        投遞異步接受請求#region 投遞異步接受請求
                        SocketError error = Win32Api.WSARecv (PerHandleData.Socket, ref PerIoData.DataBuf,
                    1, out RecvBytes, ref Flags, gchOverlapped.AddrOfPinnedObject(),
                    IntPtr.Zero);
                        if (error == SocketError.SocketError)
                        {
                            if (Win32Api.WSAGetLastError() != ERROR_IO_PENDING)
                            {
                                _logger.Error("WSARecv() failed with error{0}", Win32Api.WSAGetLastError());
                                return;
                            }
                        }
                        #endregion
                        #endregion
                    }
                }
                finally
                {
                    if (gcHandle_per_handle_data.IsAllocated)
                        gcHandle_per_io_data.Free();
                }

            }
            #endregion
        }
        #endregion
    }
    #endregion

封裝原生的socket對象#region 封裝原生的socket對象

public class SafeSocketHandle : SafeHandleMinusOneIsInvalid
    {
        private Logger _logger = Logger.GetLogger(typeof(SafeSocketHandle));
        public SafeSocketHandle()
            : base(true)
        {
        }

        public SafeSocketHandle(bool ownsHandle)
            : base(ownsHandle)
        {
        }

        protected override bool ReleaseHandle()
        {
            if (Win32Api.closesocket(base.handle) == SocketError.SocketError)
            {
                _logger.Error("closesocket() failed with error {0}\n", Win32Api.WSAGetLastError());
            }
            return true;
        }
    }
    #endregion

日志類#region 日志類

class Logger
    {
        public static Logger GetLogger(Type type)
        {
            return new Logger();
        }

        public void Log(object o)
        {
            Console.WriteLine(o);
        }
        public void Log(string format, params object[] objects)
        {
            Console.WriteLine(format, objects);
        }
        public void Error(object o)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(o);
            Console.ForegroundColor = ConsoleColor.White;
        }
        public void Error(string format, params object[] objects)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, objects);
            Console.ForegroundColor = ConsoleColor.White;
        }
    }
    #endregion

    win32 structs#region win32 structs
    [StructLayout(LayoutKind.Sequential)]
    public class WaOverlapped
    {
        public IntPtr InternalLow;
        public IntPtr InternalHigh;
        public int OffsetLow;
        public int OffsetHigh;
        public IntPtr EventHandle;
        public IntPtr State;

    }

    [StructLayout(LayoutKind.Sequential)]
    internal struct WSABuffer
    {
        internal uint Length;
        internal IntPtr Pointer;
    }
    [StructLayout(LayoutKind.Sequential)]
    internal struct WSAData
    {
        internal short wVersion;
        internal short wHighVersion;
        [MarshalAs(UnmanagedType.ByValTStr, SizeConst = 0x101)]
        internal string szDescription;
        [MarshalAs(UnmanagedType.ByValTStr, SizeConst = 0x81)]
        internal string szSystemStatus;
        internal short iMaxSockets;
        internal short iMaxUdpDg;
        internal IntPtr lpVendorInfo;
    }
    [Flags]
    internal enum SocketConstructorFlags
    {
        WSA_FLAG_MULTIPOINT_C_LEAF = 4,
        WSA_FLAG_MULTIPOINT_C_ROOT = 2,
        WSA_FLAG_MULTIPOINT_D_LEAF = 0x10,
        WSA_FLAG_MULTIPOINT_D_ROOT = 8,
        WSA_FLAG_OVERLAPPED = 1
    }
    #endregion

封裝winsock和iocp的相關API原型#region 封裝winsock和iocp的相關API原型

class Win32Api
    {
        [DllImport("ws2_32.dll", CharSet = CharSet.Ansi, SetLastError = true)]
        internal static extern SocketError WSAStartup([In] short wVersionRequested, out WSAData lpWSAData);

        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
        [DllImport("ws2_32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        internal static extern SafeSocketHandle WSASocket([In] AddressFamily addressFamily, [In] SocketType socketType, [In] ProtocolType protocolType, [In] IntPtr protocolInfo, [In] uint group, [In] SocketConstructorFlags flags);
        [DllImport("Ws2_32.dll", EntryPoint = "WSAGetLastError", SetLastError = true, CharSet = CharSet.Ansi, ExactSpelling = true, CallingConvention = CallingConvention.StdCall)]
        public static extern int WSAGetLastError();
        [DllImport("ws2_32.dll", SetLastError = true)]
        internal static extern SocketError bind([In] SafeSocketHandle socketHandle, [In] byte[] socketAddress, [In] int socketAddressSize);
        [DllImport("ws2_32.dll", SetLastError = true)]
        internal static extern SocketError listen([In] SafeSocketHandle socketHandle, [In] int backlog);
        [DllImport("ws2_32.dll", SetLastError = true, ExactSpelling = true)]
        internal static extern SafeSocketHandle accept([In] IntPtr socketHandle, [Out] byte[] socketAddress, [In, Out] ref int socketAddressSize);
        [DllImport("ws2_32.dll", SetLastError = true)]
        internal static extern SocketError WSARecv([In] SafeSocketHandle socketHandle, [In, Out] ref WSABuffer buffer, [In] int bufferCount, out int bytesTransferred, [In, Out] ref SocketFlags socketFlags, [In] IntPtr overlapped, [In] IntPtr completionRoutine);
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        public static extern unsafe bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,
            out uint lpNumberOfBytes, out IntPtr lpCompletionKey,
            out IntPtr lpOverlapped, uint dwMilliseconds);
        [ReliabilityContract(Consistency.WillNotCorruptState, Cer.Success), DllImport("ws2_32.dll", SetLastError = true, ExactSpelling = true)]
        internal static extern SocketError closesocket([In] IntPtr socketHandle);
        [DllImport("ws2_32.dll", SetLastError = true)]
        internal static extern SocketError WSASend([In] SafeSocketHandle socketHandle, [In] ref WSABuffer buffer, [In] int bufferCount, out int bytesTransferred, [In] SocketFlags socketFlags, [In] IntPtr overlapped, [In] IntPtr completionRoutine);
    }
    #endregion

    public class WawaIocpTest
    {
        public static void Main(String[] args)
        {
            IocpTest.Run();
        }
    }
}

小結:希望通過這3篇帖子能加深大家對c#開發windows上的網絡應用的了解。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved