程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> 關於C++ >> Muduo 網絡編程示例(三)定時器

Muduo 網絡編程示例(三)定時器

編輯:關於C++

程序中的時間

程序中對時間的處理是個大問題,我打算單獨寫一篇文章來全面地討論這個問 題。文章暫定名《〈程序中的日期與時間〉第二章 計時與定時》,跟《〈程序中的日期與時間〉第一 章 日期計算》放到一個系列,這個系列預計會有四篇文章。

在這篇博客裡裡我先簡要談談與編 程直接相關的內容,把更深入的內容留給上面提到的日期與時間專題文章。

在一般的服務端程序設計中,與時間有關的常見任務有:

1. 獲取當前時間,計算時間間隔 ;

2. 時區轉換與日期計算;把紐約當地時間轉換為上海當地時間;2011-02-05 之後第 100 天 是幾月幾號星期幾?等等

3. 定時操作,比如在預定的時間執行一項任務,或者在一段延時之後 執行一項任務。

其中第 2 項看起來復雜,其實最簡單。日期計算用 Julian Day Number,時區 轉換用 tz database;惟一麻煩一點的是夏令時,但也可以用 tz database 解決。這些操作都是純函 數,很容易用一套單元測試來驗證代碼的正確性。需要特別注意的是,用 tzset/localtime_r 來做時 區轉換在多線程環境下可能會有問題;對此我的解決辦法是寫一個 TimeZone class,以避免影響全局 ,將來在日期與時間專題中會講到。以下本文不考慮時區,均為 UTC 時間。

真正麻煩的是第 1 項和第 3 項。一方面,Linux 有一大把令人眼花缭亂的與時間相關的函數和結構體,在程序中該如何 選用?另一方面,計算機中的時鐘不是理想的計時器,它可能會漂移或跳變;最後,民用的 UTC 時間 與閏秒的關系也讓定時任務變得復雜和微妙。當然,與系統當前時間有關的操作也讓單元測試變得困難 。

Linux 時間函數

Linux 的計時函數,用於獲得當前時間:

* time(2) / time_t (秒 )

* ftime(3) / struct timeb (毫秒)

* gettimeofday(2) / struct timeval (微 秒)

* clock_gettime(2) / struct timespec (納秒)

* gmtime / localtime / timegm / mktime / strftime / struct tm (這些與當前時間無關)

定時函數,用於讓程序等 待一段時間或安排計劃任務:

* sleep

* alarm

* usleep

* nanosleep

* clock_nanosleep

* getitimer / setitimer

* timer_create / timer_settime / timer_gettime / timer_delete

* timerfd_create / timerfd_gettime / timerfd_settime

我的取捨如下:

* (計時)只使用 gettimeofday 來獲取當前時間。

* (定時)只使用 timerfd_* 系列函數來處理定時。

gettimeofday 入選原因:(這也 是 muduo::Timestamp class 的主要設計考慮)

1. time 的精度太低,ftime 已被廢棄, clock_gettime 精度最高,但是它系統調用的開銷比 gettimeofday 大。

2. 在 x86-64 平台上 ,gettimeofday 不是系統調用,而是在用戶態實現的(搜 vsyscall),沒有上下文切換和陷入內核的 開銷。

3. gettimeofday 的分辨率 (resolution) 是 1 微秒,足以滿足日常計時的需要。 muduo::Timestamp 用一個 int64_t 來表示從 Epoch 到現在的微秒數,其范圍可達上下 30 萬年。

timerfd_* 入選的原因:

1. sleep / alarm / usleep 在實現時有可能用了信號 SIGALRM,在多線程程序中處理信號是個相當麻煩的事情,應當盡量避免。(近期我會寫一篇博客仔細 講講“多線程、RAII、fork() 與信號”)

2. nanosleep 和 clock_nanosleep 是線 程安全的,但是在非阻塞網絡編程中,絕對不能用讓線程掛起的方式來等待一段時間,程序會失去響應 。正確的做法是注冊一個時間回調函數。

3. getitimer 和 timer_create 也是用信號來 deliver 超時,在多線程程序中也會有麻煩。timer_create 可以指定信號的接收方是進程還是線程, 算是一個進步,不過在信號處理函數(signal handler)能做的事情實在很受限。

4. timerfd_create 把時間變成了一個文件描述符,該“文件”在定時器超時的那一刻變得可 讀,這樣就能很方便地融入到 select/poll 框架中,用統一的方式來處理 IO 事件和超時事件,這也 正是 Reactor 模式的長處。我在一年前發表的《Linux 新增系統調用的啟示》中也談到這個想法,現 在我把這個想法在 muduo 網絡庫中實現了。

5. 傳統的 Reactor 利用 select/poll/epoll 的 timeout 來實現定時功能,但 poll 和 epoll 的定時精度只有毫秒,遠低於 timerfd_settime 的定時 精度。

必須要說明,在 Linux 這種非實時多任務操作系統中,在用戶態實現完全精確可控的計 時和定時是做不到的,因為當前任務可能會被隨時切換出去,這在 CPU 負載大的時候尤為明顯。但是 ,我們的程序可以盡量提高時間精度,必要的時候通過控制 CPU 負載來提高時間操作的可靠性,在程 序在 99.99% 的時候都是按預期執行的。這或許比換用實時操作系統並重新編寫並測試代碼要經濟一些 。

關於時間的精度(accuracy)問題我留到專題博客文章中討論,它與分辨率(resolution)不完全是一 回事兒。時間跳變和閏秒的影響與應對也不在此處展開討論了。

Muduo 的定時器接口

Muduo EventLoop 有三個定時器函數:

  1: typedef boost::function<void()> TimerCallback;
   2: 
   3: ///
   4: /// Reactor, at most one per thread.
   5: ///
   6: /// This is an interface class, so don't expose too much details.
   7: class EventLoop : boost::noncopyable
   8: {
   9:  public:
  10:   // ...
  11: 
  12:   // timers
  13: 
  14:   ///
  15:   TimerId runAt(const Timestamp& time, const TimerCallback& cb);
  16: 
  17:   ///
  18:   /// Runs callback after @c delay seconds.
  19:   /// Safe to call from other threads.
  20:   TimerId runAfter(double delay, const TimerCallback& cb);
  21: 
  22:   ///
  23:   /// Runs callback every @c interval seconds.
  24:   /// Safe to call from other threads.
  25:   TimerId runEvery(double interval, const TimerCallback& cb);
  26: 
  27:   /// Cancels the timer.
  28:   /// Safe to call from other threads.
  29:   // void cancel(TimerId timerId);
  30: 
  31:   // ...
  32: };

* runAt 在指定的時間調用 TimerCallback

* runAfter 等一段時間調用 TimerCallback

* runEvery 以固定的間隔反復調用 TimerCallback

* cancel 取消 timer,目前未實現

回調函數在 EventLoop 對象所在的線程發生,與 onMessage() onConnection() 等網絡事件函數在同一個線程。

Muduo 的 TimerQueue 采用了最簡單的實現( 鏈表)來管理定時器,它的效率比不上常見的 binary heap 的做法,如果程序中大量(10 個以上)使 用重復觸發的定時器,或許值得考慮改用更高級的實現。我目前還沒有在一個程序裡用過這麼多定時器 ,暫時也不打算優化 TimerQueue。

Boost.Asio Timer 示例

Boost.Asio 教程裡以 Timer 和 Daytime 為例介紹 asio 的基本使用,daytime 已經在前文“示例一”中介紹過 ,這裡著重談談 Timer。Asio 有 5 個 Timer 示例,muduo 把其中四個重新實現了一遍,並擴充了第 5 個示例。

1. 阻塞式的定時,muduo 不支持這種用法,無代碼。

2. 非阻塞定時,見 examples/asio/tutorial/timer2

3. 在 TimerCallback 裡傳遞參數,見 examples/asio/tutorial/timer3

4. 以成員函數為 TimerCallback,見 examples/asio/tutorial/timer4

5. 在多線程中回調,用 mutex 保護共享變量,見 examples/asio/tutorial/timer5

6. 在多線程中回調,縮小臨界區,把不需要互斥執行的代碼 移出來,見 examples/asio/tutorial/timer6

為節省篇幅,這裡只列出 timer4:

 

 1: #include <muduo/net/EventLoop.h>
   2: 
   3: #include <iostream>
   4: #include <boost/bind.hpp>
   5: #include <boost/noncopyable.hpp>
   6: 
   7: class Printer : boost::noncopyable
   8: {
   9:  public:
  10:   Printer(muduo::net::EventLoop* loop)
  11:     : loop_(loop),
  12:       count_(0)
  13:   {
  14:     loop_->runAfter(1, boost::bind(&Printer::print, this));
  15:   }
  16: 
  17:   ~Printer()
  18:   {
  19:     std::cout << "Final count is " << count_ << "n";
  20:   }
  21: 
  22:   void print()
  23:   {
  24:     if (count_ < 5)
  25:     {
  26:       std::cout << count_ << "n";
  27:       ++count_;
  28: 
  29:       loop_->runAfter(1, boost::bind(&Printer::print, this));
  30:     }
  31:     else
  32:     {
  33:       loop_->quit();
  34:     }
  35:   }
  36: 
  37: private:
  38:   muduo::net::EventLoop* loop_;
  39:   int count_;
  40: };
  41: 
  42: int main()
  43: {
  44:   muduo::net::EventLoop loop;
  45:   Printer printer(&loop);
  46:   loop.loop();
  47: }

查看本欄目

最後我再強調一遍,在非阻塞服務端編程中,絕對不能用  sleep 或類似的辦法來 讓程序原地停留等待,這會讓程序失去響應,因為主事件循環被掛起了,無法處理 IO 事件。這就像在 Windows 編程中絕對不能在消息循環裡執行耗時的代碼一樣,會讓程序界面失去響應。Reactor 模式的 網絡編程確實有些類似傳統的消息驅動的 Windows 編程。對於“定時”任務,就把它變成 一個特定的消息,到時候觸發相應的消息處理函數就行了。

Boost.Asio 的 timer 示例只用到 了 EventLoop::runAfter,我再舉一個 EventLoop::runEvery 的例子。

Java Netty 示例

Netty 是一個非常好的 Java NIO 網絡庫,它附帶的示例程序有 echo 和 discard 兩個簡單網絡協 議,與前文不同,Netty 版的服務端有流量統計功能,這需要用到 EventLoop::runEvery。

這 裡列出 discard server 的代碼,其 client 的代碼類似前文的 chargen,為節省篇幅,請閱讀源碼 http://code.google.com/p/muduo/source/browse/trunk/examples/netty/ 。

Discard server 注冊了一個間隔為 3 秒的定時器,調用 DiscardServer::printThroughput 打印 出吞吐量。注意這段代碼用了整數的原子操作 AtomicInt64 來記錄收到的字節數和消息數,乍看之下 似乎沒有必要,其實 DiscardServer 可以配置成多線程服務器,muduo TcpServer 有一個內置的多線 程模型,可以通過 setThreadNum() 來開啟。這個話題留到以後再細說。

  1: #include <muduo/net/TcpServer.h>
   2: 
   3: #include <muduo/base/Atomic.h>
   4: #include <muduo/base/Logging.h>
   5: #include <muduo/base/Thread.h>
   6: #include <muduo/net/EventLoop.h>
   7: #include <muduo/net/InetAddress.h>
   8: 
   9: #include <boost/bind.hpp>
  10: 
  11: #include <utility>
  12: 
  13: #include <stdio.h>
  14: #include <unistd.h>
  15: 
  16: using namespace muduo;
  17: using namespace muduo::net;
  18: 
  19: int numThreads = 0;
  20: 
  21: class DiscardServer
  22: {
  23:  public:
  24:   DiscardServer(EventLoop* loop, const InetAddress& listenAddr)
  25:     : loop_(loop),
  26:       server_(loop, listenAddr, "DiscardServer"),
  27:       oldCounter_(0),
  28:       startTime_(Timestamp::now())
  29:   {
  30:     server_.setConnectionCallback(
  31:         boost::bind(&DiscardServer::onConnection, this, _1));
  32:     server_.setMessageCallback(
  33:         boost::bind(&DiscardServer::onMessage, this, _1, _2, _3));
  34:     server_.setThreadNum(numThreads);
  35:     loop->runEvery(3.0, boost::bind(&DiscardServer::printThroughput, this));
  36:   }
  37: 
  38:   void start()

  39:   {
  40:     LOG_INFO << "starting " << numThreads << " threads.";
  41:     server_.start();
  42:   }
  43: 
  44:  private:
  45:   void onConnection(const TcpConnectionPtr& conn)
  46:   {
  47:     LOG_TRACE << conn->peerAddress().toHostPort() << " -> "
  48:         << conn->localAddress().toHostPort() << " is "
  49:         << (conn->connected() ? "UP" : "DOWN");
  50:   }
  51: 
  52:   void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
  53:   {
  54:     size_t len = buf->readableBytes();
  55:     transferred_.add(len);
  56:     receivedMessages_.incrementAndGet();
  57:     buf->retrieveAll();
  58:   }
  59: 
  60:   void printThroughput()
  61:   {
  62:     Timestamp endTime = Timestamp::now();
  63:     int64_t newCounter = transferred_.get();
  64:     int64_t bytes = newCounter - oldCounter_;
  65:     int64_t msgs = receivedMessages_.getAndSet(0);
  66:     double time = timeDifference(endTime, startTime_);
  67:     printf("%4.3f MiB/s %4.3f Ki Msgs/s %6.2f bytes per msgn",
  68:         static_cast<double>(bytes)/time/1024/1024,
  69:         static_cast<double>(msgs)/time/1024,
  70:         static_cast<double>(bytes)/static_cast<double>(msgs));
  71: 
  72:     oldCounter_ = newCounter;
  73:     startTime_ = endTime;
  74:   }
  75: 
  76:   EventLoop* loop_;
  77:   TcpServer server_;
  78: 
  79:   AtomicInt64 transferred_;
  80:   AtomicInt64 receivedMessages_;
  81:   int64_t oldCounter_;
  82:   Timestamp startTime_;
  83: };
  84: 
  85: int main(int argc, char* argv[])
  86: {
  87:   LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
  88:   if (argc > 1)
  89:   {
  90:     numThreads = atoi(argv[1]);
  91:   }
  92:   EventLoop loop;
  93:   InetAddress listenAddr(2009);
  94:   DiscardServer server(&loop, listenAddr);
  95: 
  96:   server.start();
  97: 
  98:   loop.loop();
  99: }
運行方法,在同一台機器的兩個命令行窗口分別運行:
$ bin/netty_discard_server
$ bin/netty_discard_client 127.0.0.1 256

第一個窗口顯示吞吐量:

41.001 

MiB/s 73.387 Ki Msgs/s 572.10 bytes per msg
72.441 MiB/s 129.593 Ki Msgs/s 572.40 bytes per msg
77.724 MiB/s 137.251 Ki Msgs/s 579.88 bytes per msg

改變第二個命令的最後一個參數( 上面的 256),可以觀察不同的消息大小對吞吐量的影響。

練習 1:把二者的關系繪制成函數 曲線,看看有什麼規律,想想為什麼。

練習 2:在局域網的兩台機器上運行客戶端和服務端, 找出讓吞吐量達到最大的消息長度。這個數字與練習 1 中的相比是大還是小?為什麼?

有興趣 的讀者可以對比一下 Netty 的吞吐量,muduo 應該能輕松取勝。

discard client/server 測試 的是單向吞吐量,echo client/server 測試的是雙向吞吐量。這兩個服務端都支持多個並發連接,兩 個客戶端都是單連接的。本系列第 6 篇文章將會實現一個 pingpong 協議,用來測試 muduo 在多線程 大量連接情況下的表現。

(待續)

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved