程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> Java網絡服務器編程(NIO版)

Java網絡服務器編程(NIO版)

編輯:關於JAVA

從Java 1.4開始提供的NIO API常用於開發高性能網絡服務器,本文演示了如何用這個API開發一個TCP Echo Server。

Java網絡服務器編程 一文演示了如何使用Java的Socket API編寫一個簡單的TCP Echo Server。其阻塞式IO的處理方式雖然簡單,但每個客戶端都需要一個單獨的Thread來處理,當服務器需要同時處理大量客戶端時,這種做法不再可行。使用NIO API可以讓一個或有限的幾個Thread同時處理連接到服務器上的所有客戶端。(關於NIO API的一些介紹,可以在Java NIO API詳解一文中找到。)

NIO API允許一個線程通過Selector對象同時監控多個SelectableChannel來處理多路IO,NIO應用程序一般按下圖所示工作:

Figure 1

如Figure 1 所示,ClIEnt一直在循環地進行select操作,每次select()返回以後,通過selectedKeys()可以得到需要處理的SelectableChannel並對其一一處理。

這樣做雖然簡單但也有個問題,當有不同類型的SelectableChannel需要做不同的IO處理時,在圖中ClIEnt的代碼就需要判斷channel的類型然後再作相應的操作,這往往意味著一連串的if else。更糟糕的是,每增加一種新的channel,不但需要增加相應的處理代碼,還需要對這一串if else進行維護。(在本文的這個例子中,我們有ServerSocketChannel和SocketChannel這兩種channel需要分別被處理。)

如果考慮將channel及其需要的IO處理進行封裝,抽象出一個統一的接口,就可以解決這一問題。在Listing 1中的NiOSession就是這個接口。

NioSession的channel()方法返回其封裝的SelectableChannel對象,interestOps()返回用於這個channel注冊的interestOps。registered()是當SelectableChannel被注冊後調用的回調函數,通過這個回調函數,NioSession可以得到channel注冊後的SelectionKey。process()函數則是NiOSession接口的核心,這個方法抽象了封裝的SelectableChannel所需的IO處理邏輯。

Listing 1:

public interface NiOSession {

public SelectableChannel channel();

public int interestOps();

public void registered(SelectionKey key);

public void process();

}

和NioSession一起工作的是NioWorker這個類(Listing 2),它是NioSession的調用者,封裝了一個Selector對象和Figure 1中循環select操作的邏輯。理解這個類可以幫助我們了解該如何使用NiOSession這個接口。

NioWorker實現了Runnable接口,循環select操作的邏輯就在run()方法中。在NioWorker – NioSession這個框架中,NioSession在channel注冊的時候會被作為attachment送入register函數,這樣,在每次select()操作的循環中,對於selectedKeys()中的每一個SelectionKey,我們都可以通過attachment拿到其相對應的NiOSession然後調用其process()方法。

每次select()循環還有一個任務,就是將通過add()方法加入到這個NioWorker的NioSession注冊到Selector上。在Listing 2的代碼中可以看出,NioSession中的channel()被取出並注冊在Selector上,注冊所需的interestOps從NioSession中取出,NioSession本身則作為attachment送入register()函數。注冊成功後,NiOSession的registered()回調函數會被調用。

NioWorker的add()方法的作用是將一個NioSession加入到該NioWorker中,並wakeup當前的select操作,這樣在下一次的select()調用之前,這個NiOSession會被注冊。stop()方法則是讓一個正在run()的NioWorker停止。closeAllChannels()會關閉當前注冊的所有channel,這個方法可在NioWorker不再使用時用來釋放IO資源。

Listing 2:

public class NioWorker implements Runnable {

public NioWorker(Selector sel) {

_sel = sel;

_added = new HashSet();

}

public void run() {

try {

try {

while (_run) {

_sel.select();

Set selected = _sel.selectedKeys();

for (Iterator itr = selected.iterator(); itr.hasNext();) {

SelectionKey key = (SelectionKey) itr.next();

NioSession s = (NiOSession) key.attachment();

s.process();

itr.remove();

}

synchronized (_added) {

for (Iterator itr = _added.iterator(); itr.hasNext();) {

NioSession s = (NiOSession) itr.next();

SelectionKey key = s.channel().register(_sel, s.interestOps(), s);

s.registered(key);

itr.remove();

}

}

}

} finally {

_sel.close();

}

} catch (IOException ex) {

throw new Error(ex);

}

}

public void add(NiOSession s) {

synchronized (_added) {

_added.add(s);

}

_sel.wakeup();

}

public synchronized void stop() {

_run = false;

_sel.wakeup();

}

public void closeAllChannels() {

for (Iterator itr = _sel.keys().iterator(); itr.hasNext();) {

SelectionKey key = (SelectionKey) itr.next();

try {

key.channel().close();

} catch (IOException ex) {}

}

}

protected Selector _sel = null;

protected Collection _added = null;

protected volatile boolean _run = true;

}

在Echo Server這個例子中,我們需要一個ServerSocketChannel來接受新的TCP連接,對於每個TCP連接,我們還需要一個SocketChannel來處理這個TCP連接上的IO操作。把這兩種channel和上面的NioWorker – NioSession結構整合在一起,可以得到NiOServerSession和NioEchoSession這兩個類,它們分別封裝了ServerSocketChannel和SocketChannel及其對應的IO操作。下面這個UML類圖描述了這4個類的關系:

Figure 2

可以看到NioWorker和NioSession對新加入的兩個類沒有任何依賴性,NioServerSession和NioEchoSession通過實現NioSession這個接口為系統加入了新的功能。這樣的一個體系架構符合了Open-Close原則,新的功能可以通過實現NiOSession被加入而無需對原有的模塊進行修改,這體現了面向對象設計的強大威力。

NioServerSession的實現(Listing 3)相對比較簡單,其封裝了一個ServerSocketChannel以及從這個channel上接受新的TCP連接的邏輯。NioServerSession還需要一個NioWorker的引用,這樣每接受一個新的TCP連接,NiOServerSession就為其創建一個NioEchoSession的對象,並將這個對象加入到NioWorker中。

Listing 3:

public class NioServerSession implements NiOSession {

public NiOServerSession(ServerSocketChannel channel, NioWorker worker) {

_channel = channel;

_worker = worker;

}

public void registered(SelectionKey key) {}

public void process() {

try {

SocketChannel c = _channel.accept();

if (c != null) {

c.configureBlocking(false);

NioEchoSession s = new NioEchoSession(c);

_worker.add(s);

}

} catch (IOException ex) {

throw new Error(ex);

}

}

public SelectableChannel channel() {

return _channel;

}

public int interestOps(){

return SelectionKey.OP_ACCEPT;

}

protected ServerSocketChannel _channel;

protected NioWorker _worker;

}

NioEchoSession的行為要復雜一些,NioEchoSession會先從TCP連接中讀取數據,再將這些數據用同一個連接寫回去,並重復這個步驟直到客戶端把連接關閉為止。我們可以把“Reading”和“Writing”看作NioEchoSession的兩個狀態,這樣可以用一個有限狀態機來描述它的行為,如下圖所示:

Figure 3

接下來的工作就是如何實現這個有限狀態機了。在這個例子中,我們使用State模式來實現它。下面這張UML類圖描述了NioEchoSession的設計細節。

Figure 4

NioEchoSession所處的狀態由EchoState這個抽象類來表現,其兩個子類分別對應了“Reading”和“Writing”這兩個狀態。NioEchoSession會將process()和interestOps()這兩個方法delegate給EchoState來處理,這樣,當NioEchoSession處於不同的狀態時,就會有不同的行為。

Listing 4是EchoState的實現。EchoState定義了process()和interestOps()這兩個抽象的方法來讓子類實現。NioEchoSession中的process()方法會被delegate到其當前EchoState的process()方法,NioEchoSession本身也會作為一個描述context的參數被送入EchoState的process()方法中。EchoState定義的interestOps()方法則會在NioEchoSession注冊和轉變State的時候被用到。

EchoState還定義了兩個靜態的方法來返回預先創建好的ReadState和WriteState,這樣做的好處是可以避免在NioEchoSession轉換state的時候創建一些不必要的對象從而影響性能。然而,這樣做要求state類必須是無狀態的,狀態需要保存在context類,也就是NioEchoSession中。

Listing 4:

public abstract class EchoState {

public abstract void process(NioEchoSession s) throws IOException;

public abstract int interestOps();

public static EchoState readState() {

return _read;

}

public static EchoState writeState() {

return _write;

}

protected static EchoState _read = new ReadState();

protected static EchoState _write = new WriteState();

}

Listing 5是NioEchoSession的實現。NioEchoSession包含有一個SocketChannel,這個channel注冊後得到的SelectionKey,一個用於存放數據的ByteBuffer和一個記錄當前state的EchoState對象。在初始化時,EchoState被初始化為一個ReadState。NioEchoSession把process()方法和interestOps()方法都delegate到當前的EchoState中。其setState()方法用於切換當前state,在切換state後,NioEchoSession會通過SelectionKey更新注冊的interestOps。close()方法用於關閉這個NioEchoSession對象。

Listing 5:

public class NioEchoSession implements NiOSession {

public NioEchoSession(SocketChannel c) {

_channel = c;

_buf = ByteBuffer.allocate(128);

_state = EchoState.readState();

}

public void registered(SelectionKey key) {

_key = key;

}

public void process() {

try {

_state.process(this);

} catch (IOException ex) {

close();

throw new Error(ex);

}

}

public SelectableChannel channel() {

return _channel;

}

public int interestOps() {

return _state.interestOps();

}

public void setState(EchoState state) {

_state = state;

_key.interestOps(interestOps());

}

public void close() {

try {

_channel.close();

} catch (IOException ex) {

throw new Error(ex);

}

}

protected SocketChannel _channel = null;

protected SelectionKey _key;

protected ByteBuffer _buf = null;

protected EchoState _state = null;

}

Listing 6和Listing 7分別是ReadState和WriteState的實現。ReadState在process()中會先從NioEchoSession的channel中讀取數據,如果未能讀到數據,NioEchoSession會繼續留在ReadState;如果讀取出錯,NioEchoSession會被關閉;如果讀取成功,NioEchoSession會被切換到WriteState。WriteState則負責將NioEchoSession中已經讀取的數據寫回到channel中,全部寫完後,NioEchoSession會被切換回ReadState。

Listing 6:

public class ReadState extends EchoState {

public void process(NioEchoSession s)

throws IOException

{

SocketChannel channel = s._channel;

ByteBuffer buf = s._buf;

int count = channel.read(buf);

if (count == 0) {

return;

}

if (count == -1) {

s.close();

return;

}

buf.flip();

s.setState(EchoState.writeState());

}

public int interestOps() {

return SelectionKey.OP_READ;

}

}

Listing 7:

public class WriteState extends EchoState {

public void process(NioEchoSession s)

throws IOException

{

SocketChannel channel = s._channel;

ByteBuffer buf = s._buf;

channel.write(buf);

if (buf.remaining() == 0) {

buf.clear();

s.setState(EchoState.readState());

}

}

public int interestOps() {

return SelectionKey.OP_WRITE;

}

}

NioEchoServer(Listing 8)被用來啟動和關閉一個TCP Echo Server,這個類實現了Runnable接口,調用其run()方法就啟動了Echo Server。其shutdown()方法被用來關閉這個Echo Server,注意shutdown()和run()的finally block中的同步代碼確保了只有當Echo Server被關閉後,shutdown()方法才會返回。

Listing 8:

public class NioEchoServer implements Runnable {

public void run() {

try {

ServerSocketChannel serv = ServerSocketChannel.open();

try {

serv.socket().bind(new InetSocketAddress(7));

serv.configureBlocking(false);

_worker = new NioWorker(Selector.open());

NioServerSession s = new NiOServerSession(serv, _worker);

_worker.add(s);

_worker.run();

} finally {

_worker.closeAllChannels();

synchronized (this) {

notify();

}

}

} catch (IOException ex) {

throw new Error(ex);

}

}

public synchronized void shutdown() {

_worker.stop();

try {

wait();

} catch (InterruptedException ex) {

throw new Error(ex);

}

}

protected NioWorker _worker = null;

}

最後,通過一個簡單的main()函數(Listing 9),我們就可以運行這個Echo Server了。

Listing 9:

public static void main(String [] args) {

new NioEchoServer().run();

}

我們可以通過telnet程序來檢驗這個程序的運行狀況:

1. 打開一個命令行,輸入 telnet localhost 7 來運行一個telnet程序並連接到Echo Server上。

2. 在telnet程序中輸入字符,可以看到輸入的字符被顯示在屏幕上。(這是因為Echo Server將收到的字符寫回到客戶端)

3. 多打開幾個telnet程序進行測試,可以看到Echo Server能通過NIO API用一個Thread服務多個客戶端。

作者:DaiJiaLin

mailto:[email protected]

http://blog.csdn.Net/DaiJiaLin

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved