概述
.NET基於委托的APM(Asynchronous Programming Model)模型通過BeginInvoke, EndInvoke, AsyncCallback,IAsyncResult的組合使用,讓程序員可以方便的進行異步調用、異步回調和同步等待等操作。但.NET平台還沒有為線程的中止(abort)提供安全可靠的機制,也許正是基於這個原因APM並沒有包含異步調用的超時機制,而是把這個可能引起爭議的工作交給使用者自己來把握。
作為APM模型的補充,本文通過CancellableTask類提供了一個異步調用超時機制。CancellableTask類的設計有兩個主要的考慮:
1.保持APM風格,使用者依然可以使用熟悉的BeginInvoke, EndInvoke, IAsyncResult, AsyncCallback等;
2.提供基於Thread.Abort的默認超時處理,同時支持用戶自定義cancel回調。
使用
CancellableTask的構造函數包含workCallbak和cancelCallback(可選)兩參數,分別對應work回調和cancel回調。CancellableTask的BeginInvoke保持了APM的風格,可以看作是增加了timeout參數(單位:ms)的擴展版;而EndInvoke,AsyncCallback以及IAsyncResult的使用都和APM保持一致。Work委托產生的異常會在EndInvoke時拋出,同時若線程被超時中止,EndInvoke則會拋出ThreadAbortException異常。
下面是一段CancellableTask的使用示例:
class Program
{
static void Main(string[] args)
{
//默認超時直接abort線程
{
Console.WriteLine("[case 1]");
CancellableTask cancellableTask = new CancellableTask(Work);
State arg = new State { Loop = 20, Stop = false };
IAsyncResult asyncResult = cancellableTask.BeginInvoke(
arg,
(ar => Console.WriteLine("Async Callback")),
null,
10 * 1000);
asyncResult.AsyncWaitHandle.WaitOne();
try
{
object r = cancellableTask.EndInvoke(asyncResult);
Console.WriteLine("return " + r);
}
catch (ThreadAbortException)
{
Console.WriteLine("Thread Aborted");
}
catch (Exception exp)
{
Console.WriteLine(exp.ToString());
}
}
//自定義Cancel回調
{
Console.WriteLine(Environment.NewLine + "[case 2]");
CancellableTask cancellableTask = new CancellableTask(Work, Cancel);
State arg = new State { Loop = 20, Stop = false };
IAsyncResult asyncResult = cancellableTask.BeginInvoke(
arg,
(ar =>
{
try
{
object r = cancellableTask.EndInvoke(ar);
Console.WriteLine("return " + r);
}
catch (ThreadAbortException)
{
Console.WriteLine("Thread Aborted");
}
catch (Exception exp)
{
Console.WriteLine(exp.ToString());
}
}
),
arg,
10 * 1000);
}
Console.ReadLine();
}
static object Work(object arg)
{
State state = arg as State;
int i;
for (i = 0; i < state.Loop; i++)
{
if (state.Stop) break;
Console.WriteLine(i);
Thread.Sleep(1000);
}
return i;
}
static void Cancel(object state)
{
State st = state as State;
st.Stop = true;
}
}
internal class State
{
public int Loop { get; set; }
public bool Stop { get; set; }
}
實現
CancellableTask通過wrapper對workCallback進行包裝。Wrapper內部首先創建等待事件e,並通過ThreadPool.RegisterWaitForSingleObject注冊事件和WaitOrTimeout回調,然後調用workCallback。若workCallback提前返回,調用e.Set(),ThreadPool會調用WaitOrTimeout回調,isTimeout參數為false,不進行處理;否則,當workCallback超時未返回,ThreadPool會調用WaitOrTimeout回調,isTimeout參數為true。WaitOrTimeout回調在isTimeout情況下,首先判斷是否有自定義cancel回調,如果有則采用自定義回調;否則,默認情況下調用Thread.Abort終止work線程。下面是CancellableTask的實現細節:
public class CancellableTask
{
public delegate object WorkCallback(object arg);
public delegate void CancelCallback(object state);
protected class TimeoutState
{
internal Thread thread;
internal object state;
public TimeoutState(Thread thread, object state)
{
this.thread = thread;
this.state = state;
}
}
protected WorkCallback workCallback;
protected CancelCallback cancelCallback;
protected WorkCallback wrapper;
public CancellableTask(WorkCallback workCallback)
{
this.workCallback = workCallback;
}
public CancellableTask(WorkCallback workCallback, CancelCallback cancelCallback)
{
this.workCallback = workCallback;
this.cancelCallback = cancelCallback;
}
public IAsyncResult BeginInvoke(object arg, AsyncCallback asyncCallback, object state, int timeout)
{
wrapper = delegate(object argv)
{
AutoResetEvent e = new AutoResetEvent(false);
try
{
TimeoutState waitOrTimeoutState = new TimeoutState(Thread.CurrentThread, state);
ThreadPool.RegisterWaitForSingleObject(e, WaitOrTimeout, waitOrTimeoutState, timeout, true);
return workCallback(argv);
}
finally
{
e.Set();
}
};
IAsyncResult asyncResult = wrapper.BeginInvoke(arg, asyncCallback, state);
return asyncResult;
}
public object EndInvoke(IAsyncResult result)
{
return wrapper.EndInvoke(result);
}
protected void WaitOrTimeout(object state, bool isTimeout)
{
try
{
if (isTimeout)
{
TimeoutState waitOrTimeoutState = state as TimeoutState;
if (null != cancelCallback)
{
cancelCallback(waitOrTimeoutState.state);
}
else
{
waitOrTimeoutState.thread.Abort();
}
}
}
catch { }
}
}
總結
本文為.NET APM模型提供了異步超時機制擴展,一方面保持了APM編程風格,另一方面支持用戶自定義cancel回調。需要注意的是,默認的cancel方式Thread.Abort的安全性問題,使用時應注意資源釋放等。