summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-04 17:59:43 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-04 17:59:43 +0000
commit41b3ffd124988015d833ef559802ffd7545dd774 (patch)
tree7c2edaa6a8cd48d388b4f5b00b5522199e648e20
parent8888c1ea1d6fcb1cc272c741b321d23f6f4438dc (diff)
downloadATCD-41b3ffd124988015d833ef559802ffd7545dd774.tar.gz
ChangeLogTag:Wed Apr 4 10:53:27 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a24
-rw-r--r--TAO/tao/Invocation.cpp6
-rw-r--r--TAO/tao/Messaging_Policy_i.cpp6
-rw-r--r--TAO/tao/Queued_Message.cpp25
-rw-r--r--TAO/tao/Queued_Message.h12
-rw-r--r--TAO/tao/Queued_Message.inl10
-rw-r--r--TAO/tao/Transport.cpp363
-rw-r--r--TAO/tao/Transport.h36
8 files changed, 177 insertions, 305 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index a7fc781fd85..7dffcaeeff3 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,27 @@
+Wed Apr 4 10:53:27 2001 Carlos O'Ryan <coryan@uci.edu>
+
+ * tao/Transport.h:
+ * tao/Transport.cpp:
+ Remove dead code.
+ Rename some methods to more clearly reflect their intent.
+ Simplify the management for the outgoing data queue. The
+ cleanup_queue() method removes any element that is completely
+ sent, while the drain_queue() method simply tries to send as
+ much data as possible.
+
+ * tao/Queued_Message.h:
+ * tao/Queued_Message.inl:
+ * tao/Queued_Message.cpp:
+ Each derived class can decided if the message has been
+ completely sent very efficiently, no need to keep a local
+ variable for that.
+ We do need variables to keep track of closed connections, failed
+ sends and timeouts.
+
+ * tao/Messaging_Policy_i.cpp:
+ * tao/Invocation.cpp:
+ Improved debugging messages for timeouts
+
Sun Apr 01 15:34:32 2001 Carlos O'Ryan <coryan@uci.edu>
* tao/Makefile:
diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp
index 707cd3378b0..9e6058c32aa 100644
--- a/TAO/tao/Invocation.cpp
+++ b/TAO/tao/Invocation.cpp
@@ -657,7 +657,8 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request,
CORBA::ULong msecs = this->max_wait_time_->msec ();
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Timeout on recv is <%u>\n"),
+ "TAO (%P|%t) - Synch_Invocation::invoke_i, "
+ "timeout on recv is <%u>\n",
msecs));
}
@@ -671,7 +672,8 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request,
CORBA::ULong msecs = this->max_wait_time_->msec ();
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Timeout after recv is <%u> status <%d>\n"),
+ "TAO (%P|%t) Synch_Invocation::invoke_i, "
+ "timeout after recv is <%u> status <%d>\n",
msecs,
reply_error));
}
diff --git a/TAO/tao/Messaging_Policy_i.cpp b/TAO/tao/Messaging_Policy_i.cpp
index a7540f9c1db..0aeea2ea279 100644
--- a/TAO/tao/Messaging_Policy_i.cpp
+++ b/TAO/tao/Messaging_Policy_i.cpp
@@ -156,10 +156,9 @@ TAO_RelativeRoundtripTimeoutPolicy::set_time_value (ACE_Time_Value &time_value)
if (TAO_debug_level > 0)
{
- CORBA::ULong msecs =
- ACE_static_cast(CORBA::ULong, microseconds / 1000);
+ CORBA::ULong msecs = time_value.msec ();
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Timeout is <%u>\n"),
+ ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"),
msecs));
}
}
@@ -290,4 +289,3 @@ TAO_Sync_Scope_Policy::destroy (CORBA_Environment &)
}
#endif /* TAO_HAS_SYNC_SCOPE_POLICY == 1 */
-
diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp
index 2d0e91b0388..7d3b19c45c5 100644
--- a/TAO/tao/Queued_Message.cpp
+++ b/TAO/tao/Queued_Message.cpp
@@ -11,8 +11,9 @@
ACE_RCSID(tao, Queued_Message, "$Id$")
TAO_Queued_Message::TAO_Queued_Message (TAO_Message_Sent_Callback *callback)
- : data_sent_successfully_ (0)
- , connection_closed_ (0)
+ : connection_closed_ (0)
+ , send_failure_ (0)
+ , timeout_ (0)
, callback_ (callback)
, next_ (0)
, prev_ (0)
@@ -30,14 +31,18 @@ TAO_Queued_Message::connection_closed (void)
if (this->callback_ != 0)
{
- if (this->done ())
- {
- this->callback_->connection_closed ();
- }
- else
- {
- this->callback_->send_failed ();
- }
+ this->callback_->connection_closed ();
+ }
+}
+
+void
+TAO_Queued_Message::send_failure (void)
+{
+ this->send_failure_ = 1;
+
+ if (this->callback_ != 0)
+ {
+ this->callback_->send_failed ();
}
}
diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h
index c9e6296c7d5..4763a1efa83 100644
--- a/TAO/tao/Queued_Message.h
+++ b/TAO/tao/Queued_Message.h
@@ -82,6 +82,9 @@ public:
/// signal waiting threads.
void connection_closed (void);
+ /// There was an error while sending the data.
+ void send_failure (void);
+
/** @name Intrusive list manipulation
*
* The messages are put in a doubled linked list (for easy insertion
@@ -183,12 +186,15 @@ public:
//@}
protected:
- /// Record if the send was completely successful
- int data_sent_successfully_;
-
/// Set to 1 if the connection was closed
int connection_closed_;
+ /// Set to 1 if there was a failure while sending the data
+ int send_failure_;
+
+ /// Set to 1 if there was a timeout while sending the data
+ int timeout_;
+
private:
/// If not null, this is the object that we signal to indicate that
/// the message was sent.
diff --git a/TAO/tao/Queued_Message.inl b/TAO/tao/Queued_Message.inl
index 501ae013eac..e9cd0a9ff4b 100644
--- a/TAO/tao/Queued_Message.inl
+++ b/TAO/tao/Queued_Message.inl
@@ -1,15 +1,5 @@
// $Id$
-ACE_INLINE int
-TAO_Queued_Message::done (void) const
-{
- // @@ Actually we should have a status() method that returns not
- // only if there is more data, but also indicates if there was a
- // failure.
- return (this->data_sent_successfully_ == 1
- || this->connection_closed_ == 1);
-}
-
ACE_INLINE TAO_Queued_Message *
TAO_Queued_Message::next (void) const
{
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index a0df9546e96..7305809713c 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -118,13 +118,13 @@ TAO_Transport::handle_output ()
{
// The reactor is asking us to send more data, first check if
// there is a current message that needs more sending:
- retval = this->send_current_message ();
+ retval = this->drain_queue ();
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport[%d]::handle_output, "
- "send_current_message returns %d/%d\n",
+ "drain_queue returns %d/%d\n",
this->id (),
retval, errno));
}
@@ -160,86 +160,6 @@ TAO_Transport::provide_handle (ACE_Handle_Set &handle_set)
handle_set.set_bit (eh->get_handle ());
}
-#if 0
-ssize_t
-TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time)
-{
- // Make sure we have a buffering queue and there are messages in it.
- if (this->buffering_queue_ == 0 ||
- this->buffering_queue_->is_empty ())
- return 1;
-
- // Now, we can take the lock and try to do something.
- //
- // @@CJC We might be able to reduce the length of time we hold
- // the lock depending on whether or not we need to hold the
- // hold the lock while we're doing queueing activities.
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->handler_lock_,
- -1));
-
- // Get the first message from the queue.
- ACE_Message_Block *queued_message = 0;
- ssize_t result = this->buffering_queue_->peek_dequeue_head (queued_message);
-
- // @@ What to do here on failures?
- ACE_ASSERT (result != -1);
-
- // @@CJC take lock??
- // Actual network send.
- size_t bytes_transferred = 0;
- result = this->send_i (queued_message,
- max_wait_time,
- &bytes_transferred);
- // @@CJC release lock??
-
- // Cannot send completely: timed out.
- if (result == -1 &&
- errno == ETIME)
- {
- if (bytes_transferred > 0)
- {
- // If successful in sending some of the data, reset the
- // queue appropriately.
- this->reset_queued_message (queued_message,
- bytes_transferred);
-
- // Indicate some success.
- return bytes_transferred;
- }
-
- // Since we queue up the message, this is not an error. We can
- // try next time around.
- return 1;
- }
-
- // EOF or other errors.
- if (result == -1 ||
- result == 0)
- {
- this->dequeue_all ();
- return -1;
- }
-
- // If successful in sending data, reset the queue appropriately.
- this->reset_queued_message (queued_message,
- bytes_transferred);
-
- // Everything was successfully delivered.
- return result;
-}
-
-void
-TAO_Transport::reset_sent_message (ACE_Message_Block *message_block,
- size_t bytes_delivered)
-{
- this->reset_message (message_block,
- bytes_delivered,
- 0);
-}
-#endif /* 0 */
-
static void
dump_iov (iovec *iov, int iovcnt, int id, size_t current_transfer)
{
@@ -364,98 +284,6 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block,
}
int
-TAO_Transport::send_current_message (void)
-{
- ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
-
- if (this->head_ == 0)
- return 1;
-
- // This is the vector used to send data, it must be declared outside
- // the loop because after the loop there may still be data to be
- // sent
- int iovcnt = 0;
- iovec iov[IOV_MAX];
-
- // We loop over all the elements in the queue ...
- TAO_Queued_Message *i = this->head_;
- while (i != 0)
- {
- // ... each element fills the iovector ...
- i->fill_iov (IOV_MAX, iovcnt, iov);
-
- // ... if the vector is not full we tack another message into
- // the vector ...
- if (iovcnt != IOV_MAX)
- {
- // Go for the next element in the list
- i = i->next ();
- continue;
- }
-
- // ... time to send data because the vector is full. We need to
- // loop because a single message can span multiple IOV_MAX
- // elements ...
- while (iovcnt == IOV_MAX)
- {
- size_t byte_count;
-
- // ... send the message ...
- ssize_t retval =
- this->send (iov, iovcnt, byte_count);
-
- // ... now we need to update the queue, removing elements
- // that have been sent, and updating the last element if it
- // was only partially sent ...
- this->bytes_transferred_i (byte_count, i);
- iovcnt = 0;
-
- if (retval == 0)
- {
- return -1;
- }
- else if (retval == -1)
- {
- if (errno == EWOULDBLOCK || errno == ETIME)
- return 0;
- return -1;
- }
-
- if (i == 0)
- break;
-
- /// Message <i> may have been only partially sent...
- i->fill_iov (IOV_MAX, iovcnt, iov);
- }
-
- if (i != 0)
- i = i->next ();
- }
-
- size_t byte_count;
- ssize_t retval =
- this->send (iov, iovcnt, byte_count);
-
- this->bytes_transferred_i (byte_count, i);
- iovcnt = 0;
-
- if (retval == 0)
- {
- return -1;
- }
- else if (retval == -1)
- {
- if (errno == EWOULDBLOCK || errno == ETIME)
- return 0;
- return -1;
- }
-
- if (this->head_ == 0)
- return 1;
- return 0;
-}
-
-int
TAO_Transport::send_message_i (TAO_Stub *stub,
int twoway_flag,
const ACE_Message_Block *message_block,
@@ -575,7 +403,6 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
ace_mon.acquire ();
}
- synch_message.remove_from_list (this->head_, this->tail_);
synch_message.destroy ();
return result;
}
@@ -592,27 +419,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
" queued anyway, %d bytes sent\n",
this->id (),
byte_count));
-
-#if 0
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- " queued message contains %d bytes, %d transferred\n",
- this->id (),
- queued_message->mb ()->total_length (),
- byte_count));
-#endif /* 0 */
- }
-
-#if 0
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- " queued message still has %d bytes to go\n",
- this->id (),
- queued_message->mb ()->total_length ()));
}
-#endif /* 0 */
// ... insert at the head of the queue, we can use push_back()
// because the queue is empty ...
@@ -763,28 +570,6 @@ TAO_Transport::close_connection (void)
}
}
-#if 0
-TAO_Queued_Message *
-TAO_Transport::copy_message_block (const ACE_Message_Block *message_block)
-{
- size_t length = message_block->total_length ();
-
- // @@ Use Auto_Ptr<> to cleanup the message block, should the second
- // allocation fail
- ACE_Message_Block *copy;
- ACE_NEW_RETURN (copy, ACE_Message_Block (length), 0);
- for (const ACE_Message_Block *i = message_block;
- i != 0;
- i = i->cont ())
- copy->copy (i->rd_ptr (), i->length ());
-
- TAO_Queued_Message *msg;
- ACE_NEW_RETURN (msg, TAO_Queued_Message (copy, 1), 0);
-
- return msg;
-}
-#endif /* 0 */
-
ssize_t
TAO_Transport::send (iovec *iov, int iovcnt,
size_t &bytes_transferred,
@@ -930,6 +715,117 @@ TAO_Transport::cancel_output (void)
}
int
+TAO_Transport::drain_queue (void)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
+
+ if (this->head_ == 0)
+ return 1;
+
+ // This is the vector used to send data, it must be declared outside
+ // the loop because after the loop there may still be data to be
+ // sent
+ int iovcnt = 0;
+ iovec iov[IOV_MAX];
+
+ // We loop over all the elements in the queue ...
+ TAO_Queued_Message *i = this->head_;
+ do
+ {
+ // ... each element fills the iovector ...
+ i->fill_iov (IOV_MAX, iovcnt, iov);
+
+ // ... if the vector is not full we tack another message into
+ // the vector ...
+ if (iovcnt != IOV_MAX)
+ {
+ // Go for the next element in the list
+ i = i->next ();
+ continue;
+ }
+
+ // ... time to send data because the vector is full. We need to
+ // loop because a single message can span multiple IOV_MAX
+ // elements ...
+ while (iovcnt == IOV_MAX)
+ {
+ size_t byte_count;
+
+ // ... send the message ...
+ ssize_t retval =
+ this->send (iov, iovcnt, byte_count);
+
+ // ... now we need to update the queue, removing elements
+ // that have been sent, and updating the last element if it
+ // was only partially sent ...
+ this->cleanup_queue (byte_count);
+ iovcnt = 0;
+
+ if (retval == 0)
+ {
+ return -1;
+ }
+ else if (retval == -1)
+ {
+ if (errno == EWOULDBLOCK || errno == ETIME)
+ return 0;
+ return -1;
+ }
+
+ if (this->head_ == 0)
+ break;
+
+ /// Message <i> may have been only partially sent...
+ i->fill_iov (IOV_MAX, iovcnt, iov);
+ }
+ }
+ while (this->head_ != 0);
+
+ size_t byte_count;
+ ssize_t retval =
+ this->send (iov, iovcnt, byte_count);
+
+ this->cleanup_queue (byte_count);
+ iovcnt = 0;
+
+ if (retval == 0)
+ {
+ return -1;
+ }
+ else if (retval == -1)
+ {
+ if (errno == EWOULDBLOCK || errno == ETIME)
+ return 0;
+ return -1;
+ }
+
+ if (this->head_ == 0)
+ return 1;
+
+ return 0;
+}
+
+void
+TAO_Transport::cleanup_queue (size_t byte_count)
+{
+ while (this->head_ != 0 && byte_count > 0)
+ {
+ TAO_Queued_Message *i = this->head_;
+
+ // Update the state of the first message
+ i->bytes_transferred (byte_count);
+
+ // ... if all the data was sent the message must be removed from
+ // the queue...
+ if (i->all_data_sent ())
+ {
+ i->remove_from_list (this->head_, this->tail_);
+ i->destroy ();
+ }
+ }
+}
+
+int
TAO_Transport::must_flush_queue_i (TAO_Stub *stub)
{
// First let's compute the size of the queue:
@@ -964,36 +860,3 @@ TAO_Transport::must_flush_queue_i (TAO_Stub *stub)
return 0;
}
-
-void
-TAO_Transport::bytes_transferred_i (size_t byte_count,
- TAO_Queued_Message *&iterator)
-{
- TAO_Queued_Message *i = this->head_;
- while (i != iterator && byte_count > 0)
- {
- // Update the state of each queued message
- i->bytes_transferred (byte_count);
- // ... if all the data was sent the message must be removed from
- // the queue...
- TAO_Queued_Message *tmp = i->next ();
- if (i->all_data_sent ())
- {
- i->remove_from_list (this->head_, this->tail_);
- i->destroy ();
- }
- i = tmp;
- }
- // ... all the data has been taken care of ...
- if (byte_count == 0 || iterator == 0)
- return;
-
- iterator->bytes_transferred (byte_count);
- TAO_Queued_Message *tmp = iterator->next ();
- if (iterator->all_data_sent ())
- {
- iterator->remove_from_list (this->head_, this->tail_);
- iterator->destroy ();
- iterator = tmp;
- }
-}
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 5fee5d7cf6d..6e296a72fe5 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -660,28 +660,7 @@ public:
/// Cancel handle_output() callbacks
int cancel_output (void);
-protected:
- // @@ see if one of these calls send_message()
- /// Remove the first message from the outgoing queue.
- void dequeue_head (void);
-
- /// Update the state of the outgoing queue, assuming that
- /// bytes_delivered bytes have been sent already.
- void reset_queued_message (ACE_Message_Block *message_block,
- size_t bytes_delivered);
-
- /// Update the state of the outgoing queue, this time a complete
- /// message was sent.
- void reset_sent_message (ACE_Message_Block *message_block,
- size_t bytes_delivered);
-
- /// Helper function used to implement the two methods above.
- void reset_message (ACE_Message_Block *message_block,
- size_t bytes_delivered,
- int queued_message);
-
private:
-
/// Try to send the current message.
/**
* As the outgoing data is drained this method is invoked to send as
@@ -690,7 +669,16 @@ private:
* Returns 0 if there is more data to send, -1 if there was an error
* and 1 if the message was completely sent.
*/
- int send_current_message (void);
+ int drain_queue (void);
+
+ /// Cleanup the queue.
+ /**
+ * Exactly <byte_count> bytes have been sent, the queue must be
+ * cleaned up as potentially several messages have been completely
+ * sent out.
+ * It leaves on head_ the next message to send out.
+ */
+ void cleanup_queue (size_t byte_count);
/// Copy the contents of a message block into a Queued_Message
/// TAO_Queued_Message *copy_message_block (const ACE_Message_Block *mb);
@@ -698,10 +686,6 @@ private:
/// Check if the buffering constraints have been reached
int must_flush_queue_i (TAO_Stub *stub);
- /// Update the queue, exactly <byte_count> bytes have been sent.
- void bytes_transferred_i (size_t byte_count,
- TAO_Queued_Message *&iterator);
-
/// Prohibited
ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&))
ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&))