diff options
author | Adam Mitz <mitza-oci@users.noreply.github.com> | 2006-08-09 15:39:56 +0000 |
---|---|---|
committer | Adam Mitz <mitza-oci@users.noreply.github.com> | 2006-08-09 15:39:56 +0000 |
commit | 02b6be8e9fc42875d428cda382627512f6c04a53 (patch) | |
tree | dbf7b5750a89613bbe790660b096e99c30b8fed3 /TAO/tao/Transport.cpp | |
parent | 1168389f4bc5ac9ecb4c54042530ffea807e38fb (diff) | |
download | ATCD-02b6be8e9fc42875d428cda382627512f6c04a53.tar.gz |
importing initial work on this ticket into the subversion branch
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 141 |
1 files changed, 106 insertions, 35 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 7f3c7ebf4b2..b132d4ee612 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -27,6 +27,7 @@ #include "ace/OS_NS_stdio.h" #include "ace/Reactor.h" #include "ace/os_include/sys/os_uio.h" +#include "ace/High_Res_Timer.h" /* * Specialization hook to add include files from @@ -478,12 +479,13 @@ TAO_Transport::handle_output (void) } int -TAO_Transport::format_queue_message (TAO_OutputCDR &stream) +TAO_Transport::format_queue_message (TAO_OutputCDR &stream, + ACE_Time_Value *max_wait_time) { if (this->messaging_object ()->format_message (stream) != 0) return -1; - return this->queue_message_i (stream.begin()); + return this->queue_message_i (stream.begin (), max_wait_time); } int @@ -503,7 +505,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, size_t &bytes_transferred, ACE_Time_Value *) { - size_t const total_length = mb->total_length (); + const size_t total_length = mb->total_length (); // We are going to block, so there is no need to clone // the message block. @@ -512,7 +514,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, synch_message.push_back (this->head_, this->tail_); - int const n = this->drain_queue_i (); + const int n = this->drain_queue_i (); if (n == -1) { @@ -541,14 +543,12 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, // We are going to block, so there is no need to clone // the message block. TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); + const size_t message_length = synch_message.message_length (); - // Push synch_message on to the back of the queue. synch_message.push_back (this->head_, this->tail_); - int const n = - this->send_synch_message_helper_i (synch_message, - max_wait_time); - + const int n = this->send_synch_message_helper_i (synch_message, + 0 /*ignored*/); if (n == -1 || n == 1) { return n; @@ -558,18 +558,30 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, // if (max_wait_time != 0 && errno == ETIME) return -1; TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); - (void) flushing_strategy->schedule_output (this); + int result = flushing_strategy->schedule_output (this); + if (result == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") + ACE_TEXT ("send_synchronous_message_i, ") + ACE_TEXT ("error while scheduling flush - %m\n"), + this->id ())); + } + return -1; + } + + // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH, + // because we're always going to flush anyway. // Release the mutex, other threads may modify the queue as we // block for a long time writing out data. - int result; { typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; TAO_REVERSE_LOCK reverse (*this->handler_lock_); - ACE_GUARD_RETURN (TAO_REVERSE_LOCK, - ace_mon, - reverse, - -1); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); result = flushing_strategy->flush_message (this, &synch_message, @@ -582,7 +594,8 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, if (errno == ETIME) { - if (this->head_ == &synch_message) + // If partially sent, then we must queue the remainder. + if (message_length != synch_message.message_length ()) { // This is a timeout, there is only one nasty case: the // message has been partially sent! We simply cannot take @@ -597,13 +610,12 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, // and figuring out the request ID would be a bit of a // nightmare. // - - synch_message.remove_from_list (this->head_, this->tail_); TAO_Queued_Message *queued_message = 0; ACE_NEW_RETURN (queued_message, TAO_Asynch_Queued_Message ( synch_message.current_block (), this->orb_core_, + 0, // no timeout 0, 1), -1); @@ -682,6 +694,13 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, msg->remove_from_list (this->head_, this->tail_); msg->destroy (); } + else if (result == TAO_Flushing_Strategy::MUST_FLUSH) + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + (void) flushing_strategy->flush_message(this, msg, 0); + } return 1; } @@ -797,7 +816,14 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); - (void) flushing_strategy->schedule_output (this); + int result = flushing_strategy->schedule_output (this); + if (result == TAO_Flushing_Strategy::MUST_FLUSH) + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + (void) flushing_strategy->flush_transport (this); + } } return 0; @@ -920,8 +946,29 @@ TAO_Transport::drain_queue_i (void) // call. this->sent_byte_count_ = 0; + // Avoid calling this expensive function each time through the loop. Instead + // we'll assume that the time is unlikely to change much during the loop. + // If we are forced to send in the loop then we'll recompute the time. + ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr (); + while (i != 0) { + if (i->is_expired (now)) + { + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ") + ACE_TEXT ("Discarding expired queued message.\n"), + this->id ())); + } + i->state_changed (TAO_LF_Event::LFS_TIMEOUT, + this->orb_core_->leader_follower ()); + i->remove_from_list (this->head_, this->tail_); + i->destroy (); + i = this->head_; + continue; + } // ... each element fills the iovector ... i->fill_iov (ACE_IOV_MAX, iovcnt, iov); @@ -933,6 +980,8 @@ TAO_Transport::drain_queue_i (void) int const retval = this->drain_queue_helper (iovcnt, iov); + now = ACE_High_Res_Timer::gettimeofday_hr (); + if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, @@ -999,11 +1048,19 @@ TAO_Transport::cleanup_queue_i () this->id ())); } + int byte_count = 0; + int msg_count = 0; + // Cleanup all messages while (this->head_ != 0) { TAO_Queued_Message *i = this->head_; + if (TAO_debug_level > 4) + { + byte_count += i->message_length(); + ++msg_count; + } // @@ This is a good point to insert a flag to indicate that a // CloseConnection message was successfully received. i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED, @@ -1012,7 +1069,15 @@ TAO_Transport::cleanup_queue_i () i->remove_from_list (this->head_, this->tail_); i->destroy (); - } + } + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") + ACE_TEXT ("discarded %d messages, %d bytes.\n"), + this->id (), msg_count, byte_count)); + } } void @@ -1232,6 +1297,10 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, return 0; } + // If it was partially sent, then we can't allow a timeout + if (byte_count > 0) + max_wait_time = 0; + if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, @@ -1262,14 +1331,16 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, this->id ())); } - if (this->queue_message_i(message_block) == -1) + if (this->queue_message_i (message_block, max_wait_time) == -1) { if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") - ACE_TEXT ("cannot queue message for ") - ACE_TEXT (" - %m\n"), - this->id ())); + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") + ACE_TEXT ("send_asynchronous_message_i, ") + ACE_TEXT ("cannot queue message for - %m\n"), + this->id ())); + } return -1; } @@ -1289,7 +1360,11 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, if (constraints_reached || try_sending_first) { - (void) flushing_strategy->schedule_output (this); + int result = flushing_strategy->schedule_output (this); + if (result == TAO_Flushing_Strategy::MUST_FLUSH) + { + must_flush = true; + } } if (must_flush) @@ -1305,12 +1380,14 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, } int -TAO_Transport::queue_message_i(const ACE_Message_Block *message_block) +TAO_Transport::queue_message_i (const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) { TAO_Queued_Message *queued_message = 0; ACE_NEW_RETURN (queued_message, TAO_Asynch_Queued_Message (message_block, this->orb_core_, + max_wait_time, 0, 1), -1); @@ -2387,13 +2464,7 @@ TAO_Transport::post_open (size_t id) // If the wait strategy wants us to be registered with the reactor // then we do so. If registeration is required and it succeeds, // #REFCOUNT# becomes two. - if (this->wait_strategy ()->register_handler () == 0) - { - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); - (void) flushing_strategy->schedule_output (this); - } - else + if (this->wait_strategy ()->register_handler () != 0) { // Registration failures. |