
Adding native thrift and avro support to dubbo/dubbox



(facebook) thrift, (hadoop) avro, and (google) protobuf (grpc) have been among the most eye-catching high-efficiency serialization/RPC frameworks of recent years. dubbo does ship with thrift support, but it depends on an early version (only 0.8.0) and makes some extensions to the wire protocol, so it is not the native thrift protocol.

There are extensions on github that add native thrift support to dubbo, but they involve far more code than necessary; a single class is enough:

Thrift2Protocol.java:

package com.alibaba.dubbo.rpc.protocol.thrift2;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.lang.reflect.Constructor;

/**
 * Adds native thrift support to dubbo-rpc
 * by 楊俊明 (http://yjmyzz.cnblogs.com/)
 */
public class Thrift2Protocol extends AbstractProxyProtocol {
    public static final int DEFAULT_PORT = 33208;
    private static final Logger logger = LoggerFactory.getLogger(Thrift2Protocol.class);

    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    @Override
    protected <T> Runnable doExport(T impl, Class<T> type, URL url)
            throws RpcException {

        logger.info("impl => " + impl.getClass());
        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        TProcessor tprocessor;
        TNonblockingServer.Args tArgs = null;
        String iFace = "$Iface";
        String processor = "$Processor";
        String typeName = type.getName();
        TNonblockingServerSocket transport;
        if (typeName.endsWith(iFace)) {
            String processorClsName = typeName.substring(0, typeName.indexOf(iFace)) + processor;
            try {
                Class<?> clazz = Class.forName(processorClsName);
                Constructor constructor = clazz.getConstructor(type);
                try {
                    tprocessor = (TProcessor) constructor.newInstance(impl);
                    transport = new TNonblockingServerSocket(url.getPort());
                    tArgs = new TNonblockingServer.Args(transport);
                    tArgs.processor(tprocessor);
                    tArgs.transportFactory(new TFramedTransport.Factory());
                    tArgs.protocolFactory(new TCompactProtocol.Factory());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    throw new RpcException("Fail to create thrift server(" + url + ") : " + e.getMessage(), e);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new RpcException("Fail to create thrift server(" + url + ") : " + e.getMessage(), e);
            }
        }

        if (tArgs == null) {
            logger.error("Fail to create thrift server(" + url + ") due to null args");
            throw new RpcException("Fail to create thrift server(" + url + ") due to null args");
        }
        final TServer thriftServer = new TNonblockingServer(tArgs);

        new Thread(new Runnable() {
            public void run() {
                logger.info("Starting thrift server ...");
                // serve() blocks until stop() is called
                thriftServer.serve();
                logger.info("Thrift server stopped.");
            }
        }).start();

        return new Runnable() {
            public void run() {
                try {
                    logger.info("Close Thrift Server");
                    thriftServer.stop();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        };
    }

    @Override
    protected <T> T doRefer(Class<T> type, URL url) throws RpcException {

        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        try {
            TSocket tSocket;
            TTransport transport;
            TProtocol protocol;
            T thriftClient = null;
            String iFace = "$Iface";
            String client = "$Client";

            String typeName = type.getName();
            if (typeName.endsWith(iFace)) {
                String clientClsName = typeName.substring(0, typeName.indexOf(iFace)) + client;
                Class<?> clazz = Class.forName(clientClsName);
                Constructor constructor = clazz.getConstructor(TProtocol.class);
                try {
                    tSocket = new TSocket(url.getHost(), url.getPort());
                    transport = new TFramedTransport(tSocket);
                    protocol = new TCompactProtocol(transport);
                    thriftClient = (T) constructor.newInstance(protocol);
                    transport.open();
                    logger.info("thrift client opened for service(" + url + ")");
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    throw new RpcException("Fail to create remoting client:" + e.getMessage(), e);
                }
            }
            if (thriftClient == null) {
                throw new RpcException("Fail to create remoting client: " + typeName
                        + " is not a thrift-generated $Iface interface");
            }
            return thriftClient;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
    }

}

All it takes is overriding the two abstract methods of the parent class AbstractProxyProtocol, doExport and doRefer. doExport exposes the RPC service to the outside world; the thrift server is started inside it, and the dubbo service provider calls this method on startup. doRefer is used by a dubbo service consumer, after service discovery, to obtain the corresponding rpc client.
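The reflection in doExport/doRefer leans on the thrift code generator's naming convention: for a service Foo, the generated outer class contains the inner types Foo$Iface, Foo$Processor, and Foo$Client. A minimal sketch of that name derivation, using a hypothetical com.example.EchoService as the generated class:

```java
public class ThriftNaming {
    /**
     * Derives a sibling inner-class name (e.g. "$Processor", "$Client")
     * from a thrift-generated "$Iface" name, mirroring the string logic
     * used in Thrift2Protocol.
     */
    static String siblingOf(String ifaceName, String suffix) {
        String iface = "$Iface";
        if (!ifaceName.endsWith(iface)) {
            throw new IllegalArgumentException(ifaceName + " is not a $Iface type");
        }
        return ifaceName.substring(0, ifaceName.length() - iface.length()) + suffix;
    }

    public static void main(String[] args) {
        // prints com.example.EchoService$Processor
        System.out.println(siblingOf("com.example.EchoService$Iface", "$Processor"));
        // prints com.example.EchoService$Client
        System.out.println(siblingOf("com.example.EchoService$Iface", "$Client"));
    }
}
```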

Following the same approach, avro is easy to integrate as well:

AvroProtocol.java:

package com.alibaba.dubbo.rpc.protocol.avro;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.reflect.ReflectRequestor;
import org.apache.avro.ipc.reflect.ReflectResponder;

import java.net.InetSocketAddress;

/**
 * Adds avro support to dubbo-rpc
 * by 楊俊明 (http://yjmyzz.cnblogs.com/)
 */
public class AvroProtocol extends AbstractProxyProtocol {
    public static final int DEFAULT_PORT = 40881;
    private static final Logger logger = LoggerFactory.getLogger(AvroProtocol.class);

    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    @Override
    protected <T> Runnable doExport(T impl, Class<T> type, URL url)
            throws RpcException {

        logger.info("impl => " + impl.getClass());
        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        final Server server = new NettyServer(new ReflectResponder(type, impl),
                new InetSocketAddress(url.getHost(), url.getPort()));
        server.start();

        return new Runnable() {
            public void run() {
                try {
                    logger.info("Close Avro Server");
                    server.close();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        };
    }

    @Override
    protected <T> T doRefer(Class<T> type, URL url) throws RpcException {

        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        try {
            NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(url.getHost(), url.getPort()));
            T ref = ReflectRequestor.getClient(type, client);
            logger.info("Create Avro Client");
            return ref;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
    }

}

Don't forget to add a file named com.alibaba.dubbo.rpc.Protocol under META-INF/dubbo/internal, with the content:

avro=com.alibaba.dubbo.rpc.protocol.avro.AvroProtocol
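The thrift2 module needs the same kind of SPI file in its own jar; assuming "thrift2" as the protocol key, its line would read thrift2=com.alibaba.dubbo.rpc.protocol.thrift2.Thrift2Protocol. With the SPI entries in place, the protocol is selected through ordinary dubbo configuration; a sketch, with hypothetical interface and bean names:

```xml
<!-- provider side: open the thrift2 protocol and expose a service over it -->
<dubbo:protocol name="thrift2" port="33208" />
<dubbo:service interface="com.example.EchoService$Iface"
               ref="echoServiceImpl" protocol="thrift2" />

<!-- consumer side: reference the same $Iface interface -->
<dubbo:reference id="echoService" interface="com.example.EchoService$Iface" />
```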

Next, how to package all this into the dubbo jar:

In dubbo-rpc/pom.xml, add the two new modules:

    <modules>
        ...
        <module>dubbo-rpc-avro</module>
        ...
        <module>dubbo-rpc-thrift2</module>
        ...
    </modules>

Then in dubbo/pom.xml:

    <artifactSet>
        <includes>
            ...
            <include>com.alibaba:dubbo-rpc-api</include>
            <include>com.alibaba:dubbo-rpc-avro</include>
            ...
            <include>com.alibaba:dubbo-rpc-thrift2</include>
            ...
        </includes>
    </artifactSet>

The dependencies section also needs the two new artifacts:

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>dubbo-rpc-thrift2</artifactId>
        <version>${project.parent.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.thrift</groupId>
                <artifactId>libthrift</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>dubbo-rpc-avro</artifactId>
        <version>${project.parent.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-ipc</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

The dubbo-xxx.jar built this way includes the newly added Protocol implementations. As for google's protobuf, it is still in 3.x beta; integration can wait until a stable release ships.

The code above has been pushed to github: https://github.com/yjmyzz/dubbox (version: 2.8.4a)

Finally, a simple comparative test of the four protocols dubbo/thrift/avro/rest. The test case is trivial:

    public String ping() {
        return "pong";
    }

The client calls the ping method and the server returns the string "pong". 50,000 calls on a MacBook Pro give the following results:

dubbo RPC testing =>
 50000 RPC calls (dubbo protocol) took 14778 ms, averaging 3383.407715/sec
avro RPC testing =>
 50000 RPC calls (avro protocol) took 10707 ms, averaging 4669.842285/sec
thrift RPC testing =>
 50000 RPC calls (thrift protocol) took 4667 ms, averaging 10713.520508/sec
REST testing =>
 50000 REST calls took 112699 ms, averaging 443.659668/sec
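The per-second averages are just the call count divided by elapsed seconds. A timing harness of the shape behind numbers like these can be sketched as follows; the in-process stub is a placeholder for a real dubbo-referenced proxy, so on its own it measures nothing but the loop:

```java
public class PingBenchmark {
    // stands in for the dubbo-referenced service proxy (hypothetical interface)
    interface EchoService { String ping(); }

    public static void main(String[] args) {
        EchoService service = new EchoService() {  // local stub, no RPC involved
            public String ping() { return "pong"; }
        };
        int n = 50000;
        long start = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            service.ping();
        }
        long costMs = System.currentTimeMillis() - start;
        // guard against division by zero for a very fast local stub
        double perSec = n * 1000.0 / Math.max(costMs, 1);
        System.out.println(n + " calls took " + costMs + " ms, averaging " + perSec + "/sec");
    }
}
```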

This matches expectations. REST rides on http and is naturally the slowest. avro and dubbo both do their network I/O through netty, so they land in the same order of magnitude, but avro's binary serialization is more efficient, which makes it slightly faster. thrift, implemented by facebook from top to bottom, performs best and beats the other protocols hands down.

Personal suggestion: for a given service interface, provide both a thrift and a REST implementation. Let internal subsystems call each other via thrift (thrift is cross-language, so calls coming in from outside can also use thrift-rpc), and fall back to traditional REST in scenarios where calling with a thrift client directly is inconvenient.
