程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> Swoole源碼學習記錄(十四)——Server模塊詳解(下)

Swoole源碼學習記錄(十四)——Server模塊詳解(下)

編輯:C++入門知識

Swoole源碼學習記錄(十四)——Server模塊詳解(下)


 


1.swServer函數分析

swServer_addListener

該函數用於在swServer中添加一個需要監聽的host及port。函數原型如下:

// Server.h 438h
int swServer_addListener(swServer *serv, int type, char *host,int port);
參數 說明 swServer *serv swServer對象 int type 創建的socket類型,見枚舉swSocket_type char* host 監聽地址 int port 監聽端口

函數核心源碼:

    // Server.c 900~943h
    swListenList_node *listen_host = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swListenList_node));

    listen_host->type = type;
    listen_host->port = port;
    listen_host->sock = 0;
    listen_host->ssl = 0;

    bzero(listen_host->host, SW_HOST_MAXSIZE);
    strncpy(listen_host->host, host, SW_HOST_MAXSIZE);
    LL_APPEND(serv->listen_list, listen_host);

    //UDP需要提前創建好
    if (type == SW_SOCK_UDP || type == SW_SOCK_UDP6 || type == SW_SOCK_UNIX_DGRAM)
    {
        int sock = swSocket_listen(type, listen_host->host, port, serv->backlog);
        if (sock < 0)
        {
            return SW_ERR;
        }
        //設置UDP緩存區尺寸,高並發UDP服務器必須設置
        int bufsize = serv->udp_sock_buffer_size;
        setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
        setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));

        listen_host->sock = sock;
        serv->have_udp_sock = 1;
    }
    else
    {
        if (type & SW_SOCK_SSL)
        {
            type = type & (~SW_SOCK_SSL);
            listen_host->type = type;
            listen_host->ssl = 1;
        }
        if (type != SW_SOCK_UNIX_STREAM && port <= 0)
        {
            swError(listen port must greater than 0.);
            return SW_ERR;
        }
        serv->have_tcp_sock = 1;
    }
    return SW_OK;

源碼解釋:
在SwooleG的共享內存池中創建一個swListenList_node,並設置type等相關參數,並將該node添加進swServer的listen_list。隨後,判定type類型。如果是UDP類型的socket,需要直接調用swSocket_listen進行監聽,並根據swServer的udp_sock_buffer_size設置socket的緩存區大小;如果是TCP類型的socket,只針對兩種特別的type類型做判定(SSL類型要設置ssl開關,非Unix Sock類型要保證監聽端口大於0)。

swServer_listen

該函數用於開始監聽swServer中全部的TCP類型的socket。函數原型如下:

// Server.h 440h
int swServer_listen(swServer *serv, swReactor *reactor);
參數 說明 swServer *serv swServer對象 swReactor *reactor Reactor對象,監聽實際的Listen事件

函數核心源碼:

    // Server.c 949-998h
    LL_FOREACH(serv->listen_list, listen_host)
    {
        //UDP
        if (listen_host->type == SW_SOCK_UDP || listen_host->type == SW_SOCK_UDP6 || listen_host->type == SW_SOCK_UNIX_DGRAM)
        {
            continue;
        }

        //TCP
        sock = swSocket_listen(listen_host->type, listen_host->host, listen_host->port, serv->backlog);
        if (sock < 0)
        {
            LL_DELETE(serv->listen_list, listen_host);
            return SW_ERR;
        }

        if (reactor!=NULL)
        {
            reactor->add(reactor, sock, SW_FD_LISTEN);
        }

#ifdef TCP_DEFER_ACCEPT
        int sockopt;
        if (serv->tcp_defer_accept > 0)
        {
            sockopt = serv->tcp_defer_accept;
            setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &sockopt, sizeof(sockopt));
        }
#endif
        listen_host->sock = sock;
        //將server socket也放置到connection_list中
        serv->connection_list[sock].fd = sock;
        serv->connection_list[sock].addr.sin_port = listen_host->port;
        //save listen_host object
        serv->connection_list[sock].object = listen_host;
    }
    //將最後一個fd作為minfd和maxfd
    if (sock >= 0)
    {
        swServer_set_minfd(serv, sock);
        swServer_set_maxfd(serv, sock);
    }

源碼解釋:
遍歷listen_list列表,對所有TCP類型的socket,調用swSocket_listen函數啟動監聽,並將socket添加到reactor中。如果設置了TCP_DEFER_ACCEPT屬性,則設置相應的socket option。最後,將監聽的socket加入swServer的connection_list,並設置swServer的minfd和maxfd為最後一個待監聽的server socket。

swServer_addTimer

該函數用於在swServer中添加一個定時器。函數原型如下:

// Server.h 443h
int swServer_addTimer(swServer *serv, int interval);
參數 說明 swServer *serv swServer對象 int interval Timer的時間間隔

函數核心源碼:

    // Server.c 263-293h
    if (serv->onTimer == NULL)
    {
        swWarn(onTimer is null. Can not use timer.);
        return SW_ERR;
    }

    //timer no init
    if (SwooleG.timer.fd == 0)
    {
        if (swTimer_create(&SwooleG.timer, interval, SwooleG.use_timer_pipe) < 0)
        {
            return SW_ERR;
        }

        if (swIsMaster())
        {
            serv->connection_list[SW_SERVER_TIMER_FD_INDEX].fd = SwooleG.timer.fd;
        }

        if (SwooleG.use_timer_pipe)
        {
            SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_TIMER, swTimer_event_handler);
            SwooleG.main_reactor->add(SwooleG.main_reactor, SwooleG.timer.fd, SW_FD_TIMER);
        }

        SwooleG.timer.onTimer = swServer_onTimer;
    }
    return swTimer_add(&SwooleG.timer, interval);

源碼解釋: 首先判定是否有onTimer回調,如果沒有則返回一個Error。隨後,如果沒有初始化timer計時器,則調用swTimer_create函數創建計時器。如果當前進程是master進程,將timer的fd添加到connection_list中。如果timer指定使用了pipe管道,則將timer的fd添加到SwooleG的main_reactor中。最後,調用swTimer_add添加timer。

swServer_tcp_send

該函數用於發送TCP數據。函數原型如下:

// Server.h 443h
int swServer_tcp_send(swServer *serv, int fd, void *data, int length);
參數 說明 swServer *serv swServer對象 int fd 發送的socket描述符 void *data 需要發送的數據 int length 數據長度

函數核心源碼:

    // Server.c 788-858h
    swSendData _send;
    swFactory *factory = &(serv->factory);

#ifndef SW_WORKER_SEND_CHUNK
    /**
     * More than the output buffer
     */
    if (length >= serv->buffer_output_size)
    {
        swWarn(More than the output buffer size[%d], please use the sendfile., serv->buffer_output_size);
        return SW_ERR;
    }
    else
    {
        _send.info.fd = fd;
        _send.info.type = SW_EVENT_TCP;
        _send.data = data;

        if (length >= SW_BUFFER_SIZE)
        {
            _send.length = length;
        }
        else
        {
            _send.info.len = length;
            _send.length = 0;
        }
        return factory->finish(factory, &_send);
    }
#else
    char buffer[SW_BUFFER_SIZE];
    int trunk_num = (length / SW_BUFFER_SIZE) + 1;
    int send_n = 0, i, ret;

    swConnection *conn = swServer_connection_get(serv, fd);
    if (conn == NULL || conn->active == 0)
    {
        swWarn(Connection[%d] has been closed., fd);
        return SW_ERR;
    }

    for (i = 0; i < trunk_num; i++)
    {
        //last chunk
        if (i == (trunk_num - 1))
        {
            send_n = length % SW_BUFFER_SIZE;
            if (send_n == 0)
                break;
        }
        else
        {
            send_n = SW_BUFFER_SIZE;
        }
        memcpy(buffer, data + SW_BUFFER_SIZE * i, send_n);
        _send.info.len = send_n;
        ret = factory->finish(factory, &_send);

#ifdef SW_WORKER_SENDTO_YIELD
        if ((i % SW_WORKER_SENDTO_YIELD) == (SW_WORKER_SENDTO_YIELD - 1))
        {
            swYield();
        }
#endif
    }
    return ret;
#endif

源碼解釋: 如果沒有定義SW_WORKER_SEND_CHUNK宏,執行如下操作: 如果數據長度大於swServer的輸出緩存大小,則報錯。否則,設置swSendData的相關屬性,調用swServer中的factory的finish函數將數據發出。 如果定義了SW_WORKER_SEND_CHUNK宏,執行如下操作: 首先根據數據長度length計算出需要多少個trunk。隨後,獲取fd對應的swConnecton,如果找不到connection或者connection已經關閉,則報錯。隨後,將數據劃分為一個個trunk的長度,放進swSendData中後調用swServer中的factory的finish函數將數據發出。

swServer_reload

該函數用於通知Manager進程重啟全部的Worker進程。函數原型如下:

// Server.h 443h
int swServer_reload(swServer *serv);
參數 說明 swServer *serv swServer對象

函數核心源碼:

    // Server.c 1009-1017h
    int manager_pid = swServer_get_manager_pid(serv);
    if (manager_pid > 0)
    {
        return kill(manager_pid, SIGUSR1);
    }
    return SW_ERR;

源碼解釋: 通過kill函數向Manager進程發送SIGUSR1信號,該信號的行為是殺死並重啟全部的Worker。

剩下的swServer的操作函數較為簡單,在此不再貼出源碼進行分析。

2.Server相關結構體分析

下面分析一些聲明在Server.h中的結構體。

swPackage

該結構體已被swEventData替代。

swPackage_task

聲明:

// Server.h 420-424h
typedef struct
{
    int length;
    char tmpfile[sizeof(SW_TASK_TMP_FILE)];
} swPackage_task;
成員 說明 int length 數據長度 char tmpfile[sizeof(SW_TASK_TMP_FILE)] 臨時文件的文件名

說明: swPackage_task用於封裝內容較大的task包(超過8K),tmpfile指向一個由mkstemp函數(如果開啟了HAVE_MKOSTEMP選項,則為mkostemp函數)創造的臨時文件,所有的數據會被暫時存放在這個文件裡。

swPackage_response

聲明:

// Server.h 426-430h
typedef struct
{
    int length;
    int worker_id;
} swPackage_response;
成員 說明 int length 數據長度 int worker_id 用於接收該應答的Worker ID

說明: swPackage_response結構體用於Factory回應Reactor,主要作用是通知Reactor響應數據的實際長度以及是由哪個worker處理的,然後會根據是否為Big Response決定是否從worker的共享內存中讀取數據。(參考ReactorThread的swReactorThread_onPipeReceive函數以及FactoryProcess的swFactoryProcess_finish函數)

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved