// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using Microsoft.Win32.SafeHandles;
using System;
using System.IO.Pipes;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.ComponentModel;
using System.Security.AccessControl;
using System.Security.Principal;
using System.Collections.Generic;
using System.IO;
using System.Diagnostics;
#pragma warning disable CS1998 // async no await
namespace Thrift.Transport.Server
{
[Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
[Flags]
public enum NamedPipeClientFlags { // bad name
None = 0x00,
OnlyLocalClients = 0x01
};
[Flags]
public enum NamedPipeServerFlags
{
None = 0x00,
OnlyLocalClients = 0x01,
};
// ReSharper disable once InconsistentNaming
public class TNamedPipeServerTransport : TServerTransport
{
// to manage incoming connections, we set up a task for each stream to listen on
private struct TaskStreamPair
{
public NamedPipeServerStream Stream;
public Task Task;
public TaskStreamPair(NamedPipeServerStream stream, Task task)
{
Stream = stream;
Task = task;
}
}
///
/// This is the address of the Pipe on the localhost.
///
private readonly string _pipeAddress;
private bool _asyncMode = true;
private volatile bool _isPending = true;
private readonly List _streams = new List();
private readonly bool _onlyLocalClients = false; // compatibility default
private readonly byte _numListenPipes = 1; // compatibility default
public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeServerFlags flags, int numListenPipes)
: base(config)
{
if ((numListenPipes < 1) || (numListenPipes > 254))
throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
_pipeAddress = pipeAddress;
_onlyLocalClients = flags.HasFlag(NamedPipeServerFlags.OnlyLocalClients);
_numListenPipes = (byte)numListenPipes;
}
[Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags, int numListenPipes = 1)
: base(config)
{
if ((numListenPipes < 1) || (numListenPipes > 254))
throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
_pipeAddress = pipeAddress;
_onlyLocalClients = flags.HasFlag(NamedPipeClientFlags.OnlyLocalClients);
_numListenPipes = (byte)numListenPipes;
}
public override bool IsOpen() {
return true;
}
public override void Listen()
{
// nothing to do here
}
private static void Close(NamedPipeServerStream pipe)
{
if (pipe != null)
{
try
{
if (pipe.IsConnected)
pipe.Disconnect();
}
finally
{
pipe.Dispose();
}
}
}
public override void Close()
{
try
{
if (_streams != null)
{
while(_streams.Count > 0)
{
Close(_streams[0].Stream);
_streams.RemoveAt(0);
}
}
}
finally
{
_streams.Clear();
_isPending = false;
}
}
public override bool IsClientPending()
{
return _isPending;
}
private void EnsurePipeInstances()
{
// set up a pool for accepting multiple calls when in multithread mode
// once connected, we hand that stream over to the processor and create a fresh one
try
{
while (_streams.Count < _numListenPipes)
_streams.Add(CreatePipeInstance());
}
catch
{
// we might not be able to create all requested instances, e.g. due to some existing instances already processing calls
// if we have at least one pipe to listen on -> Good Enough(tm)
if (_streams.Count < 1)
throw; // no pipes is really bad
}
}
private TaskStreamPair CreatePipeInstance()
{
const PipeDirection direction = PipeDirection.InOut;
const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
const int inbuf = 4096;
const int outbuf = 4096;
var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
// TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes:
// - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
// - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative()
// EITHER WAY,
// - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings
NamedPipeServerStream instance;
try
{
var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients);
if ((handle != null) && (!handle.IsInvalid))
{
instance = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
handle = null; // we don't own it any longer
}
else
{
handle?.Dispose();
instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
}
}
catch (NotImplementedException) // Mono still does not support async, fallback to sync
{
if (_asyncMode)
{
options &= (~PipeOptions.Asynchronous);
instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
_asyncMode = false;
}
else
{
throw;
}
}
// the task gets added later
return new TaskStreamPair( instance, null);
}
#region CreatePipeNative workaround
[StructLayout(LayoutKind.Sequential)]
internal class SECURITY_ATTRIBUTES
{
internal int nLength = 0;
internal IntPtr lpSecurityDescriptor = IntPtr.Zero;
internal int bInheritHandle = 0;
}
private const string Kernel32 = "kernel32.dll";
[DllImport(Kernel32, SetLastError = true, CharSet = CharSet.Unicode)]
internal static extern IntPtr CreateNamedPipe(
string lpName, uint dwOpenMode, uint dwPipeMode,
uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize, uint nDefaultTimeOut,
SECURITY_ATTRIBUTES pipeSecurityDescriptor
);
// Workaround: create the pipe via API call
// we have to do it this way, since NamedPipeServerStream() for netstd still lacks a few CTORs and/or arguments
// and _stream.SetAccessControl(pipesec); only keeps throwing ACCESS_DENIED errors at us
// References:
// - https://github.com/dotnet/corefx/issues/30170 (closed, continued in 31190)
// - https://github.com/dotnet/corefx/issues/31190 System.IO.Pipes.AccessControl package does not work
// - https://github.com/dotnet/corefx/issues/24040 NamedPipeServerStream: Provide support for WRITE_DAC
// - https://github.com/dotnet/corefx/issues/34400 Have a mechanism for lower privileged user to connect to a privileged user's pipe
private static SafePipeHandle CreatePipeNative(string name, int inbuf, int outbuf, bool OnlyLocalClients)
{
if (! RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
return null; // Windows only
var pinningHandle = new GCHandle();
try
{
// owner gets full access, everyone else read/write
var pipesec = new PipeSecurity();
using (var currentIdentity = WindowsIdentity.GetCurrent())
{
var sidOwner = currentIdentity.Owner;
var sidWorld = new SecurityIdentifier(WellKnownSidType.WorldSid, null);
pipesec.SetOwner(sidOwner);
pipesec.AddAccessRule(new PipeAccessRule(sidOwner, PipeAccessRights.FullControl, AccessControlType.Allow));
pipesec.AddAccessRule(new PipeAccessRule(sidWorld, PipeAccessRights.ReadWrite, AccessControlType.Allow));
}
// create a security descriptor and assign it to the security attribs
var secAttrs = new SECURITY_ATTRIBUTES();
byte[] sdBytes = pipesec.GetSecurityDescriptorBinaryForm();
pinningHandle = GCHandle.Alloc(sdBytes, GCHandleType.Pinned);
unsafe {
fixed (byte* pSD = sdBytes) {
secAttrs.lpSecurityDescriptor = (IntPtr)pSD;
}
}
// a bunch of constants we will need shortly
const uint PIPE_ACCESS_DUPLEX = 0x00000003;
const uint FILE_FLAG_OVERLAPPED = 0x40000000;
const uint WRITE_DAC = 0x00040000;
const uint PIPE_TYPE_BYTE = 0x00000000;
const uint PIPE_READMODE_BYTE = 0x00000000;
const uint PIPE_UNLIMITED_INSTANCES = 255;
const uint PIPE_ACCEPT_REMOTE_CLIENTS = 0x00000000; // Connections from remote clients can be accepted and checked against the security descriptor for the pipe.
const uint PIPE_REJECT_REMOTE_CLIENTS = 0x00000008; // Connections from remote clients are automatically rejected.
// any extra flags we want to add
uint dwPipeModeXtra
= (OnlyLocalClients ? PIPE_REJECT_REMOTE_CLIENTS : PIPE_ACCEPT_REMOTE_CLIENTS)
;
// create the pipe via API call
var rawHandle = CreateNamedPipe(
@"\\.\pipe\" + name,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | dwPipeModeXtra,
PIPE_UNLIMITED_INSTANCES, (uint)inbuf, (uint)outbuf,
5 * 1000,
secAttrs
);
// make a SafePipeHandle() from it
var handle = new SafePipeHandle(rawHandle, true);
if (handle.IsInvalid)
throw new Win32Exception(Marshal.GetLastWin32Error());
// return it (to be packaged)
return handle;
}
finally
{
if (pinningHandle.IsAllocated)
pinningHandle.Free();
}
}
#endregion
protected override async ValueTask AcceptImplementationAsync(CancellationToken cancellationToken)
{
try
{
EnsurePipeInstances();
// fill the list and wait for any task to be completed
var tasks = new List();
for (var i = 0; i < _streams.Count; ++i)
{
if (_streams[i].Task == null)
{
var pair = _streams[i];
pair.Task = Task.Run(async () => await pair.Stream.WaitForConnectionAsync(cancellationToken), cancellationToken);
_streams[i] = pair;
}
tasks.Add(_streams[i].Task);
}
// there must be an exact mapping between task index and stream index
Debug.Assert(_streams.Count == tasks.Count);
var index = Task.WaitAny(tasks.ToArray(), cancellationToken);
var trans = new ServerTransport(_streams[index].Stream, Configuration);
_streams.RemoveAt(index); // pass stream ownership to ServerTransport
return trans;
}
catch (TTransportException)
{
Close();
throw;
}
catch (TaskCanceledException)
{
Close();
throw; // let it bubble up
}
catch (Exception e)
{
Close();
throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message);
}
}
private class ServerTransport : TEndpointTransport
{
private NamedPipeServerStream PipeStream;
public ServerTransport(NamedPipeServerStream stream, TConfiguration config)
: base(config)
{
PipeStream = stream;
}
public override bool IsOpen => PipeStream != null && PipeStream.IsConnected;
public override Task OpenAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
return Task.CompletedTask;
}
public override void Close()
{
if (PipeStream != null)
{
if (PipeStream.IsConnected)
PipeStream.Disconnect();
PipeStream.Dispose();
PipeStream = null;
}
}
public override async ValueTask ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
if (PipeStream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
CheckReadBytesAvailable(length);
#if NETSTANDARD2_0
var numBytes = await PipeStream.ReadAsync(buffer, offset, length, cancellationToken);
#else
var numBytes = await PipeStream.ReadAsync(buffer.AsMemory(offset, length), cancellationToken);
#endif
CountConsumedMessageBytes(numBytes);
return numBytes;
}
public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
if (PipeStream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
// if necessary, send the data in chunks
// there's a system limit around 0x10000 bytes that we hit otherwise
// MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit
while (nBytes > 0)
{
#if NET5_0_OR_GREATER
await PipeStream.WriteAsync(buffer.AsMemory(offset, nBytes), cancellationToken);
#else
await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken);
#endif
offset += nBytes;
length -= nBytes;
nBytes = Math.Min(nBytes, length);
}
}
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await PipeStream.FlushAsync(cancellationToken);
ResetConsumedMessageSize();
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
if (PipeStream != null)
{
if (PipeStream.IsConnected)
PipeStream.Disconnect();
PipeStream.Dispose();
PipeStream = null;
}
}
}
}
}
}