diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-04 17:59:43 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-04 17:59:43 +0000 |
commit | 41b3ffd124988015d833ef559802ffd7545dd774 (patch) | |
tree | 7c2edaa6a8cd48d388b4f5b00b5522199e648e20 | |
parent | 8888c1ea1d6fcb1cc272c741b321d23f6f4438dc (diff) | |
download | ATCD-41b3ffd124988015d833ef559802ffd7545dd774.tar.gz |
ChangeLogTag:Wed Apr 4 10:53:27 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r-- | TAO/ChangeLogs/ChangeLog-02a | 24 | ||||
-rw-r--r-- | TAO/tao/Invocation.cpp | 6 | ||||
-rw-r--r-- | TAO/tao/Messaging_Policy_i.cpp | 6 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.cpp | 25 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.h | 12 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.inl | 10 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 363 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 36 |
8 files changed, 177 insertions, 305 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index a7fc781fd85..7dffcaeeff3 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,27 @@ +Wed Apr 4 10:53:27 2001 Carlos O'Ryan <coryan@uci.edu> + + * tao/Transport.h: + * tao/Transport.cpp: + Remove dead code. + Rename some methods to more clearly reflect their intent. + Simplify the management for the outgoing data queue. The + cleanup_queue() method removes any element that is completely + sent, while the drain_queue() method simply tries to send as + much data as possible. + + * tao/Queued_Message.h: + * tao/Queued_Message.inl: + * tao/Queued_Message.cpp: + Each derived class can decided if the message has been + completely sent very efficiently, no need to keep a local + variable for that. + We do need variables to keep track of closed connections, failed + sends and timeouts. + + * tao/Messaging_Policy_i.cpp: + * tao/Invocation.cpp: + Improved debugging messages for timeouts + Sun Apr 01 15:34:32 2001 Carlos O'Ryan <coryan@uci.edu> * tao/Makefile: diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index 707cd3378b0..9e6058c32aa 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -657,7 +657,8 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request, CORBA::ULong msecs = this->max_wait_time_->msec (); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Timeout on recv is <%u>\n"), + "TAO (%P|%t) - Synch_Invocation::invoke_i, " + "timeout on recv is <%u>\n", msecs)); } @@ -671,7 +672,8 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request, CORBA::ULong msecs = this->max_wait_time_->msec (); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Timeout after recv is <%u> status <%d>\n"), + "TAO (%P|%t) Synch_Invocation::invoke_i, " + "timeout after recv is <%u> status <%d>\n", msecs, reply_error)); } diff --git a/TAO/tao/Messaging_Policy_i.cpp b/TAO/tao/Messaging_Policy_i.cpp index a7540f9c1db..0aeea2ea279 100644 --- a/TAO/tao/Messaging_Policy_i.cpp +++ b/TAO/tao/Messaging_Policy_i.cpp @@ -156,10 +156,9 @@ TAO_RelativeRoundtripTimeoutPolicy::set_time_value (ACE_Time_Value &time_value) if (TAO_debug_level > 0) { - CORBA::ULong msecs = - ACE_static_cast(CORBA::ULong, microseconds / 1000); + CORBA::ULong msecs = time_value.msec (); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Timeout is <%u>\n"), + ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"), msecs)); } } @@ -290,4 +289,3 @@ TAO_Sync_Scope_Policy::destroy (CORBA_Environment &) } #endif /* TAO_HAS_SYNC_SCOPE_POLICY == 1 */ - diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp index 2d0e91b0388..7d3b19c45c5 100644 --- a/TAO/tao/Queued_Message.cpp +++ b/TAO/tao/Queued_Message.cpp @@ -11,8 +11,9 @@ ACE_RCSID(tao, Queued_Message, "$Id$") TAO_Queued_Message::TAO_Queued_Message (TAO_Message_Sent_Callback *callback) - : data_sent_successfully_ (0) - , connection_closed_ (0) + : connection_closed_ (0) + , send_failure_ (0) + , timeout_ (0) , callback_ (callback) , next_ (0) , prev_ (0) @@ -30,14 +31,18 @@ TAO_Queued_Message::connection_closed (void) if (this->callback_ != 0) { - if (this->done ()) - { - this->callback_->connection_closed (); - } - else - { - this->callback_->send_failed (); - } + this->callback_->connection_closed (); + } +} + +void +TAO_Queued_Message::send_failure (void) +{ + this->send_failure_ = 1; + + if (this->callback_ != 0) + { + this->callback_->send_failed (); } } diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h index c9e6296c7d5..4763a1efa83 100644 --- a/TAO/tao/Queued_Message.h +++ b/TAO/tao/Queued_Message.h @@ -82,6 +82,9 @@ public: /// signal waiting threads. void connection_closed (void); + /// There was an error while sending the data. + void send_failure (void); + /** @name Intrusive list manipulation * * The messages are put in a doubled linked list (for easy insertion @@ -183,12 +186,15 @@ public: //@} protected: - /// Record if the send was completely successful - int data_sent_successfully_; - /// Set to 1 if the connection was closed int connection_closed_; + /// Set to 1 if there was a failure while sending the data + int send_failure_; + + /// Set to 1 if there was a timeout while sending the data + int timeout_; + private: /// If not null, this is the object that we signal to indicate that /// the message was sent. diff --git a/TAO/tao/Queued_Message.inl b/TAO/tao/Queued_Message.inl index 501ae013eac..e9cd0a9ff4b 100644 --- a/TAO/tao/Queued_Message.inl +++ b/TAO/tao/Queued_Message.inl @@ -1,15 +1,5 @@ // $Id$ -ACE_INLINE int -TAO_Queued_Message::done (void) const -{ - // @@ Actually we should have a status() method that returns not - // only if there is more data, but also indicates if there was a - // failure. - return (this->data_sent_successfully_ == 1 - || this->connection_closed_ == 1); -} - ACE_INLINE TAO_Queued_Message * TAO_Queued_Message::next (void) const { diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index a0df9546e96..7305809713c 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -118,13 +118,13 @@ TAO_Transport::handle_output () { // The reactor is asking us to send more data, first check if // there is a current message that needs more sending: - retval = this->send_current_message (); + retval = this->drain_queue (); if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::handle_output, " - "send_current_message returns %d/%d\n", + "drain_queue returns %d/%d\n", this->id (), retval, errno)); } @@ -160,86 +160,6 @@ TAO_Transport::provide_handle (ACE_Handle_Set &handle_set) handle_set.set_bit (eh->get_handle ()); } -#if 0 -ssize_t -TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time) -{ - // Make sure we have a buffering queue and there are messages in it. - if (this->buffering_queue_ == 0 || - this->buffering_queue_->is_empty ()) - return 1; - - // Now, we can take the lock and try to do something. - // - // @@CJC We might be able to reduce the length of time we hold - // the lock depending on whether or not we need to hold the - // hold the lock while we're doing queueing activities. - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->handler_lock_, - -1)); - - // Get the first message from the queue. - ACE_Message_Block *queued_message = 0; - ssize_t result = this->buffering_queue_->peek_dequeue_head (queued_message); - - // @@ What to do here on failures? - ACE_ASSERT (result != -1); - - // @@CJC take lock?? - // Actual network send. - size_t bytes_transferred = 0; - result = this->send_i (queued_message, - max_wait_time, - &bytes_transferred); - // @@CJC release lock?? - - // Cannot send completely: timed out. - if (result == -1 && - errno == ETIME) - { - if (bytes_transferred > 0) - { - // If successful in sending some of the data, reset the - // queue appropriately. - this->reset_queued_message (queued_message, - bytes_transferred); - - // Indicate some success. - return bytes_transferred; - } - - // Since we queue up the message, this is not an error. We can - // try next time around. - return 1; - } - - // EOF or other errors. - if (result == -1 || - result == 0) - { - this->dequeue_all (); - return -1; - } - - // If successful in sending data, reset the queue appropriately. - this->reset_queued_message (queued_message, - bytes_transferred); - - // Everything was successfully delivered. - return result; -} - -void -TAO_Transport::reset_sent_message (ACE_Message_Block *message_block, - size_t bytes_delivered) -{ - this->reset_message (message_block, - bytes_delivered, - 0); -} -#endif /* 0 */ - static void dump_iov (iovec *iov, int iovcnt, int id, size_t current_transfer) { @@ -364,98 +284,6 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block, } int -TAO_Transport::send_current_message (void) -{ - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); - - if (this->head_ == 0) - return 1; - - // This is the vector used to send data, it must be declared outside - // the loop because after the loop there may still be data to be - // sent - int iovcnt = 0; - iovec iov[IOV_MAX]; - - // We loop over all the elements in the queue ... - TAO_Queued_Message *i = this->head_; - while (i != 0) - { - // ... each element fills the iovector ... - i->fill_iov (IOV_MAX, iovcnt, iov); - - // ... if the vector is not full we tack another message into - // the vector ... - if (iovcnt != IOV_MAX) - { - // Go for the next element in the list - i = i->next (); - continue; - } - - // ... time to send data because the vector is full. We need to - // loop because a single message can span multiple IOV_MAX - // elements ... - while (iovcnt == IOV_MAX) - { - size_t byte_count; - - // ... send the message ... - ssize_t retval = - this->send (iov, iovcnt, byte_count); - - // ... now we need to update the queue, removing elements - // that have been sent, and updating the last element if it - // was only partially sent ... - this->bytes_transferred_i (byte_count, i); - iovcnt = 0; - - if (retval == 0) - { - return -1; - } - else if (retval == -1) - { - if (errno == EWOULDBLOCK || errno == ETIME) - return 0; - return -1; - } - - if (i == 0) - break; - - /// Message <i> may have been only partially sent... - i->fill_iov (IOV_MAX, iovcnt, iov); - } - - if (i != 0) - i = i->next (); - } - - size_t byte_count; - ssize_t retval = - this->send (iov, iovcnt, byte_count); - - this->bytes_transferred_i (byte_count, i); - iovcnt = 0; - - if (retval == 0) - { - return -1; - } - else if (retval == -1) - { - if (errno == EWOULDBLOCK || errno == ETIME) - return 0; - return -1; - } - - if (this->head_ == 0) - return 1; - return 0; -} - -int TAO_Transport::send_message_i (TAO_Stub *stub, int twoway_flag, const ACE_Message_Block *message_block, @@ -575,7 +403,6 @@ TAO_Transport::send_message_i (TAO_Stub *stub, ace_mon.acquire (); } - synch_message.remove_from_list (this->head_, this->tail_); synch_message.destroy (); return result; } @@ -592,27 +419,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub, " queued anyway, %d bytes sent\n", this->id (), byte_count)); - -#if 0 - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - " queued message contains %d bytes, %d transferred\n", - this->id (), - queued_message->mb ()->total_length (), - byte_count)); -#endif /* 0 */ - } - -#if 0 - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - " queued message still has %d bytes to go\n", - this->id (), - queued_message->mb ()->total_length ())); } -#endif /* 0 */ // ... insert at the head of the queue, we can use push_back() // because the queue is empty ... @@ -763,28 +570,6 @@ TAO_Transport::close_connection (void) } } -#if 0 -TAO_Queued_Message * -TAO_Transport::copy_message_block (const ACE_Message_Block *message_block) -{ - size_t length = message_block->total_length (); - - // @@ Use Auto_Ptr<> to cleanup the message block, should the second - // allocation fail - ACE_Message_Block *copy; - ACE_NEW_RETURN (copy, ACE_Message_Block (length), 0); - for (const ACE_Message_Block *i = message_block; - i != 0; - i = i->cont ()) - copy->copy (i->rd_ptr (), i->length ()); - - TAO_Queued_Message *msg; - ACE_NEW_RETURN (msg, TAO_Queued_Message (copy, 1), 0); - - return msg; -} -#endif /* 0 */ - ssize_t TAO_Transport::send (iovec *iov, int iovcnt, size_t &bytes_transferred, @@ -930,6 +715,117 @@ TAO_Transport::cancel_output (void) } int +TAO_Transport::drain_queue (void) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); + + if (this->head_ == 0) + return 1; + + // This is the vector used to send data, it must be declared outside + // the loop because after the loop there may still be data to be + // sent + int iovcnt = 0; + iovec iov[IOV_MAX]; + + // We loop over all the elements in the queue ... + TAO_Queued_Message *i = this->head_; + do + { + // ... each element fills the iovector ... + i->fill_iov (IOV_MAX, iovcnt, iov); + + // ... if the vector is not full we tack another message into + // the vector ... + if (iovcnt != IOV_MAX) + { + // Go for the next element in the list + i = i->next (); + continue; + } + + // ... time to send data because the vector is full. We need to + // loop because a single message can span multiple IOV_MAX + // elements ... + while (iovcnt == IOV_MAX) + { + size_t byte_count; + + // ... send the message ... + ssize_t retval = + this->send (iov, iovcnt, byte_count); + + // ... now we need to update the queue, removing elements + // that have been sent, and updating the last element if it + // was only partially sent ... + this->cleanup_queue (byte_count); + iovcnt = 0; + + if (retval == 0) + { + return -1; + } + else if (retval == -1) + { + if (errno == EWOULDBLOCK || errno == ETIME) + return 0; + return -1; + } + + if (this->head_ == 0) + break; + + /// Message <i> may have been only partially sent... + i->fill_iov (IOV_MAX, iovcnt, iov); + } + } + while (this->head_ != 0); + + size_t byte_count; + ssize_t retval = + this->send (iov, iovcnt, byte_count); + + this->cleanup_queue (byte_count); + iovcnt = 0; + + if (retval == 0) + { + return -1; + } + else if (retval == -1) + { + if (errno == EWOULDBLOCK || errno == ETIME) + return 0; + return -1; + } + + if (this->head_ == 0) + return 1; + + return 0; +} + +void +TAO_Transport::cleanup_queue (size_t byte_count) +{ + while (this->head_ != 0 && byte_count > 0) + { + TAO_Queued_Message *i = this->head_; + + // Update the state of the first message + i->bytes_transferred (byte_count); + + // ... if all the data was sent the message must be removed from + // the queue... + if (i->all_data_sent ()) + { + i->remove_from_list (this->head_, this->tail_); + i->destroy (); + } + } +} + +int TAO_Transport::must_flush_queue_i (TAO_Stub *stub) { // First let's compute the size of the queue: @@ -964,36 +860,3 @@ TAO_Transport::must_flush_queue_i (TAO_Stub *stub) return 0; } - -void -TAO_Transport::bytes_transferred_i (size_t byte_count, - TAO_Queued_Message *&iterator) -{ - TAO_Queued_Message *i = this->head_; - while (i != iterator && byte_count > 0) - { - // Update the state of each queued message - i->bytes_transferred (byte_count); - // ... if all the data was sent the message must be removed from - // the queue... - TAO_Queued_Message *tmp = i->next (); - if (i->all_data_sent ()) - { - i->remove_from_list (this->head_, this->tail_); - i->destroy (); - } - i = tmp; - } - // ... all the data has been taken care of ... - if (byte_count == 0 || iterator == 0) - return; - - iterator->bytes_transferred (byte_count); - TAO_Queued_Message *tmp = iterator->next (); - if (iterator->all_data_sent ()) - { - iterator->remove_from_list (this->head_, this->tail_); - iterator->destroy (); - iterator = tmp; - } -} diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 5fee5d7cf6d..6e296a72fe5 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -660,28 +660,7 @@ public: /// Cancel handle_output() callbacks int cancel_output (void); -protected: - // @@ see if one of these calls send_message() - /// Remove the first message from the outgoing queue. - void dequeue_head (void); - - /// Update the state of the outgoing queue, assuming that - /// bytes_delivered bytes have been sent already. - void reset_queued_message (ACE_Message_Block *message_block, - size_t bytes_delivered); - - /// Update the state of the outgoing queue, this time a complete - /// message was sent. - void reset_sent_message (ACE_Message_Block *message_block, - size_t bytes_delivered); - - /// Helper function used to implement the two methods above. - void reset_message (ACE_Message_Block *message_block, - size_t bytes_delivered, - int queued_message); - private: - /// Try to send the current message. /** * As the outgoing data is drained this method is invoked to send as @@ -690,7 +669,16 @@ private: * Returns 0 if there is more data to send, -1 if there was an error * and 1 if the message was completely sent. */ - int send_current_message (void); + int drain_queue (void); + + /// Cleanup the queue. + /** + * Exactly <byte_count> bytes have been sent, the queue must be + * cleaned up as potentially several messages have been completely + * sent out. + * It leaves on head_ the next message to send out. + */ + void cleanup_queue (size_t byte_count); /// Copy the contents of a message block into a Queued_Message /// TAO_Queued_Message *copy_message_block (const ACE_Message_Block *mb); @@ -698,10 +686,6 @@ private: /// Check if the buffering constraints have been reached int must_flush_queue_i (TAO_Stub *stub); - /// Update the queue, exactly <byte_count> bytes have been sent. - void bytes_transferred_i (size_t byte_count, - TAO_Queued_Message *&iterator); - /// Prohibited ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&)) ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&)) |