summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-02 01:10:05 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-02 01:10:05 +0000
commit8888c1ea1d6fcb1cc272c741b321d23f6f4438dc (patch)
tree1cfa770f352f48c319c6ab7ea66ed67c11f79156
parentbfc54a683e8acb51cc07e88c42d1626bd60794fd (diff)
downloadATCD-8888c1ea1d6fcb1cc272c741b321d23f6f4438dc.tar.gz
ChangeLogTag:Sun Apr 01 15:34:32 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a62
-rw-r--r--TAO/tao/Asynch_Queued_Message.cpp78
-rw-r--r--TAO/tao/Asynch_Queued_Message.h73
-rw-r--r--TAO/tao/Makefile2
-rw-r--r--TAO/tao/Queued_Message.cpp43
-rw-r--r--TAO/tao/Queued_Message.h98
-rw-r--r--TAO/tao/Queued_Message.inl12
-rw-r--r--TAO/tao/Synch_Queued_Message.cpp79
-rw-r--r--TAO/tao/Synch_Queued_Message.h84
-rw-r--r--TAO/tao/TAO.dsp64
-rw-r--r--TAO/tao/TAO_Static.dsp16
-rw-r--r--TAO/tao/Transport.cpp409
-rw-r--r--TAO/tao/Transport.h17
13 files changed, 772 insertions, 265 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index 0197b4a5dce..a7fc781fd85 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,65 @@
+Sun Apr 01 15:34:32 2001 Carlos O'Ryan <coryan@uci.edu>
+
+ * tao/Makefile:
+ * tao/TAO.dsp:
+ * tao/TAO_Static.dsp:
+ * tao/Asynch_Queued_Message.h:
+ * tao/Asynch_Queued_Message.cpp:
+ * tao/Synch_Queued_Message.h:
+ * tao/Synch_Queued_Message.cpp:
+ * tao/Queued_Message.h:
+ * tao/Queued_Message.inl:
+ * tao/Queued_Message.cpp:
+ Specialize the Queue_Message class for Synchronous and
+ Asynchronous messages. Their behavior is completely different:
+ synchronous messages (twoways and reliable oneways) are
+ allocated from the stack, they should not copy the CDR stream
+ and thus have to deal with message block chains.
+ Asynchronous messages (oneways and AMIs with SYNC_NONE policy)
+ are allocated from the heap, they must copy their data and thus
+ can reassemble it in a single buffer.
+
+ * tao/Transport.h:
+ * tao/Transport.cpp:
+ Changed the transport to use the new interface in the
+ Queued_Message class.
+ Completely separate the synchronous and asynchronous operation
+ path.
+ The new implementation recovers some functionality lost in
+ previous revisions: multiple messages can be sent in a single
+ iovector.
+
+ * tests/Big_Oneways/Test.idl:
+ * tests/Big_Oneways/Session.h:
+ * tests/Big_Oneways/Session.cpp:
+ Add ping() operation to the Session IDL interface.
+ This is used to validate the session during startup, i.e. ensure
+ that enough connections are available for all the threads.
+
+Sat Mar 31 14:56:37 2001 Carlos O'Ryan <coryan@uci.edu>
+
+ * tao/Transport.cpp:
+ Fixed memory management and synchronization problems.
+ Invoke the connection_closed() method on all the pending
+ messages if the connection is closed.
+
+ * tao/Queued_Message.h:
+ * tao/Queued_Message.inl:
+ * tao/Queued_Message.cpp:
+ If the connection is closed there is no sense in trying to
+ continue sending the message.
+ The done() method returns 1 if the connection was closed or if
+ the message was completely sent.
+
+ * tao/Reactive_Flushing_Strategy.cpp:
+ Propagate any errors from Transport::schedule_output() and
+ Transport::cancel_output()
+
+ * tests/Big_Request_Muxing/Big_Request_Muxing.dsw:
+ * tests/Big_Request_Muxing/client.dsp:
+ * tests/Big_Request_Muxing/server.dsp:
+ Add MSVC project files
+
Fri Mar 30 17:06:33 2001 Carlos O'Ryan <coryan@uci.edu>
* tests/README:
diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp
new file mode 100644
index 00000000000..1123eb99bd0
--- /dev/null
+++ b/TAO/tao/Asynch_Queued_Message.cpp
@@ -0,0 +1,78 @@
+// -*- C++ -*-
+// $Id$
+
+#include "Asynch_Queued_Message.h"
+
+ACE_RCSID(tao, Asynch_Queued_Message, "$Id$")
+
+TAO_Asynch_Queued_Message::
+ TAO_Asynch_Queued_Message (const ACE_Message_Block *contents)
+ : offset_ (0)
+{
+ this->size_ = contents->total_length ();
+ // @@ Use a pool for these guys!!
+ ACE_NEW (this->buffer_, char[this->size_]);
+
+ size_t copy_offset = 0;
+ for (const ACE_Message_Block *i = contents;
+ i != 0;
+ i = i->cont ())
+ {
+ ACE_OS::memcpy (this->buffer_ + copy_offset,
+ i->rd_ptr (),
+ i->length ());
+ copy_offset += i->length ();
+ }
+}
+
+TAO_Asynch_Queued_Message::~TAO_Asynch_Queued_Message (void)
+{
+ // @@ Use a pool for these guys!
+ delete[] this->buffer_;
+}
+
+size_t
+TAO_Asynch_Queued_Message::message_length (void) const
+{
+ return this->size_ - this->offset_;
+}
+
+int
+TAO_Asynch_Queued_Message::all_data_sent (void) const
+{
+ return this->size_ == this->offset_;
+}
+
+void
+TAO_Asynch_Queued_Message::fill_iov (int iovcnt_max,
+ int &iovcnt,
+ iovec iov[]) const
+{
+ ACE_ASSERT (iovcnt_max > iovcnt);
+
+ iov[iovcnt].iov_base = this->buffer_ + this->offset_;
+ iov[iovcnt].iov_len = this->size_ - this->offset_;
+ iovcnt++;
+}
+
+int
+TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count)
+{
+ size_t remaining_bytes = this->size_ - this->offset_;
+ if (byte_count > remaining_bytes)
+ {
+ this->offset_ = this->size_;
+ byte_count -= remaining_bytes;
+ return 1;
+ }
+ this->offset_ += byte_count;
+ byte_count = 0;
+ return 0;
+}
+
+void
+TAO_Asynch_Queued_Message::destroy (void)
+{
+ // @@ Maybe it comes from a pool, we should do something about it.
+ delete this;
+}
diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h
new file mode 100644
index 00000000000..d3c683c1238
--- /dev/null
+++ b/TAO/tao/Asynch_Queued_Message.h
@@ -0,0 +1,73 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Asynch_Queued_Message.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_ASYNCH_QUEUED_MESSAGE_H
+#define TAO_ASYNCH_QUEUED_MESSAGE_H
+#include "ace/pre.h"
+
+#include "Queued_Message.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class TAO_Asynch_Queued_Message
+ *
+ * @brief Specialize TAO_Queued_Message for asynch requests,
+ * i.e. oneways sent with SYNC_NONE policy.
+ *
+ */
+class TAO_Export TAO_Asynch_Queued_Message : public TAO_Queued_Message
+{
+public:
+ /// Constructor
+ /**
+ * @param contents The message block chain that must be sent.
+ *
+ * @todo I'm almost sure this class will require a callback
+ * interface for AMIs sent with SYNC_NONE policy. Those guys
+ * need to hear when the connection timeouts or closes, but
+ * cannot block waiting for the message to be delivered.
+ */
+ TAO_Asynch_Queued_Message (const ACE_Message_Block *contents);
+
+ /// Destructor
+ virtual ~TAO_Asynch_Queued_Message (void);
+
+ /** Implement the Template Methods from TAO_Queued_Message
+ */
+ //@{
+ virtual size_t message_length (void) const;
+ virtual int all_data_sent (void) const;
+ virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const;
+ virtual int bytes_transferred (size_t &byte_count);
+ virtual void destroy (void);
+ //@}
+
+private:
+ /// The number of bytes in the buffer
+ size_t size_;
+
+ /// The offset in the buffer
+ /**
+ * Data up to @c offset has been sent already, only the
+ * [offset_,size_) range remains to be sent.
+ */
+ size_t offset_;
+
+ /// The buffer containing the complete message.
+ char *buffer_;
+};
+
+#include "ace/post.h"
+#endif /* TAO_ASYNCH_QUEUED_MESSAGE_H */
diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile
index 4125d329f3e..a015f6b03aa 100644
--- a/TAO/tao/Makefile
+++ b/TAO/tao/Makefile
@@ -242,6 +242,8 @@ ORB_CORE_FILES = \
Block_Flushing_Strategy \
Reactive_Flushing_Strategy \
Queued_Message \
+ Synch_Queued_Message \
+ Asynch_Queued_Message \
Message_Sent_Callback
DYNAMIC_ANY_FILES =
diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp
index 94ad78e1662..2d0e91b0388 100644
--- a/TAO/tao/Queued_Message.cpp
+++ b/TAO/tao/Queued_Message.cpp
@@ -10,13 +10,10 @@
ACE_RCSID(tao, Queued_Message, "$Id$")
-TAO_Queued_Message::TAO_Queued_Message (ACE_Message_Block *contents,
- int own_contents,
- TAO_Message_Sent_Callback *callback)
- : contents_ (contents)
- , own_contents_ (own_contents)
+TAO_Queued_Message::TAO_Queued_Message (TAO_Message_Sent_Callback *callback)
+ : data_sent_successfully_ (0)
+ , connection_closed_ (0)
, callback_ (callback)
- , current_block_ (contents)
, next_ (0)
, prev_ (0)
{
@@ -24,21 +21,13 @@ TAO_Queued_Message::TAO_Queued_Message (ACE_Message_Block *contents,
TAO_Queued_Message::~TAO_Queued_Message (void)
{
- if (this->own_contents_)
- {
- ACE_Message_Block *i = this->contents_;
- while (i != 0)
- {
- ACE_Message_Block *cont = i->cont (); i->cont (0);
- ACE_Message_Block::release (i);
- i = cont;
- }
- }
}
void
TAO_Queued_Message::connection_closed (void)
{
+ this->connection_closed_ = 1;
+
if (this->callback_ != 0)
{
if (this->done ())
@@ -53,28 +42,6 @@ TAO_Queued_Message::connection_closed (void)
}
void
-TAO_Queued_Message::destroy (void)
-{
- delete this;
-}
-
-void
-TAO_Queued_Message::bytes_transferred (size_t byte_count)
-{
- while (!this->done () && byte_count > 0)
- {
- size_t l = this->current_block_->length ();
- if (byte_count < l)
- {
- this->current_block_->rd_ptr (byte_count);
- return;
- }
- byte_count -= l;
- this->current_block_ = this->current_block_->cont ();
- }
-}
-
-void
TAO_Queued_Message::remove_from_list (TAO_Queued_Message *&head,
TAO_Queued_Message *&tail)
{
diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h
index 25d4a018b9e..c9e6296c7d5 100644
--- a/TAO/tao/Queued_Message.h
+++ b/TAO/tao/Queued_Message.h
@@ -26,8 +26,8 @@ class TAO_Message_Sent_Callback;
/**
* @class TAO_Queued_Message
*
- * @brief Implement an queued message for the outgoing path in the
- * TAO_Transport class
+ * @brief Represent messages queued in the outgoing data path of the
+ * TAO_Transport class.
*
* Please read the documentation in the TAO_Transport class to find
* out more about the design of the outgoing data path.
@@ -66,29 +66,15 @@ class TAO_Export TAO_Queued_Message
public:
/// Constructor
/**
- * @param contents The message block chain that must be sent.
- *
- * @param own_contents If this flag is true then this object assumes
- * ownership of the contents.
- *
* @param callback A callback interface to signal any waiting
* threads about the status of the message. It is null if there are
* no waiting threads.
*/
- TAO_Queued_Message (ACE_Message_Block *contents,
- int own_contents,
- TAO_Message_Sent_Callback *callback = 0);
+ TAO_Queued_Message (TAO_Message_Sent_Callback *callback = 0);
/// Destructor
virtual ~TAO_Queued_Message (void);
- /// Get the message block
- ACE_Message_Block *mb (void) const;
-
- /// The transport has successfully sent more data, adjust internal
- /// status
- void bytes_transferred (size_t byte_count);
-
/// Return 0 if the message has not been completely sent
int done (void) const;
@@ -96,9 +82,6 @@ public:
/// signal waiting threads.
void connection_closed (void);
- /// Reclaim resources
- void destroy (void);
-
/** @name Intrusive list manipulation
*
* The messages are put in a doubled linked list (for easy insertion
@@ -143,18 +126,70 @@ public:
TAO_Queued_Message *&tail);
//@}
-private:
- /// The contents of the message.
+ /** @name Template Methods
+ */
+ //@{
+
+ /// Return the length of the message
/**
- * The message is normally generated by a TAO_OutputCDR stream. The
- * application marshals the payload, possibly generating a chain of
- * message block connected via the 'cont()' field.
+ * If the message has been partially sent it returns the number of
+ * bytes that are still not sent.
*/
- ACE_Message_Block *contents_;
+ virtual size_t message_length (void) const = 0;
- /// If not zero the @c contents_ are owned by this object
- int own_contents_;
+ /// Return 1 if all the data has been sent
+ virtual int all_data_sent (void) const = 0;
+ /// Fill up an io vector using the connects of the message
+ /**
+ * Different versions of this class represent the message using
+ * either a single buffer, or a message block.
+ * This method allows a derived class to fill up the contents of an
+ * io vector, the TAO_Transport class uses this method to group as
+ * many messages as possible in an iovector before sending them to
+ * the OS I/O subsystem.
+ *
+ * @param iovcnt_max The number of elements in iov
+ * @param iovcnt The number of elements already used by iov, this
+ * method should update this counter
+ * @param iov The io vector
+ */
+ virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const = 0;
+
+ /// Update the internal state, data has been sent.
+ /**
+ * After the TAO_Transport class completes a successful (or
+ * partially successful) I/O operation it must update the state of
+ * all the messages queued. This callback method is used by each
+ * message to update its state and determine if all the data has
+ * been sent already.
+ *
+ * @param byte_count The number of bytes succesfully sent. The
+ * TAO_Queued_Message should decrement this value
+ * by the number of bytes that must still be sent.
+ * @return Returns 1 if the TAO_Queued_Message has any more data to
+ * send.
+ */
+ virtual int bytes_transferred (size_t &byte_count) = 0;
+
+ /// Reclaim resources
+ /**
+ * Reliable messages are allocated from the stack, thus they do not
+ * be deallocated.
+ * Asynchronous (SYNC_NONE) messages are allocated from the heap (or
+ * a pool), they need to be reclaimed explicitly.
+ */
+ virtual void destroy (void) = 0;
+ //@}
+
+protected:
+ /// Record if the send was completely successful
+ int data_sent_successfully_;
+
+ /// Set to 1 if the connection was closed
+ int connection_closed_;
+
+private:
/// If not null, this is the object that we signal to indicate that
/// the message was sent.
/**
@@ -163,13 +198,6 @@ private:
*/
TAO_Message_Sent_Callback *callback_;
- /// The current message block
- /**
- * The message may be set in multiple writev() operations. This
- * point keeps track of the next message to send out.
- */
- ACE_Message_Block *current_block_;
-
/// Implement an intrusive double-linked list for the message queue
TAO_Queued_Message *next_;
TAO_Queued_Message *prev_;
diff --git a/TAO/tao/Queued_Message.inl b/TAO/tao/Queued_Message.inl
index 4a32424ca18..501ae013eac 100644
--- a/TAO/tao/Queued_Message.inl
+++ b/TAO/tao/Queued_Message.inl
@@ -1,15 +1,13 @@
// $Id$
-ACE_INLINE ACE_Message_Block *
-TAO_Queued_Message::mb (void) const
-{
- return this->current_block_;
-}
-
ACE_INLINE int
TAO_Queued_Message::done (void) const
{
- return this->current_block_ == 0;
+ // @@ Actually we should have a status() method that returns not
+ // only if there is more data, but also indicates if there was a
+ // failure.
+ return (this->data_sent_successfully_ == 1
+ || this->connection_closed_ == 1);
}
ACE_INLINE TAO_Queued_Message *
diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp
new file mode 100644
index 00000000000..e6aae104b00
--- /dev/null
+++ b/TAO/tao/Synch_Queued_Message.cpp
@@ -0,0 +1,79 @@
+// -*- C++ -*-
+// $Id$
+
+#include "Synch_Queued_Message.h"
+
+ACE_RCSID(tao, Synch_Queued_Message, "$Id$")
+
+TAO_Synch_Queued_Message::
+ TAO_Synch_Queued_Message (const ACE_Message_Block *contents)
+ : contents_ (ACE_const_cast (ACE_Message_Block*,contents))
+ , current_block_ (contents_)
+{
+}
+
+TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message (void)
+{
+}
+
+size_t
+TAO_Synch_Queued_Message::message_length (void) const
+{
+ if (this->current_block_ == 0)
+ return 0;
+ return this->current_block_->total_length ();
+}
+
+int
+TAO_Synch_Queued_Message::all_data_sent (void) const
+{
+ return this->current_block_ == 0;
+}
+
+void
+TAO_Synch_Queued_Message::fill_iov (int iovcnt_max,
+ int &iovcnt,
+ iovec iov[]) const
+{
+ ACE_ASSERT (iovcnt_max > iovcnt);
+
+ for (const ACE_Message_Block *message_block = this->current_block_;
+ message_block != 0 && iovcnt < iovcnt_max;
+ message_block = message_block->cont ())
+ {
+ size_t message_block_length = message_block->length ();
+
+ // Check if this block has any data to be sent.
+ if (message_block_length > 0)
+ {
+ // Collect the data in the iovec.
+ iov[iovcnt].iov_base = message_block->rd_ptr ();
+ iov[iovcnt].iov_len = message_block_length;
+
+ // Increment iovec counter.
+ iovcnt++;
+ }
+ }
+}
+
+int
+TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count)
+{
+ while (this->current_block_ != 0 && byte_count > 0)
+ {
+ size_t l = this->current_block_->length ();
+ if (byte_count < l)
+ {
+ this->current_block_->rd_ptr (byte_count);
+ return 0;
+ }
+ byte_count -= l;
+ this->current_block_ = this->current_block_->cont ();
+ }
+ return (this->current_block_ == 0);
+}
+
+void
+TAO_Synch_Queued_Message::destroy (void)
+{
+}
diff --git a/TAO/tao/Synch_Queued_Message.h b/TAO/tao/Synch_Queued_Message.h
new file mode 100644
index 00000000000..75f2c258e54
--- /dev/null
+++ b/TAO/tao/Synch_Queued_Message.h
@@ -0,0 +1,84 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Synch_Queued_Message.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_SYNCH_QUEUED_MESSAGE_H
+#define TAO_SYNCH_QUEUED_MESSAGE_H
+#include "ace/pre.h"
+
+#include "Queued_Message.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class TAO_Synch_Queued_Message
+ *
+ * @brief Specialize TAO_Queued_Message for synchronous requests,
+ * i.e. twoways and oneways sent with reliability better than
+ * SYNC_NONE.
+ *
+ * Reliable requests block the sending thread until the message is
+ * sent, likewise, the sending thread must be informed if the
+ * connection is closed or the message times out.
+ *
+ * In contrast oneway (and AMI) requests sent with the SYNC_NONE
+ * policy are simple discarded if the connection fails or they
+ * timeout.
+ *
+ * Another important difference is the management of the data buffer:
+ * one SYNC_NONE messages the buffer is immediately copied into a
+ * newly allocated buffer, and must be deallocated. Other types of
+ * requests use the memory allocated by the sending thread.
+ *
+ */
+class TAO_Export TAO_Synch_Queued_Message : public TAO_Queued_Message
+{
+public:
+ /// Constructor
+ /**
+ * @param contents The message block chain that must be sent.
+ */
+ TAO_Synch_Queued_Message (const ACE_Message_Block *contents);
+
+ /// Destructor
+ virtual ~TAO_Synch_Queued_Message (void);
+
+ /** Implement the Template Methods from TAO_Queued_Message
+ */
+ //@{
+ virtual size_t message_length (void) const;
+ virtual int all_data_sent (void) const;
+ virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const;
+ virtual int bytes_transferred (size_t &byte_count);
+ virtual void destroy (void);
+ //@}
+
+private:
+ /// The contents of the message.
+ /**
+ * The message is normally generated by a TAO_OutputCDR stream. The
+ * application marshals the payload, possibly generating a chain of
+ * message block connected via the 'cont()' field.
+ */
+ ACE_Message_Block *contents_;
+
+ /// The current message block
+ /**
+ * The message may be set in multiple writev() operations. This
+ * point keeps track of the next message to send out.
+ */
+ ACE_Message_Block *current_block_;
+};
+
+#include "ace/post.h"
+#endif /* TAO_QUEUED_MESSAGE_H */
diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp
index ff600e2e15e..699da0322b3 100644
--- a/TAO/tao/TAO.dsp
+++ b/TAO/tao/TAO.dsp
@@ -187,6 +187,10 @@ SOURCE=.\Asynch_Invocation.cpp
# End Source File
# Begin Source File
+SOURCE=.\Asynch_Queued_Message.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Asynch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
@@ -267,14 +271,6 @@ SOURCE=.\CodecFactory_ORBInitializer.cpp
# End Source File
# Begin Source File
-SOURCE=.\Transport_Cache_Manager.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\Transport_Descriptor_Interface.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\Connection_Handler.cpp
# End Source File
# Begin Source File
@@ -803,6 +799,10 @@ SOURCE=.\Sync_Strategies.cpp
# End Source File
# Begin Source File
+SOURCE=.\Synch_Queued_Message.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
@@ -847,6 +847,14 @@ SOURCE=.\Transport.cpp
# End Source File
# Begin Source File
+SOURCE=.\Transport_Cache_Manager.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\Transport_Descriptor_Interface.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Transport_Mux_Strategy.cpp
# End Source File
# Begin Source File
@@ -923,6 +931,10 @@ SOURCE=.\Asynch_Invocation.h
# End Source File
# Begin Source File
+SOURCE=.\Asynch_Queued_Message.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Asynch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -1007,14 +1019,6 @@ SOURCE=.\CodecFactory_ORBInitializer.h
# End Source File
# Begin Source File
-SOURCE=.\Transport_Cache_Manager.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\Transport_Descriptor_Interface.h
-# End Source File
-# Begin Source File
-
SOURCE=.\Connection_Handler.h
# End Source File
# Begin Source File
@@ -1611,6 +1615,10 @@ SOURCE=.\Sync_Strategies.h
# End Source File
# Begin Source File
+SOURCE=.\Synch_Queued_Message.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -1683,6 +1691,14 @@ SOURCE=.\Transport.inl
# End Source File
# Begin Source File
+SOURCE=.\Transport_Cache_Manager.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Transport_Descriptor_Interface.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Transport_Mux_Strategy.h
# End Source File
# Begin Source File
@@ -1795,14 +1811,6 @@ SOURCE=.\Client_Priority_Policy.i
# End Source File
# Begin Source File
-SOURCE=.\Transport_Cache_Manager.inl
-# End Source File
-# Begin Source File
-
-SOURCE=.\Transport_Descriptor_Interface.inl
-# End Source File
-# Begin Source File
-
SOURCE=.\Connection_Handler.i
# End Source File
# Begin Source File
@@ -2291,6 +2299,14 @@ SOURCE=.\TimeBaseS_T.i
# End Source File
# Begin Source File
+SOURCE=.\Transport_Cache_Manager.inl
+# End Source File
+# Begin Source File
+
+SOURCE=.\Transport_Descriptor_Interface.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\typecode.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/TAO_Static.dsp b/TAO/tao/TAO_Static.dsp
index 800b8052230..e2b4c9f35de 100644
--- a/TAO/tao/TAO_Static.dsp
+++ b/TAO/tao/TAO_Static.dsp
@@ -119,6 +119,10 @@ SOURCE=.\Asynch_Invocation.h
# End Source File
# Begin Source File
+SOURCE=.\Asynch_Queued_Message.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Asynch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -779,6 +783,10 @@ SOURCE=.\Sync_Strategies.h
# End Source File
# Begin Source File
+SOURCE=.\Synch_Queued_Message.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -1543,6 +1551,10 @@ SOURCE=.\Asynch_Invocation.cpp
# End Source File
# Begin Source File
+SOURCE=.\Asynch_Queued_Message.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Asynch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
@@ -2147,6 +2159,10 @@ SOURCE=.\Sync_Strategies.cpp
# End Source File
# Begin Source File
+SOURCE=.\Synch_Queued_Message.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 730cc32fad5..a0df9546e96 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -11,7 +11,8 @@
#include "Stub.h"
#include "Sync_Strategies.h"
#include "Connection_Handler.h"
-#include "Queued_Message.h"
+#include "Synch_Queued_Message.h"
+#include "Asynch_Queued_Message.h"
#include "Flushing_Strategy.h"
#include "debug.h"
@@ -370,38 +371,82 @@ TAO_Transport::send_current_message (void)
if (this->head_ == 0)
return 1;
- size_t bytes_transferred;
+ // This is the vector used to send data, it must be declared outside
+ // the loop because after the loop there may still be data to be
+ // sent
+ int iovcnt = 0;
+ iovec iov[IOV_MAX];
- ssize_t retval =
- this->send_message_block_chain (this->head_->mb (),
- bytes_transferred);
- if (retval == 0)
+ // We loop over all the elements in the queue ...
+ TAO_Queued_Message *i = this->head_;
+ while (i != 0)
{
- // The connection was closed, return -1 to have the Reactor
- // close this transport and event handler
- return -1;
- }
+ // ... each element fills the iovector ...
+ i->fill_iov (IOV_MAX, iovcnt, iov);
- // Because there can be a partial transfer we need to adjust the
- // number of bytes sent.
- this->head_->bytes_transferred (bytes_transferred);
- if (this->head_->done ())
- {
- // Remove the current message....
- TAO_Queued_Message *head = this->head_;
- head->remove_from_list (this->head_, this->tail_);
+ // ... if the vector is not full we tack another message into
+ // the vector ...
+ if (iovcnt != IOV_MAX)
+ {
+ // Go for the next element in the list
+ i = i->next ();
+ continue;
+ }
+
+ // ... time to send data because the vector is full. We need to
+ // loop because a single message can span multiple IOV_MAX
+ // elements ...
+ while (iovcnt == IOV_MAX)
+ {
+ size_t byte_count;
+
+ // ... send the message ...
+ ssize_t retval =
+ this->send (iov, iovcnt, byte_count);
+
+ // ... now we need to update the queue, removing elements
+ // that have been sent, and updating the last element if it
+ // was only partially sent ...
+ this->bytes_transferred_i (byte_count, i);
+ iovcnt = 0;
+
+ if (retval == 0)
+ {
+ return -1;
+ }
+ else if (retval == -1)
+ {
+ if (errno == EWOULDBLOCK || errno == ETIME)
+ return 0;
+ return -1;
+ }
- head->destroy ();
+ if (i == 0)
+ break;
+
+ /// Message <i> may have been only partially sent...
+ i->fill_iov (IOV_MAX, iovcnt, iov);
+ }
+
+ if (i != 0)
+ i = i->next ();
}
- if (retval == -1)
+ size_t byte_count;
+ ssize_t retval =
+ this->send (iov, iovcnt, byte_count);
+
+ this->bytes_transferred_i (byte_count, i);
+ iovcnt = 0;
+
+ if (retval == 0)
+ {
+ return -1;
+ }
+ else if (retval == -1)
{
- // ... timeouts and flow control are not real errors, the
- // connection is still valid and we must continue sending the
- // current message ...
if (errno == EWOULDBLOCK || errno == ETIME)
return 0;
-
return -1;
}
@@ -434,7 +479,6 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
must_queue = 1;
}
- TAO_Queued_Message *queued_message = 0;
if (must_queue)
{
// ... simply queue the message ...
@@ -447,165 +491,145 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
this->id ()));
}
- queued_message =
- this->copy_message_block (message_block);
-
+ TAO_Queued_Message *queued_message = 0;
+ ACE_NEW_RETURN (queued_message,
+ TAO_Asynch_Queued_Message (message_block),
+ -1);
queued_message->push_back (this->head_, this->tail_);
- }
- else
- {
- // ... in this case we must try to send the message first ...
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- "trying to send the message\n",
- this->id ()));
- }
-
- size_t byte_count;
-
- // @@ I don't think we want to hold the mutex here, however if
- // we release it we need to recheck the status of the transport
- // after we return... once I understand the final form for this
- // code I will re-visit this decision
- ssize_t n = this->send_message_block_chain (message_block,
- byte_count,
- max_wait_time);
- if (n == 0)
- return -1; // EOF
- else if (n == -1)
- {
- // ... if this is just an EWOULDBLOCK we must schedule the
- // message for later ...
- if (errno != EWOULDBLOCK)
- {
- return -1;
- }
- }
-
- // ... let's figure out if the complete message was sent ...
- if (message_block->total_length () == byte_count)
- {
- // Done, just return. Notice that there are no allocations
- // or copies up to this point (though some fancy calling
- // back and forth).
- // This is the common case for the critical path, it should
- // be fast.
- return 0;
- }
- // ... the message was only partially sent, schedule reactive
- // output...
- flushing_strategy->schedule_output (this);
-
- // ... and set it as the current message ...
- if (twoway_flag)
- {
- // ... we are going to block, so there is no need to clone
- // the message block...
- // @@ It seems wasteful to allocate a TAO_Queued_Message in
- // this case, but it is simpler to do it this way.
- ACE_NEW_RETURN (queued_message,
- TAO_Queued_Message (
- ACE_const_cast(ACE_Message_Block*,message_block),
- 0),
- -1);
- }
- else
+ if (this->must_flush_queue_i (stub))
{
- queued_message =
- this->copy_message_block (message_block);
- if (queued_message == 0)
- return -1;
+ ace_mon.release ();
+ int result = flushing_strategy->flush_message (this,
+ this->tail_);
+ return result;
}
+ return 0;
+ }
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- " queued anyway, %d bytes sent\n",
- this->id (),
- byte_count));
+ // ... in this case we must try to send the message first ...
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- " queued message contains %d bytes, %d transferred\n",
- this->id (),
- queued_message->mb ()->total_length (),
- byte_count));
- }
-
- queued_message->bytes_transferred (byte_count);
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_message_i, "
+ "trying to send the message\n",
+ this->id ()));
+ }
- if (TAO_debug_level > 6)
+ size_t byte_count;
+
+ // @@ I don't think we want to hold the mutex here, however if
+ // we release it we need to recheck the status of the transport
+ // after we return... once I understand the final form for this
+ // code I will re-visit this decision
+ ssize_t n = this->send_message_block_chain (message_block,
+ byte_count,
+ max_wait_time);
+ if (n == 0)
+ return -1; // EOF
+ else if (n == -1)
+ {
+ // ... if this is just an EWOULDBLOCK we must schedule the
+ // message for later ...
+ if (errno != EWOULDBLOCK)
{
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- " queued message still has %d bytes to go\n",
- this->id (),
- queued_message->mb ()->total_length ()));
+ return -1;
}
+ }
- // ... insert at the head of the queue, we can use push_back()
- // because the queue is empty ...
- queued_message->push_back (this->head_, this->tail_);
+ // ... let's figure out if the complete message was sent ...
+ if (message_block->total_length () == byte_count)
+ {
+ // Done, just return. Notice that there are no allocations
+ // or copies up to this point (though some fancy calling
+ // back and forth).
+ // This is the common case for the critical path, it should
+ // be fast.
+ return 0;
}
- // ... two choices, this is a twoway request or not, if it is
- // then we must only return once the complete message has been
- // sent:
+ // ... the message was only partially sent, schedule reactive
+ // output...
+ flushing_strategy->schedule_output (this);
+ // ... and set it as the current message ...
if (twoway_flag)
{
+ // ... we are going to block, so there is no need to clone
+ // the message block...
+ // @@ It seems wasteful to allocate a TAO_Queued_Message in
+ // this case, but it is simpler to do it this way.
+ TAO_Synch_Queued_Message synch_message (message_block);
+
+ synch_message.bytes_transferred (byte_count);
+ synch_message.push_back (this->head_, this->tail_);
+
// Release the mutex, other threads may modify the queue as we
// block for a long time writing out data.
- ace_mon.release ();
- int result = flushing_strategy->flush_message (this, queued_message);
- queued_message->destroy ();
+ int result;
+ {
+ ace_mon.release ();
+ result = flushing_strategy->flush_message (this,
+ &synch_message);
+
+ ace_mon.acquire ();
+ }
+ synch_message.remove_from_list (this->head_, this->tail_);
+ synch_message.destroy ();
return result;
}
- // ... this is not a twoway. We must check if the buffering
- // constraints have been reached, if so, then we should start
- // flushing out data....
+ TAO_Queued_Message *queued_message = 0;
+ ACE_NEW_RETURN (queued_message,
+ TAO_Asynch_Queued_Message (message_block),
+ -1);
- // First let's compute the size of the queue:
- size_t msg_count = 0;
- size_t total_bytes = 0;
- for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
+ if (TAO_debug_level > 6)
{
- msg_count++;
- total_bytes += i->mb ()->total_length ();
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_message_i, "
+ " queued anyway, %d bytes sent\n",
+ this->id (),
+ byte_count));
+
+#if 0
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_message_i, "
+ " queued message contains %d bytes, %d transferred\n",
+ this->id (),
+ queued_message->mb ()->total_length (),
+ byte_count));
+#endif /* 0 */
}
+
+#if 0
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_message_i, "
+ " queued message still has %d bytes to go\n",
+ this->id (),
+ queued_message->mb ()->total_length ()));
+ }
+#endif /* 0 */
- int set_timer;
- ACE_Time_Value interval;
+ // ... insert at the head of the queue, we can use push_back()
+ // because the queue is empty ...
- if (stub->sync_strategy ().buffering_constraints_reached (stub,
- msg_count,
- total_bytes,
- set_timer,
- interval))
+ queued_message->push_back (this->head_, this->tail_);
+
+ // ... this is not a twoway. We must check if the buffering
+ // constraints have been reached, if so, then we should start
+ // flushing out data....
+
+ if (this->must_flush_queue_i (stub))
{
ace_mon.release ();
- // @@ memory management of the queued messages is getting tricky.
int result = flushing_strategy->flush_message (this,
this->tail_);
return result;
}
- else
- {
- // ... it is not time to flush yet, but maybe we need to set a
- // timer ...
- if (set_timer)
- {
- // @@ We need to schedule the timer. We should also be
- // careful not to schedule one if there is one scheduled
- // already.
- }
- }
return 0;
}
@@ -708,9 +732,6 @@ TAO_Transport::mark_invalid (void)
// @@ Do we need this method at all??
this->orb_core_->transport_cache ().mark_invalid (
this->cache_map_entry_);
-
-
-
}
int
@@ -735,8 +756,14 @@ TAO_Transport::close_connection (void)
// Purge the entry
this->orb_core_->transport_cache ().purge_entry (this->cache_map_entry_);
+
+ for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
+ {
+ i->connection_closed ();
+ }
}
+#if 0
TAO_Queued_Message *
TAO_Transport::copy_message_block (const ACE_Message_Block *message_block)
{
@@ -756,6 +783,7 @@ TAO_Transport::copy_message_block (const ACE_Message_Block *message_block)
return msg;
}
+#endif /* 0 */
ssize_t
TAO_Transport::send (iovec *iov, int iovcnt,
@@ -900,3 +928,72 @@ TAO_Transport::cancel_output (void)
return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
}
+
+int
+TAO_Transport::must_flush_queue_i (TAO_Stub *stub)
+{
+ // First let's compute the size of the queue:
+ size_t msg_count = 0;
+ size_t total_bytes = 0;
+ for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
+ {
+ msg_count++;
+ total_bytes += i->message_length ();
+ }
+
+ int set_timer;
+ ACE_Time_Value interval;
+
+ if (stub->sync_strategy ().buffering_constraints_reached (stub,
+ msg_count,
+ total_bytes,
+ set_timer,
+ interval))
+ {
+ return 1;
+ }
+
+ // ... it is not time to flush yet, but maybe we need to set a
+ // timer ...
+ if (set_timer)
+ {
+ // @@ We need to schedule the timer. We should also be
+ // careful not to schedule one if there is one scheduled
+ // already.
+ }
+
+ return 0;
+}
+
+void
+TAO_Transport::bytes_transferred_i (size_t byte_count,
+ TAO_Queued_Message *&iterator)
+{
+ TAO_Queued_Message *i = this->head_;
+ while (i != iterator && byte_count > 0)
+ {
+ // Update the state of each queued message
+ i->bytes_transferred (byte_count);
+ // ... if all the data was sent the message must be removed from
+ // the queue...
+ TAO_Queued_Message *tmp = i->next ();
+ if (i->all_data_sent ())
+ {
+ i->remove_from_list (this->head_, this->tail_);
+ i->destroy ();
+ }
+ i = tmp;
+ }
+ // ... all the data has been taken care of ...
+ if (byte_count == 0 || iterator == 0)
+ return;
+
+ iterator->bytes_transferred (byte_count);
+ TAO_Queued_Message *tmp = iterator->next ();
+ if (iterator->all_data_sent ())
+ {
+ iterator->remove_from_list (this->head_, this->tail_);
+ iterator->destroy ();
+ iterator = tmp;
+ }
+}
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index f51102285cc..5fee5d7cf6d 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -603,17 +603,17 @@ public:
/// for a timer.
/**
* At this point, only
- * <code>TAO_Eager_Buffering_Sync_Strategy::timer_check()</code>
+ * <code>TAO_Eager_Buffering_Sync_Strategy::timer_check()</code>
* uses this, and it's unclear whether it needs to stay around.
* But, it's here because it uses the associated protocol-specific
* connection handler, and accesses to that must be serialized on
- * the internal lock.
+ * the internal lock.
*
* @param arg argument passed to the handle_timeout() method of the
- * event handler
+ * event handler
* @param delay time interval after which the timer will expire
* @param interval time interval after which the timer will be
- * automatically rescheduled
+ * automatically rescheduled
* @return -1 on failure, a Reactor timer_id value on success
*
* @see ACE_Reactor::schedule_timer()
@@ -693,7 +693,14 @@ private:
int send_current_message (void);
/// Copy the contents of a message block into a Queued_Message
- TAO_Queued_Message *copy_message_block (const ACE_Message_Block *mb);
+ /// TAO_Queued_Message *copy_message_block (const ACE_Message_Block *mb);
+
+ /// Check if the buffering constraints have been reached
+ int must_flush_queue_i (TAO_Stub *stub);
+
+ /// Update the queue, exactly <byte_count> bytes have been sent.
+ void bytes_transferred_i (size_t byte_count,
+ TAO_Queued_Message *&iterator);
/// Prohibited
ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&))