diff options
Diffstat (limited to 'TAO/tao/Pluggable.cpp')
-rw-r--r-- | TAO/tao/Pluggable.cpp | 74 |
1 files changed, 53 insertions, 21 deletions
diff --git a/TAO/tao/Pluggable.cpp b/TAO/tao/Pluggable.cpp index 703cab202f2..949db6aaa1e 100644 --- a/TAO/tao/Pluggable.cpp +++ b/TAO/tao/Pluggable.cpp @@ -57,7 +57,7 @@ TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time) // Make sure we have a buffering queue and there are messages in it. if (this->buffering_queue_ == 0 || this->buffering_queue_->is_empty ()) - return 0; + return 1; // Get the first message from the queue. ACE_Message_Block *queued_message = 0; @@ -70,22 +70,16 @@ TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time) result = this->send (queued_message, max_wait_time); - // Socket closed. - if (result == 0) - { - this->dequeue_all (); - return -1; - } - // Cannot send. - if (result == -1) + if (result == -1 || + result == 0) { // Timeout. if (errno == ETIME) { // Since we queue up the message, this is not an error. We // can try next time around. - return 0; + return 1; } // Non-timeout error. else @@ -105,38 +99,76 @@ TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time) } void +TAO_Transport::reset_sent_message (ACE_Message_Block *message_block, + size_t bytes_delivered) +{ + this->reset_message (message_block, + bytes_delivered, + 0); +} + +void TAO_Transport::reset_queued_message (ACE_Message_Block *message_block, size_t bytes_delivered) { - while (message_block != 0 && - bytes_delivered != 0) + this->reset_message (message_block, + bytes_delivered, + 1); +} + +void +TAO_Transport::reset_message (ACE_Message_Block *message_block, + size_t bytes_delivered, + int queued_message) +{ + while (bytes_delivered != 0) { // Our current message block chain. ACE_Message_Block *current_message_block = message_block; + int completely_delivered_current_message_block_chain = 0; + while (current_message_block != 0 && bytes_delivered != 0) { - size_t adjustment_size = ACE_MIN (current_message_block->length (), bytes_delivered); + size_t current_message_block_length = current_message_block->length (); + + int completely_delivered_current_message_block = + bytes_delivered >= current_message_block_length; + + size_t adjustment_size = ACE_MIN (current_message_block_length, bytes_delivered); // Reset according to send size. current_message_block->rd_ptr (adjustment_size); - // Hand adjust <message_length>. - this->buffering_queue_->message_length (this->buffering_queue_->message_length () - adjustment_size); + // If queued message, adjust the queue. + if (queued_message) + // Hand adjust <message_length>. + this->buffering_queue_->message_length (this->buffering_queue_->message_length () - adjustment_size); // Adjust <bytes_delivered>. bytes_delivered -= adjustment_size; - // Next message block in the continuation chain. - current_message_block = current_message_block->cont (); + if (completely_delivered_current_message_block) + { + // Next message block in the continuation chain. + current_message_block = current_message_block->cont (); + + if (current_message_block == 0) + completely_delivered_current_message_block_chain = 1; + } } - // Go to the next message block chain. - message_block = message_block->next (); + if (completely_delivered_current_message_block_chain) + { + // Go to the next message block chain. + message_block = message_block->next (); - // Release this <current_message_block>. - this->dequeue_head (); + // If queued message, adjust the queue. + if (queued_message) + // Release this <current_message_block>. + this->dequeue_head (); + } } } |