程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> rt-thread的IPC機制之郵箱源碼分析

rt-thread的IPC機制之郵箱源碼分析

編輯:C++入門知識

郵箱服務是實時操作系統中一種典型的任務間通信方法,通常開銷比較低,效率較高,每一封郵件只能容納固定的4字節內容(針對32位處理系統,剛好能夠容納一個指針). 如下圖所示,線程或中斷服務例程把一則4字節長度(典型的是一個指針)的郵件發送到郵箱中。而一個或多個線程可以從郵箱中接收這些郵件進行處理。 RT-Thread采用的郵箱通信機制有點類型傳統意義上的管道,用於線程間通訊。它是線程,中斷服務,定時器向線程發送消息的有效手段。郵箱與線程對象等之間是相互獨立的。線程,中斷服務和定時器都可以向郵箱發送消息,但是只有線程能夠接收消息(因為當郵箱為空時,線程將有可能被掛起)。RT-Thread的郵箱中共可存放固定條數的郵件,郵箱容量在創建郵箱時設定,每個郵件大小為4字節,正好是一個指針的大小。當需要在線程間傳遞比較大的消息時,可以傳遞指向一個緩沖區的指針。當郵箱滿時,線程等不再發送新郵件,返回-RT EFULL。當郵箱空時,將可能掛起正在試圖接收郵件的線程,使其等待,當郵箱中有新郵件時,再喚醒等待在郵箱上的線程,使其能夠接收新郵件並繼續後續的處理。   1 郵箱控制塊 [cpp]   /**   * mailbox structure   */   struct rt_mailbox   {       struct rt_ipc_object parent; //繼承自IPC對象          rt_uint32_t         *msg_pool;//消息緩沖地址          rt_uint16_t          size;   //可存放的消息最大條數          rt_uint16_t          entry;    當前郵箱中存放的消息條數       rt_uint16_t          in_offset;   //消息存入的偏移位置       rt_uint16_t          out_offset;  //消息取出時的偏移位置          rt_list_t            suspend_sender_thread; //發送郵件的線程(當沒有取走時,發送線程會被掛起)   };   typedef struct rt_mailbox *rt_mailbox_t;     2 郵箱相關接口源碼分析 2.1 初始化 [cpp]  /**   * This function will initialize a mailbox and put it under control of resource   * management.   *   * @param mb the mailbox object   * @param name the name of mailbox   * @param msgpool the begin address of buffer to save received mail   * @param size the size of mailbox   * @param flag the flag of mailbox   *   * @return the operation status, RT_EOK on successful   */   rt_err_t rt_mb_init(rt_mailbox_t mb,                       const char  *name,                       void        *msgpool,//消息緩沖地址                       rt_size_t    size,//可容納的消息條數                       rt_uint8_t   flag)   {       RT_ASSERT(mb != RT_NULL);          /* init object */       rt_object_init(&(mb->parent.parent), RT_Object_Class_MailBox, name);//初始化內核對象          /* set parent flag */       mb->parent.parent.flag = flag;//設置標志          /* init ipc object */       rt_ipc_object_init(&(mb->parent));//初始化IPC對象          /* init mailbox */       mb->msg_pool   = msgpool;//設置消息緩沖地址       mb->size       = size;//設置最大可容納消息條數       mb->entry      = 0;//當前接收到的消息條數為0條       mb->in_offset  = 0;//入口消息偏移位置為0       mb->out_offset = 0;//出口消息偏移位置為0          /* init an additional list of sender suspend thread */       rt_list_init(&(mb->suspend_sender_thread));//初始化郵箱的發送線程掛起鏈表          return RT_EOK;   }     2.2 創建郵箱 [cpp]   /**   * This function will create a mailbox object from system resource   *   * @param name the name of mailbox   * @param size the size of mailbox   * @param flag the flag of mailbox   *   * @return the created mailbox, RT_NULL on error happen   */   rt_mailbox_t rt_mb_create(const char *name, rt_size_t size, rt_uint8_t flag)   {       rt_mailbox_t mb;          RT_DEBUG_NOT_IN_INTERRUPT;          /* allocate object */       mb = (rt_mailbox_t)rt_object_allocate(RT_Object_Class_MailBox, name);//動態分配郵箱內核對象       if (mb == RT_NULL)           return mb;          /* set parent */       mb->parent.parent.flag = flag;//設置內核標志          /* init ipc object */       rt_ipc_object_init(&(mb->parent));//初始化IPC對象          /* init mailbox */       mb->size     = size;//設置最大可容納消息條數       mb->msg_pool = rt_malloc(mb->size * sizeof(rt_uint32_t));//動態分配消息接收緩沖       if (mb->msg_pool == RT_NULL)       {           /* delete mailbox object */           rt_object_delete(&(mb->parent.parent));              return RT_NULL;       }       mb->entry      = 0;//默認郵箱內的消息條數為0       mb->in_offset  = 0;//入口偏移位置為0       mb->out_offset = 0;//出口偏移位置為0          /* init an additional list of sender suspend thread */       rt_list_init(&(mb->suspend_sender_thread));//初始化郵箱的發送線程掛起鏈表          return mb;   }   2.3 脫離郵箱 [cpp]   /**   * This function will detach a mailbox from resource management   *   * @param mb the mailbox object   *   * @return the operation status, RT_EOK on successful   */   rt_err_t rt_mb_detach(rt_mailbox_t mb)   {       /* parameter check */       RT_ASSERT(mb != RT_NULL);          /* resume all suspended thread */       rt_ipc_list_resume_all(&(mb->parent.suspend_thread));//喚醒所有掛起的接收線程       /* also resume all mailbox private suspended thread */       rt_ipc_list_resume_all(&(mb->suspend_sender_thread));//喚醒所有掛起的發送線程          /* detach mailbox object */       rt_object_detach(&(mb->parent.parent));//脫離郵箱對應的內核對象          return RT_EOK;   }     2.4 刪除郵箱 [cpp]   /**   * This function will delete a mailbox object and release the memory   *   * @param mb the mailbox object   *   * @return the error code   */   rt_err_t rt_mb_delete(rt_mailbox_t mb)   {       RT_DEBUG_NOT_IN_INTERRUPT;          /* parameter check */       RT_ASSERT(mb != RT_NULL);          /* resume all suspended thread */       rt_ipc_list_resume_all(&(mb->parent.suspend_thread));//喚醒所有掛起的接收線程          /* also resume all mailbox private suspended thread */       rt_ipc_list_resume_all(&(mb->suspend_sender_thread));//喚醒所有掛起的發送線程      #if defined(RT_USING_MODULE) && defined(RT_USING_SLAB)       /* the mb object belongs to an application module */       if (mb->parent.parent.flag & RT_OBJECT_FLAG_MODULE)           rt_module_free(mb->parent.parent.module_id, mb->msg_pool);//如果有模塊,則需要卸載模塊       else   #endif          /* free mailbox pool */       rt_free(mb->msg_pool);//刪除接收緩沖          /* delete mailbox object */       rt_object_delete(&(mb->parent.parent));//刪除郵箱對應的內核對象          return RT_EOK;   }     2.5 接收郵件 [cpp]   /**   * This function will receive a mail from mailbox object, if there is no mail   * in mailbox object, the thread shall wait for a specified time.   *   * @param mb the mailbox object   * @param value the received mail will be saved in   * @param timeout the waiting time   *   * @return the error code   */   rt_err_t rt_mb_recv(rt_mailbox_t mb, rt_uint32_t *value, rt_int32_t timeout)   {       struct rt_thread *thread;       register rt_ubase_t temp;       rt_uint32_t tick_delta;          /* parameter check */       RT_ASSERT(mb != RT_NULL);          /* initialize delta tick */       tick_delta = 0;       /* get current thread */       thread = rt_thread_self();//獲取當前線程          RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mb->parent.parent)));          /* disable interrupt */       temp = rt_hw_interrupt_disable();//關中斷          /* for non-blocking call */       if (mb->entry == 0 && timeout == 0)//如果當前郵箱中無信件且當前線程等待時間為0,則立即返回超時錯誤       {           rt_hw_interrupt_enable(temp);//開中斷              return -RT_ETIMEOUT;       }          /* mailbox is empty */       while (mb->entry == 0)//如果當前郵箱無信件       {           /* reset error number in thread */           thread->error = RT_EOK;//初始化error值為RT_EOK              /* no waiting, return timeout */           if (timeout == 0)//如果當前線程無等待時間,則立即返回超時錯誤           {               /* enable interrupt */               rt_hw_interrupt_enable(temp);                  thread->error = -RT_ETIMEOUT;                              return -RT_ETIMEOUT;           }              RT_DEBUG_NOT_IN_INTERRUPT;//確保當前不是在ISR中使用           /* suspend current thread */           rt_ipc_list_suspend(&(mb->parent.suspend_thread),//掛起當前接收線程,(當前線程的timeout!=0)                               thread,                               mb->parent.parent.flag);              /* has waiting time, start thread timer */           if (timeout > 0)           {               /* get the start tick of timer */               tick_delta = rt_tick_get();//得到當前tick                  RT_DEBUG_LOG(RT_DEBUG_IPC, ("mb_recv: start timer of thread:%s\n",                                           thread->name));                  /* reset the timeout of thread timer and start it */               rt_timer_control(&(thread->thread_timer),//設置定時器                                RT_TIMER_CTRL_SET_TIME,                                &timeout);               rt_timer_start(&(thread->thread_timer));//啟動定時器           }              /* enable interrupt */           rt_hw_interrupt_enable(temp);//開中斷              /* re-schedule */           rt_schedule();//重新調度              /* resume from suspend state */           if (thread->error != RT_EOK)//如果沒有正常收到郵件,參見互斥鎖相關說明           {               /* return error */               return thread->error;//則返回錯誤           }              /* disable interrupt */           temp = rt_hw_interrupt_disable();//開中斷              /* if it's not waiting forever and then re-calculate timeout tick */           if (timeout > 0)//如果時間參數不是RT_WAITING_FOREVER           {               tick_delta = rt_tick_get() - tick_delta;//得到當前已經過了多長時間               timeout -= tick_delta;//計算剩下等待時間               if (timeout < 0)//如果所有等待時間都已耗盡,則直接就timeout設置為0,再進入下一循環                   timeout = 0;           }       }          /* fill ptr */       *value = mb->msg_pool[mb->out_offset];//保存接收到的郵件內容          /* increase output offset */       ++ mb->out_offset;//出品偏移加1       if (mb->out_offset >= mb->size)//如果指向緩沖末尾,則改為指向首地址,由此得出郵箱的接收緩沖為一個ring buffer           mb->out_offset = 0;       /* decrease message entry */       mb->entry --;//取出郵件後,郵件個數應減1          /* resume suspended thread */       if (!rt_list_isempty(&(mb->suspend_sender_thread)))//如果郵箱的掛起發送線程不為空       {           rt_ipc_list_resume(&(mb->suspend_sender_thread));//則喚醒第一個掛起的發送線程              /* enable interrupt */           rt_hw_interrupt_enable(temp);//開中斷              RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mb->parent.parent)));              rt_schedule();//重新調度              return RT_EOK;       }          /* enable interrupt */       rt_hw_interrupt_enable(temp);//開中斷          RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mb->parent.parent)));          return RT_EOK;   }   接收郵件函數會判斷當前郵箱是否為空且時間參數為0, 如果是則直接返回,然後進行一個while循環,在這個while循環中的判斷條件是郵箱為空,為什麼要這麼做呢?因此程序將在這個 while循環內一直等待郵件的到達,只有郵件到達才會跳出這個循環(如果等待超時則直接函數返回了),另一個是原因呆會再解釋。進入while循環後,程序再次判斷當前的等待時間參數是否為0,如果是則直接返回,為什麼這裡還要這樣判斷呢?這裡是有一定的用意的,且接著往下看。接著掛起當前線程,再次判斷時間參數是否大於0,這裡主要是掃除timeout=RT_WAITING_FOREVER的情況,因為RT_WAITING_FOREVER宏定義為-1.在if語句內記錄當前的時間點,然後設置一定時器並開啟,接著重新調度。在重新調度後,系統可能切換到其它線程,假設一段時間內,系統再次切換回來,原因可能有多種,1 郵箱被脫離,此時當前線程thread->error=-RT_ERROR.2 定時器時間到達,但是郵件還未到達,此時thread->error=-RT_ETIMEOUT;3:郵件到達,本線程在發送郵件函數中被喚醒(注:發送郵件函數中只是喚醒第一條等待郵件的線程),此時,thread->error還是保持原值RT_EOK不變;4:其它原因假設一段時間後線程切換回來,此時error的值也一直保持原樣RT_EOK不變.因此,在重新調度了線程之後,才會有一個if語句通過判斷thread->error的值是否為RT_EOK來判斷當前線程是否被發送郵件函數喚醒。如果不是,則直接返回錯誤。接下來,按原則上說,當前線程一定是被發送郵件函數喚醒,因此,當前一定會存在接收的郵件,但是接下來的幾行代碼卻是再次判斷時間參數大於0的情況下,計算還剩余多多時間,然後返回到while循環接著循環...等等,不是當前已經接收到了郵件麼?那麼為什麼不直接取郵件,而是還要進行下一次循環?這不是浪費時間麼?這裡給出的答案是,第一,確實原則上這時應該是收到郵件才會執行到這,但是,如果真的來了郵件的話,判斷的唯一標准是while循環的判斷條件,即郵箱內的接收信件條數不能為空,或為空,則判斷循環,或不為空,則自然不會進行到while循環中了。但如果這時發現原來郵箱還是為空,那麼當前線程則應該繼續等待了,此時就應該計算出下一次循環中需要等待的剩下時間,好讓下一循環進行精確等待這段時間。     接下來就是取出接收到的郵件,並更新郵箱的進出口偏移位置及郵箱中的郵件數減1,這樣操作過後,不要忘記郵箱內可能還保留著因之前郵箱空間不中而掛起的發送線程,這個時候由於讀取郵件操作,那麼肯定是至少有一個空出的位置,那麼是時候喚醒可能掛起的發送線程了(如果存在的話)。最後重新調度一下。 2.6 發送郵件 [cpp]   /**   * This function will send a mail to mailbox object. If the mailbox is full,   * current thread will be suspended until timeout.   *   * @param mb the mailbox object   * @param value the mail   * @param timeout the waiting time   *   * @return the error code   */   rt_err_t rt_mb_send_wait(rt_mailbox_t mb,                            rt_uint32_t  value,                            rt_int32_t   timeout)   {       struct rt_thread *thread;       register rt_ubase_t temp;       rt_uint32_t tick_delta;          /* parameter check */       RT_ASSERT(mb != RT_NULL);          /* initialize delta tick */       tick_delta = 0;       /* get current thread */       thread = rt_thread_self();//得到當前線程          RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mb->parent.parent)));          /* disable interrupt */       temp = rt_hw_interrupt_disable();//關中斷          /* for non-blocking call */       if (mb->entry == mb->size && timeout == 0)//如果郵箱滿且無等待時間參數為0       {           rt_hw_interrupt_enable(temp);              return -RT_EFULL;       }          /* mailbox is full */       while (mb->entry == mb->size)//如果郵箱滿       {           /* reset error number in thread */           thread->error = RT_EOK;              /* no waiting, return timeout */           if (timeout == 0)           {               /* enable interrupt */               rt_hw_interrupt_enable(temp);                  return -RT_EFULL;           }              RT_DEBUG_NOT_IN_INTERRUPT;//確保不是在ISR中使用本函數           /* suspend current thread */           rt_ipc_list_suspend(&(mb->suspend_sender_thread),//掛起當前發送線程                               thread,                               mb->parent.parent.flag);              /* has waiting time, start thread timer */           if (timeout > 0)//等待時間大於0           {               /* get the start tick of timer */               tick_delta = rt_tick_get();//得到當前的tick                  RT_DEBUG_LOG(RT_DEBUG_IPC, ("mb_send_wait: start timer of thread:%s\n",                                           thread->name));                  /* reset the timeout of thread timer and start it */               rt_timer_control(&(thread->thread_timer),//設置定時器並啟動它                                RT_TIMER_CTRL_SET_TIME,                                &timeout);               rt_timer_start(&(thread->thread_timer));           }              /* enable interrupt */           rt_hw_interrupt_enable(temp);//開中斷              /* re-schedule */           rt_schedule();//重新調度              /* resume from suspend state */           if (thread->error != RT_EOK)//如果此時不是被接收線程喚醒,即郵箱已以可用空間了           {               /* return error */               return thread->error;           }              /* disable interrupt */           temp = rt_hw_interrupt_disable();//關中斷              /* if it's not waiting forever and then re-calculate timeout tick */           if (timeout > 0)           {               tick_delta = rt_tick_get() - tick_delta;//計算已耗時間               timeout -= tick_delta;//計算剩余時間               if (timeout < 0)                   timeout = 0;           }       }          /* set ptr */       mb->msg_pool[mb->in_offset] = value;//開始存放郵件       /* increase input offset */       ++ mb->in_offset;       if (mb->in_offset >= mb->size)           mb->in_offset = 0;       /* increase message entry */       mb->entry ++;          /* resume suspended thread */       if (!rt_list_isempty(&mb->parent.suspend_thread))//喚醒接收線程       {           rt_ipc_list_resume(&(mb->parent.suspend_thread));              /* enable interrupt */           rt_hw_interrupt_enable(temp);              rt_schedule();              return RT_EOK;       }          /* enable interrupt */       rt_hw_interrupt_enable(temp);          return RT_EOK;   }     與接收函數類似,發送函數在郵箱滿的時候會掛起發送線程。其它的參考接收函數,基本上差不多。 2.7 郵箱控制 [cpp]   /**   * This function can get or set some extra attributions of a mailbox object.   *   * @param mb the mailbox object   * @param cmd the execution command   * @param arg the execution argument   *   * @return the error code   */   rt_err_t rt_mb_control(rt_mailbox_t mb, rt_uint8_t cmd, void *arg)   {       rt_ubase_t level;       RT_ASSERT(mb != RT_NULL);          if (cmd == RT_IPC_CMD_RESET)//重圍郵箱       {           /* disable interrupt */           level = rt_hw_interrupt_disable();//關中斷              /* resume all waiting thread */           rt_ipc_list_resume_all(&(mb->parent.suspend_thread));//喚醒所有掛起的接收線程           /* also resume all mailbox private suspended thread */           rt_ipc_list_resume_all(&(mb->suspend_sender_thread));//喚醒所有的發送線程              /* re-init mailbox */           mb->entry      = 0;           mb->in_offset  = 0;           mb->out_offset = 0;              /* enable interrupt */           rt_hw_interrupt_enable(level);              rt_schedule();              return RT_EOK;       }          return -RT_ERROR;   }     郵箱控制控制函數目前只支持重圍操作,此操作過程與初始化過程基本上類似.   4 小結 郵箱相關源碼主要是在發送與接收上。發送時,由於當前郵箱可能空間已滿,放不下要發送的郵件,此時,不得不掛起當前發送線程(如果存在時間參數的話),只要在接收函數讀取出一條郵件時才會喚醒它。同理,如果當前郵箱為空,則接收函數會掛起當前的接收線程,直到有新的郵件到達(在發送函數中喚醒接收線程)或等待超時。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved