最近想在DIOCP中加入任務調度線程,DIOCP的工作線程作為生產者(producer)將接受到的數據對象,投遞到任務調度線程中,然後統一進行分配。然而這一切都需要一個隊列, 這幾天都在關注無鎖隊列。
首先是一個隊列,簡單的隊列就是,生產者把數據壓入隊列(push), 消費者通過隊列Pop出數據進行處理。
簡單的隊列就是提供Push,和Pop函數。
我們用一個鏈表來存儲數據。Head ->data01->data02...data_n->Tail, 每個數據塊的結構如下
type
PVarQueueBlock = ^TVarQueueBlock;
TVarQueueBlock = packed record
value:Variant;
next:PVarQueueBlock;
end;
1.在進行Push壓入數據時壓入將Tail.next指向新壓入的數據塊, 然後用新的數據塊做Tail
procedure TSimpleQueue.pushQueue(pvData: PVarQueueBlock);
begin
if FHead = nil then
begin
FHead = pvData;
FTail := FHead;
end else
begin
FTail.next := pvData;
FTail := pvData;
end;
Inc(FCount);
end;
2.在進行Pop數據時把Head數據塊取出,然後用Head數據塊指向的下一塊當作Head.
function TSimpleQueue.popQueue: PVarQueueBlock;
var
lvTemp, lvRet:PVarQueueBlock;
begin
lvTemp := FHead;
if (lvTemp = nil) then
begin //沒有任何可以Pop出的值
Result := nil;
exit;
end;
//
FHead := FHead.next;
Dec(FCount);
Result :=lvTemp;
end;
上面就是簡單的隊列
上面的實現的隊列在多線程情況下是不安全的。如果要在多並發下隊列要進行加鎖,在push和pop時加鎖也是一種辦法。可以直接用臨界就可以了,但是我們要做的是無鎖隊列
首先記住多並發設計規則:決不要假設任何代碼會連續執行
上面的push操作
FTail.next := pvData; FTail := pvData;
也許執行了FTail.next:=pvData後,會被另外的線程搶走,然後FTail進行了新的賦值,這樣在進行FTail := pvData;這樣整個數據鏈條就會被破壞。
如果這兩行我們能一次行完成,這樣就可以實現無鎖操作了,這樣我們需要引入原子操作.Interlocked中的函數。
說無鎖其實不太確切,只是鎖的粒度小了。我們是使用api的InterlockedCompareExchange函數來實現的。
查一下MSDN
http://msdn.microsoft.com/en-us/library/windows/desktop/ms683560(v=vs.85).aspx
LONG __cdecl InterlockedCompareExchange( _Inout_ LONG volatile *Destination, _In_ LONG Exchange, _In_ LONG Comparand );
A pointer to the destination value.
The exchange value.
The value to compare to Destination.
The function returns the initial value of the Destination parameter.
大概解釋一下。這個函數是比較後進行交換。第一個參數是要存放目的的數據,第二個是交換數據,第三個是比較數據(與第一個比較), 如果交換返回的數據和第三個參數一樣。
這樣就可以在push和pop一步完成。
這裡貼出push的pop操作
procedure TVarQueue.pushQueue(pvData: PVarQueueBlock);
var
lvTemp:PVarQueueBlock;
lvPointer:Pointer;
begin
while True do
begin
lvTemp := FTail;
while lvTemp.next <> nil do lvTemp := lvTemp.next;
if InterlockedCompareExchangePointer(Pointer(lvTemp.next), Pointer(pvData), nil) = nil then
begin
break;
end;
end;
FTail := pvData;
Inc(FCount);
end;
function TVarQueue.popQueue: PVarQueueBlock;
var
lvTemp, lvRet:PVarQueueBlock;
lvPointer:Pointer;
begin
///為了方便 隊列中始終保留一個FHead數據塊
/// 也就是說FHead指向的下一個數據塊才是第一個數據塊
///
while True do
begin
lvTemp := FHead;
if (lvTemp = nil) or (lvTemp.next = nil) then
begin //沒有任何可以Pop出的值
Result := nil;
exit;
end;
if InterlockedCompareExchangePointer(Pointer(FHead), lvTemp.next, lvTemp) = lvTemp then
begin
break;
end;
end;
Dec(FCount);
lvRet := lvTemp.next;
Result := lvRet;
lvTemp.next := nil;
Dispose(lvTemp);
//返回的是head.next
end;
後續我會上傳完整的代碼到DIOCP項目中。
如有漏洞,敬請指出。歡迎假如DIOCP群討論