程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> 關於C語言 >> memcached學習筆記——連接模型,memcached學習筆記

memcached學習筆記——連接模型,memcached學習筆記

編輯:關於C語言

memcached學習筆記——連接模型,memcached學習筆記


  memcached是什麼呢?memcached是一個優秀的、高性能的內存緩存工具。

  memcached具有以下的特點:

  • 協議簡單:memcached的服務器客戶端通信並不使用復雜的MXL等格式,而是使用簡單的基於文本的協議。
  • 基於libevent的事件處理:libevent是個程序庫,他將Linux 的epoll、BSD類操作系統的kqueue等時間處理功能封裝成統一的接口。memcached使用這個libevent庫,因此能在Linux、BSD、Solaris等操作系統上發揮其高性能。(libevent是什麼)
  • 內置內存存儲方式:為了提高性能,memcached中保存的數據都存儲在memcached內置的內存存儲空間中。由於數據僅存在於內存中,因此重啟memcached,重啟操作系統會導致全部數據消失。另外,內容容量達到指定的值之後memcached回自動刪除不適用的緩存。
  • Memcached不互通信的分布式:memcached盡管是“分布式”緩存服務器,但服務器端並沒有分布式功能。各個memcached不會互相通信以共享信息。他的分布式主要是通過客戶端實現的。

  本文主要講解memcached的連接模型,memcached由一條主線程(連接線程)監聽連接,然後把成功的連接交給子線程(工作線程)處理讀寫操作。N條【啟動memcached通過-t命令指定】子線程(工作線程)負責讀寫數據,一條子線程(工作線程)維護著多個連接。一個conn結構體對象對應著一個連接,主線程(連接線程)成功連接後,會把連接的內容賦值到一個conn結構體對象,並把這個conn結構體對象傳遞給一條子線程(工作線程)處理。

 

conn結構體:

1 typedef struct conn conn; 2 struct conn { 3 int sfd; 4 sasl_conn_t *sasl_conn; 5 6 // 連接狀態 7 enum conn_states state; 8 enum bin_substates substate; 9 struct event event; 10 short ev_flags; 11 12 // 剛剛出發的事件 13 short which; /** which events were just triggered */ 14 15 // read buffer 16 char *rbuf; /** buffer to read commands into */ 17 18 // 已經解析了一部分的命令, 指向已經解析結束的地方 19 char *rcurr; /** but if we parsed some already, this is where we stopped */ 20 21 // rbuf 已分配的大小 22 int rsize; /** total allocated size of rbuf */ 23 24 // 尚未解析的命令大小 25 int rbytes; /** how much data, starting from rcur, do we have unparsed */ 26 27 // buffer to write 28 char *wbuf; 29 30 // 指向已經返回的地方 31 char *wcurr; 32 33 // 寫大小 34 int wsize; 35 36 // 尚未寫的數據大小 37 int wbytes; 38 39 /** which state to go into after finishing current write */ 40 // 當寫回結束後需要即刻轉變的狀態 41 enum conn_states write_and_go; 42 43 void *write_and_free; /** free this memory after finishing writing */ 44 45 char *ritem; /** when we read in an item's value, it goes here */ 46 int rlbytes; 47 48 /* data for the nread state */ 49 50 /** 51 * item is used to hold an item structure created after reading the command 52 * line of set/add/replace commands, but before we finished reading the actual 53 * data. The data is read into ITEM_data(item) to avoid extra copying. 54 */ 55 56 // 指向當下需要完成的任務 57 void *item; /* for commands set/add/replace */ 58 59 /* data for the swallow state */ 60 int sbytes; /* how many bytes to swallow */ 61 62 /* data for the mwrite state */ 63 struct iovec *iov; 64 int iovsize; /* number of elements allocated in iov[] */ 65 int iovused; /* number of elements used in iov[] */ 66 67 // msghdr 鏈表, 一個連接可能有多個 msghdr 68 // 如果是 UDP, 需要為每一個 msghdr 填寫一個 UDP 頭部 69 struct msghdr *msglist; 70 int msgsize; /* number of elements allocated in msglist[] */ 71 int msgused; /* number of elements used in msglist[] */ 72 int msgcurr; /* element in msglist[] being transmitted now */ 73 int msgbytes; /* number of bytes in current msg */ 74 75 item **ilist; /* list of items to write out */ 76 int isize; 77 item **icurr; 78 79 // 記錄任務數量 80 int ileft; 81 82 char **suffixlist; 83 int suffixsize; 84 char **suffixcurr; 85 int suffixleft; 86 87 enum protocol protocol; /* which protocol this connection speaks */ 88 enum network_transport transport; /* what transport is used by this connection */ 89 90 /* data for UDP clients */ 91 int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ 92 struct sockaddr request_addr; /* Who sent the most recent request */ 93 socklen_t request_addr_size; 94 95 unsigned char *hdrbuf; /* udp packet headers */ 96 int hdrsize; /* number of headers' worth of space is allocated */ 97 98 bool noreply; /* True if the reply should not be sent. */ 99 /* current stats command */ 100 struct { 101 char *buffer; 102 size_t size; 103 size_t offset; 104 } stats; 105 106 /* Binary protocol stuff */ 107 /* This is where the binary header goes */ 108 protocol_binary_request_header binary_header; 109 uint64_t cas; /* the cas to return */ 110 short cmd; /* current command being processed */ 111 112 // ? 不透明 113 int opaque; 114 int keylen; 115 116 // 可見是一個鏈表 117 conn *next; /* Used for generating a list of conn structures */ 118 119 // 指向服務於此連接的線程 120 LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */ 121 }; View Code
  1 //memcached.c
  2 int main{
  3 
  4     // ......
  5 
  6     // 第一步:初始化主線程的事件機制
  7     /* initialize main thread libevent instance */
  8     // libevent 事件機制初始化
  9     main_base = event_init();
 10 
 11     // ......
 12 
 13     // 第二步:初始化 N 個 (初始值200,當連接超過200個的時候會往上遞增) conn結構體對象
 14     // 空閒連接數組初始化
 15     conn_init();
 16 
 17     // ......
 18 
 19     
 20     // 第三步:啟動工作線程
 21     /* start up worker threads if MT mode */
 22     thread_init(settings.num_threads, main_base);
 23     
 24     // ......
 25     
 26     // 第四步:初始化socket,綁定監聽端口,為主線程的事件機制設置連接監聽事件(event_set、event_add)
 27     /**
 28         memcached 有可配置的兩種模式: unix 域套接字和 TCP/UDP, 允許客戶端以兩種方式向 memcached 發起請求. 客戶端和服務器在同一個主機上的情況下可以用 unix 域套接字, 否則可以采用 TCP/UDP 的模式. 兩種模式是不兼容的.
 29         以下的代碼便是根據 settings.socketpath 的值來決定啟用哪種方式.
 30     */
 31     /**
 32         第一種, unix 域套接字.
 33     */
 34     /* create unix mode sockets after dropping privileges */
 35     if (settings.socketpath != NULL) {
 36         errno = 0;
 37         if (server_socket_unix(settings.socketpath,settings.access)) {
 38             vperror("failed to listen on UNIX socket: %s", settings.socketpath);
 39             exit(EX_OSERR);
 40         }
 41     }
 42 
 43     /**
 44         第二種, TCP/UDP.
 45     */
 46     /* create the listening socket, bind it, and init */
 47     if (settings.socketpath == NULL) {
 48         const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
 49         char temp_portnumber_filename[PATH_MAX];
 50         FILE *portnumber_file = NULL;
 51 
 52         // 讀取端口號文件
 53         if (portnumber_filename != NULL) {
 54             snprintf(temp_portnumber_filename,
 55                      sizeof(temp_portnumber_filename),
 56                      "%s.lck", portnumber_filename);
 57 
 58             portnumber_file = fopen(temp_portnumber_filename, "a");
 59             if (portnumber_file == NULL) {
 60                 fprintf(stderr, "Failed to open \"%s\": %s\n",
 61                         temp_portnumber_filename, strerror(errno));
 62             }
 63         }
 64 
 65         // TCP
 66         errno = 0;
 67         if (settings.port && server_sockets(settings.port, tcp_transport,
 68                                            portnumber_file)) {
 69             vperror("failed to listen on TCP port %d", settings.port);
 70             exit(EX_OSERR);
 71         }
 72 
 73         /*
 74          * initialization order: first create the listening sockets
 75          * (may need root on low ports), then drop root if needed,
 76          * then daemonise if needed, then init libevent (in some cases
 77          * descriptors created by libevent wouldn't survive forking).
 78          */
 79 
 80         // UDP
 81         /* create the UDP listening socket and bind it */
 82         errno = 0;
 83         if (settings.udpport && server_sockets(settings.udpport, udp_transport,
 84                                               portnumber_file)) {
 85             vperror("failed to listen on UDP port %d", settings.udpport);
 86             exit(EX_OSERR);
 87         }
 88 
 89         if (portnumber_file) {
 90             fclose(portnumber_file);
 91             rename(temp_portnumber_filename, portnumber_filename);
 92         }
 93     }
 94 
 95     // ......
 96     
 97     
 98     // 第五步:主線程進入事件循環
 99     /* enter the event loop */
100     // 進入事件循環
101     if (event_base_loop(main_base, 0) != 0) {
102         retval = EXIT_FAILURE;
103     }
104 
105     // ......
106 
107 }

  LIBEVENT_THREAD 結構體:

1 // 多個線程, 每個線程一個 event_base 2 typedef struct { 3 pthread_t thread_id; /* unique ID of this thread */ 4 struct event_base *base; /* libevent handle this thread uses */ 5 6 // event 結構體, 用於管道讀寫事件的監聽 7 struct event notify_event; /* listen event for notify pipe */ 8 9 // 讀寫管道文件描述符 10 int notify_receive_fd; /* receiving end of notify pipe */ 11 int notify_send_fd; /* sending end of notify pipe */ 12 13 // 線程的狀態 14 struct thread_stats stats; /* Stats generated by this thread */ 15 16 // 這個線程需要處理的連接隊列 17 struct conn_queue *new_conn_queue; /* queue of new connections to handle */ 18 cache_t *suffix_cache; /* suffix cache */ 19 uint8_t item_lock_type; /* use fine-grained or global item lock */ 20 } LIBEVENT_THREAD; View Code

  第三步工作線程的詳細啟動過程:

  1 /*
  2  * thread.c
  3  *
  4  * 初始化線程子系統, 創建工作線程
  5  * Initializes the thread subsystem, creating various worker threads.
  6  *
  7  * nthreads  Number of worker event handler threads to spawn
  8  *   需准備的線程數
  9  * main_base Event base for main thread
 10  *   分發線程
 11  */
 12 void thread_init(int nthreads, struct event_base *main_base) {
 13     int         i;
 14     int         power;
 15 
 16     // 互斥量初始化
 17     pthread_mutex_init(&cache_lock, NULL);
 18     pthread_mutex_init(&stats_lock, NULL);
 19 
 20     pthread_mutex_init(&init_lock, NULL);
 21     //條件同步
 22     pthread_cond_init(&init_cond, NULL);
 23 
 24     pthread_mutex_init(&cqi_freelist_lock, NULL);
 25     cqi_freelist = NULL;
 26 
 27     /* Want a wide lock table, but don't waste memory */
 28     if (nthreads < 3) {
 29         power = 10;
 30     } else if (nthreads < 4) {
 31         power = 11;
 32     } else if (nthreads < 5) {
 33         power = 12;
 34     } else {
 35         // 2^13
 36         /* 8192 buckets, and central locks don't scale much past 5 threads */
 37         power = 13;
 38     }
 39 
 40     // hashsize = 2^n
 41     item_lock_count = hashsize(power);
 42 
 43     item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
 44     if (! item_locks) {
 45         perror("Can't allocate item locks");
 46         exit(1);
 47     }
 48     // 初始化
 49     for (i = 0; i < item_lock_count; i++) {
 50         pthread_mutex_init(&item_locks[i], NULL);
 51     }
 52     //item_lock_type_key設置為線程的私有變量的key
 53     pthread_key_create(&item_lock_type_key, NULL);
 54     pthread_mutex_init(&item_global_lock, NULL);
 55 
 56 
 57     // LIBEVENT_THREAD 是結合 libevent 使用的結構體, event_base, 讀寫管道
 58     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
 59     if (! threads) {
 60         perror("Can't allocate thread descriptors");
 61         exit(1);
 62     }
 63 
 64     // main_base 是分發任務的線程, 即主線程
 65     dispatcher_thread.base = main_base;
 66     dispatcher_thread.thread_id = pthread_self();
 67 
 68     // 管道, libevent 通知用的
 69     // 一個 LIBEVENT_THREAD 結構體對象對應由一條子線程維護
 70     // 子線程通過讀管道來接收主線程的命令(例如主線程接收到新連接,會往子線程的讀管道寫入字符'c',子線程接收到命令就會做出相應的處理)
 71     for (i = 0; i < nthreads; i++) {
 72         int fds[2];
 73         if (pipe(fds)) {
 74             perror("Can't create notify pipe");
 75             exit(1);
 76         }
 77 
 78         // 讀管道
 79         threads[i].notify_receive_fd = fds[0];
 80         // 寫管道
 81         threads[i].notify_send_fd = fds[1];
 82 
 83         // 初始化線程信息數據結構, 其中就將 event 結構體的回調函數設置為 thread_libevent_process(),此時線程還沒有創建
 84         setup_thread(&threads[i]);
 85         /* Reserve three fds for the libevent base, and two for the pipe */
 86         stats.reserved_fds += 5;
 87     }
 88 
 89     /* Create threads after we've done all the libevent setup. */
 90     // 創建並初始化線程, 線程的代碼都是 work_libevent()
 91     for (i = 0; i < nthreads; i++) {
 92         // 調用 pthread_attr_init() 和 pthread_create() 來創建子線程
 93         // 子線程的函數入口 worker_libevent ,負責啟動子線程的事件循環
 94         create_worker(worker_libevent, &threads[i]);
 95     }
 96 
 97     /* Wait for all the threads to set themselves up before returning. */
 98     pthread_mutex_lock(&init_lock);
 99     // wait_for_thread_registration() 是 pthread_cond_wait 的調用
100     wait_for_thread_registration(nthreads);
101     pthread_mutex_unlock(&init_lock);
102 }
103 
104 
105 
106 
107 /*
108  * Set up a thread's information.
109  */
110  // 填充 LIBEVENT_THREAD 結構體, 其中包括:
111  //     填充 struct event
112  //     初始化線程工作隊列
113  //     初始化互斥量
114  //     等
115 static void setup_thread(LIBEVENT_THREAD *me) {
116     // 子線程的事件機制,每條子線程都有一個事件機制
117     me->base = event_init();
118     if (! me->base) {
119         fprintf(stderr, "Can't allocate event base\n");
120         exit(1);
121     }
122 
123     /* Listen for notifications from other threads */
124     // 在線程數據結構初始化的時候, 為 me->notify_receive_fd 讀管道注冊讀事件, 回調函數是 thread_libevent_process()
125     // 為子線程的事件機制添加事件
126     event_set(&me->notify_event, me->notify_receive_fd,
127               EV_READ | EV_PERSIST, thread_libevent_process, me);
128     event_base_set(me->base, &me->notify_event);
129 
130     if (event_add(&me->notify_event, 0) == -1) {
131         fprintf(stderr, "Can't monitor libevent notify pipe\n");
132         exit(1);
133     }
134     
135     // ......
136 }
137 
138 
139 
140 /*
141  * Worker thread: main event loop
142  * 線程函數入口, 啟動事件循環
143  */
144 static void *worker_libevent(void *arg) {
145     LIBEVENT_THREAD *me = arg;
146 
147     // ......
148     
149     // 進入事件循環
150     event_base_loop(me->base, 0);
151     return NULL;
152 }

  子線程讀管道回調函數:

1 /* 2 * Processes an incoming "handle a new connection" item. This is called when 3 * input arrives on the libevent wakeup pipe. 4 * 5 * 當管道有數據可讀的時候會觸發此函數的調用 6 */ 7 static void thread_libevent_process(int fd, short which, void *arg) { 8 LIBEVENT_THREAD *me = arg; 9 CQ_ITEM *item; 10 char buf[1]; 11 12 if (read(fd, buf, 1) != 1) 13 if (settings.verbose > 0) 14 fprintf(stderr, "Can't read from libevent pipe\n"); 15 16 switch (buf[0]) { 17 case 'c': 18 // 表示主線程把一個新的連接分發給該子線程處理 19 // 取出一個任務 20 item = cq_pop(me->new_conn_queue); 21 22 if (NULL != item) { 23 // 為新的請求建立一個連接結構體. 連接其實已經建立, 這裡只是為了填充連接結構體. 最關鍵的動作是在 libevent 中注冊了事件, 回調函數是 event_handler() 24 conn *c = conn_new(item->sfd, item->init_state, item->event_flags, 25 item->read_buffer_size, item->transport, me->base); 26 if (c == NULL) { 27 if (IS_UDP(item->transport)) { 28 fprintf(stderr, "Can't listen for events on UDP socket\n"); 29 exit(1); 30 } else { 31 if (settings.verbose > 0) { 32 fprintf(stderr, "Can't listen for events on fd %d\n", 33 item->sfd); 34 } 35 close(item->sfd); 36 } 37 } else { 38 c->thread = me; 39 } 40 cqi_free(item); 41 } 42 break; 43 44 /* we were told to flip the lock type and report in */ 45 case 'l': 46 me->item_lock_type = ITEM_LOCK_GRANULAR; 47 register_thread_initialized(); 48 break; 49 50 case 'g': 51 me->item_lock_type = ITEM_LOCK_GLOBAL; 52 register_thread_initialized(); 53 break; 54 } 55 } View Code

 

  第四步主要是初始化socket、綁定服務器端口和IP、為主線程事件機制添加監聽連接事件:

  1 // memcached.c
  2 // server_sockets()->server_socket()
  3 
  4 static int server_socket(const char *interface,
  5                          int port,
  6                          enum network_transport transport,
  7                          FILE *portnumber_file) {
  8                          
  9     // ......
 10 
 11     // getaddrinfo函數能夠處理名字到地址以及服務到端口這兩種轉換,返回的是一個addrinfo的結構(列表)指針而不是一個地址清單。
 12     error= getaddrinfo(interface, port_buf, &hints, &ai);
 13 
 14     if (error != 0) {
 15         if (error != EAI_SYSTEM)
 16           fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
 17         else
 18           perror("getaddrinfo()");
 19         return 1;
 20     }
 21 
 22     for (next= ai; next; next= next->ai_next) {
 23         conn *listen_conn_add;
 24 
 25         // new_socket() 申請了一個 UNIX 域套接字,通過調用socket()方法創建套接字,並設置把套接字為非阻塞
 26         if ((sfd = new_socket(next)) == -1) {
 27             
 28             // ......
 29             
 30         }// if
 31 
 32         
 33         // ......
 34         
 35 
 36         // bind() 綁定源IP的端口
 37         if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
 38             
 39             // ......
 40             
 41         } else {
 42             success++;
 43             // bind()調用成功後,調用listen()
 44             if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
 45                 
 46                 // ......
 47                 
 48             }
 49             
 50             // ......
 51             
 52         }
 53 
 54         // UDP 和 TCP 區分對待, UDP 沒有連接概念, 只要綁定服務器之後, 直接讀取 socket 就好了, 所以與它對應 conn 的初始狀態應該為 conn_read; 而 TCP 對應的 conn 初始狀態應該為 conn_listening
 55         if (IS_UDP(transport)) {
 56             // UDP
 57             int c;
 58 
 59             for (c = 0; c < settings.num_threads_per_udp; c++) {
 60                 /* this is guaranteed to hit all threads because we round-robin */
 61                 // 分發新的連接到線程池中的一個線程中
 62                 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
 63                                   UDP_READ_BUFFER_SIZE, transport);
 64             }
 65         } else {
 66             // TCP 要建立連接
 67             if (!(listen_conn_add = conn_new(sfd, conn_listening,
 68                                              EV_READ | EV_PERSIST, 1,
 69                                              transport, main_base))) {
 70                 fprintf(stderr, "failed to create listening connection\n");
 71                 exit(EXIT_FAILURE);
 72             }
 73 
 74             // 放在頭部, listen_conn 是頭指針
 75             listen_conn_add->next = listen_conn;
 76             listen_conn = listen_conn_add;
 77         }
 78     }
 79 
 80     freeaddrinfo(ai);
 81 
 82     /* Return zero iff we detected no errors in starting up connections */
 83     return success == 0;
 84 }
 85 
 86 
 87 
 88 
 89 // 填寫 struct conn 結構體, 包括 struct conn 中的 event 結構, 並返回
 90 conn *conn_new(const int sfd, enum conn_states init_state,
 91                 const int event_flags,
 92                 const int read_buffer_size, enum network_transport transport,
 93                 struct event_base *base) {
 94     // c 指向一個新的 conn 空間
 95     // 可能是出於性能的考慮, memcached 預分配了若干個 struct conn 空間
 96     {
 97         /* data */
 98     };
 99     conn *c = conn_from_freelist();
100 
101     if (NULL == c) {
102         // 可能分配失敗了, 因為默認數量有限. 進行新的擴展,conn_init()中初始數量是200
103         if (!(c = (conn *)calloc(1, sizeof(conn)))) {
104             fprintf(stderr, "calloc()\n");
105             return NULL;
106         }
107 
108         // ......
109         // 填充conn結構體
110         
111     }// if
112 
113     
114     // ......
115     
116     
117     // libevent 操作: 設置事件, 設置回調函數 event_handler()
118     event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
119 
120     // libevent 操作:設置 c->event 的 event_base
121     event_base_set(base, &c->event);
122 
123     c->ev_flags = event_flags;
124 
125     // libevent 操作: 添加事件
126     if (event_add(&c->event, 0) == -1) {
127 
128         // ......
129         
130     }
131 
132     
133     // ......
134     
135 
136     return c;
137 }

 

 

 

  


java怎實現連接某一個memcached服務器

所謂的memcache驅動無非是提供了 與memcache socket連接和序列化的功能
 

linux系統中memcachedconnection連接狀態改變怎辦

在百度上問編程題!?若是C入門還有人回答......
死心吧
 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved