* Replaced fphttpclient with indy10.

* Added compression support
This commit is contained in:
2015-10-04 14:14:55 +02:00
parent 610c1e4108
commit b1e455022b
1330 changed files with 338589 additions and 27 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

File diff suppressed because it is too large Load Diff

590
indy/SuperCore/IdFiber.pas Normal file
View File

@@ -0,0 +1,590 @@
{
$Project$
$Workfile$
$Revision$
$DateUTC$
$Id$
This file is part of the Indy (Internet Direct) project, and is offered
under the dual-licensing agreement described on the Indy website.
(http://www.indyproject.org/)
Copyright:
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
}
{
$Log$
}
{
Rev 1.3 6/11/2004 8:39:48 AM DSiders
Added "Do not Localize" comments.
Rev 1.2 2004.04.22 11:45:16 PM czhower
Bug fixes
Rev 1.1 2004.02.09 9:16:34 PM czhower
Updated to compile and match lib changes.
Rev 1.0 2004.02.03 12:38:48 AM czhower
Move
Rev 1.8 2003.10.24 1:00:04 PM czhower
Name change
Rev 1.7 2003.10.21 12:19:20 AM czhower
TIdTask support and fiber bug fixes.
Rev 1.6 2003.10.19 2:50:38 PM czhower
Fiber cleanup
Rev 1.5 2003.10.19 1:04:26 PM czhower
Updates
Rev 1.3 2003.10.11 5:43:12 PM czhower
Chained servers now functional.
Rev 1.2 2003.09.19 10:09:38 PM czhower
Next stage of fiber support in servers.
Rev 1.1 2003.09.19 3:01:34 PM czhower
Changed to emulate IdThreads Run behaviour
Rev 1.0 8/16/2003 11:09:14 AM JPMugaas
Moved from Indy Core dir as part of package reorg
Rev 1.25 7/2/2003 2:06:40 PM BGooijen
changed IdSupportsFibers to TIdFiberBase.HaveFiberSupport
Rev 1.24 7/1/2003 8:34:14 PM BGooijen
Added function IdSupportsFibers
Fiber-functions are now loaded on runtime
Rev 1.23 2003.06.30 7:33:50 PM czhower
Fix to exception handling.
Rev 1.22 2003.06.30 6:52:20 PM czhower
Exposed FiberWeaver has a property.
Rev 1.21 2003.06.03 11:05:02 PM czhower
Modified ProcessInThisFiber to support error flag return.
Rev 1.20 2003.06.03 8:01:38 PM czhower
Completed fiber exception handling.
Rev 1.19 2003.05.27 10:27:08 AM czhower
Put back original exception handling.
Rev 1.18 5/16/2003 3:48:24 PM BGooijen
Added FreeOnTerminate
Rev 1.17 4/17/2003 7:40:00 PM BGooijen
Added AAutoStart for fibers
Rev 1.16 2003.04.17 7:44:56 PM czhower
Rev 1.15 2003.04.14 10:54:08 AM czhower
Fiber specific exceptions
Rev 1.14 2003.04.12 11:53:56 PM czhower
Added DoExecute
Rev 1.13 4/11/2003 1:46:58 PM BGooijen
added ProcessInThisFiber and WaitForFibers to TIdFiberWeaverBase
Rev 1.12 2003.04.10 11:21:42 PM czhower
Yield support
Rev 1.9 2003.03.27 1:29:14 AM czhower
Exception frame swapping.
Rev 1.7 3/22/2003 09:45:28 PM JPMugaas
Now should compile under D4.
Rev 1.6 2003.03.13 1:25:18 PM czhower
Moved check for parent fiber to SwitchTo
Rev 1.5 3/13/2003 10:18:12 AM BGooijen
Server side fibers, bug fixes
Rev 1.4 2003.02.18 1:25:04 PM czhower
Added exception if user tries to SwitchTo a completed fiber.
Rev 1.3 2003.01.17 2:32:12 PM czhower
Rev 1.2 1-1-2003 16:25:10 BGooijen
The property ParentFiber can now be written to
Added class function TIdFiberBase.GetCurrentFiberBase, which returns the
current TIdFiber
Rev 1.1 12-28-2002 12:01:18 BGooijen
Made a public read only property: ParentFiber
Rev 1.0 11/13/2002 08:44:18 AM JPMugaas
}
unit IdFiber;
interface
uses
Classes,
IdThreadSafe, IdBaseComponent, IdYarn, IdTask,
SyncObjs, SysUtils,
Windows;
type
// TIdFiberBase is the base for both fiber types and contains
// methods that are common to both and defines the general interface. All
// references to fibers should generally use this base type.
TIdFiberBase = class(TObject)
protected
FHandle: Pointer;
FPriorFiber: TIdFiberBase;
FName: string;
FRaiseList: Pointer;
// No descendants should ever call this. Its internal only
// and should only be called after destruction or after the RaiseList has
// been saved
procedure SwitchToMeFrom(
AFromFiber: TIdFiberBase
);
public
constructor Create; reintroduce; virtual;
procedure CheckRunnable; virtual;
class function HaveFiberSupport: Boolean;
procedure SwitchTo(AFiber: TIdFiberBase);
//
property Name: string read FName write FName;
property PriorFiber: TIdFiberBase read FPriorFiber;
property Handle: Pointer read FHandle;
end;
TIdFiber = class;
TIdFiberRelinquishEvent = procedure(
ASender: TIdFiber;
AReschedule: Boolean
) of object;
// TIdConvertedFiber is used to represent thread that have been converted to
// fibers
TIdConvertedFiber = class(TIdFiberBase)
public
constructor Create; override;
end;
// TIdFiber is the general purpose fiber. To implement fibers descend from
// TIdFiber.
TIdFiber = class(TIdFiberBase)
protected
FFatalException: Exception;
FFatalExceptionOccurred: Boolean;
FFinished: TIdThreadSafeBoolean;
FFreeFatalException: Boolean;
FFreeFiber: Boolean;
FLoop: Boolean;
FOnRelinquish: TIdFiberRelinquishEvent;
FParentFiber: TIdFiberBase;
FStarted: TIdThreadSafeBoolean;
FStopped: TIdThreadSafeBoolean;
FYarn: TIdYarn;
//
procedure AfterRun; virtual; //not abstract - otherwise it is required
procedure BeforeRun; virtual; //not abstract - otherwise it is required
function GetFinished: Boolean;
function GetStarted: Boolean;
function GetStopped: Boolean;
procedure Execute;
procedure Run; virtual; abstract;
procedure SwitchToParent;
public
procedure CheckRunnable; override;
constructor Create(
AParentFiber: TIdFiberBase = nil;
ALoop: Boolean = False;
AStackSize: Integer = 0);
reintroduce;
destructor Destroy;
override;
procedure RaiseFatalException;
// Relinquish is used when the fiber is stuck and cannot usefully do
// anything. It will be removed from scheduling until something reschedules
// it. This is different than yield.
//
// Relinquish is used with FiberWeavers to tell them that the fiber is done
// or blocked. Something external such as more work, or completion of a task
// must reschedule the fiber with the fiber weaver.
procedure Relinquish;
procedure SetRelinquishHandler(AValue: TIdFiberRelinquishEvent);
procedure Stop; virtual;
// Gives up execution time and tells scheduler to process next available
// fiber.
// For manual fibers (no weaver) relinquish is called
// For woven fibers, the fiber is rescheduled and relinquished.
procedure Yield;
//
property FatalExceptionOccurred: Boolean read FFatalExceptionOccurred;
property Finished: Boolean read GetFinished;
property Loop: Boolean read FLoop write FLoop;
property Started: Boolean read GetStarted;
property Stopped: Boolean read GetStopped;
property ParentFiber: TIdFiberBase read FParentFiber write FParentFiber;
property Yarn: TIdYarn read FYarn write FYarn;
end;
TIdFiberWithTask = class(TIdFiber)
protected
FTask: TIdTask;
public
procedure AfterRun; override;
procedure BeforeRun; override;
// Defaults because a bit crazy to create a non looped task
constructor Create(
AParentFiber: TIdFiberBase = nil;
ATask: TIdTask = nil;
AName: string = '';
AStackSize: Integer = 0
); reintroduce;
destructor Destroy;
override;
procedure Run;
override;
//
// Must be writeable because tasks are often created after thread or
// thread is pooled
property Task: TIdTask read FTask write FTask;
end;
implementation
uses
IdGlobal, IdResourceStringsCore, IdExceptionCore, IdException;
var
SwitchToFiber: function(lpFiber: Pointer): BOOL; stdcall = nil;
CreateFiber: function(dwStackSize: DWORD; lpStartAddress: TFNFiberStartRoutine;
lpParameter: Pointer): BOOL; stdcall=nil;
DeleteFiber: function (lpFiber: Pointer): BOOL; stdcall = nil;
ConvertThreadToFiber: function (lpParameter: Pointer): BOOL; stdcall = nil;
procedure LoadFiberFunctions;
var
LKernel32Handle: THandle;
begin
if TIdFiberBase.HaveFiberSupport then begin
LKernel32Handle := GetModuleHandle(kernel32);
SwitchToFiber := Getprocaddress(LKernel32Handle,'SwitchToFiber'); {do not localize}
CreateFiber := Getprocaddress(LKernel32Handle,'CreateFiber'); {do not localize}
DeleteFiber := Getprocaddress(LKernel32Handle,'DeleteFiber'); {do not localize}
ConvertThreadToFiber := Getprocaddress(LKernel32Handle,'ConvertThreadToFiber'); {do not localize}
if Assigned(@SwitchToFiber) and
Assigned(@CreateFiber) and
Assigned(@DeleteFiber) and
Assigned(@ConvertThreadToFiber) then begin
Exit;
end else begin
SwitchToFiber := nil;
CreateFiber := nil;
DeleteFiber := nil;
ConvertThreadToFiber := nil;
end;
end;
raise EIdFibersNotSupported.Create(RSFibersNotSupported);
end;
procedure FiberFunc(AFiber: TIdFiber); stdcall;
var
LParentFiber: TIdFiberBase;
begin
with AFiber do begin
Execute;
LParentFiber := ParentFiber;
end;
// Threads converted from Fibers have no parent. Also use may specify
// nil if they want to control exit manually.
//
// We must do this last because with schedulers fibers get switched away
// at this last point and not rescheduled. We do this outside the
// execute as the fiber will likely be freed from somewhere else
if LParentFiber <> nil then begin
LParentFiber.SwitchToMeFrom(AFiber);
end;
end;
{ TIdFiber }
procedure TIdFiber.AfterRun;
begin
end;
procedure TIdFiber.BeforeRun;
begin
end;
procedure TIdFiber.CheckRunnable;
begin
inherited;
EIdFiberFinished.IfTrue(Finished, 'Fiber is finished.'); {do not localize}
EIdFiber.IfTrue((ParentFiber = nil) and (Assigned(FOnRelinquish) = False)
, 'No parent fiber or fiber weaver specified.'); {do not localize}
end;
constructor TIdFiber.Create(
AParentFiber: TIdFiberBase;
ALoop: Boolean;
AStackSize: Integer
);
begin
inherited Create;
FFinished := TIdThreadSafeBoolean.Create;
FStarted := TIdThreadSafeBoolean.Create;
FStopped := TIdThreadSafeBoolean.Create;
FFreeFiber := True;
FLoop := ALoop;
FParentFiber := AParentFiber;
// Create Fiber
FHandle := Pointer(CreateFiber(AStackSize, @FiberFunc, Self));
Win32Check(LongBool(FHandle));
end;
destructor TIdFiber.Destroy;
begin
EIdException.IfTrue(Started and (Finished = False), 'Fiber not finished.'); {do not localize}
// Threads converted from Fibers will have nil parents and if we call
// DeleteFiber it will exit the whole thread.
if FFreeFiber then begin
// Must never call from self. If so ExitThread is called
// Because of this FreeOnTerminate cannot be suported because a fiber
// cannot delete itself, and we never know where a fiber will go for sure
// when it is done. It can be done that the next fiber deletes it, but
// there are catches here too. Because of this I have made it the
// responsibility of the user (manual) or the scheduler (optional).
Win32Check(DeleteFiber(FHandle));
end;
FreeAndNil(FYarn);
FreeAndNil(FFinished);
FreeAndNil(FStarted);
FreeAndNil(FStopped);
// Kudzu:
// Docs say to call ReleaseException, but its empty. But it appears that since
// we are taking the exception and taking it from the raise list, that instead
// what we need to do is call .Free on the exception instead and that the docs
// are wrong. Need to run through a memory checker to verify the behaviour.
//
// Normally the except block frees the exception object, but we are stealing
// it out fo the list, so it does not free it.
//
// Ive looked into TThread and this is what it does as well, so big surprise
// that the docs are wrong.
//
// Update: We only free it if we dont reraise the exception. If we reraise it
// the fiber may be freed in a finally, and thus when the exception is handled
// again an AV or other will occur because the exception has been freed.
// When it is reraised, it is added back into the exception list and the
// VCL will free it as part of the final except block.
//
if FFreeFatalException then begin
FreeAndNil(FFatalException);
end;
//
inherited;
end;
procedure TIdFiber.Execute;
begin
try
try
BeforeRun; try
// This can be combined, but then it checks loop each run and its not
// valid to toggle it after run has started and therefore adds an
// unnecessary check
if Loop then begin
while not Stopped do begin
Run;
// If Weaver, this will let the weaver reschedule.
// If manual it will switch back to parent to let it handle it.
// If stopped just run through so it can clean up and exit
if not Stopped then begin
Yield;
end;
end;
end else begin
Run;
end;
finally AfterRun; end;
except FFatalException := AcquireExceptionObject; end;
if FFatalException <> nil then begin
FFatalExceptionOccurred := True;
FFreeFatalException := True;
end;
finally FFinished.Value := True; end;
end;
function TIdFiber.GetFinished: Boolean;
begin
Result := FFinished.Value;
end;
function TIdFiber.GetStarted: Boolean;
begin
Result := FStarted.Value;
end;
function TIdFiber.GetStopped: Boolean;
begin
Result := FStopped.Value;
end;
procedure TIdFiber.RaiseFatalException;
begin
if FatalExceptionOccurred then begin
FFreeFatalException := False;
raise FFatalException;
end;
end;
procedure TIdFiber.Stop;
begin
FStopped.Value := True;
end;
procedure TIdFiber.SwitchToParent;
begin
EIdException.IfNotAssigned(FParentFiber, 'No parent fiber to switch to.'); {do not localize}
SwitchTo(FParentFiber);
end;
procedure TIdFiber.Relinquish;
begin
if Assigned(FOnRelinquish) then begin
FOnRelinquish(Self, False);
end else begin
SwitchToParent;
end;
end;
procedure TIdFiber.Yield;
begin
// If manual fiber, yield is same as relinquish
if Assigned(FOnRelinquish) then begin
FOnRelinquish(Self, True);
end else begin
SwitchToParent;
end;
end;
procedure TIdFiber.SetRelinquishHandler(AValue: TIdFiberRelinquishEvent);
begin
FOnRelinquish := AValue;
end;
{ TIdConvertedFiber }
constructor TIdConvertedFiber.Create;
begin
inherited;
FHandle := Pointer(ConvertThreadToFiber(Self));
end;
{ TIdFiberBase }
constructor TIdFiberBase.Create;
begin
inherited;
if not Assigned(@CreateFiber) then begin
LoadFiberFunctions;
end;
end;
procedure TIdFiberBase.CheckRunnable;
begin
end;
class function TIdFiberBase.HaveFiberSupport:boolean;
begin
Result := IndyWindowsPlatform = VER_PLATFORM_WIN32_NT;
end;
procedure TIdFiberBase.SwitchTo(AFiber: TIdFiberBase);
begin
//Kudzu
// Be VERY careful in this section. This section takes care of Delphi's
// exception handling mechanism.
//
// This section swaps out the exception frames for each fiber so that
// exceptions are handled properly, preserved between switches, and across
// threads.
//
// Notes:
// -Only works on Windows, but we dont support fibers on Kylix right now
// anyways
// -Developer MUST use our fibers and not call Fiber API calls directly.
// -May not work on C++ Builder at this time.
// -May not work on older Delphi editions at this time.
// -If the user calls this method and the fiber is not the current fiber, will
// be problems. Maybe lock against thread ID and check that.
//
// This could be extended to make ThreadVars "FiberVars" by swaping out the
// TLS entry. I may make this an option in the future.
// This would also take care of the exception stack by itself and may be
// more portable to Linux, CB and older versions of Delphi. Will check later.
//
//
// Save raise list for current fiber
FRaiseList := RaiseList;
AFiber.SwitchToMeFrom(Self);
end;
procedure TIdFiberBase.SwitchToMeFrom(
AFromFiber: TIdFiberBase
);
begin
// See if we can run the fiber. If not it will raise an exception.
CheckRunnable;
FPriorFiber := AFromFiber;
// Restore raise list
SetRaiseList(FRaiseList);
// Switch to the actual fiber
SwitchToFiber(Handle);
end;
{ TIdFiberWithTask }
procedure TIdFiberWithTask.AfterRun;
begin
FTask.DoAfterRun;
inherited;
end;
procedure TIdFiberWithTask.BeforeRun;
begin
inherited;
FTask.DoBeforeRun;
end;
constructor TIdFiberWithTask.Create(
AParentFiber: TIdFiberBase = nil;
ATask: TIdTask = nil;
AName: string = '';
AStackSize: Integer = 0
);
begin
inherited Create(AParentFiber, True, AStackSize);
FTask := ATask;
end;
destructor TIdFiberWithTask.Destroy;
begin
FreeAndNil(FTask);
inherited;
end;
procedure TIdFiberWithTask.Run;
begin
if not FTask.DoRun then begin
Stop;
end;
end;
end.

View File

@@ -0,0 +1,53 @@
{
$Project$
$Workfile$
$Revision$
$DateUTC$
$Id$
This file is part of the Indy (Internet Direct) project, and is offered
under the dual-licensing agreement described on the Indy website.
(http://www.indyproject.org/)
Copyright:
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
}
{
$Log$
}
{
Rev 1.0 2004.02.03 12:38:50 AM czhower
Move
Rev 1.0 2003.10.19 2:50:54 PM czhower
Fiber cleanup
}
unit IdFiberWeaver;
interface
uses
IdBaseComponent, IdFiber,
Windows;
type
TIdFiberWeaver = class(TIdBaseComponent)
protected
procedure Relinquish(
AFiber: TIdFiber;
AReschedule: Boolean
); virtual; abstract;
public
procedure Add(
AFiber: TIdFiber
); virtual; abstract;
function WaitForFibers(
ATimeout: Cardinal = Infinite
): Boolean;
virtual; abstract;
end;
implementation
end.

View File

@@ -0,0 +1,361 @@
{
$Project$
$Workfile$
$Revision$
$DateUTC$
$Id$
This file is part of the Indy (Internet Direct) project, and is offered
under the dual-licensing agreement described on the Indy website.
(http://www.indyproject.org/)
Copyright:
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
}
{
$Log$
}
{
Rev 1.2 6/11/2004 8:39:52 AM DSiders
Added "Do not Localize" comments.
Rev 1.1 2004.02.09 9:16:38 PM czhower
Updated to compile and match lib changes.
Rev 1.0 2004.02.03 12:38:52 AM czhower
Move
Rev 1.2 2003.11.04 3:51:20 PM czhower
Update to sync TC
Rev 1.1 2003.10.21 12:19:22 AM czhower
TIdTask support and fiber bug fixes.
Rev 1.0 2003.10.19 2:50:54 PM czhower
Fiber cleanup
Rev 1.4 2003.10.19 1:04:26 PM czhower
Updates
Rev 1.3 2003.10.11 5:43:20 PM czhower
Chained servers now functional.
Rev 1.2 2003.09.19 10:09:40 PM czhower
Next stage of fiber support in servers.
Rev 1.1 2003.08.20 1:46:22 PM czhower
Update to compile.
Rev 1.0 8/16/2003 11:09:12 AM JPMugaas
Moved from Indy Core dir as part of package reorg
Rev 1.8 7/26/2003 12:20:02 PM BGooijen
Small fix to prevent some exceptions
Rev 1.7 2003.06.30 7:33:50 PM czhower
Fix to exception handling.
Rev 1.6 2003.06.25 1:25:58 AM czhower
Small changes.
Rev 1.4 2003.06.03 11:05:02 PM czhower
Modified ProcessInThisFiber to support error flag return.
Rev 1.3 2003.04.17 7:44:58 PM czhower
Rev 1.2 4/11/2003 6:37:38 PM BGooijen
ProcessInThisFiber and WaitForFibers are now overridden here
Rev 1.1 2003.04.10 10:51:06 PM czhower
Rev 1.14 3/27/2003 12:34:02 PM BGooijen
very little clean-up
Rev 1.13 2003.03.27 1:31:18 AM czhower
Removal of hack cast.
Rev 1.12 2003.03.27 1:29:16 AM czhower
Exception frame swapping.
Rev 1.11 2003.03.27 12:45:58 AM czhower
Fixed AV relating to preparation changes for exception frame swapping
Rev 1.10 2003.03.27 12:18:06 AM czhower
Rev 1.9 3/26/2003 8:37:50 PM BGooijen
Added WaitForFibers
Rev 1.8 2003.03.26 12:48:30 AM czhower
Rev 1.7 3/25/2003 01:58:20 PM JPMugaas
Fixed a type-error.
Rev 1.6 3/25/2003 01:27:56 AM JPMugaas
Made a custom exception class that descends from EIdSIlentException so that
the component does not always raise an exception in the server if there's no
client connection.
Rev 1.5 2003.03.16 12:49:32 PM czhower
Rev 1.4 3/13/2003 10:18:14 AM BGooijen
Server side fibers, bug fixes
Rev 1.3 12-15-2002 17:08:00 BGooijen
Removed AssignList, and added a hack-cast to use .Assign
Rev 1.2 2002.12.07 11:10:30 PM czhower
Removed unneeded code.
Rev 1.1 12-6-2002 20:34:10 BGooijen
Now compiles on Delphi 5
Rev 1.0 11/13/2002 08:44:26 AM JPMugaas
}
unit IdFiberWeaverInline;
interface
uses
Classes, IdException,
IdGlobal, IdFiber, IdFiberWeaver, IdThreadSafe,
SyncObjs;
type
TIdFiberWeaverInline = class;
TIdFiberNotifyEvent = procedure(AFiberWeaver: TIdFiberWeaverInline;
AFiber: TIdFiberBase) of object;
TIdFiberWeaverInline = class(TIdFiberWeaver)
protected
// TIdThreadSafeInteger cannot be used for FActiveFiberList because the
// semantics cause the first fiber to be counted more than once during
// finish, and possibly other fibers as well. The only other solution
// involves using TIdFiber itself, and that would cause changes to TIdFiber
// that would be made only for the accomodation of TIdFiberWeaverInline.
//
// As it is TIdFiber itself has no knowledge ot TIdFiberWeaverInline.
//
// FActiveFiberList is used by ProcessInThisThread to detect when all fibers
// have finished.
FActiveFiberList: TIdThreadSafeList;
FAddEvent: TEvent;
// FActiveFiberList contains a list of fibers to schedule. Fibers are
// removed when they are running or are suspened. When a fiber is ready to
// excecuted again it is added to FActiveFiberList and the fiber weaver will
// schedule it.
FFiberList: TIdThreadSafeList;
FFreeFibersOnCompletion: Boolean;
FOnIdle: TNotifyEvent;
FOnSwitch: TIdFiberNotifyEvent;
FSelfFiber: TIdConvertedFiber;
//
procedure DoIdle;
procedure DoSwitch(AFiber: TIdFiberBase); virtual;
procedure InitComponent; override;
procedure Relinquish(
AFiber: TIdFiber;
AReschedule: Boolean
); override;
procedure ScheduleFiber(
ACurrentFiber: TIdFiberBase;
ANextFiber: TIdFiber
);
public
procedure Add(AFiber: TIdFiber); override;
destructor Destroy; override;
function HasFibers: Boolean;
function ProcessInThisThread: Boolean;
function WaitForFibers(
ATimeout: Cardinal = Infinite
): Boolean;
override;
published
property FreeFibersOnCompletion: Boolean read FFreeFibersOnCompletion
write FFreeFibersOnCompletion;
//
property OnIdle: TNotifyEvent read FOnIdle write FOnIdle;
property OnSwitch: TIdFiberNotifyEvent read FOnSwitch write FOnSwitch;
end;
EIdNoFibersToSchedule = class(EIdSilentException);
implementation
uses
SysUtils,
Windows;
{ TIdFiberWeaverInline }
procedure TIdFiberWeaverInline.Add(AFiber: TIdFiber);
begin
inherited;
AFiber.SetRelinquishHandler(Relinquish);
with FFiberList.LockList do try
Add(AFiber);
FAddEvent.SetEvent;
finally FFiberList.UnlockList; end;
end;
destructor TIdFiberWeaverInline.Destroy;
begin
FreeAndNil(FActiveFiberList);
FreeAndNil(FFiberList);
FreeAndNil(FAddEvent);
inherited;
end;
procedure TIdFiberWeaverInline.DoIdle;
begin
if Assigned(FOnIdle) then begin
FOnIdle(Self);
end;
end;
procedure TIdFiberWeaverInline.DoSwitch(AFiber: TIdFiberBase);
begin
if Assigned(FOnSwitch) then begin
FOnSwitch(Self, AFiber);
end;
end;
function TIdFiberWeaverInline.HasFibers: Boolean;
begin
Result := not FFiberList.IsCountLessThan(1);
end;
procedure TIdFiberWeaverInline.InitComponent;
begin
inherited;
FActiveFiberList := TIdThreadSafeList.Create;
FAddEvent := TEvent.Create(nil, False, False, '');
FFiberList := TIdThreadSafeList.Create;
end;
function TIdFiberWeaverInline.ProcessInThisThread: Boolean;
// Returns true if ANY fiber terminated because of an unhandled exception.
// If false, user does not need to loop through the fibers to look.
var
LFiber: TIdFiber;
LFiberList: TList;
begin
Result := False;
LFiberList := FFiberList.LockList; try
if LFiberList.Count = 0 then begin
raise EIdNoFibersToSchedule.Create('No fibers to schedule.'); {do not localize}
end;
FActiveFiberList.Assign(LFiberList);
finally FFiberList.UnlockList; end;
// This loop catches fibers as they finish. Relinquish accomplishes explicit
// switching faster by performing only one switch instead of two.
FSelfFiber := TIdConvertedFiber.Create; try
while True do begin
LFiber := TIdFiber(FFiberList.Pull);
if LFiber = nil then begin
if FActiveFiberList.IsEmpty then begin
// All fibers finished
Break;
end else begin
FAddEvent.WaitFor(Infinite);
end;
end else begin
// So it will switch back here when finished so other fibers can be
// processed.
LFiber.ParentFiber := FSelfFiber;
//
ScheduleFiber(FSelfFiber, LFiber);
// if any fiber terminated with a fatal exception return true
// Dont set it to it, else false would reset it.
if FSelfFiber.PriorFiber is TIdFiber then begin
LFiber := TIdFiber(FSelfFiber.PriorFiber);
if LFiber.FatalExceptionOccurred then begin
Result := True;
end;
// Finished fibers always switch back to parent and will not short
// circuit schedule
if LFiber.Finished then begin
FActiveFiberList.Remove(LFiber);
if FreeFibersOnCompletion then begin
FreeAndNil(LFiber);
end;
end;
end;
end;
end;
finally FreeAndNil(FSelfFiber); end;
end;
procedure TIdFiberWeaverInline.Relinquish(
AFiber: TIdFiber;
AReschedule: Boolean
);
var
LFiber: TIdFiber;
begin
while True do begin
LFiber := nil;
// Get next fiber to schedule
with FFiberList.LockList do try
if Count > 0 then begin
LFiber := TIdFiber(List[0]);
Delete(0);
if AReschedule then begin
Add(AFiber);
end;
// If no fibers to schedule, we will rerun ourself if set to reschedule
end else if AReschedule then begin
// Soft cast as a check that a converted fiber has not been passed
// with AReschedule = True
LFiber := AFiber as TIdFiber;
end;
finally FFiberList.UnlockList; end;
if LFiber = nil then begin
// If there are no fibers to schedule, that means we are waiting on
// ourself, or another relinquished fiber. Wait for one to get readded
// to list.
//
//TODO: Allow a parameter for timeout and call DoIdle
//TODO: Better yet - integrate with AntiFreeze also
DoIdle;
FAddEvent.WaitFor(Infinite);
end else if LFiber = AFiber then begin
// If the next fiber is ourself, simply exit to return to ourself
Break;
end else if LFiber <> nil then begin
// Must set the parent fiber to self so that when it finishes we get
// control again. The main ProcessInThisThread loop does this, but
// only for ones it first starts. Fibers can get added to the list and
// then scheduled here in this short circuit switch. When they finish
// they will have no parent fiber.
LFiber.ParentFiber := FSelfFiber;
ScheduleFiber(AFiber, LFiber);
// If we get switched back to, we have been scheduled so exit
Break;
end;
end;
// For future expansion when can switch between weavers
AFiber.SetRelinquishHandler(Relinquish);
end;
procedure TIdFiberWeaverInline.ScheduleFiber(
ACurrentFiber: TIdFiberBase;
ANextFiber: TIdFiber
);
begin
DoSwitch(ANextFiber);
ACurrentFiber.SwitchTo(ANextFiber);
end;
function TIdFiberWeaverInline.WaitForFibers(
ATimeout: Cardinal = Infinite
): Boolean;
begin
if not FFiberList.IsEmpty then begin
Result := True;
end else begin
Result := (FAddEvent.WaitFor(ATimeout) = wrSignaled) and not FFiberList.IsEmpty;
end;
end;
end.

View File

@@ -0,0 +1,135 @@
{
$Project$
$Workfile$
$Revision$
$DateUTC$
$Id$
This file is part of the Indy (Internet Direct) project, and is offered
under the dual-licensing agreement described on the Indy website.
(http://www.indyproject.org/)
Copyright:
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
}
{
$Log$
}
{
Rev 1.4 6/11/2004 8:39:56 AM DSiders
Added "Do not Localize" comments.
Rev 1.3 2004-04-23 19:46:52 Mattias
TTempThread now uses WaitForFibers instead of sleep
Rev 1.2 2004.04.22 11:45:18 PM czhower
Bug fixes
Rev 1.1 2004.02.09 9:16:40 PM czhower
Updated to compile and match lib changes.
Rev 1.0 2004.02.03 12:38:54 AM czhower
Move
Rev 1.2 2003.10.21 12:19:22 AM czhower
TIdTask support and fiber bug fixes.
Rev 1.1 2003.10.19 4:38:32 PM czhower
Updates
}
unit IdFiberWeaverThreaded;
interface
uses
Classes,
IdFiberWeaverInline,
IdThread, IdSchedulerOfThread, IdFiberWeaver, IdFiber;
type
TTempThread = class(TIdThread)
protected
FFiberWeaver: TIdFiberWeaverInline;
//
procedure AfterRun; override;
procedure BeforeRun; override;
procedure Run; override;
end;
TIdFiberWeaverThreaded = class(TIdFiberWeaver)
protected
FThreadScheduler: TIdSchedulerOfThread;
FTempThread: TTempThread;
//
procedure InitComponent; override;
public
procedure Add(
AFiber: TIdFiber
); override;
destructor Destroy;
override;
published
property ThreadScheduler: TIdSchedulerOfThread read FThreadScheduler
write FThreadScheduler;
end;
implementation
uses
SysUtils;
{ TTempThread }
procedure TTempThread.AfterRun;
begin
inherited;
FreeAndNil(FFiberWeaver);
end;
procedure TTempThread.BeforeRun;
begin
inherited;
//TODO: Make this pluggable at run time? depends where threads come
//from - merge to scheduler? Base is in IdFiber though....
FFiberWeaver := TIdFiberWeaverInline.Create(nil);
FFiberWeaver.FreeFibersOnCompletion := True;
end;
procedure TTempThread.Run;
begin
//TODO: Temp hack
if FFiberWeaver.HasFibers then begin
FFiberWeaver.ProcessInThisThread;
end else begin
//Sleep(50);
FFiberWeaver.WaitForFibers(50);
end;
end;
{ TIdFiberWeaverThreaded }
procedure TIdFiberWeaverThreaded.Add(AFiber: TIdFiber);
begin
FTempThread.FFiberWeaver.Add(AFiber);
end;
destructor TIdFiberWeaverThreaded.Destroy;
begin
// is only created at run time
if FTempThread <> nil then begin
FTempThread.TerminateAndWaitFor;
FreeAndNil(FTempThread);
end;
inherited;
end;
procedure TIdFiberWeaverThreaded.InitComponent;
begin
inherited;
if not (csDesigning in ComponentState) then begin
FTempThread := TTempThread.Create(False, True, 'TIdSchedulerOfFiber Temp'); {do not localize}
end;
end;
end.

View File

@@ -0,0 +1,859 @@
{
$Project$
$Workfile$
$Revision$
$DateUTC$
$Id$
This file is part of the Indy (Internet Direct) project, and is offered
under the dual-licensing agreement described on the Indy website.
(http://www.indyproject.org/)
Copyright:
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
}
{
$Log$
}
{
Rev 1.6 9/16/2004 8:11:40 PM JPMugaas
Should compile again.
Rev 1.5 6/11/2004 8:39:58 AM DSiders
Added "Do not Localize" comments.
Rev 1.4 2004.05.06 1:47:26 PM czhower
Now uses IndexOf
Rev 1.3 2004.04.13 10:37:56 PM czhower
Updates
Rev 1.2 2004.03.07 11:46:08 AM czhower
Flushbuffer fix + other minor ones found
Rev 1.1 2004.02.09 9:16:44 PM czhower
Updated to compile and match lib changes.
Rev 1.0 2004.02.03 12:38:56 AM czhower
Move
Rev 1.6 2003.10.24 10:37:38 AM czhower
IdStream
Rev 1.5 2003.10.19 4:38:32 PM czhower
Updates
Rev 1.4 2003.10.19 2:50:40 PM czhower
Fiber cleanup
Rev 1.3 2003.10.14 11:17:02 PM czhower
Updates to match core changes.
Rev 1.2 2003.10.11 5:43:30 PM czhower
Chained servers now functional.
Rev 1.1 2003.09.19 10:09:40 PM czhower
Next stage of fiber support in servers.
Rev 1.0 8/16/2003 11:09:08 AM JPMugaas
Moved from Indy Core dir as part of package reorg
Rev 1.49 2003.07.17 4:42:06 PM czhower
More IOCP improvements.
Rev 1.45 2003.07.14 11:46:46 PM czhower
IOCP now passes all bubbles.
Rev 1.43 2003.07.14 1:10:52 AM czhower
Now passes all bubble tests for chained stack.
Rev 1.41 7/7/2003 1:34:06 PM BGooijen
Added WriteFile(...)
Rev 1.40 7/3/2003 2:03:52 PM BGooijen
IOCP works server-side now
Rev 1.39 2003.06.30 5:41:54 PM czhower
-Fixed AV that occurred sometimes when sockets were closed with chains
-Consolidated code that was marked by a todo for merging as it no longer
needed to be separate
-Removed some older code that was no longer necessary
Passes bubble tests.
Rev 1.38 6/29/2003 10:56:26 PM BGooijen
Removed .Memory from the buffer, and added some extra methods
Rev 1.37 2003.06.25 4:30:02 PM czhower
Temp hack fix for AV problem. Working on real solution now.
Rev 1.36 6/24/2003 11:17:44 PM BGooijen
change in TIdIOHandlerChain.ReadLn, LTermPos= 0 is now handled differently
Rev 1.35 23/6/2003 22:33:18 GGrieve
fix CheckForDataOnSource - specify timeout
Rev 1.34 6/22/2003 11:22:22 PM JPMugaas
Should now compile.
Rev 1.33 6/4/2003 1:08:40 AM BGooijen
Added CheckForDataOnSource and removed some (duplicate) code
Rev 1.32 6/3/2003 8:07:20 PM BGooijen
Added TIdIOHandlerChain.AllData
Rev 1.31 5/11/2003 2:37:58 PM BGooijen
Bindings are updated now
Rev 1.30 5/11/2003 12:00:08 PM BGooijen
Rev 1.29 5/11/2003 12:03:16 AM BGooijen
Rev 1.28 2003.05.09 10:59:24 PM czhower
Rev 1.27 2003.04.22 9:48:50 PM czhower
Rev 1.25 2003.04.17 11:01:14 PM czhower
Rev 1.19 2003.04.10 10:51:04 PM czhower
Rev 1.18 4/2/2003 3:39:26 PM BGooijen
Added Intercepts
Rev 1.17 3/29/2003 5:53:52 PM BGooijen
added AfterAccept
Rev 1.16 3/27/2003 2:57:58 PM BGooijen
Added a RawWrite for streams, implemented WriteStream, changed
WriteToDestination to use TIdWorkOpUnitWriteBuffer
Rev 1.15 2003.03.26 12:20:28 AM czhower
Moved visibility of execute to protected.
Rev 1.14 3/25/2003 11:07:58 PM BGooijen
ChainEngine descends now from TIdBaseComponent
Rev 1.13 3/25/2003 01:33:48 AM JPMugaas
Fixed compiler warnings.
Rev 1.12 3/24/2003 11:03:50 PM BGooijen
Various fixes to readln:
- uses connection default now
- doesn't raise an exception on timeout any more
Rev 1.11 2003.03.13 1:22:58 PM czhower
Typo fixed. lenth --> Length
Rev 1.10 3/13/2003 10:18:20 AM BGooijen
Server side fibers, bug fixes
Rev 1.9 3/2/2003 12:36:22 AM BGooijen
Added woReadBuffer and TIdWorkOpUnitReadBuffer to read a buffer. Now
ReadBuffer doesn't use ReadStream any more.
TIdIOHandlerChain.ReadLn now supports MaxLineLength (splitting, and
exceptions).
woReadLn doesn't check the intire buffer any more, but continued where it
stopped the last time.
Added basic support for timeouts (probably only on read operations, and maybe
connect), accuratie of timeout is currently 500msec.
Rev 1.8 2/28/2003 10:15:16 PM BGooijen
bugfix: changed some occurrences of FRecvBuffer to FInputBuffer
Rev 1.7 2/27/2003 10:11:12 PM BGooijen
Rev 1.6 2/26/2003 1:08:52 PM BGooijen
Rev 1.5 2/25/2003 10:36:28 PM BGooijen
Added more opcodes, methods, and moved opcodes to separate files.
Rev 1.4 2003.02.25 9:02:32 PM czhower
Hand off to Bas
Rev 1.3 2003.02.25 1:36:04 AM czhower
Rev 1.2 2002.12.11 11:00:58 AM czhower
Rev 1.1 2002.12.07 12:26:06 AM czhower
Rev 1.0 11/13/2002 08:45:00 AM JPMugaas
}
unit IdIOHandlerChain;
interface
uses
Classes,
IdBaseComponent, IdBuffer, IdGlobal, IdIOHandler, IdIOHandlerSocket,
IdFiber, IdThreadSafe, IdWorkOpUnit, IdStackConsts, IdWinsock2, IdThread,
IdFiberWeaver, IdStream, IdStreamVCL,
Windows;
type
TIdConnectMode = (cmNonBlock, cmIOCP);
TIdIOHandlerChain = class;
TIdChainEngineThread = class;
TIdChainEngine = class(TIdBaseComponent)
protected
FCompletionPort: THandle;
FThread: TIdChainEngineThread;
//
procedure Execute;
function GetInputBuffer(const AIOHandler: TIdIOHandler): TIdBuffer;
procedure InitComponent; override;
procedure SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
procedure Terminating;
public
procedure AddWork(AWorkOpUnit: TIdWorkOpUnit);
procedure BeforeDestruction; override;
destructor Destroy; override;
procedure RemoveSocket(AIOHandler: TIdIOHandlerChain);
procedure SocketAccepted(AIOHandler: TIdIOHandlerChain);
end;
TIdIOHandlerChain = class(TIdIOHandlerSocket)
protected
FChainEngine: TIdChainEngine;
FConnectMode: TIdConnectMode;
FFiber: TIdFiber;
FFiberWeaver: TIdFiberWeaver;
FOverlapped: PIdOverlapped;
//
procedure ConnectClient; override;
procedure QueueAndWait(
AWorkOpUnit: TIdWorkOpUnit;
ATimeout: Integer = IdTimeoutDefault;
AFreeWorkOpUnit: Boolean = True;
AAllowGracefulException: Boolean = True
);
procedure WorkOpUnitCompleted(
AWorkOpUnit: TIdWorkOpUnit
);
public
procedure AfterAccept; override;
function AllData: string; override;
procedure CheckForDataOnSource(
ATimeout : Integer = 0
); override;
procedure CheckForDisconnect(
ARaiseExceptionIfDisconnected: Boolean = True;
AIgnoreBuffer: Boolean = False
); override;
constructor Create(
AOwner: TComponent;
AChainEngine: TIdChainEngine;
AFiberWeaver: TIdFiberWeaver;
AFiber: TIdFiber
); reintroduce; virtual;
destructor Destroy; override;
procedure Open; override;
function ReadFromSource(ARaiseExceptionIfDisconnected: Boolean = True;
ATimeout: Integer = IdTimeoutDefault;
ARaiseExceptionOnTimeout: Boolean = True): Integer; override;
procedure ReadStream(AStream: TIdStreamVCL; AByteCount: Int64;
AReadUntilDisconnect: Boolean); override;
// TODO: Allow ReadBuffer to by pass the internal buffer. Will it really
// help? Only ReadBuffer would be able to use this optimiztion in most
// cases and it is not used by many. Most calls are to stream (disk) based
// or strings as ReadLn.
procedure ReadBytes(var VBuffer: TIdBytes; AByteCount: Integer; AAppend: Boolean = True);
override;
function ReadLn(
ATerminator: string = LF;
ATimeout: Integer = IdTimeoutDefault;
AMaxLineLength: Integer = -1
): string;
override;
// function WriteFile(
// AFile: string;
// AEnableTransferFile: Boolean
// ): Cardinal; override;
function WriteFile(
const AFile: String;
AEnableTransferFile: Boolean): Int64; override;
{ procedure Write(
AStream: TIdStream;
ASize: Integer = 0;
AWriteByteCount: Boolean = False);
override; }
procedure Write(
AStream: TIdStreamVCL;
ASize: Int64 = 0;
AWriteByteCount: Boolean = False
); override;
procedure WriteDirect(
ABuffer: TIdBytes
); override;
//
property ConnectMode: TIdConnectMode read FConnectMode write FConnectMode;
property Overlapped: PIdOverlapped read FOverlapped;
end;
TIdChainEngineThread = class(TIdThread)
protected
FChainEngine: TIdChainEngine;
public
constructor Create(
AOwner: TIdChainEngine;
const AName: string
); reintroduce;
procedure Run; override;
property Terminated;
end;
implementation
uses
IdComponent, IdException, IdExceptionCore, IdStack, IdResourceStrings, IdWorkOpUnits,
IdStackWindows,
SysUtils;
const
GCompletionKeyTerminate = $F0F0F0F0;
{ TIdIOHandlerChain }
procedure TIdIOHandlerChain.CheckForDataOnSource(ATimeout: Integer = 0);
begin
// TODO: Change this so we dont have to rely on an exception trap
try
QueueAndWait(TIdWorkOpUnitReadAvailable.Create, ATimeout, True, False);
except
on E: EIdReadTimeout do begin
// Nothing
end else begin
raise;
end;
end;
end;
procedure TIdIOHandlerChain.ConnectClient;
begin
// TODO: Non blocking does not support Socks
Binding.OverLapped := (ConnectMode = cmIOCP);
inherited;
case ConnectMode of
cmNonBlock: begin
//TODO: Non blocking DNS resolution too?
Binding.SetPeer(GWindowsStack.ResolveHost(Host), Port);
GWindowsStack.SetBlocking(Binding.Handle, False);
// Does not block
Binding.Connect;
end;
cmIOCP: begin
//TODO: For now we are doing blocking, just to get it to work. fix later
// IOCP was not designed for connects, so we'll have to do some monkeying
// maybe even create an engine thread just to watch for connect events.
//TODO: Resolution too?
Binding.SetPeer(GStack.ResolveHost(Host), Port);
Binding.Connect;
GWindowsStack.SetBlocking(Binding.Handle, False);
end;
else begin
raise EIdException.Create('Unrecognized ConnectMode'); {do not localize}
end;
end;
QueueAndWait(TIdWorkOpUnitWaitConnected.Create);
//Update the bindings
Binding.UpdateBindingLocal;
//TODO: Could Peer binding ever be other than what we specified above? Need to reread it?
Binding.UpdateBindingPeer;
end;
procedure TIdIOHandlerChain.AfterAccept;
begin
FChainEngine.SocketAccepted(self);
end;
procedure TIdIOHandlerChain.Open;
begin
// Things before inherited, inherited actually connects and ConnectClient
// needs these things
inherited;
end;
procedure TIdIOHandlerChain.CheckForDisconnect(
ARaiseExceptionIfDisconnected: Boolean; AIgnoreBuffer: Boolean);
var
LDisconnected: Boolean;
begin
// ClosedGracefully // Server disconnected
// IOHandler = nil // Client disconnected
if ClosedGracefully then begin
if BindingAllocated then begin
Close;
// Call event handlers to inform the user program that we were disconnected
// DoStatus(hsDisconnected);
//DoOnDisconnected;
end;
LDisconnected := True;
end else begin
LDisconnected := not BindingAllocated;
end;
if LDisconnected then begin
// Do not raise unless all data has been read by the user
if Assigned(FInputBuffer) then begin
if ((FInputBuffer.Size = 0) or AIgnoreBuffer)
and ARaiseExceptionIfDisconnected then begin
RaiseConnClosedGracefully;
end;
end;
end;
end;
function TIdIOHandlerChain.ReadFromSource(
ARaiseExceptionIfDisconnected: Boolean; ATimeout: Integer;
ARaiseExceptionOnTimeout: Boolean): Integer;
begin
Result := 0;
raise EIdException.Create('Fall through error in ' + ClassName); {do not localize}
end;
procedure TIdIOHandlerChain.ReadStream(AStream: TIdStreamVCL; AByteCount: Int64;
AReadUntilDisconnect: Boolean);
begin
if AReadUntilDisconnect then begin
QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(AStream.VCLStream), -1
, True, False);
end else begin
QueueAndWait(TIdWorkOpUnitReadSizedStream.Create(AStream.VCLStream, AByteCount));
end;
end;
procedure TIdIOHandlerChain.ReadBytes(var VBuffer: TIdBytes;
AByteCount: Integer; AAppend: Boolean = True);
begin
EIdException.IfFalse(AByteCount >= 0);
if AByteCount > 0 then begin
if FInputBuffer.Size < AByteCount then begin
QueueAndWait(TIdWorkOpUnitReadSized.Create(AByteCount- FInputBuffer.Size));
end;
Assert(FInputBuffer.Size >= AByteCount);
FInputBuffer.ExtractToBytes(VBuffer, AByteCount, AAppend);
end;
end;
function TIdIOHandlerChain.ReadLn(ATerminator: string = LF;
ATimeout: Integer = IdTimeoutDefault; AMaxLineLength: Integer = -1): string;
var
LTermPos: Integer;
begin
if AMaxLineLength = -1 then begin
AMaxLineLength := MaxLineLength;
end;
// User may pass '' if they need to pass arguments beyond the first.
if ATerminator = '' then begin
ATerminator := LF;
end;
FReadLnSplit := False;
FReadLnTimedOut := False;
try
LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
if (LTermPos = 0) and ((AMaxLineLength = 0)
or (FInputBuffer.Size < AMaxLineLength)) then begin
QueueAndWait(TIdWorkOpUnitReadLn.Create(ATerminator, AMaxLineLength)
, ATimeout);
LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
end;
// LTermPos cannot be 0, and the code below can't handle it properly
Assert(LTermPos > 0);
if (AMaxLineLength <> 0) and (LTermPos > AMaxLineLength) then begin
case FMaxLineAction of
// TODO: find the right exception class here
maException: raise EIdException.Create('MaxLineLength exceded'); {do not localize}
maSplit: Result := FInputBuffer.Extract(AMaxLineLength);
end;
end else begin
Result := FInputBuffer.Extract(LTermPos - 1);
if (ATerminator = LF) and (Copy(Result, Length(Result), 1) = CR) then begin
Delete(Result, Length(Result), 1);
end;
FInputBuffer.Extract(Length(ATerminator));// remove the terminator
end;
except on E: EIdReadTimeout do
FReadLnTimedOut := True;
end;
end;
function TIdIOHandlerChain.AllData: string;
var
LStream: TStringStream;
begin
BeginWork(wmRead); try
Result := '';
LStream := TStringStream.Create(''); try
QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(LStream), -1
, True, False);
Result := LStream.DataString;
finally FreeAndNil(LStream); end;
finally EndWork(wmRead); end;
end;
function TIdIOHandlerChain.WriteFile(
const AFile: String;
AEnableTransferFile: Boolean): Int64;
var
LWO:TIdWorkOpUnitWriteFile;
begin
//BGO: we ignore AEnableTransferFile for now
Result := 0;
// if not Assigned(Intercept) then begin
LWO := TIdWorkOpUnitWriteFile.Create(AFile);
try
QueueAndWait(LWO,IdTimeoutDefault, false);
finally
// Result := LWO.BytesSent;
FreeAndNil(LWO);
end;
// end else begin
// inherited WriteFile(AFile, AEnableTransferFile);
// end;
end;
procedure TIdIOHandlerChain.Write(
AStream: TIdStreamVCL;
ASize: Int64 = 0;
AWriteByteCount: Boolean = False
);
var
LStart: Integer;
LThisSize: Integer;
begin
if ASize < 0 then begin //"-1" All form current position
LStart := AStream.VCLStream.Seek(0, soFromCurrent);
ASize := AStream.VCLStream.Seek(0, soFromEnd) - LStart;
AStream.VCLStream.Seek(LStart, soFromBeginning);
end else if ASize = 0 then begin //"0" ALL
LStart := 0;
ASize := AStream.VCLStream.Seek(0, soFromEnd);
AStream.VCLStream.Seek(0, soFromBeginning);
end else begin //else ">0" ACount bytes
LStart := AStream.VCLStream.Seek(0, soFromCurrent);
end;
if AWriteByteCount then begin
Write(ASize);
end;
// BeginWork(wmWrite, ASize);
try
while ASize > 0 do begin
LThisSize := Min(128 * 1024, ASize); // 128K blocks
QueueAndWait(TIdWorkOpUnitWriteStream.Create(AStream.VCLStream, LStart, LThisSize
, False));
Dec(ASize, LThisSize);
Inc(LStart, LThisSize);
end;
finally
// EndWork(wmWrite);
end;
end;
procedure TIdIOHandlerChain.WriteDirect(
ABuffer: TIdBytes
);
begin
QueueAndWait(TIdWorkOpUnitWriteBuffer.Create(@ABuffer[0], Length(ABuffer), False));
end;
procedure TIdIOHandlerChain.QueueAndWait(
AWorkOpUnit: TIdWorkOpUnit;
ATimeout: Integer = IdTimeoutDefault;
AFreeWorkOpUnit: Boolean = True;
AAllowGracefulException: Boolean = True
);
var
LWorkOpUnit: TIdWorkOpUnit;
begin
try
CheckForDisconnect(AAllowGracefulException);
LWorkOpUnit := AWorkOpUnit;
//
if ATimeout = IdTimeoutInfinite then begin
LWorkOpUnit.TimeOutAt := 0;
end else begin
if ATimeout = IdTimeoutDefault then begin
if FReadTimeout <= 0 then begin
LWorkOpUnit.TimeOutAt := 0;
end else begin
//we type cast FReadTimeOut as a cardinal to prevent the compiler from
//expanding vars to an Int64 type. That can incur a performance penalty.
LWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(FReadTimeout);
end
end else begin
//FReadTimeOut is typecase as a cardinal to prevent the compiler from
//expanding vars to an Int64 type which can incur a performance penalty.
LWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(ATimeout);
end
end;
//
LWorkOpUnit.Fiber := FFiber;
LWorkOpUnit.IOHandler := Self;
LWorkOpUnit.OnCompleted := WorkOpUnitCompleted;
LWorkOpUnit.SocketHandle := Binding.Handle;
// Add to queue and wait to be rescheduled when work is completed
FChainEngine.AddWork(LWorkOpUnit);
// Check to see if we need to reraise an exception
LWorkOpUnit.RaiseException;
// Check for timeout
if LWorkOpUnit.TimedOut then begin
raise EIdReadTimeout.Create('Timed out'); {do not localize}
end;
// Check to see if it was closed during this operation
CheckForDisconnect(AAllowGracefulException);
finally
if AFreeWorkOpUnit then begin
AWorkOpUnit.Free;
end;
end;
end;
constructor TIdIOHandlerChain.Create(
AOwner: TComponent;
AChainEngine: TIdChainEngine;
AFiberWeaver: TIdFiberWeaver;
AFiber: TIdFiber
);
begin
inherited Create(AOwner);
//
EIdException.IfNotAssigned(AChainEngine, 'No chain engine specified.'); {do not localize}
FChainEngine := AChainEngine;
FChainEngine.SetIOHandlerOptions(Self);
//
EIdException.IfNotAssigned(AFiberWeaver, 'No fiber weaver specified.'); {do not localize}
FFiberWeaver := AFiberWeaver;
//
EIdException.IfNotAssigned(AFiber, 'No fiber specified.'); {do not localize}
FFiber := AFiber;
// Initialize Overlapped structure
New(FOverlapped);
ZeroMemory(FOverlapped, SizeOf(TIdOverLapped));
New(FOverlapped.Buffer);
end;
procedure TIdIOHandlerChain.WorkOpUnitCompleted(AWorkOpUnit: TIdWorkOpUnit);
begin
FFiberWeaver.Add(AWorkOpUnit.Fiber);
end;
destructor TIdIOHandlerChain.Destroy;
begin
// Tell the chain engine that we are closing and to remove any references to
// us and cease any usage.
// Do not do this in close, it can cause deadlocks because the engine can
// call close while in its Execute.
FChainEngine.RemoveSocket(Self);
Dispose(FOverlapped.Buffer);
Dispose(FOverlapped);
inherited;
end;
{ TIdChainEngine }
procedure TIdChainEngine.BeforeDestruction;
begin
if FThread <> nil then begin
// Signal thread for termination
FThread.Terminate;
// Tell the engine we are attempting termination
Terminating;
// Wait for the thread to terminate
FThread.WaitFor;
// Free thread
FreeAndNil(FThread);
end;
inherited;
end;
function TIdChainEngine.GetInputBuffer(const AIOHandler:TIdIOHandler):TidBuffer;
begin
Result := TIdIOHandlerChain(AIOHandler).FInputBuffer;
end;
procedure TIdChainEngine.SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
begin
AIOHandler.ConnectMode := cmIOCP;
end;
procedure TIdChainEngine.SocketAccepted(AIOHandler: TIdIOHandlerChain);
begin
// Associate the socket with the completion port.
if CreateIoCompletionPort(AIOHandler.Binding.Handle, FCompletionPort, 0, 0)
= 0 then begin
RaiseLastOSError;
end;
end;
procedure TIdChainEngine.Terminating;
begin
if not PostQueuedCompletionStatus(FCompletionPort, 0, GCompletionKeyTerminate
, nil) then begin
RaiseLastOSError;
end;
end;
procedure TIdChainEngine.Execute;
var
LBytesTransferred: DWord;
LCompletionKey: DWord;
LOverlapped: PIdOverlapped;
begin
// Wait forever on the completion port. If we are terminating, a terminate
// signal is sent into the queue.
if GetQueuedCompletionStatus(FCompletionPort, LBytesTransferred
, LCompletionKey, POverLapped(LOverlapped), INFINITE) then begin
if LCompletionKey <> GCompletionKeyTerminate then begin
// Socket has been closed
if LBytesTransferred = 0 then begin
LOverlapped.WorkOpUnit.IOHandler.CloseGracefully;
end;
LOverlapped.WorkOpUnit.Process(LOverlapped, LBytesTransferred);
end;
end;
end;
procedure TIdChainEngine.RemoveSocket(AIOHandler: TIdIOHandlerChain);
begin
// raise EIdException.Create('Fall through error in ' + Self.ClassName+'.RemoveSocket');
end;
procedure TIdChainEngine.AddWork(AWorkOpUnit: TIdWorkOpUnit);
begin
if AWorkOpUnit is TIdWorkOpUnitWaitConnected then begin
// Associate the socket with the completion port.
if CreateIOCompletionPort(AWorkOpUnit.SocketHandle, FCompletionPort, 0, 0)
= 0 then begin
RaiseLastOSError;
end;
AWorkOpUnit.Complete;
end;
AWorkOpUnit.Start;
end;
destructor TIdChainEngine.Destroy;
begin
if CloseHandle(FCompletionPort) = False then begin
RaiseLastOSError;
end;
inherited;
end;
procedure TIdChainEngine.InitComponent;
begin
{
var SysInfo: TSystemInfo;
GetSystemInfo(SysInfo);
SysInfo.dwNumberOfProcessors
Use GetSystemInfo instead. It will return the all info on the local
system's architecture and will also return a valid ActiveProcessorMask
which is a DWORD to be read as a bit array of the processor on the
system...
CZH> And next
CZH> question - any one know off hand how to set affinity? :)
Use the SetProcessAffinityMask or SetThreadAffinityMask API depending
on wether you want to act on the whole process or just a single
thread (SetThreadIdealProcessor is another way to do it: it just gives
the scheduler a hint about where to run a thread without forcing it:
good for keeping two threads doing IO one with each other on the same
processor).
}
inherited;
if not (csDesigning in ComponentState) then begin
// Cant use .Name, its not initialized yet in Create
FThread := TIdChainEngineThread.Create(Self, 'Chain Engine'); {do not localize}
end;
//MS says destruction is automatic, but Google seems to say that this initial
//one is not auto managed as MS says, and that CloseHandle should be called.
FCompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
if FCompletionPort = 0 then begin
RaiseLastOSError;
end;
end;
{ TIdChainEngineThread }
constructor TIdChainEngineThread.Create(
AOwner: TIdChainEngine;
const AName: string
);
begin
FChainEngine := AOwner;
inherited Create(False, True, AName);
end;
(*procedure TIdChainEngineIOCP.TransmitFileIOCP(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
var
LPOverlapped: PIdOverlapped;
LHFile:THandle;
begin
New(LPOverlapped);
ZeroMemory(LPOverlapped,sizeof(TIdOverLapped));
New(LPOverlapped^.Buffer);
LPOverlapped^.IOhandler:=TIdIOHandlerChain(AWorkOpUnit.IOhandler);
LPOverlapped^.WorkOpUnit:=AWorkOpUnit;
LHFile:=CreateFile(pchar(AFilename),GENERIC_READ,FILE_SHARE_READ,nil,OPEN_EXISTING,FILE_FLAG_SEQUENTIAL_SCAN,0);
if LHFile=INVALID_HANDLE_VALUE then begin
RaiseLastOSError;
end;
try
if ServiceQueryTransmitFile(AWorkOpUnit.IOHandler.Binding.Handle,LHFile,0,0,POverlapped(LPOverlapped),nil,0) then begin
AWorkOpUnit.Fiber.Relinquish;
end else begin
raise EIdException.Create('error in ServiceQueryTransmitFile');
end;
finally
CloseHandle(LHFile);
end;
end;
*)
(*procedure TIdChainEngineIOCP.TransmitFileAsStream(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
procedure CopyWorkUnit(ASrc,ADst: TIdWorkOpUnit);
begin
ADst.IOHandler := ASrc.IOHandler;
ADst.Fiber := ASrc.Fiber;
ADst.OnCompleted := ASrc.OnCompleted;
ADst.SocketHandle:= ASrc.SocketHandle;
end;
var
LStream:TfileStream;
LWorkOpUnit : TIdWorkOpUnitWriteStream;
LBuf:pointer;
LBufLen:integer;
begin
Assert(False, 'to do');
LStream := TFileStream.Create(AFilename,fmOpenRead or fmShareDenyWrite);
try
LWorkOpUnit := TIdWorkOpUnitWriteStream.Create(LStream,0,LStream.size,false);
try
CopyWorkUnit(AWorkOpUnit,LWorkOpUnit);
LBufLen:=Min(LStream.size,128*1024);
getmem(LBuf,LBufLen);
LWorkOpUnit.Stream.Position:=LWorkOpUnit.StartPos;
LWorkOpUnit.Stream.Read(LBuf^,LBufLen);
IssueWriteBuffer(LWorkOpUnit,LBuf,LBufLen);
finally
AWorkOpUnit.BytesSent := LStream.Size;
LWorkOpUnit.free;
end;
finally
LStream.free;
end;
end;
*)
procedure TIdChainEngineThread.Run;
begin
FChainEngine.Execute;
end;
end.

View File

@@ -0,0 +1,153 @@
{
$Project$
$Workfile$
$Revision$
$DateUTC$
$Id$
This file is part of the Indy (Internet Direct) project, and is offered
under the dual-licensing agreement described on the Indy website.
(http://www.indyproject.org/)
Copyright:
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
}
{
$Log$
}
{
Rev 1.2 6/11/2004 8:40:06 AM DSiders
Added "Do not Localize" comments.
Rev 1.1 2004.02.09 9:16:48 PM czhower
Updated to compile and match lib changes.
Rev 1.0 2004.02.03 12:38:58 AM czhower
Move
Rev 1.9 2003.10.24 1:00:06 PM czhower
Name change
Rev 1.8 2003.10.21 12:19:20 AM czhower
TIdTask support and fiber bug fixes.
Rev 1.7 2003.10.19 4:38:34 PM czhower
Updates
Rev 1.6 2003.10.19 2:50:40 PM czhower
Fiber cleanup
Rev 1.5 2003.10.19 1:04:28 PM czhower
Updates
Rev 1.4 2003.10.14 11:17:06 PM czhower
Updates to match core changes.
Rev 1.3 2003.10.11 5:43:48 PM czhower
Chained servers now functional.
Rev 1.2 2003.09.19 10:09:42 PM czhower
Next stage of fiber support in servers.
}
unit IdSchedulerOfFiber;
interface
uses
Classes,
IdFiberWeaver, IdTask, IdFiber, IdScheduler, IdYarn;
type
TIdSchedulerOfFiber = class;
TIdYarnOfFiber = class(TIdYarn)
protected
FFiber: TIdFiberWithTask;
FScheduler: TIdScheduler;
public
constructor Create(
AScheduler: TIdScheduler;
AFiber: TIdFiberWithTask
); reintroduce; virtual;
destructor Destroy;
override;
//
property Fiber: TIdFiberWithTask read FFiber;
end;
TIdSchedulerOfFiber = class(TIdScheduler)
protected
FFiberWeaver: TIdFiberWeaver;
public
function AcquireYarn
: TIdYarn;
override;
procedure StartYarn(
AYarn: TIdYarn;
ATask: TIdTask
); override;
procedure TerminateYarn(
AYarn: TIdYarn
); override;
published
//TODO: Need to add notification for this prop
//TODO: Dont allow setting while active
property FiberWeaver: TIdFiberWeaver read FFiberWeaver write FFiberWeaver;
end;
implementation
uses
IdGlobal,
SysUtils;
{ TIdSchedulerOfFiber }
function TIdSchedulerOfFiber.AcquireYarn: TIdYarn;
var
LFiber: TIdFiberWithTask;
begin
LFiber := TIdFiberWithTask.Create(nil, nil, Format('%s User', [Name])); {do not localize}
Result := TIdYarnOfFiber.Create(Self, LFiber);
ActiveYarns.Add(Result);
end;
procedure TIdSchedulerOfFiber.StartYarn(
AYarn: TIdYarn;
ATask: TIdTask
);
begin
inherited;
TIdYarnOfFiber(AYarn).Fiber.Task := ATask;
// Last - Put it in the queue to be scheduled
Assert(FiberWeaver<>nil);
FiberWeaver.Add(TIdYarnOfFiber(AYarn).Fiber);
end;
procedure TIdSchedulerOfFiber.TerminateYarn(AYarn: TIdYarn);
begin
// Fibers dont "run", so we dont terminate them
FreeAndNil(AYarn);
end;
{ TIdYarnOfFiber }
constructor TIdYarnOfFiber.Create(
AScheduler: TIdScheduler;
AFiber: TIdFiberWithTask
);
begin
inherited Create;
FScheduler := AScheduler;
FFiber := AFiber;
AFiber.Yarn := Self;
end;
destructor TIdYarnOfFiber.Destroy;
begin
FScheduler.ReleaseYarn(Self);
inherited;
end;
end.

View File

@@ -0,0 +1,173 @@
{
$Project$
$Workfile$
$Revision$
$DateUTC$
$Id$
This file is part of the Indy (Internet Direct) project, and is offered
under the dual-licensing agreement described on the Indy website.
(http://www.indyproject.org/)
Copyright:
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
}
{
$Log$
}
{
Rev 1.2 6/11/2004 8:40:06 AM DSiders
Added "Do not Localize" comments.
Rev 1.1 2004.02.09 9:16:50 PM czhower
Updated to compile and match lib changes.
Rev 1.0 2004.02.03 12:39:00 AM czhower
Move
Rev 1.6 2003.10.19 4:38:34 PM czhower
Updates
Rev 1.5 2003.10.19 2:51:10 PM czhower
Fiber cleanup
Rev 1.4 2003.10.14 11:17:10 PM czhower
Updates to match core changes.
Rev 1.3 2003.10.11 5:43:56 PM czhower
Chained servers now functional.
Rev 1.2 2003.09.19 10:09:42 PM czhower
Next stage of fiber support in servers.
Rev 1.1 2003.09.18 5:54:32 PM czhower
TIdYarnFix
Rev 1.0 8/16/2003 11:09:02 AM JPMugaas
Moved from Indy Core dir as part of package reorg
Rev 1.6 7/6/2003 8:04:08 PM BGooijen
Renamed IdScheduler* to IdSchedulerOf*
Rev 1.5 4/11/2003 01:09:54 PM JPMugaas
Rev 1.4 3/29/2003 5:55:02 PM BGooijen
now calls AfterAccept
Rev 1.3 3/27/2003 12:51:30 PM BGooijen
changed for IdSchedulerFiberBase
Rev 1.2 3/25/2003 11:05:30 PM BGooijen
The ChainEngine is now a property
Rev 1.1 3/23/2003 11:30:26 PM BGooijen
Moved a lot of code to IdSchedulerFiber, added MakeClientIOHandler
Rev 1.0 3/13/2003 11:51:14 AM BGooijen
Initial check in
}
unit IdServerIOHandlerChain;
interface
uses
IdServerIOHandler, IdIOHandlerChain, IdYarn,
IdSocketHandle, IdThread, IdIOHandler, IdScheduler, IdFiber,
Classes;
type
TIdServerIOHandlerChain = class(TIdServerIOHandler)
protected
FChainEngine: TIdChainEngine;
public
function Accept(
ASocket: TIdSocketHandle;
AListenerThread: TIdThread;
AYarn: TIdYarn
): TIdIOHandler;
override;
function MakeClientIOHandler(
AYarn: TIdYarn
): TIdIOHandler;
override;
procedure SetScheduler(
AScheduler: TIdScheduler
); override;
published
//TODO: Need to add notification for this prop
property ChainEngine: TIdChainEngine read FChainEngine write FChainEngine;
end;
implementation
uses
IdGlobal, IdSchedulerOfFiber, IdException, IdFiberWeaver,
SysUtils;
procedure TIdServerIOHandlerChain.SetScheduler(
AScheduler: TIdScheduler
);
begin
if AScheduler <> nil then begin
EIdException.IfFalse(AScheduler is TIdSchedulerOfFiber
, 'Scheduler not a fiber scheduler'); {do not localize}
end;
FScheduler := AScheduler;
end;
function TIdServerIOHandlerChain.Accept(
ASocket: TIdSocketHandle;
AListenerThread: TIdThread;
AYarn: TIdYarn
): TIdIOHandler;
var
LIOHandler: TIdIOHandlerChain;
begin
EIdException.IfNotAssigned(FChainEngine, 'No ChainEngine defined.'); {do not localize}
LIOHandler := TIdIOHandlerChain.Create(nil, FChainEngine
//TODO: Can remove this cast later
, TIdFiberWeaver(TIdSchedulerOfFiber(FScheduler).FiberWeaver)
, TIdYarnOfFiber(AYarn).Fiber);
LIOHandler.Open;
Result := nil;
if AListenerThread <> nil then begin
while not AListenerThread.Stopped do try
if ASocket.Select(100) then begin // Wait for 100 ms
if LIOHandler.Binding.Accept(ASocket.Handle) then begin
LIOHandler.AfterAccept;
Result := LIOHandler;
Exit;
end else begin
FreeAndNil(LIOHandler);
Exit;
end;
end;
finally
if AListenerThread.Stopped then begin
FreeAndNil(LIOHandler);
end;
end;
end else begin
// Old way for compatibility
if LIOHandler.Binding.Accept(ASocket.Handle) then begin
Result := LIOHandler;
Exit;
end else begin
FreeAndNil(LIOHandler);
end;
end;
end;
function TIdServerIOHandlerChain.MakeClientIOHandler(
AYarn: TIdYarn
): TIdIOHandler;
begin
Result := TIdIOHandlerChain.Create(nil, FChainEngine
//TODO: CAn remove this cast later.
, TIdFiberWeaver(TIdSchedulerOfFiber(FScheduler).FiberWeaver)
, TIdYarnOfFiber(AYarn).Fiber);
end;
end.

Binary file not shown.

View File

@@ -0,0 +1,96 @@
{
$Project$
$Workfile$
$Revision$
$DateUTC$
$Id$
This file is part of the Indy (Internet Direct) project, and is offered
under the dual-licensing agreement described on the Indy website.
(http://www.indyproject.org/)
Copyright:
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
}
{
$Log$
}
{
Rev 1.1 9/15/2004 5:04:20 PM DSiders
Added localization comments.
Rev 1.0 2004.02.03 12:39:04 AM czhower
Move
Rev 1.5 2003.10.19 4:38:36 PM czhower
Updates
Rev 1.4 2003.10.19 2:50:40 PM czhower
Fiber cleanup
Rev 1.3 2003.10.19 1:04:28 PM czhower
Updates
Rev 1.2 2003.08.20 1:45:14 PM czhower
Fixes.
Rev 1.1 8/19/2003 12:16:32 PM JPMugaas
Should now compile in new packages.
Rev 1.0 8/16/2003 11:03:52 AM JPMugaas
Moved from Core as part of a package reorganization
}
unit IdSuperCoreRegister;
interface
uses
Classes;
{
Note: We separate this from IdCoreRegister because in Delphi 7,
these will be in a separate package. This is particularly important as
some of this is in a different stage of development than most of Indy 10.
}
procedure Register;
implementation
uses
IdDsnCoreResourceStrings,
IdFiberWeaverInline,
IdIOHandlerChain,
IdServerIOHandlerChain,
IdFiberWeaverThreaded,
IdSchedulerOfFiber;
{$I ..\Core\IdCompilerDefines.inc}
{$IFDEF DOTNET}
{$R IconsDotNet\TIdChainEngine.bmp}
{$R IconsDotNet\TIdFiberWeaverInline.bmp}
{$R IconsDotNet\TIdFiberWeaverThreaded.bmp}
{$R IconsDotNet\TIdSchedulerOfFiber.bmp}
{$R IconsDotNet\TIdServerIOHandlerChain.bmp}
{$ELSE}
{$IFDEF Borland}
{$R IdSuperCoreRegister.dcr}
{$ELSE}
{$R IdSuperCoreRegisterCool.dcr}
{$ENDIF}
{$ENDIF}
procedure Register;
begin
RegisterComponents('Indy Super Core', {do not localize}
[ TIdChainEngine,
TIdFiberWeaverInline,
TIdFiberWeaverThreaded,
TIdSchedulerOfFiber,
TIdServerIOHandlerChain
]);
end;
end.

Binary file not shown.

View File

@@ -0,0 +1,334 @@
{
$Project$
$Workfile$
$Revision$
$DateUTC$
$Id$
This file is part of the Indy (Internet Direct) project, and is offered
under the dual-licensing agreement described on the Indy website.
(http://www.indyproject.org/)
Copyright:
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
}
{
$Log$
}
{
Rev 1.2 6/11/2004 8:40:10 AM DSiders
Added "Do not Localize" comments.
Rev 1.1 2004.02.09 9:16:54 PM czhower
Updated to compile and match lib changes.
Rev 1.0 2004.02.03 12:39:08 AM czhower
Move
Rev 1.17 2003.10.19 2:50:42 PM czhower
Fiber cleanup
Rev 1.16 2003.10.11 5:44:02 PM czhower
Chained servers now functional.
Rev 1.15 2003.07.17 4:42:06 PM czhower
More IOCP improvements.
Rev 1.14 2003.07.17 3:55:18 PM czhower
Removed IdIOChainEngineIOCP and merged it into TIdChaingEngine in
IdIOHandlerChain.pas.
Rev 1.10 2003.07.14 12:54:32 AM czhower
Fixed graceful close detection if it occurs after connect.
Rev 1.9 2003.07.10 7:40:24 PM czhower
Comments
Rev 1.8 7/5/2003 11:47:12 PM BGooijen
Added TIdWorkOpUnitCheckForDisconnect and TIdWorkOpUnitWriteFile
Rev 1.7 4/23/2003 8:22:20 PM BGooijen
Rev 1.6 2003.04.22 9:48:50 PM czhower
Rev 1.5 2003.04.20 9:12:20 PM czhower
Rev 1.5 2003.04.19 3:14:14 PM czhower
Rev 1.4 2003.04.17 7:45:02 PM czhower
Rev 1.2 3/27/2003 2:43:04 PM BGooijen
Added woWriteStream and woWriteBuffer
Rev 1.1 3/2/2003 12:36:24 AM BGooijen
Added woReadBuffer and TIdWorkOpUnitReadBuffer to read a buffer. Now
ReadBuffer doesn't use ReadStream any more.
TIdIOHandlerChain.ReadLn now supports MaxLineLength (splitting, and
exceptions).
woReadLn doesn't check the intire buffer any more, but continued where it
stopped the last time.
Added basic support for timeouts (probably only on read operations, and maybe
connect), accuratie of timeout is currently 500msec.
Rev 1.0 2/25/2003 10:45:46 PM BGooijen
Opcode files, some of these were in IdIOHandlerChain.pas
}
unit IdWorkOpUnit;
interface
uses
IdFiber, IdIOHandlerSocket, IdStackConsts, IdWinsock2, IdGlobal,
SysUtils, Windows;
type
TIdWorkOpUnit = class;
TOnWorkOpUnitCompleted = procedure(ASender: TIdWorkOpUnit) of object;
TIdOverLapped = packed record
// Reqquired parts of structure
Internal: DWORD;
InternalHigh: DWORD;
Offset: DWORD;
OffsetHigh: DWORD;
HEvent: THandle;
// Indy parts
WorkOpUnit: TIdWorkOpUnit;
Buffer: PWSABUF; // Indy part too, we reference it and pass it to IOCP
end;
PIdOverlapped = ^TIdOverlapped;
TIdWorkOpUnit = class(TObject)
protected
FCompleted: Boolean;
FException: Exception;
FFiber: TIdFiber;
FIOHandler: TIdIOHandlerSocket;
FOnCompleted: TOnWorkOpUnitCompleted;
FSocketHandle:TIdStackSocketHandle;
FTimeOutAt: Integer;
FTimedOut: Boolean;
//
procedure DoCompleted;
virtual;
function GetOverlapped(
ABuffer: Pointer;
ABufferSize: Integer
): PIdOverlapped;
procedure Starting; virtual; abstract;
public
procedure Complete; virtual;
destructor Destroy; override;
procedure MarkComplete; virtual;
// Process is called by the chain engine when data has been processed
procedure Process(
AOverlapped: PIdOverlapped;
AByteCount: Integer
); virtual; abstract;
procedure RaiseException;
procedure Start;
//
property Completed: Boolean read FCompleted;
property Fiber: TIdFiber read FFiber write FFiber;
property IOHandler: TIdIOHandlerSocket read FIOHandler write FIOHandler;
property OnCompleted: TOnWorkOpUnitCompleted read FOnCompleted
write FOnCompleted;
property SocketHandle:TIdStackSocketHandle read FSocketHandle
write FSocketHandle;
property TimeOutAt:integer read FTimeOutAt write FTimeOutAt;
property TimedOut:boolean read FTimedOut write FTimedOut;
end;
TIdWorkOpUnitRead = class(TIdWorkOpUnit)
protected
// Used when a dynamic buffer is needed
// Since its reference managed, memory is auto cleaned up
FBytes: TIdBytes;
//
procedure Processing(
ABuffer: TIdBytes
); virtual; abstract;
procedure Starting;
override;
public
procedure Process(
AOverlapped: PIdOverlapped;
AByteCount: Integer
); override;
procedure Read;
end;
TIdWorkOpUnitWrite = class(TIdWorkOpUnit)
protected
procedure Processing(
ABytes: Integer
); virtual; abstract;
procedure Write(
ABuffer: Pointer;
ASize: Integer
);
public
procedure Process(
AOverlapped: PIdOverlapped;
AByteCount: Integer
); override;
end;
const
WOPageSize = 8192;
implementation
uses
IdException, IdIOHandlerChain, IdStack, IdStackWindows;
{ TIdWorkOpUnit }
procedure TIdWorkOpUnit.Complete;
begin
DoCompleted;
end;
destructor TIdWorkOpUnit.Destroy;
begin
FreeAndNil(FException);
inherited;
end;
procedure TIdWorkOpUnit.DoCompleted;
begin
if Assigned(OnCompleted) then begin
OnCompleted(Self);
end;
end;
procedure TIdWorkOpUnit.MarkComplete;
begin
FCompleted := True;
end;
procedure TIdWorkOpUnit.RaiseException;
var
LException: Exception;
begin
if FException <> nil then begin
LException := FException;
// We need to set this to nil so it wont be freed. Delphi will free it
// as part of its exception handling mechanism
FException := nil;
raise LException;
end;
end;
function TIdWorkOpUnit.GetOverlapped(
ABuffer: Pointer;
ABufferSize: Integer
): PIdOverlapped;
begin
Result := TIdIOHandlerChain(IOHandler).Overlapped;
with Result^ do begin
Internal := 0;
InternalHigh := 0;
Offset := 0;
OffsetHigh := 0;
HEvent := 0;
WorkOpUnit := Self;
Buffer.Buf := ABuffer;
Buffer.Len := ABufferSize;
end;
end;
procedure TIdWorkOpUnit.Start;
begin
Starting;
// This can get called after its already been marked complete. This is
// ok and the fiber scheduler handles such a situation.
Fiber.Relinquish;
end;
{ TIdWorkOpUnitWrite }
procedure TIdWorkOpUnitWrite.Process(
AOverlapped: PIdOverlapped;
AByteCount: Integer
);
begin
Processing(AByteCount);
end;
procedure TIdWorkOpUnitWrite.Write(ABuffer: Pointer;
ASize: Integer);
var
LFlags: DWORD;
LOverlapped: PIdOverlapped;
LLastError: Integer;
LVoid: DWORD;
begin
LFlags := 0;
LOverlapped := GetOverlapped(ABuffer, ASize);
case WSASend(SocketHandle, LOverlapped.Buffer, 1, LVoid, LFlags, LOverlapped
, nil) of
0: ; // Do nothing
SOCKET_ERROR: begin
LLastError := GWindowsStack.WSGetLastError;
if LLastError <> WSA_IO_PENDING then begin
GWindowsStack.RaiseSocketError(LLastError);
end;
end;
else Assert(False, 'Unknown result code received from WSARecv'); {do not localize}
end;
end;
{ TIdWorkOpUnitRead }
procedure TIdWorkOpUnitRead.Process(
AOverlapped: PIdOverlapped;
AByteCount: Integer
);
begin
SetLength(FBytes, AByteCount);
Processing(FBytes);
end;
procedure TIdWorkOpUnitRead.Read;
var
LBytesReceived: DWORD;
LFlags: DWORD;
LOverlapped: PIdOverlapped;
LLastError: Integer;
begin
LFlags := 0;
// Initialize byte array and pass it to overlapped
SetLength(FBytes, WOPageSize);
LOverlapped := GetOverlapped(@FBytes[0], Length(FBytes));
//TODO: What is this 997? Need to check for it? If changed, do in Write too
// GStack.CheckForSocketError( // can raise a 997
case WSARecv(SocketHandle, LOverlapped.Buffer, 1, LBytesReceived, LFlags
, LOverlapped, nil) of
// , [997] );
// Kudzu
// In this case it completed immediately. The MS docs are not clear, but
// testing shows that it still causes the completion port.
0: ; // Do nothing
SOCKET_ERROR: begin
LLastError := GWindowsStack.WSGetLastError;
// If its WSA_IO_PENDING this is normal and its been queued
if LLastError <> WSA_IO_PENDING then begin
GWindowsStack.RaiseSocketError(LLastError);
end;
end;
else Assert(False, 'Unknown result code received from WSARecv'); {do not localize}
end;
end;
procedure TIdWorkOpUnitRead.Starting;
begin
Read;
end;
end.

View File

@@ -0,0 +1,431 @@
{
$Project$
$Workfile$
$Revision$
$DateUTC$
$Id$
This file is part of the Indy (Internet Direct) project, and is offered
under the dual-licensing agreement described on the Indy website.
(http://www.indyproject.org/)
Copyright:
(c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
}
{
$Log$
}
{
Rev 1.4 6/11/2004 8:40:12 AM DSiders
Added "Do not Localize" comments.
Rev 1.3 2004.05.06 1:47:28 PM czhower
Now uses IndexOf
Rev 1.2 2004.04.22 11:45:18 PM czhower
Bug fixes
Rev 1.1 2004.02.09 9:16:58 PM czhower
Updated to compile and match lib changes.
Rev 1.0 2004.02.03 12:39:10 AM czhower
Move
Rev 1.14 2003.10.19 2:50:42 PM czhower
Fiber cleanup
Rev 1.13 2003.10.11 5:44:20 PM czhower
Chained servers now functional.
Rev 1.12 2003.07.17 4:42:08 PM czhower
More IOCP improvements.
Rev 1.11 2003.07.17 3:55:18 PM czhower
Removed IdIOChainEngineIOCP and merged it into TIdChaingEngine in
IdIOHandlerChain.pas.
Rev 1.7 2003.07.14 11:00:52 PM czhower
More IOCP fixes.
Rev 1.6 2003.07.14 12:54:34 AM czhower
Fixed graceful close detection if it occurs after connect.
Rev 1.5 7/7/2003 1:25:26 PM BGooijen
Added BytesSent property to TIdWorkOpUnitWriteFile
Rev 1.4 7/5/2003 11:47:14 PM BGooijen
Added TIdWorkOpUnitCheckForDisconnect and TIdWorkOpUnitWriteFile
Rev 1.3 3/27/2003 2:43:06 PM BGooijen
Added woWriteStream and woWriteBuffer
Rev 1.2 3/22/2003 09:45:30 PM JPMugaas
Now should compile under D4.
Rev 1.1 3/2/2003 12:36:26 AM BGooijen
Added woReadBuffer and TIdWorkOpUnitReadBuffer to read a buffer. Now
ReadBuffer doesn't use ReadStream any more.
TIdIOHandlerChain.ReadLn now supports MaxLineLength (splitting, and
exceptions).
woReadLn doesn't check the intire buffer any more, but continued where it
stopped the last time.
Added basic support for timeouts (probably only on read operations, and maybe
connect), accuratie of timeout is currently 500msec.
Rev 1.0 2/27/2003 10:11:50 PM BGooijen
WorkOpUnits combined in one file
}
unit IdWorkOpUnits;
interface
uses
Classes,
IdWorkOpUnit, IdGlobal,
SysUtils;
type
TIdWorkOpUnitStreamBaseRead = class(TIdWorkOpUnitRead)
protected
FStream: TStream;
public
constructor Create(AStream: TStream); reintroduce; virtual;
end;
TIdWorkOpUnitStreamBaseWrite = class(TIdWorkOpUnitWrite)
protected
FFreeStream: Boolean;
FStream: TStream;
public
constructor Create(
AStream: TStream;
AFreeStream: Boolean = True
); reintroduce; virtual;
destructor Destroy; override;
end;
TIdWorkOpUnitWriteBuffer = class(TIdWorkOpUnitWrite)
protected
FBuffer: Pointer;
FFreeBuffer: Boolean;
FSize: Integer;
//
procedure Processing(ABytes: Integer); override;
procedure Starting; override;
public
constructor Create(ABuffer: Pointer; ASize: Integer;
AFreeBuffer: Boolean = True); reintroduce; virtual;
destructor Destroy; override;
end;
TIdWorkOpUnitWriteFile = class(TIdWorkOpUnitWrite)
protected
FFilename: String;
FBytesSent: Integer;
//
procedure Processing(ABytes: Integer); override;
procedure Starting; override;
public
constructor Create(AFileName: string); reintroduce;
end;
TIdWorkOpUnitWriteStream = class(TIdWorkOpUnitStreamBaseWrite)
protected
FCount: Integer;
FStartPos: Integer;
//
procedure Processing(ABytes: Integer); override;
procedure Starting; override;
public
constructor Create(AStream: TStream; AStartPos, ACount: Integer;
AFreeStream: Boolean); reintroduce; virtual;
end;
TIdWorkOpUnitWaitConnected = class(TIdWorkOpUnit)
protected
procedure Starting; override;
public
procedure Process(
AOverlapped: PIdOverlapped;
AByteCount: Integer
); override;
end;
TIdWorkOpUnitReadSized = class(TIdWorkOpUnitRead)
protected
FSize: Integer;
//
procedure Processing(
ABuffer: TIdBytes
); override;
public
constructor Create(ASize: Integer); reintroduce;
end;
TIdWorkOpUnitReadSizedStream = class(TIdWorkOpUnitStreamBaseRead)
protected
FSize: Integer;
//
procedure Processing(
ABuffer: TIdBytes
); override;
public
constructor Create(AStream: TStream; ASize: Integer);
reintroduce;
end;
TIdWorkOpUnitReadLn = class(TIdWorkOpUnitRead)
protected
FLastPos: Integer;
FMaxLength: Integer;
FTerminator: string;
//
procedure Processing(
ABuffer: TIdBytes
); override;
public
constructor Create(
ATerminator: string;
AMaxLength: Integer
); reintroduce;
end;
TIdWorkOpUnitReadUntilDisconnect = class(TIdWorkOpUnitStreamBaseRead)
protected
procedure Processing(
ABuffer: TIdBytes
); override;
end;
TIdWorkOpUnitReadAvailable = class(TIdWorkOpUnitRead)
protected
procedure Processing(
ABuffer: TIdBytes
); override;
end;
implementation
{ TIdWorkOpUnitWriteStream }
constructor TIdWorkOpUnitWriteStream.Create(AStream: TStream; AStartPos,ACount:integer; AFreeStream: Boolean);
begin
inherited Create(AStream, AFreeStream);
FStream.Position := AStartPos;
FCount := ACount;
end;
procedure TIdWorkOpUnitWriteStream.Processing(ABytes: Integer);
//TODO: This used to use pages from IdBuffer, which because of .Net do not exist
// anymore. We need to maybe keep a local persistent buffer instead then for
// storage reasons.
var
LBuffer: TIdBytes;
LSize: Integer;
begin
FCount := FCount - ABytes;
if FCount = 0 then begin
Complete;
end else begin
FStream.Position := ABytes;
//
//TODO: Dont hard code this value. Also find an optimal size for IOCP
LSize := Min(FCount, WOPageSize);
SetLength(LBuffer, LSize);
//
FStream.ReadBuffer(LBuffer[0], LSize);
Write(@LBuffer[0], LSize);
end;
end;
procedure TIdWorkOpUnitWriteStream.Starting;
begin
Processing(0);
end;
{ TIdWorkOpUnitWriteBuffer }
constructor TIdWorkOpUnitWriteBuffer.Create(ABuffer: pointer; ASize: integer; AFreeBuffer: Boolean = True);
begin
inherited Create;
FSize := ASize;
FBuffer := ABuffer;
FFreeBuffer := AFreeBuffer;
end;
destructor TIdWorkOpUnitWriteBuffer.Destroy;
begin
if FFreeBuffer then begin
FreeMem(FBuffer);
FBuffer := nil;
end;
inherited;
end;
procedure TIdWorkOpUnitWriteBuffer.Processing(ABytes: Integer);
begin
//TODO: Change the pointer to a type that points to bytes
FBuffer := Pointer(Cardinal(FBuffer) + Cardinal(ABytes));
FSize := FSize - ABytes;
if FSize = 0 then begin
Complete;
end else begin
//TODO: Reduce this down so it never sends more than a page
Write(FBuffer, Min(FSize, WOPageSize));
end;
end;
procedure TIdWorkOpUnitWriteBuffer.Starting;
begin
Processing(0);
end;
{ TIdWorkOpUnitWriteFile }
constructor TIdWorkOpUnitWriteFile.Create(AFileName:string);
begin
inherited Create;
FFilename := AFileName;
end;
procedure TIdWorkOpUnitWriteFile.Processing(ABytes: Integer);
begin
Assert(False, 'Need to implement WriteFile, also add to a bubble'); {do not localize}
end;
procedure TIdWorkOpUnitWriteFile.Starting;
begin
end;
{ TIdWorkOpUnitSizedStream }
constructor TIdWorkOpUnitReadSizedStream.Create(AStream: TStream; ASize:integer);
begin
inherited Create(AStream);
FSize := ASize;
end;
procedure TIdWorkOpUnitWaitConnected.Process(
AOverlapped: PIdOverlapped;
AByteCount: Integer
);
begin
end;
procedure TIdWorkOpUnitWaitConnected.Starting;
begin
end;
{ TIdWorkOpUnitReadLn }
constructor TIdWorkOpUnitReadLn.Create(
ATerminator: string;
AMaxLength: Integer);
begin
inherited Create;
FLastPos := 1;
FTerminator := ATerminator;
FMaxLength := AMaxLength;
end;
procedure TIdWorkOpUnitReadLn.Processing(
ABuffer: TIdBytes
);
begin
//TODO: ReadLn is very common. Need to optimize this class and maybe
// even pass pack the result directly so we dont search twice.
//Also allow for hinting from the user.
IOHandler.InputBuffer.Write(ABuffer);
if not IOHandler.Connected then begin
Complete;
end else if IOHandler.InputBuffer.IndexOf(FTerminator, FLastPos) = -1 then begin
Read;
end else begin
Complete;
end;
end;
procedure TIdWorkOpUnitReadUntilDisconnect.Processing(
ABuffer: TIdBytes
);
begin
// 0 is disconnected, so keep requesting til 0
if Length(ABuffer) = 0 then begin
Complete;
end else begin
FStream.WriteBuffer(ABuffer[0], Length(ABuffer));
Read;
end;
end;
{ TIdWorkOpUnitReadAvailable }
procedure TIdWorkOpUnitReadAvailable.Processing(
ABuffer: TIdBytes
);
begin
Complete;
end;
{ TIdWorkOpUnitReadSized }
constructor TIdWorkOpUnitReadSized.Create(ASize: Integer);
begin
inherited Create;
FSize := ASize;
end;
procedure TIdWorkOpUnitReadSized.Processing(
ABuffer: TIdBytes
);
begin
IOHandler.InputBuffer.Write(ABuffer);
FSize := FSize - Length(ABuffer);
if FSize = 0 then begin
Complete;
end else begin
Read;
end;
end;
{ TIdWorkOpUnitStreamBaseRead }
constructor TIdWorkOpUnitStreamBaseRead.Create(AStream: TStream);
begin
inherited Create;
FStream := AStream;
end;
{ TIdWorkOpUnitStreamBaseWrite }
constructor TIdWorkOpUnitStreamBaseWrite.Create(AStream: TStream;
AFreeStream: Boolean);
begin
inherited Create;
FStream := AStream;
FFreeStream := AFreeStream;
end;
destructor TIdWorkOpUnitStreamBaseWrite.Destroy;
begin
if FFreeStream then begin
FreeAndNil(FStream);
end;
inherited;
end;
procedure TIdWorkOpUnitReadSizedStream.Processing(
ABuffer: TIdBytes
);
begin
FStream.WriteBuffer(ABuffer[0], Length(ABuffer));
FSize := FSize - Length(ABuffer);
if FSize = 0 then begin
Complete;
end else begin
Read;
end;
end;
end.

View File

@@ -0,0 +1,39 @@
package IndySuperCore70;
{$R *.res}
{$BOOLEVAL OFF}
{$EXTENDEDSYNTAX ON}
{$IMPORTEDDATA ON}
{$LOCALSYMBOLS ON}
{$LONGSTRINGS ON}
{$OPENSTRINGS ON}
{$OPTIMIZATION ON}
{$REFERENCEINFO ON}
{$SAFEDIVIDE OFF}
{$STACKFRAMES OFF}
{$TYPEDADDRESS OFF}
{$VARSTRINGCHECKS ON}
{$WRITEABLECONST OFF}
{$MINENUMSIZE 1}
{$IMAGEBASE $400000}
{$DESCRIPTION 'Indy 10 SuperCore'}
{$RUNONLY}
{$IMPLICITBUILD ON}
requires
rtl,
IndySystem70,
IndyCore70;
contains
IdFiber in 'IdFiber.pas',
IdFiberWeaver in 'IdFiberWeaver.pas',
IdFiberWeaverInline in 'IdFiberWeaverInline.pas',
IdFiberWeaverThreaded in 'IdFiberWeaverThreaded.pas',
IdIOHandlerChain in 'IdIOHandlerChain.pas',
IdSchedulerOfFiber in 'IdSchedulerOfFiber.pas',
IdServerIOHandlerChain in 'IdServerIOHandlerChain.pas',
IdWorkOpUnit in 'IdWorkOpUnit.pas',
IdWorkOpUnits in 'IdWorkOpUnits.pas';
end.

Binary file not shown.

View File

@@ -0,0 +1,39 @@
package dclIndySuperCore70;
{$R *.res}
{$ALIGN 8}
{$ASSERTIONS ON}
{$BOOLEVAL OFF}
{$DEBUGINFO ON}
{$EXTENDEDSYNTAX ON}
{$IMPORTEDDATA ON}
{$IOCHECKS ON}
{$LOCALSYMBOLS ON}
{$LONGSTRINGS ON}
{$OPENSTRINGS ON}
{$OPTIMIZATION ON}
{$OVERFLOWCHECKS OFF}
{$RANGECHECKS OFF}
{$REFERENCEINFO ON}
{$SAFEDIVIDE OFF}
{$STACKFRAMES OFF}
{$TYPEDADDRESS OFF}
{$VARSTRINGCHECKS ON}
{$WRITEABLECONST ON}
{$MINENUMSIZE 1}
{$IMAGEBASE $600000}
{$DESCRIPTION 'Internet Direct (Indy) 10.00.0.17-B - Super Core'}
{$DESIGNONLY}
{$IMPLICITBUILD OFF}
requires
vcl,
designide,
dclIndyCore70,
IndyCore70,
IndySuperCore70;
contains
IdSuperCoreRegister in 'IdSuperCoreRegister.pas';
end.

Binary file not shown.