程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> Apache thrift RPC 雙向通信,apachethrift

Apache thrift RPC 雙向通信,apachethrift

編輯:JAVA綜合教程

Apache thrift RPC 雙向通信,apachethrift


      在上一篇介紹Apache thrift 安裝和使用,寫了一個簡單的demo,講解thrift服務的發布和客戶端調用,但只是單向的客戶端發送消息,服務端接收消息。而客戶端卻得不到服務器的響應。

在不涉及語言平台的制約,WebService可勝任做這些服務端的處理。

     基於大部分業務需求,更需要服務端能夠響應處理數據。下面我通過一個demo案例,介紹下Apache thrift 雙向通信的使用。

一.首先我們還是需要安裝好Apache thrift。這裡不再贅述,戳這裡查看我上篇文章的介紹:http://www.cnblogs.com/sumingk/articles/6073105.html

二.其次准備好thrift 所需的jar包:

    

三.新建一個Java web項目,編寫thrift腳本,命名為student.thrift  如下:

namespace java com.zhj.student

typedef i32 int  
typedef i16 short
typedef i64 long

//Student Entity
struct Student { 
   1: string name
} 


service Zthrift { 
   oneway void send(1:Student msg)
}

四.執行student.thrift 文件,thrift  --gen java  student.thrift (該文件我還是放在c盤根目錄下執行),隨後生產gen-java文件,如下:

五.將新生成的兩文件拷入項目中,其中Student.java 是實體類,Zthrift.java是生成的類。

六.編寫thrift服務端類。

package com.zhj.server;

import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import com.zhj.student.Student;
import com.zhj.student.Zthrift;
import com.zhj.student.Zthrift.Iface;

public class ZServer {

        public static void main(String[] args){
            try {
                TServerSocket tServerSocket=new TServerSocket(9999);
                TThreadPoolServer.Args targs=new TThreadPoolServer.Args(tServerSocket);
                TBinaryProtocol.Factory factory=new TBinaryProtocol.Factory();
                //獲取processFactory
                TProcessorFactory tProcessorFactory= getProcessorFactory();
                targs.protocolFactory(factory);
                targs.processorFactory(tProcessorFactory);
                TThreadPoolServer tThreadPoolServer=new TThreadPoolServer(targs);
                System.out.println("start server...");
                tThreadPoolServer.serve();
                
            } catch (TTransportException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
        
        /**
         * 內部類獲取 getProcessorFactory
         * @return
         */
        public static int tt= 0;
        public static TProcessorFactory getProcessorFactory(){
            
            TProcessorFactory tProcessorFactory=new TProcessorFactory(null){
                public TProcessor getProcessor(final TTransport tTransport){
                    Thread thread = new Thread(new Runnable() {
                        
                        @Override
                        public void run() {
                            try {
                                
                                System.out.println("服務端休眠5秒後,執行響應......");
                                //延時五秒回復(延遲執行給客戶端發送消息)
                                Thread.sleep(5000);
                                tt +=100;
                                System.out.println("延時五秒回復時,tt = " +tt);
                                 //這裡可以把client提取作為成員變量來多次使用
                                Zthrift.Client client = new Zthrift.Client(new TBinaryProtocol(tTransport));
                                //給客戶端響應消息
                                client.send(new Student("....test"));
                                
                            } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            } catch (TException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                    });
                    thread.start();
                    
                    return new Zthrift.Processor<Iface>(new Iface() {
                        
                        @Override
                        public void send(Student msg) throws TException {
                            // TODO Auto-generated method stub
                            tt+=10;
                            System.out.println("接收客戶端消息時,tt = " +tt);
                            //接受客戶端消息
                             System.out.println("....."+msg.toString());
                        }
                    });
                    
                }
            };
            return tProcessorFactory;
        }
}

此處,內部類使用比較頻繁,閱讀會有些困難。Zthrift,Processor構造方法需要傳入一個Iface 接口,該接口有一個接收客戶端的方法send(), msg 是一個Student對象。

 七.實現的客戶端調用。如下:

package com.zhj.client;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;

import com.zhj.student.Student;
import com.zhj.student.Zthrift.Iface;
import com.zhj.student.Zthrift;

public class ZClient {

    public static void main(String[]args){
        final TSocket tSocket=new TSocket("127.0.0.1",9999);
        Zthrift.Client client=new Zthrift.Client(new TBinaryProtocol(tSocket));
        try {
            tSocket.open();
            runMethod(tSocket);
            //向服務端發送消息
            client.send(new Student("小明1"));
           
        } catch (TTransportException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (TException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    public static void runMethod(final TSocket tSocket){
        Thread thread = new Thread(new Runnable() {
            
            @Override
            public void run() {
                Zthrift.Processor<Iface> mp = new Zthrift.Processor<Zthrift.Iface>(new Iface() {
                    
                    @Override
                    public void send(Student msg) throws TException {
                        // TODO Auto-generated method stub
                        Long start = System.currentTimeMillis();
                        try {
                            while(true){
                                //具體接收時間待定
                                if((System.currentTimeMillis()-start)>0.1*60*1000){
                                    System.out.println("響應消息超時...");
                                    break;
                                }
                                else {
                                    System.out.println("收到服務端響應消息: "+msg);
                                }
                                //休眠兩秒
                                Thread.sleep(2000L);
                            }
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    
                });
                
                try {
                    while(mp.process(new TBinaryProtocol(tSocket), new TBinaryProtocol(tSocket))){
                        //阻塞式方法,不需要內容
                        System.out.println("走阻塞式方法");
                        //關閉tScoket
                        // tSocket.close();
                    }
                } catch (TException e) {
                    System.out.println("連接已斷開...");
                    e.printStackTrace();
                }
            }
        });
        thread.start();
    }
}

在這裡,我加入了一個超時響應的死循環,用於接收服務端返回的消息,控制台可以查看服務端給的響應消息。

八.運行服務端和客戶端main方法,控制台打印如下:

  

代碼閱讀有些困難,有困難或不合理之處,請小伙伴指出。Thank you!

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved