在上一篇介紹Apache thrift 安裝和使用,寫了一個簡單的demo,講解thrift服務的發布和客戶端調用,但只是單向的客戶端發送消息,服務端接收消息。而客戶端卻得不到服務器的響應。
在不涉及語言平台的制約,WebService可勝任做這些服務端的處理。
基於大部分業務需求,更需要服務端能夠響應處理數據。下面我通過一個demo案例,介紹下Apache thrift 雙向通信的使用。
一.首先我們還是需要安裝好Apache thrift。這裡不再贅述,戳這裡查看我上篇文章的介紹:http://www.cnblogs.com/sumingk/articles/6073105.html
二.其次准備好thrift 所需的jar包:

三.新建一個Java web項目,編寫thrift腳本,命名為student.thrift 如下:
namespace java com.zhj.student
typedef i32 int
typedef i16 short
typedef i64 long
//Student Entity
struct Student {
1: string name
}
service Zthrift {
oneway void send(1:Student msg)
}
四.執行student.thrift 文件,thrift --gen java student.thrift (該文件我還是放在c盤根目錄下執行),隨後生產gen-java文件,如下:

五.將新生成的兩文件拷入項目中,其中Student.java 是實體類,Zthrift.java是生成的類。
六.編寫thrift服務端類。
package com.zhj.server;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.zhj.student.Student;
import com.zhj.student.Zthrift;
import com.zhj.student.Zthrift.Iface;
public class ZServer {
public static void main(String[] args){
try {
TServerSocket tServerSocket=new TServerSocket(9999);
TThreadPoolServer.Args targs=new TThreadPoolServer.Args(tServerSocket);
TBinaryProtocol.Factory factory=new TBinaryProtocol.Factory();
//獲取processFactory
TProcessorFactory tProcessorFactory= getProcessorFactory();
targs.protocolFactory(factory);
targs.processorFactory(tProcessorFactory);
TThreadPoolServer tThreadPoolServer=new TThreadPoolServer(targs);
System.out.println("start server...");
tThreadPoolServer.serve();
} catch (TTransportException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 內部類獲取 getProcessorFactory
* @return
*/
public static int tt= 0;
public static TProcessorFactory getProcessorFactory(){
TProcessorFactory tProcessorFactory=new TProcessorFactory(null){
public TProcessor getProcessor(final TTransport tTransport){
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("服務端休眠5秒後,執行響應......");
//延時五秒回復(延遲執行給客戶端發送消息)
Thread.sleep(5000);
tt +=100;
System.out.println("延時五秒回復時,tt = " +tt);
//這裡可以把client提取作為成員變量來多次使用
Zthrift.Client client = new Zthrift.Client(new TBinaryProtocol(tTransport));
//給客戶端響應消息
client.send(new Student("....test"));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
thread.start();
return new Zthrift.Processor<Iface>(new Iface() {
@Override
public void send(Student msg) throws TException {
// TODO Auto-generated method stub
tt+=10;
System.out.println("接收客戶端消息時,tt = " +tt);
//接受客戶端消息
System.out.println("....."+msg.toString());
}
});
}
};
return tProcessorFactory;
}
}
此處,內部類使用比較頻繁,閱讀會有些困難。Zthrift,Processor構造方法需要傳入一個Iface 接口,該接口有一個接收客戶端的方法send(), msg 是一個Student對象。
七.實現的客戶端調用。如下:
package com.zhj.client;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import com.zhj.student.Student;
import com.zhj.student.Zthrift.Iface;
import com.zhj.student.Zthrift;
public class ZClient {
public static void main(String[]args){
final TSocket tSocket=new TSocket("127.0.0.1",9999);
Zthrift.Client client=new Zthrift.Client(new TBinaryProtocol(tSocket));
try {
tSocket.open();
runMethod(tSocket);
//向服務端發送消息
client.send(new Student("小明1"));
} catch (TTransportException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void runMethod(final TSocket tSocket){
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Zthrift.Processor<Iface> mp = new Zthrift.Processor<Zthrift.Iface>(new Iface() {
@Override
public void send(Student msg) throws TException {
// TODO Auto-generated method stub
Long start = System.currentTimeMillis();
try {
while(true){
//具體接收時間待定
if((System.currentTimeMillis()-start)>0.1*60*1000){
System.out.println("響應消息超時...");
break;
}
else {
System.out.println("收到服務端響應消息: "+msg);
}
//休眠兩秒
Thread.sleep(2000L);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
try {
while(mp.process(new TBinaryProtocol(tSocket), new TBinaryProtocol(tSocket))){
//阻塞式方法,不需要內容
System.out.println("走阻塞式方法");
//關閉tScoket
// tSocket.close();
}
} catch (TException e) {
System.out.println("連接已斷開...");
e.printStackTrace();
}
}
});
thread.start();
}
}
在這裡,我加入了一個超時響應的死循環,用於接收服務端返回的消息,控制台可以查看服務端給的響應消息。
八.運行服務端和客戶端main方法,控制台打印如下:


代碼閱讀有些困難,有困難或不合理之處,請小伙伴指出。Thank you!