程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 數據庫知識 >> MYSQL數據庫 >> MySQL綜合教程 >> Mysql源碼學習——Thread Manager

Mysql源碼學習——Thread Manager

編輯:MySQL綜合教程

 

一、前言

    上篇的Connection Manager中,曾提及對於一個新到來的Connection,服務器會創建一個新的線程來處理這個連接。

其實沒那麼簡單,為了提高系統效率,減少頻繁創建線程和中止線程的系統消耗,Mysql使用了線程緩沖區的概念,即如果

一個連接斷開,則並不銷毀承載其的線程,而是將此線程放入線程緩沖區,並處於掛起狀態,當下一個新的Connection到來

時,首先去線程緩沖區去查找是否有空閒的線程,如果有,則使用之,如果沒有則新建線程。本問主要介紹這個線程緩沖區,

首先介紹下基本的概念。

 

二、基本概念

    1.線程創建函數

    大家知道,Mysql現在是插件式的存儲引擎,只要實現規定的接口,就可實現自己的存儲引擎。故Mysql的線程創建除了

出現在主服務器框架外,存儲引擎也可能會進行線程的創建。通過設置斷點,在我調試的版本中,發現了兩個創建線程的函數。

pthread_create:Mysql自用的創建線程函數 os_thread_create:存儲引擎innobase的創建線程的函數

    os_thread_create是存儲引擎innobase的線程函數,先擱淺不研究了,重點看下pthread_create,首先看下其源碼。

 

int pthread_create(pthread_t *thread_id, pthread_attr_t *attr,            pthread_handler func, void *param) {   HANDLE hThread;   struct pthread_map *map;   DBUG_ENTER("pthread_create");     if (!(map=malloc(sizeof(*map))))     DBUG_RETURN(-1);

map->func=func; map->param=param;

  pthread_mutex_lock(&THR_LOCK_thread); #ifdef __BORLANDC__   hThread=(HANDLE)_beginthread((void(_USERENTRY *)(void *)) pthread_start,                    attr->dwStackSize ? attr->dwStackSize :                    65535, (void*) map); #else

hThread=(HANDLE)_beginthread((void( __cdecl *)(void *)) pthread_start, attr->dwStackSize ? attr->dwStackSize : 65535, (void*) map);

#endif   DBUG_PRINT("info", ("hThread=%lu",(long) hThread));   *thread_id=map->pthreadself=hThread;   pthread_mutex_unlock(&THR_LOCK_thread);     if (hThread == (HANDLE) -1)   {     int error=errno;     DBUG_PRINT("error",            ("Can't create thread to handle request (error %d)",error));     DBUG_RETURN(error ? error : -1);   }   VOID(SetThreadPriority(hThread, attr->priority)) ;   DBUG_RETURN(0); }

上面代碼首先構造了一個map結構體,成員分別是函數地址和傳入參數。然後調用操作系統的接口,_beginthread,但是執行函數並不是傳入的函數——func,而是pthread_start,參數為map。繼續跟蹤pthread_start。

pthread_handler_t pthread_start(void *param) {   pthread_handler

func=((struct pthread_map *) param)->func

;   void *func_param=((struct pthread_map *) param)->param;   my_thread_init();         /* Will always succeed in windows */   pthread_mutex_lock(&THR_LOCK_thread);   /* Wait for beginthread to return */   win_pthread_self=((struct pthread_map *) param)->pthreadself;   pthread_mutex_unlock(&THR_LOCK_thread);   free((char*) param);            /* Free param from create */   pthread_exit((void*) (*func)(func_param));   return 0;               /* Safety */ }

   可以看出,pthread_start中調用了map的func元素,作為真正執行的函數體。OK,創建線程的函數跟蹤到此!

   2.服務器啟動時創建了哪些函數?

   通過在兩個創建線程的地方設置斷點,總結了下,在服務器啟動時,創建了如下的線程。

pthread_create創建的線程

創建線程函數 線程執行函數

create_shutdown_thread

handle_shutdown

start_handle_manager

handle_manager

handle_connections_methods

handle_connections_sockets

 

innobase的os_thread_create創建的線程:

 

創建線程函數 線程執行函數

innobase_start_or_create_for_mysql

io_handler_thread(4個)

recv_recovery_from_checkpoint_finish

trx_rollback_or_clean_all_without_sess

innobase_start_or_create_for_mysql

srv_lock_timeout_thread

 

srv_error_monitor_thread

 

srv_monitor_thread

 

srv_master_thread

 

    還可以在調試過程中,通過暫停來看此時服務器中的線程,如下圖:

1

   

三、線程緩沖池

        Mysql支持線程緩存,在多線程連接模式下,如果連接斷開後,將這個線程放入空閒線程緩沖區,在下次有連接到來時,

先去緩沖池中查找是否有空閒線程,有則用之,無則創建。啟動時可以設置線程緩沖池的數目:

Mysqld.exe --thread_cache_size=10

      在一個連接斷開時,會調用cache_thread函數,將空閒的線程加入到cache中,以備後用。源碼如下:

static bool cache_thread() {   safe_mutex_assert_owner(&LOCK_thread_count);   if (

cached_thread_count < thread_cache_size

&&      ! abort_loop && !kill_cached_threads)  {    /* Don't kill the thread, just put it in cache for reuse */    DBUG_PRINT("info", ("Adding thread to cache"));    cached_thread_count++;    while (!abort_loop && ! wake_thread && ! kill_cached_threads)      (void) pthread_cond_wait(&COND_thread_cache, &LOCK_thread_count);    cached_thread_count--;    if (kill_cached_threads)

pthread_cond_signal(&COND_flush_thread_cache);

  if (wake_thread)   {     THD *thd;     wake_thread--;     thd= thread_cache.get();     thd->thread_stack= (char*) &thd;          // For store_globals     (void) thd->store_globals();     /*       THD::mysys_var::abort is associated with physical thread rather       than with THD object. So we need to reset this flag before using       this thread for handling of new THD object/connection.     */     thd->mysys_var->abort= 0;     thd->thr_create_utime= my_micro_time();     threads.append(thd);     return(1);   } } return(0); }

    上面我們的啟動參數設置線程緩沖區為10,此時對應代碼裡面的thread_cache_size = 10,cached_thread_count記錄

了此刻cache中的空閒線程數目,只有在cache未滿的情況下,才會將新的空閒線程加入緩沖池中。加入到緩沖區其實就是將線

程掛起,pthread_cond_wait函數便是線程等待函數,在此函數中,會調用WaitForMultipleObjects進行事件等待。具體源碼

如下:

int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,                            struct timespec *abstime) {   int result;   long timeout;   union ft64 now;     if( abstime != NULL )   {     GetSystemTimeAsFileTime(&now.ft);       /*       Calculate time left to abstime       - subtract start time from current time(values are in 100ns units)       - convert to millisec by dividing with 10000     */     timeout= (long)((abstime->tv.i64 - now.i64) / 10000);          /* Don't allow the timeout to be negative */     if (timeout < 0)       timeout= 0L;       /*       Make sure the calucated timeout does not exceed original timeout       value which could cause "wait for ever" if system time changes     */     if (timeout > abstime->max_timeout_msec)       timeout= abstime->max_timeout_msec;     }   else   {     /* No time specified; don't expire */     timeout= INFINITE;   }     /*     Block access if previous broadcast hasn't finished.     This is just for safety and should normally not     affect the total time spent in this function.   */   WaitForSingleObject(cond->broadcast_block_event, INFINITE);     EnterCriticalSection(&cond->lock_waiting);   cond->waiting++;   LeaveCriticalSection(&cond->lock_waiting);     LeaveCriticalSection(mutex);

result= WaitForMultipleObjects(2, cond->events, FALSE, timeout);

  EnterCriticalSection(&cond->lock_waiting);   cond->waiting--;      if (cond->waiting == 0)   {     /*       We're the last waiter to be notified or to stop waiting, so       reset the manual event.     */     /* Close broadcast gate */     ResetEvent(cond->events[BROADCAST]);     /* Open block gate */     SetEvent(cond->broadcast_block_event);   }   LeaveCriticalSection(&cond->lock_waiting);      EnterCriticalSection(mutex);     return result == WAIT_TIMEOUT ? ETIMEDOUT : 0; }

    此處是等待時間,何處進行事件通知呢?我們再次來到上篇所提及的為新的連接創建線程的代碼中:

void create_thread_to_handle_connection(THD *thd) {   if (cached_thread_count > wake_thread)   {     /* Get thread from cache */     thread_cache.append(thd);     wake_thread++;

pthread_cond_signal(&COND_thread_cache);

  }   Else ... }

    上篇文章我們其實只講了ELSE分支,而忽略了IF分支。wake_thread代表了喚醒的線程數,即在線程緩沖區中被再次使用的

線程,如果cache中的總數>被重新使用的數目,說明還有空閒的線程,此時進入if分支,調用phtread_cond_signal喚醒上面掛起

的空閒線程。

 

線程管理就到此為止了,這裡只是介紹了下線程緩沖區的工作原理,並沒有具體去介紹如何利用EVENT進行線程的掛起和喚醒,這些都是借助了操作系統的特性,有興趣的可以自己研究下。這篇就到此為止,下節會介紹Mysql的用戶身份認證原理和實現。

 

PS. 男怕入錯行,夜半三更忙,一行又一行

 

 



摘自 心中無碼

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved