summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJens Geyer <jensg@apache.org>2022-09-09 13:39:33 +0200
committerJens Geyer <Jens-G@users.noreply.github.com>2022-09-09 23:00:09 +0200
commit60970c4e10b0014005bc68f07f4e5c5987b41e3a (patch)
treeea4c4745e1f7509c58a2c435ff5d9cfe5ec4c0cc /lib
parent72d5912424211561adc6f8e7bc502180631d9f8e (diff)
downloadthrift-60970c4e10b0014005bc68f07f4e5c5987b41e3a.tar.gz
THRIFT-5624 suboptimal performance of the c# named pipe server transport in multithread servers
Client: netstd Patch: Jens Geyer
Diffstat (limited to 'lib')
-rw-r--r--lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs20
-rw-r--r--lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs221
2 files changed, 172 insertions, 69 deletions
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index c57db9d60..071c66017 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -28,7 +28,7 @@ namespace Thrift.Transport.Client
{
private NamedPipeClientStream PipeStream;
private readonly int ConnectTimeout;
- private const int DEFAULT_CONNECT_TIMEOUT = 60 * 1000; // Timeout.Infinite is not a good default
+ private const int DEFAULT_CONNECT_TIMEOUT = 60 * 1000; // Timeout.Infinite is not a good default
public TNamedPipeTransport(string pipe, TConfiguration config, int timeout = DEFAULT_CONNECT_TIMEOUT)
: this(".", pipe, config, timeout)
@@ -61,6 +61,8 @@ namespace Thrift.Transport.Client
{
if (PipeStream != null)
{
+ if (PipeStream.IsConnected)
+ PipeStream.Close();
PipeStream.Dispose();
PipeStream = null;
}
@@ -107,20 +109,24 @@ namespace Thrift.Transport.Client
}
}
- public override Task FlushAsync(CancellationToken cancellationToken)
+ public override async Task FlushAsync(CancellationToken cancellationToken)
{
- cancellationToken.ThrowIfCancellationRequested();
-
+ await PipeStream.FlushAsync(cancellationToken);
ResetConsumedMessageSize();
- return Task.CompletedTask;
}
protected override void Dispose(bool disposing)
{
- if(disposing)
+ if (disposing)
{
- PipeStream?.Dispose();
+ if (PipeStream != null)
+ {
+ if (PipeStream.IsConnected)
+ PipeStream.Close();
+ PipeStream.Dispose();
+ PipeStream = null;
+ }
}
}
}
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 7c9430046..8ad62aa14 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -24,42 +24,79 @@ using System.Threading.Tasks;
using System.ComponentModel;
using System.Security.AccessControl;
using System.Security.Principal;
+using System.Collections.Generic;
+using System.IO;
+using System.Diagnostics;
+
+#pragma warning disable CS1998 // async no await
namespace Thrift.Transport.Server
{
+ [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
[Flags]
- public enum NamedPipeClientFlags {
+ public enum NamedPipeClientFlags { // bad name
None = 0x00,
OnlyLocalClients = 0x01
};
+ [Flags]
+ public enum NamedPipeServerFlags
+ {
+ None = 0x00,
+ OnlyLocalClients = 0x01,
+ };
+
+
// ReSharper disable once InconsistentNaming
public class TNamedPipeServerTransport : TServerTransport
{
+ // to manage incoming connections, we set up a task for each stream to listen on
+ private struct TaskStreamPair
+ {
+ public NamedPipeServerStream Stream;
+ public Task Task;
+
+ public TaskStreamPair(NamedPipeServerStream stream, Task task)
+ {
+ Stream = stream;
+ Task = task;
+ }
+ }
+
/// <summary>
/// This is the address of the Pipe on the localhost.
/// </summary>
private readonly string _pipeAddress;
private bool _asyncMode = true;
private volatile bool _isPending = true;
- private NamedPipeServerStream _stream = null;
+ private readonly List<TaskStreamPair> _streams = new List<TaskStreamPair>();
private readonly bool _onlyLocalClients = false; // compatibility default
+ private readonly byte _numListenPipes = 1; // compatibility default
- public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags)
+ public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeServerFlags flags, int numListenPipes)
: base(config)
{
+ if ((numListenPipes < 1) || (numListenPipes > 254))
+ throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
+
_pipeAddress = pipeAddress;
- _onlyLocalClients = flags.HasFlag(NamedPipeClientFlags.OnlyLocalClients);
+ _onlyLocalClients = flags.HasFlag(NamedPipeServerFlags.OnlyLocalClients);
+ _numListenPipes = (byte)numListenPipes;
}
- [Obsolete("This CTOR is deprecated, please use the other one instead.")]
- public TNamedPipeServerTransport(string pipeAddress, TConfiguration config)
+ [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
+ public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags, int numListenPipes = 1)
: base(config)
{
+ if ((numListenPipes < 1) || (numListenPipes > 254))
+ throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
+
_pipeAddress = pipeAddress;
- _onlyLocalClients = false;
+ _onlyLocalClients = flags.HasFlag(NamedPipeClientFlags.OnlyLocalClients);
+ _numListenPipes = (byte)numListenPipes;
}
+
public override bool IsOpen() {
return true;
}
@@ -69,75 +106,112 @@ namespace Thrift.Transport.Server
// nothing to do here
}
- public override void Close()
+ private static void Close(NamedPipeServerStream pipe)
{
- if (_stream != null)
+ if (pipe != null)
{
try
{
- if (_stream.IsConnected)
- _stream.Disconnect();
- _stream.Dispose();
+ if (pipe.IsConnected)
+ pipe.Disconnect();
}
finally
{
- _stream = null;
- _isPending = false;
+ pipe.Dispose();
}
}
}
+ public override void Close()
+ {
+ try
+ {
+ if (_streams != null)
+ {
+ while(_streams.Count > 0)
+ {
+ Close(_streams[0].Stream);
+ _streams.RemoveAt(0);
+ }
+ }
+ }
+ finally
+ {
+ _streams.Clear();
+ _isPending = false;
+ }
+ }
+
public override bool IsClientPending()
{
return _isPending;
}
- private void EnsurePipeInstance()
+ private void EnsurePipeInstances()
{
- if (_stream == null)
+ // set up a pool for accepting multiple calls when in multithread mode
+ // once connected, we hand that stream over to the processor and create a fresh one
+ try
+ {
+ while (_streams.Count < _numListenPipes)
+ _streams.Add(CreatePipeInstance());
+ }
+ catch
{
- const PipeDirection direction = PipeDirection.InOut;
- const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
- const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
- const int inbuf = 4096;
- const int outbuf = 4096;
- var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
+ // we might not be able to create all requested instances, e.g. due to some existing instances already processing calls
+ // if we have at least one pipe to listen on -> Good Enough(tm)
+ if (_streams.Count < 1)
+ throw; // no pipes is really bad
+ }
+ }
+ private TaskStreamPair CreatePipeInstance()
+ {
+ const PipeDirection direction = PipeDirection.InOut;
+ const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
+ const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
+ const int inbuf = 4096;
+ const int outbuf = 4096;
+ var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
- // TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes:
- // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
- // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative()
- // EITHER WAY,
- // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings
- try
+ // TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes:
+ // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
+ // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative()
+ // EITHER WAY,
+ // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings
+
+ NamedPipeServerStream instance;
+ try
+ {
+ var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients);
+ if ((handle != null) && (!handle.IsInvalid))
{
- var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients);
- if ((handle != null) && (!handle.IsInvalid))
- {
- _stream = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
- handle = null; // we don't own it any longer
- }
- else
- {
- handle?.Dispose();
- _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
- }
+ instance = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
+ handle = null; // we don't own it any longer
}
- catch (NotImplementedException) // Mono still does not support async, fallback to sync
+ else
{
- if (_asyncMode)
- {
- options &= (~PipeOptions.Asynchronous);
- _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
- _asyncMode = false;
- }
- else
- {
- throw;
- }
+ handle?.Dispose();
+ instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
}
}
+ catch (NotImplementedException) // Mono still does not support async, fallback to sync
+ {
+ if (_asyncMode)
+ {
+ options &= (~PipeOptions.Asynchronous);
+ instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
+ _asyncMode = false;
+ }
+ else
+ {
+ throw;
+ }
+ }
+
+ // the task gets added later
+ return new TaskStreamPair( instance, null);
}
@@ -248,14 +322,28 @@ namespace Thrift.Transport.Server
{
try
{
- EnsurePipeInstance();
+ EnsurePipeInstances();
- await _stream.WaitForConnectionAsync(cancellationToken);
+ // fill the list and wait for any task to be completed
+ var tasks = new List<Task>();
+ for (var i = 0; i < _streams.Count; ++i)
+ {
+ if (_streams[i].Task == null)
+ {
+ var pair = _streams[i];
+ pair.Task = Task.Run(async () => await pair.Stream.WaitForConnectionAsync(cancellationToken), cancellationToken);
+ _streams[i] = pair;
+ }
- var trans = new ServerTransport(_stream, Configuration);
- _stream = null; // pass ownership to ServerTransport
+ tasks.Add(_streams[i].Task);
+ }
- //_isPending = false;
+ // there must be an exact mapping between task index and stream index
+ Debug.Assert(_streams.Count == tasks.Count);
+ var index = Task.WaitAny(tasks.ToArray(), cancellationToken);
+
+ var trans = new ServerTransport(_streams[index].Stream, Configuration);
+ _streams.RemoveAt(index); // pass stream ownership to ServerTransport
return trans;
}
@@ -296,8 +384,13 @@ namespace Thrift.Transport.Server
public override void Close()
{
- PipeStream?.Dispose();
- PipeStream = null;
+ if (PipeStream != null)
+ {
+ if (PipeStream.IsConnected)
+ PipeStream.Disconnect();
+ PipeStream.Dispose();
+ PipeStream = null;
+ }
}
public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
@@ -341,19 +434,23 @@ namespace Thrift.Transport.Server
}
}
- public override Task FlushAsync(CancellationToken cancellationToken)
+ public override async Task FlushAsync(CancellationToken cancellationToken)
{
- cancellationToken.ThrowIfCancellationRequested();
-
+ await PipeStream.FlushAsync(cancellationToken);
ResetConsumedMessageSize();
- return Task.CompletedTask;
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
- PipeStream?.Dispose();
+ if (PipeStream != null)
+ {
+ if (PipeStream.IsConnected)
+ PipeStream.Disconnect();
+ PipeStream.Dispose();
+ PipeStream = null;
+ }
}
}
}