diff options
author | Jens Geyer <jensg@apache.org> | 2017-10-25 22:30:23 +0200 |
---|---|---|
committer | Jens Geyer <jensg@apache.org> | 2017-10-26 20:36:28 +0200 |
commit | d4df91709b724174aaf8a957f3edac3573be354e (patch) | |
tree | 4a86195d2a20df261ea78e947b01228e90d84926 | |
parent | e54945551f7fcd7d7decc959871f370b489f7fa0 (diff) | |
download | thrift-d4df91709b724174aaf8a957f3edac3573be354e.tar.gz |
THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write
Client: Delphi, C#
Patch: Jens Geyer
This closes #1402
-rw-r--r-- | lib/csharp/src/Transport/TNamedPipeClientTransport.cs | 14 | ||||
-rw-r--r-- | lib/csharp/src/Transport/TNamedPipeServerTransport.cs | 67 | ||||
-rw-r--r-- | lib/delphi/src/Thrift.Transport.Pipes.pas | 136 | ||||
-rw-r--r-- | lib/delphi/test/TestClient.pas | 38 | ||||
-rw-r--r-- | test/csharp/TestClient.cs | 45 | ||||
-rw-r--r-- | test/csharp/TestServer.cs | 43 |
6 files changed, 207 insertions, 136 deletions
diff --git a/lib/csharp/src/Transport/TNamedPipeClientTransport.cs b/lib/csharp/src/Transport/TNamedPipeClientTransport.cs index f87d1ecd5..ea2e86230 100644 --- a/lib/csharp/src/Transport/TNamedPipeClientTransport.cs +++ b/lib/csharp/src/Transport/TNamedPipeClientTransport.cs @@ -21,6 +21,7 @@ * details. */ +using System; using System.IO.Pipes; using System.Threading; @@ -88,7 +89,18 @@ namespace Thrift.Transport throw new TTransportException(TTransportException.ExceptionType.NotOpen); } - client.Write(buf, off, len); + // if necessary, send the data in chunks + // there's a system limit around 0x10000 bytes that we hit otherwise + // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section." + var nBytes = Math.Min(len, 15 * 4096); // 16 would exceed the limit + while (nBytes > 0) + { + client.Write(buf, off, nBytes); + + off += nBytes; + len -= nBytes; + nBytes = Math.Min(len, nBytes); + } } protected override void Dispose(bool disposing) diff --git a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs index 240f0df6d..a6cfb2e78 100644 --- a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs +++ b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs @@ -239,40 +239,51 @@ namespace Thrift.Transport throw new TTransportException(TTransportException.ExceptionType.NotOpen); } - if (asyncMode) + // if necessary, send the data in chunks + // there's a system limit around 0x10000 bytes that we hit otherwise + // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section." + var nBytes = Math.Min(len, 15 * 4096); // 16 would exceed the limit + while (nBytes > 0) { - Exception eOuter = null; - var evt = new ManualResetEvent(false); - stream.BeginWrite(buf, off, len, asyncResult => + if (asyncMode) { - try - { - if (stream != null) - stream.EndWrite(asyncResult); - else - eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted); - } - catch (Exception e) - { - if (stream != null) - eOuter = e; - else - eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message); - } - evt.Set(); - }, null); + Exception eOuter = null; + var evt = new ManualResetEvent(false); - evt.WaitOne(); + stream.BeginWrite(buf, off, nBytes, asyncResult => + { + try + { + if (stream != null) + stream.EndWrite(asyncResult); + else + eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted); + } + catch (Exception e) + { + if (stream != null) + eOuter = e; + else + eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message); + } + evt.Set(); + }, null); + + evt.WaitOne(); + + if (eOuter != null) + throw eOuter; // rethrow exception + } + else + { + stream.Write(buf, off, nBytes); + } - if (eOuter != null) - throw eOuter; // rethrow exception + off += nBytes; + len -= nBytes; + nBytes = Math.Min(len, nBytes); } - else - { - stream.Write(buf, off, len); - } - } protected override void Dispose(bool disposing) diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas index 9b7f842c1..aace4bb21 100644 --- a/lib/delphi/src/Thrift.Transport.Pipes.pas +++ b/lib/delphi/src/Thrift.Transport.Pipes.pas @@ -327,51 +327,70 @@ end; procedure TPipeStreamBase.WriteDirect( const pBuf : Pointer; offset: Integer; count: Integer); -var cbWritten : DWORD; +var cbWritten, nBytes : DWORD; begin if not IsOpen then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe'); - if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, nil) - then raise TTransportExceptionNotOpen.Create('Write to pipe failed'); + // if necessary, send the data in chunks + // there's a system limit around 0x10000 bytes that we hit otherwise + // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section." + nBytes := Min( 15*4096, count); // 16 would exceed the limit + while nBytes > 0 do begin + if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, nil) + then raise TTransportExceptionNotOpen.Create('Write to pipe failed'); + + Inc( offset, cbWritten); + Dec( count, cbWritten); + nBytes := Min( nBytes, count); + end; end; procedure TPipeStreamBase.WriteOverlapped( const pBuf : Pointer; offset: Integer; count: Integer); -var cbWritten, dwWait, dwError : DWORD; +var cbWritten, dwWait, dwError, nBytes : DWORD; overlapped : IOverlappedHelper; begin if not IsOpen then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe'); - overlapped := TOverlappedHelperImpl.Create; - - if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, overlapped.OverlappedPtr) - then begin - dwError := GetLastError; - case dwError of - ERROR_IO_PENDING : begin - dwWait := overlapped.WaitFor(FTimeout); - - if (dwWait = WAIT_TIMEOUT) - then raise TTransportExceptionTimedOut.Create('Pipe write timed out'); + // if necessary, send the data in chunks + // there's a system limit around 0x10000 bytes that we hit otherwise + // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section." + nBytes := Min( 15*4096, count); // 16 would exceed the limit + while nBytes > 0 do begin + overlapped := TOverlappedHelperImpl.Create; + if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, overlapped.OverlappedPtr) + then begin + dwError := GetLastError; + case dwError of + ERROR_IO_PENDING : begin + dwWait := overlapped.WaitFor(FTimeout); + + if (dwWait = WAIT_TIMEOUT) + then raise TTransportExceptionTimedOut.Create('Pipe write timed out'); + + if (dwWait <> WAIT_OBJECT_0) + or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE) + then raise TTransportExceptionUnknown.Create('Pipe write error'); + end; - if (dwWait <> WAIT_OBJECT_0) - or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE) - then raise TTransportExceptionUnknown.Create('Pipe write error'); + else + raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError)); end; - - else - raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError)); end; - end; - ASSERT( DWORD(count) = cbWritten); + ASSERT( DWORD(nBytes) = cbWritten); + + Inc( offset, cbWritten); + Dec( count, cbWritten); + nBytes := Min( nBytes, count); + end; end; function TPipeStreamBase.ReadDirect( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; -var cbRead, dwErr : DWORD; +var cbRead, dwErr, nRemaining : DWORD; bytes, retries : LongInt; bOk : Boolean; const INTERVAL = 10; // ms @@ -406,48 +425,61 @@ begin end; end; - // read the data (or block INFINITE-ly) - bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, nil); - if (not bOk) and (GetLastError() <> ERROR_MORE_DATA) - then result := 0 // No more data, possibly because client disconnected. - else result := cbRead; + result := 0; + nRemaining := count; + while nRemaining > 0 do begin + // read the data (or block INFINITE-ly) + bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, nil); + if (not bOk) and (GetLastError() <> ERROR_MORE_DATA) + then Break; // No more data, possibly because client disconnected. + + Dec( nRemaining, cbRead); + Inc( offset, cbRead); + Inc( result, cbRead); + end; end; function TPipeStreamBase.ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; -var cbRead, dwWait, dwError : DWORD; +var cbRead, dwWait, dwError, nRemaining : DWORD; bOk : Boolean; overlapped : IOverlappedHelper; begin if not IsOpen then raise TTransportExceptionNotOpen.Create('Called read on non-open pipe'); - overlapped := TOverlappedHelperImpl.Create; - - // read the data - bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, overlapped.OverlappedPtr); - if not bOk then begin - dwError := GetLastError; - case dwError of - ERROR_IO_PENDING : begin - dwWait := overlapped.WaitFor(FTimeout); - - if (dwWait = WAIT_TIMEOUT) - then raise TTransportExceptionTimedOut.Create('Pipe read timed out'); + result := 0; + nRemaining := count; + while nRemaining > 0 do begin + overlapped := TOverlappedHelperImpl.Create; + + // read the data + bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, overlapped.OverlappedPtr); + if not bOk then begin + dwError := GetLastError; + case dwError of + ERROR_IO_PENDING : begin + dwWait := overlapped.WaitFor(FTimeout); + + if (dwWait = WAIT_TIMEOUT) + then raise TTransportExceptionTimedOut.Create('Pipe read timed out'); + + if (dwWait <> WAIT_OBJECT_0) + or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE) + then raise TTransportExceptionUnknown.Create('Pipe read error'); + end; - if (dwWait <> WAIT_OBJECT_0) - or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE) - then raise TTransportExceptionUnknown.Create('Pipe read error'); + else + raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError)); end; - - else - raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError)); end; - end; - ASSERT( cbRead > 0); // see TTransportImpl.ReadAll() - ASSERT( cbRead = DWORD(count)); - result := cbRead; + ASSERT( cbRead > 0); // see TTransportImpl.ReadAll() + ASSERT( cbRead <= DWORD(nRemaining)); + Dec( nRemaining, cbRead); + Inc( offset, cbRead); + Inc( result, cbRead); + end; end; diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas index 37d8546dd..59b2a66c9 100644 --- a/lib/delphi/test/TestClient.pas +++ b/lib/delphi/test/TestClient.pas @@ -86,7 +86,7 @@ type procedure ClientTest; procedure JSONProtocolReadWriteTest; - function PrepareBinaryData( aRandomDist : Boolean = FALSE) : TBytes; + function PrepareBinaryData( aRandomDist, aHuge : Boolean) : TBytes; {$IFDEF StressTest} procedure StressTest(const client : TThriftTest.Iface); {$ENDIF} @@ -546,8 +546,21 @@ begin i64 := client.testI64(-34359738368); Expect( i64 = -34359738368, 'testI64(-34359738368) = ' + IntToStr( i64)); - // random binary - binOut := PrepareBinaryData( TRUE); + // random binary small + binOut := PrepareBinaryData( TRUE, FALSE); + Console.WriteLine('testBinary('+BytesToHex(binOut)+')'); + try + binIn := client.testBinary(binOut); + Expect( Length(binOut) = Length(binIn), 'testBinary(): length '+IntToStr(Length(binOut))+' = '+IntToStr(Length(binIn))); + i32 := Min( Length(binOut), Length(binIn)); + Expect( CompareMem( binOut, binIn, i32), 'testBinary('+BytesToHex(binOut)+') = '+BytesToHex(binIn)); + except + on e:TApplicationException do Console.WriteLine('testBinary(): '+e.Message); + on e:Exception do Expect( FALSE, 'testBinary(): Unexpected exception "'+e.ClassName+'": '+e.Message); + end; + + // random binary huge + binOut := PrepareBinaryData( TRUE, TRUE); Console.WriteLine('testBinary('+BytesToHex(binOut)+')'); try binIn := client.testBinary(binOut); @@ -1011,10 +1024,12 @@ end; {$ENDIF} -function TClientThread.PrepareBinaryData( aRandomDist : Boolean = FALSE) : TBytes; -var i, nextPos : Integer; +function TClientThread.PrepareBinaryData( aRandomDist, aHuge : Boolean) : TBytes; +var i : Integer; begin - SetLength( result, $100); + if aHuge + then SetLength( result, $12345) // tests for THRIFT-4372 + else SetLength( result, $100); ASSERT( Low(result) = 0); // linear distribution, unless random is requested @@ -1027,13 +1042,8 @@ begin // random distribution of all 256 values FillChar( result[0], Length(result) * SizeOf(result[0]), $0); - i := 1; - while i < Length(result) do begin - nextPos := Byte( Random($100)); - if result[nextPos] = 0 then begin // unused? - result[nextPos] := i; - Inc(i); - end; + for i := Low(result) to High(result) do begin + result[i] := Byte( Random($100)); end; end; @@ -1080,7 +1090,7 @@ begin StartTestGroup( 'JsonProtocolTest', test_Unknown); // prepare binary data - binary := PrepareBinaryData( FALSE); + binary := PrepareBinaryData( FALSE, FALSE); SetLength( emptyBinary, 0); // empty binary data block // output setup diff --git a/test/csharp/TestClient.cs b/test/csharp/TestClient.cs index 67673ec3b..fad1057d6 100644 --- a/test/csharp/TestClient.cs +++ b/test/csharp/TestClient.cs @@ -265,10 +265,11 @@ namespace Test return BitConverter.ToString(data).Replace("-", string.Empty); } - public static byte[] PrepareTestData(bool randomDist) + public static byte[] PrepareTestData(bool randomDist, bool huge) { - byte[] retval = new byte[0x100]; - int initLen = Math.Min(0x100,retval.Length); + // huge = true tests for THRIFT-4372 + byte[] retval = new byte[huge ? 0x12345 : 0x100]; + int initLen = retval.Length; // linear distribution, unless random is requested if (!randomDist) { @@ -374,29 +375,33 @@ namespace Test returnCode |= ErrorBaseTypes; } - byte[] binOut = PrepareTestData(true); - Console.Write("testBinary(" + BytesToHex(binOut) + ")"); - try + for (i32 = 0; i32 < 2; ++i32) { - byte[] binIn = client.testBinary(binOut); - Console.WriteLine(" = " + BytesToHex(binIn)); - if (binIn.Length != binOut.Length) + var huge = (i32 > 0); + byte[] binOut = PrepareTestData(false,huge); + Console.Write("testBinary(" + BytesToHex(binOut) + ")"); + try { - Console.WriteLine("*** FAILED ***"); - returnCode |= ErrorBaseTypes; - } - for (int ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs) - if (binIn[ofs] != binOut[ofs]) + byte[] binIn = client.testBinary(binOut); + Console.WriteLine(" = " + BytesToHex(binIn)); + if (binIn.Length != binOut.Length) { Console.WriteLine("*** FAILED ***"); returnCode |= ErrorBaseTypes; } - } - catch (Thrift.TApplicationException ex) - { - Console.WriteLine("*** FAILED ***"); - returnCode |= ErrorBaseTypes; - Console.WriteLine(ex.Message + " ST: " + ex.StackTrace); + for (int ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs) + if (binIn[ofs] != binOut[ofs]) + { + Console.WriteLine("*** FAILED ***"); + returnCode |= ErrorBaseTypes; + } + } + catch (Thrift.TApplicationException ex) + { + Console.WriteLine("*** FAILED ***"); + returnCode |= ErrorBaseTypes; + Console.WriteLine(ex.Message + " ST: " + ex.StackTrace); + } } // binary equals? only with hashcode option enabled ... diff --git a/test/csharp/TestServer.cs b/test/csharp/TestServer.cs index 7404ca2ff..e9c7168eb 100644 --- a/test/csharp/TestServer.cs +++ b/test/csharp/TestServer.cs @@ -41,27 +41,28 @@ namespace Test public static int _clientID = -1; public delegate void TestLogDelegate(string msg, params object[] values); - public class TradeServerEventHandler : TServerEventHandler - { - public int callCount = 0; - public void preServe() - { - callCount++; - } - public Object createContext(Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output) - { - callCount++; - return null; - } - public void deleteContext(Object serverContext, Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output) - { - callCount++; - } - public void processContext(Object serverContext, Thrift.Transport.TTransport transport) - { - callCount++; - } - }; + public class TradeServerEventHandler : TServerEventHandler + { + public int callCount = 0; + public void preServe() + { + callCount++; + } + public Object createContext(Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output) + { + callCount++; + return null; + } + public void deleteContext(Object serverContext, Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output) + { + callCount++; + } + public void processContext(Object serverContext, Thrift.Transport.TTransport transport) + { + callCount++; + } + }; + public class TestHandler : ThriftTest.Iface, Thrift.TControllingHandler { |