/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include #include #include #ifdef _WIN32 # include # include # include #endif //_WIN32 namespace apache { namespace thrift { namespace transport { #ifdef _WIN32 using namespace std; using boost::shared_ptr; class TPipeServerImpl : boost::noncopyable { public: TPipeServerImpl() {} virtual ~TPipeServerImpl() = 0 {} virtual void interrupt() = 0; virtual void close() = 0; virtual boost::shared_ptr acceptImpl() = 0; virtual HANDLE getPipeHandle() = 0; virtual HANDLE getWrtPipeHandle() = 0; virtual HANDLE getClientRdPipeHandle()= 0; virtual HANDLE getClientWrtPipeHandle()= 0; virtual HANDLE getNativeWaitHandle() {return NULL;} }; class TAnonPipeServer : public TPipeServerImpl { public: TAnonPipeServer() { //The anonymous pipe needs to be created first so that the server can //pass the handles on to the client before the serve (acceptImpl) //blocking call. if (!createAnonPipe()) { GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed"); } } virtual ~TAnonPipeServer() {} virtual void interrupt() {} //not currently implemented virtual void close() { PipeR_.reset(); PipeW_.reset(); ClientAnonRead_.reset(); ClientAnonWrite_.reset(); } virtual boost::shared_ptr acceptImpl(); virtual HANDLE getPipeHandle() {return PipeR_.h;} virtual HANDLE getWrtPipeHandle() {return PipeW_.h;} virtual HANDLE getClientRdPipeHandle() {return ClientAnonRead_.h;} virtual HANDLE getClientWrtPipeHandle() {return ClientAnonWrite_.h;} private: bool createAnonPipe(); TAutoHandle PipeR_; // Anonymous Pipe (R) TAutoHandle PipeW_; // Anonymous Pipe (W) //Client side anonymous pipe handles //? Do we need duplicates to send to client? TAutoHandle ClientAnonRead_; TAutoHandle ClientAnonWrite_; }; class TNamedPipeServer : public TPipeServerImpl { public: TNamedPipeServer( const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) : stopping_(false), pipename_(pipename), bufsize_(bufsize), maxconns_(maxconnections) { connectOverlap_.action = TOverlappedWorkItem::CONNECT; cancelOverlap_.action = TOverlappedWorkItem::CANCELIO; initiateNamedConnect(); } virtual ~TNamedPipeServer() {} virtual void interrupt() { TAutoCrit lock(pipe_protect_); cached_client_.reset(); if(Pipe_.h != INVALID_HANDLE_VALUE) { stopping_ = true; cancelOverlap_.h = Pipe_.h; // This should wake up GetOverlappedResult thread_->addWorkItem(&cancelOverlap_); close(); } } virtual void close() { Pipe_.reset(); } virtual boost::shared_ptr acceptImpl(); virtual HANDLE getPipeHandle() {return Pipe_.h;} virtual HANDLE getWrtPipeHandle() {return INVALID_HANDLE_VALUE;} virtual HANDLE getClientRdPipeHandle() {return INVALID_HANDLE_VALUE;} virtual HANDLE getClientWrtPipeHandle() {return INVALID_HANDLE_VALUE;} virtual HANDLE getNativeWaitHandle() {return listen_event_.h;} private: bool createNamedPipe(); void initiateNamedConnect(); TAutoOverlapThread thread_; TOverlappedWorkItem connectOverlap_; TOverlappedWorkItem cancelOverlap_; bool stopping_; std::string pipename_; uint32_t bufsize_; uint32_t maxconns_; TManualResetEvent listen_event_; boost::shared_ptr cached_client_; TAutoHandle Pipe_; TCriticalSection pipe_protect_; }; HANDLE TPipeServer::getNativeWaitHandle() { if(impl_) return impl_->getNativeWaitHandle(); return NULL; } //---- Constructors ---- TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize) : bufsize_(bufsize), isAnonymous_(false) { setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT); setPipename(pipename); } TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) : bufsize_(bufsize), isAnonymous_(false) { setMaxConnections(maxconnections); setPipename(pipename); } TPipeServer::TPipeServer(const std::string &pipename) : bufsize_(1024), isAnonymous_(false) { setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT); setPipename(pipename); } TPipeServer::TPipeServer(int bufsize) : bufsize_(bufsize), isAnonymous_(true) { setMaxConnections(1); impl_.reset(new TAnonPipeServer); } TPipeServer::TPipeServer() : bufsize_(1024), isAnonymous_(true) { setMaxConnections(1); impl_.reset(new TAnonPipeServer); } //---- Destructor ---- TPipeServer::~TPipeServer() {} //--------------------------------------------------------- // Transport callbacks //--------------------------------------------------------- void TPipeServer::listen() { if(isAnonymous_) return; impl_.reset(new TNamedPipeServer(pipename_, bufsize_, maxconns_)); } shared_ptr TPipeServer::acceptImpl() { return impl_->acceptImpl(); } shared_ptr TAnonPipeServer::acceptImpl() { //This 0-byte read serves merely as a blocking call. byte buf; DWORD br; int fSuccess = ReadFile( PipeR_.h, // pipe handle &buf, // buffer to receive reply 0, // size of buffer &br, // number of bytes read NULL); // not overlapped if ( !fSuccess && GetLastError() != ERROR_MORE_DATA ) { GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms"); } shared_ptr client(new TPipe(PipeR_.h, PipeW_.h)); return client; } void TNamedPipeServer::initiateNamedConnect() { if (stopping_) return; if (!createNamedPipe()) { GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed"); } // The prior connection has been handled, so close the gate ResetEvent(listen_event_.h); connectOverlap_.reset(NULL, 0, listen_event_.h); connectOverlap_.h = Pipe_.h; thread_->addWorkItem(&connectOverlap_); // Wait for the client to connect; if it succeeds, the // function returns a nonzero value. If the function returns // zero, GetLastError should return ERROR_PIPE_CONNECTED. if( connectOverlap_.success ) { GlobalOutput.printf("Client connected."); cached_client_.reset(new TPipe(Pipe_.h)); Pipe_.release(); // make sure people know that a connection is ready SetEvent(listen_event_.h); return; } DWORD dwErr = connectOverlap_.last_error; switch( dwErr) { case ERROR_PIPE_CONNECTED: GlobalOutput.printf("Client connected."); cached_client_.reset(new TPipe(Pipe_.h)); Pipe_.release(); // make sure people know that a connection is ready SetEvent(listen_event_.h); return; case ERROR_IO_PENDING: return; //acceptImpl will do the appropriate WaitForMultipleObjects default: GlobalOutput.perror("TPipeServer ConnectNamedPipe failed, GLE=", dwErr); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer ConnectNamedPipe failed"); } } shared_ptr TNamedPipeServer::acceptImpl() { { TAutoCrit lock(pipe_protect_); if(cached_client_.get() != NULL) { shared_ptr client; //zero out cached_client, since we are about to return it. client.swap(cached_client_); //kick off the next connection before returning initiateNamedConnect(); return client; //success! } } if(Pipe_.h == INVALID_HANDLE_VALUE) { throw TTransportException( TTransportException::NOT_OPEN, "TNamedPipeServer: someone called accept on a closed pipe server"); } DWORD dwDummy = 0; if(GetOverlappedResult(Pipe_.h, &connectOverlap_.overlap, &dwDummy, TRUE)) { TAutoCrit lock(pipe_protect_); GlobalOutput.printf("Client connected."); shared_ptr client(new TPipe(Pipe_.h)); Pipe_.release(); //kick off the next connection before returning initiateNamedConnect(); return client; //success! } //if we got here, then we are in an error / shutdown case DWORD gle = GetLastError(); //save error before doing cleanup close(); GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", gle); throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed"); } void TPipeServer::interrupt() { if(impl_) impl_->interrupt(); } void TPipeServer::close() { if(impl_) impl_->close(); } bool TNamedPipeServer::createNamedPipe() { //Windows - set security to allow non-elevated apps //to access pipes created by elevated apps. SID_IDENTIFIER_AUTHORITY SIDAuthWorld = SECURITY_WORLD_SID_AUTHORITY; PSID everyone_sid = NULL; AllocateAndInitializeSid(&SIDAuthWorld, 1, SECURITY_WORLD_RID, 0, 0, 0, 0, 0, 0, 0, &everyone_sid); EXPLICIT_ACCESS ea; ZeroMemory(&ea, sizeof(EXPLICIT_ACCESS)); ea.grfAccessPermissions = SPECIFIC_RIGHTS_ALL | STANDARD_RIGHTS_ALL; ea.grfAccessMode = SET_ACCESS; ea.grfInheritance = NO_INHERITANCE; ea.Trustee.TrusteeForm = TRUSTEE_IS_SID; ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP; ea.Trustee.ptstrName = (LPSTR)everyone_sid; PACL acl = NULL; SetEntriesInAcl(1, &ea, NULL, &acl); PSECURITY_DESCRIPTOR sd = (PSECURITY_DESCRIPTOR)LocalAlloc(LPTR,SECURITY_DESCRIPTOR_MIN_LENGTH); InitializeSecurityDescriptor(sd, SECURITY_DESCRIPTOR_REVISION); SetSecurityDescriptorDacl(sd, TRUE, acl, FALSE); SECURITY_ATTRIBUTES sa; sa.nLength = sizeof(SECURITY_ATTRIBUTES); sa.lpSecurityDescriptor = sd; sa.bInheritHandle = FALSE; // Create an instance of the named pipe TAutoHandle hPipe(CreateNamedPipe( pipename_.c_str(), // pipe name PIPE_ACCESS_DUPLEX | // read/write access FILE_FLAG_OVERLAPPED, // async mode PIPE_TYPE_BYTE | // byte type pipe PIPE_READMODE_BYTE, // byte read mode maxconns_, // max. instances bufsize_, // output buffer size bufsize_, // input buffer size 0, // client time-out &sa)); // security attributes if(hPipe.h == INVALID_HANDLE_VALUE) { Pipe_.reset(); GlobalOutput.perror("TPipeServer::TCreateNamedPipe() GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, "TCreateNamedPipe() failed", GetLastError()); return false; } Pipe_.reset(hPipe.release()); return true; } bool TAnonPipeServer::createAnonPipe() { SECURITY_ATTRIBUTES sa; SECURITY_DESCRIPTOR sd; //security information for pipes InitializeSecurityDescriptor(&sd,SECURITY_DESCRIPTOR_REVISION); SetSecurityDescriptorDacl(&sd, true, NULL, false); sa.lpSecurityDescriptor = &sd; sa.nLength = sizeof(SECURITY_ATTRIBUTES); sa.bInheritHandle = true; //allow passing handle to child HANDLE ClientAnonReadH, PipeW_H, ClientAnonWriteH, Pipe_H; if (!CreatePipe(&ClientAnonReadH,&PipeW_H,&sa,0)) //create stdin pipe { GlobalOutput.perror("TPipeServer CreatePipe (anon) failed, GLE=", GetLastError()); return false; } if (!CreatePipe(&Pipe_H,&ClientAnonWriteH,&sa,0)) //create stdout pipe { GlobalOutput.perror("TPipeServer CreatePipe (anon) failed, GLE=", GetLastError()); CloseHandle(ClientAnonReadH); CloseHandle(PipeW_H); return false; } ClientAnonRead_.reset(ClientAnonReadH); ClientAnonWrite_.reset(ClientAnonWriteH); PipeR_.reset(Pipe_H); PipeW_.reset(PipeW_H); return true; } //--------------------------------------------------------- // Accessors //--------------------------------------------------------- string TPipeServer::getPipename() {return pipename_;} void TPipeServer::setPipename(const std::string &pipename) { if(pipename.find("\\\\") == -1) pipename_ = "\\\\.\\pipe\\" + pipename; else pipename_ = pipename; } int TPipeServer::getBufferSize() {return bufsize_;} void TPipeServer::setBufferSize(int bufsize) {bufsize_ = bufsize;} HANDLE TPipeServer::getPipeHandle() {return impl_?impl_->getPipeHandle() :INVALID_HANDLE_VALUE;} HANDLE TPipeServer::getWrtPipeHandle() {return impl_?impl_->getWrtPipeHandle() :INVALID_HANDLE_VALUE;} HANDLE TPipeServer::getClientRdPipeHandle() {return impl_?impl_->getClientRdPipeHandle() :INVALID_HANDLE_VALUE;} HANDLE TPipeServer::getClientWrtPipeHandle() {return impl_?impl_->getClientWrtPipeHandle():INVALID_HANDLE_VALUE;} bool TPipeServer::getAnonymous() { return isAnonymous_; } void TPipeServer::setAnonymous(bool anon) { isAnonymous_ = anon;} void TPipeServer::setMaxConnections(uint32_t maxconnections) { if(maxconnections == 0) maxconns_ = 1; else if (maxconnections > PIPE_UNLIMITED_INSTANCES) maxconns_ = PIPE_UNLIMITED_INSTANCES; else maxconns_ = maxconnections; } #endif //_WIN32 }}} // apache::thrift::transport