summaryrefslogtreecommitdiff
path: root/TAO/tao/Pluggable.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Pluggable.cpp')
-rw-r--r--TAO/tao/Pluggable.cpp74
1 files changed, 53 insertions, 21 deletions
diff --git a/TAO/tao/Pluggable.cpp b/TAO/tao/Pluggable.cpp
index 703cab202f2..949db6aaa1e 100644
--- a/TAO/tao/Pluggable.cpp
+++ b/TAO/tao/Pluggable.cpp
@@ -57,7 +57,7 @@ TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time)
// Make sure we have a buffering queue and there are messages in it.
if (this->buffering_queue_ == 0 ||
this->buffering_queue_->is_empty ())
- return 0;
+ return 1;
// Get the first message from the queue.
ACE_Message_Block *queued_message = 0;
@@ -70,22 +70,16 @@ TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time)
result = this->send (queued_message,
max_wait_time);
- // Socket closed.
- if (result == 0)
- {
- this->dequeue_all ();
- return -1;
- }
-
// Cannot send.
- if (result == -1)
+ if (result == -1 ||
+ result == 0)
{
// Timeout.
if (errno == ETIME)
{
// Since we queue up the message, this is not an error. We
// can try next time around.
- return 0;
+ return 1;
}
// Non-timeout error.
else
@@ -105,38 +99,76 @@ TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time)
}
void
+TAO_Transport::reset_sent_message (ACE_Message_Block *message_block,
+ size_t bytes_delivered)
+{
+ this->reset_message (message_block,
+ bytes_delivered,
+ 0);
+}
+
+void
TAO_Transport::reset_queued_message (ACE_Message_Block *message_block,
size_t bytes_delivered)
{
- while (message_block != 0 &&
- bytes_delivered != 0)
+ this->reset_message (message_block,
+ bytes_delivered,
+ 1);
+}
+
+void
+TAO_Transport::reset_message (ACE_Message_Block *message_block,
+ size_t bytes_delivered,
+ int queued_message)
+{
+ while (bytes_delivered != 0)
{
// Our current message block chain.
ACE_Message_Block *current_message_block = message_block;
+ int completely_delivered_current_message_block_chain = 0;
+
while (current_message_block != 0 &&
bytes_delivered != 0)
{
- size_t adjustment_size = ACE_MIN (current_message_block->length (), bytes_delivered);
+ size_t current_message_block_length = current_message_block->length ();
+
+ int completely_delivered_current_message_block =
+ bytes_delivered >= current_message_block_length;
+
+ size_t adjustment_size = ACE_MIN (current_message_block_length, bytes_delivered);
// Reset according to send size.
current_message_block->rd_ptr (adjustment_size);
- // Hand adjust <message_length>.
- this->buffering_queue_->message_length (this->buffering_queue_->message_length () - adjustment_size);
+ // If queued message, adjust the queue.
+ if (queued_message)
+ // Hand adjust <message_length>.
+ this->buffering_queue_->message_length (this->buffering_queue_->message_length () - adjustment_size);
// Adjust <bytes_delivered>.
bytes_delivered -= adjustment_size;
- // Next message block in the continuation chain.
- current_message_block = current_message_block->cont ();
+ if (completely_delivered_current_message_block)
+ {
+ // Next message block in the continuation chain.
+ current_message_block = current_message_block->cont ();
+
+ if (current_message_block == 0)
+ completely_delivered_current_message_block_chain = 1;
+ }
}
- // Go to the next message block chain.
- message_block = message_block->next ();
+ if (completely_delivered_current_message_block_chain)
+ {
+ // Go to the next message block chain.
+ message_block = message_block->next ();
- // Release this <current_message_block>.
- this->dequeue_head ();
+ // If queued message, adjust the queue.
+ if (queued_message)
+ // Release this <current_message_block>.
+ this->dequeue_head ();
+ }
}
}