本文提供Delphi一個基於原子操作的無鎖隊列,簡易高效。適用於多線程大吞吐量操作的隊列。
可用於Android系統和32,64位Windows系統。
感謝殲10和qsl提供了修改建議!
有如下問題:
1.必須實現開辟內存
2.隊列大小必須是2的冪
3.不能壓入空指針
unit utAtomFIFO;
interface
Uses
SysUtils,
SyncObjs;
Type
TAtomFIFO = Class
Protected
FWritePtr: Integer;
FReadPtr: Integer;
FCount:Integer;
FHighBound:Integer;
FisEmpty:Integer;
FData: array of Pointer;
function GetSize:Integer;
Public
procedure Push(Item: Pointer);
function Pop: Pointer;
Constructor Create(Size: Integer); Virtual;
Destructor Destroy; Override;
Procedure Empty;
property Size: Integer read GetSize;
property UsedCount:Integer read FCount;
End;
Implementation
{$I InterlockedAPIs.inc}
//創建隊列,大小必須是2的冪,需要開辟足夠大的隊列,防止隊列溢出
Constructor TAtomFIFO.Create(Size: Integer);
var
i:NativeInt;
OK:Boolean;
Begin
Inherited Create;
OK:=(Size and (Size-1)=0);
if not OK then raise Exception.Create('FIFO長度必須大於等於256並為2的冪');
try
SetLength(FData, Size);
FHighBound:=Size-1;
except
Raise Exception.Create('FIFO申請內存失敗');
end;
End;
Destructor TAtomFIFO.Destroy;
Begin
SetLength(FData, 0);
Inherited;
End;
procedure TAtomFIFO.Empty;
begin
while (InterlockedExchange(FReadPtr, 0)<>0) and (InterlockedExchange(FWritePtr, 0)<>0) and (InterlockedExchange(FCount, 0)<>0) do;
end;
function TAtomFIFO.GetSize: Integer;
begin
Result:=FHighBound+1;
end;
procedure TAtomFIFO.Push(Item:Pointer);
var
N:Integer;
begin
if Item=nil then Exit;
N:=InterlockedIncrement(FWritePtr) and FHighBound;
FData[N]:=Item;
InterlockedIncrement(FCount);
end;
Function TAtomFIFO.Pop:Pointer;
var
N:Integer;
begin
if InterlockedDecrement(FCount)<0 then
begin
InterlockedIncrement(FCount);
Result:=nil;
end
else
begin
N:=InterlockedIncrement(FReadPtr) and FHighBound;
while FData[N]=nil do Sleep(1);
Result:=FData[N];
FData[N]:=nil;
end;
end;
End.
InterlockedAPIs.inc
{*******************************************************}
{ }
{ CodeGear Delphi Runtime Library }
{ }
{ Copyright(c) 1995-2014 Embarcadero Technologies, Inc. }
{ }
{*******************************************************}
{$IFDEF CPUX86}
function InterlockedAdd(var Addend: Integer; Increment: Integer): Integer;
asm
MOV ECX,EAX
MOV EAX,EDX
LOCK XADD [ECX],EAX
ADD EAX,EDX
end;
function InterlockedCompareExchange(var Target: Integer; Exchange: Integer; Comparand: Integer): Integer;
asm
XCHG EAX,ECX
LOCK CMPXCHG [ECX],EDX
end;
function InterlockedCompareExchangePointer(var Target: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer;
asm
JMP InterlockedCompareExchange
end;
function InterlockedDecrement(var Addend: Integer): Integer;
asm
MOV EDX,-1
JMP InterlockedAdd
end;
function InterlockedExchange(var Target: Integer; Value: Integer): Integer;
asm
MOV ECX,EAX
MOV EAX,[ECX]
@@loop:
LOCK CMPXCHG [ECX],EDX
JNZ @@loop
end;
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
asm
JMP InterlockedExchange
end;
function InterlockedIncrement(var Addend: Integer): Integer;
asm
MOV EDX,1
JMP InterlockedAdd
end;
{$ENDIF CPUX86}
{$IFDEF CPUX64}
function InterlockedExchangeAdd(var Addend: Integer; Value: Integer): Integer;
asm
.NOFRAME
MOV EAX,EDX
LOCK XADD [RCX].Integer,EAX
end;
function InterlockedDecrement(var Addend: LongInt): LongInt;
asm
.NOFRAME
MOV EAX,-1
LOCK XADD [RCX].Integer,EAX
DEC EAX
end;
function InterlockedIncrement(var Addend: LongInt): LongInt;
asm
MOV EAX,1
LOCK XADD [RCX].Integer,EAX
INC EAX
end;
function InterlockedCompareExchange(var Destination: Integer; Exchange: Integer; Comparand: Integer): Integer;
asm
.NOFRAME
MOV EAX,R8d
LOCK CMPXCHG [RCX].Integer,EDX
end;
function InterlockedCompareExchange64(var Destination: Int64; Exchange: Int64; Comparand: Int64): Int64; overload;
asm
.NOFRAME
MOV RAX,R8
LOCK CMPXCHG [RCX],RDX
end;
function InterlockedCompareExchangePointer(var Destination: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer;
asm
.NOFRAME
MOV RAX,R8
LOCK CMPXCHG [RCX],RDX
end;
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
asm
.NOFRAME
LOCK XCHG [RCX],RDX
MOV RAX,RDX
end;
function InterlockedExchange(var Target: Integer; Value: Integer): Integer;// inline;
asm
.NOFRAME
LOCK XCHG [RCX],EDX
MOV EAX,EDX
end;
{$ENDIF CPUX64}
{$IFDEF CPUARM}
function InterlockedAdd(var Addend: Integer; Increment: Integer): Integer;
begin
Result := AtomicIncrement(Addend, Increment);
end;
function InterlockedCompareExchange(var Target: Integer; Exchange: Integer; Comparand: Integer): Integer;
begin
Result := AtomicCmpExchange(Target, Exchange, Comparand);
end;
function InterlockedCompareExchangePointer(var Target: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer;
begin
Result := AtomicCmpExchange(Target, Exchange, Comparand);
end;
function InterlockedDecrement(var Addend: Integer): Integer;
begin
Result := AtomicDecrement(Addend);
end;
function InterlockedExchange(var Target: Integer; Value: Integer): Integer;
begin
Result := AtomicExchange(Target, Value);
end;
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
begin
Result := AtomicExchange(Target, Value);
end;
function InterlockedIncrement(var Addend: Integer): Integer;
begin
Result := AtomicIncrement(Addend);
end;
{$ENDIF CPUARM}
性能測試:
采用天地弦提供的評估程序,進行了一些修改,分別對使用不同的臨界區的隊列進行對比結果如下:
其中Swith是因隊列讀空,進行線程上下文切換的次數
