程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> rabbitMQ第四篇:遠程調用,rabbitmq第四篇調用

rabbitMQ第四篇:遠程調用,rabbitmq第四篇調用

編輯:JAVA綜合教程

rabbitMQ第四篇:遠程調用,rabbitmq第四篇調用


 前言:前面我們講解的都是本地服務器,現在如果需要遠程計算機上運行一個函數,等待結果。這就是一個不同的故事了,這種模式通常被稱為遠程過程調用或者RPC。

本章教程我們使用RabbitMQ搭建一個RPC系統,一個客戶端和一個可擴展的RPC服務器,現在我們開始吧。

Callback queue

一般做rpc在RabbitMQ是比較容易的,一個客戶端發送一個請求信息和一個響應信息的服務器回復,為了得到一個響應,我們需要發送一個回調隊列地址請求。如下

Message屬性:

AMQP協議一共預定義了14個屬性,但是大多數屬性很少使用,下面幾個可能用的比較多

deliveryMode:有2個值,一個是持久,另一個表示短暫(第二篇說過)

contentType:內容類型:用來描述編碼的MIME類型。例如,經常使用JSON編碼是將此屬性設置為一個很好的做法:application/json。

replyTo:經常使用的是回調隊列的名字

correlationid:RPC響應請求的相關應用

Correlation Id

在隊列上接收到一個響應,但它並不清楚響應屬於哪一個,當我們使用CorrelationId屬性的時候,我們就可以將它設置為每個請求的唯一值,稍後當我們在回調隊列中接收消息的時候,我們會看到這個屬性,如果我們看到一個未知的CorrelationId,我們就可以安全地忽略信息-它不屬於我們的請求。為什麼我們應該忽略未知的消息在回調隊列中,而不是失敗的錯誤?這是由於服務器端的一個競爭條件的可能性。比如還未發送了一個確認信息給請求,但是此時RPC服務器掛了。如果這種情況發生,將再次重啟RPC服務器處理請求。這就是為什麼在客戶端必須處理重復的反應。

需求

 我們的rpc工作方式如下:

1:當客戶端啟動時,它創建一個匿名的獨占回調隊列。

2:對於rpc請求,客戶端發送2個屬性,一個是replyTo設置回調隊列,另一是correlationId為每個隊列設置唯一值

3:請求被發送到一個rpc_queue隊列中

4:rpc服務器是等待隊列的請求,當收到一個請求的時候,他就把消息返回的結果返回給客戶端,使請求結束。

5:客戶端等待回調隊列上的數據,當消息出現的時候,他檢查correlationId,如果它和從請求返回的值匹配,就進行響應。

編碼

RPCServer.Java

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) {
            return 0;
        }
        if (n == 1) {
            return 1;
        }
        return fib(n - 1) + fib(n - 1);
    }

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

        System.out.println("RPCServer Awating RPC request");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new AMQP.BasicProperties.Builder().
                    correlationId(props.getCorrelationId()).build();

            String message = new String(delivery.getBody(), "UTF-8");
            int n = Integer.parseInt(message);

            System.out.println("RPCServer fib(" + message + ")");
            String response = "" + fib(n);
            channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

服務器代碼比較簡單

1:建立連接,通道,隊列

2:我們可能運行多個服務器進程,為了分散負載服務器壓力,我們設置channel.basicQos(1);

3:我們用basicconsume訪問隊列。然後進入循環,在其中我們等待請求消息並處理消息然後發送響應。

RPCClient.java

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws IOException, InterruptedException {
        String response;
        String corrID = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(corrID).replyTo(replyQueueName).build();
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrID)) {
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }
        return response;
    }

    public void close() throws Exception {
        connection.close();
    }

    public static void main(String[] args) throws Exception {
        RPCClient rpcClient = null;
        String response;
        try {
            rpcClient = new RPCClient();
            System.out.println("RPCClient  Requesting fib(20)");
            response = rpcClient.call("20");
            System.out.println("RPCClient  Got '" + response + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (rpcClient != null) {
                rpcClient.close();
            }
        }
    }
}

客戶端代碼解讀

1:建立一個連接和通道,並聲明了一個唯一的“回調”隊列的答復

2:我們訂閱回調隊列,這樣就可以得到RPC的響應

3:定義一個call方法用於發送當前的回調請求

4:生成一個唯一的correlationid,然後通過while循環來捕獲合適的回應

5:我們請求信息,發送2個屬性,replyTo 和correlationId

6:然後就是等待直到有合適的回應到達

7:while循環是做一個非常簡單的工作,對於每一個響應消息,它檢查是否有correlationid然後進行匹配。然後是就進行響應。

8:最後把響應返回到客戶端。

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved