程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> yanf4j引入了客戶端非阻塞API

yanf4j引入了客戶端非阻塞API

編輯:關於JAVA

yanf4j發布一個0.50-beta2版本,這個版本最重要的改進就是引入了客戶端 連接非阻塞API,主要最近的工作要用到,所以添加了。兩個核心類 TCPConnectorController和UDPConnectorController分別用於TCP和UDP的客戶端 連接控制。例如,現在的UDP echo client可以寫成:

//客戶端echo handler
     class EchoClientHandler extends 

HandlerAdapter {
        public void onReceive(Session udpSession, 

Object t) {
            DatagramPacket datagramPacket = 

(DatagramPacket) t;
            System.out.println

("recv:" + new String(datagramPacket.getData

()));
        }
        @Override
        public void 

onMessageSent(Session session, Object t) {
            

System.out.println("send:" + new String((byte[]) 

t));
        }
    }
       //連接代碼,並發送UDP包
       

 UDPConnectorController connector = new UDPConnectorController();
  

      connector.setSoTimeout(1000);
        connector.setHandler

(new EchoClientHandler());
        connector.connect(new 

InetSocketAddress(InetAddress.getByName(host),
                

port));
        for (int i = 0; i < 10000; i++) {
        

    String s = "hello " + i;
            

DatagramPacket packet = new DatagramPacket(s.getBytes(), s.length

());
            connector.send(packet);
        }

UDP不是面向連接的,因此connect方法僅僅是調用了底層 DatagramChannel.connect方法,用來限制接收和發送的packet的遠程端點。

再來看看TCPConnectorController的使用,同樣看Echo Client的實現:

//客戶端的echo handler
class EchoHandler extends HandlerAdapter<String> {
        @Override
        public void onConnected(Session session) {
            try {
                //一連接就發送NUM個字符串
                for (int i = 0; i < NUM; i++)
                    session.send(generateString(i));
             } catch (Exception e) {
             }
        }
        public String generateString(int len) {
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < MESSAGE_LEN; i++)
                sb.append(i);
            return sb.toString();
        }
        @Override
        public void onReceive(Session session, String t) {
            //打印接收到字符串
            if (DEBUG)
                System.out.println("recv:" + t);
        }
    }
//...連接API,TCPConnectorController示例
    Configuration configuration = new Configuration();
        configuration.setTcpSessionReadBufferSize(256 * 1024); // 設置

讀的緩沖區大小
    TCPConnectorController    connector = new TCPConnectorController

(configuration,
                new StringCodecFactory());
    connector.setHandler(new EchoHandler());
    connector.setCodecFactory(new StringCodecFactory());
    try {
            connector.Connect(new InetSocketAddress

("localhost", 8080));
    } catch (IOExceptione) {
            e.printStackTrace();
    }

注意,connect方法並不阻塞,而是立即返回,連接是否建立可以通過 TCPConnectorController.isConnected()方法來判斷,因此通常你可能會這樣使 用:

try {
            connector.Connect(new InetSocketAddress

("localhost", 8080));
            while(!connector.isConnected())
                ;
        } catch (Exception e) {
            e.printStackTrace();
        }

來強制確保後面對connector的使用是已經連接上的connector,然而更好的 做法是在Handler的onConnected()回調方法中處理邏輯,因為這個方法僅僅在連 接建立後才會被調用。

兩個ConnectorController都有系列send方法,用於發送數據:

TCPConnectorController.send(Object msg) throws InterruptedException
UDPConnectorController.send(DatagramPacket packet) throws InterruptedException
UDPConnectorController.send(SocketAddress targetAddr, Object msg) throws InterruptedException

0.50-beta2帶來的另一個修改就是Session接口添加setReadBufferByteOrder 方法,用於設置session接收緩沖區的字節序,默認是網絡字節序,也就是大端 法。這個方法建議在Handler的onSessionStarted回調方法中調用。

在0.50-beta最重要的修改是引入了session發送隊列緩沖區的流量控制選項 。默認情況下,session的發送緩沖隊列是無界的,隊列的push和pop也全然不會 阻塞。在設置了緩沖隊列的高低水位選項後即引入了發送流量控制,規則如下:

a)當發送隊列中的數據總量大於高水位標記(highWaterMark), Session.send將阻塞

b)在條件a的作用下,Session.send的阻塞將持續到發送隊列中的數據總量小 於於低水位標記(lowWaterMark)才解除。

緩沖隊列高低水位的設置通過Controller的下列方法設置:

public void setSessionWriteQueueHighWaterMark(int highWaterMark);

public void setSessionWriteQueueLowWaterMark(int lowWaterMark);

緩沖隊列的流量控制想法來自ACE的ACE_Message_Queue,是通過 com.google.code.yanf4j.util.MessageQueue類實現的。

0.50-beta還引入了Session.send(Object msg)的重載版本 Session.send (Object msg,long timeout),在超過timeout時間後send仍然阻塞時即終止send 。注意,現在Session.send的這兩個方法都返回一個bool值來表示send成功與否 ,並且都將響應中斷(僅限啟動了流量控制選項)拋出InterruptedException。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved