diff options
Diffstat (limited to 'ace/UPIPE_Stream.cpp')
-rw-r--r-- | ace/UPIPE_Stream.cpp | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/ace/UPIPE_Stream.cpp b/ace/UPIPE_Stream.cpp new file mode 100644 index 00000000000..3c9bed43494 --- /dev/null +++ b/ace/UPIPE_Stream.cpp @@ -0,0 +1,206 @@ +// UPIPE_Stream.cpp +// $Id$ + +#define ACE_BUILD_DLL +#include "ace/UPIPE_Stream.h" + +#if defined (ACE_HAS_THREADS) + +#if !defined (__ACE_INLINE__) +#include "ace/UPIPE_Stream.i" +#endif /* __ACE_INLINE__ */ + + +ACE_ALLOC_HOOK_DEFINE(ACE_UPIPE_Stream) + +int +ACE_UPIPE_Stream::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd, + void * val) +{ + ACE_TRACE ("ACE_UPIPE_Stream::control"); + return this->stream_.control (cmd, val); +} + +void +ACE_UPIPE_Stream::dump (void) const +{ + ACE_TRACE ("ACE_UPIPE_Stream::dump"); +} + +int +ACE_UPIPE_Stream::close (void) +{ + ACE_TRACE ("ACE_UPIPE_Stream::close"); + // Since the UPIPE should have been closed earlier we won't bother + // checking to see if closing it now fails. + + if (this->ACE_SPIPE::get_handle () != ACE_INVALID_HANDLE) + this->ACE_SPIPE::close (); + + // Close down the ACE_stream. + return this->stream_.close (0); +} + +int +ACE_UPIPE_Stream::get_remote_addr (ACE_UPIPE_Addr &remote_sap) const +{ + ACE_TRACE ("ACE_UPIPE_Stream::get_remote_addr"); + remote_sap = this->remote_addr_; + return 0; +} + +int +ACE_UPIPE_Stream::send (ACE_Message_Block *mb_p, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_UPIPE_Stream::send_msg"); + return this->stream_.put (mb_p, timeout) == -1 ? -1 : 0; +} + +int ACE_UPIPE_Stream::recv (ACE_Message_Block *& mb_p, + ACE_Time_Value *timeout) +{ + return this->stream_.get (mb_p, timeout) == -1 ? -1 : 0; +} + +// Send a buffer. + +int +ACE_UPIPE_Stream::send (const char *buffer, + size_t n, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_UPIPE_Stream::send"); + + ACE_Message_Block *mb_p; + + ACE_NEW_RETURN (mb_p, ACE_Message_Block (n), -1); + + mb_p->copy (buffer, n); + return this->stream_.put (mb_p, timeout) == -1 ? -1 : n; +} + +// Receive a buffer. + +int +ACE_UPIPE_Stream::recv (char *buffer, + size_t n, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_UPIPE_Stream::recv"); + // Index in buffer. + size_t bytes_read = 0; + + while (bytes_read < n) + if (this->mb_last_ != 0) + { + // We have remaining data in our last read Message_Buffer. + + if (this->remaining_ < n) + { + // The remaining data is not enough. + + ACE_OS::memcpy ((void *) &buffer[bytes_read], + this->mb_last_->rd_ptr (), + this->remaining_); + bytes_read += this->remaining_; + this->remaining_ = 0; + delete this->mb_last_; + this->mb_last_ = 0; + return bytes_read; + } + else + { + // The remaining data is at least enough. If there's + // more, we'll get it the next time through. + + ACE_OS::memcpy (&buffer[bytes_read], + this->mb_last_->rd_ptr (), + n); + bytes_read += n; + + // Advance rd_ptr. + this->mb_last_->rd_ptr (n); + this->remaining_ -= n; + + if (this->remaining_ == 0) + { + // Now the Message_Buffer is empty. + + delete this->mb_last_; + this->mb_last_ = 0; + } + } + } + else + { + // We have to get a new Message_Buffer from our stream. + + int result = this->stream_.get (this->mb_last_, timeout); + + if (result == -1) + { + if (errno == EWOULDBLOCK && bytes_read > 0) + // Return the number of bytes read before we timed out. + return bytes_read; + else + return -1; + } + this->remaining_ = this->mb_last_->size (); + } + + return bytes_read; +} + +int +ACE_UPIPE_Stream::send_n (const char *buf, + size_t n, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_UPIPE_Stream::send_n"); + + size_t bytes_written; + ssize_t len; + + for (bytes_written = 0; + bytes_written < n; + bytes_written += len) + { + len = this->send (buf + bytes_written, + n - bytes_written, + timeout); + + if (len == -1) + return -1; + } + + return bytes_written; +} + +int +ACE_UPIPE_Stream::recv_n (char *buf, + size_t n, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_UPIPE_Stream::recv_n"); + size_t bytes_read; + ssize_t len; + + for (bytes_read = 0; + bytes_read < n; + bytes_read += len) + { + len = this->recv (buf + bytes_read, + n - bytes_read, + timeout); + if (len == -1) + return -1; + else if (len == 0) + break; + } + + return bytes_read; +} + +#endif /* ACE_HAS_THREADS */ + |