程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> 基於libuv的TCP設計(一)

基於libuv的TCP設計(一)

編輯:C++入門知識

本人一直在尋找一個跨平台的網絡庫,boost與ACE比較龐大,不考慮。對比了libevent、libev、libuv後,最終選擇了libuv。可是libuv文檔少,例子也簡單,對於tcp只有一個echo-server的例子。網上也找過對其封裝的例子,如下:

 

libsourcey庫,封裝了許多庫。對libuv的封裝跟其他代碼耦合比較緊,難以剝離: http://sourcey.com/libuv-cpp-wrappers/
C++11封裝的,可惜VS10未完全支持C++11 https://github.com/larroy/uvpp
C++封裝的  https://github.com/keepallsimple/uvpp

       本人想實現一個raw tcp server,支持上萬鏈接數的,網上找到的都沒合適我的,沒辦法只能參照各例子自己封裝了。

頭文件

/***************************************

* @file tcpsocket.h

* @brief 基於libuv封裝的tcp服務器與客戶端,使用log4z作日志工具

* @details

* @author 陳吉宏, [email protected]

* @date 2014-5-13

* @mod 2014-5-13 phata 修正服務器與客戶端的錯誤.現服務器支持多客戶端連接

修改客戶端測試代碼,支持並發多客戶端測試

****************************************/

#ifndef TCPSocket_H

#define TCPSocket_H

#include "uv.h"

#include <string>

#include <list>

#include <map>

#define BUFFERSIZE (1024*1024)

 

namespace uv

{

// Server-side callback invoked after a new client has been accepted; receives the id assigned to that client.
typedef void (*newconnect)(int clientid);

// Server-side per-client receive callback: client id, pointer to the received bytes, number of bytes.
typedef void (*server_recvcb)(int cliendid, const char* buf, int bufsize);

// Client-side receive callback: pointer to the received bytes, number of bytes, and the userdata pointer given to TCPClient::setrecvcb.
typedef void (*client_recvcb)(const char* buf, int bufsize, void* userdata);

 

class TCPServer;

class clientdata

{

public:

clientdata(int clientid):client_id(clientid),recvcb_(nullptr) {

client_handle = (uv_tcp_t*)malloc(sizeof(*client_handle));

client_handle->data = this;

readbuffer = uv_buf_init((char*)malloc(BUFFERSIZE), BUFFERSIZE);

        writebuffer = uv_buf_init((char*)malloc(BUFFERSIZE), BUFFERSIZE);

}

virtual ~clientdata() {

free(readbuffer.base);

readbuffer.base = nullptr;

readbuffer.len = 0;

 

        free(writebuffer.base);

        writebuffer.base = nullptr;

        writebuffer.len = 0;

 

free(client_handle);

client_handle = nullptr;

}

int client_id;//客戶端id,惟一

uv_tcp_t* client_handle;//客戶端句柄

TCPServer* tcp_server;//服務器句柄(保存是因為某些回調函數需要到)

uv_buf_t readbuffer;//接受數據的buf

    uv_buf_t writebuffer;//寫數據的buf

uv_write_t write_req;

server_recvcb recvcb_;//接收數據回調給用戶的函數

};

 

 

class TCPServer

{

public:

TCPServer(uv_loop_t* loop = uv_default_loop());

virtual ~TCPServer();

static void StartLog(const char* logpath = nullptr);//啟動日志,必須啟動才會生成日志

public:

//基本函數

bool Start(const char *ip, int port);//啟動服務器,地址為IP4

bool Start6(const char *ip, int port);//啟動服務器,地址為IP6

void close();

 

bool setNoDelay(bool enable);

bool setKeepAlive(int enable, unsigned int delay);

 

const char* GetLastErrMsg() const {

return errmsg_.c_str();

};

 

virtual int send(int clientid, const char* data, std::size_t len);

virtual void setnewconnectcb(newconnect cb);

virtual void setrecvcb(int clientid,server_recvcb cb);//設置接收回調函數,每個客戶端各有一個

protected:

int GetAvailaClientID()const;//獲取可用的client id

bool DeleteClient(int clientid);//刪除鏈表中的客戶端

//靜態回調函數

static void AfterServerRecv(uv_stream_t *client, ssize_t nread, const uv_buf_t* buf);

static void AfterSend(uv_write_t *req, int status);

static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);

static void AfterServerClose(uv_handle_t *handle);

    static void AfterClientClose(uv_handle_t *handle);

static void acceptConnection(uv_stream_t *server, int status);

 

private:

bool init();

bool run(int status = UV_RUN_DEFAULT);

bool bind(const char* ip, int port);

bool bind6(const char* ip, int port);

bool listen(int backlog = 1024);

 

 

uv_tcp_t server_;//服務器鏈接

std::map<int,clientdata*> clients_list_;//子客戶端鏈接

    uv_mutex_t mutex_handle_;//保護clients_list_

uv_loop_t *loop_;

std::string errmsg_;

newconnect newconcb_;

    bool isinit_;//是否已初始化,用於close函數中判斷

};

 

 

 

class TCPClient

{

//直接調用connect/connect6會進行連接

public:

TCPClient(uv_loop_t* loop = uv_default_loop());

virtual ~TCPClient();

static void StartLog(const char* logpath = nullptr);//啟動日志,必須啟動才會生成日志

public:

//基本函數

virtual bool connect(const char* ip, int port);//啟動connect線程,循環等待直到connect完成

virtual bool connect6(const char* ip, int port);//啟動connect線程,循環等待直到connect完成

virtual int send(const char* data, std::size_t len);

virtual void setrecvcb(client_recvcb cb, void* userdata);////設置接收回調函數,只有一個

void close();

 

//是否啟用Nagle算法

bool setNoDelay(bool enable);

bool setKeepAlive(int enable, unsigned int delay);

 

const char* GetLastErrMsg() const {

return errmsg_.c_str();

};

protected:

//靜態回調函數

static void AfterConnect(uv_connect_t* handle, int status);

static void AfterClientRecv(uv_stream_t *client, ssize_t nread, const uv_buf_t* buf);

static void AfterSend(uv_write_t *req, int status);

static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);

static void AfterClose(uv_handle_t *handle);

 

static void ConnectThread(void* arg);//真正的connect線程

static void ConnectThread6(void* arg);//真正的connect線程

 

bool init();

bool run(int status = UV_RUN_DEFAULT);

private:

enum {

CONNECT_TIMEOUT,

CONNECT_FINISH,

CONNECT_ERROR,

CONNECT_DIS,

};

uv_tcp_t client_;//客戶端連接

uv_loop_t *loop_;

uv_write_t write_req_;//寫時請求

uv_connect_t connect_req_;//連接時請求

uv_thread_t connect_threadhanlde_;//線程句柄

std::string errmsg_;//錯誤信息

uv_buf_t readbuffer_;//接受數據的buf

    uv_buf_t writebuffer_;//寫數據的buf

    uv_mutex_t write_mutex_handle_;//保護write,保存前一write完成才進行下一write

 

int connectstatus_;//連接狀態

client_recvcb recvcb_;//回調函數

void* userdata_;//回調函數的用戶數據

std::string connectip_;//連接的服務器IP

int connectport_;//連接的服務器端口號

    bool isinit_;//是否已初始化,用於close函數中判斷

};

 

}

 

 

#endif // TCPSocket_H

 

源文件

#include "tcpsocket.h"

#include "log4z.h"

 

/**
 * Builds a readable description of a libuv error code.
 *
 * @param retcode libuv error code.
 * @return "<uv_err_name>:<uv_strerror>" for that code.
 */
std::string GetUVError(int retcode)
{
    std::string err = uv_err_name(retcode);
    err += ":";
    err += uv_strerror(retcode);
    // Return the local by name: wrapping it in std::move would disable copy elision.
    return err;
}

 

namespace uv

{

    /*****************************************TCP Server*************************************************************/

    TCPServer::TCPServer(uv_loop_t* loop)

        :newconcb_(nullptr), isinit_(false)

    {

        loop_ = loop;

    }

 

 

    TCPServer::~TCPServer()

    {

        close();

        LOGI("tcp server exit.");

    }

 

    //初始化與關閉--服務器與客戶端一致

    bool TCPServer::init()

    {

        if (isinit_) {

            return true;

        }

 

        if (!loop_) {

            errmsg_ = "loop is null on tcp init.";

            LOGE(errmsg_);

            return false;

        }

        int iret = uv_mutex_init(&mutex_handle_);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        iret = uv_tcp_init(loop_,&server_);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        isinit_ = true;

        server_.data = this;

        //iret = uv_tcp_keepalive(&server_, 1, 60);//調用此函數後後續函數會調用出錯

        //if (iret) {

        //    errmsg_ = GetUVError(iret);

        //    return false;

        //}

        return true;

    }

 

    void TCPServer::close()

    {

        for (auto it = clients_list_.begin(); it!=clients_list_.end(); ++it) {

            auto data = it->second;

            uv_close((uv_handle_t*)data->client_handle,AfterClientClose);

        }

        clients_list_.clear();

 

        LOGI("close server");

        if (isinit_) {

            uv_close((uv_handle_t*) &server_, AfterServerClose);

            LOGI("close server");

        }

        isinit_ = false;

        uv_mutex_destroy(&mutex_handle_);

    }

 

    bool TCPServer::run(int status)

    {

        LOGI("server runing.");

        int iret = uv_run(loop_,(uv_run_mode)status);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        return true;

    }

    //屬性設置--服務器與客戶端一致

    bool TCPServer::setNoDelay(bool enable)

    {

        int iret = uv_tcp_nodelay(&server_, enable ? 1 : 0);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        return true;

    }

 

    bool TCPServer::setKeepAlive(int enable, unsigned int delay)

    {

        int iret = uv_tcp_keepalive(&server_, enable , delay);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        return true;

    }

 

    //作為server時的函數

    bool TCPServer::bind(const char* ip, int port)

    {

        struct sockaddr_in bind_addr;

        int iret = uv_ip4_addr(ip, port, &bind_addr);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        iret = uv_tcp_bind(&server_, (const struct sockaddr*)&bind_addr,0);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        LOGI("server bind ip="<<ip<<", port="<<port);

        return true;

    }

 

    bool TCPServer::bind6(const char* ip, int port)

    {

        struct sockaddr_in6 bind_addr;

        int iret = uv_ip6_addr(ip, port, &bind_addr);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        iret = uv_tcp_bind(&server_, (const struct sockaddr*)&bind_addr,0);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        LOGI("server bind ip="<<ip<<", port="<<port);

        return true;

    }

 

    bool TCPServer::listen(int backlog)

    {

        int iret = uv_listen((uv_stream_t*) &server_, backlog, acceptConnection);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        LOGI("server listen");

        return true;

    }

 

    bool TCPServer::Start( const char *ip, int port )

    {

        close();

        if (!init()) {

            return false;

        }

        if (!bind(ip,port)) {

            return false;

        }

        if (!listen(SOMAXCONN)) {

            return false;

        }

        if (!run()) {

            return false;

        }

        LOGI("start listen "<<ip<<": "<<port);

        return true;

    }

 

    bool TCPServer::Start6( const char *ip, int port )

    {

        close();

        if (!init()) {

            return false;

        }

        if (!bind6(ip,port)) {

            return false;

        }

        if (!listen(SOMAXCONN)) {

            return false;

        }

        if (!run()) {

            return false;

        }

        return true;

    }

 

    //服務器發送函數

    int TCPServer::send(int clientid, const char* data, std::size_t len)

    {

        auto itfind = clients_list_.find(clientid);

        if (itfind == clients_list_.end()) {

            errmsg_ = "can't find cliendid ";

            errmsg_ += std::to_string((long long)clientid);

            LOGE(errmsg_);

            return -1;

        }

        //自己控制data的生命周期直到write結束

        if (itfind->second->writebuffer.len < len) {

            itfind->second->writebuffer.base = (char*)realloc(itfind->second->writebuffer.base,len);

            itfind->second->writebuffer.len = len;

        }

        memcpy(itfind->second->writebuffer.base,data,len);

        uv_buf_t buf = uv_buf_init((char*)itfind->second->writebuffer.base,len);

        int iret = uv_write(&itfind->second->write_req, (uv_stream_t*)itfind->second->client_handle, &buf, 1, AfterSend);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        return true;

    }

 

    //服務器-新客戶端函數

    void TCPServer::acceptConnection(uv_stream_t *server, int status)

    {

        if (!server->data) {

            return;

        }

        TCPServer *tcpsock = (TCPServer *)server->data;

        int clientid = tcpsock->GetAvailaClientID();

        clientdata* cdata = new clientdata(clientid);//uv_close回調函數中釋放

        cdata->tcp_server = tcpsock;//保存服務器的信息

        int iret = uv_tcp_init(tcpsock->loop_, cdata->client_handle);//析構函數釋放

        if (iret) {

            delete cdata;

            tcpsock->errmsg_ = GetUVError(iret);

            LOGE(tcpsock->errmsg_);

            return;

        }

        iret = uv_accept((uv_stream_t*)&tcpsock->server_, (uv_stream_t*) cdata->client_handle);

        if ( iret) {

            tcpsock->errmsg_ = GetUVError(iret);

            uv_close((uv_handle_t*) cdata->client_handle, NULL);

            delete cdata;

            LOGE(tcpsock->errmsg_);

            return;

        }

        tcpsock->clients_list_.insert(std::make_pair(clientid,cdata));//加入到鏈接隊列

        if (tcpsock->newconcb_) {

            tcpsock->newconcb_(clientid);

        }

        LOGI("new client("<<cdata->client_handle<<") id="<< clientid);

        iret = uv_read_start((uv_stream_t*)cdata->client_handle, onAllocBuffer, AfterServerRecv);//服務器開始接收客戶端的數據

        return;

    }

 

    //服務器-接收數據回調函數

    void TCPServer::setrecvcb(int clientid, server_recvcb cb )

    {

        auto itfind = clients_list_.find(clientid);

        if (itfind != clients_list_.end()) {

            itfind->second->recvcb_ = cb;

        }

    }

 

    //服務器-新鏈接回調函數

    void TCPServer::setnewconnectcb(newconnect cb )

    {

        newconcb_ = cb;

    }

 

    //服務器分析空間函數

    void TCPServer::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)

    {

        if (!handle->data) {

            return;

        }

        clientdata *client = (clientdata*)handle->data;

        *buf = client->readbuffer;

    }

 

    void TCPServer::AfterServerRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf)

    {

        if (!handle->data) {

            return;

        }

        clientdata *client = (clientdata*)handle->data;//服務器的recv帶的是clientdata

        if (nread < 0) {/* Error or EOF */

            TCPServer *server = (TCPServer *)client->tcp_server;

            if (nread == UV_EOF) {

                fprintf(stdout,"客戶端(%d)連接斷開,關閉此客戶端\n",client->client_id);

                LOGW("客戶端("<<client->client_id<<")主動斷開");

            } else if (nread == UV_ECONNRESET) {

                fprintf(stdout,"客戶端(%d)異常斷開\n",client->client_id);

                LOGW("客戶端("<<client->client_id<<")異常斷開");

            } else {

                fprintf(stdout,"%s\n",GetUVError(nread));

                LOGW("客戶端("<<client->client_id<<")異常斷開:"<<GetUVError(nread));

            }

            server->DeleteClient(client->client_id);//連接斷開,關閉客戶端

            return;

        } else if (0 == nread) {/* Everything OK, but nothing read. */

 

        } else if (client->recvcb_) {

            client->recvcb_(client->client_id,buf->base,nread);

        }

    }

 

    //服務器與客戶端一致

    void TCPServer::AfterSend(uv_write_t *req, int status)

    {

        if (status < 0) {

            LOGE("發送數據有誤:"<<GetUVError(status));

            fprintf(stderr, "Write error %s\n", GetUVError(status));

        }

    }

 

    void TCPServer::AfterServerClose(uv_handle_t *handle)

    {

        //服務器,不需要做什麼

    }

 

    void TCPServer::AfterClientClose(uv_handle_t *handle)

    {

        clientdata *cdata = (clientdata*)handle->data;

        LOGI("client "<<cdata->client_id<<" had closed.");

        delete cdata;

    }

 

    int TCPServer::GetAvailaClientID() const

    {

        static int s_id = 0;

        return ++s_id;

    }

 

    bool TCPServer::DeleteClient( int clientid )

    {

        uv_mutex_lock(&mutex_handle_);

        auto itfind = clients_list_.find(clientid);

        if (itfind == clients_list_.end()) {

            errmsg_ = "can't find client ";

            errmsg_ += std::to_string((long long)clientid);

            LOGE(errmsg_);

            uv_mutex_unlock(&mutex_handle_);

            return false;

        }

        if (uv_is_active((uv_handle_t*)itfind->second->client_handle)) {

            uv_read_stop((uv_stream_t*)itfind->second->client_handle);

        }

        uv_close((uv_handle_t*)itfind->second->client_handle,AfterClientClose);

 

        clients_list_.erase(itfind);

        LOGI("刪除客戶端"<<clientid);

        uv_mutex_unlock(&mutex_handle_);

        return true;

    }

 

 

    void TCPServer::StartLog( const char* logpath /*= nullptr*/ )

    {

        zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerMonthdir(LOG4Z_MAIN_LOGGER_ID, true);

        zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerDisplay(LOG4Z_MAIN_LOGGER_ID,false);

        zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLevel(LOG4Z_MAIN_LOGGER_ID,LOG_LEVEL_DEBUG);

        zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLimitSize(LOG4Z_MAIN_LOGGER_ID,100);

        if (logpath) {

            zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerPath(LOG4Z_MAIN_LOGGER_ID,logpath);

        }

        zsummer::log4z::ILog4zManager::GetInstance()->Start();

    }

 

 

    /*****************************************TCP Client*************************************************************/

    TCPClient::TCPClient(uv_loop_t* loop)

        :recvcb_(nullptr),userdata_(nullptr)

        ,connectstatus_(CONNECT_DIS)

        , isinit_(false)

    {

        readbuffer_ = uv_buf_init((char*) malloc(BUFFERSIZE), BUFFERSIZE);

        writebuffer_ = uv_buf_init((char*) malloc(BUFFERSIZE), BUFFERSIZE);

        loop_ = loop;

        connect_req_.data = this;

        write_req_.data = this;

    }

 

 

    TCPClient::~TCPClient()

    {

        free(readbuffer_.base);

        readbuffer_.base = nullptr;

        readbuffer_.len = 0;

        free(writebuffer_.base);

        writebuffer_.base = nullptr;

        writebuffer_.len = 0;

        close();

        LOGI("客戶端("<<this<<")退出");

    }

    //初始化與關閉--服務器與客戶端一致

    bool TCPClient::init()

    {

        if (isinit_) {

            return true;

        }

 

        if (!loop_) {

            errmsg_ = "loop is null on tcp init.";

            LOGE(errmsg_);

            return false;

        }

        int iret = uv_tcp_init(loop_,&client_);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        iret = uv_mutex_init(&write_mutex_handle_);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        isinit_ = true;

        fprintf(stdout,"客戶端(%p) init type = %d\n",&client_,client_.type);

        client_.data = this;

        //iret = uv_tcp_keepalive(&client_, 1, 60);//

        //if (iret) {

        // errmsg_ = GetUVError(iret);

        // return false;

        //}

        LOGI("客戶端("<<this<<")Init");

        return true;

    }

 

    void TCPClient::close()

    {

        if (!isinit_) {

            return;

        }

        uv_mutex_destroy(&write_mutex_handle_);

        uv_close((uv_handle_t*) &client_, AfterClose);

        LOGI("客戶端("<<this<<")close");

        isinit_ = false;

    }

 

    bool TCPClient::run(int status)

    {

        LOGI("客戶端("<<this<<")run");

        int iret = uv_run(loop_,(uv_run_mode)status);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        return true;

    }

 

    //屬性設置--服務器與客戶端一致

    bool TCPClient::setNoDelay(bool enable)

    {

        //http://blog.csdn.net/u011133100/article/details/21485983

        int iret = uv_tcp_nodelay(&client_, enable ? 1 : 0);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        return true;

    }

 

    bool TCPClient::setKeepAlive(int enable, unsigned int delay)

    {

        int iret = uv_tcp_keepalive(&client_, enable , delay);

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        return true;

    }

 

    //作為client的connect函數

    bool TCPClient::connect(const char* ip, int port)

    {

        close();

        init();

        connectip_ = ip;

        connectport_ = port;

        LOGI("客戶端("<<this<<")start connect to server("<<ip<<":"<<port<<")");

        int iret = uv_thread_create(&connect_threadhanlde_, ConnectThread, this);//觸發AfterConnect才算真正連接成功,所以用線程

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        while ( connectstatus_ == CONNECT_DIS) {

#if defined (WIN32) || defined(_WIN32)

            Sleep(100);

#else

            usleep((100) * 1000)

#endif

        }

        return connectstatus_ == CONNECT_FINISH;

    }

 

    bool TCPClient::connect6(const char* ip, int port)

    {

        close();

        init();

        connectip_ = ip;

        connectport_ = port;

        LOGI("客戶端("<<this<<")start connect to server("<<ip<<":"<<port<<")");

        int iret = uv_thread_create(&connect_threadhanlde_, ConnectThread6, this);//觸發AfterConnect才算真正連接成功,所以用線程

        if (iret) {

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        while ( connectstatus_ == CONNECT_DIS) {

            //fprintf(stdout,"client(%p) wait, connect status %d\n",this,connectstatus_);

#if defined (WIN32) || defined(_WIN32)

            Sleep(100);

#else

            usleep((100) * 1000)

#endif

        }

        return connectstatus_ == CONNECT_FINISH;

    }

 

    void TCPClient::ConnectThread( void* arg )

    {

        TCPClient *pclient = (TCPClient*)arg;

        fprintf(stdout,"client(%p) ConnectThread start\n",pclient);

        struct sockaddr_in bind_addr;

        int iret = uv_ip4_addr(pclient->connectip_.c_str(), pclient->connectport_, &bind_addr);

        if (iret) {

            pclient->errmsg_ = GetUVError(iret);

            LOGE(pclient->errmsg_);

            return;

        }

        iret = uv_tcp_connect(&pclient->connect_req_, &pclient->client_, (const sockaddr*)&bind_addr, AfterConnect);

        if (iret) {

            pclient->errmsg_ = GetUVError(iret);

            LOGE(pclient->errmsg_);

            return;

        }

        fprintf(stdout,"client(%p) ConnectThread end, connect status %d\n",pclient, pclient->connectstatus_);

        pclient->run();

    }

 

 

    void TCPClient::ConnectThread6( void* arg )

    {

        TCPClient *pclient = (TCPClient*)arg;

        LOGI("客戶端("<<pclient<<")Enter Connect Thread.");

        fprintf(stdout,"client(%p) ConnectThread start\n",pclient);

        struct sockaddr_in6 bind_addr;

        int iret = uv_ip6_addr(pclient->connectip_.c_str(), pclient->connectport_, &bind_addr);

        if (iret) {

            pclient->errmsg_ = GetUVError(iret);

            LOGE(pclient->errmsg_);

            return;

        }

        iret = uv_tcp_connect(&pclient->connect_req_, &pclient->client_, (const sockaddr*)&bind_addr, AfterConnect);

        if (iret) {

            pclient->errmsg_ = GetUVError(iret);

            LOGE(pclient->errmsg_);

            return;

        }

        fprintf(stdout,"client(%p) ConnectThread end, connect status %d\n",pclient, pclient->connectstatus_);

        LOGI("客戶端("<<pclient<<")Leave Connect Thread. connect status "<<pclient->connectstatus_);

        pclient->run();

    }

 

    void TCPClient::AfterConnect(uv_connect_t* handle, int status)

    {

        fprintf(stdout,"start after connect\n");

        TCPClient *pclient = (TCPClient*)handle->handle->data;

        if (status) {

            pclient->connectstatus_ = CONNECT_ERROR;

            fprintf(stdout,"connect error:%s\n",GetUVError(status));

            return;

        }

 

        int iret = uv_read_start(handle->handle, onAllocBuffer, AfterClientRecv);//客戶端開始接收服務器的數據

        if (iret) {

            fprintf(stdout,"uv_read_start error:%s\n",GetUVError(iret));

            pclient->connectstatus_ = CONNECT_ERROR;

        } else {

            pclient->connectstatus_ = CONNECT_FINISH;

        }

        LOGI("客戶端("<<pclient<<")run");

        fprintf(stdout,"end after connect\n");

    }

 

    //客戶端的發送函數

    int TCPClient::send(const char* data, std::size_t len)

    {

        //自己控制data的生命周期直到write結束

        if (!data || len <= 0) {

            errmsg_ = "send data is null or len less than zero.";

            return 0;

        }

 

        uv_mutex_lock(&write_mutex_handle_);

        if (writebuffer_.len < len) {

            writebuffer_.base = (char*)realloc(writebuffer_.base,len);

            writebuffer_.len = len;

        }

        memcpy(writebuffer_.base,data,len);

        uv_buf_t buf = uv_buf_init((char*)writebuffer_.base,len);

        int iret = uv_write(&write_req_, (uv_stream_t*)&client_, &buf, 1, AfterSend);

        if (iret) {

            uv_mutex_unlock(&write_mutex_handle_);

            errmsg_ = GetUVError(iret);

            LOGE(errmsg_);

            return false;

        }

        return true;

    }

 

    //客戶端-接收數據回調函數

    void TCPClient::setrecvcb(client_recvcb cb, void* userdata )

    {

        recvcb_ = cb;

        userdata_ = userdata;

    }

 

    //客戶端分析空間函數

    void TCPClient::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)

    {

        if (!handle->data) {

            return;

        }

        TCPClient *client = (TCPClient*)handle->data;

        *buf = client->readbuffer_;

    }

 

 

    void TCPClient::AfterClientRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf)

    {

        if (!handle->data) {

            return;

        }

        TCPClient *client = (TCPClient*)handle->data;//服務器的recv帶的是TCPClient

        if (nread < 0) {

            if (nread == UV_EOF) {

                fprintf(stdout,"服務器(%p)主動斷開\n",handle);

                LOGW("服務器主動斷開");

            } else if (nread == UV_ECONNRESET) {

                fprintf(stdout,"服務器(%p)異常斷開\n",handle);

                LOGW("服務器異常斷開");

            } else {

                fprintf(stdout,"服務器(%p)異常斷開:%s\n",handle,GetUVError(nread));

                LOGW("服務器異常斷開"<<GetUVError(nread));

            }

            uv_close((uv_handle_t*)handle, AfterClose);

            return;

        }

        if (nread > 0 && client->recvcb_) {

            client->recvcb_(buf->base,nread,client->userdata_);

        }

    }

 

    //服務器與客戶端一致

    void TCPClient::AfterSend(uv_write_t *req, int status)

    {

        TCPClient *client = (TCPClient *)req->handle->data;

        uv_mutex_unlock(&client->write_mutex_handle_);

        if (status < 0) {

            LOGE("發送數據有誤:"<<GetUVError(status));

            fprintf(stderr, "Write error %s\n", GetUVError(status));

        }

    }

    //服務器與客戶端一致

    void TCPClient::AfterClose(uv_handle_t *handle)

    {

        fprintf(stdout,"客戶端(%p)已close\n",handle);

        LOGI("客戶端("<<handle<<")已close");

    }

 

    void TCPClient::StartLog( const char* logpath /*= nullptr*/ )

    {

        zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerMonthdir(LOG4Z_MAIN_LOGGER_ID, true);

        zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerDisplay(LOG4Z_MAIN_LOGGER_ID,false);

        zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLevel(LOG4Z_MAIN_LOGGER_ID,LOG_LEVEL_DEBUG);

        zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLimitSize(LOG4Z_MAIN_LOGGER_ID,100);

        if (logpath) {

            zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerPath(LOG4Z_MAIN_LOGGER_ID,logpath);

        }

        zsummer::log4z::ILog4zManager::GetInstance()->Start();

    }

 

}

 

代碼已上傳到git: https://github.com/wqvbjhc/libuv_tcp

按照例子,客戶端可以並發20多路;超過之後uv_write就會assert出錯,尚未找到原因。

服務器可以接收幾十路連接。成百上千路未測試過,因為沒有模擬環境。

 

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved