程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 更多編程語言 >> Delphi >> DIOCP開源項目-Delphi高性能無鎖隊列(lock-free)

DIOCP開源項目-Delphi高性能無鎖隊列(lock-free)

編輯:Delphi

最近想在DIOCP中加入任務調度線程,DIOCP的工作線程作為生產者(producer)將接受到的數據對象,投遞到任務調度線程中,然後統一進行分配。然而這一切都需要一個隊列, 這幾天都在關注無鎖隊列。

 

[隊列]

首先是一個隊列,簡單的隊列就是,生產者把數據壓入隊列(push), 消費者通過隊列Pop出數據進行處理。

簡單的隊列就是提供Push,和Pop函數。

我們用一個鏈表來存儲數據。Head ->data01->data02...data_n->Tail, 每個數據塊的結構如下

type
  PVarQueueBlock = ^TVarQueueBlock;
  TVarQueueBlock = packed record
    value:Variant;
    next:PVarQueueBlock;
  end;

 

1.在進行Push壓入數據時壓入將Tail.next指向新壓入的數據塊, 然後用新的數據塊做Tail

procedure TSimpleQueue.pushQueue(pvData: PVarQueueBlock);
begin
  if FHead = nil then
  begin
    FHead = pvData;
    FTail := FHead;
  end else
  begin
    FTail.next := pvData;
    FTail := pvData;
  end;
  Inc(FCount);
end;

 

 

2.在進行Pop數據時把Head數據塊取出,然後用Head數據塊指向的下一塊當作Head.

function TSimpleQueue.popQueue: PVarQueueBlock;
var
  lvTemp, lvRet:PVarQueueBlock;
begin
  lvTemp := FHead;
  if (lvTemp = nil) then
  begin        //沒有任何可以Pop出的值
    Result := nil;
    exit;
  end;

  //
  FHead := FHead.next;

  Dec(FCount);
  Result :=lvTemp;
end;

 

上面就是簡單的隊列

 

[無鎖隊列]

上面的實現的隊列在多線程情況下是不安全的。如果要在多並發下隊列要進行加鎖,在push和pop時加鎖也是一種辦法。可以直接用臨界就可以了,但是我們要做的是無鎖隊列

 

首先記住多並發設計規則:決不要假設任何代碼會連續執行

 

上面的push操作

FTail.next := pvData;
FTail := pvData;

也許執行了FTail.next:=pvData後,會被另外的線程搶走,然後FTail進行了新的賦值,這樣在進行FTail := pvData;這樣整個數據鏈條就會被破壞。

如果這兩行我們能一次行完成,這樣就可以實現無鎖操作了,這樣我們需要引入原子操作.Interlocked中的函數。

 

說無鎖其實不太確切,只是鎖的粒度小了。我們是使用api的InterlockedCompareExchange函數來實現的。

查一下MSDN

http://msdn.microsoft.com/en-us/library/windows/desktop/ms683560(v=vs.85).aspx

LONG __cdecl InterlockedCompareExchange(
  _Inout_  LONG volatile *Destination,
  _In_     LONG Exchange,
  _In_     LONG Comparand
);

Parameters

Destination [in, out]

A pointer to the destination value.

Exchange [in]

The exchange value.

Comparand [in]

The value to compare to Destination.

Return value

The function returns the initial value of the Destination parameter.

 

大概解釋一下。這個函數是比較後進行交換。第一個參數是要存放目的的數據,第二個是交換數據,第三個是比較數據(與第一個比較), 如果交換返回的數據和第三個參數一樣。

 

 

這樣就可以在push和pop一步完成。

這裡貼出push的pop操作

procedure TVarQueue.pushQueue(pvData: PVarQueueBlock);
var
  lvTemp:PVarQueueBlock;
  lvPointer:Pointer;
begin
  while True do
  begin
     lvTemp := FTail;
     while lvTemp.next <> nil do lvTemp := lvTemp.next;
     if InterlockedCompareExchangePointer(Pointer(lvTemp.next), Pointer(pvData), nil) = nil then
     begin
       break;
     end;
  end;
  FTail := pvData;
  Inc(FCount);
end;
function TVarQueue.popQueue: PVarQueueBlock;
var
  lvTemp, lvRet:PVarQueueBlock;
  lvPointer:Pointer;
begin
  ///為了方便 隊列中始終保留一個FHead數據塊
  ///  也就是說FHead指向的下一個數據塊才是第一個數據塊
  ///

  while True do
  begin
    lvTemp := FHead;
    if (lvTemp = nil) or (lvTemp.next = nil) then
    begin        //沒有任何可以Pop出的值
      Result := nil;
      exit;
    end;
    if InterlockedCompareExchangePointer(Pointer(FHead), lvTemp.next, lvTemp) = lvTemp then
    begin
      break;
    end;
  end;
  Dec(FCount);
  lvRet := lvTemp.next;
  Result := lvRet;
  lvTemp.next := nil;
  Dispose(lvTemp);
  //返回的是head.next
end;

 

後續我會上傳完整的代碼到DIOCP項目中。

如有漏洞,敬請指出。歡迎假如DIOCP群討論

 

 

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved