diff options
author | Jens Geyer <jensg@apache.org> | 2022-09-09 13:39:33 +0200 |
---|---|---|
committer | Jens Geyer <Jens-G@users.noreply.github.com> | 2022-09-09 23:00:09 +0200 |
commit | 60970c4e10b0014005bc68f07f4e5c5987b41e3a (patch) | |
tree | ea4c4745e1f7509c58a2c435ff5d9cfe5ec4c0cc /lib | |
parent | 72d5912424211561adc6f8e7bc502180631d9f8e (diff) | |
download | thrift-60970c4e10b0014005bc68f07f4e5c5987b41e3a.tar.gz |
THRIFT-5624 suboptimal performance of the c# named pipe server transport in multithread servers
Client: netstd
Patch: Jens Geyer
Diffstat (limited to 'lib')
-rw-r--r-- | lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs | 20 | ||||
-rw-r--r-- | lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs | 221 |
2 files changed, 172 insertions, 69 deletions
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs index c57db9d60..071c66017 100644 --- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs +++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs @@ -28,7 +28,7 @@ namespace Thrift.Transport.Client { private NamedPipeClientStream PipeStream; private readonly int ConnectTimeout; - private const int DEFAULT_CONNECT_TIMEOUT = 60 * 1000; // Timeout.Infinite is not a good default + private const int DEFAULT_CONNECT_TIMEOUT = 60 * 1000; // Timeout.Infinite is not a good default public TNamedPipeTransport(string pipe, TConfiguration config, int timeout = DEFAULT_CONNECT_TIMEOUT) : this(".", pipe, config, timeout) @@ -61,6 +61,8 @@ namespace Thrift.Transport.Client { if (PipeStream != null) { + if (PipeStream.IsConnected) + PipeStream.Close(); PipeStream.Dispose(); PipeStream = null; } @@ -107,20 +109,24 @@ namespace Thrift.Transport.Client } } - public override Task FlushAsync(CancellationToken cancellationToken) + public override async Task FlushAsync(CancellationToken cancellationToken) { - cancellationToken.ThrowIfCancellationRequested(); - + await PipeStream.FlushAsync(cancellationToken); ResetConsumedMessageSize(); - return Task.CompletedTask; } protected override void Dispose(bool disposing) { - if(disposing) + if (disposing) { - PipeStream?.Dispose(); + if (PipeStream != null) + { + if (PipeStream.IsConnected) + PipeStream.Close(); + PipeStream.Dispose(); + PipeStream = null; + } } } } diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs index 7c9430046..8ad62aa14 100644 --- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs +++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs @@ -24,42 +24,79 @@ using System.Threading.Tasks; using System.ComponentModel; using System.Security.AccessControl; using System.Security.Principal; +using System.Collections.Generic; +using System.IO; +using System.Diagnostics; + +#pragma warning disable CS1998 // async no await namespace Thrift.Transport.Server { + [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")] [Flags] - public enum NamedPipeClientFlags { + public enum NamedPipeClientFlags { // bad name None = 0x00, OnlyLocalClients = 0x01 }; + [Flags] + public enum NamedPipeServerFlags + { + None = 0x00, + OnlyLocalClients = 0x01, + }; + + // ReSharper disable once InconsistentNaming public class TNamedPipeServerTransport : TServerTransport { + // to manage incoming connections, we set up a task for each stream to listen on + private struct TaskStreamPair + { + public NamedPipeServerStream Stream; + public Task Task; + + public TaskStreamPair(NamedPipeServerStream stream, Task task) + { + Stream = stream; + Task = task; + } + } + /// <summary> /// This is the address of the Pipe on the localhost. /// </summary> private readonly string _pipeAddress; private bool _asyncMode = true; private volatile bool _isPending = true; - private NamedPipeServerStream _stream = null; + private readonly List<TaskStreamPair> _streams = new List<TaskStreamPair>(); private readonly bool _onlyLocalClients = false; // compatibility default + private readonly byte _numListenPipes = 1; // compatibility default - public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags) + public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeServerFlags flags, int numListenPipes) : base(config) { + if ((numListenPipes < 1) || (numListenPipes > 254)) + throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]"); + _pipeAddress = pipeAddress; - _onlyLocalClients = flags.HasFlag(NamedPipeClientFlags.OnlyLocalClients); + _onlyLocalClients = flags.HasFlag(NamedPipeServerFlags.OnlyLocalClients); + _numListenPipes = (byte)numListenPipes; } - [Obsolete("This CTOR is deprecated, please use the other one instead.")] - public TNamedPipeServerTransport(string pipeAddress, TConfiguration config) + [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")] + public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags, int numListenPipes = 1) : base(config) { + if ((numListenPipes < 1) || (numListenPipes > 254)) + throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]"); + _pipeAddress = pipeAddress; - _onlyLocalClients = false; + _onlyLocalClients = flags.HasFlag(NamedPipeClientFlags.OnlyLocalClients); + _numListenPipes = (byte)numListenPipes; } + public override bool IsOpen() { return true; } @@ -69,75 +106,112 @@ namespace Thrift.Transport.Server // nothing to do here } - public override void Close() + private static void Close(NamedPipeServerStream pipe) { - if (_stream != null) + if (pipe != null) { try { - if (_stream.IsConnected) - _stream.Disconnect(); - _stream.Dispose(); + if (pipe.IsConnected) + pipe.Disconnect(); } finally { - _stream = null; - _isPending = false; + pipe.Dispose(); } } } + public override void Close() + { + try + { + if (_streams != null) + { + while(_streams.Count > 0) + { + Close(_streams[0].Stream); + _streams.RemoveAt(0); + } + } + } + finally + { + _streams.Clear(); + _isPending = false; + } + } + public override bool IsClientPending() { return _isPending; } - private void EnsurePipeInstance() + private void EnsurePipeInstances() { - if (_stream == null) + // set up a pool for accepting multiple calls when in multithread mode + // once connected, we hand that stream over to the processor and create a fresh one + try + { + while (_streams.Count < _numListenPipes) + _streams.Add(CreatePipeInstance()); + } + catch { - const PipeDirection direction = PipeDirection.InOut; - const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances; - const PipeTransmissionMode mode = PipeTransmissionMode.Byte; - const int inbuf = 4096; - const int outbuf = 4096; - var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None; + // we might not be able to create all requested instances, e.g. due to some existing instances already processing calls + // if we have at least one pipe to listen on -> Good Enough(tm) + if (_streams.Count < 1) + throw; // no pipes is really bad + } + } + private TaskStreamPair CreatePipeInstance() + { + const PipeDirection direction = PipeDirection.InOut; + const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances; + const PipeTransmissionMode mode = PipeTransmissionMode.Byte; + const int inbuf = 4096; + const int outbuf = 4096; + var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None; - // TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes: - // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative() - // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative() - // EITHER WAY, - // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings - try + // TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes: + // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative() + // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative() + // EITHER WAY, + // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings + + NamedPipeServerStream instance; + try + { + var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients); + if ((handle != null) && (!handle.IsInvalid)) { - var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients); - if ((handle != null) && (!handle.IsInvalid)) - { - _stream = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle); - handle = null; // we don't own it any longer - } - else - { - handle?.Dispose(); - _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/); - } + instance = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle); + handle = null; // we don't own it any longer } - catch (NotImplementedException) // Mono still does not support async, fallback to sync + else { - if (_asyncMode) - { - options &= (~PipeOptions.Asynchronous); - _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf); - _asyncMode = false; - } - else - { - throw; - } + handle?.Dispose(); + instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/); } } + catch (NotImplementedException) // Mono still does not support async, fallback to sync + { + if (_asyncMode) + { + options &= (~PipeOptions.Asynchronous); + instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf); + _asyncMode = false; + } + else + { + throw; + } + } + + // the task gets added later + return new TaskStreamPair( instance, null); } @@ -248,14 +322,28 @@ namespace Thrift.Transport.Server { try { - EnsurePipeInstance(); + EnsurePipeInstances(); - await _stream.WaitForConnectionAsync(cancellationToken); + // fill the list and wait for any task to be completed + var tasks = new List<Task>(); + for (var i = 0; i < _streams.Count; ++i) + { + if (_streams[i].Task == null) + { + var pair = _streams[i]; + pair.Task = Task.Run(async () => await pair.Stream.WaitForConnectionAsync(cancellationToken), cancellationToken); + _streams[i] = pair; + } - var trans = new ServerTransport(_stream, Configuration); - _stream = null; // pass ownership to ServerTransport + tasks.Add(_streams[i].Task); + } - //_isPending = false; + // there must be an exact mapping between task index and stream index + Debug.Assert(_streams.Count == tasks.Count); + var index = Task.WaitAny(tasks.ToArray(), cancellationToken); + + var trans = new ServerTransport(_streams[index].Stream, Configuration); + _streams.RemoveAt(index); // pass stream ownership to ServerTransport return trans; } @@ -296,8 +384,13 @@ namespace Thrift.Transport.Server public override void Close() { - PipeStream?.Dispose(); - PipeStream = null; + if (PipeStream != null) + { + if (PipeStream.IsConnected) + PipeStream.Disconnect(); + PipeStream.Dispose(); + PipeStream = null; + } } public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) @@ -341,19 +434,23 @@ namespace Thrift.Transport.Server } } - public override Task FlushAsync(CancellationToken cancellationToken) + public override async Task FlushAsync(CancellationToken cancellationToken) { - cancellationToken.ThrowIfCancellationRequested(); - + await PipeStream.FlushAsync(cancellationToken); ResetConsumedMessageSize(); - return Task.CompletedTask; } protected override void Dispose(bool disposing) { if (disposing) { - PipeStream?.Dispose(); + if (PipeStream != null) + { + if (PipeStream.IsConnected) + PipeStream.Disconnect(); + PipeStream.Dispose(); + PipeStream = null; + } } } } |