程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> FastSocket學習筆記~制定自已的傳輸協議~續~制定基於FastSocket的協議,fastsocket學習筆記

FastSocket學習筆記~制定自已的傳輸協議~續~制定基於FastSocket的協議,fastsocket學習筆記

編輯:C#入門知識

FastSocket學習筆記~制定自已的傳輸協議~續~制定基於FastSocket的協議,fastsocket學習筆記


FastSocket這個東西上次我已經說過,它使用簡單,功能強大,擴展靈活,目前在新浪的生產環境中已經被廣泛使用,所以它的性能,安全等各方面我們絕對可以信賴,今天我們來說一個話題,和上一講有關,這次我們制作一個基於FastSocket的傳輸協議,它的意義重大,當fastSocket提供的協議不能滿足項目要求時,我們就必須硬著頭皮去自己寫了,還好,fastsocket為我們鋪好了路,我們只要按著這條路走下去,就可以了。

自定義的協議格式如下

說干就干

首先,如果要想擴展一個自己的協議,要對 client和server端分別進行開發,下面我們來看一下client的開發

我們要添加的類有三個文件組成,分別是DSSBinaryProtocol,DSSBinaryResponse和一個使用這個協議的客戶端入口DSSBinarySocketClient

DSSBinaryProtocol

/// <summary> /// 異步二進制協議 /// 協議格式 /// [Message Length(int32)][SeqID(int32)][ProjectID(int16)][Cmd Length(int16)][VersonNumber Length(int16)][Cmd + VersonNumber + Body Buffer] /// 其中參數TableName和VersonNumber長度為40,不夠自動在左側補空格 /// </summary> public sealed class DSSBinaryProtocol : IProtocol<DSSBinaryResponse> { #region IProtocol Members /// <summary> /// find response /// </summary> /// <param name="connection"></param> /// <param name="buffer"></param> /// <param name="readlength"></param> /// <returns></returns> /// <exception cref="BadProtocolException">bad async binary protocl</exception> public DSSBinaryResponse FindResponse(IConnection connection, ArraySegment<byte> buffer, out int readlength) { if (buffer.Count < 4) { readlength = 0; return null; } //獲取message length var messageLength = NetworkBitConverter.ToInt32(buffer.Array, buffer.Offset); if (messageLength < 7) throw new BadProtocolException("bad async binary protocl"); readlength = messageLength + 4; if (buffer.Count < readlength) { readlength = 0; return null; } var seqID = NetworkBitConverter.ToInt32(buffer.Array, buffer.Offset + 4); var projectID = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 8); var flagLength = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 10); var versonLength = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 12); var strName = Encoding.UTF8.GetString(buffer.Array, buffer.Offset + 14, flagLength); var versonNumber = Encoding.UTF8.GetString(buffer.Array, buffer.Offset + 14 + flagLength, versonLength); var dataLength = messageLength - 10 - flagLength - versonLength; byte[] data = null; if (dataLength > 0) { data = new byte[dataLength]; Buffer.BlockCopy(buffer.Array, buffer.Offset + 14 + flagLength + versonLength, data, 0, dataLength); } return new DSSBinaryResponse(seqID, projectID, strName, versonNumber, data); } #endregion } View Code

DSSBinaryResponse

/// <summary> /// 數據同步系統DSS使用的Socket協議,我們稱為DSSBinary協議 /// [Message Length(int32)][SeqID(int32)][ProjectID(int16)][Cmd Length(int16)][VersonNumber Length(int16)][Cmd + VersonNumber + Body Buffer] /// </summary> public class DSSBinaryResponse : IResponse { /// <summary> /// 流水ID /// </summary> public int SeqID { get; private set; } /// <summary> /// 項目類型編號 /// </summary> public short ProjectID { get; set; } /// <summary> /// 本次傳輸的版本號,所有客戶端唯一[項目名稱(4字節)+guid(36字節)] /// </summary> public string VersonNumber { get; private set; } /// <summary> /// 命令名稱 /// </summary> public string Flag { get; private set; } /// <summary> /// 要操作的表對象,以字節數組形式進行傳輸 /// </summary> public readonly byte[] Buffer = null; public DSSBinaryResponse(int seqID, short projectID, string flag, string versonNumber, byte[] buffer) { this.SeqID = seqID; this.ProjectID = projectID; this.VersonNumber = versonNumber; this.Flag = flag; this.Buffer = buffer; } } View Code

DSSBinarySocketClient

/// <summary> /// 異步socket客戶端 /// </summary> public class DSSBinarySocketClient : PooledSocketClient<DSSBinaryResponse> { #region Constructors /// <summary> /// new /// </summary> public DSSBinarySocketClient() : base(new DSSBinaryProtocol()) { } /// <summary> /// new /// </summary> /// <param name="socketBufferSize"></param> /// <param name="messageBufferSize"></param> public DSSBinarySocketClient(int socketBufferSize, int messageBufferSize) : base(new DSSBinaryProtocol(), socketBufferSize, messageBufferSize, 3000, 3000) { } /// <summary> /// new /// </summary> /// <param name="socketBufferSize"></param> /// <param name="messageBufferSize"></param> /// <param name="millisecondsSendTimeout"></param> /// <param name="millisecondsReceiveTimeout"></param> public DSSBinarySocketClient(int socketBufferSize, int messageBufferSize, int millisecondsSendTimeout, int millisecondsReceiveTimeout) : base(new DSSBinaryProtocol(), socketBufferSize, messageBufferSize, millisecondsSendTimeout, millisecondsReceiveTimeout) { } #endregion #region Public Methods public Task<TResult> Send<TResult>(string cmdName, short projectID, string versonNumber, byte[] payload, Func<DSSBinaryResponse, TResult> funcResultFactory, object asyncState = null) { return this.Send(null, cmdName, projectID, versonNumber, payload, funcResultFactory, asyncState); } public Task<TResult> Send<TResult>(byte[] consistentKey, string cmdName, short projectID, string versonNumber, byte[] payload, Func<DSSBinaryResponse, TResult> funcResultFactory, object asyncState = null) { if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName"); if (funcResultFactory == null) throw new ArgumentNullException("funcResultFactory"); var seqID = base.NextRequestSeqID(); var cmdLength = cmdName.Length; var versonNumberLength = versonNumber.Length; var messageLength = (payload == null ? 0 : payload.Length) + cmdLength + versonNumberLength + 10; var sendBuffer = new byte[messageLength + 4]; //write message length Buffer.BlockCopy(NetworkBitConverter.GetBytes(messageLength), 0, sendBuffer, 0, 4); //write seqID. Buffer.BlockCopy(NetworkBitConverter.GetBytes(seqID), 0, sendBuffer, 4, 4); //write proejctID Buffer.BlockCopy(NetworkBitConverter.GetBytes(projectID), 0, sendBuffer, 8, 2); //write response flag length. Buffer.BlockCopy(NetworkBitConverter.GetBytes((short)cmdLength), 0, sendBuffer, 10, 2); //write verson length Buffer.BlockCopy(NetworkBitConverter.GetBytes((short)versonNumberLength), 0, sendBuffer, 12, 2); //write response cmd Buffer.BlockCopy(Encoding.ASCII.GetBytes(cmdName), 0, sendBuffer, 14, cmdLength); //write response versonNumber Buffer.BlockCopy(Encoding.ASCII.GetBytes(versonNumber), 0, sendBuffer, 14 + cmdLength, versonNumberLength); //write body buffer if (payload != null && payload.Length > 0) Buffer.BlockCopy(payload, 0, sendBuffer, 14 + cmdLength + versonNumberLength, payload.Length); var source = new TaskCompletionSource<TResult>(asyncState); base.Send(new Request<DSSBinaryResponse>(consistentKey, seqID, cmdName, sendBuffer, ex => source.TrySetException(ex), response => { TResult result; try { result = funcResultFactory(response); } catch (Exception ex) { source.TrySetException(ex); return; } source.TrySetResult(result); })); return source.Task; } #endregion } View Code

然後,我們再來說一下server端的開發,它有兩個文件組成,分別是DSSBinaryCommandInfo,DSSBinaryProtocol

DSSBinaryCommandInfo

/// <summary> /// async binary command info. /// </summary> public class DSSBinaryCommandInfo : ICommandInfo { #region Constructors /// <summary> /// new /// </summary> /// <param name="cmdName"></param> /// <param name="seqID"></param> /// <param name="buffer"></param> /// <exception cref="ArgumentNullException">cmdName is null or empty.</exception> public DSSBinaryCommandInfo(int seqID, short projectID, string cmdName, string versonNumber, byte[] buffer) { if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName"); if (string.IsNullOrEmpty(versonNumber)) throw new ArgumentNullException("versonNumber"); this.VersonNumber = versonNumber; this.CmdName = cmdName; this.SeqID = seqID; this.ProjectID = projectID; this.Buffer = buffer; } #endregion #region Public Properties /// <summary> /// 版本號 /// </summary> public string VersonNumber { get; private set; } public short ProjectID { get; private set; } /// <summary> /// get the current command name. /// </summary> public string CmdName { get; private set; } /// <summary> /// seq id. /// </summary> public int SeqID { get; private set; } /// <summary> /// 主體內容 /// </summary> public byte[] Buffer { get; private set; } #endregion #region Public Methods /// <summary> /// reply /// </summary> /// <param name="connection"></param> /// <param name="payload"></param> public void Reply(IConnection connection, byte[] payload) { var packet = PacketBuilder.ToDSSBinary(this.SeqID, this.ProjectID, this.CmdName, this.VersonNumber, payload); connection.BeginSend(packet); } #endregion } View Code

DSSBinaryProtocol

/// <summary> /// 數據中心二進制協議 /// 協議格式 /// [Message Length(int32)][SeqID(int32)][Request|Response Flag Length(int16)][VersonNumber Length(int16)][Request|Response Flag + VersonNumber + Body Buffer] /// </summary> public sealed class DSSBinaryProtocol : IProtocol<DSSBinaryCommandInfo> { #region IProtocol Members /// <summary> /// find command /// </summary> /// <param name="connection"></param> /// <param name="buffer"></param> /// <param name="maxMessageSize"></param> /// <param name="readlength"></param> /// <returns></returns> /// <exception cref="BadProtocolException">bad async binary protocl</exception> public DSSBinaryCommandInfo FindCommandInfo(IConnection connection, ArraySegment<byte> buffer, int maxMessageSize, out int readlength) { if (buffer.Count < 4) { readlength = 0; return null; } var payload = buffer.Array; //獲取message length var messageLength = NetworkBitConverter.ToInt32(payload, buffer.Offset); if (messageLength < 7) throw new BadProtocolException("bad async binary protocl"); if (messageLength > maxMessageSize) throw new BadProtocolException("message is too long"); readlength = messageLength + 4; if (buffer.Count < readlength) { readlength = 0; return null; } var seqID = NetworkBitConverter.ToInt32(payload, buffer.Offset + 4); var projectID = NetworkBitConverter.ToInt16(payload, buffer.Offset + 8); var cmdNameLength = NetworkBitConverter.ToInt16(payload, buffer.Offset + 10); var versonNumberLength = NetworkBitConverter.ToInt16(payload, buffer.Offset + 12); var strName = Encoding.UTF8.GetString(payload, buffer.Offset + 14, cmdNameLength); var versonNumber = Encoding.UTF8.GetString(payload, buffer.Offset + 14 + cmdNameLength, versonNumberLength); var dataLength = messageLength - 8 - cmdNameLength; byte[] data = null; if (dataLength > 0) { data = new byte[dataLength]; Buffer.BlockCopy(payload, buffer.Offset + 14 + cmdNameLength + versonNumberLength, data, 0, dataLength); } return new DSSBinaryCommandInfo(seqID, projectID, strName, versonNumber, data); } #endregion } View Code

除了上面兩個文件外,我們還要修改服務端的管理類

    /// <summary>
    /// Socket server manager.
    /// </summary>
    public class SocketServerManager
    {
        #region Private Members
        static private readonly List<SocketBase.IHost> _listHosts = new List<SocketBase.IHost>();
        #endregion

        #region Static Methods
        /// <summary>
        /// 初始化Socket Server
        /// </summary>
        static public void Init()
        {
            Init("socketServer");
        }
        /// <summary>
        /// 初始化Socket Server
        /// </summary>
        /// <param name="sectionName"></param>
        static public void Init(string sectionName)
        {
            if (string.IsNullOrEmpty(sectionName)) throw new ArgumentNullException("sectionName");
            Init(ConfigurationManager.GetSection(sectionName) as Config.SocketServerConfig);
        }
        /// <summary>
        /// 初始化Socket Server
        /// </summary>
        /// <param name="config"></param>
        static public void Init(Config.SocketServerConfig config)
        {
            if (config == null) throw new ArgumentNullException("config");
            if (config.Servers == null) return;

            foreach (Config.Server serverConfig in config.Servers)
            {
                //inti protocol
                var objProtocol = GetProtocol(serverConfig.Protocol);
                if (objProtocol == null) throw new InvalidOperationException("protocol");

                //init custom service
                var tService = Type.GetType(serverConfig.ServiceType, false);
                if (tService == null) throw new InvalidOperationException("serviceType");

                var serviceFace = tService.GetInterface(typeof(ISocketService<>).Name);
                if (serviceFace == null) throw new InvalidOperationException("serviceType");

                var objService = Activator.CreateInstance(tService);
                if (objService == null) throw new InvalidOperationException("serviceType");

                //init host.
                var host = Activator.CreateInstance(typeof(SocketServer<>).MakeGenericType(
                    serviceFace.GetGenericArguments()),
                    objService,
                    objProtocol,
                    serverConfig.SocketBufferSize,
                    serverConfig.MessageBufferSize,
                    serverConfig.MaxMessageSize,
                    serverConfig.MaxConnections) as BaseSocketServer;

                host.AddListener(serverConfig.Name, new IPEndPoint(IPAddress.Any, serverConfig.Port));

                _listHosts.Add(host);
            }
        }
        /// <summary>
        /// get protocol.
        /// </summary>
        /// <param name="protocol"></param>
        /// <returns></returns>
        static public object GetProtocol(string protocol)
        {
            switch (protocol)
            {
                case Protocol.ProtocolNames.AsyncBinary:
                    return new Protocol.AsyncBinaryProtocol();
                case Protocol.ProtocolNames.Thrift:
                    return new Protocol.ThriftProtocol();
                case Protocol.ProtocolNames.CommandLine:
                    return new Protocol.CommandLineProtocol();
                case Protocol.ProtocolNames.DSSBinary:
                    return new Protocol.DSSBinaryProtocol();
            }
            return Activator.CreateInstance(Type.GetType(protocol, false));
        }

        /// <summary>
        /// 啟動服務
        /// </summary>
        static public void Start()
        {
            foreach (var server in _listHosts) server.Start();
        }
        /// <summary>
        /// 停止服務
        /// </summary>
        static public void Stop()
        {
            foreach (var server in _listHosts) server.Stop();
        }
        #endregion
    }

從上面的代碼中,我們看到了自己新加的協議DSSBinary,我們可以在配置文件中對它進行配置,方法和之前說的一樣,在這裡就不再重復了。

感謝各位的閱讀!

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved