程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 網頁編程 >> PHP編程 >> 關於PHP編程 >> php amqp 消息隊列 RabbitMQ 交換器類型 直連 (三)

php amqp 消息隊列 RabbitMQ 交換器類型 直連 (三)

編輯:關於PHP編程

1、AMQP_EX_TYPE_DIRECT:直連型

直連型又包括: 1對1 和1對N(N對1、 N對N)
\
接收端receive.php代碼如下
connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('logs');
$queue->declare();

$queue->bind('exchange', 'logs');

while (true) {
    $queue->consume('callback');
}

$connection->close();

function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    $queue->nack($envelope->getDeliveryTag());
}

發送端send.php代碼如下
connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

$exchange->publish('direct type test','logs');
var_dump("Send Message OK");

$connect->disconnect();

運行結果如圖所示
\



\ 創建receive_one.php和receive_two.php 並把send.php代碼改成如下代碼方便我們觀看 receive_one.php 和 receive_two.php 代碼相同 或者用dos運行多個接收端
connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('logs');
@$queue->declare();

$queue->bind('exchange', 'logs');

while (true) {
    $queue->consume('callback');
}

$connection->close();

function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    $queue->nack($envelope->getDeliveryTag());
}




send.php
connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

for ($index = 1; $index < 5; $index++) {
    $exchange->publish($index,'logs');
    var_dump("Send:$index");
}

$exchange->delete();
$connect->disconnect();

運行結果如下 \

列隊會把消息分配給每一個接收端分配處理這裡看似完美但是如果想要更好的處理不同的任務就需要 公平調度
比如當1、3處理的都是簡單的人 2、4都是處理的復雜的任務 如果任務過多時 receive_one.php是空閒的而receive_two.php是任務繁重的 我們進行如下測試 send.php改成5改成50
for ($index = 1; $index < 50; $index++) {
    $exchange->publish($index,'logs');
    var_dump("Send:$index");
}

receive_two.php 加上 sleep(3)
function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    sleep(3);
    $queue->nack($envelope->getDeliveryTag());
}

我們運行程序結果如下
\

receive_one全部運行完而receive_two才運行一個 之後receive_one一直空閒 我們可以通過 在接收端設置 $channel->setPrefetchCount(1);
任務沒人完成前不接收新的消息把消息發送給其他接收端
如下receive_one.php 和 receive_two.php
$channel = new AMQPChannel($connect);
改成如下
$channel = new AMQPChannel($connect);
$channel->setPrefetchCount(1);




  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved