diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-02 01:10:05 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-02 01:10:05 +0000 |
commit | 8888c1ea1d6fcb1cc272c741b321d23f6f4438dc (patch) | |
tree | 1cfa770f352f48c319c6ab7ea66ed67c11f79156 | |
parent | bfc54a683e8acb51cc07e88c42d1626bd60794fd (diff) | |
download | ATCD-8888c1ea1d6fcb1cc272c741b321d23f6f4438dc.tar.gz |
ChangeLogTag:Sun Apr 01 15:34:32 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r-- | TAO/ChangeLogs/ChangeLog-02a | 62 | ||||
-rw-r--r-- | TAO/tao/Asynch_Queued_Message.cpp | 78 | ||||
-rw-r--r-- | TAO/tao/Asynch_Queued_Message.h | 73 | ||||
-rw-r--r-- | TAO/tao/Makefile | 2 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.cpp | 43 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.h | 98 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.inl | 12 | ||||
-rw-r--r-- | TAO/tao/Synch_Queued_Message.cpp | 79 | ||||
-rw-r--r-- | TAO/tao/Synch_Queued_Message.h | 84 | ||||
-rw-r--r-- | TAO/tao/TAO.dsp | 64 | ||||
-rw-r--r-- | TAO/tao/TAO_Static.dsp | 16 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 409 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 17 |
13 files changed, 772 insertions, 265 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 0197b4a5dce..a7fc781fd85 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,65 @@ +Sun Apr 01 15:34:32 2001 Carlos O'Ryan <coryan@uci.edu> + + * tao/Makefile: + * tao/TAO.dsp: + * tao/TAO_Static.dsp: + * tao/Asynch_Queued_Message.h: + * tao/Asynch_Queued_Message.cpp: + * tao/Synch_Queued_Message.h: + * tao/Synch_Queued_Message.cpp: + * tao/Queued_Message.h: + * tao/Queued_Message.inl: + * tao/Queued_Message.cpp: + Specialize the Queue_Message class for Synchronous and + Asynchronous messages. Their behavior is completely different: + synchronous messages (twoways and reliable oneways) are + allocated from the stack, they should not copy the CDR stream + and thus have to deal with message block chains. + Asynchronous messages (oneways and AMIs with SYNC_NONE policy) + are allocated from the heap, they must copy their data and thus + can reassemble it in a single buffer. + + * tao/Transport.h: + * tao/Transport.cpp: + Changed the transport to use the new interface in the + Queued_Message class. + Completely separate the synchronous and asynchronous operation + path. + The new implementation recovers some functionality lost in + previous revisions: multiple messages can be sent in a single + iovector. + + * tests/Big_Oneways/Test.idl: + * tests/Big_Oneways/Session.h: + * tests/Big_Oneways/Session.cpp: + Add ping() operation to the Session IDL interface. + This is used to validate the session during startup, i.e. ensure + that enough connections are available for all the threads. + +Sat Mar 31 14:56:37 2001 Carlos O'Ryan <coryan@uci.edu> + + * tao/Transport.cpp: + Fixed memory management and synchronization problems. + Invoke the connection_closed() method on all the pending + messages if the connection is closed. + + * tao/Queued_Message.h: + * tao/Queued_Message.inl: + * tao/Queued_Message.cpp: + If the connection is closed there is no sense in trying to + continue sending the message. + The done() method returns 1 if the connection was closed or if + the message was completely sent. + + * tao/Reactive_Flushing_Strategy.cpp: + Propagate any errors from Transport::schedule_output() and + Transport::cancel_output() + + * tests/Big_Request_Muxing/Big_Request_Muxing.dsw: + * tests/Big_Request_Muxing/client.dsp: + * tests/Big_Request_Muxing/server.dsp: + Add MSVC project files + Fri Mar 30 17:06:33 2001 Carlos O'Ryan <coryan@uci.edu> * tests/README: diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp new file mode 100644 index 00000000000..1123eb99bd0 --- /dev/null +++ b/TAO/tao/Asynch_Queued_Message.cpp @@ -0,0 +1,78 @@ +// -*- C++ -*- +// $Id$ + +#include "Asynch_Queued_Message.h" + +ACE_RCSID(tao, Asynch_Queued_Message, "$Id$") + +TAO_Asynch_Queued_Message:: + TAO_Asynch_Queued_Message (const ACE_Message_Block *contents) + : offset_ (0) +{ + this->size_ = contents->total_length (); + // @@ Use a pool for these guys!! + ACE_NEW (this->buffer_, char[this->size_]); + + size_t copy_offset = 0; + for (const ACE_Message_Block *i = contents; + i != 0; + i = i->cont ()) + { + ACE_OS::memcpy (this->buffer_ + copy_offset, + i->rd_ptr (), + i->length ()); + copy_offset += i->length (); + } +} + +TAO_Asynch_Queued_Message::~TAO_Asynch_Queued_Message (void) +{ + // @@ Use a pool for these guys! + delete[] this->buffer_; +} + +size_t +TAO_Asynch_Queued_Message::message_length (void) const +{ + return this->size_ - this->offset_; +} + +int +TAO_Asynch_Queued_Message::all_data_sent (void) const +{ + return this->size_ == this->offset_; +} + +void +TAO_Asynch_Queued_Message::fill_iov (int iovcnt_max, + int &iovcnt, + iovec iov[]) const +{ + ACE_ASSERT (iovcnt_max > iovcnt); + + iov[iovcnt].iov_base = this->buffer_ + this->offset_; + iov[iovcnt].iov_len = this->size_ - this->offset_; + iovcnt++; +} + +int +TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count) +{ + size_t remaining_bytes = this->size_ - this->offset_; + if (byte_count > remaining_bytes) + { + this->offset_ = this->size_; + byte_count -= remaining_bytes; + return 1; + } + this->offset_ += byte_count; + byte_count = 0; + return 0; +} + +void +TAO_Asynch_Queued_Message::destroy (void) +{ + // @@ Maybe it comes from a pool, we should do something about it. + delete this; +} diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h new file mode 100644 index 00000000000..d3c683c1238 --- /dev/null +++ b/TAO/tao/Asynch_Queued_Message.h @@ -0,0 +1,73 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Asynch_Queued_Message.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_ASYNCH_QUEUED_MESSAGE_H +#define TAO_ASYNCH_QUEUED_MESSAGE_H +#include "ace/pre.h" + +#include "Queued_Message.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_Asynch_Queued_Message + * + * @brief Specialize TAO_Queued_Message for asynch requests, + * i.e. oneways sent with SYNC_NONE policy. + * + */ +class TAO_Export TAO_Asynch_Queued_Message : public TAO_Queued_Message +{ +public: + /// Constructor + /** + * @param contents The message block chain that must be sent. + * + * @todo I'm almost sure this class will require a callback + * interface for AMIs sent with SYNC_NONE policy. Those guys + * need to hear when the connection timeouts or closes, but + * cannot block waiting for the message to be delivered. + */ + TAO_Asynch_Queued_Message (const ACE_Message_Block *contents); + + /// Destructor + virtual ~TAO_Asynch_Queued_Message (void); + + /** Implement the Template Methods from TAO_Queued_Message + */ + //@{ + virtual size_t message_length (void) const; + virtual int all_data_sent (void) const; + virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const; + virtual int bytes_transferred (size_t &byte_count); + virtual void destroy (void); + //@} + +private: + /// The number of bytes in the buffer + size_t size_; + + /// The offset in the buffer + /** + * Data up to @c offset has been sent already, only the + * [offset_,size_) range remains to be sent. + */ + size_t offset_; + + /// The buffer containing the complete message. + char *buffer_; +}; + +#include "ace/post.h" +#endif /* TAO_ASYNCH_QUEUED_MESSAGE_H */ diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile index 4125d329f3e..a015f6b03aa 100644 --- a/TAO/tao/Makefile +++ b/TAO/tao/Makefile @@ -242,6 +242,8 @@ ORB_CORE_FILES = \ Block_Flushing_Strategy \ Reactive_Flushing_Strategy \ Queued_Message \ + Synch_Queued_Message \ + Asynch_Queued_Message \ Message_Sent_Callback DYNAMIC_ANY_FILES = diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp index 94ad78e1662..2d0e91b0388 100644 --- a/TAO/tao/Queued_Message.cpp +++ b/TAO/tao/Queued_Message.cpp @@ -10,13 +10,10 @@ ACE_RCSID(tao, Queued_Message, "$Id$") -TAO_Queued_Message::TAO_Queued_Message (ACE_Message_Block *contents, - int own_contents, - TAO_Message_Sent_Callback *callback) - : contents_ (contents) - , own_contents_ (own_contents) +TAO_Queued_Message::TAO_Queued_Message (TAO_Message_Sent_Callback *callback) + : data_sent_successfully_ (0) + , connection_closed_ (0) , callback_ (callback) - , current_block_ (contents) , next_ (0) , prev_ (0) { @@ -24,21 +21,13 @@ TAO_Queued_Message::TAO_Queued_Message (ACE_Message_Block *contents, TAO_Queued_Message::~TAO_Queued_Message (void) { - if (this->own_contents_) - { - ACE_Message_Block *i = this->contents_; - while (i != 0) - { - ACE_Message_Block *cont = i->cont (); i->cont (0); - ACE_Message_Block::release (i); - i = cont; - } - } } void TAO_Queued_Message::connection_closed (void) { + this->connection_closed_ = 1; + if (this->callback_ != 0) { if (this->done ()) @@ -53,28 +42,6 @@ TAO_Queued_Message::connection_closed (void) } void -TAO_Queued_Message::destroy (void) -{ - delete this; -} - -void -TAO_Queued_Message::bytes_transferred (size_t byte_count) -{ - while (!this->done () && byte_count > 0) - { - size_t l = this->current_block_->length (); - if (byte_count < l) - { - this->current_block_->rd_ptr (byte_count); - return; - } - byte_count -= l; - this->current_block_ = this->current_block_->cont (); - } -} - -void TAO_Queued_Message::remove_from_list (TAO_Queued_Message *&head, TAO_Queued_Message *&tail) { diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h index 25d4a018b9e..c9e6296c7d5 100644 --- a/TAO/tao/Queued_Message.h +++ b/TAO/tao/Queued_Message.h @@ -26,8 +26,8 @@ class TAO_Message_Sent_Callback; /** * @class TAO_Queued_Message * - * @brief Implement an queued message for the outgoing path in the - * TAO_Transport class + * @brief Represent messages queued in the outgoing data path of the + * TAO_Transport class. * * Please read the documentation in the TAO_Transport class to find * out more about the design of the outgoing data path. @@ -66,29 +66,15 @@ class TAO_Export TAO_Queued_Message public: /// Constructor /** - * @param contents The message block chain that must be sent. - * - * @param own_contents If this flag is true then this object assumes - * ownership of the contents. - * * @param callback A callback interface to signal any waiting * threads about the status of the message. It is null if there are * no waiting threads. */ - TAO_Queued_Message (ACE_Message_Block *contents, - int own_contents, - TAO_Message_Sent_Callback *callback = 0); + TAO_Queued_Message (TAO_Message_Sent_Callback *callback = 0); /// Destructor virtual ~TAO_Queued_Message (void); - /// Get the message block - ACE_Message_Block *mb (void) const; - - /// The transport has successfully sent more data, adjust internal - /// status - void bytes_transferred (size_t byte_count); - /// Return 0 if the message has not been completely sent int done (void) const; @@ -96,9 +82,6 @@ public: /// signal waiting threads. void connection_closed (void); - /// Reclaim resources - void destroy (void); - /** @name Intrusive list manipulation * * The messages are put in a doubled linked list (for easy insertion @@ -143,18 +126,70 @@ public: TAO_Queued_Message *&tail); //@} -private: - /// The contents of the message. + /** @name Template Methods + */ + //@{ + + /// Return the length of the message /** - * The message is normally generated by a TAO_OutputCDR stream. The - * application marshals the payload, possibly generating a chain of - * message block connected via the 'cont()' field. + * If the message has been partially sent it returns the number of + * bytes that are still not sent. */ - ACE_Message_Block *contents_; + virtual size_t message_length (void) const = 0; - /// If not zero the @c contents_ are owned by this object - int own_contents_; + /// Return 1 if all the data has been sent + virtual int all_data_sent (void) const = 0; + /// Fill up an io vector using the connects of the message + /** + * Different versions of this class represent the message using + * either a single buffer, or a message block. + * This method allows a derived class to fill up the contents of an + * io vector, the TAO_Transport class uses this method to group as + * many messages as possible in an iovector before sending them to + * the OS I/O subsystem. + * + * @param iovcnt_max The number of elements in iov + * @param iovcnt The number of elements already used by iov, this + * method should update this counter + * @param iov The io vector + */ + virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const = 0; + + /// Update the internal state, data has been sent. + /** + * After the TAO_Transport class completes a successful (or + * partially successful) I/O operation it must update the state of + * all the messages queued. This callback method is used by each + * message to update its state and determine if all the data has + * been sent already. + * + * @param byte_count The number of bytes succesfully sent. The + * TAO_Queued_Message should decrement this value + * by the number of bytes that must still be sent. + * @return Returns 1 if the TAO_Queued_Message has any more data to + * send. + */ + virtual int bytes_transferred (size_t &byte_count) = 0; + + /// Reclaim resources + /** + * Reliable messages are allocated from the stack, thus they do not + * be deallocated. + * Asynchronous (SYNC_NONE) messages are allocated from the heap (or + * a pool), they need to be reclaimed explicitly. + */ + virtual void destroy (void) = 0; + //@} + +protected: + /// Record if the send was completely successful + int data_sent_successfully_; + + /// Set to 1 if the connection was closed + int connection_closed_; + +private: /// If not null, this is the object that we signal to indicate that /// the message was sent. /** @@ -163,13 +198,6 @@ private: */ TAO_Message_Sent_Callback *callback_; - /// The current message block - /** - * The message may be set in multiple writev() operations. This - * point keeps track of the next message to send out. - */ - ACE_Message_Block *current_block_; - /// Implement an intrusive double-linked list for the message queue TAO_Queued_Message *next_; TAO_Queued_Message *prev_; diff --git a/TAO/tao/Queued_Message.inl b/TAO/tao/Queued_Message.inl index 4a32424ca18..501ae013eac 100644 --- a/TAO/tao/Queued_Message.inl +++ b/TAO/tao/Queued_Message.inl @@ -1,15 +1,13 @@ // $Id$ -ACE_INLINE ACE_Message_Block * -TAO_Queued_Message::mb (void) const -{ - return this->current_block_; -} - ACE_INLINE int TAO_Queued_Message::done (void) const { - return this->current_block_ == 0; + // @@ Actually we should have a status() method that returns not + // only if there is more data, but also indicates if there was a + // failure. + return (this->data_sent_successfully_ == 1 + || this->connection_closed_ == 1); } ACE_INLINE TAO_Queued_Message * diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp new file mode 100644 index 00000000000..e6aae104b00 --- /dev/null +++ b/TAO/tao/Synch_Queued_Message.cpp @@ -0,0 +1,79 @@ +// -*- C++ -*- +// $Id$ + +#include "Synch_Queued_Message.h" + +ACE_RCSID(tao, Synch_Queued_Message, "$Id$") + +TAO_Synch_Queued_Message:: + TAO_Synch_Queued_Message (const ACE_Message_Block *contents) + : contents_ (ACE_const_cast (ACE_Message_Block*,contents)) + , current_block_ (contents_) +{ +} + +TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message (void) +{ +} + +size_t +TAO_Synch_Queued_Message::message_length (void) const +{ + if (this->current_block_ == 0) + return 0; + return this->current_block_->total_length (); +} + +int +TAO_Synch_Queued_Message::all_data_sent (void) const +{ + return this->current_block_ == 0; +} + +void +TAO_Synch_Queued_Message::fill_iov (int iovcnt_max, + int &iovcnt, + iovec iov[]) const +{ + ACE_ASSERT (iovcnt_max > iovcnt); + + for (const ACE_Message_Block *message_block = this->current_block_; + message_block != 0 && iovcnt < iovcnt_max; + message_block = message_block->cont ()) + { + size_t message_block_length = message_block->length (); + + // Check if this block has any data to be sent. + if (message_block_length > 0) + { + // Collect the data in the iovec. + iov[iovcnt].iov_base = message_block->rd_ptr (); + iov[iovcnt].iov_len = message_block_length; + + // Increment iovec counter. + iovcnt++; + } + } +} + +int +TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count) +{ + while (this->current_block_ != 0 && byte_count > 0) + { + size_t l = this->current_block_->length (); + if (byte_count < l) + { + this->current_block_->rd_ptr (byte_count); + return 0; + } + byte_count -= l; + this->current_block_ = this->current_block_->cont (); + } + return (this->current_block_ == 0); +} + +void +TAO_Synch_Queued_Message::destroy (void) +{ +} diff --git a/TAO/tao/Synch_Queued_Message.h b/TAO/tao/Synch_Queued_Message.h new file mode 100644 index 00000000000..75f2c258e54 --- /dev/null +++ b/TAO/tao/Synch_Queued_Message.h @@ -0,0 +1,84 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Synch_Queued_Message.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_SYNCH_QUEUED_MESSAGE_H +#define TAO_SYNCH_QUEUED_MESSAGE_H +#include "ace/pre.h" + +#include "Queued_Message.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_Synch_Queued_Message + * + * @brief Specialize TAO_Queued_Message for synchronous requests, + * i.e. twoways and oneways sent with reliability better than + * SYNC_NONE. + * + * Reliable requests block the sending thread until the message is + * sent, likewise, the sending thread must be informed if the + * connection is closed or the message times out. + * + * In contrast oneway (and AMI) requests sent with the SYNC_NONE + * policy are simple discarded if the connection fails or they + * timeout. + * + * Another important difference is the management of the data buffer: + * one SYNC_NONE messages the buffer is immediately copied into a + * newly allocated buffer, and must be deallocated. Other types of + * requests use the memory allocated by the sending thread. + * + */ +class TAO_Export TAO_Synch_Queued_Message : public TAO_Queued_Message +{ +public: + /// Constructor + /** + * @param contents The message block chain that must be sent. + */ + TAO_Synch_Queued_Message (const ACE_Message_Block *contents); + + /// Destructor + virtual ~TAO_Synch_Queued_Message (void); + + /** Implement the Template Methods from TAO_Queued_Message + */ + //@{ + virtual size_t message_length (void) const; + virtual int all_data_sent (void) const; + virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const; + virtual int bytes_transferred (size_t &byte_count); + virtual void destroy (void); + //@} + +private: + /// The contents of the message. + /** + * The message is normally generated by a TAO_OutputCDR stream. The + * application marshals the payload, possibly generating a chain of + * message block connected via the 'cont()' field. + */ + ACE_Message_Block *contents_; + + /// The current message block + /** + * The message may be set in multiple writev() operations. This + * point keeps track of the next message to send out. + */ + ACE_Message_Block *current_block_; +}; + +#include "ace/post.h" +#endif /* TAO_QUEUED_MESSAGE_H */ diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp index ff600e2e15e..699da0322b3 100644 --- a/TAO/tao/TAO.dsp +++ b/TAO/tao/TAO.dsp @@ -187,6 +187,10 @@ SOURCE=.\Asynch_Invocation.cpp # End Source File
# Begin Source File
+SOURCE=.\Asynch_Queued_Message.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Asynch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
@@ -267,14 +271,6 @@ SOURCE=.\CodecFactory_ORBInitializer.cpp # End Source File
# Begin Source File
-SOURCE=.\Transport_Cache_Manager.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\Transport_Descriptor_Interface.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\Connection_Handler.cpp
# End Source File
# Begin Source File
@@ -803,6 +799,10 @@ SOURCE=.\Sync_Strategies.cpp # End Source File
# Begin Source File
+SOURCE=.\Synch_Queued_Message.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
@@ -847,6 +847,14 @@ SOURCE=.\Transport.cpp # End Source File
# Begin Source File
+SOURCE=.\Transport_Cache_Manager.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\Transport_Descriptor_Interface.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Transport_Mux_Strategy.cpp
# End Source File
# Begin Source File
@@ -923,6 +931,10 @@ SOURCE=.\Asynch_Invocation.h # End Source File
# Begin Source File
+SOURCE=.\Asynch_Queued_Message.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Asynch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -1007,14 +1019,6 @@ SOURCE=.\CodecFactory_ORBInitializer.h # End Source File
# Begin Source File
-SOURCE=.\Transport_Cache_Manager.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\Transport_Descriptor_Interface.h
-# End Source File
-# Begin Source File
-
SOURCE=.\Connection_Handler.h
# End Source File
# Begin Source File
@@ -1611,6 +1615,10 @@ SOURCE=.\Sync_Strategies.h # End Source File
# Begin Source File
+SOURCE=.\Synch_Queued_Message.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -1683,6 +1691,14 @@ SOURCE=.\Transport.inl # End Source File
# Begin Source File
+SOURCE=.\Transport_Cache_Manager.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Transport_Descriptor_Interface.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Transport_Mux_Strategy.h
# End Source File
# Begin Source File
@@ -1795,14 +1811,6 @@ SOURCE=.\Client_Priority_Policy.i # End Source File
# Begin Source File
-SOURCE=.\Transport_Cache_Manager.inl
-# End Source File
-# Begin Source File
-
-SOURCE=.\Transport_Descriptor_Interface.inl
-# End Source File
-# Begin Source File
-
SOURCE=.\Connection_Handler.i
# End Source File
# Begin Source File
@@ -2291,6 +2299,14 @@ SOURCE=.\TimeBaseS_T.i # End Source File
# Begin Source File
+SOURCE=.\Transport_Cache_Manager.inl
+# End Source File
+# Begin Source File
+
+SOURCE=.\Transport_Descriptor_Interface.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\typecode.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/TAO_Static.dsp b/TAO/tao/TAO_Static.dsp index 800b8052230..e2b4c9f35de 100644 --- a/TAO/tao/TAO_Static.dsp +++ b/TAO/tao/TAO_Static.dsp @@ -119,6 +119,10 @@ SOURCE=.\Asynch_Invocation.h # End Source File
# Begin Source File
+SOURCE=.\Asynch_Queued_Message.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Asynch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -779,6 +783,10 @@ SOURCE=.\Sync_Strategies.h # End Source File
# Begin Source File
+SOURCE=.\Synch_Queued_Message.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -1543,6 +1551,10 @@ SOURCE=.\Asynch_Invocation.cpp # End Source File
# Begin Source File
+SOURCE=.\Asynch_Queued_Message.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Asynch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
@@ -2147,6 +2159,10 @@ SOURCE=.\Sync_Strategies.cpp # End Source File
# Begin Source File
+SOURCE=.\Synch_Queued_Message.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 730cc32fad5..a0df9546e96 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -11,7 +11,8 @@ #include "Stub.h" #include "Sync_Strategies.h" #include "Connection_Handler.h" -#include "Queued_Message.h" +#include "Synch_Queued_Message.h" +#include "Asynch_Queued_Message.h" #include "Flushing_Strategy.h" #include "debug.h" @@ -370,38 +371,82 @@ TAO_Transport::send_current_message (void) if (this->head_ == 0) return 1; - size_t bytes_transferred; + // This is the vector used to send data, it must be declared outside + // the loop because after the loop there may still be data to be + // sent + int iovcnt = 0; + iovec iov[IOV_MAX]; - ssize_t retval = - this->send_message_block_chain (this->head_->mb (), - bytes_transferred); - if (retval == 0) + // We loop over all the elements in the queue ... + TAO_Queued_Message *i = this->head_; + while (i != 0) { - // The connection was closed, return -1 to have the Reactor - // close this transport and event handler - return -1; - } + // ... each element fills the iovector ... + i->fill_iov (IOV_MAX, iovcnt, iov); - // Because there can be a partial transfer we need to adjust the - // number of bytes sent. - this->head_->bytes_transferred (bytes_transferred); - if (this->head_->done ()) - { - // Remove the current message.... - TAO_Queued_Message *head = this->head_; - head->remove_from_list (this->head_, this->tail_); + // ... if the vector is not full we tack another message into + // the vector ... + if (iovcnt != IOV_MAX) + { + // Go for the next element in the list + i = i->next (); + continue; + } + + // ... time to send data because the vector is full. We need to + // loop because a single message can span multiple IOV_MAX + // elements ... + while (iovcnt == IOV_MAX) + { + size_t byte_count; + + // ... send the message ... + ssize_t retval = + this->send (iov, iovcnt, byte_count); + + // ... now we need to update the queue, removing elements + // that have been sent, and updating the last element if it + // was only partially sent ... + this->bytes_transferred_i (byte_count, i); + iovcnt = 0; + + if (retval == 0) + { + return -1; + } + else if (retval == -1) + { + if (errno == EWOULDBLOCK || errno == ETIME) + return 0; + return -1; + } - head->destroy (); + if (i == 0) + break; + + /// Message <i> may have been only partially sent... + i->fill_iov (IOV_MAX, iovcnt, iov); + } + + if (i != 0) + i = i->next (); } - if (retval == -1) + size_t byte_count; + ssize_t retval = + this->send (iov, iovcnt, byte_count); + + this->bytes_transferred_i (byte_count, i); + iovcnt = 0; + + if (retval == 0) + { + return -1; + } + else if (retval == -1) { - // ... timeouts and flow control are not real errors, the - // connection is still valid and we must continue sending the - // current message ... if (errno == EWOULDBLOCK || errno == ETIME) return 0; - return -1; } @@ -434,7 +479,6 @@ TAO_Transport::send_message_i (TAO_Stub *stub, must_queue = 1; } - TAO_Queued_Message *queued_message = 0; if (must_queue) { // ... simply queue the message ... @@ -447,165 +491,145 @@ TAO_Transport::send_message_i (TAO_Stub *stub, this->id ())); } - queued_message = - this->copy_message_block (message_block); - + TAO_Queued_Message *queued_message = 0; + ACE_NEW_RETURN (queued_message, + TAO_Asynch_Queued_Message (message_block), + -1); queued_message->push_back (this->head_, this->tail_); - } - else - { - // ... in this case we must try to send the message first ... - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - "trying to send the message\n", - this->id ())); - } - - size_t byte_count; - - // @@ I don't think we want to hold the mutex here, however if - // we release it we need to recheck the status of the transport - // after we return... once I understand the final form for this - // code I will re-visit this decision - ssize_t n = this->send_message_block_chain (message_block, - byte_count, - max_wait_time); - if (n == 0) - return -1; // EOF - else if (n == -1) - { - // ... if this is just an EWOULDBLOCK we must schedule the - // message for later ... - if (errno != EWOULDBLOCK) - { - return -1; - } - } - - // ... let's figure out if the complete message was sent ... - if (message_block->total_length () == byte_count) - { - // Done, just return. Notice that there are no allocations - // or copies up to this point (though some fancy calling - // back and forth). - // This is the common case for the critical path, it should - // be fast. - return 0; - } - // ... the message was only partially sent, schedule reactive - // output... - flushing_strategy->schedule_output (this); - - // ... and set it as the current message ... - if (twoway_flag) - { - // ... we are going to block, so there is no need to clone - // the message block... - // @@ It seems wasteful to allocate a TAO_Queued_Message in - // this case, but it is simpler to do it this way. - ACE_NEW_RETURN (queued_message, - TAO_Queued_Message ( - ACE_const_cast(ACE_Message_Block*,message_block), - 0), - -1); - } - else + if (this->must_flush_queue_i (stub)) { - queued_message = - this->copy_message_block (message_block); - if (queued_message == 0) - return -1; + ace_mon.release (); + int result = flushing_strategy->flush_message (this, + this->tail_); + return result; } + return 0; + } - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - " queued anyway, %d bytes sent\n", - this->id (), - byte_count)); + // ... in this case we must try to send the message first ... - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - " queued message contains %d bytes, %d transferred\n", - this->id (), - queued_message->mb ()->total_length (), - byte_count)); - } - - queued_message->bytes_transferred (byte_count); + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + "trying to send the message\n", + this->id ())); + } - if (TAO_debug_level > 6) + size_t byte_count; + + // @@ I don't think we want to hold the mutex here, however if + // we release it we need to recheck the status of the transport + // after we return... once I understand the final form for this + // code I will re-visit this decision + ssize_t n = this->send_message_block_chain (message_block, + byte_count, + max_wait_time); + if (n == 0) + return -1; // EOF + else if (n == -1) + { + // ... if this is just an EWOULDBLOCK we must schedule the + // message for later ... + if (errno != EWOULDBLOCK) { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - " queued message still has %d bytes to go\n", - this->id (), - queued_message->mb ()->total_length ())); + return -1; } + } - // ... insert at the head of the queue, we can use push_back() - // because the queue is empty ... - queued_message->push_back (this->head_, this->tail_); + // ... let's figure out if the complete message was sent ... + if (message_block->total_length () == byte_count) + { + // Done, just return. Notice that there are no allocations + // or copies up to this point (though some fancy calling + // back and forth). + // This is the common case for the critical path, it should + // be fast. + return 0; } - // ... two choices, this is a twoway request or not, if it is - // then we must only return once the complete message has been - // sent: + // ... the message was only partially sent, schedule reactive + // output... + flushing_strategy->schedule_output (this); + // ... and set it as the current message ... if (twoway_flag) { + // ... we are going to block, so there is no need to clone + // the message block... + // @@ It seems wasteful to allocate a TAO_Queued_Message in + // this case, but it is simpler to do it this way. + TAO_Synch_Queued_Message synch_message (message_block); + + synch_message.bytes_transferred (byte_count); + synch_message.push_back (this->head_, this->tail_); + // Release the mutex, other threads may modify the queue as we // block for a long time writing out data. - ace_mon.release (); - int result = flushing_strategy->flush_message (this, queued_message); - queued_message->destroy (); + int result; + { + ace_mon.release (); + result = flushing_strategy->flush_message (this, + &synch_message); + + ace_mon.acquire (); + } + synch_message.remove_from_list (this->head_, this->tail_); + synch_message.destroy (); return result; } - // ... this is not a twoway. We must check if the buffering - // constraints have been reached, if so, then we should start - // flushing out data.... + TAO_Queued_Message *queued_message = 0; + ACE_NEW_RETURN (queued_message, + TAO_Asynch_Queued_Message (message_block), + -1); - // First let's compute the size of the queue: - size_t msg_count = 0; - size_t total_bytes = 0; - for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) + if (TAO_debug_level > 6) { - msg_count++; - total_bytes += i->mb ()->total_length (); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + " queued anyway, %d bytes sent\n", + this->id (), + byte_count)); + +#if 0 + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + " queued message contains %d bytes, %d transferred\n", + this->id (), + queued_message->mb ()->total_length (), + byte_count)); +#endif /* 0 */ } + +#if 0 + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + " queued message still has %d bytes to go\n", + this->id (), + queued_message->mb ()->total_length ())); + } +#endif /* 0 */ - int set_timer; - ACE_Time_Value interval; + // ... insert at the head of the queue, we can use push_back() + // because the queue is empty ... - if (stub->sync_strategy ().buffering_constraints_reached (stub, - msg_count, - total_bytes, - set_timer, - interval)) + queued_message->push_back (this->head_, this->tail_); + + // ... this is not a twoway. We must check if the buffering + // constraints have been reached, if so, then we should start + // flushing out data.... + + if (this->must_flush_queue_i (stub)) { ace_mon.release (); - // @@ memory management of the queued messages is getting tricky. int result = flushing_strategy->flush_message (this, this->tail_); return result; } - else - { - // ... it is not time to flush yet, but maybe we need to set a - // timer ... - if (set_timer) - { - // @@ We need to schedule the timer. We should also be - // careful not to schedule one if there is one scheduled - // already. - } - } return 0; } @@ -708,9 +732,6 @@ TAO_Transport::mark_invalid (void) // @@ Do we need this method at all?? this->orb_core_->transport_cache ().mark_invalid ( this->cache_map_entry_); - - - } int @@ -735,8 +756,14 @@ TAO_Transport::close_connection (void) // Purge the entry this->orb_core_->transport_cache ().purge_entry (this->cache_map_entry_); + + for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) + { + i->connection_closed (); + } } +#if 0 TAO_Queued_Message * TAO_Transport::copy_message_block (const ACE_Message_Block *message_block) { @@ -756,6 +783,7 @@ TAO_Transport::copy_message_block (const ACE_Message_Block *message_block) return msg; } +#endif /* 0 */ ssize_t TAO_Transport::send (iovec *iov, int iovcnt, @@ -900,3 +928,72 @@ TAO_Transport::cancel_output (void) return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); } + +int +TAO_Transport::must_flush_queue_i (TAO_Stub *stub) +{ + // First let's compute the size of the queue: + size_t msg_count = 0; + size_t total_bytes = 0; + for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) + { + msg_count++; + total_bytes += i->message_length (); + } + + int set_timer; + ACE_Time_Value interval; + + if (stub->sync_strategy ().buffering_constraints_reached (stub, + msg_count, + total_bytes, + set_timer, + interval)) + { + return 1; + } + + // ... it is not time to flush yet, but maybe we need to set a + // timer ... + if (set_timer) + { + // @@ We need to schedule the timer. We should also be + // careful not to schedule one if there is one scheduled + // already. + } + + return 0; +} + +void +TAO_Transport::bytes_transferred_i (size_t byte_count, + TAO_Queued_Message *&iterator) +{ + TAO_Queued_Message *i = this->head_; + while (i != iterator && byte_count > 0) + { + // Update the state of each queued message + i->bytes_transferred (byte_count); + // ... if all the data was sent the message must be removed from + // the queue... + TAO_Queued_Message *tmp = i->next (); + if (i->all_data_sent ()) + { + i->remove_from_list (this->head_, this->tail_); + i->destroy (); + } + i = tmp; + } + // ... all the data has been taken care of ... + if (byte_count == 0 || iterator == 0) + return; + + iterator->bytes_transferred (byte_count); + TAO_Queued_Message *tmp = iterator->next (); + if (iterator->all_data_sent ()) + { + iterator->remove_from_list (this->head_, this->tail_); + iterator->destroy (); + iterator = tmp; + } +} diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index f51102285cc..5fee5d7cf6d 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -603,17 +603,17 @@ public: /// for a timer. /** * At this point, only - * <code>TAO_Eager_Buffering_Sync_Strategy::timer_check()</code> + * <code>TAO_Eager_Buffering_Sync_Strategy::timer_check()</code> * uses this, and it's unclear whether it needs to stay around. * But, it's here because it uses the associated protocol-specific * connection handler, and accesses to that must be serialized on - * the internal lock. + * the internal lock. * * @param arg argument passed to the handle_timeout() method of the - * event handler + * event handler * @param delay time interval after which the timer will expire * @param interval time interval after which the timer will be - * automatically rescheduled + * automatically rescheduled * @return -1 on failure, a Reactor timer_id value on success * * @see ACE_Reactor::schedule_timer() @@ -693,7 +693,14 @@ private: int send_current_message (void); /// Copy the contents of a message block into a Queued_Message - TAO_Queued_Message *copy_message_block (const ACE_Message_Block *mb); + /// TAO_Queued_Message *copy_message_block (const ACE_Message_Block *mb); + + /// Check if the buffering constraints have been reached + int must_flush_queue_i (TAO_Stub *stub); + + /// Update the queue, exactly <byte_count> bytes have been sent. + void bytes_transferred_i (size_t byte_count, + TAO_Queued_Message *&iterator); /// Prohibited ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&)) |