程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 網頁編程 >> PHP編程 >> 關於PHP編程 >> MIT 2012 分布式課程基礎源碼解析-底層通訊實現,mit2012

MIT 2012 分布式課程基礎源碼解析-底層通訊實現,mit2012

編輯:關於PHP編程

MIT 2012 分布式課程基礎源碼解析-底層通訊實現,mit2012


本節內容和前節事件管理封裝是息息相關的,本節內容主要包含的代碼在connection{.h, .cc}中。

這裡面最主要的有兩個類:connection類和tcpsconn類,connetion類主要服務於單個套接字,包括套接字上的數據讀取寫入等,而tcpsconn類則是服務於套接字集合,如接收連接,更新失效套接字等。具體我們看頭文件。

class chanmgr {
    public:
        virtual bool got_pdu(connection *c, char *b, int sz) = 0;
        virtual ~chanmgr() {}
};

我們首先看到的是這個虛基類類,這個類會以委托的形式用在connection和tcpsconn類中,它只有一個方法即got_pdu,它在RPC實現中扮演著重要角色,後面使用的時候會再次介紹它。

connection類

1 class connection : public aio_callback { 2 public: 3 //內部buffer類,主要用於接收/寫入數據的buffer 4 struct charbuf { 5 charbuf(): buf(NULL), sz(0), solong(0) {} 6 charbuf (char *b, int s) : buf(b), sz(s), solong(0){} 7 char *buf; 8 int sz; 9 int solong; //amount of bytes written or read so far 10 }; 11 //m1: chanmgr, f1: socket or file, 12 connection(chanmgr *m1, int f1, int lossytest=0); 13 ~connection(); 14 15 int channo() { return fd_; } 16 bool isdead(); 17 void closeconn(); 18 19 bool send(char *b, int sz); 20 void write_cb(int s); 21 void read_cb(int s); 22 //增加/減少引用計數 23 void incref(); 24 void decref(); 25 int ref(); 26 27 int compare(connection *another); 28 private: 29 30 bool readpdu(); 31 bool writepdu(); 32 33 chanmgr *mgr_; 34 const int fd_; 35 bool dead_; 36 37 charbuf wpdu_; //write pdu 38 charbuf rpdu_; //read pdu 39 40 struct timeval create_time_; 41 42 int waiters_; 43 int refno_; 44 const int lossy_; 45 46 pthread_mutex_t m_; 47 pthread_mutex_t ref_m_; //保護更新引用計數的安全性 48 pthread_cond_t send_complete_; 49 pthread_cond_t send_wait_; 50 }; View Code

這段代碼即是connetion類的定義,它繼承至aio_callback,在上一節說過,aio_callback在事件管理類中作為回調類,讀取或寫入數據,現在connection類就相當於一個回調類。

我們從connection的構造函數中便可以得知。

connection::connection(chanmgr *m1, int f1, int l1) 
: mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
{

    int flags = fcntl(fd_, F_GETFL, NULL);
    flags |= O_NONBLOCK;  //no blocking
    fcntl(fd_, F_SETFL, flags);
    //ignore信號
    signal(SIGPIPE, SIG_IGN);
    VERIFY(pthread_mutex_init(&m_,0)==0);
    VERIFY(pthread_mutex_init(&ref_m_,0)==0);
    VERIFY(pthread_cond_init(&send_wait_,0)==0);
    VERIFY(pthread_cond_init(&send_complete_,0)==0);
 
       VERIFY(gettimeofday(&create_time_, NULL) == 0); 
       //事件管理類將本類作為回調類添加到相應的事件管理數組中
    PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
}

 那這個類的具體作用是啥呢?其實它就是用於在給定套接字上通信用的,對於發送數據,會發送直到數據發送完成為止,未發送完成則會將該事件添加到事件管理中,在下一輪事件循環中繼續發送,這一點我們可以從send函數中看出:

bool connection::send(char *b, int sz) { ScopedLock ml(&m_); waiters_++; //當活著,且write pdu中還有數據時等待數據清空(發送完) while (!dead_ && wpdu_.buf) { VERIFY(pthread_cond_wait(&send_wait_, &m_)==0); } waiters_--; if (dead_) { return false; } wpdu_.buf = b; wpdu_.sz = sz; wpdu_.solong = 0; if (lossy_) { if ((random()%100) < lossy_) { jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_); shutdown(fd_,SHUT_RDWR); } } //發送失敗時 if (!writepdu()) { dead_ = true; VERIFY(pthread_mutex_unlock(&m_) == 0); PollMgr::Instance()->block_remove_fd(fd_); VERIFY(pthread_mutex_lock(&m_) == 0); }else{ if (wpdu_.solong == wpdu_.sz) { }else{ //should be rare to need to explicitly add write callback //這會繼續寫,因為這會添加本類(回調),然後調用裡面的回調函數write_cb, //就像是一個遞歸 PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) { VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0); } } } //清空寫buffer bool ret = (!dead_ && wpdu_.solong == wpdu_.sz); wpdu_.solong = wpdu_.sz = 0; wpdu_.buf = NULL; if (waiters_ > 0) pthread_cond_broadcast(&send_wait_); //喚醒上面的等待 return ret; } send

對於讀取數據,則當rpdu_(read buffer)未滿時繼續讀,讀取完成後就是用chanmgr類的got_pdu處理讀取後的數據。

注意發送數據/接收數據都會首先發送數據大小/接收數據大小,然後再做後續發送數據/接收數據的工作。

除了connection類的發送/接收數據的功能外,我們還看到一個私有變量refno_變量,該變量的作用是用於引用計數,引用計數是一種很常見的編程技巧,例如在python中,引用計數用於對象的管理,當引用計數為0時,對象便會銷毀,這裡的引用計數也是也是同樣的道理,這一點可以從decref函數中得知

void
connection::decref()
{
    VERIFY(pthread_mutex_lock(&ref_m_)==0);
    refno_ --;
    VERIFY(refno_>=0);
    //當引用計數為0時,銷毀對象
    if (refno_==0) {
        VERIFY(pthread_mutex_lock(&m_)==0);
        if (dead_) {
            VERIFY(pthread_mutex_unlock(&ref_m_)==0);
            VERIFY(pthread_mutex_unlock(&m_)==0);
            delete this;
            return;
        }
        VERIFY(pthread_mutex_unlock(&m_)==0);
    }
    pthread_mutex_unlock(&ref_m_);
}

tcpscon類:

這個類則是用於管理connection的,我們先看它的定義

/**
 *  管理客戶連接,將連接放入一個map中map<int, connection*>
 *
 */
class tcpsconn {
    public:
        tcpsconn(chanmgr *m1, int port, int lossytest=0);
        ~tcpsconn();

        void accept_conn();
    private:

        pthread_mutex_t m_;
        pthread_t th_;
        int pipe_[2];

        int tcp_; //file desciptor for accepting connection
        chanmgr *mgr_;
        int lossy_;
        std::map<int, connection *> conns_;

        void process_accept();
};

可看到裡面定義了一個map,該map的key其實是connection類指針對應的套接字,我們看構造函數實現

tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) : mgr_(m1), lossy_(lossytest) { VERIFY(pthread_mutex_init(&m_,NULL) == 0); struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(port); tcp_ = socket(AF_INET, SOCK_STREAM, 0); if(tcp_ < 0){ perror("tcpsconn::tcpsconn accept_loop socket:"); VERIFY(0); } int yes = 1; //設置TCP參數, reuseaddr, nodelay setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){ perror("accept_loop tcp bind:"); VERIFY(0); } if(listen(tcp_, 1000) < 0) { perror("tcpsconn::tcpsconn listen:"); VERIFY(0); } jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port, sin.sin_port); if (pipe(pipe_) < 0) { perror("accept_loop pipe:"); VERIFY(0); } int flags = fcntl(pipe_[0], F_GETFL, NULL); flags |= O_NONBLOCK; fcntl(pipe_[0], F_SETFL, flags); //無阻塞管道 VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0); } View Code

該構造函數主要是初始化服務器端連接,然後創建一個線程來等待客戶端的連接,後面處理客戶端連接時,會將連接的客戶端套接字添加到conns_的map中,即創建套接字到connection指針的對應關系,然後遍歷conns_,清除死亡的connection,從而達到及時處理死亡連接的效果。

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved