嗯,今天其實在看HtttpProcessor的實現,但是突然想到了以前在看poller的時候看到了有閉鎖,用於控制當前connector的連接數量,嗯,那就順便把這部分來看了。。。
在Tomcat中,通過繼承AbstractQueuedSynchronizer來實現了自己的同步工具,進而來實現了一個用於控制連接數量的閉鎖。。LimitLatch。。
這裡就需對AbstractQueuedSynchronizer有一些初步的了解。。。
首先它concurrent類庫中提供的一個用於構建自己的同步工具的一個工具類。。可以通過繼承他來快速的完成一個同步類的實現
(1)acquireSharedInterruptibly()方法,用於以共享的方式來獲取鎖,如果暫時無法獲取,將會將線程掛起到隊列,進行阻塞,對於這個方法是否最終能獲取鎖,是通過tryAcquireShared()方法的返回來定義的,這個方法需要自己實現。。。如果能獲取鎖,那麼返回1,否則返回-1.。。
(2)releaseShared()方法。以共享的方法釋放一個鎖,這樣前面提到的掛起的線程將會喚醒,進而重新嘗試獲取鎖。。。
好啦,接下來就來看看LimitLatch的定義吧,直接上代碼好了,。,。代碼還是很簡單的。。
//其實是通過AbstractQueuedSynchronizer來構建的
public class LimitLatch {
private static final Log log = LogFactory.getLog(LimitLatch.class);
//構建Sync類型,實現基本的同步,以及阻塞。。
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
//用於增加計數,如果計數增加之後小於最大的,那麼返回1,不會阻塞,否則將會返回-1阻塞
protected int tryAcquireShared(int ignored) { //調用acquiredShared方法的時候會調用這個方法來返回狀態,如果返回1,那麼表示獲取成功,返回-1表示獲取失敗,將會阻塞
long newCount = count.incrementAndGet(); //先增加計數
if (!released && newCount > limit) { //如果當前已經超過了最大的限制
// Limit exceeded
count.decrementAndGet(); //減少計數
return -1; //返回-1,將阻塞當前線程
} else {
return 1;
}
}
@Override
//用於減少計數
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}
private final Sync sync; //同步對象
private final AtomicLong count; //計數器
private volatile long limit; //最大的數量
private volatile boolean released = false; //是否全部釋放
/**
* Instantiates a LimitLatch object with an initial limit.
* @param limit - maximum number of concurrent acquisitions of this latch
*/
public LimitLatch(long limit) {
this.limit = limit; //最大限制
this.count = new AtomicLong(0);
this.sync = new Sync(); //sync 對象
}
/**
* Returns the current count for the latch
* @return the current count for latch
*/
public long getCount() {
return count.get();
}
/**
* Obtain the current limit.
*/
public long getLimit() {
return limit;
}
/**
* Sets a new limit. If the limit is decreased there may be a period where
* more shares of the latch are acquired than the limit. In this case no
* more shares of the latch will be issued until sufficient shares have been
* returned to reduce the number of acquired shares of the latch to below
* the new limit. If the limit is increased, threads currently in the queue
* may not be issued one of the newly available shares until the next
* request is made for a latch.
*
* @param limit The new limit
*/
public void setLimit(long limit) {
this.limit = limit;
}
/**
* Acquires a shared latch if one is available or waits for one if no shared
* latch is current available.
*/
//增加計數,如果太大,那麼等等待
public void countUpOrAwait() throws InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
}
sync.acquireSharedInterruptibly(1);
}
/**
* Releases a shared latch, making it available for another thread to use.
* @return the previous counter value
*/
//減少計數
public long countDown() {
sync.releaseShared(0); //釋放
long result = getCount();
if (log.isDebugEnabled()) {
log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result);
}
return result;
}
/**
* Releases all waiting threads and causes the {@link #limit} to be ignored
* until {@link #reset()} is called.
*/
//通過將released設置為true,將會釋放所有的線程,知道reset了
public boolean releaseAll() {
released = true;
return sync.releaseShared(0);
}
/**
* Resets the latch and initializes the shared acquisition counter to zero.
* @see #releaseAll()
*/
//重制
public void reset() {
this.count.set(0);
released = false;
}
/**
* Returns true if there is at least one thread waiting to
* acquire the shared lock, otherwise returns false.
*/
//當前是否有線程等待
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* Provide access to the list of threads waiting to acquire this limited
* shared latch.
*/
//獲取所有等待的線程
public Collection getQueuedThreads() {
return sync.getQueuedThreads();
}
}
代碼應該還是很簡單的吧,而且注釋也算是說的比較清楚。。。其實是構建了一個繼承自AbstractQueuedSynchronizer的Sync對象,通過它來進行真正的同步功能。。。然後通過一個原子的整數計數器,和一個最大值,來判斷當前是否可以獲取鎖
好啦,這裡來看看Tomcat是如何通過LimitLatch來控制連接數量的吧,先來看看NioEndpoint的啟動方法:
//啟動當前的endpoint
public void startInternal() throws Exception {
if (!running) {
running = true; //設置表示為,表示已經看是運行了
paused = false; //沒有暫停
// Create worker collection
if ( getExecutor() == null ) { //如果沒有executor,那麼創建
createExecutor(); //創建executor
}
initializeConnectionLatch(); //初始化閉鎖,用於控制連接的數量
// Start poller threads
pollers = new Poller[getPollerThreadCount()]; //根據設置的poller數量來創建poller對象的數組
for (int i=0; i
這裡調用了initializeConnectionLatch方法來初始化閉鎖,來看看吧:
//初始化閉鎖,用於控制連接的數量
protected LimitLatch initializeConnectionLatch() {
if (maxConnections==-1) return null; //這個是無限的鏈接數量
if (connectionLimitLatch==null) {
connectionLimitLatch = new LimitLatch(getMaxConnections()); //根據最大的鏈接數量來創建
}
return connectionLimitLatch;
}
我們知道在Connector的配置中可以設置最大的鏈接數量,其實這裡也就是通過這個數量來構建LimitLatch對象的。。。
嗯,Tomcat是從哪裡獲取連接呢,這個就要從Accecptor看了。。。
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) { //如果暫停了
state = AcceptorState.PAUSED; //更改當前acceptor的狀態
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) { //如果沒有運行,那麼這裡直接跳過
break;
}
state = AcceptorState.RUNNING; //設置當前acceptor的狀態是running
try {
//if we have reached max connections, wait
countUpOrAwaitConnection(); //增減閉鎖的計數,如果connection數量已經達到了最大,那麼暫停一下,這裡用到的是connectionLimitLatch鎖,可以理解為一個閉鎖吧
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
socket = serverSock.accept(); //調用serversocket的accept方法
} catch (IOException ioe) {
//we didn't get a socket
countDownConnection(); //出了異常,並沒有獲取鏈接,那麼這裡減少閉鎖的計數
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
}
// Successful accept, reset the error delay
errorDelay = 0;
// setSocketOptions() will add channel to the poller
// if successful
if (running && !paused) {
if (!setSocketOptions(socket)) { //這裡主要是將socket加入到poller對象上面去,而且還要設置參數
countDownConnection(); //加入poller對象失敗了的話,那麼將閉鎖的計數減低
closeSocket(socket); //關閉剛剛 創建的這個socket
}
} else {
countDownConnection();
closeSocket(socket);
}
} catch (SocketTimeoutException sx) {
// Ignore: Normal condition
} catch (IOException x) {
if (running) {
log.error(sm.getString("endpoint.accept.fail"), x);
}
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED; //設置acceptor的狀態為ended
}
這裡讀一下Accecptor,的run方法可以知道,每次在調用serverSocketChannel的accept方法之前都會調用countUpOrAwaitConnection方法來增加閉鎖的計數,如果有問題,那就會調用countDownConnection方法來降低閉鎖的計數。。。
其實這裡通過這兩個方法就知道他們是干嘛的了,先來看看countUpOrAwaitConnection吧:
//這裡用於增加閉鎖的計數
protected void countUpOrAwaitConnection() throws InterruptedException {
if (maxConnections==-1) return;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) latch.countUpOrAwait(); //增加閉鎖的counter
}
沒啥意思吧,就是調用剛剛創建的閉鎖的countUpOrAwait方法,接下來來看看countDownConnection方法吧:
//用於減少閉鎖的計數
protected long countDownConnection() {
if (maxConnections==-1) return -1;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) {
long result = latch.countDown();
if (result<0) {
getLog().warn("Incorrect connection count, multiple socket.close called on the same socket." );
}
return result;
} else return -1;
}
這個也沒啥意思吧。。。就是調用閉鎖的countDown方法。。。
嗯,到這裡整個Tomcat如何控制連接的數量就算是比較清楚了吧。。。
最後,我們知道是通過調用endpoint的cancelledKey方法來關閉一個連接的,來看看它的實現吧:
//取消一個注冊
public void cancelledKey(SelectionKey key, SocketStatus status) {
try {
if ( key == null ) return;//nothing to do
KeyAttachment ka = (KeyAttachment) key.attachment();
if (ka != null && ka.isComet() && status != null) {
ka.setComet(false);//to avoid a loop
if (status == SocketStatus.TIMEOUT ) {
if (processSocket(ka.getChannel(), status, true)) {
return; // don't close on comet timeout
}
} else {
// Don't dispatch if the lines below are canceling the key
processSocket(ka.getChannel(), status, false);
}
}
key.attach(null); //將附件設置為null
if (ka!=null) handler.release(ka); //可以取消這個attachment了
else handler.release((SocketChannel)key.channel());
if (key.isValid()) key.cancel(); //取消key
if (key.channel().isOpen()) { //如果channel還是打開的,那麼需要關閉channel
try {
key.channel().close();
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString(
"endpoint.debug.channelCloseFail"), e);
}
}
}
try {
if (ka!=null) {
ka.getSocket().close(true); //關閉sockt
}
} catch (Exception e){
if (log.isDebugEnabled()) {
log.debug(sm.getString(
"endpoint.debug.socketCloseFail"), e);
}
}
try {
if (ka != null && ka.getSendfileData() != null
&& ka.getSendfileData().fchannel != null
&& ka.getSendfileData().fchannel.isOpen()) {
ka.getSendfileData().fchannel.close();
}
} catch (Exception ignore) {
}
if (ka!=null) {
ka.reset();
countDownConnection(); //降低用於維護連接數量的閉鎖
}
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
if (log.isDebugEnabled()) log.error("",e);
}
}
這裡可以看到調用了countDownConnection方法來降低閉鎖的計數。。
最後總結:Tomcat通過在acceptor中對閉鎖的獲取來控制總連接的數量,如果連接數量達到了最大的限制,那麼將會被阻塞。。直到有連接關閉為止。。。這樣acceptor的線程就又被喚醒了。。。