前言:前面我們講解的都是本地服務器,現在如果需要遠程計算機上運行一個函數,等待結果。這就是一個不同的故事了,這種模式通常被稱為遠程過程調用或者RPC。
本章教程我們使用RabbitMQ搭建一個RPC系統,一個客戶端和一個可擴展的RPC服務器,現在我們開始吧。
一般做rpc在RabbitMQ是比較容易的,一個客戶端發送一個請求信息和一個響應信息的服務器回復,為了得到一個響應,我們需要發送一個回調隊列地址請求。如下

Message屬性:
AMQP協議一共預定義了14個屬性,但是大多數屬性很少使用,下面幾個可能用的比較多
deliveryMode:有2個值,一個是持久,另一個表示短暫(第二篇說過)
contentType:內容類型:用來描述編碼的MIME類型。例如,經常使用JSON編碼是將此屬性設置為一個很好的做法:application/json。
replyTo:經常使用的是回調隊列的名字
correlationid:RPC響應請求的相關應用
在隊列上接收到一個響應,但它並不清楚響應屬於哪一個,當我們使用CorrelationId屬性的時候,我們就可以將它設置為每個請求的唯一值,稍後當我們在回調隊列中接收消息的時候,我們會看到這個屬性,如果我們看到一個未知的CorrelationId,我們就可以安全地忽略信息-它不屬於我們的請求。為什麼我們應該忽略未知的消息在回調隊列中,而不是失敗的錯誤?這是由於服務器端的一個競爭條件的可能性。比如還未發送了一個確認信息給請求,但是此時RPC服務器掛了。如果這種情況發生,將再次重啟RPC服務器處理請求。這就是為什麼在客戶端必須處理重復的反應。

我們的rpc工作方式如下:
1:當客戶端啟動時,它創建一個匿名的獨占回調隊列。
2:對於rpc請求,客戶端發送2個屬性,一個是replyTo設置回調隊列,另一是correlationId為每個隊列設置唯一值
3:請求被發送到一個rpc_queue隊列中
4:rpc服務器是等待隊列的請求,當收到一個請求的時候,他就把消息返回的結果返回給客戶端,使請求結束。
5:客戶端等待回調隊列上的數據,當消息出現的時候,他檢查correlationId,如果它和從請求返回的值匹配,就進行響應。
RPCServer.Java
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fib(n - 1) + fib(n - 1);
}
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println("RPCServer Awating RPC request");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new AMQP.BasicProperties.Builder().
correlationId(props.getCorrelationId()).build();
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println("RPCServer fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
服務器代碼比較簡單
1:建立連接,通道,隊列
2:我們可能運行多個服務器進程,為了分散負載服務器壓力,我們設置channel.basicQos(1);
3:我們用basicconsume訪問隊列。然後進入循環,在其中我們等待請求消息並處理消息然後發送響應。
RPCClient.java
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws IOException, InterruptedException {
String response;
String corrID = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.correlationId(corrID).replyTo(replyQueueName).build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrID)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
public static void main(String[] args) throws Exception {
RPCClient rpcClient = null;
String response;
try {
rpcClient = new RPCClient();
System.out.println("RPCClient Requesting fib(20)");
response = rpcClient.call("20");
System.out.println("RPCClient Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (rpcClient != null) {
rpcClient.close();
}
}
}
}
客戶端代碼解讀
1:建立一個連接和通道,並聲明了一個唯一的“回調”隊列的答復
2:我們訂閱回調隊列,這樣就可以得到RPC的響應
3:定義一個call方法用於發送當前的回調請求
4:生成一個唯一的correlationid,然後通過while循環來捕獲合適的回應
5:我們請求信息,發送2個屬性,replyTo 和correlationId
6:然後就是等待直到有合適的回應到達
7:while循環是做一個非常簡單的工作,對於每一個響應消息,它檢查是否有correlationid然後進行匹配。然後是就進行響應。
8:最後把響應返回到客戶端。