diff options
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 257 |
1 files changed, 91 insertions, 166 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 94beb0d4f9b..d3ca90cee6f 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -51,169 +51,20 @@ TAO_Transport::~TAO_Transport (void) // delete this->buffering_queue_; - for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) + TAO_Queued_Message *i = this->head_; + while (i != 0) { // @@ This is a good point to insert a flag to indicate that a // CloseConnection message was successfully received. i->connection_closed (); - i->destroy (); - } -} - -#if 0 -ssize_t -TAO_Transport::send_or_buffer (TAO_Stub *stub, - int two_way, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) -{ - if (stub == 0 || two_way) - return this->send (message_block, max_wait_time); + TAO_Queued_Message *tmp = i; + i = i->next (); - TAO_Sync_Strategy &sync_strategy = stub->sync_strategy (); - - return sync_strategy.send (*this, - *stub, - message_block, - max_wait_time); -} - -ssize_t -TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time) -{ - // Make sure we have a buffering queue and there are messages in it. - if (this->buffering_queue_ == 0 || - this->buffering_queue_->is_empty ()) - return 1; - - // Get the first message from the queue. - ACE_Message_Block *queued_message = 0; - ssize_t result = this->buffering_queue_->peek_dequeue_head (queued_message); - - // @@ What to do here on failures? - ACE_ASSERT (result != -1); - - // Actual network send. - size_t bytes_transferred = 0; - result = this->send (queued_message, - max_wait_time, - &bytes_transferred); - - // Cannot send completely: timed out. - if (result == -1 && - errno == ETIME) - { - if (bytes_transferred > 0) - { - // If successful in sending some of the data, reset the - // queue appropriately. - this->reset_queued_message (queued_message, - bytes_transferred); - - // Indicate some success. - return bytes_transferred; - } - - // Since we queue up the message, this is not an error. We can - // try next time around. - return 1; + tmp->destroy (); } - - // EOF or other errors. - if (result == -1 || - result == 0) - { - this->dequeue_all (); - return -1; - } - - // If successful in sending data, reset the queue appropriately. - this->reset_queued_message (queued_message, - bytes_transferred); - - // Everything was successfully delivered. - return result; -} - -void -TAO_Transport::reset_sent_message (ACE_Message_Block *message_block, - size_t bytes_delivered) -{ - this->reset_message (message_block, - bytes_delivered, - 0); -} - -void -TAO_Transport::reset_queued_message (ACE_Message_Block *message_block, - size_t bytes_delivered) -{ - this->reset_message (message_block, - bytes_delivered, - 1); } -void -TAO_Transport::reset_message (ACE_Message_Block *message_block, - size_t bytes_delivered, - int queued_message) -{ - while (bytes_delivered != 0) - { - // Our current message block chain. - ACE_Message_Block *current_message_block = message_block; - - int completely_delivered_current_message_block_chain = 0; - - while (current_message_block != 0 && - bytes_delivered != 0) - { - size_t current_message_block_length = - current_message_block->length (); - - int completely_delivered_current_message_block = - bytes_delivered >= current_message_block_length; - - size_t adjustment_size = - ACE_MIN (current_message_block_length, bytes_delivered); - - // Reset according to send size. - current_message_block->rd_ptr (adjustment_size); - - // If queued message, adjust the queue. - if (queued_message) - // Hand adjust <message_length>. - this->buffering_queue_->message_length ( - this->buffering_queue_->message_length () - adjustment_size); - - // Adjust <bytes_delivered>. - bytes_delivered -= adjustment_size; - - if (completely_delivered_current_message_block) - { - // Next message block in the continuation chain. - current_message_block = current_message_block->cont (); - - if (current_message_block == 0) - completely_delivered_current_message_block_chain = 1; - } - } - - if (completely_delivered_current_message_block_chain) - { - // Go to the next message block chain. - message_block = message_block->next (); - - // If queued message, adjust the queue. - if (queued_message) - // Release this <current_message_block>. - this->dequeue_head (); - } - } -} -#endif /* 0 */ - int TAO_Transport::handle_output () { @@ -252,16 +103,90 @@ TAO_Transport::handle_output () } int +TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block, + size_t &bytes_transferred, + ACE_Time_Value *timeout) +{ + bytes_transferred = 0; + + iovec iov[IOV_MAX]; + int iovcnt = 0; + + while (message_block != 0) + { + size_t message_block_length = + message_block->length (); + + // Check if this block has any data to be sent. + if (message_block_length > 0) + { + // Collect the data in the iovec. + iov[iovcnt].iov_base = message_block->rd_ptr (); + iov[iovcnt].iov_len = message_block_length; + + // Increment iovec counter. + iovcnt++; + + // The buffer is full make a OS call. @@ TODO find a way to + // find IOV_MAX for platforms that do not define it rather + // than simply setting IOV_MAX to some arbitrary value such + // as 16. + if (iovcnt == IOV_MAX) + { + size_t current_transfer = 0; + + ssize_t result = + this->send (iov, iovcnt, current_transfer, timeout); + + + // Errors. + if (result == -1 || result == 0) + return result; + + // Add to total bytes transferred. + bytes_transferred += current_transfer; + + // Reset iovec counter. + iovcnt = 0; + } + } + + // Select the next message block in the chain. + message_block = message_block->cont (); + } + + // Check for remaining buffers to be sent. This will happen when + // IOV_MAX is not a multiple of the number of message blocks. + if (iovcnt != 0) + { + size_t current_transfer = 0; + + ssize_t result = + this->send (iov, iovcnt, current_transfer, timeout); + // Errors. + if (result == -1 || result == 0) + return result; + + // Add to total bytes transferred. + bytes_transferred += current_transfer; + } + + // Return total bytes transferred. + return bytes_transferred; +} + +int TAO_Transport::send_current_message (void) { if (this->current_message_ == 0) return 1; - size_t byte_count; - ssize_t n = this->send (this->current_message_->mb (), - 0, /* it is non-blocking */ - &byte_count); - if (n == 0) + size_t bytes_transferred; + + ssize_t retval = + this->send_message_block_chain (this->current_message_->mb (), + bytes_transferred); + if (retval == 0) { // The connection was closed, return -1 to have the Reactor // close this transport and event handler @@ -270,7 +195,7 @@ TAO_Transport::send_current_message (void) // Because there can be a partial transfer we need to adjust the // number of bytes sent. - this->current_message_->bytes_transferred (byte_count); + this->current_message_->bytes_transferred (bytes_transferred); if (this->current_message_->done ()) { // Remove the current message.... @@ -279,13 +204,13 @@ TAO_Transport::send_current_message (void) this->current_message_ = 0; - if (n == -1) + if (retval == -1) return -1; return 1; } - if (n == -1) + if (retval == -1) { // ... timeouts and flow control are not real errors, the // connection is still valid and we must continue sending the @@ -347,16 +272,16 @@ TAO_Transport::send_message_i (TAO_Stub *stub, if (non_queued_message) { // ... in this case we must try to send the message first ... - + size_t byte_count; // @@ I don't think we want to hold the mutex here, however if // we release it we need to recheck the status of the transport // after we return... once I understand the final form for this // code I will re-visit this stuff. - ssize_t n = this->send (message_block, - 0, // non-blocking - &byte_count); + ssize_t n = this->send_message_block_chain (message_block, + byte_count, + 0 /* non-blocking */); if (n == 0) return -1; else if (n == -1 && errno != EWOULDBLOCK) |