memcached是什麼呢?memcached是一個優秀的、高性能的內存緩存工具。
memcached具有以下的特點:
本文主要講解memcached的連接模型,memcached由一條主線程(連接線程)監聽連接,然後把成功的連接交給子線程(工作線程)處理讀寫操作。N條【啟動memcached通過-t命令指定】子線程(工作線程)負責讀寫數據,一條子線程(工作線程)維護著多個連接。一個conn結構體對象對應著一個連接,主線程(連接線程)成功連接後,會把連接的內容賦值到一個conn結構體對象,並把這個conn結構體對象傳遞給一條子線程(工作線程)處理。

conn結構體:

1 //memcached.c
2 int main{
3
4 // ......
5
6 // 第一步:初始化主線程的事件機制
7 /* initialize main thread libevent instance */
8 // libevent 事件機制初始化
9 main_base = event_init();
10
11 // ......
12
13 // 第二步:初始化 N 個 (初始值200,當連接超過200個的時候會往上遞增) conn結構體對象
14 // 空閒連接數組初始化
15 conn_init();
16
17 // ......
18
19
20 // 第三步:啟動工作線程
21 /* start up worker threads if MT mode */
22 thread_init(settings.num_threads, main_base);
23
24 // ......
25
26 // 第四步:初始化socket,綁定監聽端口,為主線程的事件機制設置連接監聽事件(event_set、event_add)
27 /**
28 memcached 有可配置的兩種模式: unix 域套接字和 TCP/UDP, 允許客戶端以兩種方式向 memcached 發起請求. 客戶端和服務器在同一個主機上的情況下可以用 unix 域套接字, 否則可以采用 TCP/UDP 的模式. 兩種模式是不兼容的.
29 以下的代碼便是根據 settings.socketpath 的值來決定啟用哪種方式.
30 */
31 /**
32 第一種, unix 域套接字.
33 */
34 /* create unix mode sockets after dropping privileges */
35 if (settings.socketpath != NULL) {
36 errno = 0;
37 if (server_socket_unix(settings.socketpath,settings.access)) {
38 vperror("failed to listen on UNIX socket: %s", settings.socketpath);
39 exit(EX_OSERR);
40 }
41 }
42
43 /**
44 第二種, TCP/UDP.
45 */
46 /* create the listening socket, bind it, and init */
47 if (settings.socketpath == NULL) {
48 const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
49 char temp_portnumber_filename[PATH_MAX];
50 FILE *portnumber_file = NULL;
51
52 // 讀取端口號文件
53 if (portnumber_filename != NULL) {
54 snprintf(temp_portnumber_filename,
55 sizeof(temp_portnumber_filename),
56 "%s.lck", portnumber_filename);
57
58 portnumber_file = fopen(temp_portnumber_filename, "a");
59 if (portnumber_file == NULL) {
60 fprintf(stderr, "Failed to open \"%s\": %s\n",
61 temp_portnumber_filename, strerror(errno));
62 }
63 }
64
65 // TCP
66 errno = 0;
67 if (settings.port && server_sockets(settings.port, tcp_transport,
68 portnumber_file)) {
69 vperror("failed to listen on TCP port %d", settings.port);
70 exit(EX_OSERR);
71 }
72
73 /*
74 * initialization order: first create the listening sockets
75 * (may need root on low ports), then drop root if needed,
76 * then daemonise if needed, then init libevent (in some cases
77 * descriptors created by libevent wouldn't survive forking).
78 */
79
80 // UDP
81 /* create the UDP listening socket and bind it */
82 errno = 0;
83 if (settings.udpport && server_sockets(settings.udpport, udp_transport,
84 portnumber_file)) {
85 vperror("failed to listen on UDP port %d", settings.udpport);
86 exit(EX_OSERR);
87 }
88
89 if (portnumber_file) {
90 fclose(portnumber_file);
91 rename(temp_portnumber_filename, portnumber_filename);
92 }
93 }
94
95 // ......
96
97
98 // 第五步:主線程進入事件循環
99 /* enter the event loop */
100 // 進入事件循環
101 if (event_base_loop(main_base, 0) != 0) {
102 retval = EXIT_FAILURE;
103 }
104
105 // ......
106
107 }
LIBEVENT_THREAD 結構體:

第三步工作線程的詳細啟動過程:
1 /*
2 * thread.c
3 *
4 * 初始化線程子系統, 創建工作線程
5 * Initializes the thread subsystem, creating various worker threads.
6 *
7 * nthreads Number of worker event handler threads to spawn
8 * 需准備的線程數
9 * main_base Event base for main thread
10 * 分發線程
11 */
12 void thread_init(int nthreads, struct event_base *main_base) {
13 int i;
14 int power;
15
16 // 互斥量初始化
17 pthread_mutex_init(&cache_lock, NULL);
18 pthread_mutex_init(&stats_lock, NULL);
19
20 pthread_mutex_init(&init_lock, NULL);
21 //條件同步
22 pthread_cond_init(&init_cond, NULL);
23
24 pthread_mutex_init(&cqi_freelist_lock, NULL);
25 cqi_freelist = NULL;
26
27 /* Want a wide lock table, but don't waste memory */
28 if (nthreads < 3) {
29 power = 10;
30 } else if (nthreads < 4) {
31 power = 11;
32 } else if (nthreads < 5) {
33 power = 12;
34 } else {
35 // 2^13
36 /* 8192 buckets, and central locks don't scale much past 5 threads */
37 power = 13;
38 }
39
40 // hashsize = 2^n
41 item_lock_count = hashsize(power);
42
43 item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
44 if (! item_locks) {
45 perror("Can't allocate item locks");
46 exit(1);
47 }
48 // 初始化
49 for (i = 0; i < item_lock_count; i++) {
50 pthread_mutex_init(&item_locks[i], NULL);
51 }
52 //item_lock_type_key設置為線程的私有變量的key
53 pthread_key_create(&item_lock_type_key, NULL);
54 pthread_mutex_init(&item_global_lock, NULL);
55
56
57 // LIBEVENT_THREAD 是結合 libevent 使用的結構體, event_base, 讀寫管道
58 threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
59 if (! threads) {
60 perror("Can't allocate thread descriptors");
61 exit(1);
62 }
63
64 // main_base 是分發任務的線程, 即主線程
65 dispatcher_thread.base = main_base;
66 dispatcher_thread.thread_id = pthread_self();
67
68 // 管道, libevent 通知用的
69 // 一個 LIBEVENT_THREAD 結構體對象對應由一條子線程維護
70 // 子線程通過讀管道來接收主線程的命令(例如主線程接收到新連接,會往子線程的讀管道寫入字符'c',子線程接收到命令就會做出相應的處理)
71 for (i = 0; i < nthreads; i++) {
72 int fds[2];
73 if (pipe(fds)) {
74 perror("Can't create notify pipe");
75 exit(1);
76 }
77
78 // 讀管道
79 threads[i].notify_receive_fd = fds[0];
80 // 寫管道
81 threads[i].notify_send_fd = fds[1];
82
83 // 初始化線程信息數據結構, 其中就將 event 結構體的回調函數設置為 thread_libevent_process(),此時線程還沒有創建
84 setup_thread(&threads[i]);
85 /* Reserve three fds for the libevent base, and two for the pipe */
86 stats.reserved_fds += 5;
87 }
88
89 /* Create threads after we've done all the libevent setup. */
90 // 創建並初始化線程, 線程的代碼都是 work_libevent()
91 for (i = 0; i < nthreads; i++) {
92 // 調用 pthread_attr_init() 和 pthread_create() 來創建子線程
93 // 子線程的函數入口 worker_libevent ,負責啟動子線程的事件循環
94 create_worker(worker_libevent, &threads[i]);
95 }
96
97 /* Wait for all the threads to set themselves up before returning. */
98 pthread_mutex_lock(&init_lock);
99 // wait_for_thread_registration() 是 pthread_cond_wait 的調用
100 wait_for_thread_registration(nthreads);
101 pthread_mutex_unlock(&init_lock);
102 }
103
104
105
106
107 /*
108 * Set up a thread's information.
109 */
110 // 填充 LIBEVENT_THREAD 結構體, 其中包括:
111 // 填充 struct event
112 // 初始化線程工作隊列
113 // 初始化互斥量
114 // 等
115 static void setup_thread(LIBEVENT_THREAD *me) {
116 // 子線程的事件機制,每條子線程都有一個事件機制
117 me->base = event_init();
118 if (! me->base) {
119 fprintf(stderr, "Can't allocate event base\n");
120 exit(1);
121 }
122
123 /* Listen for notifications from other threads */
124 // 在線程數據結構初始化的時候, 為 me->notify_receive_fd 讀管道注冊讀事件, 回調函數是 thread_libevent_process()
125 // 為子線程的事件機制添加事件
126 event_set(&me->notify_event, me->notify_receive_fd,
127 EV_READ | EV_PERSIST, thread_libevent_process, me);
128 event_base_set(me->base, &me->notify_event);
129
130 if (event_add(&me->notify_event, 0) == -1) {
131 fprintf(stderr, "Can't monitor libevent notify pipe\n");
132 exit(1);
133 }
134
135 // ......
136 }
137
138
139
140 /*
141 * Worker thread: main event loop
142 * 線程函數入口, 啟動事件循環
143 */
144 static void *worker_libevent(void *arg) {
145 LIBEVENT_THREAD *me = arg;
146
147 // ......
148
149 // 進入事件循環
150 event_base_loop(me->base, 0);
151 return NULL;
152 }
子線程讀管道回調函數:

第四步主要是初始化socket、綁定服務器端口和IP、為主線程事件機制添加監聽連接事件:
1 // memcached.c
2 // server_sockets()->server_socket()
3
4 static int server_socket(const char *interface,
5 int port,
6 enum network_transport transport,
7 FILE *portnumber_file) {
8
9 // ......
10
11 // getaddrinfo函數能夠處理名字到地址以及服務到端口這兩種轉換,返回的是一個addrinfo的結構(列表)指針而不是一個地址清單。
12 error= getaddrinfo(interface, port_buf, &hints, &ai);
13
14 if (error != 0) {
15 if (error != EAI_SYSTEM)
16 fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
17 else
18 perror("getaddrinfo()");
19 return 1;
20 }
21
22 for (next= ai; next; next= next->ai_next) {
23 conn *listen_conn_add;
24
25 // new_socket() 申請了一個 UNIX 域套接字,通過調用socket()方法創建套接字,並設置把套接字為非阻塞
26 if ((sfd = new_socket(next)) == -1) {
27
28 // ......
29
30 }// if
31
32
33 // ......
34
35
36 // bind() 綁定源IP的端口
37 if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
38
39 // ......
40
41 } else {
42 success++;
43 // bind()調用成功後,調用listen()
44 if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
45
46 // ......
47
48 }
49
50 // ......
51
52 }
53
54 // UDP 和 TCP 區分對待, UDP 沒有連接概念, 只要綁定服務器之後, 直接讀取 socket 就好了, 所以與它對應 conn 的初始狀態應該為 conn_read; 而 TCP 對應的 conn 初始狀態應該為 conn_listening
55 if (IS_UDP(transport)) {
56 // UDP
57 int c;
58
59 for (c = 0; c < settings.num_threads_per_udp; c++) {
60 /* this is guaranteed to hit all threads because we round-robin */
61 // 分發新的連接到線程池中的一個線程中
62 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
63 UDP_READ_BUFFER_SIZE, transport);
64 }
65 } else {
66 // TCP 要建立連接
67 if (!(listen_conn_add = conn_new(sfd, conn_listening,
68 EV_READ | EV_PERSIST, 1,
69 transport, main_base))) {
70 fprintf(stderr, "failed to create listening connection\n");
71 exit(EXIT_FAILURE);
72 }
73
74 // 放在頭部, listen_conn 是頭指針
75 listen_conn_add->next = listen_conn;
76 listen_conn = listen_conn_add;
77 }
78 }
79
80 freeaddrinfo(ai);
81
82 /* Return zero iff we detected no errors in starting up connections */
83 return success == 0;
84 }
85
86
87
88
89 // 填寫 struct conn 結構體, 包括 struct conn 中的 event 結構, 並返回
90 conn *conn_new(const int sfd, enum conn_states init_state,
91 const int event_flags,
92 const int read_buffer_size, enum network_transport transport,
93 struct event_base *base) {
94 // c 指向一個新的 conn 空間
95 // 可能是出於性能的考慮, memcached 預分配了若干個 struct conn 空間
96 {
97 /* data */
98 };
99 conn *c = conn_from_freelist();
100
101 if (NULL == c) {
102 // 可能分配失敗了, 因為默認數量有限. 進行新的擴展,conn_init()中初始數量是200
103 if (!(c = (conn *)calloc(1, sizeof(conn)))) {
104 fprintf(stderr, "calloc()\n");
105 return NULL;
106 }
107
108 // ......
109 // 填充conn結構體
110
111 }// if
112
113
114 // ......
115
116
117 // libevent 操作: 設置事件, 設置回調函數 event_handler()
118 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
119
120 // libevent 操作:設置 c->event 的 event_base
121 event_base_set(base, &c->event);
122
123 c->ev_flags = event_flags;
124
125 // libevent 操作: 添加事件
126 if (event_add(&c->event, 0) == -1) {
127
128 // ......
129
130 }
131
132
133 // ......
134
135
136 return c;
137 }
所謂的memcache驅動無非是提供了 與memcache socket連接和序列化的功能
在百度上問編程題!?若是C入門還有人回答......
死心吧