860 lines
26 KiB
Plaintext
860 lines
26 KiB
Plaintext
{
|
|
$Project$
|
|
$Workfile$
|
|
$Revision$
|
|
$DateUTC$
|
|
$Id$
|
|
|
|
This file is part of the Indy (Internet Direct) project, and is offered
|
|
under the dual-licensing agreement described on the Indy website.
|
|
(http://www.indyproject.org/)
|
|
|
|
Copyright:
|
|
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
|
|
}
|
|
{
|
|
$Log$
|
|
}
|
|
{
|
|
Rev 1.6 9/16/2004 8:11:40 PM JPMugaas
|
|
Should compile again.
|
|
|
|
Rev 1.5 6/11/2004 8:39:58 AM DSiders
|
|
Added "Do not Localize" comments.
|
|
|
|
Rev 1.4 2004.05.06 1:47:26 PM czhower
|
|
Now uses IndexOf
|
|
|
|
Rev 1.3 2004.04.13 10:37:56 PM czhower
|
|
Updates
|
|
|
|
Rev 1.2 2004.03.07 11:46:08 AM czhower
|
|
Flushbuffer fix + other minor ones found
|
|
|
|
Rev 1.1 2004.02.09 9:16:44 PM czhower
|
|
Updated to compile and match lib changes.
|
|
|
|
Rev 1.0 2004.02.03 12:38:56 AM czhower
|
|
Move
|
|
|
|
Rev 1.6 2003.10.24 10:37:38 AM czhower
|
|
IdStream
|
|
|
|
Rev 1.5 2003.10.19 4:38:32 PM czhower
|
|
Updates
|
|
|
|
Rev 1.4 2003.10.19 2:50:40 PM czhower
|
|
Fiber cleanup
|
|
|
|
Rev 1.3 2003.10.14 11:17:02 PM czhower
|
|
Updates to match core changes.
|
|
|
|
Rev 1.2 2003.10.11 5:43:30 PM czhower
|
|
Chained servers now functional.
|
|
|
|
Rev 1.1 2003.09.19 10:09:40 PM czhower
|
|
Next stage of fiber support in servers.
|
|
|
|
Rev 1.0 8/16/2003 11:09:08 AM JPMugaas
|
|
Moved from Indy Core dir as part of package reorg
|
|
|
|
Rev 1.49 2003.07.17 4:42:06 PM czhower
|
|
More IOCP improvements.
|
|
|
|
Rev 1.45 2003.07.14 11:46:46 PM czhower
|
|
IOCP now passes all bubbles.
|
|
|
|
Rev 1.43 2003.07.14 1:10:52 AM czhower
|
|
Now passes all bubble tests for chained stack.
|
|
|
|
Rev 1.41 7/7/2003 1:34:06 PM BGooijen
|
|
Added WriteFile(...)
|
|
|
|
Rev 1.40 7/3/2003 2:03:52 PM BGooijen
|
|
IOCP works server-side now
|
|
|
|
Rev 1.39 2003.06.30 5:41:54 PM czhower
|
|
-Fixed AV that occurred sometimes when sockets were closed with chains
|
|
-Consolidated code that was marked by a todo for merging as it no longer
|
|
needed to be separate
|
|
-Removed some older code that was no longer necessary
|
|
|
|
Passes bubble tests.
|
|
|
|
Rev 1.38 6/29/2003 10:56:26 PM BGooijen
|
|
Removed .Memory from the buffer, and added some extra methods
|
|
|
|
Rev 1.37 2003.06.25 4:30:02 PM czhower
|
|
Temp hack fix for AV problem. Working on real solution now.
|
|
|
|
Rev 1.36 6/24/2003 11:17:44 PM BGooijen
|
|
change in TIdIOHandlerChain.ReadLn, LTermPos= 0 is now handled differently
|
|
|
|
Rev 1.35 23/6/2003 22:33:18 GGrieve
|
|
fix CheckForDataOnSource - specify timeout
|
|
|
|
Rev 1.34 6/22/2003 11:22:22 PM JPMugaas
|
|
Should now compile.
|
|
|
|
Rev 1.33 6/4/2003 1:08:40 AM BGooijen
|
|
Added CheckForDataOnSource and removed some (duplicate) code
|
|
|
|
Rev 1.32 6/3/2003 8:07:20 PM BGooijen
|
|
Added TIdIOHandlerChain.AllData
|
|
|
|
Rev 1.31 5/11/2003 2:37:58 PM BGooijen
|
|
Bindings are updated now
|
|
|
|
Rev 1.30 5/11/2003 12:00:08 PM BGooijen
|
|
|
|
Rev 1.29 5/11/2003 12:03:16 AM BGooijen
|
|
|
|
Rev 1.28 2003.05.09 10:59:24 PM czhower
|
|
|
|
Rev 1.27 2003.04.22 9:48:50 PM czhower
|
|
|
|
Rev 1.25 2003.04.17 11:01:14 PM czhower
|
|
|
|
Rev 1.19 2003.04.10 10:51:04 PM czhower
|
|
|
|
Rev 1.18 4/2/2003 3:39:26 PM BGooijen
|
|
Added Intercepts
|
|
|
|
Rev 1.17 3/29/2003 5:53:52 PM BGooijen
|
|
added AfterAccept
|
|
|
|
Rev 1.16 3/27/2003 2:57:58 PM BGooijen
|
|
Added a RawWrite for streams, implemented WriteStream, changed
|
|
WriteToDestination to use TIdWorkOpUnitWriteBuffer
|
|
|
|
Rev 1.15 2003.03.26 12:20:28 AM czhower
|
|
Moved visibility of execute to protected.
|
|
|
|
Rev 1.14 3/25/2003 11:07:58 PM BGooijen
|
|
ChainEngine descends now from TIdBaseComponent
|
|
|
|
Rev 1.13 3/25/2003 01:33:48 AM JPMugaas
|
|
Fixed compiler warnings.
|
|
|
|
Rev 1.12 3/24/2003 11:03:50 PM BGooijen
|
|
Various fixes to readln:
|
|
- uses connection default now
|
|
- doesn't raise an exception on timeout any more
|
|
|
|
Rev 1.11 2003.03.13 1:22:58 PM czhower
|
|
Typo fixed. lenth --> Length
|
|
|
|
Rev 1.10 3/13/2003 10:18:20 AM BGooijen
|
|
Server side fibers, bug fixes
|
|
|
|
Rev 1.9 3/2/2003 12:36:22 AM BGooijen
|
|
Added woReadBuffer and TIdWorkOpUnitReadBuffer to read a buffer. Now
|
|
ReadBuffer doesn't use ReadStream any more.
|
|
TIdIOHandlerChain.ReadLn now supports MaxLineLength (splitting, and
|
|
exceptions).
|
|
woReadLn doesn't check the entire buffer any more, but continues where it
|
|
stopped the last time.
|
|
Added basic support for timeouts (probably only on read operations, and maybe
|
|
connect), accuracy of timeout is currently 500msec.
|
|
|
|
Rev 1.8 2/28/2003 10:15:16 PM BGooijen
|
|
bugfix: changed some occurrences of FRecvBuffer to FInputBuffer
|
|
|
|
Rev 1.7 2/27/2003 10:11:12 PM BGooijen
|
|
|
|
Rev 1.6 2/26/2003 1:08:52 PM BGooijen
|
|
|
|
Rev 1.5 2/25/2003 10:36:28 PM BGooijen
|
|
Added more opcodes, methods, and moved opcodes to separate files.
|
|
|
|
Rev 1.4 2003.02.25 9:02:32 PM czhower
|
|
Hand off to Bas
|
|
|
|
Rev 1.3 2003.02.25 1:36:04 AM czhower
|
|
|
|
Rev 1.2 2002.12.11 11:00:58 AM czhower
|
|
|
|
Rev 1.1 2002.12.07 12:26:06 AM czhower
|
|
|
|
Rev 1.0 11/13/2002 08:45:00 AM JPMugaas
|
|
}
|
|
unit IdIOHandlerChain;
|
|
|
|
interface
|
|
|
|
uses
|
|
Classes,
|
|
IdBaseComponent, IdBuffer, IdGlobal, IdIOHandler, IdIOHandlerSocket,
|
|
IdFiber, IdThreadSafe, IdWorkOpUnit, IdStackConsts, IdWinsock2, IdThread,
|
|
IdFiberWeaver, IdStream, IdStreamVCL,
|
|
Windows;
|
|
|
|
type
|
|
// Selects how ConnectClient establishes a client connection:
//   cmNonBlock - the socket is switched to non-blocking mode before Connect.
//   cmIOCP     - Connect is issued first and the socket is switched to
//                non-blocking mode afterwards; the binding is also flagged
//                as overlapped.
TIdConnectMode = (cmNonBlock, cmIOCP);

// Forward declarations; the full class declarations follow in this type section.
TIdIOHandlerChain = class;
TIdChainEngineThread = class;
|
|
|
|
// Component that owns an I/O completion port and the thread that services it.
// Work op units are handed to it through AddWork.
TIdChainEngine = class(TIdBaseComponent)
protected
  // Completion port handle, created in InitComponent and closed in Destroy.
  FCompletionPort: THandle;
  // Thread whose Run method calls Execute; not created at design time.
  FThread: TIdChainEngineThread;
  //
  // Dequeues one completion packet and dispatches it to its work op unit.
  procedure Execute;
  // Returns the input buffer of the given IOHandler.
  function GetInputBuffer(const AIOHandler: TIdIOHandler): TIdBuffer;
  procedure InitComponent; override;
  // Applies the engine's preferred settings to a new IOHandler.
  procedure SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
  // Posts the terminate packet to the completion port.
  procedure Terminating;
public
  procedure AddWork(AWorkOpUnit: TIdWorkOpUnit);
  procedure BeforeDestruction; override;
  destructor Destroy; override;
  procedure RemoveSocket(AIOHandler: TIdIOHandlerChain);
  procedure SocketAccepted(AIOHandler: TIdIOHandlerChain);
end;
|
|
|
|
// Socket IOHandler that performs its reads and writes by creating work op
// units, tagging them with its fiber and handing them to a TIdChainEngine
// (see QueueAndWait).
TIdIOHandlerChain = class(TIdIOHandlerSocket)
protected
  FChainEngine: TIdChainEngine;
  FConnectMode: TIdConnectMode;
  FFiber: TIdFiber;
  FFiberWeaver: TIdFiberWeaver;
  // Allocated in Create, released in Destroy.
  FOverlapped: PIdOverlapped;
  //
  procedure ConnectClient; override;
  // Prepares AWorkOpUnit (timeout, fiber, handler, socket), passes it to the
  // chain engine and afterwards re-raises errors, timeouts and disconnects.
  procedure QueueAndWait(AWorkOpUnit: TIdWorkOpUnit;
    ATimeout: Integer = IdTimeoutDefault;
    AFreeWorkOpUnit: Boolean = True;
    AAllowGracefulException: Boolean = True);
  // OnCompleted handler installed on every queued work op unit.
  procedure WorkOpUnitCompleted(AWorkOpUnit: TIdWorkOpUnit);
public
  procedure AfterAccept; override;
  function AllData: string; override;
  procedure CheckForDataOnSource(ATimeout : Integer = 0); override;
  procedure CheckForDisconnect(ARaiseExceptionIfDisconnected: Boolean = True;
    AIgnoreBuffer: Boolean = False); override;
  constructor Create(AOwner: TComponent;
    AChainEngine: TIdChainEngine;
    AFiberWeaver: TIdFiberWeaver;
    AFiber: TIdFiber); reintroduce; virtual;
  destructor Destroy; override;
  procedure Open; override;
  function ReadFromSource(ARaiseExceptionIfDisconnected: Boolean = True;
    ATimeout: Integer = IdTimeoutDefault;
    ARaiseExceptionOnTimeout: Boolean = True): Integer; override;
  procedure ReadStream(AStream: TIdStreamVCL; AByteCount: Int64;
    AReadUntilDisconnect: Boolean); override;
  // TODO: Allow ReadBuffer to bypass the internal buffer. Will it really
  // help? Only ReadBuffer would be able to use this optimization in most
  // cases and it is not used by many. Most calls are to stream (disk) based
  // or strings as ReadLn.
  procedure ReadBytes(var VBuffer: TIdBytes; AByteCount: Integer;
    AAppend: Boolean = True); override;
  function ReadLn(ATerminator: string = LF;
    ATimeout: Integer = IdTimeoutDefault;
    AMaxLineLength: Integer = -1): string; override;
  // Older signature, kept for reference:
  //   function WriteFile(AFile: string;
  //     AEnableTransferFile: Boolean): Cardinal; override;
  function WriteFile(const AFile: String;
    AEnableTransferFile: Boolean): Int64; override;
  // Older signature, kept for reference:
  //   procedure Write(AStream: TIdStream; ASize: Integer = 0;
  //     AWriteByteCount: Boolean = False); override;
  procedure Write(AStream: TIdStreamVCL;
    ASize: Int64 = 0;
    AWriteByteCount: Boolean = False); override;
  procedure WriteDirect(ABuffer: TIdBytes); override;
  //
  property ConnectMode: TIdConnectMode read FConnectMode write FConnectMode;
  property Overlapped: PIdOverlapped read FOverlapped;
end;
|
|
|
|
// Thread that drives a TIdChainEngine: its Run method calls the engine's
// Execute.
TIdChainEngineThread = class(TIdThread)
protected
  // Engine serviced by this thread; assigned in Create.
  FChainEngine: TIdChainEngine;
public
  constructor Create(AOwner: TIdChainEngine; const AName: string); reintroduce;
  procedure Run; override;
  // Re-declared here to raise the visibility of the inherited property.
  property Terminated;
end;
|
|
|
|
implementation
|
|
|
|
uses
|
|
IdComponent, IdException, IdExceptionCore, IdStack, IdResourceStrings, IdWorkOpUnits,
|
|
IdStackWindows,
|
|
SysUtils;
|
|
|
|
const
  // Completion key posted by TIdChainEngine.Terminating; TIdChainEngine.Execute
  // ignores any packet carrying this key instead of dispatching it.
  GCompletionKeyTerminate = $F0F0F0F0;
|
|
|
|
{ TIdIOHandlerChain }
|
|
|
|
// Queues a "read available" work op unit with the given timeout. A read
// timeout is not an error here and is swallowed; every other exception
// propagates to the caller.
procedure TIdIOHandlerChain.CheckForDataOnSource(ATimeout: Integer = 0);
var
  LProbe: TIdWorkOpUnit;
begin
  // TODO: Change this so we dont have to rely on an exception trap
  LProbe := TIdWorkOpUnitReadAvailable.Create;
  try
    // QueueAndWait owns and frees LProbe (AFreeWorkOpUnit = True).
    QueueAndWait(LProbe, ATimeout, True, False);
  except
    // Only the timeout is handled; anything else is re-raised automatically.
    on EIdReadTimeout do begin
      // Nothing
    end;
  end;
end;
|
|
|
|
// Connects the client socket according to ConnectMode, queues a
// "wait connected" work op unit and finally refreshes the local and peer
// binding information.
// Raises EIdException when ConnectMode holds an unknown value.
procedure TIdIOHandlerChain.ConnectClient;
begin
  // TODO: Non blocking does not support Socks
  Binding.OverLapped := ConnectMode = cmIOCP;
  inherited;
  if ConnectMode = cmNonBlock then begin
    //TODO: Non blocking DNS resolution too?
    Binding.SetPeer(GWindowsStack.ResolveHost(Host), Port);
    // Switch to non-blocking first so that Connect does not block.
    GWindowsStack.SetBlocking(Binding.Handle, False);
    Binding.Connect;
  end else if ConnectMode = cmIOCP then begin
    //TODO: For now the connect itself is blocking, just to get it to work.
    // IOCP was not designed for connects, so this needs some monkeying,
    // maybe even an engine thread just to watch for connect events.
    //TODO: Resolution too?
    Binding.SetPeer(GStack.ResolveHost(Host), Port);
    Binding.Connect;
    GWindowsStack.SetBlocking(Binding.Handle, False);
  end else begin
    raise EIdException.Create('Unrecognized ConnectMode'); {do not localize}
  end;
  QueueAndWait(TIdWorkOpUnitWaitConnected.Create);

  // Update the bindings
  Binding.UpdateBindingLocal;
  //TODO: Could Peer binding ever be other than what we specified above? Need to reread it?
  Binding.UpdateBindingPeer;
end;
|
|
|
|
// Notifies the chain engine that this handler's socket has been accepted so
// the engine can associate it with its completion port.
procedure TIdIOHandlerChain.AfterAccept;
begin
  FChainEngine.SocketAccepted(Self);
end;
|
|
|
|
// Opens the handler by delegating to the inherited implementation.
procedure TIdIOHandlerChain.Open;
begin
  // Anything ConnectClient depends on must be set up before this call:
  // the inherited Open is what actually connects.
  inherited;
end;
|
|
|
|
// Detects a lost connection and, when requested, raises the "connection
// closed gracefully" exception.
//   ARaiseExceptionIfDisconnected - raise when the connection is gone.
//   AIgnoreBuffer - raise even if unread data is still in the input buffer.
// A gracefully closed connection whose binding is still allocated is closed
// here as a side effect.
procedure TIdIOHandlerChain.CheckForDisconnect(
  ARaiseExceptionIfDisconnected: Boolean; AIgnoreBuffer: Boolean);
begin
  // ClosedGracefully // Server disconnected
  // IOHandler = nil  // Client disconnected
  if ClosedGracefully then begin
    if BindingAllocated then begin
      Close;
      // The disconnect status/event notifications are intentionally not
      // fired here (they were disabled in the original code).
    end;
  end else if BindingAllocated then begin
    // Still connected - nothing to report.
    Exit;
  end;
  // From here on the connection is known to be gone.
  // Do not raise unless all data has been read by the user.
  if not Assigned(FInputBuffer) then begin
    Exit;
  end;
  if ((FInputBuffer.Size = 0) or AIgnoreBuffer)
   and ARaiseExceptionIfDisconnected then begin
    RaiseConnClosedGracefully;
  end;
end;
|
|
|
|
// Not supported by this handler: always raises EIdException. Reads are done
// through queued work op units instead.
function TIdIOHandlerChain.ReadFromSource(
  ARaiseExceptionIfDisconnected: Boolean; ATimeout: Integer;
  ARaiseExceptionOnTimeout: Boolean): Integer;
begin
  // Assigned only so the function result is defined before the raise.
  Result := 0;
  raise EIdException.Create('Fall through error in ' + ClassName); {do not localize}
end;
|
|
|
|
// Reads into AStream: either exactly AByteCount bytes, or - when
// AReadUntilDisconnect is set - until the connection is closed.
procedure TIdIOHandlerChain.ReadStream(AStream: TIdStreamVCL; AByteCount: Int64;
  AReadUntilDisconnect: Boolean);
begin
  if not AReadUntilDisconnect then begin
    QueueAndWait(TIdWorkOpUnitReadSizedStream.Create(AStream.VCLStream, AByteCount));
    Exit;
  end;
  // The disconnect is the expected end of this read, so the graceful-close
  // exception is suppressed (last argument False).
  QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(AStream.VCLStream), -1,
    True, False);
end;
|
|
|
|
// Extracts AByteCount bytes from the input buffer into VBuffer, first
// queueing a sized read for whatever part is not buffered yet.
//   AAppend - passed through to the buffer's ExtractToBytes.
// A negative AByteCount is rejected via EIdException.IfFalse; zero is a no-op.
procedure TIdIOHandlerChain.ReadBytes(var VBuffer: TIdBytes;
  AByteCount: Integer; AAppend: Boolean = True);
var
  LMissing: Integer;
begin
  EIdException.IfFalse(AByteCount >= 0);
  if AByteCount <= 0 then begin
    Exit;
  end;
  // Number of bytes still to be fetched from the socket.
  LMissing := AByteCount - FInputBuffer.Size;
  if LMissing > 0 then begin
    QueueAndWait(TIdWorkOpUnitReadSized.Create(LMissing));
  end;
  Assert(FInputBuffer.Size >= AByteCount);
  FInputBuffer.ExtractToBytes(VBuffer, AByteCount, AAppend);
end;
|
|
|
|
// Reads one line terminated by ATerminator from the input buffer, waiting
// for more data when the terminator has not arrived yet.
//   ATerminator    - line terminator; '' is treated as LF.
//   ATimeout       - passed to QueueAndWait.
//   AMaxLineLength - -1 selects the MaxLineLength property; 0 means no limit.
// Returns the line without its terminator (a trailing CR is also removed
// when the terminator is LF). On a read timeout '' is returned and
// FReadLnTimedOut is set. When the line exceeds the limit, FMaxLineAction
// decides: maException raises EIdException, maSplit returns the first
// AMaxLineLength characters and sets FReadLnSplit.
function TIdIOHandlerChain.ReadLn(ATerminator: string = LF;
  ATimeout: Integer = IdTimeoutDefault; AMaxLineLength: Integer = -1): string;
var
  LTermPos: Integer;
  LTooLong: Boolean;
begin
  // A string function result is not guaranteed to start out empty, and the
  // timeout path below never assigns it - make the returned value defined.
  Result := '';
  if AMaxLineLength = -1 then begin
    AMaxLineLength := MaxLineLength;
  end;
  // User may pass '' if they need to pass arguments beyond the first.
  if ATerminator = '' then begin
    ATerminator := LF;
  end;
  FReadLnSplit := False;
  FReadLnTimedOut := False;
  try
    LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
    if (LTermPos = 0) and ((AMaxLineLength = 0)
     or (FInputBuffer.Size < AMaxLineLength)) then begin
      QueueAndWait(TIdWorkOpUnitReadLn.Create(ATerminator, AMaxLineLength)
       , ATimeout);
      LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
    end;
    // The line is too long when the terminator lies beyond the limit, or
    // when no terminator was found although the buffer already holds at
    // least AMaxLineLength bytes (in which case no wait is attempted above).
    LTooLong := ((AMaxLineLength <> 0) and (LTermPos > AMaxLineLength))
     or ((AMaxLineLength > 0) and (LTermPos = 0)
      and (FInputBuffer.Size >= AMaxLineLength));
    if LTooLong then begin
      case FMaxLineAction of
        // TODO: find the right exception class here
        maException: raise EIdException.Create('MaxLineLength exceded'); {do not localize}
        maSplit: begin
          // Tell the caller that only part of the line was returned.
          FReadLnSplit := True;
          Result := FInputBuffer.Extract(AMaxLineLength);
        end;
      end;
    end else begin
      // LTermPos cannot be 0 here, and the code below can't handle it properly
      Assert(LTermPos > 0);
      Result := FInputBuffer.Extract(LTermPos - 1);
      if (ATerminator = LF) and (Copy(Result, Length(Result), 1) = CR) then begin
        Delete(Result, Length(Result), 1);
      end;
      FInputBuffer.Extract(Length(ATerminator));// remove the terminator
    end;
  except
    on E: EIdReadTimeout do begin
      FReadLnTimedOut := True;
    end;
  end;
end;
|
|
|
|
// Reads until the connection is closed and returns everything received as a
// string. The read is bracketed by BeginWork/EndWork(wmRead).
function TIdIOHandlerChain.AllData: string;
var
  LCollector: TStringStream;
begin
  Result := '';
  BeginWork(wmRead);
  try
    LCollector := TStringStream.Create('');
    try
      // The disconnect ends the read, so the graceful-close exception is
      // suppressed (last argument False).
      QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(LCollector), -1,
        True, False);
      Result := LCollector.DataString;
    finally
      LCollector.Free;
    end;
  finally
    EndWork(wmRead);
  end;
end;
|
|
|
|
// Sends the file AFile by queueing a "write file" work op unit.
//   AEnableTransferFile - currently ignored (BGO).
// Always returns 0; reporting the number of bytes sent is not implemented
// yet, and intercepts are not consulted.
function TIdIOHandlerChain.WriteFile(
  const AFile: String;
  AEnableTransferFile: Boolean): Int64;
var
  LJob: TIdWorkOpUnitWriteFile;
begin
  LJob := TIdWorkOpUnitWriteFile.Create(AFile);
  try
    // AFreeWorkOpUnit = False: the unit is released here instead.
    QueueAndWait(LJob, IdTimeoutDefault, False);
  finally
    LJob.Free;
  end;
  Result := 0;
end;
|
|
|
|
// Writes the contents of AStream in blocks of at most 128K.
//   ASize > 0 - write ASize bytes starting at the current stream position.
//   ASize = 0 - write the whole stream from its beginning.
//   ASize < 0 - write everything from the current position to the end.
//   AWriteByteCount - send the byte count (via Write(ASize)) before the data.
procedure TIdIOHandlerChain.Write(
  AStream: TIdStreamVCL;
  ASize: Int64 = 0;
  AWriteByteCount: Boolean = False
  );
const
  CBlockSize = 128 * 1024; // 128K blocks
var
  LOffset: Integer;
  LChunk: Integer;
begin
  if ASize > 0 then begin
    // ">0" ACount bytes
    LOffset := AStream.VCLStream.Seek(0, soFromCurrent);
  end else if ASize = 0 then begin
    // "0" ALL
    LOffset := 0;
    ASize := AStream.VCLStream.Seek(0, soFromEnd);
    AStream.VCLStream.Seek(0, soFromBeginning);
  end else begin
    // "-1" All from current position
    LOffset := AStream.VCLStream.Seek(0, soFromCurrent);
    ASize := AStream.VCLStream.Seek(0, soFromEnd) - LOffset;
    AStream.VCLStream.Seek(LOffset, soFromBeginning);
  end;

  if AWriteByteCount then begin
    Write(ASize);
  end;

  // Work notifications (BeginWork/EndWork(wmWrite)) are not issued here.
  while ASize > 0 do begin
    LChunk := Min(CBlockSize, ASize);
    QueueAndWait(TIdWorkOpUnitWriteStream.Create(AStream.VCLStream, LOffset,
      LChunk, False));
    Dec(ASize, LChunk);
    Inc(LOffset, LChunk);
  end;
end;
|
|
|
|
// Writes the bytes in ABuffer by queueing a "write buffer" work op unit.
// An empty buffer is not queued: taking @ABuffer[0] of an empty array is an
// out-of-range access, and TIdChainEngine.Execute treats a completion with
// zero bytes transferred as a closed socket. The usual disconnect check is
// still performed so callers see the same exception on a dead connection.
procedure TIdIOHandlerChain.WriteDirect(
  ABuffer: TIdBytes
  );
begin
  if Length(ABuffer) = 0 then begin
    CheckForDisconnect(True);
    Exit;
  end;
  QueueAndWait(TIdWorkOpUnitWriteBuffer.Create(@ABuffer[0], Length(ABuffer), False));
end;
|
|
|
|
// Prepares AWorkOpUnit, hands it to the chain engine and evaluates its
// outcome.
//   ATimeout - IdTimeoutInfinite: no deadline; IdTimeoutDefault: use
//              FReadTimeout (no deadline if it is <= 0); otherwise a
//              timeout in milliseconds measured from GetTickCount.
//   AFreeWorkOpUnit - free AWorkOpUnit before returning, also on error.
//   AAllowGracefulException - forwarded to CheckForDisconnect before and
//              after the operation.
// Raises whatever the work op unit re-raises, EIdReadTimeout when the unit
// reports a timeout, and the disconnect exception from CheckForDisconnect.
procedure TIdIOHandlerChain.QueueAndWait(
  AWorkOpUnit: TIdWorkOpUnit;
  ATimeout: Integer = IdTimeoutDefault;
  AFreeWorkOpUnit: Boolean = True;
  AAllowGracefulException: Boolean = True
  );
begin
  try
    CheckForDisconnect(AAllowGracefulException);
    // Deadline: a TimeOutAt of 0 is used for "no timeout".
    // The timeouts are typecast to Cardinal to prevent the compiler from
    // expanding the sum to Int64, which can incur a performance penalty.
    if ATimeout = IdTimeoutInfinite then begin
      AWorkOpUnit.TimeOutAt := 0;
    end else if ATimeout <> IdTimeoutDefault then begin
      AWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(ATimeout);
    end else if FReadTimeout > 0 then begin
      AWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(FReadTimeout);
    end else begin
      AWorkOpUnit.TimeOutAt := 0;
    end;
    //
    AWorkOpUnit.Fiber := FFiber;
    AWorkOpUnit.IOHandler := Self;
    AWorkOpUnit.OnCompleted := WorkOpUnitCompleted;
    AWorkOpUnit.SocketHandle := Binding.Handle;
    // Add to queue and wait to be rescheduled when work is completed
    FChainEngine.AddWork(AWorkOpUnit);
    // Re-raise an exception recorded by the work op unit, if any
    AWorkOpUnit.RaiseException;
    // Check for timeout
    if AWorkOpUnit.TimedOut then begin
      raise EIdReadTimeout.Create('Timed out'); {do not localize}
    end;
    // Check to see if it was closed during this operation
    CheckForDisconnect(AAllowGracefulException);
  finally
    if AFreeWorkOpUnit then begin
      AWorkOpUnit.Free;
    end;
  end;
end;
|
|
|
|
// Creates the handler and binds it to its chain engine, fiber weaver and
// fiber. All three are mandatory; a missing one is rejected through
// EIdException.IfNotAssigned. Also allocates the zero-filled overlapped
// record and its buffer.
constructor TIdIOHandlerChain.Create(
  AOwner: TComponent;
  AChainEngine: TIdChainEngine;
  AFiberWeaver: TIdFiberWeaver;
  AFiber: TIdFiber
  );
begin
  inherited Create(AOwner);
  // Chain engine - it also gets to preset this handler's options.
  EIdException.IfNotAssigned(AChainEngine, 'No chain engine specified.'); {do not localize}
  FChainEngine := AChainEngine;
  AChainEngine.SetIOHandlerOptions(Self);
  // Fiber weaver
  EIdException.IfNotAssigned(AFiberWeaver, 'No fiber weaver specified.'); {do not localize}
  FFiberWeaver := AFiberWeaver;
  // Fiber
  EIdException.IfNotAssigned(AFiber, 'No fiber specified.'); {do not localize}
  FFiber := AFiber;
  // Initialize Overlapped structure
  New(FOverlapped);
  FillChar(FOverlapped^, SizeOf(TIdOverLapped), 0);
  New(FOverlapped^.Buffer);
end;
|
|
|
|
// Completion callback for queued work op units: hands the unit's fiber back
// to the fiber weaver.
procedure TIdIOHandlerChain.WorkOpUnitCompleted(AWorkOpUnit: TIdWorkOpUnit);
begin
  FFiberWeaver.Add(AWorkOpUnit.Fiber);
end;
|
|
|
|
// Detaches the handler from its chain engine and releases the overlapped
// record. Safe to run on a partially constructed instance: the constructor
// can raise (e.g. when no chain engine is supplied) before FChainEngine or
// FOverlapped have been set, and the destructor is then invoked
// automatically.
destructor TIdIOHandlerChain.Destroy;
begin
  // Tell the chain engine that we are closing and to remove any references to
  // us and cease any usage.
  // Do not do this in close, it can cause deadlocks because the engine can
  // call close while in its Execute.
  if FChainEngine <> nil then begin
    FChainEngine.RemoveSocket(Self);
  end;
  if FOverlapped <> nil then begin
    if FOverlapped^.Buffer <> nil then begin
      Dispose(FOverlapped^.Buffer);
    end;
    Dispose(FOverlapped);
    FOverlapped := nil;
  end;
  inherited;
end;
|
|
|
|
{ TIdChainEngine }
|
|
|
|
// Shuts the engine thread down before the component is destroyed.
procedure TIdChainEngine.BeforeDestruction;
begin
  if Assigned(FThread) then begin
    // Flag the thread for termination, then post the terminate packet
    // (Terminating) to wake it from its wait on the completion port.
    FThread.Terminate;
    Terminating;
    // Let the thread finish before releasing it.
    FThread.WaitFor;
    FreeAndNil(FThread);
  end;
  inherited BeforeDestruction;
end;
|
|
|
|
// Returns the input buffer of AIOHandler. The handler is hard-cast to
// TIdIOHandlerChain to reach the FInputBuffer field; no type check is made.
function TIdChainEngine.GetInputBuffer(const AIOHandler: TIdIOHandler): TIdBuffer;
var
  LChainHandler: TIdIOHandlerChain;
begin
  LChainHandler := TIdIOHandlerChain(AIOHandler);
  Result := LChainHandler.FInputBuffer;
end;
|
|
|
|
// Presets a new IOHandler for use with this engine: it connects in IOCP mode.
procedure TIdChainEngine.SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
begin
  AIOHandler.ConnectMode := cmIOCP;
end;
|
|
|
|
// Associates the accepted socket of AIOHandler with the engine's completion
// port. Raises the last OS error when the association fails.
procedure TIdChainEngine.SocketAccepted(AIOHandler: TIdIOHandlerChain);
var
  LAssociated: THandle;
begin
  LAssociated := CreateIoCompletionPort(AIOHandler.Binding.Handle,
    FCompletionPort, 0, 0);
  if LAssociated = 0 then begin
    RaiseLastOSError;
  end;
end;
|
|
|
|
// Posts a packet carrying GCompletionKeyTerminate to the completion port so
// that a pending wait in Execute returns. Raises the last OS error when the
// packet cannot be posted.
procedure TIdChainEngine.Terminating;
var
  LPosted: Boolean;
begin
  LPosted := PostQueuedCompletionStatus(FCompletionPort, 0,
    GCompletionKeyTerminate, nil);
  if not LPosted then begin
    RaiseLastOSError;
  end;
end;
|
|
|
|
// Waits for one completion packet and dispatches it to the work op unit
// referenced by its overlapped record. A failed wait and the terminate
// packet are both ignored.
procedure TIdChainEngine.Execute;
var
  LByteCount: DWord;
  LKey: DWord;
  LOverlapped: PIdOverlapped;
begin
  // Wait forever on the completion port. If we are terminating, a terminate
  // signal is sent into the queue.
  if not GetQueuedCompletionStatus(FCompletionPort, LByteCount, LKey,
    POverLapped(LOverlapped), INFINITE) then begin
    Exit;
  end;
  if LKey = GCompletionKeyTerminate then begin
    Exit;
  end;
  // Zero bytes transferred: the socket has been closed
  if LByteCount = 0 then begin
    LOverlapped.WorkOpUnit.IOHandler.CloseGracefully;
  end;
  LOverlapped.WorkOpUnit.Process(LOverlapped, LByteCount);
end;
|
|
|
|
// Called by an IOHandler that is being destroyed. Currently a no-op: the
// engine keeps no per-socket state that needs to be released here.
procedure TIdChainEngine.RemoveSocket(AIOHandler: TIdIOHandlerChain);
begin
  // Intentionally empty. A "fall through" exception used to be raised here
  // and has been disabled.
end;
|
|
|
|
// Starts a work op unit. A "wait connected" unit additionally gets its
// socket associated with the completion port and is marked complete before
// Start is called. Raises the last OS error when the association fails.
procedure TIdChainEngine.AddWork(AWorkOpUnit: TIdWorkOpUnit);
var
  LAssociated: THandle;
begin
  if AWorkOpUnit is TIdWorkOpUnitWaitConnected then begin
    // Associate the socket with the completion port.
    LAssociated := CreateIOCompletionPort(AWorkOpUnit.SocketHandle,
      FCompletionPort, 0, 0);
    if LAssociated = 0 then begin
      RaiseLastOSError;
    end;
    AWorkOpUnit.Complete;
  end;
  AWorkOpUnit.Start;
end;
|
|
|
|
// Closes the completion port and destroys the component.
// The handle is only closed when one was actually created, so a failure in
// InitComponent is not masked by a second error raised from here. A failing
// CloseHandle still raises the last OS error, but the inherited destructor
// now always runs.
destructor TIdChainEngine.Destroy;
var
  LPort: THandle;
begin
  try
    LPort := FCompletionPort;
    if LPort <> 0 then begin
      // Clear the field first so the handle can never be closed twice.
      FCompletionPort := 0;
      if not CloseHandle(LPort) then begin
        RaiseLastOSError;
      end;
    end;
  finally
    inherited;
  end;
end;
|
|
|
|
// Creates the completion port and, outside of design mode, the engine
// thread. Raises the last OS error when the port cannot be created.
procedure TIdChainEngine.InitComponent;
begin
  {
  Notes on processor count and affinity (not implemented yet):

  var SysInfo: TSystemInfo;
  GetSystemInfo(SysInfo);
  SysInfo.dwNumberOfProcessors

  Use GetSystemInfo. It returns all the info on the local system's
  architecture and also a valid ActiveProcessorMask, a DWORD to be read as a
  bit array of the processors in the system.

  CZH> And next question - any one know off hand how to set affinity? :)

  Use the SetProcessAffinityMask or SetThreadAffinityMask API depending on
  whether you want to act on the whole process or just a single thread.
  SetThreadIdealProcessor is another way to do it: it just gives the
  scheduler a hint about where to run a thread without forcing it - good for
  keeping two threads that do IO with each other on the same processor.
  }
  inherited;
  // The port must exist before the thread does: the thread's Run method
  // calls Execute, which waits on FCompletionPort, and the thread may
  // presumably start running as soon as it has been constructed.
  //
  //MS says destruction is automatic, but Google seems to say that this initial
  //one is not auto managed as MS says, and that CloseHandle should be called.
  FCompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  if FCompletionPort = 0 then begin
    RaiseLastOSError;
  end;
  if not (csDesigning in ComponentState) then begin
    // Cant use .Name, its not initialized yet in Create
    FThread := TIdChainEngineThread.Create(Self, 'Chain Engine'); {do not localize}
  end;
end;
|
|
|
|
{ TIdChainEngineThread }
|
|
|
|
// Creates the thread that services AOwner; AName is passed on to the
// inherited constructor.
constructor TIdChainEngineThread.Create(
  AOwner: TIdChainEngine;
  const AName: string
  );
begin
  // Store the engine before calling inherited so that Run never sees it
  // unassigned.
  // NOTE(review): presumably the thread may start inside the inherited
  // constructor - confirm against TIdThread.Create.
  FChainEngine := AOwner;
  inherited Create(False, True, AName);
end;
|
|
|
|
(*procedure TIdChainEngineIOCP.TransmitFileIOCP(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
|
|
var
|
|
LPOverlapped: PIdOverlapped;
|
|
LHFile:THandle;
|
|
begin
|
|
New(LPOverlapped);
|
|
ZeroMemory(LPOverlapped,sizeof(TIdOverLapped));
|
|
New(LPOverlapped^.Buffer);
|
|
LPOverlapped^.IOhandler:=TIdIOHandlerChain(AWorkOpUnit.IOhandler);
|
|
LPOverlapped^.WorkOpUnit:=AWorkOpUnit;
|
|
LHFile:=CreateFile(pchar(AFilename),GENERIC_READ,FILE_SHARE_READ,nil,OPEN_EXISTING,FILE_FLAG_SEQUENTIAL_SCAN,0);
|
|
if LHFile=INVALID_HANDLE_VALUE then begin
|
|
RaiseLastOSError;
|
|
end;
|
|
try
|
|
if ServiceQueryTransmitFile(AWorkOpUnit.IOHandler.Binding.Handle,LHFile,0,0,POverlapped(LPOverlapped),nil,0) then begin
|
|
AWorkOpUnit.Fiber.Relinquish;
|
|
end else begin
|
|
raise EIdException.Create('error in ServiceQueryTransmitFile');
|
|
end;
|
|
finally
|
|
CloseHandle(LHFile);
|
|
end;
|
|
end;
|
|
*)
|
|
(*procedure TIdChainEngineIOCP.TransmitFileAsStream(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
|
|
|
|
procedure CopyWorkUnit(ASrc,ADst: TIdWorkOpUnit);
|
|
begin
|
|
ADst.IOHandler := ASrc.IOHandler;
|
|
ADst.Fiber := ASrc.Fiber;
|
|
ADst.OnCompleted := ASrc.OnCompleted;
|
|
ADst.SocketHandle:= ASrc.SocketHandle;
|
|
end;
|
|
|
|
var
|
|
LStream:TfileStream;
|
|
LWorkOpUnit : TIdWorkOpUnitWriteStream;
|
|
|
|
LBuf:pointer;
|
|
LBufLen:integer;
|
|
begin
|
|
Assert(False, 'to do');
|
|
LStream := TFileStream.Create(AFilename,fmOpenRead or fmShareDenyWrite);
|
|
try
|
|
LWorkOpUnit := TIdWorkOpUnitWriteStream.Create(LStream,0,LStream.size,false);
|
|
try
|
|
CopyWorkUnit(AWorkOpUnit,LWorkOpUnit);
|
|
LBufLen:=Min(LStream.size,128*1024);
|
|
getmem(LBuf,LBufLen);
|
|
LWorkOpUnit.Stream.Position:=LWorkOpUnit.StartPos;
|
|
LWorkOpUnit.Stream.Read(LBuf^,LBufLen);
|
|
IssueWriteBuffer(LWorkOpUnit,LBuf,LBufLen);
|
|
finally
|
|
AWorkOpUnit.BytesSent := LStream.Size;
|
|
LWorkOpUnit.free;
|
|
end;
|
|
finally
|
|
LStream.free;
|
|
end;
|
|
end;
|
|
*)
|
|
|
|
// Thread body: lets the owning engine process one completion packet.
procedure TIdChainEngineThread.Run;
begin
  FChainEngine.Execute;
end;
|
|
|
|
end.
|
|
|