程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> 線程池任務隊列

線程池任務隊列

編輯:C++入門知識

[delphi]
unit uPool; 
 
{***********************************************************************
 
                       線程池+任務隊列
 
         整個線程池調度圖
         ==========================================================
         |  -----   ----------------------                         |
         |  |空 |   | 任務隊列   ←----    | ⑴                     |
         |  |閒 |   ----------------------                         |
         |  |線 |     ↑空閒線程檢查隊列是否有任務                  |
         |  |程 |--①-- 有任務要執行時,加入到工作隊列              |
         |  |隊 |            |                                     |
         |  |列 |            ↓②               ----------------    |
         |  |   |   -----------------------    |  自動回收空   |   |
         |  |   |   |正在工作隊列          |   |  閒定時器     |   |
         |  |   |   -----------------------    ----------------    |
         |  |   |     ③     | 任務做完後              |           |
         |  ----- ←----------| 調度到空閒隊列          |           |
         |    |                                        |           |
         |    -----------------------------------------|           |
         |             ④定時回收空閒線程                          |
         |                                                         |
         ==========================================================
 
         使用方法:
 
         pool = TThreadPool.Create;
         pool.MinNums := 2; //最小線程
         pool.MaxNums := 6; //最大線程
         pool.TasksCacheSize := 10; //任務緩沖隊列
 
         上面創建好之後,就可以往池中放任務
 
         pool.AddWorkTask(Task);
 
         線程池就開始工作了。
         同時線程池支持對任務進行優先級排序,排序算法默認
         為快速序,也可以外問進行隊列排序
 
         這裡把任務和池分開了。
         使用任務時,需要繼承TWorkTask進開自己的任務設計。
         然後重寫exectask;方法。如果方法中要進行毫時循環,
         請見如下例子;
         for i := 0 to 5000 do
          begin
            if tk.WorkState = tsFinished then break;
              inc(k);
              //caption := inttostr(k);
            edit2.Text := inttostr(k);
          end;
 
         如:TWirteFileTask = Class(TWorkTask);
 
 
        作者:邊緣
        @RightCopy fsh
        QQ: 19985430
        date: 2012-09-22
        Email:[email protected]
***********************************************************************} 
 
interface 
 
uses 
    Classes,Windows,SysUtils,Messages,SyncObjs; 
 
Const 
    PRE_NUM = 5; 
    MAX_NUM = 100; 
    AUTO_FREE = 2; 
    MAX_TASKNUM = 100; 
    ONEMINUTE = 10000;//60000; 
 
  type 
    TLogLevel = (lDebug,lInfo,lError); 
 
    ILog = interface 
      procedure WriteLog(Const Msg:String;Level:TLogLevel = lDebug); 
    end; 
 
    TPoolLog = Class(TInterfacedObject,ILog) 
       private 
          procedure WriteLog(Const Msg:String;Level:TLogLevel = lDebug); 
       public 
          procedure OutputLog(Const Msg:String;Level:TLogLevel);virtual; 
    End; 
 
 
    Thandles = Array of Cardinal; 
 
    //任務級別  優先級高的任務先執行。 
    TTaskLevel = (tlLower,tlNormal,tlHigh); 
    TTaskState = (tsNone,tsDoing,tsWaiting,tsReStart,tsStop,tsFinished); 
    TWorkTask = Class 
       private 
          Work:TThread; 
          //任務ID 
          hTask:TCriticalSection; 
          FWorkId:Cardinal; 
          FWorkName:String; 
          FWorkLevel:TTaskLevel; //默認為普通 
          FWorkState : TTaskState; 
          procedure setWorkState(Const Value:TTaskState); 
       public 
          Constructor Create; 
          Destructor Destroy;override; 
          procedure execTask;virtual; abstract; 
          property WorkId:Cardinal read FWorkId write FWorkId; 
          property WorkName:String read FWorkName write FWorkName; 
          property WorkLevel:TTaskLevel read FWorkLevel write FWorkLevel; 
          property WorkState : TTaskState read FWorkState write setWorkState; 
    End; 
 
    TWorkTaskQueue = Array of TWorkTask; 
     
    TThreadPool = Class; 
 
    TWorkThreadState = (wtIdle,wtRunning,wtStop,wtFinished); 
    //工作線程(單個線程一次只能處理一個task) 
    TWorkThread = Class(TThread) 
      private 
        FPool:TThreadPool; 
        FState:TWorkThreadState; 
        procedure SetDefault; 
      protected 
        procedure Execute;override; 
      public 
        Constructor Create(Const pool:TThreadPool); 
        property State : TWorkThreadState read FState write FState; 
    End; 
 
    TWorkThreadQueue = Array of TWorkThread; 
 
    //查看緩沖情況事件 
    TListenCacheInfoEvent = procedure (Sender:TObject;Const IdleCount,BusyCount,TaskCount:Integer) of Object; 
    TTaskQueueFullEvent = procedure (Sender:TObject) of Object; 
    //任務處理完後 
    TTaskFinishedEvent = procedure (Const cTast:TWorkTask) of object; 
    //任務准備被處理前事件 
    TTaskWillDoBeforeEvent = procedure (Const thId:Cardinal;Const cTast:TWorkTask) of Object; 
    //外部排序任務隊列算法,默認為快速排序,可自行在外部定制算法。 
    TSortTaskQueueEvent = procedure (Sender:TObject;var taskQueue:TWorkTaskQueue) of object; 
 
    TThreadPool = Class 
     private 
       Log:TPoolLog; 
       //自動回收標識 
       FAuto:Boolean; 
       //定時等待控制 
       FWaitFlag:Boolean; 
       //表示正在用於等待回收到的線程 
       Waiting:TWorkThread; 
       //提取任務通知信號 
       entTaskNotify:Tevent; 
       //時間事件HANDLE 
       hTimeJump:Cardinal; 
       //是否排序任務隊列 
       FSorted:Boolean; 
       //對空閒隊列操作鎖 
       hIDleLock:TCriticalSection; 
       //對正在進行的線程鎖 
       hBusyLock:TCriticalSection; 
       //任務隊列鎖 
       hTaskLock:TCriticalSection; 
       //預設線程數 默認為5 發現忙不過來時才進行自增直到Max 
       FMinNums:Integer; 
       //最大限制線程數,默認為100 
       FMaxNums:Integer; 
       //任務隊列緩沖大小 默認100 
       FTasksCache:Integer; 
       //當線程空閒時間長達XX時自動回收 :單位為分鐘 
       FRecoverInterval:Integer; 
       //是否允許隊列中存在重復任務 (同一任務時要考慮線程同步),默認為否 
       FIsAllowTheSameTask:Boolean; 
       //任務隊列 (不釋放外部任務) 最大100個任務。當大於100個任務時,需要等待 
       //每抽取一個任務,立即從隊列中刪除。 
       TaskQueue:TWorkTaskQueue; 
       //工作線程 
       BusyQueue:TWorkThreadQueue; 
       //空閒線程 
       IdleQueue:TWorkThreadQueue; 
 
       //************************事件回調**********************// 
       //排序隊列回調 
       FOnSortTask:TSortTaskQueueEvent; 
       FOnTaskWillDo:TTaskWillDoBeforeEvent; 
       FOnTaskFinished:TTaskFinishedEvent; 
       FOnTaskFull:TTaskQueueFullEvent; 
       FOnListenInfo:TListenCacheInfoEvent; 
       //*****************************************************// 
        
       //************************Get/Set操作*******************// 
       procedure SetMinNums(Const Value:Integer); 
       function getTaskQueueCount: Integer; 
       function getBusyQueueCount: Integer; 
       function getIdleQueueCount: Integer; 
       //*****************************************************// 
 
       //***********************同步量處理********************// 
       procedure CreateLock; 
       procedure FreeLock; 
       //*****************************************************// 
 
       //設置初值 
       procedure SetDefault; 
       //處理回調 
       procedure DoTaskFull; 
 
       //********************線程隊列操作**********************// 
       //清空線程隊列 
       procedure ClearQueue(var Queue:TWorkThreadQueue); 
       //得到隊列的長度 
       function QueueSize(Const Queue:TWorkThreadQueue):Integer; 
       //調整隊列 
       procedure DelQueueOfIndex(var Queue:TWorkThreadQueue;Const Index:Integer); 
       //移動隊列; 
       procedure MoveQueue(Const wt:TWorkThread;flag:Integer); 
       //移除某個線程 
       procedure RemoveFromQueue(var Queue:TWorkThreadQueue;Const re:TWorkThread); 
       //*****************************************************// 
 
       //********************任務隊列操作**********************// 
       //排序隊列將優先級高的排前面。//可以交給外問進行排序算法 
       procedure SortTask(var Queue:TWorkTaskQueue); 
       //調整隊列 
       procedure DelTaskOfIndex(var Queue:TWorkTaskQueue;Const Index:Integer); 
       //獲取隊列大小 
       function TaskSzie(Const Queue:TWorkTaskQueue):Integer; 
       //*****************************************************// 
       //查找任務(如果有好的算法,哪更高效) 
       function FindTask(Const tsk:TWorkTask):Integer; 
       //快速排序 
       procedure QuikeSortTask(var Queue:TWorkTaskQueue;Const s,e:Integer); 
       //自動回收空閒線程 
       procedure RecoverIDle(Const wait:TWorkThread); 
       //交換任務 
       procedure switch(var Queue: TWorkTaskQueue; m, n: Integer); 
       //判斷當前運行線程是否使用在等待自動回收 
       function WaitAutoRecover(Const curThread:TWorkThread):Boolean; 
     protected 
       //求最小值 
       function Smaller(Const expresion:Boolean;Const tureValue,falseValue:Integer):Integer; 
       //按照先進選出進行提取任務 
       function PickupTask:TWorkTask; 
       //創建空閒線程 
       procedure CreateIdleThread(Const Nums:Integer = 1); 
       //添加到空閒線程隊列 
       procedure AddThreadToIdleQueue(Const idle:TWorkThread); 
       //添加到工作隊列 
       procedure AddThreadToBusyQueue(Const busy:TWorkThread); 
       //發送新任務到達信號 
       procedure PostNewTaskSign; 
        
     public 
       Constructor Create; 
       Destructor Destroy;override; 
       //***********************線程池管理方法******************************// 
       //停止執行的任務 
       procedure StopAll; 
       //開始任務 
       procedure StartAll; 
       //清空任務 
       procedure CleanTasks; 
       //運行中不能進行對調 
       function  SwitchTasks(Const aTask,bTask:TWorkTask):Boolean; 
       //移除某個任務 
       procedure RemoveTask(Const tk:TWorkTask);//只允許移除未執行的任務 
       //需要外部定時調用來得到動態數據效果 
       procedure ListenPool; 
       //******************************************************************// 
       //添加任務 
       function AddWorkTask(Const wtask:TWorkTask):Integer; 
 
       property MinNums:Integer read FMinNums write SetMinNums; 
       property MaxNums:Integer read FMaxNums write FMaxNums; 
       property TasksCacheSize:Integer read FTasksCache write FTasksCache; 
       property RecoverInterval:Integer read FRecoverInterval 
                write FRecoverInterval; 
       property IsAllowTheSameTask:Boolean read FIsAllowTheSameTask 
                write FIsAllowTheSameTask; 
       property Sorted:Boolean read FSorted write FSorted; 
       property TaskQueueCount:Integer read getTaskQueueCount; 
       property IdleQueueCount:Integer read getIdleQueueCount; 
       property BusyQueueCount:Integer read getBusyQueueCount; 
       property OnSortTask:TSortTaskQueueEvent read FOnSortTask write FOnSortTask; 
       property OnTaskWillDo:TTaskWillDoBeforeEvent read FOnTaskWillDo write FOnTaskWillDo; 
       property OnTaskFinished:TTaskFinishedEvent read FOnTaskFinished write FOnTaskFinished; 
       property OnTaskFull:TTaskQueueFullEvent read FOnTaskFull write FOnTaskFull; 
       property OnListenInfo:TListenCacheInfoEvent read FOnListenInfo write FOnListenInfo; 
    End; 
 
implementation 
 
{ TThreadPool } 
 
constructor TThreadPool.Create; 
var 
  tpError:Cardinal; 
begin 
   Log:=TPoolLog.Create; 
   SetDefault; 
   CreateLock; 
 
   tpError := 0; 
 
   entTaskNotify:=Tevent.create(nil,false,false, 'TaskNotify');//事件信號 
   hTimeJump := CreateEvent(nil,False,False,'Timer');//自動回收心跳事件 
   if hTimeJump = 0 then 
       tpError := GetLastError; 
        
   //the same name of sign exists. 
   Case tpError of 
     ERROR_ALREADY_EXISTS: 
                       begin 
                          hTimeJump := 0; 
                          Log.WriteLog('CreateTimerEvent Fail,the Same Name of Event Exists'); 
                       end; 
   End; 
   //預創建線程 
   CreateIdleThread(FMinNums); 
   Log.WriteLog('Thread Pool start run.',lInfo); 
end; 
 
destructor TThreadPool.Destroy; 
begin 
   ClearQueue(IdleQueue); 
   ClearQueue(BusyQueue); 
   FreeLock; 
   if hTimeJump > 0 then 
      CloseHandle(hTimeJump); 
   entTaskNotify.Free; 
   Log.Free; 
  inherited; 
  Log.WriteLog('Thread Pool end run.',lInfo); 
end; 
 
procedure TThreadPool.DoTaskFull; 
begin 
   if Assigned(FOnTaskFull) then 
      FOnTaskFull(self); 
end; 
 
procedure TThreadPool.SetDefault; 
begin 
   FMinNums := PRE_NUM; 
   FMaxNums := MAX_NUM; 
   FTasksCache := MAX_TASKNUM; 
   FRecoverInterval := AUTO_FREE; 
   FIsAllowTheSameTask := False; 
   FAuto :=False; 
   FWaitFlag := True; 
   Waiting := nil; 
   FSorted := False; 
end; 
 
procedure TThreadPool.CreateLock; 
begin 
   hIDleLock := TCriticalSection.Create; 
   hBusyLock := TCriticalSection.Create; 
   hTaskLock := TCriticalSection.Create; 
end; 
 
procedure TThreadPool.FreeLock; 
begin 
   hIDleLock.Free; 
   hBusyLock.Free; 
   hTaskLock.Free; 
end; 
 
function TThreadPool.getBusyQueueCount: Integer; 
begin 
   Result := QueueSize(BusyQueue); 
end; 
 
function TThreadPool.getIdleQueueCount: Integer; 
begin 
   Result := QueueSize(IdleQueue); 
end; 
 
function TThreadPool.getTaskQueueCount: Integer; 
begin 
   Result := TaskSzie(TaskQueue); 
end; 
 
procedure TThreadPool.CleanTasks; 
begin 
   hTaskLock.Enter; 
   SetLength(TaskQueue,0); 
   hTaskLock.Leave; 
end; 
 
procedure TThreadPool.ListenPool; 
begin 
   //正在執行任務的線程,空閒線程,隊列中任務數 
   if Assigned(FOnListenInfo) then 
      FOnListenInfo(self,IdleQueueCount,BusyQueueCount,TaskQueueCount); 
end; 
 
procedure TThreadPool.ClearQueue(var Queue: TWorkThreadQueue); 
var 
   i:Integer; 
   sc:Integer; 
begin 
   sc := Length(Queue); 
   for i := 0 to sc - 1 do 
   begin 
       TWorkThread(Queue[i]).Terminate; 
       PostNewTaskSign; 
       //TWorkThread(Queue[i]).Free; //如果FreeOnTerminate為TRUE就不要使用這句了。 
   end; 
   SetLength(Queue,0); 
end; 
 
procedure TThreadPool.SetMinNums(const Value: Integer); 
begin 
   if Value = 0 then 
      FMinNums := PRE_NUM 
   else if FMinNums > Value then 
        begin 
          //先清容再創建 
          FMinNums := Value; 
          ClearQueue(IDleQueue); 
        end 
   else 
     FMinNums := Value; 
 
   CreateIdleThread(FMinNums); 
   Log.WriteLog('Reset MinNums Numbers is ' + inttostr(FMinNums) + ' .',lInfo); 
end; 
 
 
function TThreadPool.Smaller(const expresion: Boolean; const tureValue, 
  falseValue: Integer): Integer; 
begin 
   if expresion then 
      result := tureValue 
   else 
      result := falseValue; 
end; 
 
procedure TThreadPool.DelQueueOfIndex(var Queue: TWorkThreadQueue; 
  const Index: Integer); 
var 
   i:integer; 
   ic:integer; 
begin 
   ic := Length(Queue); 
   for i := Index to ic - 1 do 
       Queue[i] := Queue[i+1]; 
 
   setLength(Queue,ic-1); 
end; 
 
procedure TThreadPool.DelTaskOfIndex(var Queue: TWorkTaskQueue; 
  const Index: Integer); 
var 
   i:integer; 
   ic:integer; 
begin 
   ic := length(Queue); 
   for i := Index to ic -1 do 
       Queue[i] := Queue[i+1]; 
 
   setLength(Queue,ic-1); 
end; 
 
procedure TThreadPool.MoveQueue(const wt: TWorkThread; flag: Integer); 
var 
    k:integer; 
begin 
   if flag = 0 then 
   begin 
     hIDleLock.Enter; 
     for k := Low(IdleQueue) to High(IdleQueue) do 
     begin 
        if IdleQueue[k]=wt then 
        begin 
           AddThreadToBusyQueue(wt); 
           DelQueueOfIndex(IdleQueue,k); 
        end; 
     end; 
     hIDleLock.Leave; 
   end 
   else 
   begin 
     hBusyLock.Enter; 
     for k := Low(BusyQueue) to High(BusyQueue) do 
     begin 
        if BusyQueue[k]=wt then 
        begin 
           AddThreadToIdleQueue(wt); 
           DelQueueOfIndex(BusyQueue,k); 
        end; 
     end; 
     hBusyLock.Leave; 
   end; 
end; 
 
function TThreadPool.SwitchTasks(const aTask, bTask: TWorkTask): Boolean; 
var 
   aIndex,bIndex:Integer; 
begin 
   Result := true; 
   hTaskLock.Enter; 
   aIndex := FindTask(aTask); 
   bIndex := FindTask(bTask); 
    
   if (aIndex = -1) or (bIndex = -1) then 
   begin 
      Result := false; 
      hTaskLock.Leave; 
      exit; 
   end; 
   switch(TaskQueue,aIndex,bIndex); 
   hTaskLock.Leave; 
end; 
 
function TThreadPool.TaskSzie(const Queue: TWorkTaskQueue): Integer; 
begin 
   Result := Length(Queue); 
end; 
 
function TThreadPool.WaitAutoRecover(const curThread: TWorkThread): Boolean; 
begin 
   Result := Waiting = curThread; 
end; 
 
procedure TThreadPool.CreateIdleThread(const Nums: Integer); 
var 
   WorkThread:TWorkThread; 
   i:integer; 
begin 
   hIDleLock.Enter; 
   for i := 0 to Nums - 1 do 
   begin 
     WorkThread := TWorkThread.Create(self); 
     WorkThread.FreeOnTerminate := true; 
     AddThreadToIdleQueue(WorkThread); 
   end; 
   hIDleLock.Leave; 
end; 
 
procedure TThreadPool.AddThreadToBusyQueue(const busy: TWorkThread); 
var 
   sz:integer; 
begin 
   sz := QueueSize(BusyQueue); 
   setLength(BusyQueue,sz + 1); 
   BusyQueue[sz] := busy; 
end; 
 
procedure TThreadPool.AddThreadToIdleQueue(const idle: TWorkThread); 
var 
   sz:integer; 
begin 
   sz := Length(IdleQueue); 
   setLength(IdleQueue,sz + 1); 
   IdleQueue[sz] := idle; 
end; 
 
function TThreadPool.PickupTask: TWorkTask; 
begin 
   //先排序再取 
   hTaskLock.enter; 
 
   if FSorted then 
      SortTask(TaskQueue); 
 
   if length(TaskQueue) > 0 then 
   begin 
      Result := TaskQueue[0]; 
      DelTaskOfIndex(TaskQueue,0); 
   end 
   else 
      Result := Nil; 
   hTaskLock.Leave; 
end; 
 
function TThreadPool.AddWorkTask(Const wtask: TWorkTask):Integer; 
var 
   sz,ic,bc:Integer; 
begin 
   sz := Length(TaskQueue); 
   if sz >= FTasksCache  then 
   begin 
      Result := -1; 
      DoTaskFull; 
      exit; 
   end; 
 
   setLength(TaskQueue,sz+1); 
   wtask.WorkState := tsWaiting; 
   TaskQueue[sz] := wtask; 
 
   Result := sz + 1; 
 
   //未達到最大線程數時增加 
   ic := IdleQueueCount; 
   bc := BusyQueueCount; 
 
   //最大只能ic + bc = MaxNums 
   if (ic = 0) and (ic+ bc < FMaxNums) then 
      CreateIdleThread(); 
       
   FAuto := True; 
   //通知線程去取任務 
   PostNewTaskSign; 
   Log.WriteLog('Add a task to queue.',lInfo); 
end; 
 
function TThreadPool.FindTask(const tsk: TWorkTask): Integer; 
var 
   l:Integer; 
begin 
   Result := -1; 
   for l := Low(TaskQueue) to High(TaskQueue) do 
       if TaskQueue[l] = tsk then 
       begin 
         Result := l; 
         Break; 
       end; 
end; 
 
procedure TThreadPool.PostNewTaskSign; 
begin 
   entTaskNotify.SetEvent; 
end; 
 
procedure TThreadPool.switch(var Queue:TWorkTaskQueue;m,n:Integer); 
var 
 tem:TWorkTask; 
begin 
  tem := Queue[m]; 
  Queue[m] := Queue[n]; 
  Queue[n] := tem; 
end; 
 
procedure TThreadPool.QuikeSortTask(var Queue: TWorkTaskQueue; const s, 
  e: Integer); 
var 
   key:Integer; 
   k,j:Integer; 
begin 
   key := ord(Queue[s].WorkLevel); 
 
   if s > e then exit; 
 
   k := s; 
   j := e; 
 
   while (k <> j) do 
   begin 
     while (k < j) and (ord(Queue[j].WorkLevel) <= key) do //如果排序從小到大時改為 >= 
         dec(j); 
     switch(Queue,k,j); 
 
     while (k < j) and (ord(Queue[k].WorkLevel) >= key) do //如果排序從小到大時改為 <= 
         inc(k); 
     Switch(Queue,j,k); 
   end; 
 
   if s < k-1 then 
      QuikeSortTask(Queue,s,k-1); 
   if k+1 < e then 
      QuikeSortTask(Queue,k+1,e); 
end; 
 
procedure TThreadPool.SortTask(var Queue: TWorkTaskQueue); 
var 
   f,l:Integer; 
   ic:Integer; 
begin 
   ic := Length(Queue); 
   if ic = 0 then exit; 
    
   if Assigned(FOnSortTask) then 
      FOnSortTask(self,Queue) 
   else 
   begin 
      f := 0; 
      l := ic-1; 
      QuikeSortTask(Queue,f,l); 
   end; 
end; 
 
procedure TThreadPool.StartAll; 
var 
   I:Integer; 
begin 
   hBusyLock.Enter; 
   for I := Low(BusyQueue) to High(BusyQueue) do 
   begin 
     BusyQueue[i].Resume; 
     BusyQueue[i].State := wtRunning; 
   end; 
   hBusyLock.Leave; 
 
   hIDleLock.Enter; 
   for I := Low(IdleQueue) to High(IdleQueue) do 
   begin 
     IdleQueue[i].Resume; 
     IdleQueue[i].State := wtRunning; 
   end; 
   hIDleLock.Leave; 
end; 
 
procedure TThreadPool.StopAll; 
var 
   I:Integer; 
begin 
   hBusyLock.Enter; 
   for I := Low(BusyQueue) to High(BusyQueue) do 
   begin 
     BusyQueue[i].Suspend; 
     BusyQueue[i].State := wtStop; 
   end; 
   hBusyLock.Leave; 
 
   hIDleLock.Enter; 
   for I := Low(IdleQueue) to High(IdleQueue) do 
   begin 
     IdleQueue[i].Suspend; 
     IdleQueue[i].State := wtStop; 
   end; 
   hIDleLock.Leave; 
end; 
 
function TThreadPool.QueueSize(const Queue: TWorkThreadQueue):Integer; 
begin 
  Result := Length(Queue); 
end; 
 
//每次只留單線程進行空閒回收等待 
procedure TThreadPool.RecoverIDle(Const wait:TWorkThread); 
var 
   k:Integer; 
begin 
   FAuto:=False; 
   //等待時間超時 
   FWaitFlag := False; 
   Waiting := wait; 
   hBusyLock.Enter; 
   RemoveFromQueue(BusyQueue,wait); 
   hBusyLock.Leave; 
   //補給一個空閒線程 
   CreateIdleThread(); 
   WaitforSingleObject(hTimeJump,FRecoverInterval*ONEMINUTE); 
 
   //滿足空閒時間到後並且空閒線程大於零,沒有線程在執行任務,及任務隊列為空 
   if (IdleQueueCount > 0) 
      and (BusyQueueCount = 0) //正在等待的是清空空閒線程 
      and (TaskQueueCount = 0) then 
   begin 
      hTaskLock.Enter; 
      //回收到最小設定線程 
      for k := High(IdleQueue) Downto FMinNums do 
      begin 
         TWorkThread(IdleQueue[k]).Terminate; 
         PostNewTaskSign; 
      end; 
      SetLength(IdleQueue,FMinNums); 
      hTaskLock.Leave; 
   end; 
   //定時完後線程釋放 
   wait.Terminate; 
   FWaitFlag := True; 
end; 
 
procedure TThreadPool.RemoveFromQueue(var Queue: TWorkThreadQueue; 
  const re: TWorkThread); 
var 
   index ,i: integer; 
begin 
   index := -1; 
   for i := Low(Queue) to High(Queue) do 
   begin 
       if Queue[i] = re then 
       begin 
          index := i; 
          break; 
       end; 
   end; 
    
   if Index<>-1 then 
      DelQueueOfIndex(Queue,index); 
end; 
 
procedure TThreadPool.RemoveTask(const tk: TWorkTask); 
var 
   index:Integer; 
begin 
   index := FindTask(tk); 
   if index = -1 then Exit; 
   hTaskLock.Enter; 
   DelTaskOfIndex(TaskQueue,index); 
   hTaskLock.Leave; 
end; 
 
{ TWorkThread } 
 
constructor TWorkThread.Create(const pool: TThreadPool); 
begin 
   FPool := pool; 
   SetDefault; 
   inherited Create(false); 
end; 
 
procedure TWorkThread.Execute; 
var 
  hd:Array[0..0] of Cardinal; 
  ret:Cardinal; 
  task:TWorkTask; 
  nc:Integer; 
begin 
   //不斷的在任務隊列中取任務 
   hd[0]:= fPool.entTaskNotify.Handle; 
   while not Terminated do 
   begin 
      //跟蹤時為什麼會暫停不了,是因為前面在設置MinNums時有信號增加 
      ret := WaitForMultipleObjects(1,@hd,false,INFINITE); 
 
      if Terminated then break; 
 
      Case ret - WAIT_OBJECT_0 of 
      WAIT_OBJECT_0: 
           begin 
                if state <> wtRunning then  
                begin 
                    try 
                      //抽取一個任務 
                      task := FPool.PickupTask; 
 
                      if assigned(task) then 
                      begin 
                         //任務啟動前 
                         if Assigned(fPool.FOnTaskWillDo) then 
                            fPool.FOnTaskWillDo(self.ThreadID,task); 
 
                         //需要線程同步,以防正在執行的任務被其它線程執行。 
                         task.hTask.Enter; 
                         //當有任務做時,將自己移到工作隊列中 
                         fPool.MoveQueue(self,0); 
                         state := wtRunning; 
                         //指定執行線程 
                         task.Work := self; 
                         task.WorkState := tsDoing; 
                         task.execTask; 
                         state := wtFinished; 
                         task.WorkState := tsFinished; 
                         task.Work := nil; 
                         task.hTask.leave; 
                         //任務完成 
                         if Assigned(fPool.FOnTaskFinished) then 
                            fPool.FOnTaskFinished(task); 
                      end; 
 
                    finally 
 
                    end; 
 
                end; 
           end; 
         WAIT_OBJECT_0 + 1:;//Terminate  don't to do something 
      End; 
 
      nc := fPool.TaskQueueCount; 
      if (nc > 0) then 
        fpool.PostNewTaskSign 
      else if (fPool.FAuto) and (fPool.FWaitFlag) then 
         fPool.RecoverIDle(self);//任務空閒,線程空閒時間大於設定時間時自動回收空閒線程 
 
      state := wtIdle; 
      //將自己移至空閒線程 
      if not fPool.WaitAutoRecover(self) then //如果當前正在等待自動回收線程的 
         fPool.MoveQueue(self,1) 
      else 
         fPool.Waiting := nil; 
   end; 
end; 
 
procedure TWorkThread.SetDefault; 
begin 
   FState := wtIdle; 
end; 
 
{ TWorkTask } 
 
constructor TWorkTask.Create; 
begin 
   hTask := TCriticalSection.Create; 
   WorkState := tsNone; 
   FWorkLevel := tlNormal; 
   Work := nil; 
end; 
 
destructor TWorkTask.Destroy; 
begin 
   WorkState := tsFinished; 
   if Assigned(Work) then 
      Work.Resume; 
   hTask.Free; 
  inherited; 
end; 
 
procedure TWorkTask.setWorkState(Const Value:TTaskState); 
begin 
 
   FWorkState := Value; 
 
   case value of 
     tsReStart: 
          begin 
            if Assigned(Work) and (Work.Suspended)  then 
            begin 
                FWorkState := tsDoing; 
                Work.Resume; 
            end; 
          end; 
     tsStop: 
          begin 
            if Assigned(Work) then 
                Work.Suspend; 
          end; 
   end; 
end; 
 
{ TPoolLog } 
 
procedure TPoolLog.OutputLog(const Msg: String; Level: TLogLevel); 
begin 
   // to implement at sub class. 
end; 
 
procedure TPoolLog.WriteLog(const Msg: String; Level: TLogLevel); 
var 
   dt:TDatetime; 
begin 
   dt := now; 
   OutputLog(datetimetostr(dt) + ' : ' + Msg,Level); 
end; 
 
end. 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved