程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 網頁編程 >> PHP編程 >> 關於PHP編程 >> 譯-PHP rabbitMQ Tutorial-2

譯-PHP rabbitMQ Tutorial-2

編輯:關於PHP編程

Work Queues (工作/任務隊列)

(using php-amqplib)

\

In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.

上次呢,我們寫了個程序從指定的隊列中發送和接收消息。這回呢,我們要創建一個任務隊列,用它來把耗時的任務分發到多個“worker”手裡。

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.

工作隊列(也叫任務隊列)的主要是思想是避免立即處理那種資源密集且需要處理很久才能完成的任務。相反呢,我們安排這種任務稍後完成。我們把任務封裝成消息的形式發送到一個隊列。一個後台運行的“worker”進程會取出這些任務,最終完成這個任務。當你運行多個“worker”時,任務便會共享於他們之間。

This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window.

在一次較短的HTTP請求中是不可能處理一個復雜的任務的,這個觀念在web應用中尤其有用。

Preparation(准備!)

In the previous part of this tutorial we sent a message containing "Hello World!". Now we'll be sending strings that stand for complex tasks. We don't have a real-world task, like images to be resized or pdf files to be rendered, so let's fake it by just pretending we're busy - by using the sleep() function. We'll take the number of dots in the string as its complexity; every dot will account for one second of "work". For example, a fake task described by Hello... will take three seconds.

上回說到,咱們發了個“Hello World"的消息。 這回發送一些字符串來代表復雜的任務。我們沒有像調整圖片大小或者pdf渲染等現實生活中的任務,那麼我們用sleep()函數來偽裝任務很耗時,進而模擬現實中的場景

We will slightly modify the send.php code from our previous example, to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue, so let's name it new_task.php:

記得"send.php"吧,我們稍微修改下下,以便於它可以通過命令發送任意消息。 這個程序會向我們的工作隊列中安排任務,

所以就叫它new_task.php吧。

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
                        array('delivery_mode' => 2) # make message persistent
                      );

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

Our old receive.php script also requires some changes: it needs to fake a second of work for every dot in the message body. It will pop messages from the queue and perform the task, so let's call it worker.php:

之前的receive.php也要做點調整:消息中的每個點號需要被當成一秒來模擬工作耗時。它會從隊列中取出並執行任務,所以就叫它worker.php吧。

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

Note that our fake task simulates execution time.

注意我們的假定任務需要模擬執行時間。

Run them as in tutorial one: 執行方式跟上回一樣(如下)

shell1$ php new_task.php "A very hard task which takes two seconds.."
shell2$ php worker.php

Round-robin dispatching(循環調度/分發)

One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.

輕松地工作並行能力是使用任務隊列的優勢之一。要是我們有一堆任務,那就增加更多的worker就好了,很容易衡量。

First, let's try to run two worker.php scripts at the same time. They will both get messages from the queue, but how exactly? Let's see.

首先,我們同事運行倆worker.php。它們都會從隊列中獲得消息,可究竟為喵呢? 接著看。

You need three consoles open. Two will run the worker.php script. These consoles will be our two consumers - C1 and C2.

打開三個控制台。其中倆運行worker.php。這倆控制台就是我們的兩個消費者,就叫C1和C2吧。

shell1$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C
shell2$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C

In the third one we'll publish new tasks. Once you've started the consumers you can publish a few messages:

第三個呢,我們用來發布任務。要是你已經運行了消費者(們),就發布一些消息吧。

shell3$ php new_task.php First message.
shell3$ php new_task.php Second message..
shell3$ php new_task.php Third message...
shell3$ php new_task.php Fourth message....
shell3$ php new_task.php Fifth message.....

Let's see what is delivered to our workers:

瞅瞅都啥發給我們的worker了。

shell1$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.

默認嘞,RabbitMQ 會按順序發送每一條消息到下一個消費者,通常每個消費者會收到相同數量的消息。這種分發消息的方式就叫“round-robin”。試試三個或更多的woker.

Message acknowledgment(消息確認)

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the customer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

處理一個任務會要好幾秒。你可能會想要是其中一個消費者執行一個耗時的任務且死在半路上咋整。按我們現在的代碼來說,

一旦RabbitMQ派送一條信息到一個消費者,就會從內存中立即將它刪除。這種場景下,如果你K掉一個worker的進程,那麼它

正在處理的那條消息就會丟失。同樣所有派送到這個worker的消息,在未處理前都會丟失。

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

我們肯定不想丟掉任何任務呀。如果一個worker掛了,我們很希望這個任務被分配到其他的worker頭上。

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.

為了保證消息決不丟失,RabbitMQ支持消息確認。啥意思呢?就是說消費者發送一條消息給RabbitMQ告訴它說,某條特定的

消息已經收到且處理了,這樣RabbitMQ就可以隨便刪除它啦。

If a consumer dies without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

如果消費者沒法送(消息)確認就掛了,RabbitMQ就會認為消息沒有被完全處理,然後重新派送它到另一個消費者。那樣

的話你就能保證不會丟失消息啦,即使worker間歇性死掉都木事。

There aren't any message timeouts; RabbitMQ will redeliver the message only when the worker connection dies. It's fine even if processing a message takes a very, very long time.

當且僅當worker連接掛掉的時候,RabbitMQ才會重新派送消息。所以不存在消息處理超時問題。即使要花好久好久好久好久

....處理一條消息都木事。

Message acknowledgments are turned off by default. It's time to turn them on by setting the fourth parameter to basic_consume to false (true means no ack) and send a proper acknowledgment from the worker, once we're done with a task.

默認情況下,消息確認是關閉的。是時候開啟他們了,一旦一項任務完成,通過設置basic_consume 第四個參數設置為false(true意思是關閉消息確認)來發送適當的確認信息。

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.

用這段代碼我們就能保證神馬都不會丟失,即使你在worker處理消息的時候用CTRL+C把它K掉。要是worker被干掉,不久

所有未確認的消息會被重新派送。

Forgotten acknowledgment(老年失憶)

It's a common mistake to miss the basic_ack. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.

丟失basic_ack很常見。是一個很容犯的錯誤,可後果很嚴重!!!

當你的客戶端退出後,消息會被重新派送(看起來就像隨機重派一樣),RabbitMQ會吃掉你越來越多的內存,

因為它會不釋放任何未確認的消息哇!!!

In order to debug this kind of mistake you can use rabbitmqctl to print the messages_unacknowledged field:

要調試這種錯誤,你可以用rabbitmqctl來打印messages_unacknowledged 字段。

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

Message durability(消息持久性)

We have learned how to make sure that even if the consumer dies, the task isn't lost. But our tasks will still be lost if RabbitMQ server stops.

我們已經學了怎麼保證消費者掛掉的時候任務不丟失。但是如果RabbitMQ服務掛了,我們的任務仍然會丟失。

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

當RabbitMQ退出或者掛掉,它會忘掉其中的隊列和消息,除非你告訴它表醬紫!!!

要保證消息不丟失,需要做兩件事:我們需要把隊列和消息都標記為持久的。

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable. To do so we pass the third parameter to queue_declare as true:
首先,我們需要保證RabbitMQ永遠不會丟失我們的隊列。為了這麼干,我們得聲明它為持久的。也就是把queue_declare

的第三個參數設置為true.

$channel->queue_declare('hello', false, true, false, false);

Although this command is correct by itself, it won't work in our present setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for example task_queue:

盡管這麼用是沒錯的,但當前的設置不會起作用。因為我們已經定義了一個叫hello的非持久的隊列。RabbitMQ不允許用不同的參數重新定義一個已存在的隊列,並返回錯誤給任何試圖那麼做的程序。不過這有一個快捷的解決方案-我們可以聲明一個不同名字的隊列,例如task_queue:

$channel->queue_declare('task_queue', false, true, false, false);

This flag set to true needs to be applied to both the producer and consumer code.

生產者和消費者的代碼都需要將隊列聲明這裡設置為true(第三個參數)。

At this point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by setting the delivery_mode = 2 message property which AMQPMessage takes as part of the property array.

到這裡,我們就能保證及時RabbitMQ重啟task_queue這個隊列也不會丟失。現在呢,我們也得把消息標記為持久的,設置一個數組的屬性delivery_mode=2,作為AMQPMessage 的參數(第二個)就可以啦。

$msg = new AMQPMessage($data,
       array('delivery_mode' => 2) # make message persistent
       );

Note on message persistence(需要注意滴)

Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in a transaction.

把消息設置為持久的並不能完全保證它不會丟失。盡快它也告訴RabbitMQ把消息保存到硬盤上了,但還是有

很小的一種可能性存在——就是RabbitMQ收到消息但還沒來得及保存到硬盤,還有,RabbitMQ不是為每條消息都做fsync操作——很可能只是保存到緩存中而不是真的寫到硬盤。這種持久性的保證並不夠堅實,但是對我們

簡單的任務隊列來講是綽綽有余了。如果你需要更堅實的持久性保證,可以把發送代碼包裝在一個事務當中。

Fair dispatch(公平分配)

You might have noticed that the dispatching still doesn't work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

你可能發現了這個分發工作並沒按照我麼預想的運行。比如一種情況有兩個worker,當所有奇數的消息都非常耗時,就算是不耗時,一個worker會一直繁忙,而另一個幾乎不干活。好喵,RabbitMQ對於這種情況毫無所知,還會繼續均勻的分發消息。

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

發生這種情況的原因是當信息進入到隊列RabbitMQ只是進行分發。它不會去看某個消費者未確認的消息。它只會盲目地分發第N個消息到第N個消費者。

\

In order to defeat that we can use the basic_qos method with the prefetch_count = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

為了解決這種情況,我們可以用basic_qus方法,設置prefetch_count=1. 這會告訴RabbitMQ一次只給一個worker一個消息。

或者,換句話說,在一個worker處理完和確認上一個消息前,不要給他派送新消息。相反,RabbitMQ會把消息派送給下一個

閒置的worker.

$channel->basic_qos(null, 1, null);

Note about queue size(注意啦!!隊列大小)

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.

如果所有worker都處於繁忙狀態,你的隊列是會被填滿的。要留心一下,要麼增加更多的worker,要麼有一些其他的策略。

Putting it all together(合體!!!again 哈哈)

Final code of our new_task.php file:(new_task.php的終極神碼)

channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
                        array('delivery_mode' => 2) # make message persistent
                      );

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

?>

(new_task.php source)

And our worker.php: (worker哦)

channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

(worker.php source)

Using message acknowledgments and prefetch you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.

你可以使用消息確認和預抓取來創建一個工作隊列。持久性設置使得即使RabbitMQ重啟了任務還會存活。

Now we can move on to tutorial 3 and learn how to deliver the same message to many consumers.

下一扒,我們會學習咋給多個消費者發送相同的消息,就像廣播,嗯嗯!

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved