程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> JAVA NIO Socket通道,javaniosocket通道

JAVA NIO Socket通道,javaniosocket通道

編輯:JAVA綜合教程

JAVA NIO Socket通道,javaniosocket通道


  DatagramChannel和SocketChannel都實現定義讀寫功能,ServerSocketChannel不實現,只負責監聽傳入的連接,並建立新的SocketChannel,本身不傳輸數據。 Socket通道被實例化時都會創建一個對等的socket,通過此方式創建的socket都會有關聯的通道,通過getChannel()獲取。   繼承於 SelectableChannel,所以socket可以在非阻塞模式下運行:   Readiness Selection:就緒選擇,查詢通道的機制,該機制可以判斷通道是否准備好執行下一個目標操作(讀,寫...),其價值在於潛在的大量通道可以同時進行就緒檢查,真正的就緒選擇需要由操作系統來做,處理IO請求,並通知各個線程數據准備情況。   Selector選擇器:提供了這種抽象(抽象接口),是的Java代碼能夠以可移植的方式,請求底層操作系統提供這種服務。   Selector選擇器類:管理著一個被注冊的通道集合的信息和他們的狀態,通道和選擇器是一起被注冊的,並且使用選擇器來更新通道狀態。   一個通道可以被注冊到多個選擇器上,但在每個選擇器上,只能注冊一次。   SelectionKey選擇鍵:封裝了通道和選擇器的注冊關系,選擇鍵被SelectableChannel.register()返回並提供標識這種注冊關系的標記。   通道在被注冊到選擇器之前必須設置為noblocking模式,正常狀態。   chanel.register(selector, keystate):通道注冊選擇器。 selector.select():阻塞操作,直到某一個channel的keystate發生。   selectionKey.cancel(),取消注冊關系。   通道關閉,相關的注冊鍵會自動取消,選擇器關閉,則所有注冊到該選擇器的通道都將被注銷,並且相關的鍵會立刻失效。   selectionkey包含兩個以整數型式進行編碼的比特掩碼,一個用於指示那些通道和選擇器組合所關心的操作,另一個表示通道准備好要執行的操作。當前的interest集合可以通過調用見對象的interestOps()方法來獲取,且永遠不會被選擇器改變,但可以調用interestOps()方法,傳入一個新的比特碼來改變。   readyOpts()獲取相關通道的已就緒的操作,ready集合是interest集合的子集,表示從上次調用select()之後已經就緒的操作。如下: if((key.readOps() & SelctionKey.OP_READ) != 0){     buffer.clear();     key.channel().read(buffer);     do().... }   附加參數:attach()   SelectionKey key = SelectableChannel.register(Selector, SelectionKey.OP_XXX, paramObj); 等價: SelectionKey key = SelectableChannel.register(Selector, SelectionKey.OP_XXX); key.attach(paramObj);   SelectionKey 多線程應用同步問題。   選擇器: Selector上的已注冊鍵集合中,會存在失效鍵、null,keys()返回,不可修改。   已選擇鍵集合,selectedKeys()返回,已經准備好的鍵集合,可能為空。   核心:選擇過程,是對select(),poll(),epoll()等本地調用(native call)或者類似的操作系統的本地調用的包裝(抽象),期間,將執行以下過程:   使用內部已取消的鍵的集合來延遲注銷,防止線程在取消鍵時阻塞及與正在進行的選擇操作沖突的優化,   三種形式的select: select(), select(timeout),selectNow()(非阻塞,立刻返回當前狀況)。         調用 Selector 對象的 wakeup( )方法將使得選擇器上的第一個還沒有返回的選擇操作立即回。如果當前沒有在進行中的選擇,那麼下一次對 select( )方法的一種形式的調用將立即返回。後續的選擇操作將正常進行。在選擇操作之間多次調用 wakeup( )方法與調用它一次沒有什麼不同。有時這種延遲的喚醒行為並不是您想要的。您可能只想喚醒一個睡眠中的線程,而使得後續的 選擇繼續正常地進行。您可以通過在調用 wakeup( )方法後調用 selectNow( )方法來繞過這個問題。         通常的做法是在選擇器上調用一次 select 操作(這將更新已選擇的鍵的集合),然後遍歷 selectKeys( )方法返回的鍵的集合。在按順序進行檢查每個鍵的過程中,相關的通道也根據鍵的就緒集合進行處理。然後鍵將從已選擇的鍵的集合中被移除(通過在 Iterator對象上調用 remove( )方法),然後檢查下一個鍵。完成後,通過再次調用 select( )方法重復這個循環。如下:  
 1 package org.windwant.nio;
 2 
 3 import java.io.IOException;
 4 import java.net.InetSocketAddress;
 5 import java.net.ServerSocket;
 6 import java.nio.ByteBuffer;
 7 import java.nio.channels.SelectionKey;
 8 import java.nio.channels.Selector;
 9 import java.nio.channels.ServerSocketChannel;
10 import java.nio.channels.SocketChannel;
11 import java.util.Iterator;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.Executors;
14 
15 /**
16  * Created by windwant on 2016/10/27.
17  */
18 public class SocketChannelOpt {
19 
20     private static final String HOST = "localhost";
21     private static final int PORT = 8888;
22 
23     private static ExecutorService read = Executors.newFixedThreadPool(5);
24     private static ExecutorService write = Executors.newFixedThreadPool(5);
25 
26     public static void main(String[] args){
27         ServerSocketChannel serverSocketChannel = null;
28         ServerSocket serverSocket = null;
29         Selector selector = null;
30         try {
31             serverSocketChannel = ServerSocketChannel.open();//工廠方法創建ServerSocketChannel
32             serverSocket = serverSocketChannel.socket(); //獲取channel對應的ServerSocket
33             serverSocket.bind(new InetSocketAddress(HOST, PORT)); //綁定地址
34             serverSocketChannel.configureBlocking(false); //設置ServerSocketChannel非阻塞模式
35             selector = Selector.open();//工廠方法創建Selector
36             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注冊選擇器,接受連接就緒狀態。
37             while (true){//循環檢查
38                 if(selector.select() == 0){//阻塞檢查,當有就緒狀態發生,返回鍵集合
39                     continue;
40                 }
41 
42                 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //獲取就緒鍵遍歷對象。
43                 while (it.hasNext()){
44                     SelectionKey selectionKey = it.next();
45                     //處理就緒狀態
46                     if (selectionKey.isAcceptable()){
47                         ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只負責監聽,阻塞,管理,不發送、接收數據
48                         SocketChannel socketChannel = schannel.accept();//就緒後的操作,剛到達的socket句柄
49                         if(null == socketChannel){
50                             continue;
51                         }
52                         socketChannel.configureBlocking(false);
53                         socketChannel.register(selector, SelectionKey.OP_READ); //告知選擇器關心的通道,准備好讀數據
54                     }else if(selectionKey.isReadable()){
55                         SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
56                         ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);
57 
58                         StringBuilder result = new StringBuilder();
59                         while (socketChannel.read(byteBuffer) > 0){//確保讀完
60                             byteBuffer.flip();
61                             result.append(new String(byteBuffer.array()));
62                             byteBuffer.clear();//每次清空 對應上面flip()
63                         }
64 
65                         System.out.println("server receive: " + result.toString());
66                         socketChannel.register(selector, SelectionKey.OP_WRITE);
67 
68                     }else if(selectionKey.isWritable()){
69                         SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
70                         String sendStr = "server send data: " + Math.random();
71                         ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
72                         while (send.hasRemaining()){
73                             socketChannel.write(send);
74                         }
75                         socketChannel.register(selector, SelectionKey.OP_READ);
76                         System.out.println(sendStr);
77                     }
78                     it.remove();
79                 }
80             }
81 
82         } catch (IOException e) {
83             e.printStackTrace();
84         }
85     }
86 }
      Selector多線程執行,同步需求。   一個線程監控通道的就緒狀態,一個線程池處理業務需求。                                              線程池也可以擴展為不同的業務處理線程池,如日志、業務、心跳。
  1 package org.windwant.nio;
  2 
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.net.ServerSocket;
  6 import java.nio.ByteBuffer;
  7 import java.nio.channels.SelectionKey;
  8 import java.nio.channels.Selector;
  9 import java.nio.channels.ServerSocketChannel;
 10 import java.nio.channels.SocketChannel;
 11 import java.util.Iterator;
 12 import java.util.concurrent.ExecutorService;
 13 import java.util.concurrent.Executors;
 14 
 15 /**
 16  * 線程處理讀取,寫出
 17  * Created by windwant on 2016/10/27.
 18  */
 19 public class TSocketChannelOpt {
 20 
 21     private static final String HOST = "localhost";
 22     private static final int PORT = 8888;
 23 
 24     private static ExecutorService read = Executors.newFixedThreadPool(5);
 25     private static ExecutorService write = Executors.newFixedThreadPool(5);
 26 
 27     public static void main(String[] args){
 28         ServerSocketChannel serverSocketChannel = null;
 29         ServerSocket serverSocket = null;
 30         Selector selector = null;
 31         try {
 32             serverSocketChannel = ServerSocketChannel.open();//工廠方法創建ServerSocketChannel
 33             serverSocket = serverSocketChannel.socket(); //獲取channel對應的ServerSocket
 34             serverSocket.bind(new InetSocketAddress(HOST, PORT)); //綁定地址
 35             serverSocketChannel.configureBlocking(false); //設置ServerSocketChannel非阻塞模式
 36             selector = Selector.open();//工廠方法創建Selector
 37             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注冊選擇器,接受連接就緒狀態。
 38             while (true){//循環檢查
 39                 if(selector.select() == 0){//阻塞檢查,當有就緒狀態發生,返回鍵集合
 40                     continue;
 41                 }
 42 
 43                 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //獲取就緒鍵遍歷對象。
 44                 while (it.hasNext()){
 45                     SelectionKey selectionKey = it.next();
 46                     it.remove();
 47                     //處理就緒狀態
 48                     if (selectionKey.isAcceptable()){
 49                         ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只負責監聽,阻塞,管理,不發送、接收數據
 50                         SocketChannel socketChannel = schannel.accept();//就緒後的操作,剛到達的socket句柄
 51                         if(null == socketChannel){
 52                             continue;
 53                         }
 54                         socketChannel.configureBlocking(false);
 55                         socketChannel.register(selector, SelectionKey.OP_READ); //告知選擇器關心的通道,准備好讀數據
 56                     }else if(selectionKey.isReadable()){
 57                         SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
 58                         read.execute(new MyReadRunnable(socketChannel));
 59 
 60 //                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
 61 //                        ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);
 62 //
 63 //                        StringBuilder result = new StringBuilder();
 64 //                        while (socketChannel.read(byteBuffer) > 0){//確保讀完
 65 //                            byteBuffer.flip();
 66 //                            result.append(new String(byteBuffer.array()));
 67 //                            byteBuffer.clear();//每次清空 對應上面flip()
 68 //                        }
 69 //
 70 //                        System.out.println("server receive: " + result.toString());
 71                         socketChannel.register(selector, SelectionKey.OP_WRITE);
 72 
 73                     }else if(selectionKey.isWritable()){
 74                         SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
 75                         write.execute(new MyWriteRunnable(socketChannel));
 76 //                        String sendStr = "server send data: " + Math.random();
 77 //                        ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
 78 //                        while (send.hasRemaining()){
 79 //                            socketChannel.write(send);
 80 //                        }
 81 //                        System.out.println(sendStr);
 82                         socketChannel.register(selector, SelectionKey.OP_READ);
 83                     }
 84                 }
 85             }
 86 
 87         } catch (IOException e) {
 88             e.printStackTrace();
 89         }
 90     }
 91 
 92     static class MyReadRunnable implements Runnable {
 93 
 94         private SocketChannel channel;
 95 
 96         public MyReadRunnable(SocketChannel channel){
 97             this.channel = channel;
 98         }
 99 
100         @Override
101         public synchronized void  run() {
102             ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);
103 
104             StringBuilder result = new StringBuilder();
105             try {
106                 while (channel.read(byteBuffer) > 0){//確保讀完
107                     byteBuffer.flip();
108                     result.append(new String(byteBuffer.array()));
109                     byteBuffer.clear();//每次清空 對應上面flip()
110                 }
111                 System.out.println("server receive: " + result.toString());
112             } catch (IOException e) {
113                 e.printStackTrace();
114             }
115 
116 
117         }
118     }
119 
120     static class MyWriteRunnable implements Runnable {
121 
122         private SocketChannel channel;
123 
124         public MyWriteRunnable(SocketChannel channel){
125             this.channel = channel;
126         }
127 
128         @Override
129         public void run() {
130             String sendStr = "server send data: " + Math.random();
131             ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
132             try {
133                 while (send.hasRemaining()) {
134                     channel.write(send);
135                 }
136                 System.out.println(sendStr);
137             }catch (Exception e){
138                 e.printStackTrace();
139             }
140 
141         }
142     }
143 }

 

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved