diff options
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 114 |
1 files changed, 45 insertions, 69 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index c272e341914..24e2ba58788 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -30,7 +30,6 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , bidirectional_flag_ (-1) , head_ (0) , tail_ (0) - , current_message_ (0) { TAO_Client_Strategy_Factory *cf = this->orb_core_->client_factory (); @@ -96,13 +95,12 @@ TAO_Transport::handle_output () { // ... there is no current message or it was completely // sent, time to check the queue.... - int dequeue = this->dequeue_next_message (); - if (dequeue == -1) - { - // ... no more messages in the queue, cancel output... - (void) this->cancel_output (); - return 0; - } + // ... no more messages in the queue, cancel output... + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + + flushing_strategy->cancel_output (this); + return 0; } // ... on Win32 we must continue until we get EWOULDBLOCK ... } @@ -240,13 +238,13 @@ TAO_Transport::send_current_message (void) { ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); - if (this->current_message_ == 0) + if (this->head_ == 0) return 1; size_t bytes_transferred; ssize_t retval = - this->send_message_block_chain (this->current_message_->mb (), + this->send_message_block_chain (this->head_->mb (), bytes_transferred); if (retval == 0) { @@ -257,14 +255,14 @@ TAO_Transport::send_current_message (void) // Because there can be a partial transfer we need to adjust the // number of bytes sent. - this->current_message_->bytes_transferred (bytes_transferred); - if (this->current_message_->done ()) + this->head_->bytes_transferred (bytes_transferred); + if (this->head_->done ()) { // Remove the current message.... - // @@ We should be using a pool for these guys! - this->current_message_->destroy (); + TAO_Queued_Message *head = this->head_; + head->remove_from_list (this->head_, this->tail_); - this->current_message_ = 0; + head->destroy (); } if (retval == -1) @@ -278,33 +276,8 @@ TAO_Transport::send_current_message (void) return -1; } - if (this->current_message_ == 0) - return 1; - return 0; -} - -int -TAO_Transport::dequeue_next_message (void) -{ - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); if (this->head_ == 0) - return -1; - - this->current_message_ = this->head_; - this->current_message_->remove_from_list (this->head_, this->tail_); - - return 0; -} - -int -TAO_Transport::cancel_output (void) -{ - return 0; -} - -int -TAO_Transport::schedule_output (void) -{ + return 1; return 0; } @@ -314,6 +287,9 @@ TAO_Transport::send_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); int queue_empty = (this->head_ == 0); @@ -321,7 +297,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub, // Let's figure out if the message should be queued without trying // to send first: int must_queue = 0; - if (this->current_message_ != 0) + if (this->head_ != 0) must_queue = 1; else if (!twoway_flag && stub->sync_strategy ().must_queue (queue_empty)) @@ -342,16 +318,9 @@ TAO_Transport::send_message_i (TAO_Stub *stub, this->handle ())); } - size_t length = message_block->total_length (); - ACE_Message_Block *copy = - new ACE_Message_Block (length); - for (const ACE_Message_Block *i = message_block; - i != 0; - i = i->cont ()) - copy->copy (i->rd_ptr (), i->length ()); - queued_message = - new TAO_Queued_Message (copy, 1); + this->copy_message_block (message_block); + queued_message->push_back (this->head_, this->tail_); } else @@ -400,7 +369,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub, // ... the message was only partially sent, schedule reactive // output... - this->schedule_output (); + flushing_strategy->schedule_output (this); // ... and set it as the current message ... if (twoway_flag) @@ -414,16 +383,8 @@ TAO_Transport::send_message_i (TAO_Stub *stub, } else { - size_t length = message_block->total_length (); - ACE_Message_Block *copy = - new ACE_Message_Block (length); - for (const ACE_Message_Block *i = message_block; - i != 0; - i = i->cont ()) - copy->copy (i->rd_ptr (), i->length ()); - queued_message = - new TAO_Queued_Message (copy, 1); + this->copy_message_block (message_block); } // @@ Revisit message queue allocations @@ -454,15 +415,15 @@ TAO_Transport::send_message_i (TAO_Stub *stub, queued_message->mb ()->total_length ())); } - this->current_message_ = queued_message; + // ... insert at the head of the queue, we can use push_back() + // because the queue is empty ... + queued_message->push_back (this->head_, this->tail_); } // ... two choices, this is a twoway request or not, if it is // then we must only return once the complete message has been // sent: - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); if (twoway_flag) { // Release the mutex, other threads may modify the queue as we @@ -485,11 +446,6 @@ TAO_Transport::send_message_i (TAO_Stub *stub, msg_count++; total_bytes += i->mb ()->total_length (); } - if (this->current_message_ != 0) - { - msg_count++; - total_bytes += this->current_message_->mb ()->total_length (); - } int set_timer; ACE_Time_Value interval; @@ -549,3 +505,23 @@ TAO_Transport::reactor_signalling (void) { return 0; } + +TAO_Queued_Message * +TAO_Transport::copy_message_block (const ACE_Message_Block *message_block) +{ + size_t length = message_block->total_length (); + + // @@ Use Auto_Ptr<> to cleanup the message block, should the second + // allocation fail + ACE_Message_Block *copy; + ACE_NEW_RETURN (copy, ACE_Message_Block (length), 0); + for (const ACE_Message_Block *i = message_block; + i != 0; + i = i->cont ()) + copy->copy (i->rd_ptr (), i->length ()); + + TAO_Queued_Message *msg; + ACE_NEW_RETURN (msg, TAO_Queued_Message (copy, 1), 0); + + return msg; +} |