summaryrefslogtreecommitdiff
path: root/lib/cpp/src/thrift/transport/TPipe.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/cpp/src/thrift/transport/TPipe.cpp')
-rw-r--r--lib/cpp/src/thrift/transport/TPipe.cpp224
1 files changed, 100 insertions, 124 deletions
diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp
index 3bb3dac2a..15e4845e4 100644
--- a/lib/cpp/src/thrift/transport/TPipe.cpp
+++ b/lib/cpp/src/thrift/transport/TPipe.cpp
@@ -20,11 +20,13 @@
#include <thrift/transport/TTransportException.h>
#include <thrift/transport/TPipe.h>
#ifdef _WIN32
- #include <thrift/windows/OverlappedSubmissionThread.h>
- #include <thrift/windows/Sync.h>
+#include <thrift/windows/OverlappedSubmissionThread.h>
+#include <thrift/windows/Sync.h>
#endif
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
using namespace std;
@@ -46,9 +48,9 @@ public:
virtual ~TPipeImpl() = 0 {}
virtual uint32_t read(uint8_t* buf, uint32_t len) = 0;
virtual void write(const uint8_t* buf, uint32_t len) = 0;
- virtual HANDLE getPipeHandle() = 0; //doubles as the read handle for anon pipe
+ virtual HANDLE getPipeHandle() = 0; // doubles as the read handle for anon pipe
virtual void setPipeHandle(HANDLE pipehandle) = 0;
- virtual HANDLE getWrtPipeHandle() {return INVALID_HANDLE_VALUE;}
+ virtual HANDLE getWrtPipeHandle() { return INVALID_HANDLE_VALUE; }
virtual void setWrtPipeHandle(HANDLE) {}
virtual bool isBufferedDataAvailable() { return false; }
virtual HANDLE getNativeWaitHandle() { return INVALID_HANDLE_VALUE; }
@@ -58,15 +60,16 @@ class TNamedPipeImpl : public TPipeImpl {
public:
explicit TNamedPipeImpl(HANDLE pipehandle) : Pipe_(pipehandle) {}
virtual ~TNamedPipeImpl() {}
- virtual uint32_t read(uint8_t* buf, uint32_t len) {
- return pseudo_sync_read (Pipe_.h, read_event_.h, buf, len);
+ virtual uint32_t read(uint8_t* buf, uint32_t len) {
+ return pseudo_sync_read(Pipe_.h, read_event_.h, buf, len);
}
virtual void write(const uint8_t* buf, uint32_t len) {
pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
}
- virtual HANDLE getPipeHandle() {return Pipe_.h;}
- virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);}
+ virtual HANDLE getPipeHandle() { return Pipe_.h; }
+ virtual void setPipeHandle(HANDLE pipehandle) { Pipe_.reset(pipehandle); }
+
private:
TManualResetEvent read_event_;
TManualResetEvent write_event_;
@@ -77,13 +80,14 @@ class TAnonPipeImpl : public TPipeImpl {
public:
TAnonPipeImpl(HANDLE PipeRd, HANDLE PipeWrt) : PipeRd_(PipeRd), PipeWrt_(PipeWrt) {}
virtual ~TAnonPipeImpl() {}
- virtual uint32_t read(uint8_t* buf, uint32_t len) {return pipe_read (PipeRd_.h, buf, len);}
- virtual void write(const uint8_t* buf, uint32_t len) { pipe_write(PipeWrt_.h, buf, len);}
+ virtual uint32_t read(uint8_t* buf, uint32_t len) { return pipe_read(PipeRd_.h, buf, len); }
+ virtual void write(const uint8_t* buf, uint32_t len) { pipe_write(PipeWrt_.h, buf, len); }
+
+ virtual HANDLE getPipeHandle() { return PipeRd_.h; }
+ virtual void setPipeHandle(HANDLE PipeRd) { PipeRd_.reset(PipeRd); }
+ virtual HANDLE getWrtPipeHandle() { return PipeWrt_.h; }
+ virtual void setWrtPipeHandle(HANDLE PipeWrt) { PipeWrt_.reset(PipeWrt); }
- virtual HANDLE getPipeHandle() {return PipeRd_.h;}
- virtual void setPipeHandle(HANDLE PipeRd) {PipeRd_.reset(PipeRd);}
- virtual HANDLE getWrtPipeHandle() {return PipeWrt_.h;}
- virtual void setWrtPipeHandle(HANDLE PipeWrt) {PipeWrt_.reset(PipeWrt);}
private:
TAutoHandle PipeRd_;
TAutoHandle PipeWrt_;
@@ -94,11 +98,8 @@ private:
// than using the regular named pipe implementation
class TWaitableNamedPipeImpl : public TPipeImpl {
public:
- explicit TWaitableNamedPipeImpl(HANDLE pipehandle) :
- Pipe_(pipehandle),
- begin_unread_idx_(0),
- end_unread_idx_(0)
- {
+ explicit TWaitableNamedPipeImpl(HANDLE pipehandle)
+ : Pipe_(pipehandle), begin_unread_idx_(0), end_unread_idx_(0) {
readOverlap_.action = TOverlappedWorkItem::READ;
readOverlap_.h = Pipe_.h;
cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
@@ -108,7 +109,7 @@ public:
}
virtual ~TWaitableNamedPipeImpl() {
// see if there is an outstanding read request
- if(begin_unread_idx_ == end_unread_idx_) {
+ if (begin_unread_idx_ == end_unread_idx_) {
// if so, cancel it, and wait for the dead completion
thread_->addWorkItem(&cancelOverlap_);
readOverlap_.overlappedResults(false /*ignore errors*/);
@@ -119,10 +120,11 @@ public:
pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
}
- virtual HANDLE getPipeHandle() {return Pipe_.h;}
- virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);}
- virtual bool isBufferedDataAvailable() {return begin_unread_idx_ < end_unread_idx_;}
+ virtual HANDLE getPipeHandle() { return Pipe_.h; }
+ virtual void setPipeHandle(HANDLE pipehandle) { Pipe_.reset(pipehandle); }
+ virtual bool isBufferedDataAvailable() { return begin_unread_idx_ < end_unread_idx_; }
virtual HANDLE getNativeWaitHandle() { return ready_event_.h; }
+
private:
void beginAsyncRead(uint8_t* buf, uint32_t len);
uint32_t endAsyncRead();
@@ -138,34 +140,29 @@ private:
uint32_t end_unread_idx_;
};
-void TWaitableNamedPipeImpl::beginAsyncRead(uint8_t* buf, uint32_t len)
-{
+void TWaitableNamedPipeImpl::beginAsyncRead(uint8_t* buf, uint32_t len) {
begin_unread_idx_ = end_unread_idx_ = 0;
readOverlap_.reset(buf, len, ready_event_.h);
thread_->addWorkItem(&readOverlap_);
- if(readOverlap_.success == FALSE && readOverlap_.last_error != ERROR_IO_PENDING)
- {
+ if (readOverlap_.success == FALSE && readOverlap_.last_error != ERROR_IO_PENDING) {
GlobalOutput.perror("TPipe ::ReadFile errored GLE=", readOverlap_.last_error);
throw TTransportException(TTransportException::UNKNOWN, "TPipe: ReadFile failed");
}
}
-uint32_t TWaitableNamedPipeImpl::endAsyncRead()
-{
+uint32_t TWaitableNamedPipeImpl::endAsyncRead() {
return readOverlap_.overlappedResults();
}
-uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len)
-{
- if(begin_unread_idx_ == end_unread_idx_) {
+uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len) {
+ if (begin_unread_idx_ == end_unread_idx_) {
end_unread_idx_ = endAsyncRead();
}
- uint32_t bytes_to_copy = (std::min)(len, end_unread_idx_-begin_unread_idx_);
+ uint32_t bytes_to_copy = (std::min)(len, end_unread_idx_ - begin_unread_idx_);
memcpy(buf, &buffer_[begin_unread_idx_], bytes_to_copy);
begin_unread_idx_ += bytes_to_copy;
- if(begin_unread_idx_ != end_unread_idx_)
- {
+ if (begin_unread_idx_ != end_unread_idx_) {
assert(len == bytes_to_copy);
// we were able to fulfill the read with just the bytes in our
// buffer, and we still have buffer left
@@ -173,33 +170,29 @@ uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len)
}
uint32_t bytes_copied = bytes_to_copy;
- //all of the requested data has been read. Kick off an async read for the next round.
+ // all of the requested data has been read. Kick off an async read for the next round.
beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
return bytes_copied;
}
-void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len)
-{
+void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len) {
OVERLAPPED tempOverlap;
- memset( &tempOverlap, 0, sizeof(tempOverlap));
+ memset(&tempOverlap, 0, sizeof(tempOverlap));
tempOverlap.hEvent = event;
uint32_t written = 0;
- while(written < len)
- {
- BOOL result = ::WriteFile(pipe, buf+written, len-written, NULL, &tempOverlap);
+ while (written < len) {
+ BOOL result = ::WriteFile(pipe, buf + written, len - written, NULL, &tempOverlap);
- if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING)
- {
+ if (result == FALSE && ::GetLastError() != ERROR_IO_PENDING) {
GlobalOutput.perror("TPipe ::WriteFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: write failed");
}
DWORD bytes = 0;
result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
- if(!result)
- {
+ if (!result) {
GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
}
@@ -207,24 +200,21 @@ void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t l
}
}
-uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len)
-{
+uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len) {
OVERLAPPED tempOverlap;
- memset( &tempOverlap, 0, sizeof(tempOverlap));
+ memset(&tempOverlap, 0, sizeof(tempOverlap));
tempOverlap.hEvent = event;
BOOL result = ::ReadFile(pipe, buf, len, NULL, &tempOverlap);
- if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING)
- {
+ if (result == FALSE && ::GetLastError() != ERROR_IO_PENDING) {
GlobalOutput.perror("TPipe ::ReadFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: read failed");
}
DWORD bytes = 0;
result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
- if(!result)
- {
+ if (!result) {
GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
}
@@ -232,38 +222,27 @@ uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len)
}
//---- Constructors ----
-TPipe::TPipe(HANDLE Pipe) :
- impl_(new TWaitableNamedPipeImpl(Pipe)),
- TimeoutSeconds_(3),
- isAnonymous_(false)
-{}
-
-TPipe::TPipe(const char *pipename) :
- TimeoutSeconds_(3),
- isAnonymous_(false)
-{
+TPipe::TPipe(HANDLE Pipe)
+ : impl_(new TWaitableNamedPipeImpl(Pipe)), TimeoutSeconds_(3), isAnonymous_(false) {
+}
+
+TPipe::TPipe(const char* pipename) : TimeoutSeconds_(3), isAnonymous_(false) {
setPipename(pipename);
}
-TPipe::TPipe(const std::string &pipename) :
- TimeoutSeconds_(3),
- isAnonymous_(false)
-{
+TPipe::TPipe(const std::string& pipename) : TimeoutSeconds_(3), isAnonymous_(false) {
setPipename(pipename);
}
-TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt) :
- impl_(new TAnonPipeImpl(PipeRd, PipeWrt)),
- TimeoutSeconds_(3),
- isAnonymous_(true)
-{}
+TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt)
+ : impl_(new TAnonPipeImpl(PipeRd, PipeWrt)), TimeoutSeconds_(3), isAnonymous_(true) {
+}
-TPipe::TPipe() :
- TimeoutSeconds_(3),
- isAnonymous_(false)
-{}
+TPipe::TPipe() : TimeoutSeconds_(3), isAnonymous_(false) {
+}
-TPipe::~TPipe() {}
+TPipe::~TPipe() {
+}
//---------------------------------------------------------
// Transport callbacks
@@ -283,27 +262,24 @@ void TPipe::open() {
TAutoHandle hPipe;
do {
DWORD flags = FILE_FLAG_OVERLAPPED; // async mode, so we can do reads at the same time as writes
- hPipe.reset(CreateFile(
- pipename_.c_str(),
- GENERIC_READ | GENERIC_WRITE,
- 0, // no sharing
- NULL, // default security attributes
- OPEN_EXISTING, // opens existing pipe
- flags,
- NULL)); // no template file
+ hPipe.reset(CreateFile(pipename_.c_str(),
+ GENERIC_READ | GENERIC_WRITE,
+ 0, // no sharing
+ NULL, // default security attributes
+ OPEN_EXISTING, // opens existing pipe
+ flags,
+ NULL)); // no template file
if (hPipe.h != INVALID_HANDLE_VALUE)
- break; //success!
+ break; // success!
- if(::GetLastError() != ERROR_PIPE_BUSY)
- {
+ if (::GetLastError() != ERROR_PIPE_BUSY) {
GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
}
- } while( ::WaitNamedPipe(pipename_.c_str(), TimeoutSeconds_*1000) );
+ } while (::WaitNamedPipe(pipename_.c_str(), TimeoutSeconds_ * 1000));
- if(hPipe.h == INVALID_HANDLE_VALUE)
- {
+ if (hPipe.h == INVALID_HANDLE_VALUE) {
GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
}
@@ -312,7 +288,6 @@ void TPipe::open() {
hPipe.release();
}
-
void TPipe::close() {
impl_.reset();
}
@@ -323,17 +298,15 @@ uint32_t TPipe::read(uint8_t* buf, uint32_t len) {
return impl_->read(buf, len);
}
-uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len)
-{
- DWORD cbRead;
- int fSuccess = ReadFile(
- pipe, // pipe handle
- buf, // buffer to receive reply
- len, // size of buffer
- &cbRead, // number of bytes read
- NULL); // not overlapped
-
- if ( !fSuccess && GetLastError() != ERROR_MORE_DATA )
+uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len) {
+ DWORD cbRead;
+ int fSuccess = ReadFile(pipe, // pipe handle
+ buf, // buffer to receive reply
+ len, // size of buffer
+ &cbRead, // number of bytes read
+ NULL); // not overlapped
+
+ if (!fSuccess && GetLastError() != ERROR_MORE_DATA)
return 0; // No more data, possibly because client disconnected.
return cbRead;
@@ -345,17 +318,15 @@ void TPipe::write(const uint8_t* buf, uint32_t len) {
impl_->write(buf, len);
}
-void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len)
-{
- DWORD cbWritten;
- int fSuccess = WriteFile(
- pipe, // pipe handle
- buf, // message
- len, // message length
- &cbWritten, // bytes written
- NULL); // not overlapped
-
- if ( !fSuccess)
+void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len) {
+ DWORD cbWritten;
+ int fSuccess = WriteFile(pipe, // pipe handle
+ buf, // message
+ len, // message length
+ &cbWritten, // bytes written
+ NULL); // not overlapped
+
+ if (!fSuccess)
throw TTransportException(TTransportException::NOT_OPEN, "Write to pipe failed");
}
@@ -367,36 +338,40 @@ string TPipe::getPipename() {
return pipename_;
}
-void TPipe::setPipename(const std::string &pipename) {
- if(pipename.find("\\\\") == -1)
+void TPipe::setPipename(const std::string& pipename) {
+ if (pipename.find("\\\\") == -1)
pipename_ = "\\\\.\\pipe\\" + pipename;
else
pipename_ = pipename;
}
HANDLE TPipe::getPipeHandle() {
- if(impl_) return impl_->getPipeHandle();
+ if (impl_)
+ return impl_->getPipeHandle();
return INVALID_HANDLE_VALUE;
}
void TPipe::setPipeHandle(HANDLE pipehandle) {
- if(isAnonymous_)
+ if (isAnonymous_)
impl_->setPipeHandle(pipehandle);
else
impl_.reset(new TNamedPipeImpl(pipehandle));
}
HANDLE TPipe::getWrtPipeHandle() {
- if(impl_) return impl_->getWrtPipeHandle();
+ if (impl_)
+ return impl_->getWrtPipeHandle();
return INVALID_HANDLE_VALUE;
}
void TPipe::setWrtPipeHandle(HANDLE pipehandle) {
- if(impl_) impl_->setWrtPipeHandle(pipehandle);
+ if (impl_)
+ impl_->setWrtPipeHandle(pipehandle);
}
HANDLE TPipe::getNativeWaitHandle() {
- if(impl_) return impl_->getNativeWaitHandle();
+ if (impl_)
+ return impl_->getNativeWaitHandle();
return INVALID_HANDLE_VALUE;
}
@@ -409,5 +384,6 @@ void TPipe::setConnectTimeout(long seconds) {
}
#endif //_WIN32
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport