程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> ZeroMQ指南-第1章-基礎-分而治之

ZeroMQ指南-第1章-基礎-分而治之

編輯:C++入門知識

分而治之 作為最終示例(你肯定對生動的代碼開始生厭並希望回頭去鑽研關於比較性、抽象性准則的語言學探討),讓我們來做一個小型超級計算。然後喝個咖啡。我們的超級計算程序是個非常典型的並行處理模型。我們有: 一個通風機(ventilator)來產生可以並行處理的任務 一組工人(worker)來處理任務 一個水槽(sink)來回收工人處理的結果 事實上,工人運行於超快的機子,沒准是GPU(圖形處理單元)來做困難運算。這是通風機代碼,生成100個任務,每個任務都是一條消息告訴工人休眠(sleep)幾毫秒。 taskvent: Parallel task ventilator in C [cpp]   //   // Task ventilator   // Binds PUSH socket to tcp://localhost:5557   // Sends batch of tasks to workers via that socket   //   #include "zhelpers.h"      int main (void)   {       void *context = zmq_ctx_new ();          // Socket to send messages on       void *sender = zmq_socket (context, ZMQ_PUSH);       zmq_bind (sender, "tcp://*:5557");          // Socket to send start of batch message on       void *sink = zmq_socket (context, ZMQ_PUSH);       zmq_connect (sink, "tcp://localhost:5558");          printf ("Press Enter when the workers are ready: ");       getchar ();       printf ("Sending tasks to workers…\n");          // The first message is "0" and signals start of batch       s_send (sink, "0");          // Initialize random number generator       srandom ((unsigned) time (NULL));          // Send 100 tasks       int task_nbr;       int total_msec = 0; // Total expected cost in msecs       for (task_nbr = 0; task_nbr < 100; task_nbr++) {           int workload;           // Random workload from 1 to 100msecs           workload = randof (100) + 1;           total_msec += workload;           char string [10];           sprintf (string, "%d", workload);           s_send (sender, string);       }   www.2cto.com     printf ("Total expected cost: %d msec\n", total_msec);       sleep (1); // Give 0MQ time to deliver          zmq_close (sink);       zmq_close (sender);       zmq_ctx_destroy (context);       return 0;   }   圖 5 - 並行管道   這是工人程序。接收消息,休眠指定的時間,然後表明自己完成任務: taskwork: Parallel task worker in C [cpp]  //   // Task worker   // Connects PULL socket to tcp://localhost:5557   // Collects workloads from ventilator via that socket   // Connects PUSH socket to tcp://localhost:5558   // Sends results to sink via that socket   //   #include "zhelpers.h"      int main (void)   {       void *context = zmq_ctx_new ();          // Socket to receive messages on       void *receiver = zmq_socket (context, ZMQ_PULL);       zmq_connect (receiver, "tcp://localhost:5557");          // Socket to send messages to       void *sender = zmq_socket (context, ZMQ_PUSH);       zmq_connect (sender, "tcp://localhost:5558");          // Process tasks forever       while (1) {           char *string = s_recv (receiver);           // Simple progress indicator for the viewer           fflush (stdout);           printf ("%s.", string);              // Do the work           s_sleep (atoi (string));           free (string);              // Send results to sink           s_send (sender, "");       }       zmq_close (receiver);       zmq_close (sender);       zmq_ctx_destroy (context);       return 0;   }   這是水槽程序。它收集這100個任務,然後計算整個處理消耗的時間,讓我們能夠證實如果有多個工人時他們真的是並行運轉的: tasksink: Parallel task sink in C [cpp]   //   // Task sink   // Binds PULL socket to tcp://localhost:5558   // Collects results from workers via that socket   //   #include "zhelpers.h"      int main (void)   {       // Prepare our context and socket       void *context = zmq_ctx_new ();       void *receiver = zmq_socket (context, ZMQ_PULL);       zmq_bind (receiver, "tcp://*:5558");          // Wait for start of batch       char *string = s_recv (receiver);       free (string);          // Start our clock now       int64_t start_time = s_clock ();          // Process 100 confirmations       int task_nbr;       for (task_nbr = 0; task_nbr < 100; task_nbr++) {           char *string = s_recv (receiver);           free (string);           if ((task_nbr / 10) * 10 == task_nbr)               printf (":");           else               printf (".");           fflush (stdout);       }       // Calculate and report duration of batch       printf ("Total elapsed time: %d msec\n",               (int) (s_clock () - start_time));          zmq_close (receiver);       zmq_ctx_destroy (context);       return 0;   }   批處理的平均消耗為5秒。當我們啟動1個、2個、4個工人時,我們從水槽取得的結果是這樣的: [plain]   #   1 worker   Total elapsed time: 5034 msec   #   2 workers   Total elapsed time: 2421 msec   #   4 workers   Total elapsed time: 1018 msec   讓我們更細致的查看這段代碼的某些方面: 工人們上游連接通風機,下游連接水槽。這意味著你可以任意添加工人。如果工人綁定到他們的端點,你會需要(a)更多的端點(b)每添加一個工人都得修改通風機或水槽。我們說通風機和水槽是結構中的“穩定”部分,而工人們是“動態”部分。 我們不得不在批次的開始與所有工人們都起來運行兩者間做出同步。這是一個ØMQ中特別常見的陷阱,也沒有簡單方案。“連接”方法需要一定時間。所以當一組工人連接到通風機,第一個成功連接的工人會在瞬間得到消息的全部負載,而其他人仍在連接。如果你總是不去同步批次的開始,系統完全不會並行運轉。試著移除等待看看。 通風機的推送(PUSH)套接字均勻的分發任務到工人們(假定批次開始送出之前他們都已連接)。這叫做負載均衡,我們會再詳細看看。 水槽的拉取(PULL)套接字均勻的收集工人的成果。這叫做公平隊列。 圖6 - 公平隊列 管道模式也表現出“遲鈍加入者”綜合症,導致了對推送套接字不能正確負載均衡的控訴。如果你使用推送和拉取,而且其中一個工人比其他人得到更多的消息,那是因為他的推送套接字比別人連接的更快,然後在其他人連接達成之前捕獲了一大堆消息。如果你想要正確的負載均衡,你可能想要看看第3章 - 高級請求應答模式中的小節:負載均衡模式。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved