程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> 自己動手寫RPC框架到dubbo的服務動態注冊,服務路由,負載均衡功能實現,rpcdubbo

自己動手寫RPC框架到dubbo的服務動態注冊,服務路由,負載均衡功能實現,rpcdubbo

編輯:JAVA綜合教程

自己動手寫RPC框架到dubbo的服務動態注冊,服務路由,負載均衡功能實現,rpcdubbo


  序:RPC就是發送socket告訴服務端我要調你的哪一個類的哪一個方法然後獲得處理的結果。服務注冊和路由就是借助第三方存儲介質存儲服務信息讓服務消費者調用。

  RPC即遠程過程調用,它的實現方式有很多,比如webservice等。框架調多了,煩了,沒激情了,我們就該問自己,這些框架的作用到底是什麼,來找回當初的激情。
  一般來說,我們寫的系統就是一個單機系統,一個web服務器一個數據庫服務,但是當這單台服務器的處理能力受硬件成本的限制,是不能無限的提升處理性能的。這個時候我們使用RPC將原來的本地調用轉變為調用遠端的服務器上的方法,給系統的處理能力和吞吐量帶來了提升。
  RPC的實現包括客戶端和服務端,即服務的調用方和服務的提供方。服務調用方發送rpc請求到服務提供方,服務提供方根據調用方提供的參數執行請求方法,將執行的結果返回給調用方,一次rpc調用完成。

  先讓我們利用socket簡單的實現RPC,來看看他是什麼鬼樣子。

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6056763.html

可接網站開發,java開發。

新浪微博:intsmaze劉洋洋哥

微信:intsmaze

服務端代碼如下 

服務端的提供服務的方法

package cn.intsmaze.tcp.two.service;
public class SayHelloServiceImpl  {
    public String sayHello(String helloArg) {
        if(helloArg.equals("intsmaze"))
        {
            return "intsmaze";
        }
        else
        {
            return "bye bye";
        }
    }
}

  服務端啟動接收外部方法請求的端口類,它接收到來自客戶端的請求數據後,利用反射知識,創建指定類的對象,並調用對應方法,然後把執行的結果返回給客戶端即可。

package cn.intsmaze.tcp.two.service;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
public class Provider {

    public static void main(String[] args) throws Exception {

        ServerSocket server=new ServerSocket(1234);
        while(true)
        {
            Socket socket=server.accept();
            ObjectInputStream input=new ObjectInputStream(socket.getInputStream());
            
            String classname=input.readUTF();//獲得服務端要調用的類名
            String methodName=input.readUTF();//獲得服務端要調用的方法名稱        
            Class<?>[] parameterTypes=(Class<?>[]) input.readObject();//獲得服務端要調用方法的參數類型
            Object[] arguments=(Object[]) input.readObject();//獲得服務端要調用方法的每一個參數的值        
            
            Class serviceclass=Class.forName(classname);//創建類
            Object object = serviceclass.newInstance();//創建對象
            Method method=serviceclass.getMethod(methodName, parameterTypes);//獲得該類的對應的方法
            
            Object result=method.invoke(object, arguments);//該對象調用指定方法
            
            ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());
            output.writeObject(result);
            socket.close();
        }
    }
}  

服務調用者代碼

  調用服務的方法,主要就是客戶端啟動一個socket,然後向提供服務的服務端發送數據,其中的數據就是告訴服務端去調用哪一個類的哪一個方法,已經調用該方法的參數是多少,然後結束服務端返回的數據即可。

調用服務

package cn.intsmaze.tcp.two.client;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
public class consumer {
    
    @SuppressWarnings({ "unused", "rawtypes" })
    public static void main(String[] arg) throws Exception
    {
        //我們要想調用遠程提供的服務,必須告訴遠程我們要調用你的哪一個類,這裡我們可以在本地創建一個interface來獲取類的名稱,但是這樣我們必須
        //保證該interface和遠程的interface的所在包名一致。這種方式不好。所以我們還是通過硬編碼的方式吧。
     //雖然webservice就是這樣的,我個人覺得不是多好。      // String interfacename=SayHelloService.class.getName(); String classname="cn.intsmaze.tcp.two.service.SayHelloServiceImpl"; String method="sayHello"; Class[] argumentsType={String.class}; Object[] arguments={"intsmaze"}; Socket socket=new Socket("127.0.0.1",1234); ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(classname); output.writeUTF(method); output.writeObject(argumentsType); output.writeObject(arguments); ObjectInputStream input=new ObjectInputStream(socket.getInputStream()); Object result=input.readObject(); System.out.println(result); socket.close(); } }

   當然實際中出於性能考慮,往往采用非阻塞式I/O,避免無限的等待,帶來系統性能的消耗。

  上面的只是一個簡單的過程,當系統之間的調用變的復雜之後,該方式有如下不足:服務調用者代碼以硬編碼的方式指明所調用服務的信息(類名,方法名),當服務提供方改動所提供的服務的代碼後,服務調用者必須修改代碼進行調整,不然會導致服務調用者無法成功進行遠程方法調用導致系統異常,並且當服務提供者宕機下線了,服務調用者並不知道服務端是否存活,仍然會進行訪問,導致異常。

  一個系統中,服務提供者往往不是一個,而是多個,那麼服務消費者如何從眾多的服務者找到對應的服務進行RPC就是一個問題了,因為這個時候我們不能在在服務調用者代碼中硬編碼指出調用哪一個服務的地址等信息,因為我們可以想象,沒有一個統一的地方管理所有服務,那麼我們在錯綜復雜的系統之間無法理清有哪些服務,已經服務的調用關系,這簡直就是災難。

    這個時候就要進行服務的注冊,通過一個第三方的存儲介質,當服務的提供者上線時,通過代碼將所提供的服務的相關信息寫入到存儲介質中,寫入的主要信息以key-value方式:服務的名稱:(類名,方法名,參數類型,參數,IP地址,端口)。服務的調用者向遠程調用服務時,會先到第三方存儲介質中根據所要調用的服務名得到(類名,方法名,參數類型,參數,IP地址,端口)等參數,然後再向服務端發出調用請求。通過這種方式,代碼就變得靈活多變,不會再因為一個局部的變得引發全局架構的變動。因為一般的改動是不會變得服務的名稱的。這種方式其實就是soa架構,服務消費者通過服務名稱,從眾多服務中找到要調用的服務的相關信息,稱為服務的路由。

  下面通過一個靜態MAP對象來模擬第三方存儲的介質。

package cn.intsmaze.tcp.three;
import net.sf.json.JSONObject;
public class ClassWays {
    
    String classname;//類名
    
    String method;//方法
    
    Class[] argumentsType;//參數類型
     
    String ip;//服務的ip地址
    
    int port;//服務的端口
    
    get,set......
 }

  第三方存儲介質,這裡固定了服務提供者的相關信息,理想的模擬是,當服務啟動後,自動向該類的map集合添加信息。但是因為服務端和客戶端啟動時,是兩個不同的jvm進程,客戶端時無法訪問到服務端寫到靜態map集合的數據的。

package cn.intsmaze.tcp.three;
import java.util.HashMap;
import java.util.Map;
import net.sf.json.JSONObject;
public class ServiceRoute {
    
    public static Map<String,String> NAME=new HashMap<String, String>();
    
    public ServiceRoute()
    {
        ClassWays classWays=new ClassWays();
        Class[] argumentsType={String.class};
        classWays.setArgumentsType(argumentsType);
        classWays.setClassname("cn.intsmaze.tcp.three.service.SayHelloServiceImpl");
        classWays.setMethod("sayHello");
        classWays.setIp("127.0.0.1");
        classWays.setPort(1234);
        JSONObject js=JSONObject.fromObject(classWays);
        NAME.put("SayHello", js.toString());
    } 
}

  接下來看服務端代碼的美麗面孔吧。

package cn.intsmaze.tcp.three.service;public class Provider {

    //服務啟動的時候,組裝相關信息,然後寫入第三方存儲機制,供服務的調用者去獲取
    public void reallyUse() {
        
        ClassWays classWays = new ClassWays();
        Class[] argumentsType = { String.class };
        classWays.setArgumentsType(argumentsType);
        classWays.setClassname("cn.intsmaze.tcp.three.service.SayHelloServiceImpl");
        classWays.setMethod("sayHello");
        classWays.setIp("127.0.0.1");
        classWays.setPort(1234);
        
        JSONObject js=JSONObject.fromObject(classWays);
        
        //模擬第三方存儲介質,實際中應該是redis,mysql,zookeeper等。
        ServiceRoute.NAME.put("SayHello", js.toString());
    }

    public static void main(String[] args) throws Exception {

        ServerSocket server = new ServerSocket(1234);
        //實際中,這個地方應該調用如下方法,但是因為簡單的模擬服務的注冊,將注冊的信息硬編碼在ServiceRoute類中,這個類的構造方法裡面會自動注冊服務的相關信息。
        //server.reallyUse();
        while (true) {
            Socket socket = server.accept();
            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());

            String classname = input.readUTF();
            String methodName = input.readUTF();
            Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
            Object[] arguments = (Object[]) input.readObject();

            Class serviceclass = Class.forName(classname);

            Object object = serviceclass.newInstance();

            Method method = serviceclass.getMethod(methodName, parameterTypes);

            Object result = method.invoke(object, arguments);

            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
            output.writeObject(result);
            socket.close();
        }
    }
}

  服務的調用者代碼:



package cn.intsmaze.tcp.three.client;public class Consumer {
    
    public Object reallyUse(String provideName,Object[] arguments) throws Exception
    {
        //模擬從第三方存儲介質拿去數據
        ServiceRoute serviceRoute=new ServiceRoute();
        String js=serviceRoute.NAME.get(provideName);
        JSONObject obj = new JSONObject().fromObject(js);
        ClassWays classWays = (ClassWays)JSONObject.toBean(obj,ClassWays.class);
        
        String classname=classWays.getClassname();
        String method=classWays.getMethod();
        Class[] argumentsType=classWays.getArgumentsType();
        Socket socket=new Socket(classWays.getIp(),classWays.getPort());
        
        ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());
        
        output.writeUTF(classname);
        output.writeUTF(method);
        output.writeObject(argumentsType);
        output.writeObject(arguments);
        
        ObjectInputStream input=new ObjectInputStream(socket.getInputStream());
        Object result=input.readObject();
        socket.close();
        return result;
    }
    @SuppressWarnings({ "unused", "rawtypes" })
    public static void main(String[] arg) throws Exception
    {
        Consumer consumer=new Consumer();
        Object[] arguments={"intsmaze"};
        Object result=consumer.reallyUse("SayHello",arguments);
        System.out.println(result);
    }
}

  回到開始的問題現在我們保證了服務調用者對服務的調用的相關參數以動態的方式進行控制,通過封裝,服務調用者只需要指定每一次調用時的參數的值即可。但是當服務提供者宕機下線了,服務調用者並不知道服務端是否存活,仍然會進行訪問,導致異常。這個時候我們該如何考慮解決了?

  剩下的我就不寫代碼示例了,代碼只是思想的表現形式,就像開發語言一直變化,但是思想是不變的。

  服務下線我們應該把該服務從第三方存儲刪除,在服務提供方寫代碼進行刪除控制,也就是服務下線前訪問第三方刪除自己提供的服務。這樣當然行不通的,因為服務宕機時,才不會說,我要宕機了,服務提供者你快去第三方存儲介質刪掉該服務信息。所以這個時候我們就要在第三方存儲介質上做手腳,比如服務提供方並不是直接把服務信息寫入第三方存儲介質,而是與一個第三方系統進行交互,第三方系統把接收到來自服務提供者的服務信息寫入第三方存儲介質中,然後在服務提供者和第三方系統間建立一個心跳檢測,當第三方系統檢測到服務提供者宕機後,就會自動到第三方介質中刪除對應服務信息。

  這個時候我們就可以選擇zookeeper作為第三方存儲介質,服務啟動會到zookeeper上面創建一個臨時目錄,該目錄存儲該服務的相關信息,當服務端宕機了,zookeeper會自動刪除該文件夾,這個時候就實現了服務的動態上下線了。

  這個地方其實就是dubbo的一大特色功能:服務配置中心——動態注冊和獲取服務信息,來統一管理服務名稱和其對於的服務器的信息。服務提供者在啟動時,將其提供的服務名稱,服務器地址注冊到服務配置中心,服務消費者通過配置中心來獲得需要調用服務的機器。當服務器宕機或下線,相應的機器需要動態地從服務配置中心移除,並通知相應的服務消費者。這個過程中,服務消費者只在第一次調用服務時需要查詢服務配置中心,然後將查詢到的信息緩存到本地,後面的調用直接使用本地緩存的服務地址信息,而不需要重新發起請求到服務配置中心去獲取相應的服務地址,直到服務的地址列表有變更(機器上線或者下線)。

  zookeeper如何知道的?zookeeper其實就是會和客戶端直接有一個心跳檢測來判斷的,zookeeper功能很簡單的,可以自己去看對應的書籍即可。

  隨著業務的發展,服務調用者的規模發展到一定的階段,對服務提供方也帶來了巨大的壓力,這個時候服務提供方就不在是一台機器了,而是一個服務集群了。

  服務調用者面對服務提供者集群如何高效選擇服務提供者集群中某一台機器?

  一說到集群,我們都會想到反向代理nginx,所以我們就會采用nginx的配置文件中存儲集群中的所有IP和端口信息。然後把第三方存儲介質中存儲的服務信息——key-value:服務的名稱:(類名,方法名,參數類型,參數,IP地址,端口)IP地址改為集群的代理地址,然後服務消費者根據服務名稱獲得服務信息後組裝請求把數據發送到nginx,再由nginx負責轉發請求到對應的服務提供者集群中的一台。

  這確實是可以滿足的,但是如果吹毛求疵就會發現他所暴露的問題!

  一:使用nginx進行負載均衡,一旦nginx宕機,那麼依賴他的服務均將失效,這個時候服務的提供者並沒有宕機。

  二:這是一個內部系統的調用,服務調用者集群數量遠遠小於外部系統的請求數量,那麼我們將所有的服務消費者到服務提供者的請求都經過nginx,帶來不必要的效率開銷。

  改進方案:將服務提供者集群的所有信息都存儲到第三方系統(如zookeeper)中對應服務名稱下,表現形式為——服務名:[{機器IP:(類名,方法名,參數類型,參數,IP地址,端口)}...]。這樣服務消費者向第三方存儲系統(如zookeeper)獲得服務的所有信息(服務集群的地址列表),然後服務調用者就從這個列表中根據負載均衡算法選擇一個進行訪問。

  這個時候我們可能會思考,負載均衡算法我們是參考nginx把IP地址的分配選擇在第三方系統(如zookeeper)上進行實現還是在服務調用者端進行實現?負載均衡算法部署在第三方系統(如zookeeper),服務消費者把服務名稱發給第三方系統,第三方系統根據服務名然後根據負載均衡算法從該服務的地址信息列表中選擇一個返回給服務消費者,服務消費者獲得所調用服務的具體信息後,直接向服務的提供者發送請求。但是正如我所說,這只是一個內部系統,請求的數量往往沒有多大的變化,而且實現起來要在服務消費者直接調用zookeeper系統前面編寫一個中間件作為一個中間,不免過於麻煩。我們完全可以在服務的消費者處嵌入負載均衡算法,服務消費者獲取服務的地址信息列表後,運算負載均衡算法從所得的地址信息列表中選擇一個地址信息發送請求的數據。更進一步,服務消費者第一次執行負載均衡算法後就把選擇的地址信息存儲到本地緩存,以後再次訪問就直接從本地拿去,不再到第三方系統中獲取了。

  基於第三方系統實現服務的負載均衡的方案已經實現,那麼我們來解決下一個問題,服務的上線和下線如何告知服務的消費者,避免服務消費者訪問異常?

  前面我們說了,服務提供者利用zookeeper系統的特性,可以實現服務的注冊和刪除,那麼同樣,我們也可以讓服務的消費者監聽zookeeper上對應的服務目錄,當服務目錄變動後,服務消費者則重新到zookeeper上獲取新的服務地址信息,然後運算負載均衡算法選擇一個新的服務進行請求。

  如果有沒有講明白的可以留言,我進行更正。基本上一個RPC就是這樣,剩下的一些基於RPC的框架無非就是實現了多些協議,以及一些多種語言環境的考慮和效率的提升。

   覺得不錯點個推薦吧,看在我花了一天時間把自己的知識整理分析,謝謝喽。當然這還是沒有寫好,等我下周有時間再添加圖片進行完善,關於這個架構的設計歡迎大家討論,共同成長。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved