程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> nio框架中的多個Selector結構

nio框架中的多個Selector結構

編輯:關於JAVA

隨著並發數量的提高,傳統nio框架采用一個Selector來支撐大量連接事件的 管理和觸發已經遇到瓶頸,因此現在各種nio框架的新版本都采用多個 Selector 並存的結構,由多個Selector均衡地去管理大量連接。這裡以Mina和Grizzly的實現為例。

在Mina 2.0中,Selector的管理是由 org.apache.mina.transport.socket.nio.NioProcessor來處理,每個 NioProcessor對象保存一個Selector,負責具體的select、wakeup、channel的 注冊和取消、讀寫事件的注冊和判斷、實際的IO讀寫操作等等,核心代碼如下:

public NioProcessor(Executor executor) {
         super(executor);
         try {
             // Open a new selector
             selector = Selector.open();
         } catch (IOException e) {
             throw new RuntimeIoException("Failed to  open a selector.", e);
         }
     }

     protected int select(long timeout) throws Exception  {
         return selector.select(timeout);
     }

     protected boolean isInterestedInRead(NioSession session)  {
         SelectionKey key = session.getSelectionKey ();
         return key.isValid() &&  (key.interestOps() & SelectionKey.OP_READ) != 0;
     }

     protected boolean isInterestedInWrite(NioSession  session) {
         SelectionKey key = session.getSelectionKey ();
         return key.isValid() &&  (key.interestOps() & SelectionKey.OP_WRITE) != 0;
     }
     protected int read(NioSession session, IoBuffer buf)  throws Exception {
         return session.getChannel().read(buf.buf());
     }

     protected int write(NioSession session, IoBuffer buf,  int length) throws Exception {
         if (buf.remaining() <= length) {
             return session.getChannel().write(buf.buf ());
         } else {
             int oldLimit = buf.limit();
             buf.limit(buf.position() + length);
             try {
                 return session.getChannel().write (buf.buf());
             } finally {
                 buf.limit(oldLimit);
             }
         }
     }

這些方法的調用都是通過AbstractPollingIoProcessor來處理,這個類裡可 以看到一個nio框架的核心邏輯,注冊、select、派發,具體因為與本文主題不 合,不再展開。NioProcessor的初始化是在NioSocketAcceptor的構造方法中調 用的:

public NioSocketAcceptor() {
         super(new DefaultSocketSessionConfig(),  NioProcessor.class);
         ((DefaultSocketSessionConfig) getSessionConfig ()).init(this);
     }

直接調用了父類AbstractPollingIoAcceptor的構造函數,在其中我們可以看 到,默認是啟動了一個SimpleIoProcessorPool來包裝NioProcessor:

protected AbstractPollingIoAcceptor(IoSessionConfig  sessionConfig,
             Class<? extends  IoProcessor<T>> processorClass) {
         this(sessionConfig, null, new  SimpleIoProcessorPool<T>(processorClass),
                 true);
     }

這裡其實是一個組合模式,SimpleIoProcessorPool和NioProcessor都實現了 Processor接口,一個是組合形成的Processor池,而另一個是單獨的類。調用的 SimpleIoProcessorPool的構造函數是這樣:

private static final int DEFAULT_SIZE =  Runtime.getRuntime().availableProcessors() + 1;
     public SimpleIoProcessorPool(Class<? extends  IoProcessor<T>> processorType) {
         this(processorType, null, DEFAULT_SIZE);
     }

可以看到,默認的池大小是cpu個數+1,也就是創建了cpu+1個的Selector對 象。它的重載構造函數裡是創建了一個數組,啟動一個 CachedThreadPool來運 行NioProcessor,通過反射創建具體的Processor對象,這裡就不再列出了。

Mina當有一個新連接建立的時候,就創建一個NioSocketSession,並且傳入 上面的SimpleIoProcessorPool,當連接初始化的時候將Session加入 SimpleIoProcessorPool:

protected NioSession accept(IoProcessor<NioSession>  processor,
             ServerSocketChannel handle) throws  Exception {
         SelectionKey key = handle.keyFor (selector);

         if ((key == null) || (!key.isValid()) || (! key.isAcceptable()) ) {
             return null;
         }
         // accept the connection from the client
         SocketChannel ch = handle.accept();

         if (ch == null) {
             return null;
         }
         return new NioSocketSession(this, processor,  ch);
     }

         private void processHandles(Iterator<H>  handles) throws Exception {
             while (handles.hasNext()) {
                 H handle = handles.next();
                 handles.remove();
                 // Associates a new created  connection to a processor,
                 // and get back a session
                 T session = accept(processor,  handle);

                 if (session == null) {
                     break;
                 }
                 initSession(session, null,  null);
                 // add the session to the  SocketIoProcessor
                 session.getProcessor().add (session);
             }
         }

加入的操作是遞增一個整型變量並且模數組大小後對應的NioProcessor注冊 到session裡:

private IoProcessor<T> nextProcessor() {
         checkDisposal();
         return pool[Math.abs (processorDistributor.getAndIncrement()) % pool.length];
     }
     if (p == null) {
             p = nextProcessor();
             IoProcessor<T> oldp =
                 (IoProcessor<T>)  session.setAttributeIfAbsent(PROCESSOR, p);
             if (oldp != null) {
                 p = oldp;
             }
     }

這樣一來,每個連接都關聯一個NioProcessor,也就是關聯一個Selector對 象,避免了所有連接共用一個Selector負載過高導致 server響應變慢的後果。 但是注意到NioSocketAcceptor也有一個Selector,這個Selector用來干什麼的 呢?那就是集中處理OP_ACCEPT事件的Selector,主要用於連接的接入,不跟處 理讀寫事件的Selector混在一起,因此Mina的默認open的 Selector是cpu+2個。

看完mina2.0之後,我們來看看Grizzly2.0是怎麼處理的,Grizzly還是比較 保守,它默認就是啟動兩個Selector,其中一個專門負責accept,另一個負責連 接的IO讀寫事件的管理。Grizzly 2.0中Selector的管理是通過SelectorRunner 類,這個類封裝了Selector對象以及核心的分發注冊邏輯,你可以將他理解成 Mina中的NioProcessor,核心的代碼如下:

protected boolean doSelect() {
         selectorHandler =  transport.getSelectorHandler();
         selectionKeyHandler =  transport.getSelectionKeyHandler();
         strategy = transport.getStrategy();

         try {
             if (isResume) {
                 // If resume SelectorRunner -  finish postponed keys
                 isResume = false;
                 if (keyReadyOps != 0) {
                     if (!iterateKeyEvents())  return false;
                 }

                 if (!iterateKeys()) return  false;
             }
             lastSelectedKeysCount = 0;

             selectorHandler.preSelect(this);

             readyKeys = selectorHandler.select (this);
             if (stateHolder.getState(false) ==  State.STOPPING) return false;

             lastSelectedKeysCount = readyKeys.size ();

             if (lastSelectedKeysCount != 0) {
                 iterator = readyKeys.iterator ();
                 if (!iterateKeys()) return  false;
             }
             selectorHandler.postSelect(this);
         } catch (ClosedSelectorException e) {
             notifyConnectionException(key,
                     "Selector was unexpectedly closed", e,
                     Severity.TRANSPORT,  Level.SEVERE, Level.FINE);
         } catch (Exception e) {
             notifyConnectionException(key,
                     "doSelect exception", e,
                     Severity.UNKNOWN,  Level.SEVERE, Level.FINE);
         } catch (Throwable t) {
             logger.log(Level.SEVERE,"doSelect  exception", t);
             transport.notifyException(Severity.FATAL,  t);
         }
         return true;
     }

基本上是一個reactor實現的樣子,在AbstractNIOTransport類維護了一個 SelectorRunner的數組,而Grizzly 用於創建tcp server的類TCPNIOTransport 正是繼承於AbstractNIOTransport類,在它的start方法中調用了 startSelectorRunners來創建並啟動SelectorRunner數組:

private static final int DEFAULT_SELECTOR_RUNNERS_COUNT  = 2;
  @Override
   public void start() throws IOException {
   if (selectorRunnersCount <= 0) {
                 selectorRunnersCount =  DEFAULT_SELECTOR_RUNNERS_COUNT;
             }
   startSelectorRunners();
}
  protected void startSelectorRunners() throws IOException  {
         selectorRunners = new SelectorRunner [selectorRunnersCount];

         synchronized(selectorRunners) {
             for (int i = 0; i <  selectorRunnersCount; i++) {
                 SelectorRunner runner =
                         new  SelectorRunner(this, SelectorFactory.instance().create());
                 runner.start();
                 selectorRunners[i] = runner;
             }
         }
     }

可見Grizzly並沒有采用一個單獨的池對象來管理SelectorRunner,而是直接 采用數組管理,默認數組大小是2。 SelectorRunner實現了Runnable接口,它的 start方法調用了一個線程池來運行自身。剛才我提到了說Grizzly的Accept 是 單獨一個Selector來管理的,那麼是如何表現的呢?答案在 RoundRobinConnectionDistributor類,這個類是用於派發注冊事件到相應的 SelectorRunner上,它的派發方式是這樣:

public Future<RegisterChannelResult>  registerChannelAsync(
             SelectableChannel channel, int  interestOps, Object attachment,
             CompletionHandler completionHandler)
             throws IOException {
         SelectorRunner runner = getSelectorRunner (interestOps);

         return transport.getSelectorHandler ().registerChannelAsync(
                 runner, channel, interestOps,  attachment, completionHandler);
     }

     private SelectorRunner getSelectorRunner(int  interestOps) {
         SelectorRunner[] runners =  getTransportSelectorRunners();
         int index;
         if (interestOps == SelectionKey.OP_ACCEPT ||  runners.length == 1) {
             index = 0;
         } else {
             index = (counter.incrementAndGet() %  (runners.length - 1)) + 1;
         }

         return runners[index];
     }

getSelectorRunner這個方法道出了秘密,如果是OP_ACCEPT,那麼都使用數 組中的第一個SelectorRunner,如果不是,那麼就通過取模運算的結果+1從後面 的SelectorRunner中取一個來注冊。

分析完mina2.0和grizzly2.0對Selector的管理後我們可以得到幾個啟示:

1、在處理大量連接的情況下,多個Selector比單個Selector好

2、多個Selector的情況下,處理OP_READ和OP_WRITE的Selector要與處理 OP_ACCEPT的Selector分離,也就是說處理接入應該要一個單獨的Selector對象 來處理,避免IO讀寫事件影響接入速度。

3、Selector的數目問題,mina默認是cpu+2,而grizzly總共就2個,我更傾 向於mina的策略,但是我認為應該對cpu個數做一個判斷,如果CPU個數超過8個 ,那麼更多的Selector線程可能帶來比較大的線程切換的開銷,mina默認的策略 並非合適,幸好可以設置這個數值。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved