1、AMQP_EX_TYPE_DIRECT:直連型
直連型又包括: 1對1 和1對N(N對1、 N對N)
接收端receive.php代碼如下
connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('logs');
$queue->declare();
$queue->bind('exchange', 'logs');
while (true) {
$queue->consume('callback');
}
$connection->close();
function callback($envelope, $queue) {
var_dump($envelope->getBody());
$queue->nack($envelope->getDeliveryTag());
}
發送端send.php代碼如下
connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$exchange->publish('direct type test','logs');
var_dump("Send Message OK");
$connect->disconnect();
運行結果如圖所示

創建receive_one.php和receive_two.php 並把send.php代碼改成如下代碼方便我們觀看
receive_one.php 和 receive_two.php 代碼相同 或者用dos運行多個接收端
connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('logs');
@$queue->declare();
$queue->bind('exchange', 'logs');
while (true) {
$queue->consume('callback');
}
$connection->close();
function callback($envelope, $queue) {
var_dump($envelope->getBody());
$queue->nack($envelope->getDeliveryTag());
}
send.php
connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
for ($index = 1; $index < 5; $index++) {
$exchange->publish($index,'logs');
var_dump("Send:$index");
}
$exchange->delete();
$connect->disconnect();
運行結果如下
列隊會把消息分配給每一個接收端分配處理這裡看似完美但是如果想要更好的處理不同的任務就需要
公平調度
比如當1、3處理的都是簡單的人 2、4都是處理的復雜的任務 如果任務過多時 receive_one.php是空閒的而receive_two.php是任務繁重的
我們進行如下測試
send.php改成5改成50
for ($index = 1; $index < 50; $index++) {
$exchange->publish($index,'logs');
var_dump("Send:$index");
}
receive_two.php 加上 sleep(3)
function callback($envelope, $queue) {
var_dump($envelope->getBody());
sleep(3);
$queue->nack($envelope->getDeliveryTag());
}
我們運行程序結果如下
receive_one全部運行完而receive_two才運行一個 之後receive_one一直空閒
我們可以通過 在接收端設置
$channel->setPrefetchCount(1);
任務沒人完成前不接收新的消息把消息發送給其他接收端
如下receive_one.php 和 receive_two.php
$channel = new AMQPChannel($connect);
改成如下
$channel = new AMQPChannel($connect);
$channel->setPrefetchCount(1);