程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> memcached學習筆記——存儲命令源碼分析上篇,memcached學習筆記

memcached學習筆記——存儲命令源碼分析上篇,memcached學習筆記

編輯:C++入門知識

memcached學習筆記——存儲命令源碼分析上篇,memcached學習筆記


原創文章,轉載請標明,謝謝。

上一篇分析過memcached的連接模型,了解memcached是如何高效處理客戶端連接,這一篇分析memcached源碼中的process_update_command函數,探究memcached客戶端的set命令,解讀memcached是如何解析客戶端文本命令,剖析memcached的內存管理,LRU算法是如何工作等等。

解析客戶端文本命令

客戶端向memcached server發出set操作,memcached server讀取客戶端的命令,客戶端的連接狀態由 conn_read > conn_parse_cmd 轉換,這時候,memcached server開始解析命令。memcached server調用try_read_command函數解析命令,memcached接收兩種格式的命令,一種是二進制格式,另一種是文本格式(本文只講文本格式的命令)。

 1 static int try_read_command(conn *c) {
 2 
 3     // ..........
 4     
 5     if (c->protocol == binary_prot) {
 6 
 7         // 二進制格式
 8         // ....
 9         
10     } else {
11         char *el, *cont;
12 
13         // 沒有接收到客戶端的命令,返回進入conn_waiting狀態,等待更多的客戶端數據
14         if (c->rbytes == 0)
15             return 0;
16 
17         el = memchr(c->rcurr, '\n', c->rbytes);
18         if (!el) {
19             if (c->rbytes > 1024) {
20                 /*
21                  * We didn't have a '\n' in the first k. This _has_ to be a
22                  * large multiget, if not we should just nuke the connection.
23                  */
24                 char *ptr = c->rcurr;
25                 while (*ptr == ' ') { /* ignore leading whitespaces */
26                     ++ptr;
27                 }
28 
29                 if (ptr - c->rcurr > 100 ||
30                     (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
31 
32                     conn_set_state(c, conn_closing);
33                     return 1;
34                 }
35             }
36 
37             return 0;
38         }
39         
40         // 客戶端報文以'\r\n'結尾
41         cont = el + 1;
42         if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
43             el--;
44         }
45         *el = '\0';
46 
47         assert(cont <= (c->rcurr + c->rbytes));
48 
49         // 真正解析命令的地方
50         process_command(c, c->rcurr);
51 
52         c->rbytes -= (cont - c->rcurr);
53         c->rcurr = cont;
54 
55         assert(c->rcurr <= (c->rbuf + c->rsize));
56     }
57 
58     return 1;
59 }

在分析process_command函數前,我們先看看memcached的命令格式:

 1     <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n  
 2       
 3     cas <key> <flags> <exptime> <bytes> <cas unique> [noreply]\r\n 
 4     
 5     // 例如 set 命令 :    
 6     set key 0 60 2  
 7     12  
 8     STORED
 9     
10     // 空格對應著空格
11     set => <command name>
12     key => <key>
13     0   => <flags>
14     60  => <exptime>
15     2   => <bytes>

memcached在process_command中調用tokenize_command函數根據上面的命令格式處理命令,把相應位置的字段保存在 token_t *tokens 的相應位置。

 1 // 參數1:命令的字符串
 2 // 參數2:解析命令後,存放命令各個字段的結構體數組
 3 // 參數3:命令字段的最大數量
 4 /*
 5 *    tokens[0] => <command name> 的信息
 6 *    tokens[1] => <key> 的信息
 7 *    tokens[2] => <flags> 的信息
 8 */
 9 static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
10     char *s, *e;
11     size_t ntokens = 0;
12     size_t len = strlen(command);
13     unsigned int i = 0;
14 
15     assert(command != NULL && tokens != NULL && max_tokens > 1);
16 
17     s = e = command;
18     for (i = 0; i < len; i++) {
19         if (*e == ' ') {
20             if (s != e) {
21                 tokens[ntokens].value = s;  // value存放各個字段的字符串值,例如:'set'
22                 tokens[ntokens].length = e - s; // length表示各個字段相應的長度,例如:'set'的長度為3
23                 ntokens++;
24                 *e = '\0';
25                 if (ntokens == max_tokens - 1) {
26                     e++;
27                     s = e; /* so we don't add an extra token */
28                     break;
29                 }
30             }
31             s = e + 1;
32         }
33         e++;
34     }
35 
36     if (s != e) {
37         tokens[ntokens].value = s;
38         tokens[ntokens].length = e - s;
39         ntokens++;
40     }
41 
42     /*
43      * If we scanned the whole string, the terminal value pointer is null,
44      * otherwise it is the first unprocessed character.
45      */
46     tokens[ntokens].value =  *e == '\0' ? NULL : e;
47     tokens[ntokens].length = 0;
48     ntokens++;
49 
50     return ntokens;
51 }

解析完文本命令後,回到process_command函數中,我們可以看到很熟悉的命令,是的,接下來,在一個if-else的多分支判斷中,memcached根據tokens[COMMAND_TOKEN].value決定調用那一個函數處理相應的命令:

 1 static void process_command(conn *c, char *command) {
 2     
 3     // ....
 4     
 5     ntokens = tokenize_command(command, tokens, MAX_TOKENS);
 6     if (ntokens >= 3 &&
 7         ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
 8          (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
 9         
10         // 這裡就是執行get命令的分支
11         process_get_command(c, tokens, ntokens, false);
12 
13     } else if ((ntokens == 6 || ntokens == 7) &&
14                ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
15                 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
16                 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
17                 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
18                 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
19 
20         // 這裡就是執行set、add、replace等命令的分支
21         process_update_command(c, tokens, ntokens, comm, false);
22 
23     } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
24 
25         // 這裡也執行process_update_command函數,也是對相應的key執行寫操作,與上面一個分支不同的是最後一個參數是true,意思是寫的過程使用CAS協議,這裡不側重講,
26         // CAS目的是保證在並發寫的時候保證一致性
27         process_update_command(c, tokens, ntokens, comm, true);
28 
29     } else if .............    

memcached存儲命令分析

memcached把內存分割成各種尺寸的塊(chunk),並把尺寸相同的塊分成組(chunk的集合),每個chunk集合被稱為slab。Memcached的內存分配以Page為單位,Page默認值為1M,可以在啟動時通過-I參數來指定。Slab是由多個Page組成的,Page按照指定大小切割成多個chunk。

每一對[key,value]的數據被封裝到item的結構體裡,每種類型的slab用一個item鏈表來維護它的所有item。例如,一個item項的數據大小加上item的頭部信息(為了方便描述,下面把這兩項的和統稱[key,value]大小吧)是90KB,slab[i]的chunk塊大小是136KB,slab[i-1]的chunk塊大小是88KB,那麼item會被分配slab[i]的一個chunk塊(並保存到slab[i]維護的一個item鏈表),這樣做的目的是為了盡量減少內存碎片。更多關於Slab Allocation的原理可以查找其他的資料。這裡不詳解

memcached的存儲命令:add、set、replace、append、prepend等,上面簡單地說了memcached slab機制,知道memcached是根據相應的[key,value]大小找到相應的slab,那麼,我們再次調用set命令某個已存在的key的value的時候,memcached是怎麼工作的呢?

起初,我的直覺思維是,找到key相應的item,修改item的value就好了。那麼,問題來了,假如先前[key,value]大小是90KB,被分配到slab[i]的,現在我們修改了key對應的value,[key,value]大小也改變了,變成了80KB,應該分配到slab[i-1]的,如果只是修改原來item的數據,那麼就不符合Slab Allocation的原理,會造成很大的內存碎片浪費。

memcached對存儲命令:add、set、replace、append、prepend處理方法大體都相似的,從上面的源碼可以看出,都是通過執行process_update_command函數來處理。

memcached的處理存儲命令思路是這樣的:例如,客戶端的一個set命令,memcached都會重新根據[key,value]大小找到合適slab,並把相應的數據封裝到新的item裡面【源碼的注1】(不會直接修改舊的item項),如果對應的slab沒有內存空間不足,就調用LRU算法把該slab的一個最近最少使用項的空間分配給新的item【源碼的注2,出現在do_item_alloc函數】(如果關閉LRU移除項的功能,那麼就會報“SERVER_ERROR out of memory storing object”錯誤,是set命令的話,還會把key對應的舊的item項移除【源碼的注3】,即我們這時候不能通過get key來獲取到舊的數據了),分配空間成功,那就是對add、set、replace、append、prepend這幾個存儲命令做差異化處理。

 1 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
 2     
 3     // ....
 4 
 5     set_noreply_maybe(c, tokens, ntokens);  // 設置命令可選字段的[noreply]
 6 
 7     if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
 8         out_string(c, "CLIENT_ERROR bad command line format");
 9         return;
10     }
11 
12     key = tokens[KEY_TOKEN].value;
13     nkey = tokens[KEY_TOKEN].length;
14 
15     // 把命令相應字段的字符串安全轉換成整數
16     if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
17            && safe_strtol(tokens[3].value, &exptime_int)
18            && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
19         out_string(c, "CLIENT_ERROR bad command line format");
20         return;
21     }
22 
23     exptime = exptime_int;
24 
25     // #define REALTIME_MAXDELTA 60*60*24*30
26     if (exptime < 0)
27         exptime = REALTIME_MAXDELTA + 1;
28 
29     // CAS協議,防止並發寫不一致
30     if (handle_cas) {
31         if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
32             out_string(c, "CLIENT_ERROR bad command line format");
33             return;
34         }
35     }
36 
37     // ........
38 
39     // 注1:無論是add、set或者是replace命令,都會從新分配一個新的item
40     it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
41 
42     // 如果新的item分配失敗
43     if (it == 0) {
44         if (! item_size_ok(nkey, flags, vlen))
45             out_string(c, "SERVER_ERROR object too large for cache");        // 一種錯誤情況:數據太大,沒有合適slab,不能緩存數據
46         else
47             out_string(c, "SERVER_ERROR out of memory storing object");      // 另一種是:沒有了內存空間緩存數據,通常這種事在關閉LRU功能的情況下出現
48         /* swallow the data line */
49         c->write_and_go = conn_swallow;
50         c->sbytes = vlen;
51 
52         // 注3:新的item分配失敗,如果是set命令,並且key對應著存在舊的item,那麼就把舊的item刪除
53         if (comm == NREAD_SET) {
54             it = item_get(key, nkey);
55             if (it) {
56                 item_unlink(it);
57                 item_remove(it);
58             }
59         }
60 
61         return;
62     }
63     ITEM_set_cas(it, req_cas_id);
64 
65     c->item = it;
66     c->ritem = ITEM_data(it);
67     c->rlbytes = it->nbytes;
68     c->cmd = comm;
69     
70     // 會在這一步進行add、set、replace等存儲命令的差異化處理
71     conn_set_state(c, conn_nread);
72 }

do_item_alloc函數:

  1 item *do_item_alloc(char *key, const size_t nkey, const int flags,
  2                     const rel_time_t exptime, const int nbytes,
  3                     const uint32_t cur_hv) {
  4     uint8_t nsuffix;
  5     item *it = NULL;
  6     char suffix[40];
  7     size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);   //接收到的item數據長度+item頭部長度
  8     if (settings.use_cas) {
  9         ntotal += sizeof(uint64_t);
 10     }
 11 
 12     unsigned int id = slabs_clsid(ntotal);
 13     if (id == 0)
 14         return 0;
 15 
 16     mutex_lock(&cache_lock);
 17     /* do a quick check if we have any expired items in the tail.. */
 18     int tries = 5;
 19     int tried_alloc = 0;
 20     item *search;
 21     void *hold_lock = NULL;
 22     rel_time_t oldest_live = settings.oldest_live;
 23 
 24     search = tails[id];
 25     
 26     // tries = 5 ,循環查找過期的item,最多循環5次
 27     for (; tries > 0 && search != NULL; tries--, search=search->prev) {
 28         uint32_t hv = hash(ITEM_key(search), search->nkey, 0);
 29         
 30         // 如果當前item被上鎖,那麼就跳過 
 31         if (hv != cur_hv && (hold_lock = item_trylock(hv)) == NULL)
 32             continue;
 33         /* Now see if the item is refcount locked */
 34         if (refcount_incr(&search->refcount) != 2) {
 35             refcount_decr(&search->refcount);
 36             /* Old rare bug could cause a refcount leak. We haven't seen
 37              * it in years, but we leave this code in to prevent failures
 38              * just in case */
 39             if (search->time + TAIL_REPAIR_TIME < current_time) {
 40                 itemstats[id].tailrepairs++;
 41                 search->refcount = 1;
 42                 do_item_unlink_nolock(search, hv);
 43             }
 44             if (hold_lock)
 45                 item_trylock_unlock(hold_lock);
 46             continue;
 47         }
 48 
 49         // item過期,如果沒有設置過期時間,那麼就使用系統設置的默認過期時間
 50         if ((search->exptime != 0 && search->exptime < current_time) 
 51             || (search->time <= oldest_live && oldest_live <= current_time)) {
 52             itemstats[id].reclaimed++;
 53             if ((search->it_flags & ITEM_FETCHED) == 0) {
 54                 itemstats[id].expired_unfetched++;
 55             }
 56             it = search;
 57             slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);   // 當前搜索的item過期,重新計算slab已經分配的字節
 58             do_item_unlink_nolock(it, hv);   // 把當前搜索的item從鏈表中移除
 59             /* Initialize the item block: */
 60             it->slabs_clsid = 0;
 61         } else if ((it = slabs_alloc(ntotal, id)) == NULL) {  // 沒有找到過期的item,新分配一個item,分配失敗就執行else if裡面的代碼
 62             tried_alloc = 1;
 63             if (settings.evict_to_free == 0) {  // 注2:內存耗盡,如果evict_to_free = 1(默認)LRU算法啟動,移除最近最少使用的item
 64                 itemstats[id].outofmemory++;
 65             } else {
 66                 itemstats[id].evicted++;
 67                 itemstats[id].evicted_time = current_time - search->time;
 68                 if (search->exptime != 0)
 69                     itemstats[id].evicted_nonzero++;
 70                 if ((search->it_flags & ITEM_FETCHED) == 0) {
 71                     itemstats[id].evicted_unfetched++;
 72                 }
 73                 it = search;
 74                 slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);   // 當前搜索的item過期,重新計算slab已經分配的字節
 75                 do_item_unlink_nolock(it, hv);  // 把當前需要移除的item從鏈表中移除
 76                 /* Initialize the item block: */
 77                 it->slabs_clsid = 0;
 78 
 79                 if (settings.slab_automove == 2)
 80                     slabs_reassign(-1, id);
 81             }
 82         }
 83 
 84         refcount_decr(&search->refcount);
 85         /* If hash values were equal, we don't grab a second lock */
 86         if (hold_lock)
 87             item_trylock_unlock(hold_lock);
 88         break;
 89     }
 90 
 91     if (!tried_alloc && (tries == 0 || search == NULL))
 92         it = slabs_alloc(ntotal, id);
 93 
 94     if (it == NULL) {
 95         itemstats[id].outofmemory++;
 96         mutex_unlock(&cache_lock);
 97         return NULL;
 98     }
 99 
100     assert(it->slabs_clsid == 0);
101     assert(it != heads[id]);
102 
103 
104     it->refcount = 1;     
105     mutex_unlock(&cache_lock);
106     it->next = it->prev = it->h_next = 0;
107     it->slabs_clsid = id;
108 
109     DEBUG_REFCNT(it, '*');
110     it->it_flags = settings.use_cas ? ITEM_CAS : 0;
111     it->nkey = nkey;
112     it->nbytes = nbytes;
113     memcpy(ITEM_key(it), key, nkey);
114     it->exptime = exptime;
115     memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
116     it->nsuffix = nsuffix;
117     return it;
118 }

以上memcached只是為[key,value]找到了新的slab,分配了新的item,並把命令相關的頭部信息保存到,但是,還有一個重要的步奏沒有說的,那就是[key,value]中的value怎麼和item關聯起來的,add和set的區別又是怎樣區分的,由於還有很長的一段代碼,所以我還是分篇記錄,預告一下,下一篇《memcached學習筆記——存儲命令源碼分析下》會講遺留的這兩個問題。

未完,待續。

 更多閱讀查看:JC&hcoding

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved