由於最近需要用多線程處理一些問題,一開始我用了 .NET 默認的 ThreadPool,但感覺不是很適合,於是自己實現了一個簡單的 ThreadPool。
寫得比較簡單,有興趣的朋友可以一起看看,共同改進。
代碼主要由ThreadPoolEx,WorkItem,WorkQueue組成。
ThreadPoolEx
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.Threading;
6using System.Collections;
7
namespace NetDragon.ThreadPoolEx
{
    /// <summary>
    /// A small thread pool that grows on demand up to <see cref="MaxThreadCount"/>
    /// background threads and lets idle threads exit again while more than
    /// <see cref="MinThreadCount"/> threads are registered.
    /// </summary>
    /// <remarks>
    /// Workers poll the internal queue; when it is empty they sleep briefly and
    /// re-check. Exceptions thrown by a work item are swallowed so that a faulty
    /// item cannot take its worker thread down.
    /// </remarks>
    public class ThreadPoolEx
    {
        // How long an idle worker sleeps before polling the queue again.
        private const int IdlePollMilliseconds = 100;

        private readonly WorkQueue _workQueue = new WorkQueue();

        // Guards every compound "check the table, then change it" sequence
        // (starting a worker, retiring a worker). The synchronized Hashtable only
        // makes individual calls atomic, not check-then-act sequences.
        private readonly object _poolLock = new object();

        /// <summary>Upper bound on the number of registered worker threads.</summary>
        public int MaxThreadCount = 10;

        /// <summary>Idle workers only exit while more than this many threads are registered.</summary>
        public int MinThreadCount = 2;

        // Maps each worker Thread to the UTC time it was last active.
        private readonly Hashtable _threadTable = null;

        // Running total of threads ever created; only used to number thread names.
        private int _threadCount = 0;

        // Number of workers currently executing a work item.
        private int _inUseWorkThread = 0;

        /// <summary>Seconds a worker may stay idle before it becomes eligible to exit.</summary>
        public double IdleTimeout = 10;

        /// <summary>
        /// Creates a pool with at most 10 threads, a minimum of 2 and an idle
        /// timeout of 2 seconds.
        /// </summary>
        public ThreadPoolEx() : this(10, 2, 2)
        {
        }

        /// <summary>Creates a pool with the given limits.</summary>
        /// <param name="maxThreadCouont">Maximum number of worker threads.</param>
        /// <param name="minThreadCount">Thread count below which idle workers are kept alive.</param>
        /// <param name="idleTimeout">Idle time, in seconds, after which a worker may exit.</param>
        public ThreadPoolEx(int maxThreadCouont, int minThreadCount, int idleTimeout)
        {
            MaxThreadCount = maxThreadCouont;
            MinThreadCount = minThreadCount;
            IdleTimeout = idleTimeout;

            _threadTable = Hashtable.Synchronized(new Hashtable(MaxThreadCount));
        }

        /// <summary>
        /// Queues <paramref name="waitCallback"/> for execution on a pool thread.
        /// </summary>
        /// <param name="waitCallback">The callback to run.</param>
        /// <param name="objParams">State object passed to the callback.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="waitCallback"/> is null.
        /// </exception>
        public void QueueUserWorkItem(WaitCallback waitCallback, object objParams)
        {
            // Fail at the call site; otherwise the null delegate would only blow up
            // later on a worker thread, where the exception is swallowed.
            if (waitCallback == null)
            {
                throw new ArgumentNullException("waitCallback");
            }

            EnqueueWorkItem(waitCallback, objParams);
        }

        // Wraps the callback in a WorkItem, queues it, and starts another worker
        // when busy workers plus queued items outnumber the registered threads.
        private void EnqueueWorkItem(WaitCallback waitCallback, object objParams)
        {
            WorkItem workItem = new WorkItem()
            {
                WorkCallback = waitCallback,
                ObjParams = objParams
            };

            _workQueue.Push(workItem);

            // CompareExchange(x, 0, 0) is a fenced read of the counter that workers
            // update through Interlocked.
            int busyWorkers = Interlocked.CompareExchange(ref _inUseWorkThread, 0, 0);

            if (busyWorkers + _waitWorkItem > _threadTable.Count)
            {
                StartThread();
            }
        }

        // Creates, registers and starts one more worker unless the pool is full.
        private void StartThread()
        {
            Thread thread;

            // The capacity check, the counter bump and the registration must be one
            // atomic step, otherwise concurrent producers can overshoot MaxThreadCount.
            lock (_poolLock)
            {
                if (_threadTable.Count >= MaxThreadCount)
                {
                    return;
                }

                ++_threadCount;

                thread = new Thread(ProcessWorkItems);
                thread.IsBackground = true;
                thread.Name = "ThreadPoolEx #" + _threadCount;
                thread.Priority = ThreadPriority.Normal;

                // Register before Start so the worker can find its own entry.
                _threadTable[thread] = DateTime.UtcNow;
            }

            thread.Start();
        }

        // Worker loop: run queued items; when the queue is empty, either retire
        // (idle for too long and the pool is above its minimum) or sleep and poll again.
        private void ProcessWorkItems()
        {
            try
            {
                while (true)
                {
                    WorkItem workItem = _workQueue.Pop();

                    if (workItem == null)
                    {
                        if (CurThreadIsTimeOut() && TryRetireCurrentThread())
                        {
                            break;
                        }

                        Thread.Sleep(IdlePollMilliseconds);
                        continue;
                    }

                    // Count this worker as busy before entering the try block so the
                    // decrement in finally always has a matching increment.
                    Interlocked.Increment(ref _inUseWorkThread);

                    try
                    {
                        workItem.Execute();
                    }
                    catch (Exception)
                    {
                        // Deliberately swallowed: a failing work item must not kill
                        // the worker. Hook logging in here.
                    }
                    finally
                    {
                        Interlocked.Decrement(ref _inUseWorkThread);

                        // Restart the idle clock when the item finishes, so a long
                        // running item does not make its worker look idle.
                        _threadTable[Thread.CurrentThread] = DateTime.UtcNow;
                    }
                }
            }
            catch (ThreadAbortException)
            {
                Thread.ResetAbort();
            }
            finally
            {
                // Always unregister an exiting thread; a dead thread left in the
                // table would permanently use up one of the MaxThreadCount slots.
                lock (_poolLock)
                {
                    _threadTable.Remove(Thread.CurrentThread);
                }
            }
        }

        // Unregisters the calling worker if the pool is above MinThreadCount.
        // Returns true when the worker was removed and should exit. Check and
        // removal happen under one lock so that several idle workers cannot all
        // decide to leave at once and drop the pool below its minimum.
        private bool TryRetireCurrentThread()
        {
            lock (_poolLock)
            {
                if (_threadTable.Count <= MinThreadCount)
                {
                    return false;
                }

                _threadTable.Remove(Thread.CurrentThread);
                return true;
            }
        }

        // True when the calling worker has been idle for more than IdleTimeout seconds.
        private bool CurThreadIsTimeOut()
        {
            DateTime lastAliveTime = (DateTime)_threadTable[Thread.CurrentThread];

            // UTC on both sides so DST or time-zone changes cannot skew the idle time.
            double waitSeconds = (DateTime.UtcNow - lastAliveTime).TotalSeconds;

            return waitSeconds > IdleTimeout;
        }

        // Number of work items waiting in the queue.
        private int _waitWorkItem
        {
            get
            {
                return _workQueue.Count;
            }
        }

        /// <summary>Number of worker threads currently registered with the pool.</summary>
        public int ThreadCount
        {
            get
            {
                return _threadTable.Count;
            }
        }
    }
}
170
WorkQueue
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5
namespace NetDragon.ThreadPoolEx
{
    /// <summary>
    /// FIFO queue of pending work items. Every member takes the same lock, so
    /// the queue can be shared between producer and worker threads.
    /// </summary>
    class WorkQueue
    {
        // Per-instance lock: independent queues do not need to contend with
        // each other.
        private readonly object _syncRoot = new object();

        private readonly Queue<WorkItem> _workQueue = new Queue<WorkItem>();

        /// <summary>Removes and returns the oldest item.</summary>
        /// <returns>The dequeued item, or null when the queue is empty.</returns>
        public WorkItem Pop()
        {
            lock (_syncRoot)
            {
                if (_workQueue.Count > 0)
                {
                    return _workQueue.Dequeue();
                }

                return null;
            }
        }

        /// <summary>Appends <paramref name="workItem"/> to the end of the queue.</summary>
        /// <param name="workItem">The item to enqueue.</param>
        public void Push(WorkItem workItem)
        {
            // Queue<T> is not safe for concurrent use; Enqueue must be serialized
            // with the Dequeue in Pop.
            lock (_syncRoot)
            {
                _workQueue.Enqueue(workItem);
            }
        }

        /// <summary>Number of items currently waiting in the queue.</summary>
        public int Count
        {
            get
            {
                lock (_syncRoot)
                {
                    return _workQueue.Count;
                }
            }
        }
    }
}
40
WorkItem
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.Threading;
6
namespace NetDragon.ThreadPoolEx
{
    /// <summary>
    /// Pairs a callback with the state object that is passed to it.
    /// </summary>
    class WorkItem
    {
        /// <summary>Delegate invoked by <see cref="Execute"/>.</summary>
        public WaitCallback WorkCallback;

        /// <summary>Argument handed to <see cref="WorkCallback"/>.</summary>
        public object ObjParams;

        /// <summary>
        /// Invokes <see cref="WorkCallback"/> with <see cref="ObjParams"/> on the
        /// calling thread.
        /// </summary>
        public void Execute()
        {
            WorkCallback.Invoke(ObjParams);
        }
    }
}
WorkQueue
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5
namespace NetDragon.ThreadPoolEx
{
    /// <summary>
    /// FIFO queue of pending work items. Pop, Push and Count all take the same
    /// lock, so the queue can be shared between producer and worker threads.
    /// </summary>
    class WorkQueue
    {
        // Instance-level lock object; a static one would make unrelated queue
        // instances wait on each other.
        private readonly object _syncRoot = new object();

        private readonly Queue<WorkItem> _workQueue = new Queue<WorkItem>();

        /// <summary>Removes and returns the oldest item.</summary>
        /// <returns>The dequeued item, or null when the queue is empty.</returns>
        public WorkItem Pop()
        {
            lock (_syncRoot)
            {
                return _workQueue.Count > 0 ? _workQueue.Dequeue() : null;
            }
        }

        /// <summary>Appends <paramref name="workItem"/> to the end of the queue.</summary>
        /// <param name="workItem">The item to enqueue.</param>
        public void Push(WorkItem workItem)
        {
            lock (_syncRoot)
            {
                _workQueue.Enqueue(workItem);
            }
        }

        /// <summary>Number of items currently waiting in the queue.</summary>
        public int Count
        {
            get
            {
                // Read under the lock as well: Queue<T> gives no guarantee for a
                // read that races with Enqueue or Dequeue.
                lock (_syncRoot)
                {
                    return _workQueue.Count;
                }
            }
        }
    }
}
45
本文配套源碼