程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> C++多進程並發框架

C++多進程並發框架

編輯:C++入門知識

三年來一直從事服務器程序開發,一直都是忙忙碌碌,不久前結束了職業生涯的第一份工作,有了一個禮拜的休息時間,終於可以寫寫總結了。於是把以前的開源代碼做了整理和優化,這就是FFLIB。雖然這邊總結看起來像日記,有很多廢話,但是此文仍然是有很大針對性的。針對服務器開發中常見的問題,如多線程並發、消息轉發、異步、性能優化、單元測試,提出自己的見解。
面對的問題
從事開發工程中,遇到過不少問題,很多時候由於時間緊迫,沒有使用優雅的方案。在跟業內的一些朋友交流過程中,我也意識到有些問題是大家都存在的。簡單列舉如下:
●多線程與並發
●異步消息/接口調用
●消息的序列化與Reflection
●性能優化
●單元測試
多線程與並發
現在是多核時代,並發才能實現更高的吞吐量、更快的響應,但也是把雙刃劍。總結如下幾個用法:
●多線程+顯示鎖;接口是被多線程調用的,當被調用時,顯示加鎖,再操作實體數據。悲劇的是,工程師為了優化會設計多個鎖,以減少鎖的粒度,甚至有些地方使用了原子操作。這些都為領域邏輯增加了額外的設計負擔。最壞的情況是會出現死鎖。
●多線程+任務隊列;接口被多線程調用,但請求會被暫存到任務隊列,而任務隊列會被單線程不斷執行,典型生產者消費者模式。它的並發在於不同的接口可以使用不同的任務隊列。這也是我最常用的並發方式。
這是兩種最常見的多線程並發,它們有個天生的缺陷——Scalability。一個機器的性能總是有瓶頸的。兩個場景的邏輯雖然由多個線程實現了並發,但是運算量十分有可能是一台機器無法承載的。如果是多進程並發,那麼可以分布式把其部署到其他機器(也可部署在一台機器)。所以多進程並發比多線程並發更加Scalability。另外采用多進程後,每個進程單線程設計,這樣的程序更加Simplicity。多進程的其他優點如解耦、模塊化、方便調試、方便重用等就不贅言了。
異步消息/接口調用
提到分布式,就要說一下分布式的通訊技術。常用的方式如下:
●類RPC;包括WebService、RPC、ICE等,特點是遠程同步調用。遠程的接口和本地的接口非常相似。但是游戲服務器程序一般非常在意延遲和吞吐量,所以這些阻塞線程的同步遠程調用方式並不常用。但是我們必須意識到他的優點,就是非常利於調用和測試。
●全異步消息;當調用遠程接口的時候,異步發送請求消息,接口響應後返回一個結果消息,調用方的回調函數處理結果消息繼續邏輯操作。所以有些邏輯就會被切割成ServiceStart和ServiceCallback兩段。有時異步會講領域邏輯變得支離破碎。另外消息處理函數中一般會寫一坨的switch/case 處理不同的消息。最大的問題在於單元測試,這種情況傳統單元測試根本束手無策。
消息的序列化與Reflection
實現消息的序列化和反序列化的方式有很多,常見的有Struct、json、Protobuff等都有很成功的應用。我個人傾向於使用輕量級的二進制序列化,優點是比較透明和高效,一切在掌握之中。在FFLIB 中實現了bin_encoder_t 和 bin_decoder_t 輕量級的消息序列化,幾十行代碼而已。
性能優化
已經寫過關於性能方面的總結,參見
http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html
有的網友提到profiler、cpuprofiler、callgrind等工具。這些工具我都使用過,說實話,對於我來說,我太認同它有很高的價值。第一他們只能用於開發測試階段,可以初步得到一些性能上參考數據。第二它們如何實現跟蹤人們無從得知。運行其會使程序變慢,不能反映真實數據。第三重要的是,開發測試階段性能和上線後的能一樣嗎?Impossible !
關於性能,原則就是數據說話,詳見博文,不在贅述。
單元測試
關於單元測試,前邊已經談論了一些。游戲服務器程序一般都比較龐大,但是不可思議的是,鄙人從來沒見有項目(c++ 後台架構的)有完整單元測試的。由於存在著異步和多線程,傳統的單元測試框架無法勝任,而開發支持異步的測試框架又是不現實的。我們必須看到的是,傳統的單元測試框架已經取得了非常大的成功。據我了解,使用web 架構的游戲後台已經對於單元測試的使用已經非常成熟,取得了極其好的效果。所以我的思路是利用現有的單元測試框架,將異步消息、多線程的架構做出調整。
已經多次談論單元測試了。其實在開發FFLIB的思路很大程度來源於此,否則可能只是一個c++ 網絡庫而已。我決定嘗試去解決這個問題的時候,把FFLIB 定位於框架。
先來看一段非常簡單的單元測試的代碼 :
1
Assert(2 == Add(1, 1));
請允許我對這行代碼做些解釋,對Add函數輸入參數,驗證返回值是否是預期的結果。這不就是單元測試的本質嗎?在想一下我們異步發送消息的過程,如果每個輸入消息約定一個結果消息包,每次發送請求時都綁定一個回調函數接收和驗證結果消息包。這樣的話就恰恰滿足了傳統單元測試的步驟了。最後還需解決一個問題,Assert是不能處理異步的返回值的。幸運的是,future機制可以化異步為同步。不了解future 模式的可以參考這裡:
http://blog.chinaunix.net/uid-23093301-id-190969.html
http://msdn.microsoft.com/zh-cn/library/dd764564.aspx#Y300
來看一下在FFLIB框架下遠程調用echo 服務的示例:

struct lambda_t
{
  static void callback(echo_t::out_t& msg_)
  {
    echo_t::in_t in;
    in.value = "XXX_echo_test_XXX";
    singleton_t<msg_bus_t>::instance()
       .get_service_group("echo")
       ->get_service(1)->async_call(in, &lambda_t::callback);
  }
};
echo_t::in_t in;
in.value = "XXX_echo_test_XXX";
singleton_t<msg_bus_t>::instance().get_service_group("echo")->get_service(1)->async_call(in, &lambda_t::callback);
當需要調用遠程接口時,async_call(in, &lambda_t::callback); 異步調用必須綁定一個回調函數,回調函數接收結果消息,可以觸發後續操作。這樣的話,如果對echo 的遠程接口做單元測試,可以這樣做:

rpc_future_t< echo_t::out_t> rpc_future;
echo_t::in_t in;
in.value = "XXX_echo_test_XXX";
const echo_t::out_t& out = rpc_future.call(
    singleton_t<msg_bus_t>::instance()
        .get_service_group("echo")->get_service(1), in);
Assert(in.value == out.value);
這樣所有的遠程接口都可以被單元測試覆蓋。
FFLIB 介紹
FFLIB 結構圖
C++多進程並發框架

如圖所示,Client 不會直接和Service 相連接,而是通過Broker 中間層完成了消息傳遞。關於Broker 模式可以參見:http://blog.chinaunix.net/uid-23093301-id-90459.html
進程間通信采用TPC,而不是多線程使用的共享內存方式。Service 一般是單線程架構的,通過啟動多進程實現相對於多線程的並發。由於Broker模式天生石分布式的,所以有很好的Scalability。
消息時序圖
C++多進程並發框架

如何注冊服務和接口
來看一下Echo 服務的實現:

struct echo_service_t
{
public:
    void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_)
    {
        logtrace((FF, "echo_service_t::echo done value<%s>", in_msg_.value.c_str()));
        echo_t::out_t out;
        out.value = in_msg_.value;
        cb_(out);
    }
};
 
int main(int argc, char* argv[])
{
    int g_index = 1;
    if (argc > 1)
    {
        g_index = atoi(argv[1]);
    }
    char buff[128];
    snprintf(buff, sizeof(buff), "tcp://%s:%s", "127.0.0.1", "10241");
 
    msg_bus_t msg_bus;
    assert(0 == singleton_t<msg_bus_t>::instance().open("tcp://127.0.0.1:10241") && "can't connnect to broker");
 
    echo_service_t f;
 
    singleton_t<msg_bus_t>::instance().create_service_group("echo");
    singleton_t<msg_bus_t>::instance().create_service("echo", g_index)
            .bind_service(&f)
            .reg(&echo_service_t::echo);
 
    signal_helper_t::wait();
 
    singleton_t<msg_bus_t>::instance().close();
    //usleep(1000);
    cout <<"\noh end\n";
    return 0;
}
●create_service_group 創建一個服務group,一個服務組可能有多個並行的實例
●create_service 以特定的id 創建一個服務實例
●reg 為該服務注冊接口
●接口的定義規范為void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_),第一個參數為輸入的消息struct,第二個參數為回調函數的模板特例,模板參數為返回消息的struct 類型。接口無需知道發送消息等細節,只需將結果callback 即可。
●注冊到Broker 後,所有Client都可獲取該服務
消息定義的規范
我們約定每個接口(遠程或本地都應滿足)都包含一個輸入消息和一個結果消息。來看一下echo 服務的消息定義:

struct echo_t
{
    struct in_t: public msg_i
    {
        in_t():
            msg_i("echo_t::in_t")
        {}
        virtual string encode()
        {
            return (init_encoder() << value).get_buff();
        }
        virtual void decode(const string& src_buff_)
        {
            init_decoder(src_buff_) >> value;
        }
 
        string value;
    };
    struct out_t: public msg_i
    {
        out_t():
            msg_i("echo_t::out_t")
        {}
        virtual string encode()
        {
            return (init_encoder() << value).get_buff();
        }
        virtual void decode(const string& src_buff_)
        {
            init_decoder(src_buff_) >> value;
        }
 
        string value;
    };
};
●每個接口必須包含in_t消息和out_t消息,並且他們定義在接口名(如echo _t)的內部
●所有消息都繼承於msg_i, 其封裝了二進制的序列化、反序列化等。構造時賦予類型名作為消息的名稱。
●每個消息必須實現encode 和 decode 函數
這裡需要指出的是,FFLIB 中不需要為每個消息定義對應的CMD。當接口如echo向Broker 注冊時,reg接口通過C++ 模板的類型推斷會自動將該msg name 注冊給Broker, Broker為每個msg name 分配唯一的msg_id。Msg_bus 中自動維護了msg_name 和msg_id 的映射。Msg_i 的定義如下:

struct msg_i : public codec_i
{
    msg_i(const char* msg_name_):
        cmd(0),
        uuid(0),
        service_group_id(0),
        service_id(0),
        msg_id(0),
        msg_name(msg_name_)
    {}
 
    void set(uint16_t group_id, uint16_t id_, uint32_t uuid_, uint16_t msg_id_)
    {
        service_group_id = group_id;
        service_id       = id_;
        uuid             = uuid_;
        msg_id           = msg_id_;
    }
 
    uint16_t cmd;
    uint16_t get_group_id()   const{ return service_group_id; }
    uint16_t get_service_id() const{ return service_id;       }
    uint32_t get_uuid()       const{ return uuid;             }
 
    uint16_t get_msg_id()     const{ return msg_id;           }
    const string& get_name()  const
    {
        if (msg_name.empty() == false)
        {
            return msg_name;
        }
        return singleton_t<msg_name_store_t>::instance().id_to_name(this->get_msg_id());
    }
 
    void     set_uuid(uint32_t id_)   { uuid = id_;  }
    void     set_msg_id(uint16_t id_) { msg_id = id_;}
    void     set_sgid(uint16_t sgid_) { service_group_id = sgid_;}
    void     set_sid(uint16_t sid_)   { service_id = sid_; }
    uint32_t uuid;
    uint16_t service_group_id;
    uint16_t service_id;
    uint16_t msg_id;
    string   msg_name;
 
    virtual string encode(uint16_t cmd_)
    {
        this->cmd = cmd_;
        return encode();
    }
    virtual string encode() = 0;
    bin_encoder_t& init_encoder()
    {
        return encoder.init(cmd)  << uuid << service_group_id << service_id<< msg_id;
    }
    bin_encoder_t& init_encoder(uint16_t cmd_)
    {
        return encoder.init(cmd_) << uuid << service_group_id << service_id << msg_id;
    }
    bin_decoder_t& init_decoder(const string& buff_)
    {
        return decoder.init(buff_) >> uuid >> service_group_id >> service_id >> msg_id;
    }
    bin_decoder_t decoder;
    bin_encoder_t encoder;
};
關於性能
由於遠程接口的調用必須通過Broker, Broker會為每個接口自動生成性能統計數據,並每10分鐘輸出到perf.txt 文件中。文件格式為CSV,參見:
http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html
總結
FFLIB框架擁有如下的特點:
●使用多進程並發。Broker 把Client 和Service 的位置透明化
●Service 的接口要注冊到Broker, 所有連接Broker的Client 都可以調用(publisher/ subscriber)
●遠程調用必須綁定回調函數
●利用future 模式實現同步,從而支持單元測試
●消息定義規范簡單直接高效
●所有service的接口性能監控數據自動生成,免費的午餐
●Service 單線程話,更simplicity
源代碼:
Svn co http://ffown.googlecode.com/svn/trunk/
運行示例:
●Cd example/broker && make && ./app_broker –l http://127.0.0.1:10241
●Cd example/echo_server && make && ./app_echo_server
●Cd example/echo_client && make && ./app_echo_client

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved