程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> Redis源碼分析(二十二)--- networking網絡協議傳輸

Redis源碼分析(二十二)--- networking網絡協議傳輸

編輯:C++入門知識

Redis源碼分析(二十二)--- networking網絡協議傳輸


上次我只分析了Redis網絡部分的代碼一部分,今天我把networking的代碼實現部分也學習了一遍,netWorking的代碼更多偏重的是Client客戶端的操作。裡面addReply()系列的方法操作是主要的部分。光光這個系列的方法,應該占據了一半的API的數量。我把API分成了3個部分:

/* ------------ API ---------------------- */
void *dupClientReplyValue(void *o)	/* 復制value一份 */
int listMatchObjects(void *a, void *b) /* 比價2個obj是否相等 */
robj *dupLastObjectIfNeeded(list *reply) /* 返回回復列表中最後一個元素對象 */
void copyClientOutputBuffer(redisClient *dst, redisClient *src) /* 將源Client的輸出buffer復制給目標Client */
static void acceptCommonHandler(int fd, int flags) /* 網絡連接後的調用方法 */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void disconnectSlaves(void) /* 使server的slave失去連接 */
void replicationHandleMasterDisconnection(void)
void flushSlavesOutputBuffers(void) /* 從方法將會在freeMemoryIfNeeded(),釋放內存空間函數,將存在內存中數據操作結果刷新到磁盤中 */
int processEventsWhileBlocked(void)

/* ------------- addReply API -----------------   */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) /* 往客戶端緩沖區中添加內容 */
void _addReplyObjectToList(redisClient *c, robj *o) /* robj添加到reply的列表中 */
void _addReplySdsToList(redisClient *c, sds s) /* 在回復列表中添加Sds字符串對象 */
void _addReplyStringToList(redisClient *c, char *s, size_t len) /* 在回復列表中添加字符串對象,參數中已經給定字符的長度 */
void addReply(redisClient *c, robj *obj) /* 在redisClient的buffer中寫入數據,數據存在obj->ptr的指針中 */
void addReplySds(redisClient *c, sds s) /* 在回復中添加Sds字符串,下面的額addReply()系列方法原理基本類似 */
void addReplyString(redisClient *c, char *s, size_t len)
void addReplyErrorLength(redisClient *c, char *s, size_t len)
void addReplyError(redisClient *c, char *err) /* 往Reply中添加error類的信息 */
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
void addReplyStatusLength(redisClient *c, char *s, size_t len)
void addReplyStatus(redisClient *c, char *status)
void addReplyStatusFormat(redisClient *c, const char *fmt, ...)
void *addDeferredMultiBulkLength(redisClient *c) /* 在reply list 中添加一個空的obj對象 */
void setDeferredMultiBulkLength(redisClient *c, void *node, long length)
void addReplyDouble(redisClient *c, double d) /* 在bulk reply中添加一個double類型值,bulk的意思為大塊的,bulk reply的意思為大數據量的回復 */
void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix)
void addReplyLongLong(redisClient *c, long long ll)
void addReplyMultiBulkLen(redisClient *c, long length)
void addReplyBulkLen(redisClient *c, robj *obj) /* 添加bulk 大塊的數據的長度 */
void addReplyBulk(redisClient *c, robj *obj) /* 將一個obj的數據,拆分成大塊數據的添加 */
void addReplyBulkCBuffer(redisClient *c, void *p, size_t len)
void addReplyBulkCString(redisClient *c, char *s)
void addReplyBulkLongLong(redisClient *c, long long ll)

/* ------------- Client API -----------------   */	
redisClient *createClient(int fd) /* 創建redisClient客戶端,1.建立連接,2.設置數據庫,3.屬性設置 */
int prepareClientToWrite(redisClient *c) /* 此方法將會被調用於Client准備接受新數據之前調用,在fileEvent為客戶端設定writer的handler處理事件 */
static void freeClientArgv(redisClient *c)
void freeClient(redisClient *c) /* 釋放freeClient,要分為Master和Slave2種情況作不同的處理 */
void freeClientAsync(redisClient *c)
void freeClientsInAsyncFreeQueue(void) /* 異步的free客戶端 */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 將Client中的reply數據存入文件中 */
void resetClient(redisClient *c)
int processInlineBuffer(redisClient *c) /* 處理redis Client的內鏈的buffer,就是c->querybuf */
static void setProtocolError(redisClient *c, int pos)
int processMultibulkBuffer(redisClient *c) /* 處理大塊的buffer */
void processInputBuffer(redisClient *c) /* 處理redisClient的查詢buffer */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 從Client獲取查詢query語句 */
void getClientsMaxBuffers(unsigned long *longest_output_list,
                          unsigned long *biggest_input_buffer) /* 獲取Client中輸入buffer和輸出buffer的最大長度值 */
void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port) /* 格式化ip,port端口號的輸出,ip:port */
int genClientPeerId(redisClient *client, char *peerid, size_t peerid_len) /* 獲取Client客戶端的ip,port地址信息 */
char *getClientPeerId(redisClient *c) /* 獲取c->peerid客戶端的地址信息 */
sds catClientInfoString(sds s, redisClient *client) /* 格式化的輸出客戶端的屬性信息,直接返回一個拼接好的字符串 */
sds getAllClientsInfoString(void) /* 獲取所有Client客戶端的屬性信息,並連接成一個總的字符串並輸出 */
void clientCommand(redisClient *c) /* 執行客戶端的命令的作法 */
void rewriteClientCommandVector(redisClient *c, int argc, ...) /* 重寫客戶端的命令集合,舊的命令集合的應用計數減1,新的Command  Vector的命令集合增1 */
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) /* 重寫Client中的第i個參數 */
unsigned long getClientOutputBufferMemoryUsage(redisClient *c) /* 獲取Client中已經用去的輸出buffer的大小 */
int getClientType(redisClient *c)
int getClientTypeByName(char *name) /* Client中的名字的3種類型,normal,slave,pubsub */
char *getClientTypeName(int class)
int checkClientOutputBufferLimits(redisClient *c) /* 判斷Clint的輸出緩沖區的已經占用大小是否超過軟限制或是硬限制 */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) /* 異步的關閉Client,如果緩沖區中的軟限制或是硬限制已經到達的時候,緩沖區超出限制的結果會導致釋放不安全, */
我們從最簡單的_addReplyToBuffer在緩沖區中添加回復數據開始說起,因為後面的各種addReply的方法都或多或少的調用了和這個歌方法。

/* -----------------------------------------------------------------------------
 * Low level functions to add more data to output buffers.
 * -------------------------------------------------------------------------- */
/* 往客戶端緩沖區中添加內容 */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
     //如果當前的reply已經存在內容,則操作出錯
    if (listLength(c->reply) > 0) return REDIS_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return REDIS_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return REDIS_OK;
}
最直接影響的一句話,就是memcpy(c->buf+c->bufpos,s,len);所以內容是加到c->buf中的,這也就是客戶端的輸出buffer,添加操作還有另外一種形式是添加對象類型:

/* robj添加到reply的列表中 */
void _addReplyObjectToList(redisClient *c, robj *o) {
    robj *tail;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

    if (listLength(c->reply) == 0) {
        incrRefCount(o);
        //在回復列表匯總添加robj內容
        listAddNodeTail(c->reply,o);
        c->reply_bytes += zmalloc_size_sds(o->ptr);
    } else {
        tail = listNodeValue(listLast(c->reply));

        /* Append to this object when possible. */
        if (tail->ptr != NULL &&
            sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
        {
            c->reply_bytes -= zmalloc_size_sds(tail->ptr);
            tail = dupLastObjectIfNeeded(c->reply);
            tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
            c->reply_bytes += zmalloc_size_sds(tail->ptr);
        } else {
            incrRefCount(o);
            listAddNodeTail(c->reply,o);
            c->reply_bytes += zmalloc_size_sds(o->ptr);
        }
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}
把robj對象加載reply列表中,並且改變reply的byte大小,最後還調用了一個asyncCloseClientOnOutputBufferLimitReached(c);方法,這個方法我是在這個文件的最底部找到的,一開始還真不知道什麼意思,作用就是當添加完數據後,當客戶端的輸出緩沖的大小超出限制時,會被異步關閉:

/* Asynchronously close a client if soft or hard limit is reached on the
 * output buffer size. The caller can check if the client will be closed
 * checking if the client REDIS_CLOSE_ASAP flag is set.
 *
 * Note: we need to close the client asynchronously because this function is
 * called from contexts where the client can't be freed safely, i.e. from the
 * lower level functions pushing data inside the client output buffers. */
/* 異步的關閉Client,如果緩沖區中的軟限制或是硬限制已經到達的時候,緩沖區超出限制的結果會導致釋放不安全, */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
    redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));
    if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
    if (checkClientOutputBufferLimits(c)) {
        sds client = catClientInfoString(sdsempty(),c);

        freeClientAsync(c);
        redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
        sdsfree(client);
    }
}
在addReply方法調用的時候,有時是需要一個前提的,我說的是在寫數據事件發生的時候,你得先對寫的文件創建一個監聽事件:

/* 在回復中添加Sds字符串 */
void addReplySds(redisClient *c, sds s) {
	//在調用添加操作之前,都要先執行prepareClientToWrite(c),設置文件事件的寫事件
    if (prepareClientToWrite(c) != REDIS_OK) {
        /* The caller expects the sds to be free'd. */
        sdsfree(s);
        return;
    }
    if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
        sdsfree(s);
    } else {
        /* This method free's the sds when it is no longer needed. */
        _addReplySdsToList(c,s);
    }
}
在這個prepareClientToWrite()裡面是干嘛的呢?

/* This function is called every time we are going to transmit new data
 * to the client. The behavior is the following:
 *
 * If the client should receive new data (normal clients will) the function
 * returns REDIS_OK, and make sure to install the write handler in our event
 * loop so that when the socket is writable new data gets written.
 *
 * If the client should not receive new data, because it is a fake client,
 * a master, a slave not yet online, or because the setup of the write handler
 * failed, the function returns REDIS_ERR.
 *
 * Typically gets called every time a reply is built, before adding more
 * data to the clients output buffers. If the function returns REDIS_ERR no
 * data should be appended to the output buffers. */
/* 此方法將會被調用於Client准備接受新數據之前調用,在fileEvent為客戶端設定writer的handler處理事件 */
int prepareClientToWrite(redisClient *c) {
    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
    if ((c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
    if (c->fd <= 0) return REDIS_ERR; /* Fake client */
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        //在這裡創建寫的文件事件
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}
在addReply的方法裡提到了一個addReplyBulk類型方法,Bulk的中文意思為大塊的,說明addReplyBulk添加的都是一些比較大塊的數據,找一個方法看看:

/* Add a Redis Object as a bulk reply */
/* 將一個obj的數據,拆分成大塊數據的添加 */
void addReplyBulk(redisClient *c, robj *obj) {
	//reply添加長度
    addReplyBulkLen(c,obj);
    //reply添加對象
    addReply(c,obj);
    addReply(c,shared.crlf);
}
將原本一個robj的數據拆分成可3個普通的addReply的方法調用。就變成了數據量變大了的數據。大數據的回復一個比較不好的地方是到時解析的時候或者是Data的復制的時候會比較耗時。在networking的方法裡還提供了freeClient()的操作:

/* 釋放freeClient,要分為Master和Slave2種情況作不同的處理 */
void freeClient(redisClient *c) {
    listNode *ln;

    /* If this is marked as current client unset it */
    if (server.current_client == c) server.current_client = NULL;

    /* If it is our master that's beging disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,"Connection with master lost.");
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
        	//如果是Master客戶端,需要做緩存Client的處理,可以迅速重新啟用
            replicationCacheMaster(c);
            return;
        }
    }
...後面代碼略去了

當Client中的輸出buffer數據漸漸變多了的時候就要准備持久化到磁盤文件了,要調用下面這個方法了,

/* Helper function used by freeMemoryIfNeeded() in order to flush slave
 * output buffers without returning control to the event loop. */
/* 從方法將會在freeMemoryIfNeeded(),釋放內存空間函數,將存在內存中數據操作結果刷新到磁盤中 */
void flushSlavesOutputBuffers(void) {
    listIter li;
    listNode *ln;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = listNodeValue(ln);
        int events;

        events = aeGetFileEvents(server.el,slave->fd);
        if (events & AE_WRITABLE &&
            slave->replstate == REDIS_REPL_ONLINE &&
            listLength(slave->reply))
        {
        	//在這裡調用了write的方法
            sendReplyToClient(server.el,slave->fd,slave,0);
        }
    }
}
這個方法的核心調用又在sendReplyToClient()方法,就是把Client的reply內容和buf內容存入文件。以上就是我的理解了,代碼量有點大,的確看的我頭有點大。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved