// UPIPE_Stream.cpp // $Id$ #define ACE_BUILD_DLL #include "ace/UPIPE_Stream.h" #if defined (ACE_HAS_THREADS) #if !defined (__ACE_INLINE__) #include "ace/UPIPE_Stream.i" #endif /* __ACE_INLINE__ */ ACE_ALLOC_HOOK_DEFINE(ACE_UPIPE_Stream) ACE_UPIPE_Stream::ACE_UPIPE_Stream (void) : mb_last_ (0), remaining_ (0), reference_count_ (0) { ACE_TRACE ("ACE_UPIPE_Stream::ACE_UPIPE_STREAM"); } int ACE_UPIPE_Stream::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd, void * val) { ACE_TRACE ("ACE_UPIPE_Stream::control"); return this->stream_.control (cmd, val); } void ACE_UPIPE_Stream::dump (void) const { ACE_TRACE ("ACE_UPIPE_Stream::dump"); } int ACE_UPIPE_Stream::close (void) { ACE_TRACE ("ACE_UPIPE_Stream::close"); ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1)); this->reference_count_--; if (this->reference_count_ == 0) { // Since the UPIPE should have been closed earlier we won't bother // checking to see if closing it now fails. if (this->ACE_SPIPE::get_handle () != ACE_INVALID_HANDLE) this->ACE_SPIPE::close (); // Close down the ACE_stream. return this->stream_.close (0); } return 0; } int ACE_UPIPE_Stream::get_remote_addr (ACE_UPIPE_Addr &remote_sap) const { ACE_TRACE ("ACE_UPIPE_Stream::get_remote_addr"); remote_sap = this->remote_addr_; return 0; } int ACE_UPIPE_Stream::send (ACE_Message_Block *mb_p, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_UPIPE_Stream::send_msg"); return this->stream_.put (mb_p, timeout) == -1 ? -1 : 0; } int ACE_UPIPE_Stream::recv (ACE_Message_Block *& mb_p, ACE_Time_Value *timeout) { return this->stream_.get (mb_p, timeout) == -1 ? -1 : 0; } // Send a buffer. int ACE_UPIPE_Stream::send (const char *buffer, size_t n, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_UPIPE_Stream::send"); ACE_Message_Block *mb_p; ACE_NEW_RETURN (mb_p, ACE_Message_Block (n), -1); mb_p->copy (buffer, n); return this->stream_.put (mb_p, timeout) == -1 ? -1 : (int) n; } // Receive a buffer. int ACE_UPIPE_Stream::recv (char *buffer, size_t n, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_UPIPE_Stream::recv"); // Index in buffer. size_t bytes_read = 0; while (bytes_read < n) if (this->mb_last_ != 0) { // We have remaining data in our last read Message_Buffer. if (this->remaining_ < n) { // The remaining data is not enough. ACE_OS::memcpy ((void *) &buffer[bytes_read], this->mb_last_->rd_ptr (), this->remaining_); bytes_read += this->remaining_; this->remaining_ = 0; this->mb_last_ = this->mb_last_->release (); return bytes_read; } else { // The remaining data is at least enough. If there's // more, we'll get it the next time through. ACE_OS::memcpy (&buffer[bytes_read], this->mb_last_->rd_ptr (), n); bytes_read += n; // Advance rd_ptr. this->mb_last_->rd_ptr (n); this->remaining_ -= n; if (this->remaining_ == 0) // Now the Message_Buffer is empty. this->mb_last_ = this->mb_last_->release (); } } else { // We have to get a new Message_Buffer from our stream. int result = this->stream_.get (this->mb_last_, timeout); if (result == -1) { if (errno == EWOULDBLOCK && bytes_read > 0) // Return the number of bytes read before we timed out. return bytes_read; else return -1; } this->remaining_ = this->mb_last_->size (); } return bytes_read; } int ACE_UPIPE_Stream::send_n (const char *buf, size_t n, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_UPIPE_Stream::send_n"); size_t bytes_written; ssize_t len; for (bytes_written = 0; bytes_written < n; bytes_written += len) { len = this->send (buf + bytes_written, n - bytes_written, timeout); if (len == -1) return -1; } return bytes_written; } int ACE_UPIPE_Stream::recv_n (char *buf, size_t n, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_UPIPE_Stream::recv_n"); size_t bytes_read; ssize_t len; for (bytes_read = 0; bytes_read < n; bytes_read += len) { len = this->recv (buf + bytes_read, n - bytes_read, timeout); if (len == -1) return -1; else if (len == 0) break; } return bytes_read; } #endif /* ACE_HAS_THREADS */