程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> ZeroMQ指南-第1章-基礎-放出消息

ZeroMQ指南-第1章-基礎-放出消息

編輯:C++入門知識

放出消息 第二個經典模式是單向數據分發,服務器推送更新到一組客戶端。讓我們看一個推送天氣情況變化的例子,包含地區編碼、溫度、和相對濕度。我們會生成隨機值來模擬真實氣象站。 這是服務器代碼,這個程序我們使用5556端口。 wuserver: Weather update server in C [cpp]   //   // Weather update server   // Binds PUB socket to tcp://*:5556   // Publishes random weather updates   //   #include "zhelpers.h"      int main (void)   {       // Prepare our context and publisher       void *context = zmq_ctx_new ();       void *publisher = zmq_socket (context, ZMQ_PUB);       int rc = zmq_bind (publisher, "tcp://*:5556");       assert (rc == 0);       rc = zmq_bind (publisher, "ipc://weather.ipc");       assert (rc == 0);          // Initialize random number generator       srandom ((unsigned) time (NULL));       while (1) {           // Get values that will fool the boss           int zipcode, temperature, relhumidity;           zipcode = randof (100000);           temperature = randof (215) - 80;           relhumidity = randof (50) + 10;              // Send message to all subscribers           char update [20];           sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);           s_send (publisher, update);       }       zmq_close (publisher);       zmq_ctx_destroy (context);       return 0;   }   更新流既無開始也無結束,像一個永不結束的天氣預報。 圖 4 – 發布-訂閱 這是客戶端程序,監聽更新流並捕獲符合特定地區編碼的所有消息,默認為紐約市因為那是個冒險的好地方: wuclient: Weather update client in C [cpp]   //   // Weather update client   // Connects SUB socket to tcp://localhost:5556   // Collects weather updates and finds avg temp in zipcode   //   #include "zhelpers.h"      int main (int argc, char *argv [])   {       void *context = zmq_ctx_new ();          // Socket to talk to server       printf ("Collecting updates from weather server…\n");       void *subscriber = zmq_socket (context, ZMQ_SUB);       int rc = zmq_connect (subscriber, "tcp://localhost:5556");       assert (rc == 0);          // Subscribe to zipcode, default is NYC, 10001       char *filter = (argc > 1) ? argv [1] : "10001 ";       rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));       assert (rc == 0);          // Process 100 updates       int update_nbr;       long total_temp = 0;       for (update_nbr = 0; update_nbr < 100; update_nbr++) {           char *string = s_recv (subscriber);              int zipcode, temperature, relhumidity;           sscanf (string, "%d %d %d",                   &zipcode, &temperature, &relhumidity);           total_temp += temperature;           free (string);       }       printf ("Average temperature for zipcode '%s' was %dF\n",               filter, (int) (total_temp / update_nbr));          zmq_close (subscriber);       zmq_ctx_destroy (context);       return 0;   }   注意當你使用一個訂閱套接字時你必須使用zmq_setsockopt()和SUBSCRIBE設置一個訂閱,就像這段代碼中那樣。如果你不設置任何訂閱,就得不到任何消息。這是初學者的常見錯誤。訂閱者可以設置很多訂閱,會合並到一起。就是說,如果一個更新匹配任意一個訂閱,訂閱者都會接收。訂閱者也可以取消特定的訂閱。一個訂閱通常是一個可打印字符串,但也不是必須的。參考zmq_setsockopt()看這是怎麼工作的。 發布訂閱套接字對是異步的。客戶端在循環中(或單次如果有必要)做zmq_msg_recv()。嘗試發送消息到訂閱套接字將導致錯誤。同樣的,服務按所需頻率做zmq_msg_send(),但絕不能對發布套接字做zmq_msg_recv()。 理論上,ØMQ套接字不在乎哪一端來連接哪一端來綁定。但是在實踐中會有未公開的差異,待會我會提及。現在,綁定發布並連接訂閱,除非你的網絡設計導致這無法實現。 還有一個關於發布訂閱套接字的重要事項:你無法精確知道訂閱者什麼時候開始獲取消息。即使你先啟動一個訂閱者,過一會再啟動發布者,訂閱者總是會錯過發布者發送的第一條消息。這是因為訂閱者連接到發布者時(占用了短暫但非零的時間),發布者可能已經將消息發送出去了。 這種“遲鈍加入者”症狀擊中了很多人、很多次,我們需要詳細解釋一下。記住ØMQ是異步I/O的,也就是在後台。比如說你有兩個節點按這個順序這麼做: 訂閱者連接到一個端點並接收和計數消息 發布者綁定到一個端點並立刻發送1000條消息 那麼訂閱者很可能不會接收到任何東西。你可以眨眨眼,檢查一下是否設置了正確的過濾器,再試試,然而訂閱者還是不會接收任何東西。 建立TCP連接牽涉到大概幾毫秒的來回握手,這取決於網絡狀況和節點之間跳躍的次數。這個時間裡,ØMQ已經可以發送好多消息了。為了證明,假設花費5毫秒來建立連接,而相同的鏈路可以搞定1M每秒的消息。在訂閱者連接到發布者的5毫秒裡,發布者僅耗費1毫秒就將這1K的消息發送出去了。 在第2章 - 套接字與模式中我們會解釋如何讓發布者和訂閱者同步,以便無需等待訂閱者(們)都已連接並就緒時才開始發布數據。有一個簡單的笨辦法來延遲發布者,通過睡眠sleep。但是在真實程序中可不要這麼做,因為這極為脆弱、不雅而緩慢。先用sleep向自己證明到底發生了什麼,然後等到第2章再看正確做法。 同步的另一種替代方案是簡單的假設發布的數據流是無限的,沒有開始也沒有結束。還假設訂閱者不在乎它啟動之前發生過什麼。這就是我們建造的例子天氣客戶端的方式。 客戶端訂閱到選定的地區編碼並收集1000個更新。也就是大概1000萬服務器更新,如果地區編碼是隨機分發的。你可以先啟動客戶端,再啟動服務器,而客戶端會持續工作。你可以隨時停止並重啟服務器,而客戶端會持續工作。當客戶端收集到了1000個更新,它計算出平均值,打印輸出,然後退出。 關於發布訂閱模式的一些要點: 訂閱者可以每次調用“connect”來連接到一個以上的發布者。數據會交錯到達(“公平隊列”)以避免發布者相互淹沒。 如果一個發布者沒有已連接的訂閱者,那麼它直接丟棄所有消息。 如果你使用TCP時一個訂閱者很慢,消息會在發布者那裡排上隊。我們待會看看這種情況下如何用“高水位線”來保護發布者。 從ØMQ 3.x開始,使用已連接協議(tcp: 或 ipc:)將在發布者方面進行過濾,使用epgm://協議時,會在訂閱者一方進行過濾。在ØMQ2.x中所有過濾都在訂閱者方面進行。 如下是在我的筆記本電腦中接收過濾10M消息花費的時間,電腦配置是2011-era Intel i5,體面但也沒啥特殊的。 [plain]  $ time wuclient   Collecting updates from weather server...   Average temperature for zipcode '10001 ' was 28F      real    0m4.470s   user    0m0.000s   sys     0m0.008s  

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved