程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> Java服務器端Socket線程池

Java服務器端Socket線程池

編輯:關於JAVA

import java.util.Vector;
import java.net.*;
import java.io.*;
public class ThreadPool {
public static final int MAX_THREADS = 100;
public static final int MAX_SPARE_THREADS = 50;
public static final int MIN_SPARE_THREADS = 10;
public static final int WORK_WAIT_TIMEOUT = 60 * 1000;
protected Vector pool; //存放空閒線程
protected MonitorRunnable monitor; //A monitor thread that monitors the pool for idel threads.
protected int maxThreads; //Max number of threads that you can open in the pool.
protected int minSpareThreads; //Min number of idel threads that you can leave in the pool.
protected int maxSpareThreads; //Max number of idel threads that you can leave in the pool.
protected int currentThreadCount; //Number of threads in the pool.
protected int currentThreadsBusy; //Number of busy threads in the pool.
protected boolean stopThePool; //Flag that the pool should terminate all the threads and stop.
/**
* Construct
*/
public ThreadPool() {
maxThreads = MAX_THREADS;
maxSpareThreads = MAX_SPARE_THREADS;
minSpareThreads = MIN_SPARE_THREADS;
currentThreadCount = 0;
currentThreadsBusy = 0;
stopThePool = false;
}
/**
* 啟動線程池
*/
public synchronized void start() {
adjustLimits(); //調整最大和最小線程數及最大和最小多余線程數.
openThreads(minSpareThreads); //打開初始線程
monitor = new MonitorRunnable(this); //Runnable對象實例 //A monitor thread that monitors the pool for idel threads.
}
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
}
public int getMaxThreads() {
return maxThreads;
}
public void setMinSpareThreads(int minSpareThreads) {
this.minSpareThreads = minSpareThreads;
}
public int getMinSpareThreads() {
return minSpareThreads;
}
public void setMaxSpareThreads(int maxSpareThreads) {
this.maxSpareThreads = maxSpareThreads;
}
public int getMaxSpareThreads() {
return maxSpareThreads;
}
/**
* 線程池管理方法.
* 當空閒隊列線程中沒有空閒線程時,則增加處理(空閒)線程數量.
* 如果線程數量已達到最大線程數,則新的連接進行等待.
* 當請求到來,且有空閒線程時調用處理線程進行具體業務處理.
* @param r ThreadPoolRunnable
*/
public void runIt(Socket cs) { //r 為task //有任務進入時調用
if (null == cs) {
throw new NullPointerException();
}
if (0 == currentThreadCount || stopThePool) {
throw new IllegalStateException();
}
ControlRunnable c = null; //任務處理實例.
synchronized (this) {
if (currentThreadsBusy == currentThreadCount) { //如果工作線程和當前線程數相等,說明沒有空閒線程.
if (currentThreadCount < maxThreads) { //如果當前線程數還沒有達到最大線程數.
int toOpen = currentThreadCount + minSpareThreads; //再增加minSpareThreads個線程量.
openThreads(toOpen); //打開線程新增空閒線程. //currentThreadCount數量增加
}
else { //如果當前數量達到了最大線程數.
while (currentThreadsBusy == currentThreadCount) { //當工作線程和當前線程數相等,說明沒有空閒線程.
try {
this.wait(); //連接線程進行等待.
}
catch (InterruptedException e) {
}
if (0 == currentThreadCount || stopThePool) {
throw new IllegalStateException();
}
}
}
}
c = (ControlRunnable) pool.lastElement(); //在有空閒線程的情況下,從空閒線程隊列中取出最後一個線程.
pool.removeElement(c); //從空閒隊列中刪除最後一個線程,用於處理其他事件.
currentThreadsBusy++; //對處理事件的線程數加1
}
System.out.println("系統調用一個Sokcet線程");
c.runIt(cs); //調用具體業務方法,告訴其有數據請求要處理,喚醒等待中的線程.
}
/**
* 關閉線程池
*/
public synchronized void shutdown() {
if (!stopThePool) { //如果線程池沒有關閉,(線程池關閉標識為假)
stopThePool = true;
monitor.terminate(); //關閉監視線程
monitor = null;
for (int i = 0; i < (currentThreadCount - currentThreadsBusy); i++) { //關閉空閒線程隊列
try {
( (ControlRunnable) (pool.elementAt(i))).terminate();
}
catch (Throwable t) {
}
}
currentThreadsBusy = currentThreadCount = 0;
pool = null;
notifyAll(); //喚醒所有在等待的線程.
}
}
/**
* 當線程大於最大多余線程時關閉多余的線程.
*/
protected synchronized void checkSpareControllers() {
if (stopThePool) { //如果連接池沒有關閉.
return;
}
if ( (currentThreadCount - currentThreadsBusy) > maxSpareThreads) { //如果空閒的線程數大於多余的最大線程數量.
int toFree = currentThreadCount - currentThreadsBusy - maxSpareThreads; //得出多余的線程數量
for (int i = 0; i < toFree; i++) { //關閉刪除空閒線程,從Vector中刪除
ControlRunnable c = (ControlRunnable) pool.firstElement();
pool.removeElement(c);
c.terminate(); //讓刪除的線程結束
currentThreadCount--; //處理線程隊列減少一個
}
}
}
/**
* 當線程處理完成後重新放到空閒線程隊列中.
* @param c ControlRunnable
*/
protected synchronized void returnController(ControlRunnable c) {
if (0 == currentThreadCount || stopThePool) { //如果線程池關閉或當前連接線程數量為0
c.terminate(); //關閉當前線程.
return;
}
currentThreadsBusy--; //處理線程隊列的數量減少一個
pool.addElement(c); //空閒線程隊列中增加一個
notifyAll(); //喚醒可能在等待連接的線程.
}
/**
* 當一個處理線程出現異常時,要重新開啟一個空閉線程.,並喚醒在等待空閒線程的線程.ThreadPool的runIt中等待的線程.
*/
protected synchronized void notifyThreadEnd() {
currentThreadsBusy--; //因從線程是在處理數據時出現異常,所處理線程隊列的數量要減一個.
currentThreadCount--; //因出現異常的線程關閉了.所開戶線程的數量要減少一個.
notifyAll(); //喚醒等待連接的阻塞線程.
openThreads(minSpareThreads); //重新打開minSpareThreads個線程.如currentThreadCount的數量大於minSpareThreads,則還是不開啟新線程.
}
/**
* 調整各種線程隊列數量
*/
protected void adjustLimits() {
if (maxThreads <= 0) { //如果最大線程數小於0
maxThreads = MAX_THREADS; //設置最大線程數為100
}
if (maxSpareThreads >= maxThreads) { //如果最大多余線程數大於最大線程數.
maxSpareThreads = maxThreads; //設置最大多余線程數為最大線程數.
}
if (maxSpareThreads <= 0) { //如果最大多余線程數小於0
if (1 == maxThreads) {
maxSpareThreads = 1; //如最大線程數為1的情況下,設置最大多余線程數為1.
}
else {
maxSpareThreads = maxThreads / 2; //設置最大多余線程數為最大線程數的一半.
}
}
if (minSpareThreads > maxSpareThreads) { //如果最小多余線程大於最大多余線程數
minSpareThreads = maxSpareThreads; //設置最小多余線程數為最大多余線程數.
}
if (minSpareThreads <= 0) { //如果最小多余線程數小於0
if (1 == maxSpareThreads) {
minSpareThreads = 1; //如最大線程數為1的情況下,則設置最小多余線程數為1.
}
else {
minSpareThreads = maxSpareThreads / 2; //否則設置最小多余線程數為最大多余線程數的一半.
}
}
}
/**
* 打開指定數量的空閒線程隊列
* @param toOpen int
*/
protected void openThreads(int toOpen) { //toOpen=minSpareThreads
if (toOpen > maxThreads) {
toOpen = maxThreads;
}
if (0 == currentThreadCount) { //如果當前線程池中的線程數量為0
pool = new Vector(toOpen); //創建一個有minSpareThreads數量的Vector
}
//因第二次增加時對第一次增加的線程不能重復增加.所要從currentThreadCount開始.
for (int i = currentThreadCount; i < toOpen; i++) { //先增加minSparethreads數量的線程.
pool.addElement(new ControlRunnable(this)); //Runnable實例對象,可用於創建線程
}
currentThreadCount = toOpen;
}
/**
* 監視線程,用於監聽當前空閒線程是否大於最大多余線程數量,如存在則關閉多余的空閒線程.
*/
class MonitorRunnable
implements Runnable {
ThreadPool p;
Thread t;
boolean shouldTerminate;
/**
* construct
* @param p ThreadPool
*/
MonitorRunnable(ThreadPool p) {
shouldTerminate = false;
this.p = p;
t = new Thread(this);
t.start();
}
public void run() {
while (true) {
try {
synchronized (this) {
this.wait(WORK_WAIT_TIMEOUT);
}
if (shouldTerminate) { //如果結束
break;
}
p.checkSpareControllers(); //檢查是否有多余線程.
}
catch (Throwable t) {
t.printStackTrace();
}
}
}
public void stop() {
this.terminate();
}
public synchronized void terminate() {
shouldTerminate = true;
this.notifyAll();
}
}
}

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved