diff options
Diffstat (limited to 'packages/fcl-process/src/simpleipc.pp')
-rw-r--r-- | packages/fcl-process/src/simpleipc.pp | 386 |
1 files changed, 359 insertions, 27 deletions
diff --git a/packages/fcl-process/src/simpleipc.pp b/packages/fcl-process/src/simpleipc.pp index 5892762ad8..fd4bd8d4b8 100644 --- a/packages/fcl-process/src/simpleipc.pp +++ b/packages/fcl-process/src/simpleipc.pp @@ -20,11 +20,12 @@ unit simpleipc; interface uses - Classes, SysUtils; + Contnrs, Classes, SysUtils; Const MsgVersion = 1; - + DefaultThreadTimeOut = 50; + //Message types mtUnknown = 0; mtString = 1; @@ -33,7 +34,6 @@ type TIPCMessageOverflowAction = (ipcmoaNone, ipcmoaDiscardOld, ipcmoaDiscardNew, ipcmoaError); var - // Currently implemented only for Windows platform! DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = ipcmoaNone; DefaultIPCMessageQueueLimit: Integer = 0; @@ -49,6 +49,36 @@ Type TSimpleIPCServer = class; TSimpleIPCClient = class; + TIPCServerMsg = class + strict private + FStream: TStream; + FMsgType: TMessageType; + public + constructor Create; + destructor Destroy; override; + property Stream: TStream read FStream; + property MsgType: TMessageType read FMsgType write FMsgType; + end; + + TIPCServerMsgQueue = class + strict private + FList: TFPObjectList; + FMaxCount: Integer; + FMaxAction: TIPCMessageOverflowAction; + function GetCount: Integer; + procedure DeleteAndFree(Index: Integer); + function PrepareToPush: Boolean; + public + constructor Create; + destructor Destroy; override; + procedure Clear; + procedure Push(AItem: TIPCServerMsg); + function Pop: TIPCServerMsg; + property Count: Integer read GetCount; + property MaxCount: Integer read FMaxCount write FMaxCount; + property MaxAction: TIPCMessageOverflowAction read FMaxAction write FMaxAction; + end; + { TIPCServerComm } TIPCServerComm = Class(TObject) @@ -57,14 +87,16 @@ Type Protected Function GetInstanceID : String; virtual; abstract; Procedure DoError(const Msg : String; const Args : Array of const); - Procedure SetMsgType(AMsgType: TMessageType); - Function MsgData : TStream; + Procedure PushMessage(Const Hdr : TMsgHeader; AStream : TStream); + Procedure PushMessage(Msg : TIPCServerMsg); Public Constructor Create(AOwner : TSimpleIPCServer); virtual; Property Owner : TSimpleIPCServer read FOwner; Procedure StartServer; virtual; Abstract; Procedure StopServer;virtual; Abstract; + // May push messages on the queue Function PeekMessage(TimeOut : Integer) : Boolean;virtual; Abstract; + // Must put message on the queue. Procedure ReadMessage ;virtual; Abstract; Property InstanceID : String read GetInstanceID; end; @@ -93,24 +125,46 @@ Type { TSimpleIPCServer } + TMessageQueueEvent = Procedure(Sender : TObject; Msg : TIPCServerMsg) of object; + TSimpleIPCServer = Class(TSimpleIPC) - private + protected + Private + FOnMessageError: TMessageQueueEvent; + FOnMessageQueued: TNotifyEvent; + FQueue : TIPCServerMsgQueue; FGlobal: Boolean; FOnMessage: TNotifyEvent; FMsgType: TMessageType; FMsgData : TStream; + FThreadTimeOut: Integer; + FThread : TThread; + FLock : TRTLCriticalSection; + FErrMsg : TIPCServerMsg; + procedure DoMessageQueued; + procedure DoMessageError; function GetInstanceID: String; + function GetMaxAction: TIPCMessageOverflowAction; + function GetMaxQueue: Integer; function GetStringMessage: String; procedure SetGlobal(const AValue: Boolean); + procedure SetMaxAction(AValue: TIPCMessageOverflowAction); + procedure SetMaxQueue(AValue: Integer); Protected FIPCComm: TIPCServerComm; + procedure StartThread; virtual; + procedure StopThread; virtual; Function CommClass : TIPCServerCommClass; virtual; + Procedure PushMessage(Msg : TIPCServerMsg); virtual; + function PopMessage: Boolean; virtual; Procedure Activate; override; Procedure Deactivate; override; + Property Queue : TIPCServerMsgQueue Read FQueue; + Property Thread : TThread Read FThread; Public Constructor Create(AOwner : TComponent); override; Destructor Destroy; override; - Procedure StartServer; + Procedure StartServer(Threaded : Boolean = False); Procedure StopServer; Function PeekMessage(TimeOut : Integer; DoReadMessage : Boolean): Boolean; Procedure ReadMessage; @@ -120,8 +174,18 @@ Type Property MsgData : TStream Read FMsgData; Property InstanceID : String Read GetInstanceID; Published + Property ThreadTimeOut : Integer Read FThreadTimeOut Write FThreadTimeOut; Property Global : Boolean Read FGlobal Write SetGlobal; + // Called during ReadMessage Property OnMessage : TNotifyEvent Read FOnMessage Write FOnMessage; + // Called when a message is pushed on the queue. + Property OnMessageQueued : TNotifyEvent Read FOnMessageQueued Write FOnMessageQueued; + // Called when the queue overflows and MaxAction = ipcmoaError. + Property OnMessageError : TMessageQueueEvent Read FOnMessageError Write FOnMessageError; + // Maximum number of messages to keep in the queue + property MaxQueue: Integer read GetMaxQueue write SetMaxQueue; + // What to do when the queue overflows + property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction; end; @@ -194,6 +258,103 @@ implementation {$i simpleipc.inc} +Resourcestring + SErrMessageQueueOverflow = 'Message queue overflow (limit %s)'; + +{ --------------------------------------------------------------------- + TIPCServerMsg + ---------------------------------------------------------------------} + + +constructor TIPCServerMsg.Create; +begin + FMsgType := 0; + FStream := TMemoryStream.Create; +end; + +destructor TIPCServerMsg.Destroy; +begin + FStream.Free; +end; + +{ --------------------------------------------------------------------- + TIPCServerMsgQueue + ---------------------------------------------------------------------} + +constructor TIPCServerMsgQueue.Create; +begin + FMaxCount := DefaultIPCMessageQueueLimit; + FMaxAction := DefaultIPCMessageOverflowAction; + FList := TFPObjectList.Create(False); // FreeObjects = False! +end; + +destructor TIPCServerMsgQueue.Destroy; +begin + Clear; + FList.Free; +end; + +procedure TIPCServerMsgQueue.Clear; +begin + while FList.Count > 0 do + DeleteAndFree(FList.Count - 1); +end; + +procedure TIPCServerMsgQueue.DeleteAndFree(Index: Integer); +begin + FList[Index].Free; // Free objects manually! + FList.Delete(Index); +end; + +function TIPCServerMsgQueue.GetCount: Integer; +begin + Result := FList.Count; +end; + +function TIPCServerMsgQueue.PrepareToPush: Boolean; +begin + Result := True; + case FMaxAction of + ipcmoaDiscardOld: + begin + while (FList.Count >= FMaxCount) do + DeleteAndFree(FList.Count - 1); + end; + ipcmoaDiscardNew: + begin + Result := (FList.Count < FMaxCount); + end; + ipcmoaError: + begin + if (FList.Count >= FMaxCount) then + // Caller is expected to catch this exception, so not using Owner.DoError() + raise EIPCError.CreateFmt(SErrMessageQueueOverflow, [IntToStr(FMaxCount)]); + end; + end; +end; + +procedure TIPCServerMsgQueue.Push(AItem: TIPCServerMsg); +begin + if PrepareToPush then + FList.Insert(0, AItem); +end; + +function TIPCServerMsgQueue.Pop: TIPCServerMsg; +var + Index: Integer; +begin + Index := FList.Count - 1; + if Index >= 0 then + begin + // Caller is responsible for freeing the object. + Result := TIPCServerMsg(FList[Index]); + FList.Delete(Index); + end + else + Result := nil; +end; + + { --------------------------------------------------------------------- TIPCServerComm ---------------------------------------------------------------------} @@ -203,22 +364,33 @@ begin FOwner:=AOWner; end; -Procedure TIPCServerComm.DoError(const Msg : String; const Args : Array of const); +procedure TIPCServerComm.DoError(const Msg: String; const Args: array of const); begin FOwner.DoError(Msg,Args); -end; +end; -Function TIPCServerComm.MsgData : TStream; +procedure TIPCServerComm.PushMessage(const Hdr: TMsgHeader; AStream: TStream); + +Var + M : TIPCServerMsg; begin - Result:=FOwner.FMsgData; + M:=TIPCServerMsg.Create; + try + M.MsgType:=Hdr.MsgType; + if Hdr.MsgLen>0 then + M.Stream.CopyFrom(AStream,Hdr.MsgLen); + except + M.Free; + Raise; + end; + PushMessage(M); end; -Procedure TIPCServerComm.SetMsgType(AMsgType: TMessageType); - +procedure TIPCServerComm.PushMessage(Msg: TIPCServerMsg); begin - Fowner.FMsgType:=AMsgType; + FOwner.PushMessage(Msg); end; { --------------------------------------------------------------------- @@ -314,11 +486,14 @@ begin FActive:=False; FBusy:=False; FMsgData:=TStringStream.Create(''); + FQueue:=TIPCServerMsgQueue.Create; + FThreadTimeOut:=DefaultThreadTimeOut; end; destructor TSimpleIPCServer.Destroy; begin Active:=False; + FreeAndNil(FQueue); FreeAndNil(FMsgData); inherited Destroy; end; @@ -332,11 +507,31 @@ begin end; end; +procedure TSimpleIPCServer.SetMaxAction(AValue: TIPCMessageOverflowAction); +begin + FQueue.MaxAction:=AValue; +end; + +procedure TSimpleIPCServer.SetMaxQueue(AValue: Integer); +begin + FQueue.MaxCount:=AValue; +end; + function TSimpleIPCServer.GetInstanceID: String; begin Result:=FIPCComm.InstanceID; end; +function TSimpleIPCServer.GetMaxAction: TIPCMessageOverflowAction; +begin + Result:=FQueue.MaxAction; +end; + +function TSimpleIPCServer.GetMaxQueue: Integer; +begin + Result:=FQueue.MaxCount; +end; + function TSimpleIPCServer.GetStringMessage: String; begin @@ -344,7 +539,7 @@ begin end; -procedure TSimpleIPCServer.StartServer; +procedure TSimpleIPCServer.StartServer(Threaded : Boolean = False); begin if Not Assigned(FIPCComm) then begin @@ -354,47 +549,135 @@ begin FIPCComm.StartServer; end; FActive:=True; + If Threaded then + StartThread; +end; + +Type + + { TServerThread } + + TServerThread = Class(TThread) + private + FServer: TSimpleIPCServer; + FThreadTimeout: Integer; + Public + Constructor Create(AServer : TSimpleIPCServer; ATimeout : integer); + procedure Execute; override; + Property Server : TSimpleIPCServer Read FServer; + Property ThreadTimeout : Integer Read FThreadTimeout; + end; + +{ TServerThread } + +constructor TServerThread.Create(AServer: TSimpleIPCServer; ATimeout: integer); +begin + FServer:=AServer; + FThreadTimeout:=ATimeOut; + Inherited Create(False); +end; + +procedure TServerThread.Execute; +begin + While Not Terminated do + FServer.PeekMessage(ThreadTimeout,False); +end; + +procedure TSimpleIPCServer.StartThread; + +begin + InitCriticalSection(FLock); + FThread:=TServerThread.Create(Self,ThreadTimeOut); +end; + +procedure TSimpleIPCServer.StopThread; + +begin + if Assigned(FThread) then + begin + FThread.Terminate; + FThread.WaitFor; + FreeAndNil(FThread); + DoneCriticalSection(FLock); + end; end; procedure TSimpleIPCServer.StopServer; begin + StopThread; If Assigned(FIPCComm) then begin FIPCComm.StopServer; FreeAndNil(FIPCComm); end; + FQueue.Clear; FActive:=False; end; // TimeOut values: -// > 0 -- number of milliseconds to wait +// > 0 -- Number of milliseconds to wait // = 0 -- return immediately // = -1 -- wait infinitely // < -1 -- wait infinitely (force to -1) function TSimpleIPCServer.PeekMessage(TimeOut: Integer; DoReadMessage: Boolean): Boolean; begin CheckActive; - if TimeOut < -1 then - TimeOut := -1; - FBusy:=True; - Try - Result:=FIPCComm.PeekMessage(Timeout); - Finally - FBusy:=False; - end; + Result:=Queue.Count>0; + If Not Result then + begin + if TimeOut < -1 then + TimeOut := -1; + FBusy:=True; + Try + Result:=FIPCComm.PeekMessage(Timeout); + Finally + FBusy:=False; + end; + end; If Result then If DoReadMessage then Readmessage; end; +function TSimpleIPCServer.PopMessage: Boolean; + +var + MsgItem: TIPCServerMsg; + DoLock : Boolean; + +begin + DoLock:=Assigned(FThread); + if DoLock then + EnterCriticalsection(Flock); + try + MsgItem:=FQueue.Pop; + finally + LeaveCriticalsection(FLock); + end; + Result:=Assigned(MsgItem); + if Result then + try + FMsgType := MsgItem.MsgType; + MsgItem.Stream.Position := 0; + FMsgData.Size := 0; + FMsgData.CopyFrom(MsgItem.Stream, MsgItem.Stream.Size); + finally + MsgItem.Free; + end; +end; + procedure TSimpleIPCServer.ReadMessage; + begin CheckActive; FBusy:=True; Try - FIPCComm.ReadMessage; - If Assigned(FOnMessage) then - FOnMessage(Self); + if (FQueue.Count=0) then + // Readmessage pushes a message to the queue + FIPCComm.ReadMessage; + if PopMessage then + If Assigned(FOnMessage) then + FOnMessage(Self); Finally FBusy:=False; end; @@ -416,6 +699,55 @@ begin end; +procedure TSimpleIPCServer.DoMessageQueued; + +begin + if Assigned(FOnMessageQueued) then + FOnMessageQueued(Self); +end; + +procedure TSimpleIPCServer.DoMessageError; +begin + try + if Assigned(FOnMessageQueued) then + FOnMessageError(Self,FErrMsg); + finally + FreeAndNil(FErrMsg) + end; +end; + +procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg); + +Var + DoLock : Boolean; + +begin + try + DoLock:=Assigned(FThread); + If DoLock then + EnterCriticalsection(FLock); + try + Queue.Push(Msg); + finally + If DoLock then + LeaveCriticalsection(FLock); + end; + if DoLock then + TThread.Synchronize(FThread,@DoMessageQueued) + else + DoMessageQueued; + except + On E : Exception do + FErrMsg:=Msg; + end; + if Assigned(FErrMsg) then + if DoLock then + TThread.Synchronize(FThread,@DoMessageError) + else + DoMessageQueued; + +end; + { --------------------------------------------------------------------- |