summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJens Geyer <jensg@apache.org>2017-10-25 22:30:23 +0200
committerJens Geyer <jensg@apache.org>2017-10-26 20:36:28 +0200
commitd4df91709b724174aaf8a957f3edac3573be354e (patch)
tree4a86195d2a20df261ea78e947b01228e90d84926
parente54945551f7fcd7d7decc959871f370b489f7fa0 (diff)
downloadthrift-d4df91709b724174aaf8a957f3edac3573be354e.tar.gz
THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write
Client: Delphi, C# Patch: Jens Geyer This closes #1402
-rw-r--r--lib/csharp/src/Transport/TNamedPipeClientTransport.cs14
-rw-r--r--lib/csharp/src/Transport/TNamedPipeServerTransport.cs67
-rw-r--r--lib/delphi/src/Thrift.Transport.Pipes.pas136
-rw-r--r--lib/delphi/test/TestClient.pas38
-rw-r--r--test/csharp/TestClient.cs45
-rw-r--r--test/csharp/TestServer.cs43
6 files changed, 207 insertions, 136 deletions
diff --git a/lib/csharp/src/Transport/TNamedPipeClientTransport.cs b/lib/csharp/src/Transport/TNamedPipeClientTransport.cs
index f87d1ecd5..ea2e86230 100644
--- a/lib/csharp/src/Transport/TNamedPipeClientTransport.cs
+++ b/lib/csharp/src/Transport/TNamedPipeClientTransport.cs
@@ -21,6 +21,7 @@
* details.
*/
+using System;
using System.IO.Pipes;
using System.Threading;
@@ -88,7 +89,18 @@ namespace Thrift.Transport
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- client.Write(buf, off, len);
+ // if necessary, send the data in chunks
+ // there's a system limit around 0x10000 bytes that we hit otherwise
+ // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+ var nBytes = Math.Min(len, 15 * 4096); // 16 would exceed the limit
+ while (nBytes > 0)
+ {
+ client.Write(buf, off, nBytes);
+
+ off += nBytes;
+ len -= nBytes;
+ nBytes = Math.Min(len, nBytes);
+ }
}
protected override void Dispose(bool disposing)
diff --git a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
index 240f0df6d..a6cfb2e78 100644
--- a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
+++ b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
@@ -239,40 +239,51 @@ namespace Thrift.Transport
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- if (asyncMode)
+ // if necessary, send the data in chunks
+ // there's a system limit around 0x10000 bytes that we hit otherwise
+ // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+ var nBytes = Math.Min(len, 15 * 4096); // 16 would exceed the limit
+ while (nBytes > 0)
{
- Exception eOuter = null;
- var evt = new ManualResetEvent(false);
- stream.BeginWrite(buf, off, len, asyncResult =>
+ if (asyncMode)
{
- try
- {
- if (stream != null)
- stream.EndWrite(asyncResult);
- else
- eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
- }
- catch (Exception e)
- {
- if (stream != null)
- eOuter = e;
- else
- eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message);
- }
- evt.Set();
- }, null);
+ Exception eOuter = null;
+ var evt = new ManualResetEvent(false);
- evt.WaitOne();
+ stream.BeginWrite(buf, off, nBytes, asyncResult =>
+ {
+ try
+ {
+ if (stream != null)
+ stream.EndWrite(asyncResult);
+ else
+ eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
+ }
+ catch (Exception e)
+ {
+ if (stream != null)
+ eOuter = e;
+ else
+ eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message);
+ }
+ evt.Set();
+ }, null);
+
+ evt.WaitOne();
+
+ if (eOuter != null)
+ throw eOuter; // rethrow exception
+ }
+ else
+ {
+ stream.Write(buf, off, nBytes);
+ }
- if (eOuter != null)
- throw eOuter; // rethrow exception
+ off += nBytes;
+ len -= nBytes;
+ nBytes = Math.Min(len, nBytes);
}
- else
- {
- stream.Write(buf, off, len);
- }
-
}
protected override void Dispose(bool disposing)
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 9b7f842c1..aace4bb21 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -327,51 +327,70 @@ end;
procedure TPipeStreamBase.WriteDirect( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten : DWORD;
+var cbWritten, nBytes : DWORD;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
- if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, nil)
- then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+ // if necessary, send the data in chunks
+ // there's a system limit around 0x10000 bytes that we hit otherwise
+ // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+ nBytes := Min( 15*4096, count); // 16 would exceed the limit
+ while nBytes > 0 do begin
+ if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, nil)
+ then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+
+ Inc( offset, cbWritten);
+ Dec( count, cbWritten);
+ nBytes := Min( nBytes, count);
+ end;
end;
procedure TPipeStreamBase.WriteOverlapped( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten, dwWait, dwError : DWORD;
+var cbWritten, dwWait, dwError, nBytes : DWORD;
overlapped : IOverlappedHelper;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
- overlapped := TOverlappedHelperImpl.Create;
-
- if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, overlapped.OverlappedPtr)
- then begin
- dwError := GetLastError;
- case dwError of
- ERROR_IO_PENDING : begin
- dwWait := overlapped.WaitFor(FTimeout);
-
- if (dwWait = WAIT_TIMEOUT)
- then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
+ // if necessary, send the data in chunks
+ // there's a system limit around 0x10000 bytes that we hit otherwise
+ // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+ nBytes := Min( 15*4096, count); // 16 would exceed the limit
+ while nBytes > 0 do begin
+ overlapped := TOverlappedHelperImpl.Create;
+ if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, overlapped.OverlappedPtr)
+ then begin
+ dwError := GetLastError;
+ case dwError of
+ ERROR_IO_PENDING : begin
+ dwWait := overlapped.WaitFor(FTimeout);
+
+ if (dwWait = WAIT_TIMEOUT)
+ then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
+
+ if (dwWait <> WAIT_OBJECT_0)
+ or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
+ then raise TTransportExceptionUnknown.Create('Pipe write error');
+ end;
- if (dwWait <> WAIT_OBJECT_0)
- or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
- then raise TTransportExceptionUnknown.Create('Pipe write error');
+ else
+ raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
-
- else
- raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
- end;
- ASSERT( DWORD(count) = cbWritten);
+ ASSERT( DWORD(nBytes) = cbWritten);
+
+ Inc( offset, cbWritten);
+ Dec( count, cbWritten);
+ nBytes := Min( nBytes, count);
+ end;
end;
function TPipeStreamBase.ReadDirect( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwErr : DWORD;
+var cbRead, dwErr, nRemaining : DWORD;
bytes, retries : LongInt;
bOk : Boolean;
const INTERVAL = 10; // ms
@@ -406,48 +425,61 @@ begin
end;
end;
- // read the data (or block INFINITE-ly)
- bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, nil);
- if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
- then result := 0 // No more data, possibly because client disconnected.
- else result := cbRead;
+ result := 0;
+ nRemaining := count;
+ while nRemaining > 0 do begin
+ // read the data (or block INFINITE-ly)
+ bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, nil);
+ if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
+ then Break; // No more data, possibly because client disconnected.
+
+ Dec( nRemaining, cbRead);
+ Inc( offset, cbRead);
+ Inc( result, cbRead);
+ end;
end;
function TPipeStreamBase.ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwWait, dwError : DWORD;
+var cbRead, dwWait, dwError, nRemaining : DWORD;
bOk : Boolean;
overlapped : IOverlappedHelper;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called read on non-open pipe');
- overlapped := TOverlappedHelperImpl.Create;
-
- // read the data
- bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, overlapped.OverlappedPtr);
- if not bOk then begin
- dwError := GetLastError;
- case dwError of
- ERROR_IO_PENDING : begin
- dwWait := overlapped.WaitFor(FTimeout);
-
- if (dwWait = WAIT_TIMEOUT)
- then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
+ result := 0;
+ nRemaining := count;
+ while nRemaining > 0 do begin
+ overlapped := TOverlappedHelperImpl.Create;
+
+ // read the data
+ bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, overlapped.OverlappedPtr);
+ if not bOk then begin
+ dwError := GetLastError;
+ case dwError of
+ ERROR_IO_PENDING : begin
+ dwWait := overlapped.WaitFor(FTimeout);
+
+ if (dwWait = WAIT_TIMEOUT)
+ then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
+
+ if (dwWait <> WAIT_OBJECT_0)
+ or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
+ then raise TTransportExceptionUnknown.Create('Pipe read error');
+ end;
- if (dwWait <> WAIT_OBJECT_0)
- or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
- then raise TTransportExceptionUnknown.Create('Pipe read error');
+ else
+ raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
-
- else
- raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
- end;
- ASSERT( cbRead > 0); // see TTransportImpl.ReadAll()
- ASSERT( cbRead = DWORD(count));
- result := cbRead;
+ ASSERT( cbRead > 0); // see TTransportImpl.ReadAll()
+ ASSERT( cbRead <= DWORD(nRemaining));
+ Dec( nRemaining, cbRead);
+ Inc( offset, cbRead);
+ Inc( result, cbRead);
+ end;
end;
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index 37d8546dd..59b2a66c9 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -86,7 +86,7 @@ type
procedure ClientTest;
procedure JSONProtocolReadWriteTest;
- function PrepareBinaryData( aRandomDist : Boolean = FALSE) : TBytes;
+ function PrepareBinaryData( aRandomDist, aHuge : Boolean) : TBytes;
{$IFDEF StressTest}
procedure StressTest(const client : TThriftTest.Iface);
{$ENDIF}
@@ -546,8 +546,21 @@ begin
i64 := client.testI64(-34359738368);
Expect( i64 = -34359738368, 'testI64(-34359738368) = ' + IntToStr( i64));
- // random binary
- binOut := PrepareBinaryData( TRUE);
+ // random binary small
+ binOut := PrepareBinaryData( TRUE, FALSE);
+ Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
+ try
+ binIn := client.testBinary(binOut);
+ Expect( Length(binOut) = Length(binIn), 'testBinary(): length '+IntToStr(Length(binOut))+' = '+IntToStr(Length(binIn)));
+ i32 := Min( Length(binOut), Length(binIn));
+ Expect( CompareMem( binOut, binIn, i32), 'testBinary('+BytesToHex(binOut)+') = '+BytesToHex(binIn));
+ except
+ on e:TApplicationException do Console.WriteLine('testBinary(): '+e.Message);
+ on e:Exception do Expect( FALSE, 'testBinary(): Unexpected exception "'+e.ClassName+'": '+e.Message);
+ end;
+
+ // random binary huge
+ binOut := PrepareBinaryData( TRUE, TRUE);
Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
try
binIn := client.testBinary(binOut);
@@ -1011,10 +1024,12 @@ end;
{$ENDIF}
-function TClientThread.PrepareBinaryData( aRandomDist : Boolean = FALSE) : TBytes;
-var i, nextPos : Integer;
+function TClientThread.PrepareBinaryData( aRandomDist, aHuge : Boolean) : TBytes;
+var i : Integer;
begin
- SetLength( result, $100);
+ if aHuge
+ then SetLength( result, $12345) // tests for THRIFT-4372
+ else SetLength( result, $100);
ASSERT( Low(result) = 0);
// linear distribution, unless random is requested
@@ -1027,13 +1042,8 @@ begin
// random distribution of all 256 values
FillChar( result[0], Length(result) * SizeOf(result[0]), $0);
- i := 1;
- while i < Length(result) do begin
- nextPos := Byte( Random($100));
- if result[nextPos] = 0 then begin // unused?
- result[nextPos] := i;
- Inc(i);
- end;
+ for i := Low(result) to High(result) do begin
+ result[i] := Byte( Random($100));
end;
end;
@@ -1080,7 +1090,7 @@ begin
StartTestGroup( 'JsonProtocolTest', test_Unknown);
// prepare binary data
- binary := PrepareBinaryData( FALSE);
+ binary := PrepareBinaryData( FALSE, FALSE);
SetLength( emptyBinary, 0); // empty binary data block
// output setup
diff --git a/test/csharp/TestClient.cs b/test/csharp/TestClient.cs
index 67673ec3b..fad1057d6 100644
--- a/test/csharp/TestClient.cs
+++ b/test/csharp/TestClient.cs
@@ -265,10 +265,11 @@ namespace Test
return BitConverter.ToString(data).Replace("-", string.Empty);
}
- public static byte[] PrepareTestData(bool randomDist)
+ public static byte[] PrepareTestData(bool randomDist, bool huge)
{
- byte[] retval = new byte[0x100];
- int initLen = Math.Min(0x100,retval.Length);
+ // huge = true tests for THRIFT-4372
+ byte[] retval = new byte[huge ? 0x12345 : 0x100];
+ int initLen = retval.Length;
// linear distribution, unless random is requested
if (!randomDist) {
@@ -374,29 +375,33 @@ namespace Test
returnCode |= ErrorBaseTypes;
}
- byte[] binOut = PrepareTestData(true);
- Console.Write("testBinary(" + BytesToHex(binOut) + ")");
- try
+ for (i32 = 0; i32 < 2; ++i32)
{
- byte[] binIn = client.testBinary(binOut);
- Console.WriteLine(" = " + BytesToHex(binIn));
- if (binIn.Length != binOut.Length)
+ var huge = (i32 > 0);
+ byte[] binOut = PrepareTestData(false,huge);
+ Console.Write("testBinary(" + BytesToHex(binOut) + ")");
+ try
{
- Console.WriteLine("*** FAILED ***");
- returnCode |= ErrorBaseTypes;
- }
- for (int ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs)
- if (binIn[ofs] != binOut[ofs])
+ byte[] binIn = client.testBinary(binOut);
+ Console.WriteLine(" = " + BytesToHex(binIn));
+ if (binIn.Length != binOut.Length)
{
Console.WriteLine("*** FAILED ***");
returnCode |= ErrorBaseTypes;
}
- }
- catch (Thrift.TApplicationException ex)
- {
- Console.WriteLine("*** FAILED ***");
- returnCode |= ErrorBaseTypes;
- Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
+ for (int ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs)
+ if (binIn[ofs] != binOut[ofs])
+ {
+ Console.WriteLine("*** FAILED ***");
+ returnCode |= ErrorBaseTypes;
+ }
+ }
+ catch (Thrift.TApplicationException ex)
+ {
+ Console.WriteLine("*** FAILED ***");
+ returnCode |= ErrorBaseTypes;
+ Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
+ }
}
// binary equals? only with hashcode option enabled ...
diff --git a/test/csharp/TestServer.cs b/test/csharp/TestServer.cs
index 7404ca2ff..e9c7168eb 100644
--- a/test/csharp/TestServer.cs
+++ b/test/csharp/TestServer.cs
@@ -41,27 +41,28 @@ namespace Test
public static int _clientID = -1;
public delegate void TestLogDelegate(string msg, params object[] values);
- public class TradeServerEventHandler : TServerEventHandler
- {
- public int callCount = 0;
- public void preServe()
- {
- callCount++;
- }
- public Object createContext(Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output)
- {
- callCount++;
- return null;
- }
- public void deleteContext(Object serverContext, Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output)
- {
- callCount++;
- }
- public void processContext(Object serverContext, Thrift.Transport.TTransport transport)
- {
- callCount++;
- }
- };
+ public class TradeServerEventHandler : TServerEventHandler
+ {
+ public int callCount = 0;
+ public void preServe()
+ {
+ callCount++;
+ }
+ public Object createContext(Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output)
+ {
+ callCount++;
+ return null;
+ }
+ public void deleteContext(Object serverContext, Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output)
+ {
+ callCount++;
+ }
+ public void processContext(Object serverContext, Thrift.Transport.TTransport transport)
+ {
+ callCount++;
+ }
+ };
+
public class TestHandler : ThriftTest.Iface, Thrift.TControllingHandler
{