summaryrefslogtreecommitdiff
path: root/packages/fcl-process/src/simpleipc.pp
diff options
context:
space:
mode:
Diffstat (limited to 'packages/fcl-process/src/simpleipc.pp')
-rw-r--r--packages/fcl-process/src/simpleipc.pp386
1 files changed, 359 insertions, 27 deletions
diff --git a/packages/fcl-process/src/simpleipc.pp b/packages/fcl-process/src/simpleipc.pp
index 5892762ad8..fd4bd8d4b8 100644
--- a/packages/fcl-process/src/simpleipc.pp
+++ b/packages/fcl-process/src/simpleipc.pp
@@ -20,11 +20,12 @@ unit simpleipc;
interface
uses
- Classes, SysUtils;
+ Contnrs, Classes, SysUtils;
Const
MsgVersion = 1;
-
+ DefaultThreadTimeOut = 50;
+
//Message types
mtUnknown = 0;
mtString = 1;
@@ -33,7 +34,6 @@ type
TIPCMessageOverflowAction = (ipcmoaNone, ipcmoaDiscardOld, ipcmoaDiscardNew, ipcmoaError);
var
- // Currently implemented only for Windows platform!
DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = ipcmoaNone;
DefaultIPCMessageQueueLimit: Integer = 0;
@@ -49,6 +49,36 @@ Type
TSimpleIPCServer = class;
TSimpleIPCClient = class;
+ TIPCServerMsg = class
+ strict private
+ FStream: TStream;
+ FMsgType: TMessageType;
+ public
+ constructor Create;
+ destructor Destroy; override;
+ property Stream: TStream read FStream;
+ property MsgType: TMessageType read FMsgType write FMsgType;
+ end;
+
+ TIPCServerMsgQueue = class
+ strict private
+ FList: TFPObjectList;
+ FMaxCount: Integer;
+ FMaxAction: TIPCMessageOverflowAction;
+ function GetCount: Integer;
+ procedure DeleteAndFree(Index: Integer);
+ function PrepareToPush: Boolean;
+ public
+ constructor Create;
+ destructor Destroy; override;
+ procedure Clear;
+ procedure Push(AItem: TIPCServerMsg);
+ function Pop: TIPCServerMsg;
+ property Count: Integer read GetCount;
+ property MaxCount: Integer read FMaxCount write FMaxCount;
+ property MaxAction: TIPCMessageOverflowAction read FMaxAction write FMaxAction;
+ end;
+
{ TIPCServerComm }
TIPCServerComm = Class(TObject)
@@ -57,14 +87,16 @@ Type
Protected
Function GetInstanceID : String; virtual; abstract;
Procedure DoError(const Msg : String; const Args : Array of const);
- Procedure SetMsgType(AMsgType: TMessageType);
- Function MsgData : TStream;
+ Procedure PushMessage(Const Hdr : TMsgHeader; AStream : TStream);
+ Procedure PushMessage(Msg : TIPCServerMsg);
Public
Constructor Create(AOwner : TSimpleIPCServer); virtual;
Property Owner : TSimpleIPCServer read FOwner;
Procedure StartServer; virtual; Abstract;
Procedure StopServer;virtual; Abstract;
+ // May push messages on the queue
Function PeekMessage(TimeOut : Integer) : Boolean;virtual; Abstract;
+ // Must put message on the queue.
Procedure ReadMessage ;virtual; Abstract;
Property InstanceID : String read GetInstanceID;
end;
@@ -93,24 +125,46 @@ Type
{ TSimpleIPCServer }
+ TMessageQueueEvent = Procedure(Sender : TObject; Msg : TIPCServerMsg) of object;
+
TSimpleIPCServer = Class(TSimpleIPC)
- private
+ protected
+ Private
+ FOnMessageError: TMessageQueueEvent;
+ FOnMessageQueued: TNotifyEvent;
+ FQueue : TIPCServerMsgQueue;
FGlobal: Boolean;
FOnMessage: TNotifyEvent;
FMsgType: TMessageType;
FMsgData : TStream;
+ FThreadTimeOut: Integer;
+ FThread : TThread;
+ FLock : TRTLCriticalSection;
+ FErrMsg : TIPCServerMsg;
+ procedure DoMessageQueued;
+ procedure DoMessageError;
function GetInstanceID: String;
+ function GetMaxAction: TIPCMessageOverflowAction;
+ function GetMaxQueue: Integer;
function GetStringMessage: String;
procedure SetGlobal(const AValue: Boolean);
+ procedure SetMaxAction(AValue: TIPCMessageOverflowAction);
+ procedure SetMaxQueue(AValue: Integer);
Protected
FIPCComm: TIPCServerComm;
+ procedure StartThread; virtual;
+ procedure StopThread; virtual;
Function CommClass : TIPCServerCommClass; virtual;
+ Procedure PushMessage(Msg : TIPCServerMsg); virtual;
+ function PopMessage: Boolean; virtual;
Procedure Activate; override;
Procedure Deactivate; override;
+ Property Queue : TIPCServerMsgQueue Read FQueue;
+ Property Thread : TThread Read FThread;
Public
Constructor Create(AOwner : TComponent); override;
Destructor Destroy; override;
- Procedure StartServer;
+ Procedure StartServer(Threaded : Boolean = False);
Procedure StopServer;
Function PeekMessage(TimeOut : Integer; DoReadMessage : Boolean): Boolean;
Procedure ReadMessage;
@@ -120,8 +174,18 @@ Type
Property MsgData : TStream Read FMsgData;
Property InstanceID : String Read GetInstanceID;
Published
+ Property ThreadTimeOut : Integer Read FThreadTimeOut Write FThreadTimeOut;
Property Global : Boolean Read FGlobal Write SetGlobal;
+ // Called during ReadMessage
Property OnMessage : TNotifyEvent Read FOnMessage Write FOnMessage;
+ // Called when a message is pushed on the queue.
+ Property OnMessageQueued : TNotifyEvent Read FOnMessageQueued Write FOnMessageQueued;
+ // Called when the queue overflows and MaxAction = ipcmoaError.
+ Property OnMessageError : TMessageQueueEvent Read FOnMessageError Write FOnMessageError;
+ // Maximum number of messages to keep in the queue
+ property MaxQueue: Integer read GetMaxQueue write SetMaxQueue;
+ // What to do when the queue overflows
+ property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction;
end;
@@ -194,6 +258,103 @@ implementation
{$i simpleipc.inc}
+Resourcestring
+ SErrMessageQueueOverflow = 'Message queue overflow (limit %s)';
+
+{ ---------------------------------------------------------------------
+ TIPCServerMsg
+ ---------------------------------------------------------------------}
+
+
+constructor TIPCServerMsg.Create;
+begin
+ FMsgType := 0;
+ FStream := TMemoryStream.Create;
+end;
+
+destructor TIPCServerMsg.Destroy;
+begin
+ FStream.Free;
+end;
+
+{ ---------------------------------------------------------------------
+ TIPCServerMsgQueue
+ ---------------------------------------------------------------------}
+
+constructor TIPCServerMsgQueue.Create;
+begin
+ FMaxCount := DefaultIPCMessageQueueLimit;
+ FMaxAction := DefaultIPCMessageOverflowAction;
+ FList := TFPObjectList.Create(False); // FreeObjects = False!
+end;
+
+destructor TIPCServerMsgQueue.Destroy;
+begin
+ Clear;
+ FList.Free;
+end;
+
+procedure TIPCServerMsgQueue.Clear;
+begin
+ while FList.Count > 0 do
+ DeleteAndFree(FList.Count - 1);
+end;
+
+procedure TIPCServerMsgQueue.DeleteAndFree(Index: Integer);
+begin
+ FList[Index].Free; // Free objects manually!
+ FList.Delete(Index);
+end;
+
+function TIPCServerMsgQueue.GetCount: Integer;
+begin
+ Result := FList.Count;
+end;
+
+function TIPCServerMsgQueue.PrepareToPush: Boolean;
+begin
+ Result := True;
+ case FMaxAction of
+ ipcmoaDiscardOld:
+ begin
+ while (FList.Count >= FMaxCount) do
+ DeleteAndFree(FList.Count - 1);
+ end;
+ ipcmoaDiscardNew:
+ begin
+ Result := (FList.Count < FMaxCount);
+ end;
+ ipcmoaError:
+ begin
+ if (FList.Count >= FMaxCount) then
+ // Caller is expected to catch this exception, so not using Owner.DoError()
+ raise EIPCError.CreateFmt(SErrMessageQueueOverflow, [IntToStr(FMaxCount)]);
+ end;
+ end;
+end;
+
+procedure TIPCServerMsgQueue.Push(AItem: TIPCServerMsg);
+begin
+ if PrepareToPush then
+ FList.Insert(0, AItem);
+end;
+
+function TIPCServerMsgQueue.Pop: TIPCServerMsg;
+var
+ Index: Integer;
+begin
+ Index := FList.Count - 1;
+ if Index >= 0 then
+ begin
+ // Caller is responsible for freeing the object.
+ Result := TIPCServerMsg(FList[Index]);
+ FList.Delete(Index);
+ end
+ else
+ Result := nil;
+end;
+
+
{ ---------------------------------------------------------------------
TIPCServerComm
---------------------------------------------------------------------}
@@ -203,22 +364,33 @@ begin
FOwner:=AOWner;
end;
-Procedure TIPCServerComm.DoError(const Msg : String; const Args : Array of const);
+procedure TIPCServerComm.DoError(const Msg: String; const Args: array of const);
begin
FOwner.DoError(Msg,Args);
-end;
+end;
-Function TIPCServerComm.MsgData : TStream;
+procedure TIPCServerComm.PushMessage(const Hdr: TMsgHeader; AStream: TStream);
+
+Var
+ M : TIPCServerMsg;
begin
- Result:=FOwner.FMsgData;
+ M:=TIPCServerMsg.Create;
+ try
+ M.MsgType:=Hdr.MsgType;
+ if Hdr.MsgLen>0 then
+ M.Stream.CopyFrom(AStream,Hdr.MsgLen);
+ except
+ M.Free;
+ Raise;
+ end;
+ PushMessage(M);
end;
-Procedure TIPCServerComm.SetMsgType(AMsgType: TMessageType);
-
+procedure TIPCServerComm.PushMessage(Msg: TIPCServerMsg);
begin
- Fowner.FMsgType:=AMsgType;
+ FOwner.PushMessage(Msg);
end;
{ ---------------------------------------------------------------------
@@ -314,11 +486,14 @@ begin
FActive:=False;
FBusy:=False;
FMsgData:=TStringStream.Create('');
+ FQueue:=TIPCServerMsgQueue.Create;
+ FThreadTimeOut:=DefaultThreadTimeOut;
end;
destructor TSimpleIPCServer.Destroy;
begin
Active:=False;
+ FreeAndNil(FQueue);
FreeAndNil(FMsgData);
inherited Destroy;
end;
@@ -332,11 +507,31 @@ begin
end;
end;
+procedure TSimpleIPCServer.SetMaxAction(AValue: TIPCMessageOverflowAction);
+begin
+ FQueue.MaxAction:=AValue;
+end;
+
+procedure TSimpleIPCServer.SetMaxQueue(AValue: Integer);
+begin
+ FQueue.MaxCount:=AValue;
+end;
+
function TSimpleIPCServer.GetInstanceID: String;
begin
Result:=FIPCComm.InstanceID;
end;
+function TSimpleIPCServer.GetMaxAction: TIPCMessageOverflowAction;
+begin
+ Result:=FQueue.MaxAction;
+end;
+
+function TSimpleIPCServer.GetMaxQueue: Integer;
+begin
+ Result:=FQueue.MaxCount;
+end;
+
function TSimpleIPCServer.GetStringMessage: String;
begin
@@ -344,7 +539,7 @@ begin
end;
-procedure TSimpleIPCServer.StartServer;
+procedure TSimpleIPCServer.StartServer(Threaded : Boolean = False);
begin
if Not Assigned(FIPCComm) then
begin
@@ -354,47 +549,135 @@ begin
FIPCComm.StartServer;
end;
FActive:=True;
+ If Threaded then
+ StartThread;
+end;
+
+Type
+
+ { TServerThread }
+
+ TServerThread = Class(TThread)
+ private
+ FServer: TSimpleIPCServer;
+ FThreadTimeout: Integer;
+ Public
+ Constructor Create(AServer : TSimpleIPCServer; ATimeout : integer);
+ procedure Execute; override;
+ Property Server : TSimpleIPCServer Read FServer;
+ Property ThreadTimeout : Integer Read FThreadTimeout;
+ end;
+
+{ TServerThread }
+
+constructor TServerThread.Create(AServer: TSimpleIPCServer; ATimeout: integer);
+begin
+ FServer:=AServer;
+ FThreadTimeout:=ATimeOut;
+ Inherited Create(False);
+end;
+
+procedure TServerThread.Execute;
+begin
+ While Not Terminated do
+ FServer.PeekMessage(ThreadTimeout,False);
+end;
+
+procedure TSimpleIPCServer.StartThread;
+
+begin
+ InitCriticalSection(FLock);
+ FThread:=TServerThread.Create(Self,ThreadTimeOut);
+end;
+
+procedure TSimpleIPCServer.StopThread;
+
+begin
+ if Assigned(FThread) then
+ begin
+ FThread.Terminate;
+ FThread.WaitFor;
+ FreeAndNil(FThread);
+ DoneCriticalSection(FLock);
+ end;
end;
procedure TSimpleIPCServer.StopServer;
begin
+ StopThread;
If Assigned(FIPCComm) then
begin
FIPCComm.StopServer;
FreeAndNil(FIPCComm);
end;
+ FQueue.Clear;
FActive:=False;
end;
// TimeOut values:
-// > 0 -- number of milliseconds to wait
+// > 0 -- Number of milliseconds to wait
// = 0 -- return immediately
// = -1 -- wait infinitely
// < -1 -- wait infinitely (force to -1)
function TSimpleIPCServer.PeekMessage(TimeOut: Integer; DoReadMessage: Boolean): Boolean;
begin
CheckActive;
- if TimeOut < -1 then
- TimeOut := -1;
- FBusy:=True;
- Try
- Result:=FIPCComm.PeekMessage(Timeout);
- Finally
- FBusy:=False;
- end;
+ Result:=Queue.Count>0;
+ If Not Result then
+ begin
+ if TimeOut < -1 then
+ TimeOut := -1;
+ FBusy:=True;
+ Try
+ Result:=FIPCComm.PeekMessage(Timeout);
+ Finally
+ FBusy:=False;
+ end;
+ end;
If Result then
If DoReadMessage then
Readmessage;
end;
+function TSimpleIPCServer.PopMessage: Boolean;
+
+var
+ MsgItem: TIPCServerMsg;
+ DoLock : Boolean;
+
+begin
+ DoLock:=Assigned(FThread);
+ if DoLock then
+ EnterCriticalsection(Flock);
+ try
+ MsgItem:=FQueue.Pop;
+ finally
+ LeaveCriticalsection(FLock);
+ end;
+ Result:=Assigned(MsgItem);
+ if Result then
+ try
+ FMsgType := MsgItem.MsgType;
+ MsgItem.Stream.Position := 0;
+ FMsgData.Size := 0;
+ FMsgData.CopyFrom(MsgItem.Stream, MsgItem.Stream.Size);
+ finally
+ MsgItem.Free;
+ end;
+end;
+
procedure TSimpleIPCServer.ReadMessage;
+
begin
CheckActive;
FBusy:=True;
Try
- FIPCComm.ReadMessage;
- If Assigned(FOnMessage) then
- FOnMessage(Self);
+ if (FQueue.Count=0) then
+ // Readmessage pushes a message to the queue
+ FIPCComm.ReadMessage;
+ if PopMessage then
+ If Assigned(FOnMessage) then
+ FOnMessage(Self);
Finally
FBusy:=False;
end;
@@ -416,6 +699,55 @@ begin
end;
+procedure TSimpleIPCServer.DoMessageQueued;
+
+begin
+ if Assigned(FOnMessageQueued) then
+ FOnMessageQueued(Self);
+end;
+
+procedure TSimpleIPCServer.DoMessageError;
+begin
+ try
+ if Assigned(FOnMessageQueued) then
+ FOnMessageError(Self,FErrMsg);
+ finally
+ FreeAndNil(FErrMsg)
+ end;
+end;
+
+procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
+
+Var
+ DoLock : Boolean;
+
+begin
+ try
+ DoLock:=Assigned(FThread);
+ If DoLock then
+ EnterCriticalsection(FLock);
+ try
+ Queue.Push(Msg);
+ finally
+ If DoLock then
+ LeaveCriticalsection(FLock);
+ end;
+ if DoLock then
+ TThread.Synchronize(FThread,@DoMessageQueued)
+ else
+ DoMessageQueued;
+ except
+ On E : Exception do
+ FErrMsg:=Msg;
+ end;
+ if Assigned(FErrMsg) then
+ if DoLock then
+ TThread.Synchronize(FThread,@DoMessageError)
+ else
+ DoMessageQueued;
+
+end;
+
{ ---------------------------------------------------------------------