diff options
author | Jens Geyer <jensg@apache.org> | 2019-11-09 23:24:52 +0100 |
---|---|---|
committer | Jens Geyer <jensg@apache.org> | 2019-11-28 00:52:47 +0100 |
commit | a019cda66ceccdf9068c9f5e2c27f890c450f9ec (patch) | |
tree | 5cfe30f4f28ba09ec047903c0e2996b962888c50 /lib/delphi | |
parent | c140bb90b0c2ed6175afe8c9c579acd8ea9c7d0b (diff) | |
download | thrift-a019cda66ceccdf9068c9f5e2c27f890c450f9ec.tar.gz |
THRIFT-5012 Centralize configuration aspects into a commonly used configuration object [ci skip]
Client: Delphi
Patch: Jens Geyer
This closes #1955
Diffstat (limited to 'lib/delphi')
26 files changed, 721 insertions, 725 deletions
diff --git a/lib/delphi/src/Thrift.Configuration.pas b/lib/delphi/src/Thrift.Configuration.pas new file mode 100644 index 000000000..0cb11af35 --- /dev/null +++ b/lib/delphi/src/Thrift.Configuration.pas @@ -0,0 +1,121 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + +unit Thrift.Configuration; + +interface + +uses + SysUtils, Generics.Collections, Generics.Defaults; + +const + DEFAULT_RECURSION_LIMIT = 64; + DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024; // 100 MB + DEFAULT_MAX_FRAME_SIZE = 16384000; // this value is used consistently across all Thrift libraries + + DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms + +type + IThriftConfiguration = interface + ['{ADD75449-1A67-4B78-9B75-502A1E338CFC}'] + function GetRecursionLimit : Cardinal; + procedure SetRecursionLimit( const value : Cardinal); + function GetMaxFrameSize : Cardinal; + procedure SetMaxFrameSize( const value : Cardinal); + function GetMaxMessageSize : Cardinal; + procedure SetMaxMessageSize( const value : Cardinal); + + property RecursionLimit : Cardinal read GetRecursionLimit write SetRecursionLimit; + property MaxFrameSize : Cardinal read GetMaxFrameSize write SetMaxFrameSize; + property MaxMessageSize : Cardinal read GetMaxMessageSize write SetMaxMessageSize; + end; + + + TThriftConfigurationImpl = class( TInterfacedObject, IThriftConfiguration) + strict protected + FRecursionLimit : Cardinal; + FMaxFrameSize : Cardinal; + FMaxMessageSize : Cardinal; + + // IThriftConfiguration + function GetRecursionLimit : Cardinal; + procedure SetRecursionLimit( const value : Cardinal); + function GetMaxFrameSize : Cardinal; + procedure SetMaxFrameSize( const value : Cardinal); + function GetMaxMessageSize : Cardinal; + procedure SetMaxMessageSize( const value : Cardinal); + + public + constructor Create; + end; + + +implementation + + +{ TThriftConfigurationImpl } + + +constructor TThriftConfigurationImpl.Create; +begin + inherited Create; + + FRecursionLimit := DEFAULT_RECURSION_LIMIT; + FMaxFrameSize := DEFAULT_MAX_FRAME_SIZE; + FMaxMessageSize := DEFAULT_MAX_MESSAGE_SIZE; +end; + + +function TThriftConfigurationImpl.GetRecursionLimit: Cardinal; +begin + result := FRecursionLimit; +end; + + +procedure TThriftConfigurationImpl.SetRecursionLimit(const value: Cardinal); +begin + FRecursionLimit := value; +end; + + +function TThriftConfigurationImpl.GetMaxFrameSize: Cardinal; +begin + result := FMaxFrameSize; +end; + + +procedure TThriftConfigurationImpl.SetMaxFrameSize(const value: Cardinal); +begin + FMaxFrameSize := value; +end; + + +function TThriftConfigurationImpl.GetMaxMessageSize: Cardinal; +begin + result := FMaxMessageSize; +end; + + +procedure TThriftConfigurationImpl.SetMaxMessageSize(const value: Cardinal); +begin + FMaxMessageSize := value; +end; + + +end. diff --git a/lib/delphi/src/Thrift.Protocol.Compact.pas b/lib/delphi/src/Thrift.Protocol.Compact.pas index 109e66064..665cfc4ba 100644 --- a/lib/delphi/src/Thrift.Protocol.Compact.pas +++ b/lib/delphi/src/Thrift.Protocol.Compact.pas @@ -28,6 +28,7 @@ uses SysUtils, Math, Generics.Collections, + Thrift.Configuration, Thrift.Transport, Thrift.Protocol, Thrift.Utils; @@ -268,7 +269,7 @@ end; //--- TCompactProtocolImpl ------------------------------------------------- -constructor TCompactProtocolImpl.Create(const trans: ITransport); +constructor TCompactProtocolImpl.Create( const trans : ITransport); begin inherited Create( trans); diff --git a/lib/delphi/src/Thrift.Protocol.JSON.pas b/lib/delphi/src/Thrift.Protocol.JSON.pas index e72a81dcf..61cad8b62 100644 --- a/lib/delphi/src/Thrift.Protocol.JSON.pas +++ b/lib/delphi/src/Thrift.Protocol.JSON.pas @@ -29,6 +29,7 @@ uses SysUtils, Math, Generics.Collections, + Thrift.Configuration, Thrift.Transport, Thrift.Protocol, Thrift.Utils; @@ -298,7 +299,7 @@ const function TJSONProtocolImpl.TFactory.GetProtocol( const trans: ITransport): IProtocol; begin - result := TJSONProtocolImpl.Create(trans); + result := TJSONProtocolImpl.Create( trans); end; class function TJSONProtocolImpl.GetTypeNameForTypeID(typeID : TType) : string; diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas index 94e6e1863..d5a758797 100644 --- a/lib/delphi/src/Thrift.Protocol.pas +++ b/lib/delphi/src/Thrift.Protocol.pas @@ -31,6 +31,7 @@ uses Thrift.Stream, Thrift.Utils, Thrift.Collections, + Thrift.Configuration, Thrift.Transport; type @@ -67,9 +68,6 @@ const VALID_MESSAGETYPES = [Low(TMessageType)..High(TMessageType)]; -const - DEFAULT_RECURSION_LIMIT = 64; - type IProtocol = interface; @@ -196,7 +194,7 @@ type end; IProtocol = interface - ['{7F3640D7-5082-49E7-B562-84202F323C3A}'] + ['{F0040D99-937F-400D-9932-AF04F665899F}'] function GetTransport: ITransport; procedure WriteMessageBegin( const msg: TThriftMessage); procedure WriteMessageEnd; @@ -243,15 +241,13 @@ type function ReadString: string; function ReadAnsiString: AnsiString; - procedure SetRecursionLimit( value : Integer); - function GetRecursionLimit : Integer; function NextRecursionLevel : IProtocolRecursionTracker; procedure IncrementRecursionDepth; procedure DecrementRecursionDepth; function GetMinSerializedSize( const aType : TType) : Integer; property Transport: ITransport read GetTransport; - property RecursionLimit : Integer read GetRecursionLimit write SetRecursionLimit; + function Configuration : IThriftConfiguration; end; TProtocolImpl = class abstract( TInterfacedObject, IProtocol) @@ -260,8 +256,6 @@ type FRecursionLimit : Integer; FRecursionDepth : Integer; - procedure SetRecursionLimit( value : Integer); - function GetRecursionLimit : Integer; function NextRecursionLevel : IProtocolRecursionTracker; procedure IncrementRecursionDepth; procedure DecrementRecursionDepth; @@ -272,8 +266,9 @@ type procedure CheckReadBytesAvailable( const value : TThriftMap); overload; inline; procedure Reset; virtual; - function GetTransport: ITransport; - public + function GetTransport: ITransport; + function Configuration : IThriftConfiguration; + procedure WriteMessageBegin( const msg: TThriftMessage); virtual; abstract; procedure WriteMessageEnd; virtual; abstract; procedure WriteStructBegin( const struc: TThriftStruct); virtual; abstract; @@ -319,9 +314,10 @@ type function ReadString: string; virtual; function ReadAnsiString: AnsiString; virtual; - property Transport: ITransport read GetTransport; + property Transport: ITransport read GetTransport; - constructor Create( trans: ITransport ); + public + constructor Create( const aTransport : ITransport); end; IBase = interface( ISupportsToString) @@ -554,24 +550,14 @@ end; { TProtocolImpl } -constructor TProtocolImpl.Create(trans: ITransport); +constructor TProtocolImpl.Create( const aTransport : ITransport); begin inherited Create; - FTrans := trans; - FRecursionLimit := DEFAULT_RECURSION_LIMIT; + FTrans := aTransport; + FRecursionLimit := aTransport.Configuration.RecursionLimit; FRecursionDepth := 0; end; -procedure TProtocolImpl.SetRecursionLimit( value : Integer); -begin - FRecursionLimit := value; -end; - -function TProtocolImpl.GetRecursionLimit : Integer; -begin - result := FRecursionLimit; -end; - function TProtocolImpl.NextRecursionLevel : IProtocolRecursionTracker; begin result := TProtocolRecursionTrackerImpl.Create(Self); @@ -594,10 +580,14 @@ begin Result := FTrans; end; +function TProtocolImpl.Configuration : IThriftConfiguration; +begin + Result := FTrans.Configuration; +end; + procedure TProtocolImpl.Reset; begin - if FTrans.TransportControl <> nil - then FTrans.TransportControl.ResetConsumedMessageSize; + FTrans.ResetConsumedMessageSize; end; function TProtocolImpl.ReadAnsiString: AnsiString; @@ -654,15 +644,12 @@ end; procedure TProtocolImpl.CheckReadBytesAvailable( const value : TThriftMap); -var nPairSize : Integer -; +var nPairSize : Integer; begin nPairSize := GetMinSerializedSize(value.KeyType) + GetMinSerializedSize(value.ValueType); FTrans.CheckReadBytesAvailable( value.Count * nPairSize); end; - - { TProtocolUtil } class procedure TProtocolUtil.Skip( prot: IProtocol; type_: TType); @@ -1486,7 +1473,5 @@ end; - - end. diff --git a/lib/delphi/src/Thrift.Serializer.pas b/lib/delphi/src/Thrift.Serializer.pas index 1cbcbec5f..cb62603db 100644 --- a/lib/delphi/src/Thrift.Serializer.pas +++ b/lib/delphi/src/Thrift.Serializer.pas @@ -28,6 +28,7 @@ uses {$ELSE} System.Classes, Winapi.Windows, System.SysUtils, {$ENDIF} + Thrift.Configuration, Thrift.Protocol, Thrift.Transport, Thrift.Stream; @@ -42,16 +43,9 @@ type FProtocol : IProtocol; public - // Create a new TSerializer that uses the TBinaryProtocol by default. - constructor Create; overload; - - // Create a new TSerializer. - // It will use the TProtocol specified by the factory that is passed in. - constructor Create( const factory : IProtocolFactory); overload; - - // Create a new TSerializer. - // It will use the TProtocol and layered transports specified by the factories that are passed in. - constructor Create( const protfact : IProtocolFactory; const transfact : ITransportFactory); overload; + constructor Create( const aProtFact : IProtocolFactory = nil; // defaults to TBinaryProtocol + const aTransFact : ITransportFactory = nil; + const aConfig : IThriftConfiguration = nil); // DTOR destructor Destroy; override; @@ -70,19 +64,9 @@ type FProtocol : IProtocol; public - // Create a new TDeserializer that uses the TBinaryProtocol by default. - constructor Create( const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); overload; - - // Create a new TDeserializer. - // It will use the TProtocol specified by the factory that is passed in. - constructor Create( const factory : IProtocolFactory; - const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); overload; - - // Create a new TDeserializer. - // It will use the TProtocol and layered transports specified by the factories that are passed in. - constructor Create( const protfact : IProtocolFactory; - const transfact : ITransportFactory; - const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); overload; + constructor Create( const aProtFact : IProtocolFactory = nil; // defaults to TBinaryProtocol + const aTransFact : ITransportFactory = nil; + const aConfig : IThriftConfiguration = nil); // DTOR destructor Destroy; override; @@ -100,34 +84,24 @@ implementation { TSerializer } -constructor TSerializer.Create; -// Create a new TSerializer that uses the TBinaryProtocol by default. -begin - //no inherited; - Create( TBinaryProtocolImpl.TFactory.Create, nil); -end; - - -constructor TSerializer.Create( const factory : IProtocolFactory); -// Create a new TSerializer. -// It will use the TProtocol specified by the factory that is passed in. -begin - //no inherited; - Create( factory, nil); -end; - - -constructor TSerializer.Create( const protfact : IProtocolFactory; const transfact : ITransportFactory); -// Create a new TSerializer. -// It will use the TProtocol specified by the factory that is passed in. +constructor TSerializer.Create( const aProtFact : IProtocolFactory; + const aTransFact : ITransportFactory; + const aConfig : IThriftConfiguration); var adapter : IThriftStream; + protfact : IProtocolFactory; begin inherited Create; + FStream := TMemoryStream.Create; adapter := TThriftStreamAdapterDelphi.Create( FStream, FALSE); - FTransport := TStreamTransportImpl.Create( nil, adapter, TTransportControlImpl.Create(0)); // we don't read anything here - if transfact <> nil then FTransport := transfact.GetTransport( FTransport); - FProtocol := protfact.GetProtocol( FTransport); + + FTransport := TStreamTransportImpl.Create( nil, adapter, aConfig); + if aTransfact <> nil then FTransport := aTransfact.GetTransport( FTransport); + + if aProtFact <> nil + then protfact := aProtFact + else protfact := TBinaryProtocolImpl.TFactory.Create; + FProtocol := protfact.GetProtocol( FTransport); if not FTransport.IsOpen then FTransport.Open; @@ -188,36 +162,24 @@ end; { TDeserializer } -constructor TDeserializer.Create( const aMaxMessageSize : Integer); -// Create a new TDeserializer that uses the TBinaryProtocol by default. -begin - //no inherited; - Create( TBinaryProtocolImpl.TFactory.Create, nil, aMaxMessageSize); -end; - - -constructor TDeserializer.Create( const factory : IProtocolFactory; const aMaxMessageSize : Integer); -// Create a new TDeserializer. -// It will use the TProtocol specified by the factory that is passed in. -begin - //no inherited; - Create( factory, nil, aMaxMessageSize); -end; - - -constructor TDeserializer.Create( const protfact : IProtocolFactory; - const transfact : ITransportFactory; - const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); -// Create a new TDeserializer. -// It will use the TProtocol specified by the factory that is passed in. +constructor TDeserializer.Create( const aProtFact : IProtocolFactory; + const aTransFact : ITransportFactory; + const aConfig : IThriftConfiguration); var adapter : IThriftStream; + protfact : IProtocolFactory; begin inherited Create; + FStream := TMemoryStream.Create; adapter := TThriftStreamAdapterDelphi.Create( FStream, FALSE); - FTransport := TStreamTransportImpl.Create( adapter, nil, TTransportControlImpl.Create(aMaxMessageSize)); - if transfact <> nil then FTransport := transfact.GetTransport( FTransport); - FProtocol := protfact.GetProtocol( FTransport); + + FTransport := TStreamTransportImpl.Create( adapter, nil, aConfig); + if aTransfact <> nil then FTransport := aTransfact.GetTransport( FTransport); + + if aProtFact <> nil + then protfact := aProtFact + else protfact := TBinaryProtocolImpl.TFactory.Create; + FProtocol := protfact.GetProtocol( FTransport); if not FTransport.IsOpen then FTransport.Open; diff --git a/lib/delphi/src/Thrift.Server.pas b/lib/delphi/src/Thrift.Server.pas index da053b96b..a73e6cb44 100644 --- a/lib/delphi/src/Thrift.Server.pas +++ b/lib/delphi/src/Thrift.Server.pas @@ -32,7 +32,8 @@ uses {$ENDIF} Thrift, Thrift.Protocol, - Thrift.Transport; + Thrift.Transport, + Thrift.Configuration; type IServerEvents = interface @@ -70,6 +71,7 @@ type FOutputProtocolFactory : IProtocolFactory; FLogDelegate : TLogDelegate; FServerEvents : IServerEvents; + FConfiguration : IThriftConfiguration; class procedure DefaultLogDelegate( const str: string); @@ -86,31 +88,17 @@ type const aOutputTransportFactory : ITransportFactory; const aInputProtocolFactory : IProtocolFactory; const aOutputProtocolFactory : IProtocolFactory; + const aConfig : IThriftConfiguration; const aLogDelegate : TLogDelegate ); overload; constructor Create( - const aProcessor :IProcessor; - const aServerTransport: IServerTransport - ); overload; - - constructor Create( - const aProcessor :IProcessor; - const aServerTransport: IServerTransport; - const aLogDelegate: TLogDelegate - ); overload; - - constructor Create( - const aProcessor :IProcessor; - const aServerTransport: IServerTransport; - const aTransportFactory : ITransportFactory - ); overload; - - constructor Create( - const aProcessor :IProcessor; + const aProcessor: IProcessor; const aServerTransport: IServerTransport; - const aTransportFactory : ITransportFactory; - const aProtocolFactory : IProtocolFactory + const aTransportFactory: ITransportFactory = nil; + const aProtocolFactory: IProtocolFactory = nil; + const aConfig : IThriftConfiguration = nil; + const aLogDel: TServerImpl.TLogDelegate = nil ); overload; end; @@ -119,30 +107,6 @@ type private FStop : Boolean; public - constructor Create( - const aProcessor: IProcessor; - const aServerTransport: IServerTransport - ); overload; - - constructor Create( - const aProcessor: IProcessor; - const aServerTransport: IServerTransport; - const ALogDel: TServerImpl.TLogDelegate - ); overload; - - constructor Create( - const aProcessor: IProcessor; - const aServerTransport: IServerTransport; - const aTransportFactory: ITransportFactory - ); overload; - - constructor Create( - const aProcessor: IProcessor; - const aServerTransport: IServerTransport; - const aTransportFactory: ITransportFactory; - const aProtocolFactory: IProtocolFactory - ); overload; - procedure Serve; override; procedure Stop; override; end; @@ -154,83 +118,55 @@ implementation constructor TServerImpl.Create( const aProcessor: IProcessor; const aServerTransport: IServerTransport; - const aLogDelegate: TLogDelegate); -var - InputFactory, OutputFactory : IProtocolFactory; - InputTransFactory, OutputTransFactory : ITransportFactory; - + const aInputTransportFactory, aOutputTransportFactory: ITransportFactory; + const aInputProtocolFactory, aOutputProtocolFactory: IProtocolFactory; + const aConfig : IThriftConfiguration; + const aLogDelegate : TLogDelegate); begin - InputFactory := TBinaryProtocolImpl.TFactory.Create; - OutputFactory := TBinaryProtocolImpl.TFactory.Create; - InputTransFactory := TTransportFactoryImpl.Create; - OutputTransFactory := TTransportFactoryImpl.Create; - - //no inherited; - Create( - aProcessor, - aServerTransport, - InputTransFactory, - OutputTransFactory, - InputFactory, - OutputFactory, - ALogDelegate - ); -end; + inherited Create; + FProcessor := aProcessor; + FServerTransport := aServerTransport; -constructor TServerImpl.Create(const aProcessor: IProcessor; - const aServerTransport: IServerTransport); -var - InputFactory, OutputFactory : IProtocolFactory; - InputTransFactory, OutputTransFactory : ITransportFactory; + if aConfig <> nil + then FConfiguration := aConfig + else FConfiguration := TThriftConfigurationImpl.Create; -begin - InputFactory := TBinaryProtocolImpl.TFactory.Create; - OutputFactory := TBinaryProtocolImpl.TFactory.Create; - InputTransFactory := TTransportFactoryImpl.Create; - OutputTransFactory := TTransportFactoryImpl.Create; - - //no inherited; - Create( - aProcessor, - aServerTransport, - InputTransFactory, - OutputTransFactory, - InputFactory, - OutputFactory, - DefaultLogDelegate - ); -end; + if aInputTransportFactory <> nil + then FInputTransportFactory := aInputTransportFactory + else FInputTransportFactory := TTransportFactoryImpl.Create; -constructor TServerImpl.Create(const aProcessor: IProcessor; - const aServerTransport: IServerTransport; const aTransportFactory: ITransportFactory); -var - InputProtocolFactory : IProtocolFactory; - OutputProtocolFactory : IProtocolFactory; -begin - InputProtocolFactory := TBinaryProtocolImpl.TFactory.Create; - OutputProtocolFactory := TBinaryProtocolImpl.TFactory.Create; + if aOutputTransportFactory <> nil + then FOutputTransportFactory := aOutputTransportFactory + else FOutputTransportFactory := TTransportFactoryImpl.Create; + + if aInputProtocolFactory <> nil + then FInputProtocolFactory := aInputProtocolFactory + else FInputProtocolFactory := TBinaryProtocolImpl.TFactory.Create; + + if aOutputProtocolFactory <> nil + then FOutputProtocolFactory := aOutputProtocolFactory + else FOutputProtocolFactory := TBinaryProtocolImpl.TFactory.Create; - //no inherited; - Create( aProcessor, aServerTransport, aTransportFactory, aTransportFactory, - InputProtocolFactory, OutputProtocolFactory, DefaultLogDelegate); + if Assigned(aLogDelegate) + then FLogDelegate := aLogDelegate + else FLogDelegate := DefaultLogDelegate; end; -constructor TServerImpl.Create(const aProcessor: IProcessor; - const aServerTransport: IServerTransport; - const aInputTransportFactory, aOutputTransportFactory: ITransportFactory; - const aInputProtocolFactory, aOutputProtocolFactory: IProtocolFactory; - const aLogDelegate : TLogDelegate); + +constructor TServerImpl.Create( const aProcessor: IProcessor; + const aServerTransport: IServerTransport; + const aTransportFactory: ITransportFactory; + const aProtocolFactory: IProtocolFactory; + const aConfig : IThriftConfiguration; + const aLogDel: TServerImpl.TLogDelegate); begin - inherited Create; - FProcessor := aProcessor; - FServerTransport := aServerTransport; - FInputTransportFactory := aInputTransportFactory; - FOutputTransportFactory := aOutputTransportFactory; - FInputProtocolFactory := aInputProtocolFactory; - FOutputProtocolFactory := aOutputProtocolFactory; - FLogDelegate := aLogDelegate; + Create( aProcessor, aServerTransport, + aTransportFactory, aTransportFactory, + aProtocolFactory, aProtocolFactory, + aConfig, aLogDel); end; + class procedure TServerImpl.DefaultLogDelegate( const str: string); begin try @@ -241,16 +177,6 @@ begin end; end; -constructor TServerImpl.Create( const aProcessor: IProcessor; - const aServerTransport: IServerTransport; const aTransportFactory: ITransportFactory; - const aProtocolFactory: IProtocolFactory); -begin - //no inherited; - Create( aProcessor, aServerTransport, - aTransportFactory, aTransportFactory, - aProtocolFactory, aProtocolFactory, - DefaultLogDelegate); -end; function TServerImpl.GetServerEvents : IServerEvents; @@ -268,55 +194,6 @@ end; { TSimpleServer } -constructor TSimpleServer.Create( const aProcessor: IProcessor; - const aServerTransport: IServerTransport); -var - InputProtocolFactory : IProtocolFactory; - OutputProtocolFactory : IProtocolFactory; - InputTransportFactory : ITransportFactory; - OutputTransportFactory : ITransportFactory; -begin - InputProtocolFactory := TBinaryProtocolImpl.TFactory.Create; - OutputProtocolFactory := TBinaryProtocolImpl.TFactory.Create; - InputTransportFactory := TTransportFactoryImpl.Create; - OutputTransportFactory := TTransportFactoryImpl.Create; - - inherited Create( aProcessor, aServerTransport, InputTransportFactory, - OutputTransportFactory, InputProtocolFactory, OutputProtocolFactory, DefaultLogDelegate); -end; - -constructor TSimpleServer.Create( const aProcessor: IProcessor; - const aServerTransport: IServerTransport; const ALogDel: TServerImpl.TLogDelegate); -var - InputProtocolFactory : IProtocolFactory; - OutputProtocolFactory : IProtocolFactory; - InputTransportFactory : ITransportFactory; - OutputTransportFactory : ITransportFactory; -begin - InputProtocolFactory := TBinaryProtocolImpl.TFactory.Create; - OutputProtocolFactory := TBinaryProtocolImpl.TFactory.Create; - InputTransportFactory := TTransportFactoryImpl.Create; - OutputTransportFactory := TTransportFactoryImpl.Create; - - inherited Create( aProcessor, aServerTransport, InputTransportFactory, - OutputTransportFactory, InputProtocolFactory, OutputProtocolFactory, ALogDel); -end; - -constructor TSimpleServer.Create( const aProcessor: IProcessor; - const aServerTransport: IServerTransport; const aTransportFactory: ITransportFactory); -begin - inherited Create( aProcessor, aServerTransport, aTransportFactory, - aTransportFactory, TBinaryProtocolImpl.TFactory.Create, TBinaryProtocolImpl.TFactory.Create, DefaultLogDelegate); -end; - -constructor TSimpleServer.Create( const aProcessor: IProcessor; - const aServerTransport: IServerTransport; const aTransportFactory: ITransportFactory; - const aProtocolFactory: IProtocolFactory); -begin - inherited Create( aProcessor, aServerTransport, aTransportFactory, - aTransportFactory, aProtocolFactory, aProtocolFactory, DefaultLogDelegate); -end; - procedure TSimpleServer.Serve; var client : ITransport; diff --git a/lib/delphi/src/Thrift.Stream.pas b/lib/delphi/src/Thrift.Stream.pas index 0f4e723e8..16680591c 100644 --- a/lib/delphi/src/Thrift.Stream.pas +++ b/lib/delphi/src/Thrift.Stream.pas @@ -37,22 +37,16 @@ uses type IThriftStream = interface - ['{DBE61E28-2A77-42DB-A5A3-3CCB8A2D09FA}'] + ['{3A61A8A6-3639-4B91-A260-EFCA23944F3A}'] procedure Write( const buffer: TBytes; offset: Integer; count: Integer); overload; procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); overload; function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload; function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload; - procedure CheckReadBytesAvailable( const value : Integer); procedure Open; procedure Close; procedure Flush; function IsOpen: Boolean; function ToArray: TBytes; - end; - - - IThriftStream2 = interface( IThriftStream) - ['{1F55D9FE-F617-4B80-B8CA-4A300D8E33F6}'] function Size : Int64; function Position : Int64; end; @@ -67,15 +61,16 @@ type procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); overload; virtual; function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload; inline; function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload; virtual; - procedure CheckReadBytesAvailable( const value : Integer); virtual; abstract; procedure Open; virtual; abstract; procedure Close; virtual; abstract; procedure Flush; virtual; abstract; function IsOpen: Boolean; virtual; abstract; function ToArray: TBytes; virtual; abstract; + function Size : Int64; virtual; + function Position : Int64; virtual; end; - TThriftStreamAdapterDelphi = class( TThriftStreamImpl, IThriftStream2) + TThriftStreamAdapterDelphi = class( TThriftStreamImpl) strict private FStream : TStream; FOwnsStream : Boolean; @@ -83,38 +78,32 @@ type // IThriftStream procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override; function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override; - procedure CheckReadBytesAvailable( const value : Integer); override; procedure Open; override; procedure Close; override; procedure Flush; override; function IsOpen: Boolean; override; function ToArray: TBytes; override; - - // IThriftStream2 - function Size : Int64; - function Position : Int64; + function Size : Int64; override; + function Position : Int64; override; public constructor Create( const aStream: TStream; aOwnsStream : Boolean); destructor Destroy; override; end; - TThriftStreamAdapterCOM = class( TThriftStreamImpl, IThriftStream2) + TThriftStreamAdapterCOM = class( TThriftStreamImpl) strict private FStream : IStream; strict protected // IThriftStream procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override; function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override; - procedure CheckReadBytesAvailable( const value : Integer); override; procedure Open; override; procedure Close; override; procedure Flush; override; function IsOpen: Boolean; override; function ToArray: TBytes; override; - - // IThriftStream2 - function Size : Int64; - function Position : Int64; + function Size : Int64; override; + function Position : Int64; override; public constructor Create( const aStream: IStream); end; @@ -191,14 +180,6 @@ begin end; end; -procedure TThriftStreamAdapterCOM.CheckReadBytesAvailable( const value : Integer); -var nRemaining : Int64; -begin - nRemaining := Self.Size - Self.Position; - if nRemaining < value - then raise TTransportExceptionEndOfFile.Create('Not enough input data'); -end; - function TThriftStreamAdapterCOM.ToArray: TBytes; var len : Int64; @@ -267,6 +248,19 @@ begin CheckSizeAndOffset( pBuf, offset+count, offset, count); end; +function TThriftStreamImpl.Size : Int64; +begin + ASSERT(FALSE); + raise ENotImplemented.Create(ClassName+'.Size'); +end; + +function TThriftStreamImpl.Position : Int64; +begin + ASSERT(FALSE); + raise ENotImplemented.Create(ClassName+'.Position'); +end; + + { TThriftStreamAdapterDelphi } constructor TThriftStreamAdapterDelphi.Create( const aStream: TStream; aOwnsStream: Boolean); @@ -332,13 +326,6 @@ begin else Result := 0; end; -procedure TThriftStreamAdapterDelphi.CheckReadBytesAvailable( const value : Integer); -var nRemaining : Int64; -begin - nRemaining := FStream.Size - FStream.Position; - if nRemaining < value then raise TTransportExceptionEndOfFile.Create('Not enough input data'); -end; - function TThriftStreamAdapterDelphi.ToArray: TBytes; var OrgPos : Integer; diff --git a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas index b92cce1e1..bdc65d1fd 100644 --- a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas +++ b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas @@ -34,13 +34,14 @@ uses Winapi.ActiveX, Winapi.msxml, {$ENDIF} Thrift.Collections, + Thrift.Configuration, Thrift.Transport, Thrift.Exception, Thrift.Utils, Thrift.Stream; type - TMsxmlHTTPClientImpl = class( TTransportImpl, IHTTPClient) + TMsxmlHTTPClientImpl = class( TEndpointTransportBase, IHTTPClient) strict private FUri : string; FInputStream : IThriftStream; @@ -59,7 +60,6 @@ type function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override; procedure Write( const pBuf : Pointer; off, len : Integer); override; procedure Flush; override; - procedure CheckReadBytesAvailable( const value : Integer); override; procedure SetDnsResolveTimeout(const Value: Integer); function GetDnsResolveTimeout: Integer; @@ -81,26 +81,29 @@ type property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout; property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders; public - constructor Create( const AUri: string; const aTransportCtl : ITransportControl = nil); + constructor Create( const aUri: string; const aConfig : IThriftConfiguration); reintroduce; destructor Destroy; override; end; implementation +const + XMLHTTP_CONNECTION_TIMEOUT = 60 * 1000; + XMLHTTP_SENDRECV_TIMEOUT = 30 * 1000; { TMsxmlHTTPClientImpl } -constructor TMsxmlHTTPClientImpl.Create(const AUri: string; const aTransportCtl : ITransportControl); +constructor TMsxmlHTTPClientImpl.Create( const aUri: string; const aConfig : IThriftConfiguration); begin - inherited Create( aTransportCtl); - FUri := AUri; + inherited Create( aConfig); + FUri := aUri; // defaults according to MSDN FDnsResolveTimeout := 0; // no timeout - FConnectionTimeout := 60 * 1000; - FSendTimeout := 30 * 1000; - FReadTimeout := 30 * 1000; + FConnectionTimeout := XMLHTTP_CONNECTION_TIMEOUT; + FSendTimeout := XMLHTTP_SENDRECV_TIMEOUT; + FReadTimeout := XMLHTTP_SENDRECV_TIMEOUT; FCustomHeaders := TThriftDictionaryImpl<string,string>.Create; FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True); @@ -219,13 +222,6 @@ begin end; end; -procedure TMsxmlHTTPClientImpl.CheckReadBytesAvailable( const value : Integer); -begin - if FInputStream <> nil - then FInputStream.CheckReadBytesAvailable( value) - else raise TTransportExceptionNotOpen.Create('No request has been sent'); -end; - function TMsxmlHTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; begin if FInputStream = nil then begin @@ -234,7 +230,6 @@ begin try Result := FInputStream.Read( pBuf, buflen, off, len); - ConsumeReadBytes( result); except on E: Exception do raise TTransportExceptionUnknown.Create(E.Message); @@ -261,6 +256,7 @@ begin xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference ))); FInputStream := nil; FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream); + UpdateKnownMessageSize( FInputStream.Size); finally ms.Free; end; diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas index b602b64c8..635a84178 100644 --- a/lib/delphi/src/Thrift.Transport.Pipes.pas +++ b/lib/delphi/src/Thrift.Transport.Pipes.pas @@ -29,6 +29,7 @@ uses {$ELSE} Winapi.Windows, System.SysUtils, System.Math, Winapi.AccCtrl, Winapi.AclAPI, System.SyncObjs, {$ENDIF} + Thrift.Configuration, Thrift.Transport, Thrift.Utils, Thrift.Stream; @@ -53,7 +54,6 @@ type //procedure Open; override; - see derived classes procedure Close; override; procedure Flush; override; - procedure CheckReadBytesAvailable( const value : Integer); override; function ReadDirect( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload; function ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload; @@ -65,7 +65,9 @@ type public constructor Create( aEnableOverlapped : Boolean; const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT; - const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT); + const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT + ); reintroduce; overload; + destructor Destroy; override; end; @@ -85,7 +87,8 @@ type const aShareMode: DWORD = 0; const aSecurityAttributes: PSecurityAttributes = nil; const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT; - const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT); overload; + const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT + ); reintroduce; overload; end; @@ -99,7 +102,9 @@ type public constructor Create( const aPipeHandle : THandle; const aOwnsHandle, aEnableOverlapped : Boolean; - const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); overload; + const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT + ); reintroduce; overload; + destructor Destroy; override; end; @@ -113,7 +118,7 @@ type TPipeTransportBase = class( TStreamTransportImpl, IPipeTransport) - public + strict protected // ITransport function GetIsOpen: Boolean; override; procedure Open; override; @@ -127,27 +132,32 @@ type constructor Create( const aPipe : THandle; const aOwnsHandle : Boolean; const aTimeOut : DWORD; - const aTransportCtl : ITransportControl); overload; + const aConfig : IThriftConfiguration = nil + ); reintroduce; overload; constructor Create( const aPipeName : string; const aShareMode: DWORD = 0; const aSecurityAttributes: PSecurityAttributes = nil; const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT; const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT; - const aTransportCtl : ITransportControl = nil); overload; + const aConfig : IThriftConfiguration = nil + ); reintroduce; overload; end; TNamedPipeTransportServerEndImpl = class( TNamedPipeTransportClientEndImpl) strict private FHandle : THandle; - public + strict protected // ITransport procedure Close; override; + public constructor Create( const aPipe : THandle; const aOwnsHandle : Boolean; const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT; - const aTransportCtl : ITransportControl = nil); reintroduce; + const aConfig : IThriftConfiguration = nil + ); reintroduce; overload; + end; @@ -157,7 +167,8 @@ type constructor Create( const aPipeRead, aPipeWrite : THandle; const aOwnsHandles : Boolean; const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT; - const aTransportCtl : ITransportControl = nil); overload; + const aConfig : IThriftConfiguration = nil + ); reintroduce; overload; end; @@ -187,7 +198,7 @@ type procedure InternalClose; virtual; abstract; function QueryStopServer : Boolean; public - constructor Create; + constructor Create( const aConfig : IThriftConfiguration); destructor Destroy; override; procedure Listen; override; procedure Close; override; @@ -221,7 +232,10 @@ type procedure InternalClose; override; public - constructor Create(aBufsize : Cardinal = 4096; aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); + constructor Create( const aBufsize : Cardinal = 4096; + const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT; + const aConfig : IThriftConfiguration = nil + ); reintroduce; overload; end; @@ -245,9 +259,12 @@ type procedure InternalClose; override; public - constructor Create( aPipename : string; aBufsize : Cardinal = 4096; - aMaxConns : Cardinal = PIPE_UNLIMITED_INSTANCES; - aTimeOut : Cardinal = INFINITE); + constructor Create( const aPipename : string; + const aBufsize : Cardinal = 4096; + const aMaxConns : Cardinal = PIPE_UNLIMITED_INSTANCES; + const aTimeOut : Cardinal = INFINITE; + const aConfig : IThriftConfiguration = nil + ); reintroduce; overload; end; @@ -278,15 +295,14 @@ end; { TPipeStreamBase } -constructor TPipeStreamBase.Create( aEnableOverlapped : Boolean; - const aTimeOut, aOpenTimeOut : DWORD); +constructor TPipeStreamBase.Create( aEnableOverlapped : Boolean; const aTimeOut, aOpenTimeOut : DWORD); begin inherited Create; - ASSERT( aTimeout > 0); // aOpenTimeout may be 0 FPipe := INVALID_HANDLE_VALUE; FTimeout := aTimeOut; FOpenTimeOut := aOpenTimeOut; FOverlapped := aEnableOverlapped; + ASSERT( FTimeout > 0); // FOpenTimeout may be 0 end; @@ -318,12 +334,6 @@ begin end; -procedure TPipeStreamBase.CheckReadBytesAvailable( const value : Integer); -begin - // can't tell how much we can suck out of the pipe -end; - - procedure TPipeStreamBase.Write( const pBuf : Pointer; offset, count : Integer); begin if FOverlapped @@ -538,7 +548,7 @@ constructor TNamedPipeStreamImpl.Create( const aPipeName : string; const aSecurityAttributes: PSecurityAttributes; const aTimeOut, aOpenTimeOut : DWORD); begin - inherited Create( aEnableOverlapped, aTimeout, aOpenTimeOut); + inherited Create( aEnableOverlapped, aTimeOut, aOpenTimeOut); FPipeName := aPipeName; FShareMode := aShareMode; @@ -601,7 +611,7 @@ constructor THandlePipeStreamImpl.Create( const aPipeHandle : THandle; const aOwnsHandle, aEnableOverlapped : Boolean; const aTimeOut : DWORD); begin - inherited Create( aEnableOverlapped, aTimeOut); + inherited Create( aEnableOverlapped, aTimeout, aTimeout); if aOwnsHandle then FSrcHandle := aPipeHandle @@ -655,13 +665,14 @@ end; { TNamedPipeTransportClientEndImpl } -constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string; const aShareMode: DWORD; +constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string; + const aShareMode: DWORD; const aSecurityAttributes: PSecurityAttributes; const aTimeOut, aOpenTimeOut : DWORD; - const aTransportCtl : ITransportControl); + const aConfig : IThriftConfiguration); // Named pipe constructor begin - inherited Create( nil, nil, aTransportCtl); + inherited Create( nil, nil, aConfig); FInputStream := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut); FOutputStream := FInputStream; // true for named pipes end; @@ -670,11 +681,11 @@ end; constructor TNamedPipeTransportClientEndImpl.Create( const aPipe : THandle; const aOwnsHandle : Boolean; const aTimeOut : DWORD; - const aTransportCtl : ITransportControl); + const aConfig : IThriftConfiguration); // Named pipe constructor begin - inherited Create( nil, nil, aTransportCtl); - FInputStream := THandlePipeStreamImpl.Create( aPipe, TRUE, aOwnsHandle, aTimeOut); + inherited Create( nil, nil, aConfig); + FInputStream := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut); FOutputStream := FInputStream; // true for named pipes end; @@ -685,11 +696,11 @@ end; constructor TNamedPipeTransportServerEndImpl.Create( const aPipe : THandle; const aOwnsHandle : Boolean; const aTimeOut : DWORD; - const aTransportCtl : ITransportControl); + const aConfig : IThriftConfiguration); // Named pipe constructor begin FHandle := DuplicatePipeHandle( aPipe); - inherited Create( aPipe, aOwnsHandle, aTimeOut, aTransportCtl); + inherited Create( aPipe, aOwnsHandle, aTimeout, aConfig); end; @@ -709,22 +720,22 @@ end; constructor TAnonymousPipeTransportImpl.Create( const aPipeRead, aPipeWrite : THandle; const aOwnsHandles : Boolean; const aTimeOut : DWORD; - const aTransportCtl : ITransportControl); + const aConfig : IThriftConfiguration); // Anonymous pipe constructor begin - inherited Create( nil, nil, aTransportCtl); + inherited Create( nil, nil, aConfig); // overlapped is not supported with AnonPipes, see MSDN - FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeOut); - FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeOut); + FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeout); + FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeout); end; { TPipeServerTransportBase } -constructor TPipeServerTransportBase.Create; +constructor TPipeServerTransportBase.Create( const aConfig : IThriftConfiguration); begin - inherited Create; + inherited Create( aConfig); FStopServer := TEvent.Create(nil,TRUE,FALSE,''); // manual reset end; @@ -761,11 +772,12 @@ end; { TAnonymousPipeServerTransportImpl } - -constructor TAnonymousPipeServerTransportImpl.Create(aBufsize : Cardinal; aTimeOut : DWORD); +constructor TAnonymousPipeServerTransportImpl.Create( const aBufsize : Cardinal; + const aTimeOut : DWORD; + const aConfig : IThriftConfiguration); // Anonymous pipe CTOR begin - inherited Create; + inherited Create(aConfig); FBufsize := aBufSize; FReadHandle := INVALID_HANDLE_VALUE; FWriteHandle := INVALID_HANDLE_VALUE; @@ -794,7 +806,7 @@ begin then raise TTransportExceptionNotOpen.Create('TServerPipe unable to initiate pipe communication'); // create the transport impl - result := TAnonymousPipeTransportImpl.Create( FReadHandle, FWriteHandle, FALSE, FTimeOut); + result := TAnonymousPipeTransportImpl.Create( FReadHandle, FWriteHandle, FALSE, FTimeOut, Configuration); end; @@ -872,17 +884,19 @@ end; { TNamedPipeServerTransportImpl } -constructor TNamedPipeServerTransportImpl.Create( aPipename : string; aBufsize, aMaxConns, aTimeOut : Cardinal); +constructor TNamedPipeServerTransportImpl.Create( const aPipename : string; + const aBufsize, aMaxConns, aTimeOut : Cardinal; + const aConfig : IThriftConfiguration); // Named Pipe CTOR begin - inherited Create; - ASSERT( aTimeout > 0); + inherited Create( aConfig); FPipeName := aPipename; FBufsize := aBufSize; FMaxConns := Max( 1, Min( PIPE_UNLIMITED_INSTANCES, aMaxConns)); FHandle := INVALID_HANDLE_VALUE; FTimeout := aTimeOut; FConnected := FALSE; + ASSERT( FTimeout > 0); if Copy(FPipeName,1,2) <> '\\' then FPipeName := '\\.\pipe\' + FPipeName; // assume localhost @@ -951,7 +965,7 @@ begin hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE))); try FConnected := FALSE; - result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE, FTimeout); + result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE, FTimeout, Configuration); except ClosePipeHandle(hPipe); raise; diff --git a/lib/delphi/src/Thrift.Transport.WinHTTP.pas b/lib/delphi/src/Thrift.Transport.WinHTTP.pas index 2f961a001..7a1b48fa2 100644 --- a/lib/delphi/src/Thrift.Transport.WinHTTP.pas +++ b/lib/delphi/src/Thrift.Transport.WinHTTP.pas @@ -29,6 +29,7 @@ uses Math, Generics.Collections, Thrift.Collections, + Thrift.Configuration, Thrift.Transport, Thrift.Exception, Thrift.Utils, @@ -36,7 +37,7 @@ uses Thrift.Stream; type - TWinHTTPClientImpl = class( TTransportImpl, IHTTPClient) + TWinHTTPClientImpl = class( TEndpointTransportBase, IHTTPClient) strict private FUri : string; FInputStream : IThriftStream; @@ -58,19 +59,16 @@ type THTTPResponseStream = class( TThriftStreamImpl) strict private FRequest : IWinHTTPRequest; - FTransportControl : ITransportControl; strict protected procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override; function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override; - procedure CheckReadBytesAvailable( const value : Integer); override; - procedure ConsumeReadBytes( const count : Integer); procedure Open; override; procedure Close; override; procedure Flush; override; function IsOpen: Boolean; override; function ToArray: TBytes; override; public - constructor Create( const aRequest : IWinHTTPRequest; const aTransportCtl : ITransportControl); + constructor Create( const aRequest : IWinHTTPRequest); destructor Destroy; override; end; @@ -81,7 +79,6 @@ type function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override; procedure Write( const pBuf : Pointer; off, len : Integer); override; procedure Flush; override; - procedure CheckReadBytesAvailable( const value : Integer); override; procedure SetDnsResolveTimeout(const Value: Integer); function GetDnsResolveTimeout: Integer; @@ -103,25 +100,29 @@ type property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout; property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders; public - constructor Create( const AUri: string; const aTransportCtl : ITransportControl = nil); + constructor Create( const aUri: string; const aConfig : IThriftConfiguration = nil); destructor Destroy; override; end; implementation +const + WINHTTP_CONNECTION_TIMEOUT = 60 * 1000; + WINHTTP_SENDRECV_TIMEOUT = 30 * 1000; + { TWinHTTPClientImpl } -constructor TWinHTTPClientImpl.Create(const AUri: string; const aTransportCtl : ITransportControl); +constructor TWinHTTPClientImpl.Create( const aUri: string; const aConfig : IThriftConfiguration); begin - inherited Create( aTransportCtl); + inherited Create( aConfig); FUri := AUri; // defaults according to MSDN FDnsResolveTimeout := 0; // no timeout - FConnectionTimeout := 60 * 1000; - FSendTimeout := 30 * 1000; - FReadTimeout := 30 * 1000; + FConnectionTimeout := WINHTTP_CONNECTION_TIMEOUT; + FSendTimeout := WINHTTP_SENDRECV_TIMEOUT; + FReadTimeout := WINHTTP_SENDRECV_TIMEOUT; FSecureProtocols := DEFAULT_THRIFT_SECUREPROTOCOLS; @@ -288,13 +289,6 @@ begin end; end; -procedure TWinHTTPClientImpl.CheckReadBytesAvailable( const value : Integer); -begin - if FInputStream <> nil - then FInputStream.CheckReadBytesAvailable( value) - else raise TTransportExceptionNotOpen.Create('No request has been sent'); -end; - function TWinHTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; begin if FInputStream = nil then begin @@ -303,7 +297,7 @@ begin try Result := FInputStream.Read( pBuf, buflen, off, len); - ConsumeReadBytes( result); + CountConsumedMessageBytes( result); except on E: Exception do raise TTransportExceptionUnknown.Create(E.Message); @@ -313,7 +307,6 @@ end; procedure TWinHTTPClientImpl.SendRequest; var http : IWinHTTPRequest; - ctrl : ITransportControl; pData : PByte; len : Integer; error : Cardinal; @@ -340,8 +333,8 @@ begin else raise TTransportExceptionInterrupted.Create( sMsg); end; - ctrl := TTransportControlImpl.Create( TransportControl.MaxAllowedMessageSize); - FInputStream := THTTPResponseStream.Create( http, ctrl); + FInputStream := THTTPResponseStream.Create( http); + UpdateKnownMessageSize( http.QueryTotalResponseSize); end; procedure TWinHTTPClientImpl.Write( const pBuf : Pointer; off, len : Integer); @@ -355,12 +348,10 @@ end; { TWinHTTPClientImpl.THTTPResponseStream } -constructor TWinHTTPClientImpl.THTTPResponseStream.Create( const aRequest : IWinHTTPRequest; const aTransportCtl : ITransportControl); +constructor TWinHTTPClientImpl.THTTPResponseStream.Create( const aRequest : IWinHTTPRequest); begin inherited Create; FRequest := aRequest; - FTransportControl := aTransportCtl; - ASSERT( FTransportControl <> nil); end; destructor TWinHTTPClientImpl.THTTPResponseStream.Destroy; @@ -406,8 +397,6 @@ begin if count >= buflen-offset then count := buflen-offset; - CheckReadBytesAvailable(count); - if count > 0 then begin pTmp := pBuf; Inc( pTmp, offset); @@ -415,20 +404,6 @@ begin ASSERT( Result >= 0); end else Result := 0; - - ConsumeReadBytes( result); -end; - -procedure TWinHTTPClientImpl.THTTPResponseStream.ConsumeReadBytes( const count : Integer); -begin - if FTransportControl <> nil - then FTransportControl.ConsumeReadBytes( count); -end; - -procedure TWinHTTPClientImpl.THTTPResponseStream.CheckReadBytesAvailable( const value : Integer); -begin - if Int64(value) > Int64(FRequest.QueryTotalResponseSize) - then raise TTransportExceptionEndOfFile.Create('Not enough input data'); end; function TWinHTTPClientImpl.THTTPResponseStream.ToArray: TBytes; diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas index 0a9a39e04..af62548db 100644 --- a/lib/delphi/src/Thrift.Transport.pas +++ b/lib/delphi/src/Thrift.Transport.pas @@ -38,6 +38,7 @@ uses Thrift.Socket, {$ENDIF} {$ENDIF} + Thrift.Configuration, Thrift.Collections, Thrift.Exception, Thrift.Utils, @@ -49,28 +50,10 @@ const DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms type - ITransportControl = interface - ['{CDA35E2C-F1D2-4BE3-9927-7F1540923265}'] - function MaxAllowedMessageSize : Integer; - procedure ConsumeReadBytes( const count : Integer); - procedure ResetConsumedMessageSize; - end; - - TTransportControlImpl = class( TInterfacedObject, ITransportControl) - strict private - FMaxAllowedMsgSize : Integer; - FRemainingMsgSize : Integer; - strict protected - // ITransportControl - function MaxAllowedMessageSize : Integer; - procedure ConsumeReadBytes( const count : Integer); - procedure ResetConsumedMessageSize; - public - constructor Create( const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); reintroduce; - end; + IStreamTransport = interface; ITransport = interface - ['{938F6EB5-1848-43D5-8AC4-07633C55B229}'] + ['{52F81383-F880-492F-8AA7-A66B85B93D6B}'] function GetIsOpen: Boolean; property IsOpen: Boolean read GetIsOpen; function Peek: Boolean; @@ -87,14 +70,14 @@ type procedure Write( const pBuf : Pointer; len : Integer); overload; procedure Flush; - function TransportControl : ITransportControl; - procedure CheckReadBytesAvailable( const value : Integer); + function Configuration : IThriftConfiguration; + function MaxMessageSize : Integer; + procedure ResetConsumedMessageSize( const knownSize : Int64 = -1); + procedure CheckReadBytesAvailable( const numBytes : Int64); + procedure UpdateKnownMessageSize( const size : Int64); end; - TTransportImpl = class( TInterfacedObject, ITransport) - strict private - FTransportControl : ITransportControl; - + TTransportBase = class abstract( TInterfacedObject) strict protected function GetIsOpen: Boolean; virtual; abstract; property IsOpen: Boolean read GetIsOpen; @@ -112,12 +95,44 @@ type procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract; procedure Flush; virtual; - function TransportControl : ITransportControl; inline; - procedure ConsumeReadBytes( const count : Integer); inline; - procedure CheckReadBytesAvailable( const value : Integer); virtual; abstract; + function Configuration : IThriftConfiguration; virtual; abstract; + procedure UpdateKnownMessageSize( const size : Int64); virtual; abstract; + end; + + // base class for all endpoint transports, e.g. sockets, pipes or HTTP + TEndpointTransportBase = class abstract( TTransportBase, ITransport) + strict private + FRemainingMessageSize : Int64; + FKnownMessageSize : Int64; + FConfiguration : IThriftConfiguration; + strict protected + function Configuration : IThriftConfiguration; override; + function MaxMessageSize : Integer; + property RemainingMessageSize : Int64 read FRemainingMessageSize; + property KnownMessageSize : Int64 read FKnownMessageSize; + procedure ResetConsumedMessageSize( const newSize : Int64 = -1); inline; + procedure UpdateKnownMessageSize(const size : Int64); override; + procedure CheckReadBytesAvailable(const numBytes : Int64); inline; + procedure CountConsumedMessageBytes(const numBytes : Int64); inline; + public + constructor Create( const aConfig : IThriftConfiguration); reintroduce; + end; + // base class for all layered transports, e.g. framed + TLayeredTransportBase<T : ITransport> = class abstract( TTransportBase, ITransport) + strict private + FTransport : T; + strict protected + property InnerTransport : T read FTransport; + function GetUnderlyingTransport: ITransport; + function Configuration : IThriftConfiguration; override; + procedure UpdateKnownMessageSize( const size : Int64); override; + function MaxMessageSize : Integer; inline; + procedure ResetConsumedMessageSize( const knownSize : Int64 = -1); inline; + procedure CheckReadBytesAvailable( const numBytes : Int64); virtual; public - constructor Create( const aTransportCtl : ITransportControl); reintroduce; + constructor Create( const aTransport: T); reintroduce; + property UnderlyingTransport: ITransport read GetUnderlyingTransport; end; TTransportException = class abstract( TException) @@ -220,17 +235,23 @@ type end; IServerTransport = interface - ['{C43B87ED-69EA-47C4-B77C-15E288252900}'] + ['{FA01363F-6B40-482F-971E-4A085535EFC8}'] procedure Listen; procedure Close; function Accept( const fnAccepting: TProc): ITransport; + function Configuration : IThriftConfiguration; end; TServerTransportImpl = class( TInterfacedObject, IServerTransport) + strict private + FConfig : IThriftConfiguration; strict protected + function Configuration : IThriftConfiguration; procedure Listen; virtual; abstract; procedure Close; virtual; abstract; - function Accept( const fnAccepting: TProc): ITransport; virtual; abstract; + function Accept( const fnAccepting: TProc): ITransport; virtual; abstract; + public + constructor Create( const aConfig : IThriftConfiguration); end; ITransportFactory = interface @@ -238,11 +259,13 @@ type function GetTransport( const aTransport: ITransport): ITransport; end; - TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory) + TTransportFactoryImpl = class ( TInterfacedObject, ITransportFactory) + strict protected function GetTransport( const aTransport: ITransport): ITransport; virtual; end; - TTcpSocketStreamImpl = class( TThriftStreamImpl ) + + TTcpSocketStreamImpl = class( TThriftStreamImpl) {$IFDEF OLD_SOCKETS} strict private type TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error); @@ -261,7 +284,6 @@ type strict protected procedure Write( const pBuf : Pointer; offset, count: Integer); override; function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override; - procedure CheckReadBytesAvailable( const value : Integer); override; procedure Open; override; procedure Close; override; procedure Flush; override; @@ -270,9 +292,9 @@ type function ToArray: TBytes; override; public {$IFDEF OLD_SOCKETS} - constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = 0); + constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = DEFAULT_THRIFT_TIMEOUT); {$ELSE} - constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = 0); + constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = DEFAULT_THRIFT_TIMEOUT); {$ENDIF} end; @@ -284,7 +306,7 @@ type property OutputStream : IThriftStream read GetOutputStream; end; - TStreamTransportImpl = class( TTransportImpl, IStreamTransport) + TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport) strict protected FInputStream : IThriftStream; FOutputStream : IThriftStream; @@ -294,7 +316,6 @@ type function GetInputStream: IThriftStream; function GetOutputStream: IThriftStream; - procedure CheckReadBytesAvailable( const value : Integer); override; strict protected procedure Open; override; procedure Close; override; @@ -302,7 +323,7 @@ type function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override; procedure Write( const pBuf : Pointer; off, len : Integer); override; public - constructor Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl = nil); + constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil); reintroduce; destructor Destroy; override; property InputStream : IThriftStream read GetInputStream; @@ -318,12 +339,13 @@ type strict protected procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override; function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override; - procedure CheckReadBytesAvailable( const value : Integer); override; procedure Open; override; procedure Close; override; procedure Flush; override; function IsOpen: Boolean; override; function ToArray: TBytes; override; + function Size : Int64; override; + function Position : Int64; override; public constructor Create( const aStream: IThriftStream; const aBufSize : Integer); destructor Destroy; override; @@ -340,38 +362,34 @@ type {$ENDIF} FUseBufferedSocket : Boolean; FOwnsServer : Boolean; - FTransportControl : ITransportControl; strict protected function Accept( const fnAccepting: TProc) : ITransport; override; - property TransportControl : ITransportControl read FTransportControl; public -{$IFDEF OLD_SOCKETS} - constructor Create( const aServer: TTcpServer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload; - constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload; -{$ELSE} - constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload; - constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload; -{$ENDIF} + {$IFDEF OLD_SOCKETS} + constructor Create( const aServer: TTcpServer; const aClientTimeout : Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload; + constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload; + {$ELSE} + constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload; + constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload; + {$ENDIF} + destructor Destroy; override; procedure Listen; override; procedure Close; override; end; - TBufferedTransportImpl = class( TTransportImpl ) + TBufferedTransportImpl = class( TLayeredTransportBase<IStreamTransport>) strict private FInputBuffer : IThriftStream; FOutputBuffer : IThriftStream; - FTransport : IStreamTransport; FBufSize : Integer; procedure InitBuffers; - function GetUnderlyingTransport: ITransport; strict protected function GetIsOpen: Boolean; override; procedure Flush; override; - procedure CheckReadBytesAvailable( const value : Integer); override; public type TFactory = class( TTransportFactoryImpl ) @@ -384,7 +402,7 @@ type procedure Close(); override; function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override; procedure Write( const pBuf : Pointer; off, len : Integer); override; - property UnderlyingTransport: ITransport read GetUnderlyingTransport; + procedure CheckReadBytesAvailable( const value : Int64); override; property IsOpen: Boolean read GetIsOpen; end; @@ -408,15 +426,16 @@ type strict protected function GetIsOpen: Boolean; override; public - procedure Open; override; {$IFDEF OLD_SOCKETS} - constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload; - constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload; + constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload; + constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload; {$ELSE} - constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl = nil); overload; - constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload; + constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration = nil); overload; + constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload; {$ENDIF} destructor Destroy; override; + + procedure Open; override; procedure Close; override; {$IFDEF OLD_SOCKETS} property TcpClient: TCustomIpClient read FClient; @@ -427,29 +446,25 @@ type property Port: Integer read FPort; end; - TFramedTransportImpl = class( TTransportImpl) - strict protected const - DEFAULT_MAX_LENGTH = 16384000; // this value is used by all Thrift libraries + TFramedTransportImpl = class( TLayeredTransportBase<ITransport>) strict protected type TFramedHeader = Int32; strict protected - FTransport : ITransport; FWriteBuffer : TMemoryStream; FReadBuffer : TMemoryStream; - FMaxFrameSize : Integer; - procedure InitMaxFrameSize; procedure InitWriteBuffer; procedure ReadFrame; procedure Open(); override; - function GetIsOpen: Boolean; override; + function GetIsOpen: Boolean; override; procedure Close(); override; function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override; procedure Write( const pBuf : Pointer; off, len : Integer); override; + procedure CheckReadBytesAvailable( const value : Int64); override; procedure Flush; override; - procedure CheckReadBytesAvailable( const value : Integer); override; + public type TFactory = class( TTransportFactoryImpl ) @@ -457,7 +472,6 @@ type function GetTransport( const aTransport: ITransport): ITransport; override; end; - constructor Create( const aTransportCtl : ITransportControl); overload; constructor Create( const aTransport: ITransport); overload; destructor Destroy; override; end; @@ -469,122 +483,182 @@ const implementation -{ TTransportControlImpl } +{ TTransportBase } -constructor TTransportControlImpl.Create( const aMaxMessageSize : Integer); +procedure TTransportBase.Flush; begin - inherited Create; + // nothing to do +end; - if aMaxMessageSize > 0 - then FMaxAllowedMsgSize := aMaxMessageSize - else FMaxAllowedMsgSize := DEFAULT_MAX_MESSAGE_SIZE; +function TTransportBase.Peek: Boolean; +begin + Result := IsOpen; +end; - ResetConsumedMessageSize; +function TTransportBase.Read(var buf: TBytes; off: Integer; len: Integer): Integer; +begin + if Length(buf) > 0 + then result := Read( @buf[0], Length(buf), off, len) + else result := 0; end; -function TTransportControlImpl.MaxAllowedMessageSize : Integer; +function TTransportBase.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; begin - result := FMaxAllowedMsgSize; + if Length(buf) > 0 + then result := ReadAll( @buf[0], Length(buf), off, len) + else result := 0; end; -procedure TTransportControlImpl.ResetConsumedMessageSize; +function TTransportBase.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; +var ret : Integer; begin - FRemainingMsgSize := MaxAllowedMessageSize; + result := 0; + while result < len do begin + ret := Read( pBuf, buflen, off + result, len - result); + if ret > 0 + then Inc( result, ret) + else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' ); + end; end; +procedure TTransportBase.Write( const buf: TBytes); +begin + if Length(buf) > 0 + then Write( @buf[0], 0, Length(buf)); +end; -procedure TTransportControlImpl.ConsumeReadBytes( const count : Integer); +procedure TTransportBase.Write( const buf: TBytes; off: Integer; len: Integer); begin - if FRemainingMsgSize >= count - then Dec( FRemainingMsgSize, count) - else begin - FRemainingMsgSize := 0; - if FRemainingMsgSize < count - then raise TTransportExceptionEndOfFile.Create('Maximum message size reached'); - end; + if Length(buf) > 0 + then Write( @buf[0], off, len); end; +procedure TTransportBase.Write( const pBuf : Pointer; len : Integer); +begin + Self.Write( pBuf, 0, len); +end; -{ TTransportImpl } -constructor TTransportImpl.Create( const aTransportCtl : ITransportControl); +{ TEndpointTransportBase } + +constructor TEndpointTransportBase.Create( const aConfig : IThriftConfiguration); begin inherited Create; - if aTransportCtl <> nil - then FTransportControl := aTransportCtl - else FTransportControl := TTransportControlImpl.Create; - ASSERT( FTransportControl <> nil); + if aConfig <> nil + then FConfiguration := aConfig + else FConfiguration := TThriftConfigurationImpl.Create; + + ResetConsumedMessageSize; end; -procedure TTransportImpl.Flush; +function TEndpointTransportBase.Configuration : IThriftConfiguration; begin - // nothing to do + result := FConfiguration; end; -function TTransportImpl.Peek: Boolean; + +function TEndpointTransportBase.MaxMessageSize : Integer; begin - Result := IsOpen; + ASSERT( Configuration <> nil); + result := Configuration.MaxMessageSize; end; -function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer; + +procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64); +// Resets RemainingMessageSize to the configured maximum begin - if Length(buf) > 0 - then result := Read( @buf[0], Length(buf), off, len) - else result := 0; + // full reset + if newSize < 0 then begin + FKnownMessageSize := MaxMessageSize; + FRemainingMessageSize := MaxMessageSize; + Exit; + end; + + // update only: message size can shrink, but not grow + ASSERT( KnownMessageSize <= MaxMessageSize); + if newSize > KnownMessageSize + then TTransportExceptionEndOfFile.Create('MaxMessageSize reached'); + + FKnownMessageSize := newSize; + FRemainingMessageSize := newSize; end; -function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; + +procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64); +// Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport). +// Will throw if we already consumed too many bytes. +var consumed : Int64; begin - if Length(buf) > 0 - then result := ReadAll( @buf[0], Length(buf), off, len) - else result := 0; + consumed := KnownMessageSize - RemainingMessageSize; + ResetConsumedMessageSize(size); + CountConsumedMessageBytes(consumed); end; -function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; -var ret : Integer; + +procedure TEndpointTransportBase.CheckReadBytesAvailable( const numBytes : Int64); +// Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data begin - result := 0; - while result < len do begin - ret := Read( pBuf, buflen, off + result, len - result); - if ret > 0 - then Inc( result, ret) - else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' ); + if RemainingMessageSize < numBytes + then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached'); +end; + + +procedure TEndpointTransportBase.CountConsumedMessageBytes( const numBytes : Int64); +// Consumes numBytes from the RemainingMessageSize. +begin + if (RemainingMessageSize >= numBytes) + then Dec( FRemainingMessageSize, numBytes) + else begin + FRemainingMessageSize := 0; + raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached'); end; end; -procedure TTransportImpl.Write( const buf: TBytes); +{ TLayeredTransportBase } + +constructor TLayeredTransportBase<T>.Create( const aTransport: T); begin - if Length(buf) > 0 - then Write( @buf[0], 0, Length(buf)); + inherited Create; + FTransport := aTransport; end; -procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer); +function TLayeredTransportBase<T>.GetUnderlyingTransport: ITransport; begin - if Length(buf) > 0 - then Write( @buf[0], off, len); + result := InnerTransport; end; -procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer); +function TLayeredTransportBase<T>.Configuration : IThriftConfiguration; begin - Self.Write( pBuf, 0, len); + result := InnerTransport.Configuration; +end; + +procedure TLayeredTransportBase<T>.UpdateKnownMessageSize( const size : Int64); +begin + InnerTransport.UpdateKnownMessageSize( size); end; -function TTransportImpl.TransportControl : ITransportControl; +function TLayeredTransportBase<T>.MaxMessageSize : Integer; begin - result := FTransportControl; + result := InnerTransport.MaxMessageSize; end; -procedure TTransportImpl.ConsumeReadBytes( const count : Integer); +procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1); begin - if FTransportControl <> nil - then FTransportControl.ConsumeReadBytes( count); + InnerTransport.ResetConsumedMessageSize( knownSize); end; +procedure TLayeredTransportBase<T>.CheckReadBytesAvailable( const numBytes : Int64); +begin + InnerTransport.CheckReadBytesAvailable( numBytes); +end; + + + { TTransportException } constructor TTransportException.HiddenCreate(const Msg: string); @@ -676,18 +750,33 @@ begin Result := aTransport; end; + +{ TServerTransportImpl } + +constructor TServerTransportImpl.Create( const aConfig : IThriftConfiguration); +begin + inherited Create; + if aConfig <> nil + then FConfig := aConfig + else FConfig := TThriftConfigurationImpl.Create; +end; + +function TServerTransportImpl.Configuration : IThriftConfiguration; +begin + result := FConfig; +end; + { TServerSocket } {$IFDEF OLD_SOCKETS} -constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aTransportCtl : ITransportControl); +constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aConfig : IThriftConfiguration); {$ELSE} -constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aTransportCtl : ITransportControl); +constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aConfig : IThriftConfiguration); {$ENDIF} begin - inherited Create; + inherited Create( aConfig); FServer := aServer; - FTransportControl := aTransportCtl; - ASSERT( FTransportControl <> nil); + {$IFDEF OLD_SOCKETS} FClientTimeout := aClientTimeout; @@ -699,17 +788,12 @@ end; {$IFDEF OLD_SOCKETS} -constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl); +constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration); {$ELSE} -constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl); +constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration); {$ENDIF} begin - inherited Create; - - if aTransportCtl <> nil - then FTransportControl := aTransportCtl - else FTransportControl := TTransportControlImpl.Create; - ASSERT( FTransportControl <> nil); + inherited Create( aConfig); {$IFDEF OLD_SOCKETS} FPort := aPort; @@ -772,7 +856,7 @@ begin Exit; end; - trans := TSocketImpl.Create( client, TRUE, FClientTimeout, TransportControl); + trans := TSocketImpl.Create( client, TRUE, FClientTimeout, Configuration); client := nil; // trans owns it now if FUseBufferedSocket @@ -791,7 +875,7 @@ begin client := FServer.Accept; try - trans := TSocketImpl.Create(client, True, TransportControl); + trans := TSocketImpl.Create(client, TRUE, Configuration); client := nil; if FUseBufferedSocket then @@ -840,9 +924,9 @@ end; { TSocket } {$IFDEF OLD_SOCKETS} -constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aTransportCtl : ITransportControl); +constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aConfig : IThriftConfiguration); {$ELSE} -constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl); +constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration); {$ENDIF} var stream : IThriftStream; begin @@ -856,16 +940,17 @@ begin {$ENDIF} stream := TTcpSocketStreamImpl.Create( FClient, FTimeout); - inherited Create( stream, stream, aTransportCtl); + inherited Create( stream, stream, aConfig); end; + {$IFDEF OLD_SOCKETS} -constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aTransportCtl : ITransportControl); +constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aConfig : IThriftConfiguration); {$ELSE} -constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aTransportCtl : ITransportControl); +constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aConfig : IThriftConfiguration); {$ENDIF} begin - inherited Create(nil,nil, aTransportCtl); + inherited Create(nil,nil, aConfig); FHost := aHost; FPort := aPort; FTimeout := aTimeout; @@ -1043,30 +1128,12 @@ begin end; -procedure TBufferedStreamImpl.CheckReadBytesAvailable( const value : Integer); -var nRequired : Integer; -begin - nRequired := value; - - if FReadBuffer <> nil then begin - Dec( nRequired, (FReadBuffer.Position - FReadBuffer.Size)); - if nRequired <= 0 then Exit; - end; - - if FStream <> nil - then FStream.CheckReadBytesAvailable( nRequired) - else raise TTransportExceptionEndOfFile.Create('Not enough input data'); -end; - - function TBufferedStreamImpl.ToArray: TBytes; var len : Integer; begin - len := 0; - - if IsOpen then begin - len := FReadBuffer.Size; - end; + if IsOpen + then len := FReadBuffer.Size + else len := 0; SetLength( Result, len); @@ -1092,11 +1159,24 @@ begin end; end; + +function TBufferedStreamImpl.Size : Int64; +begin + result := FReadBuffer.Size; +end; + + +function TBufferedStreamImpl.Position : Int64; +begin + result := FReadBuffer.Position; +end; + + { TStreamTransportImpl } -constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl); +constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration); begin - inherited Create( aTransportCtl); + inherited Create( aConfig); FInputStream := aInputStream; FOutputStream := aOutputStream; end; @@ -1149,7 +1229,7 @@ begin then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' ); Result := FInputStream.Read( pBuf,buflen, off, len ); - ConsumeReadBytes( result); + CountConsumedMessageBytes( result); end; procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer); @@ -1160,28 +1240,19 @@ begin FOutputStream.Write( pBuf, off, len ); end; -procedure TStreamTransportImpl.CheckReadBytesAvailable( const value : Integer); -begin - if FInputStream <> nil - then FInputStream.CheckReadBytesAvailable( value) - else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' ); -end; - - { TBufferedTransportImpl } constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer); begin ASSERT( aTransport <> nil); - inherited Create( aTransport.TransportControl); - FTransport := aTransport; + inherited Create( aTransport); FBufSize := aBufSize; InitBuffers; end; procedure TBufferedTransportImpl.Close; begin - FTransport.Close; + InnerTransport.Close; FInputBuffer := nil; FOutputBuffer := nil; end; @@ -1195,34 +1266,29 @@ end; function TBufferedTransportImpl.GetIsOpen: Boolean; begin - Result := FTransport.IsOpen; -end; - -function TBufferedTransportImpl.GetUnderlyingTransport: ITransport; -begin - Result := FTransport; + Result := InnerTransport.IsOpen; end; procedure TBufferedTransportImpl.InitBuffers; begin - if FTransport.InputStream <> nil then begin - FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize ); + if InnerTransport.InputStream <> nil then begin + FInputBuffer := TBufferedStreamImpl.Create( InnerTransport.InputStream, FBufSize ); end; - if FTransport.OutputStream <> nil then begin - FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize ); + if InnerTransport.OutputStream <> nil then begin + FOutputBuffer := TBufferedStreamImpl.Create( InnerTransport.OutputStream, FBufSize ); end; end; procedure TBufferedTransportImpl.Open; begin - FTransport.Open; + InnerTransport.Open; InitBuffers; // we need to get the buffers to match FTransport substreams again end; function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; begin if FInputBuffer <> nil - then Result := FInputBuffer.Read( pBuf,buflen, off, len) + then Result := FInputBuffer.Read( pBuf,buflen, off, len ) else Result := 0; end; @@ -1233,23 +1299,18 @@ begin end; end; -procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Integer); -var stm2 : IThriftStream2; - need : Integer; +procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Int64); +var buffered, need : Int64; begin need := value; // buffered bytes - if Supports( FInputBuffer, IThriftStream2, stm2) then begin - Dec( need, stm2.Size - stm2.Position); - if need <= 0 then Exit; - end; - - if FInputBuffer <> nil - then FInputBuffer.CheckReadBytesAvailable( need) - else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' ); + buffered := FInputBuffer.Size - FInputBuffer.Position; + if buffered < need + then InnerTransport.CheckReadBytesAvailable( need - buffered); end; + { TBufferedTransportImpl.TFactory } function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport; @@ -1260,53 +1321,33 @@ end; { TFramedTransportImpl } -constructor TFramedTransportImpl.Create( const aTransportCtl : ITransportControl); -begin - inherited Create( aTransportCtl); - - InitMaxFrameSize; - InitWriteBuffer; -end; - constructor TFramedTransportImpl.Create( const aTransport: ITransport); begin ASSERT( aTransport <> nil); - inherited Create( aTransport.TransportControl); + inherited Create( aTransport); - InitMaxFrameSize; InitWriteBuffer; - FTransport := aTransport; end; destructor TFramedTransportImpl.Destroy; begin FWriteBuffer.Free; + FWriteBuffer := nil; FReadBuffer.Free; + FReadBuffer := nil; inherited; end; -procedure TFramedTransportImpl.InitMaxFrameSize; -var maxLen : Integer; -begin - FMaxFrameSize := DEFAULT_MAX_LENGTH; - - // MaxAllowedMessageSize may be smaller, but not larger - if TransportControl <> nil then begin - maxLen := TransportControl.MaxAllowedMessageSize - SizeOf(TFramedHeader); - FMaxFrameSize := Min( FMaxFrameSize, maxLen); - end; -end; - procedure TFramedTransportImpl.Close; begin - FTransport.Close; + InnerTransport.Close; end; procedure TFramedTransportImpl.Flush; var buf : TBytes; len : Integer; - data_len : Integer; + data_len : Int64; begin if not IsOpen then raise TTransportExceptionNotOpen.Create('not open'); @@ -1318,9 +1359,9 @@ begin end; data_len := len - SizeOf(TFramedHeader); - if (data_len < 0) then begin - raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' ); - end; + if (0 > data_len) or (data_len > Configuration.MaxFrameSize) + then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: invalid frame size ('+IntToStr(data_len)+')') + else UpdateKnownMessageSize( len); InitWriteBuffer; @@ -1329,13 +1370,13 @@ begin buf[2] := Byte($FF and (data_len shr 8)); buf[3] := Byte($FF and data_len); - FTransport.Write( buf, 0, len ); - FTransport.Flush; + InnerTransport.Write( buf, 0, len ); + InnerTransport.Flush; end; function TFramedTransportImpl.GetIsOpen: Boolean; begin - Result := FTransport.IsOpen; + Result := InnerTransport.IsOpen; end; type @@ -1353,7 +1394,7 @@ end; procedure TFramedTransportImpl.Open; begin - FTransport.Open; + InnerTransport.Open; end; function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; @@ -1382,7 +1423,7 @@ var size : Integer; buff : TBytes; begin - FTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd)); + InnerTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd)); size := ((i32rd[0] and $FF) shl 24) or ((i32rd[1] and $FF) shl 16) or @@ -1394,14 +1435,15 @@ begin raise TTransportExceptionCorruptedData.Create('Read a negative frame size ('+IntToStr(size)+')'); end; - if size > FMaxFrameSize then begin + if Int64(size) > Int64(Configuration.MaxFrameSize) then begin Close(); - raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(FMaxFrameSize)+')'); + raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(Configuration.MaxFrameSize)+')'); end; - FTransport.CheckReadBytesAvailable( size); + UpdateKnownMessageSize(size + SizeOf(size)); + SetLength( buff, size ); - FTransport.ReadAll( buff, 0, size ); + InnerTransport.ReadAll( buff, 0, size ); FreeAndNil( FReadBuffer); FReadBuffer := TMemoryStream.Create; @@ -1422,15 +1464,15 @@ begin end; -procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Integer); -var nRemaining : Int64; +procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Int64); +var buffered, need : Int64; begin - if FReadBuffer = nil - then raise TTransportExceptionEndOfFile.Create('Cannot read from null inputstream'); + need := value; - nRemaining := FReadBuffer.Size - FReadBuffer.Position; - if value > nRemaining - then raise TTransportExceptionEndOfFile.Create('Not enough input data'); + // buffered bytes + buffered := FReadBuffer.Size - FReadBuffer.Position; + if buffered < need + then InnerTransport.CheckReadBytesAvailable( need - buffered); end; @@ -1470,9 +1512,10 @@ end; procedure TTcpSocketStreamImpl.Flush; begin - + // nothing to do end; + function TTcpSocketStreamImpl.IsOpen: Boolean; begin {$IFDEF OLD_SOCKETS} @@ -1557,7 +1600,7 @@ begin {$IFDEF LINUX} result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr); {$ENDIF} - + if result = SOCKET_ERROR then wsaError := WSAGetLastError; @@ -1638,10 +1681,7 @@ begin TWaitForData.wfd_Timeout : begin if (FTimeout = 0) then Exit - else begin - raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError))); - - end; + else raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError))); end; else ASSERT( FALSE); @@ -1764,10 +1804,5 @@ end; {$ENDIF} -procedure TTcpSocketStreamImpl.CheckReadBytesAvailable( const value : Integer); -begin - // we can't really tell, no further checks possible -end; - end. diff --git a/lib/delphi/src/Thrift.WinHTTP.pas b/lib/delphi/src/Thrift.WinHTTP.pas index 6d886fe9b..d06006626 100644 --- a/lib/delphi/src/Thrift.WinHTTP.pas +++ b/lib/delphi/src/Thrift.WinHTTP.pas @@ -1224,8 +1224,8 @@ begin dwIndex) then begin dwError := GetLastError; - ASSERT( dwError = ERROR_WINHTTP_HEADER_NOT_FOUND); // anything else would be an real error - result := MAXINT; // we don't know + if dwError <> ERROR_WINHTTP_HEADER_NOT_FOUND then ASSERT(FALSE); // anything else would be an real error + result := MAXINT; // we don't know end; end; diff --git a/lib/delphi/test/Performance/PerfTests.pas b/lib/delphi/test/Performance/PerfTests.pas index 2c820b1f3..e485212b0 100644 --- a/lib/delphi/test/Performance/PerfTests.pas +++ b/lib/delphi/test/Performance/PerfTests.pas @@ -21,6 +21,7 @@ interface uses Windows, Classes, SysUtils, Thrift.Collections, + Thrift.Configuration, Thrift.Test, Thrift.Protocol, Thrift.Protocol.JSON, @@ -34,9 +35,10 @@ uses type TPerformanceTests = class strict private - Testdata : ICrazyNesting; - MemBuffer : TMemoryStream; - Transport : ITransport; + FTestdata : ICrazyNesting; + FMemBuffer : TMemoryStream; + FTransport : ITransport; + FConfig : IThriftConfiguration; procedure ProtocolPeformanceTest; procedure RunTest( const ptyp : TKnownProtocol; const layered : TLayeredTransport); @@ -74,7 +76,7 @@ procedure TPerformanceTests.ProtocolPeformanceTest; var layered : TLayeredTransport; begin Console.WriteLine('Setting up for ProtocolPeformanceTest ...'); - Testdata := TestDataFactory.CreateCrazyNesting(); + FTestdata := TestDataFactory.CreateCrazyNesting(); for layered := Low(TLayeredTransport) to High(TLayeredTransport) do begin RunTest( TKnownProtocol.prot_Binary, layered); @@ -91,10 +93,12 @@ var freq, start, stop : Int64; begin QueryPerformanceFrequency( freq); + FConfig := TThriftConfigurationImpl.Create; + proto := GenericProtocolFactory( ptyp, layered, TRUE); QueryPerformanceCounter( start); - Testdata.Write(proto); - Transport.Flush; + FTestdata.Write(proto); + FTransport.Flush; QueryPerformanceCounter( stop); Console.WriteLine( Format('RunTest(%s): write = %d msec', [ GetProtocolTransportName(ptyp,layered), @@ -121,24 +125,24 @@ const COPY_ENTIRE_STREAM = 0; begin // read happens after write here, so let's take over the written bytes newBuf := TMemoryStream.Create; - if not forWrite then newBuf.CopyFrom( MemBuffer, COPY_ENTIRE_STREAM); - MemBuffer := newBuf; - MemBuffer.Position := 0; + if not forWrite then newBuf.CopyFrom( FMemBuffer, COPY_ENTIRE_STREAM); + FMemBuffer := newBuf; + FMemBuffer.Position := 0; // layered transports anyone? stream := TThriftStreamAdapterDelphi.Create( newBuf, TRUE); if forWrite - then trans := TStreamTransportImpl.Create( nil, stream) - else trans := TStreamTransportImpl.Create( stream, nil); + then trans := TStreamTransportImpl.Create( nil, stream, FConfig) + else trans := TStreamTransportImpl.Create( stream, nil, FConfig); case layered of - trns_Framed : Transport := TFramedTransportImpl.Create( trans); - trns_Buffered : Transport := TBufferedTransportImpl.Create( trans); + trns_Framed : FTransport := TFramedTransportImpl.Create( trans); + trns_Buffered : FTransport := TBufferedTransportImpl.Create( trans); else - Transport := trans; + FTransport := trans; end; - if not Transport.IsOpen - then Transport.Open; + if not FTransport.IsOpen + then FTransport.Open; case ptyp of prot_Binary : result := TBinaryProtocolImpl.Create(trans); diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas index 3562dabb8..1579bd5f6 100644 --- a/lib/delphi/test/TestClient.pas +++ b/lib/delphi/test/TestClient.pas @@ -53,6 +53,8 @@ uses Thrift.Test, Thrift.WinHTTP, Thrift.Utils, + + Thrift.Configuration, Thrift.Collections; type @@ -122,7 +124,7 @@ type procedure InitializeProtocolTransportStack; procedure ShutdownProtocolTransportStack; - function InitializeHttpTransport( const aTimeoutSetting : Integer) : IHTTPClient; + function InitializeHttpTransport( const aTimeoutSetting : Integer; const aConfig : IThriftConfiguration = nil) : IHTTPClient; procedure JSONProtocolReadWriteTest; function PrepareBinaryData( aRandomDist : Boolean; aSize : TTestSize) : TBytes; @@ -1068,6 +1070,7 @@ procedure TClientThread.JSONProtocolReadWriteTest; var prot : IProtocol; stm : TStringStream; list : TThriftList; + config : IThriftConfiguration; binary, binRead, emptyBinary : TBytes; i,iErr : Integer; const @@ -1089,6 +1092,8 @@ begin try StartTestGroup( 'JsonProtocolTest', test_Unknown); + config := TThriftConfigurationImpl.Create; + // prepare binary data binary := PrepareBinaryData( FALSE, Normal); SetLength( emptyBinary, 0); // empty binary data block @@ -1096,7 +1101,7 @@ begin // output setup prot := TJSONProtocolImpl.Create( TStreamTransportImpl.Create( - nil, TThriftStreamAdapterDelphi.Create( stm, FALSE))); + nil, TThriftStreamAdapterDelphi.Create( stm, FALSE), config)); // write Init( list, TType.String_, 9); @@ -1119,7 +1124,7 @@ begin stm.Position := 0; prot := TJSONProtocolImpl.Create( TStreamTransportImpl.Create( - TThriftStreamAdapterDelphi.Create( stm, FALSE), nil)); + TThriftStreamAdapterDelphi.Create( stm, FALSE), nil, config)); // read and compare list := prot.ReadListBegin; @@ -1161,7 +1166,7 @@ begin stm.Position := 0; prot := TJSONProtocolImpl.Create( TStreamTransportImpl.Create( - TThriftStreamAdapterDelphi.Create( stm, FALSE), nil)); + TThriftStreamAdapterDelphi.Create( stm, FALSE), nil, config)); Expect( prot.ReadString = SOLIDUS_EXCPECTED, 'Solidus encoding'); @@ -1172,12 +1177,12 @@ begin stm.Size := 0; prot := TJSONProtocolImpl.Create( TStreamTransportImpl.Create( - nil, TThriftStreamAdapterDelphi.Create( stm, FALSE))); + nil, TThriftStreamAdapterDelphi.Create( stm, FALSE), config)); prot.WriteString( G_CLEF_AND_CYRILLIC_TEXT); stm.Position := 0; prot := TJSONProtocolImpl.Create( TStreamTransportImpl.Create( - TThriftStreamAdapterDelphi.Create( stm, FALSE), nil)); + TThriftStreamAdapterDelphi.Create( stm, FALSE), nil, config)); Expect( prot.ReadString = G_CLEF_AND_CYRILLIC_TEXT, 'Writing JSON with chars > 8 bit'); // Widechars should work with hex-encoding too. Do they? @@ -1187,7 +1192,7 @@ begin stm.Position := 0; prot := TJSONProtocolImpl.Create( TStreamTransportImpl.Create( - TThriftStreamAdapterDelphi.Create( stm, FALSE), nil)); + TThriftStreamAdapterDelphi.Create( stm, FALSE), nil, config)); Expect( prot.ReadString = G_CLEF_AND_CYRILLIC_TEXT, 'Reading JSON with chars > 8 bit'); @@ -1330,7 +1335,7 @@ begin end; -function TClientThread.InitializeHttpTransport( const aTimeoutSetting : Integer) : IHTTPClient; +function TClientThread.InitializeHttpTransport( const aTimeoutSetting : Integer; const aConfig : IThriftConfiguration) : IHTTPClient; var sUrl : string; comps : URL_COMPONENTS; dwChars : DWORD; @@ -1367,8 +1372,8 @@ begin Console.WriteLine('Target URL: '+sUrl); case FSetup.endpoint of - trns_MsxmlHttp : result := TMsxmlHTTPClientImpl.Create( sUrl); - trns_WinHttp : result := TWinHTTPClientImpl.Create( sUrl); + trns_MsxmlHttp : result := TMsxmlHTTPClientImpl.Create( sUrl, aConfig); + trns_WinHttp : result := TWinHTTPClientImpl.Create( sUrl, aConfig); else raise Exception.Create(ENDPOINT_TRANSPORTS[FSetup.endpoint]+' unhandled case'); end; @@ -1396,7 +1401,7 @@ begin case FSetup.endpoint of trns_Sockets: begin Console.WriteLine('Using sockets ('+FSetup.host+' port '+IntToStr(FSetup.port)+')'); - streamtrans := TSocketImpl.Create( FSetup.host, FSetup.port, DEFAULT_THRIFT_TIMEOUT); + streamtrans := TSocketImpl.Create( FSetup.host, FSetup.port); FTransport := streamtrans; end; @@ -1417,7 +1422,7 @@ begin end; trns_AnonPipes: begin - streamtrans := TAnonymousPipeTransportImpl.Create( FSetup.hAnonRead, FSetup.hAnonWrite, FALSE); + streamtrans := TAnonymousPipeTransportImpl.Create( FSetup.hAnonRead, FSetup.hAnonWrite, FALSE, PIPE_TIMEOUT); FTransport := streamtrans; end; diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas index da804fdc0..bbc798bed 100644 --- a/lib/delphi/test/TestServer.pas +++ b/lib/delphi/test/TestServer.pas @@ -36,6 +36,7 @@ uses Thrift.Protocol.JSON, Thrift.Protocol.Compact, Thrift.Collections, + Thrift.Configuration, Thrift.Utils, Thrift.Test, Thrift, @@ -593,7 +594,7 @@ begin trns_NamedPipes : begin Console.WriteLine('- named pipe ('+sPipeName+')'); - namedpipe := TNamedPipeServerTransportImpl.Create( sPipeName, 4096, PIPE_UNLIMITED_INSTANCES); + namedpipe := TNamedPipeServerTransportImpl.Create( sPipeName, 4096, PIPE_UNLIMITED_INSTANCES, INFINITE); servertrans := namedpipe; end; @@ -614,7 +615,7 @@ begin if (trns_Framed in layered) then begin Console.WriteLine('- framed transport'); - TransportFactory := TFramedTransportImpl.TFactory.Create + TransportFactory := TFramedTransportImpl.TFactory.Create; end else begin TransportFactory := TTransportFactoryImpl.Create; diff --git a/lib/delphi/test/client.dpr b/lib/delphi/test/client.dpr index 83727f619..d4875b8a9 100644 --- a/lib/delphi/test/client.dpr +++ b/lib/delphi/test/client.dpr @@ -31,6 +31,7 @@ uses Thrift in '..\src\Thrift.pas', Thrift.Transport in '..\src\Thrift.Transport.pas', Thrift.Socket in '..\src\Thrift.Socket.pas', + Thrift.Configuration in '..\src\Thrift.Configuration.pas', Thrift.Exception in '..\src\Thrift.Exception.pas', Thrift.Transport.Pipes in '..\src\Thrift.Transport.Pipes.pas', Thrift.Transport.WinHTTP in '..\src\Thrift.Transport.WinHTTP.pas', diff --git a/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas b/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas index 35fdf6f5b..4b6a0a221 100644 --- a/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas +++ b/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas @@ -35,6 +35,7 @@ uses Thrift.Transport, Thrift.Stream, Thrift.Collections, + Thrift.Configuration, Benchmark, // in gen-delphi folder Aggr, // in gen-delphi folder Multiplex.Test.Common; @@ -93,8 +94,10 @@ end; procedure TTestClient.Setup; var trans : ITransport; + config : IThriftConfiguration; begin - trans := TSocketImpl.Create( 'localhost', 9090); + config := TThriftConfigurationImpl.Create; + trans := TSocketImpl.Create( 'localhost', 9090, DEFAULT_THRIFT_TIMEOUT, config); trans := TFramedTransportImpl.Create( trans); trans.Open; FProtocol := TBinaryProtocolImpl.Create( trans, TRUE, TRUE); diff --git a/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas b/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas index 3860f5ace..a23ff37ea 100644 --- a/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas +++ b/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas @@ -35,6 +35,7 @@ uses Thrift.Protocol.Multiplex, Thrift.Processor.Multiplex, Thrift.Collections, + Thrift.Configuration, Thrift.Utils, Thrift, Benchmark, // in gen-delphi folder @@ -156,11 +157,14 @@ var aggrProcessor : IProcessor; multiplex : IMultiplexedProcessor; ServerEngine : IServer; + config : IThriftConfiguration; begin try + config := TThriftConfigurationImpl.Create; + // create protocol factory, default to BinaryProtocol ProtocolFactory := TBinaryProtocolImpl.TFactory.Create( TRUE, TRUE); - servertrans := TServerSocketImpl.Create( 9090, 0, FALSE); + servertrans := TServerSocketImpl.Create( 9090, DEFAULT_THRIFT_TIMEOUT, FALSE, config); TransportFactory := TFramedTransportImpl.TFactory.Create; benchHandler := TBenchmarkServiceImpl.Create; diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr b/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr index a57e93a2e..19f8f6adf 100644 --- a/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr +++ b/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr @@ -33,6 +33,7 @@ uses Thrift.Protocol in '..\..\src\Thrift.Protocol.pas', Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas', Thrift.Collections in '..\..\src\Thrift.Collections.pas', + Thrift.Configuration in '..\..\src\Thrift.Configuration.pas', Thrift.Server in '..\..\src\Thrift.Server.pas', Thrift.Stream in '..\..\src\Thrift.Stream.pas', Thrift.TypeRegistry in '..\..\src\Thrift.TypeRegistry.pas', diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr b/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr index 81ed3ddc4..307a9c2dc 100644 --- a/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr +++ b/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr @@ -33,6 +33,7 @@ uses Thrift.Protocol in '..\..\src\Thrift.Protocol.pas', Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas', Thrift.Processor.Multiplex in '..\..\src\Thrift.Processor.Multiplex.pas', + Thrift.Configuration in '..\..\src\Thrift.Configuration.pas', Thrift.Collections in '..\..\src\Thrift.Collections.pas', Thrift.Server in '..\..\src\Thrift.Server.pas', Thrift.Utils in '..\..\src\Thrift.Utils.pas', diff --git a/lib/delphi/test/serializer/TestSerializer.Tests.pas b/lib/delphi/test/serializer/TestSerializer.Tests.pas index fe69f4efc..83d67b1dc 100644 --- a/lib/delphi/test/serializer/TestSerializer.Tests.pas +++ b/lib/delphi/test/serializer/TestSerializer.Tests.pas @@ -33,6 +33,7 @@ uses Thrift.Protocol.JSON, Thrift.Protocol.Compact, Thrift.Collections, + Thrift.Configuration, Thrift.Server, Thrift.Utils, Thrift.Serializer, @@ -283,8 +284,12 @@ end; class function TTestSerializer.Serialize(const input : IBase; const factory : TFactoryPair) : TBytes; var serial : TSerializer; + config : IThriftConfiguration; begin - serial := TSerializer.Create( factory.prot, factory.trans); + config := TThriftConfigurationImpl.Create; + config.MaxMessageSize := 0; // we don't read anything here + + serial := TSerializer.Create( factory.prot, factory.trans, config); try result := serial.Serialize( input); finally @@ -295,8 +300,12 @@ end; class procedure TTestSerializer.Serialize(const input : IBase; const factory : TFactoryPair; const aStream : TStream); var serial : TSerializer; + config : IThriftConfiguration; begin - serial := TSerializer.Create( factory.prot, factory.trans); + config := TThriftConfigurationImpl.Create; + config.MaxMessageSize := 0; // we don't read anything here + + serial := TSerializer.Create( factory.prot, factory.trans, config); try serial.Serialize( input, aStream); finally @@ -307,8 +316,12 @@ end; class procedure TTestSerializer.Deserialize( const input : TBytes; const target : IBase; const factory : TFactoryPair); var serial : TDeserializer; + config : IThriftConfiguration; begin - serial := TDeserializer.Create( factory.prot, factory.trans, Length(input)); + config := TThriftConfigurationImpl.Create; + config.MaxMessageSize := Length(input); + + serial := TDeserializer.Create( factory.prot, factory.trans, config); try serial.Deserialize( input, target); ValidateReadToEnd( input, serial); @@ -320,8 +333,12 @@ end; class procedure TTestSerializer.Deserialize( const input : TStream; const target : IBase; const factory : TFactoryPair); var serial : TDeserializer; + config : IThriftConfiguration; begin - serial := TDeserializer.Create( factory.prot, factory.trans, input.Size); + config := TThriftConfigurationImpl.Create; + config.MaxMessageSize := input.Size; + + serial := TDeserializer.Create( factory.prot, factory.trans, config); try serial.Deserialize( input, target); ValidateReadToEnd( input, serial); diff --git a/lib/delphi/test/serializer/TestSerializer.dpr b/lib/delphi/test/serializer/TestSerializer.dpr index bb4cc8926..062001461 100644 --- a/lib/delphi/test/serializer/TestSerializer.dpr +++ b/lib/delphi/test/serializer/TestSerializer.dpr @@ -34,6 +34,7 @@ uses Thrift.Protocol.JSON in '..\..\src\Thrift.Protocol.JSON.pas', Thrift.Protocol.Compact in '..\..\src\Thrift.Protocol.Compact.pas', Thrift.Collections in '..\..\src\Thrift.Collections.pas', + Thrift.Configuration in '..\..\src\Thrift.Configuration.pas', Thrift.Server in '..\..\src\Thrift.Server.pas', Thrift.Utils in '..\..\src\Thrift.Utils.pas', Thrift.Serializer in '..\..\src\Thrift.Serializer.pas', @@ -42,8 +43,8 @@ uses Thrift.TypeRegistry in '..\..\src\Thrift.TypeRegistry.pas', System_, DebugProtoTest, - TestSerializer.Tests, - TestSerializer.Data; + TestSerializer.Tests in 'TestSerializer.Tests.pas', + TestSerializer.Data in 'TestSerializer.Data.pas'; var test : TTestSerializer; diff --git a/lib/delphi/test/server.dpr b/lib/delphi/test/server.dpr index 9731dd4fa..954d0b606 100644 --- a/lib/delphi/test/server.dpr +++ b/lib/delphi/test/server.dpr @@ -37,6 +37,7 @@ uses Thrift.Protocol.Multiplex in '..\src\Thrift.Protocol.Multiplex.pas', Thrift.Processor.Multiplex in '..\src\Thrift.Processor.Multiplex.pas', Thrift.Collections in '..\src\Thrift.Collections.pas', + Thrift.Configuration in '..\src\Thrift.Configuration.pas', Thrift.Server in '..\src\Thrift.Server.pas', Thrift.TypeRegistry in '..\src\Thrift.TypeRegistry.pas', Thrift.Utils in '..\src\Thrift.Utils.pas', diff --git a/lib/delphi/test/skip/skiptest_version1.dpr b/lib/delphi/test/skip/skiptest_version1.dpr index 0bfe96fef..c97e50b6f 100644 --- a/lib/delphi/test/skip/skiptest_version1.dpr +++ b/lib/delphi/test/skip/skiptest_version1.dpr @@ -31,6 +31,7 @@ uses Thrift.Protocol in '..\..\src\Thrift.Protocol.pas', Thrift.Protocol.JSON in '..\..\src\Thrift.Protocol.JSON.pas', Thrift.Collections in '..\..\src\Thrift.Collections.pas', + Thrift.Configuration in '..\..\src\Thrift.Configuration.pas', Thrift.Server in '..\..\src\Thrift.Server.pas', Thrift.Utils in '..\..\src\Thrift.Utils.pas', Thrift.WinHTTP in '..\..\src\Thrift.WinHTTP.pas', @@ -70,8 +71,8 @@ var adapt : IThriftStream; begin adapt := TThriftStreamAdapterDelphi.Create( stm, FALSE); if aForInput - then trans := TStreamTransportImpl.Create( adapt, nil) - else trans := TStreamTransportImpl.Create( nil, adapt); + then trans := TStreamTransportImpl.Create( adapt, nil, TThriftConfigurationImpl.Create) + else trans := TStreamTransportImpl.Create( nil, adapt, TThriftConfigurationImpl.Create); result := protfact.GetProtocol( trans); end; diff --git a/lib/delphi/test/skip/skiptest_version2.dpr b/lib/delphi/test/skip/skiptest_version2.dpr index 7893748a0..07c2c9af4 100644 --- a/lib/delphi/test/skip/skiptest_version2.dpr +++ b/lib/delphi/test/skip/skiptest_version2.dpr @@ -31,6 +31,7 @@ uses Thrift.Protocol in '..\..\src\Thrift.Protocol.pas', Thrift.Protocol.JSON in '..\..\src\Thrift.Protocol.JSON.pas', Thrift.Collections in '..\..\src\Thrift.Collections.pas', + Thrift.Configuration in '..\..\src\Thrift.Configuration.pas', Thrift.Server in '..\..\src\Thrift.Server.pas', Thrift.Utils in '..\..\src\Thrift.Utils.pas', Thrift.WinHTTP in '..\..\src\Thrift.WinHTTP.pas', @@ -96,8 +97,8 @@ var adapt : IThriftStream; begin adapt := TThriftStreamAdapterDelphi.Create( stm, FALSE); if aForInput - then trans := TStreamTransportImpl.Create( adapt, nil) - else trans := TStreamTransportImpl.Create( nil, adapt); + then trans := TStreamTransportImpl.Create( adapt, nil, TThriftConfigurationImpl.Create) + else trans := TStreamTransportImpl.Create( nil, adapt, TThriftConfigurationImpl.Create); result := protfact.GetProtocol( trans); end; diff --git a/lib/delphi/test/typeregistry/TestTypeRegistry.dpr b/lib/delphi/test/typeregistry/TestTypeRegistry.dpr index fd5e3dd4e..31c0fb2f1 100644 --- a/lib/delphi/test/typeregistry/TestTypeRegistry.dpr +++ b/lib/delphi/test/typeregistry/TestTypeRegistry.dpr @@ -30,6 +30,7 @@ uses Thrift.Protocol in '..\..\src\Thrift.Protocol.pas', Thrift.Protocol.JSON in '..\..\src\Thrift.Protocol.JSON.pas', Thrift.Collections in '..\..\src\Thrift.Collections.pas', + Thrift.Configuration in '..\..\src\Thrift.Configuration.pas', Thrift.Server in '..\..\src\Thrift.Server.pas', Thrift.Utils in '..\..\src\Thrift.Utils.pas', Thrift.Serializer in '..\..\src\Thrift.Serializer.pas', |