diff options
author | Chris Cleeland <chris.cleeland@gmail.com> | 2003-02-11 16:21:43 +0000 |
---|---|---|
committer | Chris Cleeland <chris.cleeland@gmail.com> | 2003-02-11 16:21:43 +0000 |
commit | 7b8e4114b6e9121492ce7335458a31e1d123b94c (patch) | |
tree | fffde55e8cf648ac3953959d8c88868014b8960c | |
parent | 1a8adc0831aa22f0be77824b98da6787c752d28e (diff) | |
download | ATCD-7b8e4114b6e9121492ce7335458a31e1d123b94c.tar.gz |
Brought over changes from 1.2 into this 1.3 branch. Still need to add a
ChangeLog entry. Everything that I expected to work works, and everything
I expected to be broken is broken, for whatever that's worth.
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 51 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.h | 25 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Generator_Parser_Impl.inl | 13 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Lite.cpp | 23 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Lite.h | 23 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.cpp | 114 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.h | 37 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.inl | 18 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.cpp | 352 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.h | 140 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.inl | 15 | ||||
-rw-r--r-- | TAO/tao/Pluggable_Messaging.h | 21 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 411 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 40 | ||||
-rwxr-xr-x | TAO/tests/AMI/run_test.pl | 2 | ||||
-rw-r--r-- | TAO/tests/InterOp-Naming/INS_test_client.cpp | 60 | ||||
-rw-r--r-- | TAO/tests/InterOp-Naming/Makefile | 2 |
17 files changed, 1224 insertions, 123 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 9ac3f504ed7..1e3d72512d8 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -309,6 +309,7 @@ TAO_GIOP_Message_Base::message_type ( return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; } +#if 0 int TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) { @@ -566,6 +567,7 @@ TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) // Reset the message_state this->message_state_.reset (); } +#endif int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, @@ -1532,6 +1534,7 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) } +#if 0 TAO_Queued_Data * TAO_GIOP_Message_Base::make_queued_data (size_t sz) { @@ -1567,9 +1570,57 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz) return qd; } +#endif + + size_t TAO_GIOP_Message_Base::header_length (void) const { return TAO_GIOP_MESSAGE_HEADER_LEN; } + +void +TAO_GIOP_Message_Base::set_queued_data_from_message_header ( + TAO_Queued_Data *qd, + const ACE_Message_Block &mb + ) const +{ + // @@CJC: Try leaving out the declaration for this->message_state_ + // and see what pukes. I don't think we need it any more. + TAO_GIOP_Message_State state; + if (state.take_values_from_message_block (mb) == -1) + { + // what the heck do we do here?! + qd->current_state_ = TAO_Queued_Data::INVALID; + return; + } + + // It'd be nice to have an abstract base for GIOP_Message_State + // so that there could just be a line like: + // qd->take_values_from (state); + // Get the message information + qd->byte_order_ = state.byte_order (); + qd->major_version_ = state.giop_version ().major; + qd->minor_version_ = state.giop_version ().minor; + qd->more_fragments_ = state.more_fragments () ? 1 : 0; + qd->msg_type_= message_type (state); + qd->missing_data_bytes_ = state.payload_size (); +} + +int +TAO_GIOP_Message_Base::check_for_valid_header ( + const ACE_Message_Block &mb + ) const +{ + const char* magic_bytes = "GIOP"; + ACE_ASSERT (ACE_OS::strlen (magic_bytes) < header_length ()); + if (mb.length () < header_length ()) + return 0; + + // Is finding that it's the right length and the magic bytes present + // enough to declare it a valid header? I think so... + return (ACE_OS::memcmp (mb.rd_ptr (), + magic_bytes, + ACE_OS::strlen (magic_bytes)) == 0); +} diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index 1ed66ef6b70..41845f24e22 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -93,6 +93,7 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); +#if 0 /// Parse the incoming messages.. virtual int parse_incoming_messages (ACE_Message_Block &message_block); @@ -117,12 +118,34 @@ public: /// @@Bala:Docu?? virtual int consolidate_fragments (TAO_Queued_Data *dqd, const TAO_Queued_Data *sqd); +#endif /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd); + /*! + \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. + + Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to + see if they look like the beginning of a message. Does + */ + virtual int check_for_valid_header (const ACE_Message_Block &mb) const; + + /*! + \brief Set fields in \param qd based on values derived from \param mb. + + This function sets fields in \param qd based on values derived + from \param mb. It assumes that if the length of \param mb is + enough to hold a header, then the data in there can be trusted to + make sense. + */ + virtual void set_queued_data_from_message_header ( + TAO_Queued_Data *, + const ACE_Message_Block &mb) const; + + /// Parse the reply message that we received and return the reply /// information though <reply_info> @@ -172,7 +195,7 @@ protected: /// TAO_PLUGGABLE_MESSAGE_REPLY, /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. - TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state); + static TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state); private: diff --git a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl index 18bb7936ffd..bbb9f024c9d 100644 --- a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl +++ b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl @@ -5,9 +5,22 @@ TAO_GIOP_Message_Generator_Parser_Impl:: check_revision (CORBA::Octet incoming_major, CORBA::Octet incoming_minor) { +#if 0 if (incoming_major > TAO_DEF_GIOP_MAJOR || incoming_minor > TAO_DEF_GIOP_MINOR) return 0; return 1; +#endif + CORBA::Boolean ret = 0; // Assume that the revision is bogus + if (incoming_major == 1) + switch (incoming_minor) + { + case 0: + case 1: + case 2: + ret = 1; + } + + return ret; } diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp index 6395e1afa27..5d923762331 100644 --- a/TAO/tao/GIOP_Message_Lite.cpp +++ b/TAO/tao/GIOP_Message_Lite.cpp @@ -237,6 +237,7 @@ TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream) } +#if 0 int TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block) { @@ -485,6 +486,7 @@ TAO_GIOP_Message_Lite::consolidate_fragments (TAO_Queued_Data * /*dqd*/, // We dont know what fragments are??? return -1; } +#endif int TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport, @@ -1596,6 +1598,7 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label, } } +#if 0 TAO_Queued_Data * TAO_GIOP_Message_Lite::make_queued_data (size_t sz) { @@ -1627,6 +1630,7 @@ TAO_GIOP_Message_Lite::make_queued_data (size_t sz) return qd; } +#endif int TAO_GIOP_Message_Lite::generate_locate_reply_header ( @@ -1649,3 +1653,22 @@ TAO_GIOP_Message_Lite::header_length (void) const { return TAO_GIOP_LITE_HEADER_LEN; } + +void +TAO_GIOP_Message_Lite::set_queued_data_from_message_header ( + TAO_Queued_Data *qd, + const ACE_Message_Block &mb + ) const +{ + ACE_UNUSED_ARG (qd); + ACE_UNUSED_ARG (mb); +} + +int +TAO_GIOP_Message_Lite::check_for_valid_header ( + const ACE_Message_Block &mb + ) const +{ + ACE_UNUSED_ARG (mb); + return 0; +} diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h index afe78e63136..7affd27e6b8 100644 --- a/TAO/tao/GIOP_Message_Lite.h +++ b/TAO/tao/GIOP_Message_Lite.h @@ -85,6 +85,7 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); +#if 0 /// Parse the incoming messages.. virtual int parse_incoming_messages (ACE_Message_Block &message_block); @@ -118,12 +119,34 @@ public: /// @@Bala: Docu??? virtual int consolidate_fragments (TAO_Queued_Data *dqd, const TAO_Queued_Data *sqd); +#endif /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd); + + /*! + \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. + + Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to + see if they look like the beginning of a message. Does + */ + virtual int check_for_valid_header (const ACE_Message_Block &mb) const; + + /*! + \brief Set fields in \param qd based on values derived from \param mb. + + This function sets fields in \param qd based on values derived + from \param mb. It assumes that if the length of \param mb is + enough to hold a header, then the data in there can be trusted to + make sense. + */ + virtual void set_queued_data_from_message_header ( + TAO_Queued_Data *, + const ACE_Message_Block &mb) const; + /// Parse the reply message that we received and return the reply /// information though <reply_info> virtual int process_reply_message ( diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp index 72d04c272f4..b86a456a140 100644 --- a/TAO/tao/GIOP_Message_State.cpp +++ b/TAO/tao/GIOP_Message_State.cpp @@ -11,13 +11,44 @@ # include "tao/GIOP_Message_State.inl" #endif /* __ACE_INLINE__ */ +class TAO_Debug_Msg_Emitter_Guard +{ +public: + TAO_Debug_Msg_Emitter_Guard (unsigned int debug_level, const char* msg) + : which_level_(debug_level) + { + this->msg_ = new char[ACE_OS::strlen (msg) + MAGIC_LENGTH ]; + ACE_OS::strcpy (this->msg_, msg); + ACE_OS::strcat (this->msg_, " begin\n"); + if (TAO_debug_level >= this->which_level_) + ACE_DEBUG ((LM_DEBUG, this->msg_ )); + } + + ~TAO_Debug_Msg_Emitter_Guard () + { + if (TAO_debug_level >= this->which_level_) + { + char* begin_start = + this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1; + ACE_OS::strcpy (begin_start, " end\n"); + ACE_DEBUG ((LM_DEBUG, this->msg_)); + } + delete[] this->msg_; + } + +private: + static const int MAGIC_LENGTH = 8; // " begin\n" + \000 + unsigned int which_level_; + char* msg_; +}; + ACE_RCSID(tao, GIOP_Message_State, "$Id$") TAO_GIOP_Message_State::TAO_GIOP_Message_State ( - TAO_ORB_Core * /*orb_core*/, - TAO_GIOP_Message_Base *base) - : base_ (base), + TAO_ORB_Core * /*orb_core*/, + TAO_GIOP_Message_Base * /*base*/) + : /*base_ (base),*/ giop_version_ (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR), byte_order_ (0), @@ -29,7 +60,50 @@ TAO_GIOP_Message_State::TAO_GIOP_Message_State ( { } +// This doesn't check the message block's length, so that means that +// the *caller* needs to do that first. +int +TAO_GIOP_Message_State::take_values_from_message_block ( + const ACE_Message_Block& mb + ) +{ + TAO_Debug_Msg_Emitter_Guard (8, "(%P|%t) GIOP_Message_State::take_values_from_message_block"); + + const char* buf = mb.rd_ptr (); + + // Get the version information + if (this->set_version_info_from_buffer (buf) == -1) + return -1; + + // Get the byte order information... + if (this->set_byte_order_info_from_buffer (buf) == -1) + return -1; + + // Get the message type + this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; + + // Get the size of the message.. + this->set_payload_size_from_buffer (buf); + + if (this->message_size_ == 0) + { + if (this->message_type_ == TAO_GIOP_MESSAGERROR) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) GIOP_Message_State::take_values: GIOP_MESSAGE_ERROR rcv'd.\n")); + return 0; + } + else + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) GIOP_Message_State::take_values: Message of size zero rcv'd.\n")); + return -1; + } + } + return 0; +} +#if 0 int TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming) { @@ -133,21 +207,16 @@ TAO_GIOP_Message_State::parse_magic_bytes (char *buf) return 0; } +#endif int -TAO_GIOP_Message_State::get_version_info (char *buf) +TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf) { - if (TAO_debug_level > 8) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::get_version_info\n")); - } + TAO_Debug_Msg_Emitter_Guard (8, ACE_TEXT("TAO (%P|%t) GIOP_Message_State::set_version_info")); // We have a GIOP message on hand. Get its revision numbers - CORBA::Octet incoming_major = - buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; - CORBA::Octet incoming_minor = - buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET]; // Check the revision information if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision ( @@ -157,7 +226,9 @@ TAO_GIOP_Message_State::get_version_info (char *buf) if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - bad version <%d.%d>\n"), + ACE_TEXT ("TAO (%P|%t) - ") + ACE_TEXT ("GIOP_Message_State::set_version_info_from_buffer:") + ACE_TEXT ("bad version <%d.%d>\n"), incoming_major, incoming_minor)); } @@ -172,15 +243,11 @@ TAO_GIOP_Message_State::get_version_info (char *buf) } int -TAO_GIOP_Message_State::get_byte_order_info (char *buf) +TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf) { - if (TAO_debug_level > 8) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::get_byte_order_info\n")); - } + TAO_Debug_Msg_Emitter_Guard (8, "TAO (%P|%t) GIOP_Message_State::set_byte_order_info_from_buffer"); - // Let us be specific that this is for 1.0 + // Let us be specific that this is for 1.0 if (this->giop_version_.minor == 0 && this->giop_version_.major == 1) { @@ -225,8 +292,9 @@ TAO_GIOP_Message_State::get_byte_order_info (char *buf) } void -TAO_GIOP_Message_State::get_payload_size (char *rd_ptr) +TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr) { + TAO_Debug_Msg_Emitter_Guard (8, "(%P|%t) GIOP_Message_State::set_payload_size_from_buffer"); // Move the read pointer rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; @@ -263,7 +331,7 @@ TAO_GIOP_Message_State::parse_fragment_header (char *buf, } CORBA::ULong -TAO_GIOP_Message_State::read_ulong (char *rd_ptr) +TAO_GIOP_Message_State::read_ulong (const char *rd_ptr) { CORBA::ULong x = 0; diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h index 594747dc4cc..319e37a3933 100644 --- a/TAO/tao/GIOP_Message_State.h +++ b/TAO/tao/GIOP_Message_State.h @@ -42,11 +42,15 @@ class TAO_Export TAO_GIOP_Message_State public: /// Ctor - TAO_GIOP_Message_State (TAO_ORB_Core *orb_core, - TAO_GIOP_Message_Base *base); + TAO_GIOP_Message_State (TAO_ORB_Core *orb_core = 0, + TAO_GIOP_Message_Base *base = 0); + int take_values_from_message_block (const ACE_Message_Block& mb); + +#if 0 /// Parse the message header. int parse_message_header (ACE_Message_Block &incoming); +#endif /// Return the message size CORBA::ULong message_size (void) const; @@ -54,9 +58,24 @@ public: /// Return the message size CORBA::ULong payload_size (void) const; - /// Return the byte order information + /*! + \brief Return the byte order information. + \return 0 big-endian + \return 1 little-endian + */ CORBA::Octet byte_order (void) const; + /*! + \brief Return GIOP version information. + */ + const TAO_GIOP_Message_Version &giop_version () const; + + /// (Requests and Replys) + CORBA::Octet more_fragments () const; + + /// MsgType above + CORBA::Octet message_type () const; + /// Reset the state.. void reset (void); @@ -64,25 +83,27 @@ private: friend class TAO_GIOP_Message_Base; +#if 0 /// Parse the message header. int parse_message_header_i (ACE_Message_Block &incoming); /// Checks for the magic word 'GIOP' in the start of the incoing /// stream int parse_magic_bytes (char *buf); +#endif /// Extracts the version information from the incoming /// stream. Performs a check for whether the version information is /// right and sets the information in the <state> - int get_version_info (char *buf); + int set_version_info_from_buffer (const char *buf); /// Extracts the byte order information from the incoming /// stream. Performs a check for whether the byte order information /// right and sets the information in the <state> - int get_byte_order_info (char *buf); + int set_byte_order_info_from_buffer (const char *buf); /// Gets the size of the payload and set the size in the <state> - void get_payload_size (char *buf); + void set_payload_size_from_buffer (const char *buf); /// Parses the GIOP FRAGMENT_HEADER information from the incoming /// stream. @@ -91,12 +112,14 @@ private: /// Read the unsigned long from the buffer. The <buf> should just /// point to the next 4 bytes data that represent the ULong - CORBA::ULong read_ulong (char *buf); + CORBA::ULong read_ulong (const char *buf); private: +#if 0 /// The GIOP base class.. TAO_GIOP_Message_Base *base_; +#endif // GIOP version information.. TAO_GIOP_Message_Version giop_version_; diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl index fe076bee689..f2f0011a7f6 100644 --- a/TAO/tao/GIOP_Message_State.inl +++ b/TAO/tao/GIOP_Message_State.inl @@ -33,6 +33,24 @@ TAO_GIOP_Message_State::reset (void) this->missing_data_ = 0; } +ACE_INLINE const TAO_GIOP_Message_Version & +TAO_GIOP_Message_State::giop_version () const +{ + return this->giop_version_; +} + +ACE_INLINE CORBA::Octet +TAO_GIOP_Message_State::more_fragments () const +{ + return this->more_fragments_; +} + +ACE_INLINE CORBA::Octet +TAO_GIOP_Message_State::message_type () const +{ + return this->message_type_; +} + #if 0 ACE_INLINE int TAO_GIOP_Message_State::message_fragmented (void) diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index de087da0565..d42f14da589 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -1,3 +1,13 @@ +// We're doing this because we need to extend ACE_Message_Block, but +// that interface is currently "frozen". Therefore, I'm using static +// functions in this compilation unit that I would normally add as part +// of the interface. +#define private public +#define protected public +#include "ace/Message_Block.h" +#undef private +#undef public + #include "Incoming_Message_Queue.h" #include "ORB_Core.h" #include "debug.h" @@ -41,13 +51,13 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block) { // Check to see if the length of the incoming block is less than // that of the <missing_data_> of the tail. - if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_) + if (block.length () <= this->queued_data_->missing_data_bytes_) { n = block.length (); } else { - n = this->queued_data_->missing_data_; + n = this->queued_data_->missing_data_bytes_; } // Do the copy @@ -55,7 +65,7 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block) n); // Decerement the missing data - this->queued_data_->missing_data_ -= n; + this->queued_data_->missing_data_bytes_ -= n; } return n; @@ -132,48 +142,328 @@ TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd) /************************************************************************/ TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc) - : msg_block_ (0), - missing_data_ (0), - byte_order_ (0), - major_version_ (0), - minor_version_ (0), - more_fragments_ (0), - msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), - next_ (0), - allocator_ (alloc) + : msg_block_ (0) + , current_state_ (INVALID) + , missing_data_bytes_ (0) + , byte_order_ (0) + , major_version_ (0) + , minor_version_ (0) + , more_fragments_ (0) + , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + , next_ (0) + , allocator_ (alloc) { } TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc) - : msg_block_ (mb), - missing_data_ (0), - byte_order_ (0), - major_version_ (0), - minor_version_ (0), - more_fragments_ (0), - msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), - next_ (0), - allocator_ (alloc) + : msg_block_ (mb) + , current_state_ (INVALID) + , missing_data_bytes_ (0) + , byte_order_ (0) + , major_version_ (0) + , minor_version_ (0) + , more_fragments_ (0) + , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + , next_ (0) + , allocator_ (alloc) { } TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd) - : msg_block_ (qd.msg_block_->duplicate ()), - missing_data_ (qd.missing_data_), - byte_order_ (qd.byte_order_), - major_version_ (qd.major_version_), - minor_version_ (qd.minor_version_), - more_fragments_ (qd.more_fragments_), - msg_type_ (qd.msg_type_), - next_ (0), - allocator_ (qd.allocator_) + : msg_block_ (qd.msg_block_->duplicate ()) + , current_state_ (qd.current_state_) + , missing_data_bytes_ (qd.missing_data_bytes_) + , byte_order_ (qd.byte_order_) + , major_version_ (qd.major_version_) + , minor_version_ (qd.minor_version_) + , more_fragments_ (qd.more_fragments_) + , msg_type_ (qd.msg_type_) + , next_ (0) + , allocator_ (qd.allocator_) +{ +} + +/*static*/ +TAO_Queued_Data * +TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc) +{ + TAO_Queued_Data *new_qd = 0; + + // Validate arguments. + if (mb == 0) + goto failure; + + new_qd = make_queued_data (alloc); + if (new_qd == 0) + goto failure; + + // do we have enough bytes to make a complete header? + if (mb->length() >= msging_obj.header_length ()) + { + // Since we have enough bytes to make a complete header, + // the header needs to be valid. Check that now, and punt + // if it's not valid. + if (! msging_obj.check_for_valid_header (*mb)) + { + goto failure; + } + else + { + new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD; + new_qd->msg_block_ = mb; + msging_obj.set_queued_data_from_message_header (new_qd, *mb); + // missing_data_bytes_ now has the full GIOP message size, but + // there might still be stuff in mb. Therefore, we have to adjust + // missing_data_bytes_, i.e., decrease it by the number of "actual + // payload bytes" in mb. + // + // "actual payload bytes" :== length of mb (which included the header) - header length + new_qd->missing_data_bytes_ -= (mb->length () - msging_obj.header_length ()); + //??? mb->rd_ptr (msging_obj.header_length ()); + } + } + else + { + new_qd->current_state_ = WAITING_TO_COMPLETE_HEADER; + new_qd->msg_block_ = mb; + new_qd->missing_data_bytes_ = msging_obj.header_length () - mb->length (); + } + + ACE_ASSERT (new_qd->current_state_ != INVALID); + if (TAO_debug_level > 7) + { + const char* s = "?unk?"; + switch (new_qd->current_state_) { + case WAITING_TO_COMPLETE_HEADER: s = "WAITING_TO_COMPLETE_HEADER"; break; + case WAITING_TO_COMPLETE_PAYLOAD: s = "WAITING_TO_COMPLETE_PAYLOAD"; break; + case INVALID: s = "INVALID"; break; + case COMPLETED: s = "COMPLETED"; break; + } + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ") + ACE_TEXT ("made uncompleted message from %u bytes into qd=%-08x:") + ACE_TEXT ("state=%s,missing_data_bytes=%u\n"), + mb->length(), new_qd, s, new_qd->missing_data_bytes_)); + } + return new_qd; + +failure: + if (TAO_debug_level > 7) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ") + ACE_TEXT ("failed to make uncompleted message: mb=%-08x, qd=%-08x\n"), + mb, new_qd)); + } + TAO_Queued_Data::release (new_qd); + return 0; +} + +/*! + \brief Act like ACE_Message_Block::clone, but only clone the part btw. rd_ptr and wr_ptr. + */ +// Follow #def's swiped from Message_Block.cpp +#if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS) +#define ACE_EXECUTION_TIME this->execution_time_ +#define ACE_DEADLINE_TIME this->deadline_time_ +#else +#define ACE_EXECUTION_TIME ACE_Time_Value::zero +#define ACE_DEADLINE_TIME ACE_Time_Value::max_time +#endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */ + +static ACE_Data_Block* +clone_span_nocopy (/*const*/ ACE_Data_Block *the_db, + const char* beg, size_t span, + ACE_Message_Block::Message_Flags mask = 0) +{ + // Allocate a new data block through the same allocator as 'the_db' + const ACE_Message_Block::Message_Flags always_clear = + ACE_Message_Block::DONT_DELETE; + + ACE_Data_Block *nb; + ACE_Allocator *db_allocator = the_db->data_block_allocator (); + + ACE_NEW_MALLOC_RETURN (nb, + ACE_static_cast(ACE_Data_Block*, + db_allocator->malloc (sizeof (ACE_Data_Block))), + ACE_Data_Block (span, // size + the_db->msg_type (), // type + 0, // data + the_db->allocator_strategy (), // allocator + the_db->locking_strategy (), // locking strategy + the_db->flags (), // flags + db_allocator), + 0); + + + // Set new flags minus the mask... + nb->clr_flags (mask | always_clear); + + // Copy in the data, and set the pointer + // ACE_OS::memcpy (nb->base (), beg, span); + + return nb; +} + +static ACE_Message_Block* +clone_span (/*const*/ ACE_Message_Block *the_mb, size_t span_size, ACE_Message_Block::Message_Flags mask = 0) { + // Get a pointer to a "cloned" <ACE_Data_Block> (will copy the + // values rather than increment the reference count). + size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT); + ACE_Data_Block *db = clone_span_nocopy (the_mb->data_block (), the_mb->rd_ptr (), aligned_size, mask); + if (db == 0) + return 0; + + ACE_Message_Block *nb; + + if(the_mb->message_block_allocator_ == 0) + { + ACE_NEW_RETURN (nb, + ACE_Message_Block (0, // size + ACE_Message_Block::ACE_Message_Type (0), // type + 0, // cont + 0, // data + 0, // allocator + 0, // locking strategy + 0, // flags + the_mb->priority_, // priority + ACE_EXECUTION_TIME, // execution time + ACE_DEADLINE_TIME, // absolute time to deadline + // Get a pointer to a + // "duplicated" <ACE_Data_Block> + // (will simply increment the + // reference count). + db, + db->data_block_allocator (), + the_mb->message_block_allocator_), + 0); + } + else + { + // This is the ACE_NEW_MALLOC macro with the return check removed. + // We need to do it this way because if it fails we need to release + // the cloned data block that was created above. If we used + // ACE_NEW_MALLOC_RETURN, there would be a memory leak because the + // above db pointer would be left dangling. + nb = ACE_static_cast(ACE_Message_Block*,the_mb->message_block_allocator_->malloc (sizeof (ACE_Message_Block))); + if(nb != 0) + new (nb) ACE_Message_Block (0, // size + ACE_Message_Block::ACE_Message_Type (0), // type + 0, // cont + 0, // data + 0, // allocator + 0, // locking strategy + 0, // flags + the_mb->priority_, // priority + ACE_EXECUTION_TIME, // execution time + ACE_DEADLINE_TIME, // absolute time to deadline + db, + db->data_block_allocator (), + the_mb->message_block_allocator_); + } + + if (nb == 0) + { + db->release (); + return 0; + } + + ACE_CDR::mb_align (nb); + nb->copy (the_mb->rd_ptr(), span_size); + + // Clone all the continuation messages if necessary. + if (the_mb->cont () != 0 + && (nb->cont_ = the_mb->cont ()->clone (mask)) == 0) + { + nb->release (); + return 0; + } + return nb; +} + +/*static*/ +TAO_Queued_Data * +TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc) +{ + // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE! + // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN + // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS + // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT + // BEHAVES DIFFERENTLY FROM make_uncompleted_message. + + // Validate arguments. + if (mb.length() < msging_obj.header_length ()) + return 0; + + size_t total_msg_len = 0; + TAO_Queued_Data *new_qd = make_queued_data (alloc); + if (new_qd == 0) + goto failure; + + // We can assume that there are enough bytes for a header, so + // extract the header data. Don't assume that there's enough for + // the payload just yet. + new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD; + msging_obj.set_queued_data_from_message_header (new_qd, mb); + if (new_qd->current_state_ == INVALID) + goto failure; + + // new_qd_->missing_data_bytes_ + protocol header length should be + // *at least* the length of the message. Verify that we have that + // many bytes in the message block and, if we don't, release the new + // qd and fail. + total_msg_len = new_qd->missing_data_bytes_ + msging_obj.header_length (); + if (total_msg_len > mb.length ()) + goto failure; + + // Make a copy of the relevant portion of mb and hang on to it + if ((new_qd->msg_block_ = clone_span (&mb, total_msg_len)) == 0) + goto failure; + + + // Update missing data and the current state + new_qd->missing_data_bytes_ = 0; + new_qd->current_state_ = COMPLETED; + + // Advance the rd_ptr on the message block + mb.rd_ptr (total_msg_len); + + if (TAO_debug_level > 7) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ") + ACE_TEXT ("extracted complete message (%u bytes incl hdr) from mblk=%-08x into qd=%-08x\n"), + total_msg_len, &mb, new_qd)); + } + + return new_qd; + +failure: + if (TAO_debug_level > 7) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ") + ACE_TEXT ("failed to find complete message in mblk=%-08x; leaving %u bytes in block\n"), + &mb, mb.length ())); + if (TAO_debug_level >= 10) + ACE_HEX_DUMP ((LM_DEBUG, + mb.rd_ptr (), mb.length (), + ACE_TEXT (" residual bytes in buffer"))); + + } + TAO_Queued_Data::release (new_qd); + return 0; } /*static*/ TAO_Queued_Data * -TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc) +TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc) { TAO_Queued_Data *qd = 0; diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index 52d3ea50d34..4999c45e7fa 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -14,7 +14,7 @@ #define TAO_INCOMING_MESSAGE_QUEUE_H #include "ace/pre.h" -#include "Pluggable_Messaging_Utils.h" +#include "Pluggable_Messaging.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -74,14 +74,42 @@ public: /// Return the length of the queue.. CORBA::ULong queue_length (void); - /// Methods for sanity check. Checks to see whether the node on the - /// head or tail is complete or not and ready for further - /// processing. + /*! + @name Node Inspection Predicates + + \brief These methods allow inspection of head and tail nodes for "completeness". + + These methods check to see whether the node on the head or tail is + "complete" and ready for further processing. See each method's + documentation for its definition of "complete". + */ + //@{ + /*! + "complete" == the GIOP message at the tail is not missing any data (it may be a complete GIOP Fragment, though) + + \return -1 queue is empty + \return 0 tail is not "complete" + \return 1 tail is "complete" + */ int is_tail_complete (void); + + /*! + + "complete" == the GIOP message at the head is not missing any data + AND, if it's the first message in a series of GIOP fragments, all + the fragments have been received, parsed, and placed into the + queue + + \return -1 if queue is empty + \return 0 if head is not "complete" + \return 1 if head is "complete" + */ int is_head_complete (void); + //@} - /// This method checks whether the last message that was queued up - /// was fragmented... + /*! + \brief Check to see if the message at the tail (complete or incomplete) is a GIOP Fragment. + */ int is_tail_fragmented (void); /// Return the size of data that is missing in tail of the queue. @@ -92,11 +120,12 @@ private: friend class TAO_Transport; +#if 0 /// Make a node for the queue. TAO_Queued_Data *get_node (void); +#endif private: - /// A linked listof messages that await processing TAO_Queued_Data *queued_data_; @@ -122,20 +151,69 @@ private: class TAO_Export TAO_Queued_Data { -public: +protected: /// Default Constructor TAO_Queued_Data (ACE_Allocator *alloc = 0); /// Constructor. TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0); +public: /// Copy constructor. TAO_Queued_Data (const TAO_Queued_Data &qd); - /// Creation and deletion of a node in the queue. - static TAO_Queued_Data* get_queued_data (ACE_Allocator *alloc = 0); + /*! + \name Factory Methods + + These methods manufacture instances of TAO_Queued_Data and return + them. These instances should be removed via TAO_Queued_Data::release. + + Instances are initialized from data in the ACE_Message_Block, + interpreted according to rules defined in the + TAO_Pluggable_Messaging object. + + The manufactured instance adopts the message block \em without + duplicating it; therefore, the caller must duplicate or orphan the + message block. The caller also must insure that the message block + can be released via ACE_Message_Block::release, and that its life + endures beyond the calling scope. + + For the purposes of TAO_Queued_Data, a completed message is a + completely received message as defined by the messaging protocol + object. For GIOP, that means that the number of bytes specified + in the general GIOP header have been completely received. It + specifically DOES NOT mean that all \em fragments have been + received. Fragment reassembly is another matter altogether. + */ + //@{ + /*! + \brief Make and return an instance of TAO_Queued_Data suitable for use as an uncompleted message. + */ + static TAO_Queued_Data* make_uncompleted_message (ACE_Message_Block *mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc = 0); + /*! + \brief Make and return an instance of TAO_Queued_Data suitable for use as a completed message. + */ + // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE! + // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN + // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS + // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT + // BEHAVES DIFFERENTLY FROM make_uncompleted_message. + static TAO_Queued_Data* make_completed_message (ACE_Message_Block &mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc = 0); + /*! + \brief Creation and deletion of a node in the queue. + \todo Maybe this should be private? + */ +private: + static TAO_Queued_Data* make_queued_data (ACE_Allocator *alloc = 0); +public: + //@} static void release (TAO_Queued_Data *qd); + void release (void); /// Duplicate ourselves. This creates a copy of ourselves on the /// heap and returns a pointer to the duplicated node. @@ -145,11 +223,43 @@ public: /// The message block that contains the message. ACE_Message_Block *msg_block_; - /// Data missing in the above message that hasn't been read or - /// processed yet. - CORBA::Long missing_data_; - - /// The byte order of the message that is stored in the node.. + /*! + @name Missing Data details + + The \a missing_data_bytes_ member contains the number of bytes of + data missing from \a msg_block_. However, there can be two places + where data is missing: header and payload. We cannot know how + much data is missing from the payload until we have a complete + header. Fortunately, headers are a fixed length, so we can know + how much we're missing from the header. + + We use \param current_state_ to indicate which portion of the message + \param missing_data_bytes_ refers to, as well as the general state of + the message. + */ + //@{ + /*! + Describes the meaning given to the number stored in \a missing_data_bytes_. + */ + enum Queued_Data_State + { + INVALID = -1, //!< The queued data is in an invalid/uninitialized state, and no data should be trusted. + COMPLETED = 0, //!< Message is complete; \a missing_data_ should be zero. + WAITING_TO_COMPLETE_HEADER, //!< Value in \a missing_data_ indicates part of header is missing. + WAITING_TO_COMPLETE_PAYLOAD //!< Value in \a missing_data_ indicates part of payload is missing. + }; + + /*! + Indicates the current state of the message, including hints at + how to interpret the value stored in \a missing_data_bytes_. + */ + Queued_Data_State current_state_; + + /*! Data missing in the above message that hasn't been read or processed yet. */ + CORBA::ULong missing_data_bytes_; + //@} + + /*! The byte order of the message that is stored in the node. */ CORBA::Octet byte_order_; /// Many protocols like GIOP have a major and minor version diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl index d67bd485383..d04512d81b0 100644 --- a/TAO/tao/Incoming_Message_Queue.inl +++ b/TAO/tao/Incoming_Message_Queue.inl @@ -18,7 +18,7 @@ TAO_Incoming_Message_Queue::is_tail_complete (void) return -1; if (this->size_ && - this->queued_data_->missing_data_ == 0) + this->queued_data_->missing_data_bytes_ == 0) return 1; return 0; @@ -31,7 +31,7 @@ TAO_Incoming_Message_Queue::is_head_complete (void) return -1; if (this->size_ && - this->queued_data_->next_->missing_data_ == 0 && + this->queued_data_->next_->missing_data_bytes_ == 0 && this->queued_data_->next_->more_fragments_ == 0) return 1; @@ -55,23 +55,30 @@ ACE_INLINE size_t TAO_Incoming_Message_Queue::missing_data_tail (void) const { if (this->size_ != 0) - return this->queued_data_->missing_data_; + return this->queued_data_->missing_data_bytes_; return 0; } - +#if 0 ACE_INLINE TAO_Queued_Data * TAO_Incoming_Message_Queue::get_node (void) { return TAO_Queued_Data::get_queued_data (); } +#endif /************************************************************************/ // Methods for TAO_Queued_Data /************************************************************************/ +ACE_INLINE void +TAO_Queued_Data::release (void) +{ + TAO_Queued_Data::release (this); +} + /*static*/ ACE_INLINE void TAO_Queued_Data::replace_data_block (ACE_Message_Block &mb) diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index 875429df5c1..2d460580d57 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -113,6 +113,7 @@ public: virtual void init (CORBA::Octet major, CORBA::Octet minor) = 0; +#if 0 /// Parse the incoming messages.. virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0; @@ -137,12 +138,32 @@ public: /// @@Bala:Docu?? virtual int consolidate_fragments (TAO_Queued_Data *dqd, const TAO_Queued_Data *sqd) = 0; +#endif /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) = 0; + /*! + \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. + + Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to + see if they look like the beginning of a message. Does + */ + virtual int check_for_valid_header (const ACE_Message_Block &mb) const = 0; + + /*! + \brief Set fields in \param qd based on values derived from \param mb. + + This function sets fields in \param qd based on values derived + from \param mb. It assumes that if the length of \param mb is + enough to hold a header, then the data in there can be trusted to + make sense. + */ + virtual void set_queued_data_from_message_header ( + TAO_Queued_Data *, + const ACE_Message_Block &mb) const = 0; /// Parse the reply message that we received and return the reply /// information though <reply_info> diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 3801e66ebcc..1bb21950a6a 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -23,6 +23,7 @@ #include "Notify_Handler.h" #include "ace/Message_Block.h" #include "ace/Reactor.h" +#include "ace/Min_Max.h" #if !defined (__ACE_INLINE__) # include "Transport.inl" @@ -105,6 +106,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , head_ (0) , tail_ (0) , incoming_message_queue_ (orb_core) + , uncompleted_message_ (0) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) @@ -1290,7 +1292,7 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, this->id ())); } - // First try to process messages of the head of the incoming queue. + // First try to process messages off the head of the incoming queue. int retval = this->process_queue_head (rh); if (retval <= 0) @@ -1335,25 +1337,27 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, ACE_Message_Block::DONT_DELETE, this->orb_core_->input_cdr_msgblock_allocator ()); + // We'll loop trying to complete the message this number of times, + // and that's it. + unsigned int number_of_read_attempts = 2; // Align the message block ACE_CDR::mb_align (&message_block); +read_from_the_connection: size_t recv_size = 0; if (this->orb_core_->orb_params ()->single_read_optimization ()) { - recv_size = - message_block.space (); + recv_size = message_block.space (); } else { - recv_size = - this->messaging_object ()->header_length (); + recv_size = this->messaging_object ()->header_length (); } // Read the message into the message block that we have created on // the stack. - ssize_t n = this->recv (message_block.rd_ptr (), + ssize_t n = this->recv (message_block.wr_ptr (), recv_size, max_wait_time); @@ -1371,7 +1375,7 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "TAO (%P|%t) - Transport[%d]::handle_input_i: " "read %d bytes\n", this->id (), n)); } @@ -1379,47 +1383,381 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, // Set the write pointer in the stack buffer message_block.wr_ptr (n); - // Parse the message and try consolidating the message if - // needed. - retval = this->parse_consolidate_messages (message_block, - rh, - max_wait_time); + if (TAO_debug_level >= 10) + ACE_HEX_DUMP ((LM_DEBUG, + (const char *) message_block.rd_ptr (), + message_block.length (), + ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket"))); + + // **** BEGIN CJC PMB CHANGES **** + // + // - Does this properly handle GIOP FRAGMENT messages?? + // @@ BALA: I doubt it.. You could get an incomplete message + // fragment or a complete message fragment but the message could + // incomplete. + + // @@ BALA: How do you handle messages that didnt fit the message + // block here? + // + // @@ CJC: I'm not sure what you mean...If we need more space, then + // we allocate more. + + // Check to see if we're still working to complete a message + if (this->uncompleted_message_) + { + // try to complete it + + // on exit from this frame we have one of the following states: + // + // (a) an uncompleted message still in uncompleted_message_ + // AND message_block is empty + // + // (b) uncompleted_message_ zero, the completed message at the + // tail of the incoming queue; message_block could be empty + // or still contain bytes + + // ==> repeat + do + { + /* + * Append the "right number of bytes" to uncompleted_message_ + */ + // ==> right_number_of_bytes = MIN(bytes missing from + // uncompleted_message_, length of message_block); + size_t right_number_of_bytes = + ACE_MIN (this->uncompleted_message_->missing_data_bytes_, + message_block.length () ); - if (retval <= 0) + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "trying to use %u (of %u) " + "bytes to complete message missing %u bytes\n", + this->id (), + right_number_of_bytes, + message_block.length (), + this->uncompleted_message_->missing_data_bytes_)); + } + + // ==> append right_number_of_bytes from message_block + // to uncomplete_message_ & update read pointer of + // message_block; + + // 1. we assume that uncompleted_message_.msg_block_'s + // wr_ptr is properly maintained + // 2. we presume that uncompleted_message_.msg_block was + // allocated with enough space to contain the *entire* + // expected GIOP message, so this copy shouldn't involve an + // additional allocation + this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (), + right_number_of_bytes); + this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; + message_block.rd_ptr (right_number_of_bytes); + + switch (this->uncompleted_message_->current_state_) + { + case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER: + if (this->messaging_object()->check_for_valid_header ( + *this->uncompleted_message_->msg_block_)) + { + // ==> update bytes missing from uncompleted_message_ + // with size of message from valid header; + this->messaging_object()->set_queued_data_from_message_header ( + this->uncompleted_message_, + *this->uncompleted_message_->msg_block_); + // ==> change state of uncompleted_event_ to + // WAITING_TO_COMPLETE_PAYLOAD; + this->uncompleted_message_->current_state_ = + TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD; + + // ==> Resize the message block to have capacity for + // the rest of the incoming message + ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_; + mb.size (mb.size () + + this->uncompleted_message_->missing_data_bytes_); + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "found a valid header in the message; " + "waiting for %u bytes to complete payload\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } + + // Continue the loop...(?to where?) + continue; + } + else + { + // What the heck will we do with a bad header? Just + // better to close the connection and let things + // re-train from there. + return -1; +#if 0 // I don't think I need this clause, but I'm leaving it just in case. + // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes; + this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; + ACE_ASSERT (this->uncompleted_message->missing_data_bytes_ > 0); +#endif + } + break; + + case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD: + // ==> if (bytes missing from uncompleted_message_ == 0) + if (this->uncompleted_message_->missing_data_bytes_ == 0) + { + /* + * We completed the message! Hooray! + */ + // ==> place uncompleted_message_ (which is now + // complete!) at the tail of the incoming message + // queue; + + // ---> NOTE: whoever pulls this off the queue must delete it! + this->uncompleted_message_->current_state_ + = TAO_Queued_Data::COMPLETED; + // @@CJC NEED TO CHECK RETURN VALUE HERE! + this->enqueue_incoming_message (this->uncompleted_message_); + // zero out uncompleted_message_; + this->uncompleted_message_ = 0; + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "completed and queued message for processing!\n", + this->id ())); + } + + } + else + { + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "still need %u bytes to complete uncompleted message.\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } + } + break; + + default: + // @@CJC What do we do here?! + ACE_ASSERT (! "Transport::handle_input_i: unexpected state" + "in uncompleted_message_"); + } + } + // Does the order of the checks matter? In both (a) and (b), + // message_block is empty, but only in (b) is there no + // uncompleted_message_. + // ==> until (message_block is empty || there is no uncompleted_message_); + // or, rewritten in C++ looping constructs + // ==> while ( ! message_block is empty && there is an uncompleted_message_ ); + while (message_block.length() != 0 && this->uncompleted_message_); + } + + // If, at the end, we're still waiting to complete the message, + // we should effectively return. + // + // @@BALA: Returning here will reduce the throughput. We shold try + // reading again to see if we could get more data.. + // + // @@CJC: Good point; maybe we can go to the top again, but only try + // to read at most 2-3 times before we just go on. Obviously, if we + // complete the message (i.e., there's no uncompleted message), then + // we go on to the next step. + if (this->uncompleted_message_) + { + if (number_of_read_attempts--) + { + // We try to read again just in case more data arrived while + // we were doing the stuff above. This way, we can increase + // throughput without much of a penalty. + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "still have an uncompleted message; " + "will try %d more times before letting " + "somebody else have a chance.\n", + this->id (), + number_of_read_attempts)); + } + + // Yes, this uses the much-maligned "goto"; get over it. TCP + // implementations use them, too, for just the same sort of + // situation. + goto read_from_the_connection; + } + else + { + // The queue should be empty because it should have been processed + // above. But I wonder if I should put a check in here anyway. + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "giving up reading for now and returning " + "with incoming queue length = %d\n", + this->id (), + this->incoming_message_queue_.queue_length ())); + if (this->uncompleted_message_) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "missing bytes from uncompleted message = %u\n", + this->uncompleted_message_->missing_data_bytes_)); + } + return 1; + } + } + + // At this point, there should be nothing in uncompleted_message_. + // We now need to chop up the bytes in message_block and store any + // complete messages in the incoming message queue. + // + // ==> if (message_block still has data) + if (message_block.length () != 0) { - if (retval == -1 && TAO_debug_level > 0) + TAO_Queued_Data *complete_message = 0; + do { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i, " - "error while parsing and consolidating\n", - this->id ())); + if (TAO_debug_level >= 10) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ") + ACE_TEXT("extracting complete messages\n"))); + ACE_HEX_DUMP ((LM_DEBUG, + message_block.rd_ptr (), + message_block.length (), + ACE_TEXT (" from this message buffer"))); + } + complete_message = + TAO_Queued_Data::make_completed_message ( + message_block, *this->messaging_object ()); + if (complete_message) + this->enqueue_incoming_message (complete_message); } - return retval; + while (complete_message != 0); + // On exit from this frame we have one of the following states: + // (a) message_block is empty + // (b) message_block contains bytes from a partial message } - // Make a node of the message block.. - TAO_Queued_Data qd (&message_block, - this->orb_core_->transport_message_buffer_allocator ()); + // If, at this point, there's still data in message_block, it's + // an incomplete message. Therefore, we stuff it into the + // uncompleted_message_ and clear out message_block. + // ==> if (message_block still has data) + if (message_block.length () != 0) + { + // duplicate message_block remainder into this->uncompleted_message_ + ACE_ASSERT (this->uncompleted_message_ == 0); + ACE_Message_Block *queueable_mb = message_block.clone (); + ACE_ASSERT (queueable_mb != 0); + message_block.rd_ptr (message_block.length ()); + this->uncompleted_message_ = + TAO_Queued_Data::make_uncompleted_message (queueable_mb, + *this->messaging_object ()); + } - // Extract the data for the node.. - this->messaging_object ()->get_message_data (&qd); + // We should have consumed ALL the bytes by now. + ACE_ASSERT (message_block.length () == 0); - // Check whether the message was fragmented.. - if (qd.more_fragments_ || - (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) - { - // Duplicate the node that we have as the node is on stack.. - TAO_Queued_Data *nqd = - TAO_Queued_Data::duplicate (qd); + // **** END CJC PMG CHANGES **** - return this->consolidate_fragments (nqd, rh); + // Process the message + return this->process_queue_head (rh); +} + +int +TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message) +{ + // some notes... + // + // will probably need to modify process_queue_head to iterate to + // find the first unfragmented message in the queue + + return this->incoming_message_queue_.enqueue_tail (queueable_message); + +#if 0 + // + // This is the code that will deal with fragments + // + + // If the message doesn't indicate that more fragments are on + // the way, then we just queue it up at the tail and go on. + if (! queueable_message->more_fragments_) + // NOTE: we might have to capture the return value from enqueue_tail() + // and convert it to an appropriate return value for this method. + return this->incoming_message_queue_.enqueue_tail (queueable_message); + + // At this point we know that queueable_message is a fragment. What + // we don't know is what kind of fragment it is, or how to deal with + // it. + // + // Let's just explore the mechanics of how things would be done for + // the case of GIOP. When we really do this, we'll have to go a bit + // more abstract. + switch (queueable_message->giop_version) + { + case 1.0: + // Fragments aren't supported in 1.0. This is an error and + // we should reject it somehow. What do we do here? Do we throw + // an exception to the receiving side? Do we throw an exception + // to the sending side? + // + // At the very least, we need to log the fact that we received + // nonsense. + LOG_AND_RETURN_AN_ERROR; + break; + + case 1.1: + // In 1.1, fragments kinda suck because they don't have they're + // own message-specific header. Therefore, we have to do the + // following: + fragment_message = search the incoming queue for a GIOP 1.1 message that has the more_fragments_ bit set; + add this queueable_message->msg_block_ as a continuation block to the last block in fragment_message continuation chain; + + // One note is that TAO_Queued_Data contains version numbers, + // but doesn't indicate the actual protocol to which those + // version numbers refer. That's not a problem, though, because + // instances of TAO_Queued_Data live in a queue, and that queue + // lives in a particular instance of a Transport, and the + // transport instance has an association with a particular + // messaging_object. The concrete messaging object embodies a + // messaging protocol, and must cover all versions of that + // protocol. Therefore, we just need to cover the bases of all + // versions of that one protocol. + break; + + case 1.2: + // In GIOP 1.2, we get a little more context. There's a + // FRAGMENT message-specific header, and inside that is the + // request id with which the fragment is associated. + // + // What this means is that we'll have to add request_id_ as one + // of the TOA_Queued_Data public data members. + fragment_id = extract id from fragment header; + fragment_message = search incoming_queue for a GIOP message with request_id==fragment_id; + // Check for bizarre error conditions first + if (! fragment_message->more_fragments_) + LOG_AND_RETURN_AN_ERROR; + if (fragment_message->GIOP_version != 1.2) + LOG_AND_RETURN_AN_ERROR; + + add this queueable_message->msg_block_ as a continuation block to the last block in fragment_message continuation chain; + break; } - // Process the message - return this->process_parsed_messages (&qd, - rh); + return 0; +#endif } +#if 0 int TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, TAO_Resume_Handle &rh, @@ -1902,6 +2240,7 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block return this->process_queue_head (rh); } +#endif int TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, @@ -2013,6 +2352,7 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, return 0; } +#if 0 TAO_Queued_Data * TAO_Transport::make_queued_data (ACE_Message_Block &incoming) { @@ -2063,6 +2403,7 @@ TAO_Transport::make_queued_data (ACE_Message_Block &incoming) return qd; } +#endif int TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 31ebe4e18a9..dc542568f9b 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -649,6 +649,7 @@ protected: */ virtual int register_handler_i (void) = 0; +#if 0 /// Called by the handle_input_i (). This method is used to parse /// message read by the handle_input_i () call. It also decides /// whether the message needs consolidation before processing. @@ -696,16 +697,14 @@ protected: /// them, atleast at the head of them. int consolidate_extra_messages (ACE_Message_Block &incoming, TAO_Resume_Handle &rh); +#endif /// Process the message by sending it to the higher layers of the /// ORB. int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh); - /// Make a queued data from the <incoming> message block - TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming); - - /// Implement send_message_shared() assuming the handler_lock_ is + /// Implement send_message_shared() assuming the handler_lock_ is /// held. int send_message_shared_i (TAO_Stub *stub, int message_semantics, @@ -763,6 +762,34 @@ public: int handle_timeout (const ACE_Time_Value ¤t_time, const void* act); + + /*! + \name Incoming Queue Methods + */ + //@{ + /*! + \brief Queue up \a queueable_message as a completely-received incoming message. + + This method queues up a completely-received queueable GIOP message + (i.e., it must be dynamically-allocated). It does not assemble a + complete GIOP message; that should be done prior to calling this + message, and is currently done in handle_input_i. + + This does, however, assure that a completely-received GIOP + FRAGMENT gets associated with any previously-received related + fragments. It does this through collaboration with the messaging + object (since fragment reassembly is protocol specific). + + \param queueable_message instance as returned by one of the TAO_Queued_Data::make_*_message that's been completely received + + \return 0 successfully enqueued \a queueable_message + + \return -1 failed to enqueue \a queueable_message + \todo How do we indicate \em what may have failed? + */ + int enqueue_incoming_message (TAO_Queued_Data *queueable_message); + //@} + private: /// Helper method that returns the Transport Cache Manager. @@ -939,9 +966,12 @@ protected: TAO_Queued_Message *head_; TAO_Queued_Message *tail_; - /// Queue of the incoming messages.. + /// Queue of the completely-received incoming messages.. TAO_Incoming_Message_Queue incoming_message_queue_; + /// Place to hold a partially-received (waiting-to-be-completed) message + TAO_Queued_Data * uncompleted_message_; + /// The queue will start draining no later than <queing_deadline_> /// *if* the deadline is ACE_Time_Value current_deadline_; diff --git a/TAO/tests/AMI/run_test.pl b/TAO/tests/AMI/run_test.pl index d08ef80dc23..802c5794cd9 100755 --- a/TAO/tests/AMI/run_test.pl +++ b/TAO/tests/AMI/run_test.pl @@ -11,7 +11,7 @@ use PerlACE::Run_Test; $client_conf = PerlACE::LocalFile ("muxed$PerlACE::svcconf_ext"); $debug_level = '0'; -$iterations = '1'; +$iterations = '10000'; foreach $i (@ARGV) { if ($i eq '-mux') { diff --git a/TAO/tests/InterOp-Naming/INS_test_client.cpp b/TAO/tests/InterOp-Naming/INS_test_client.cpp index 8c543b717c5..18b9c9596a2 100644 --- a/TAO/tests/InterOp-Naming/INS_test_client.cpp +++ b/TAO/tests/InterOp-Naming/INS_test_client.cpp @@ -2,12 +2,45 @@ #include "INSC.h" +#include <tao/Messaging/Messaging.h> +#include <tao/TimeBaseC.h> + +template <class T> +void set_timeout_on_objref (CORBA::ORB_ptr orb, + CORBA::PolicyType pt, + const ACE_Time_Value &timeout, + typename T::_var_type &obj, + T*) +{ + // TimeBase::TimeT rrtt = (timeout.msec() * 1000 + timeout.sec()) * 10000000; + TimeBase::TimeT rrtt = timeout.msec() * 10000; + CORBA::Any rrtt_any; + rrtt_any <<= rrtt; + + CORBA::PolicyList pl; + pl.length (1); + pl[0] = orb->create_policy (pt, rrtt_any); + + try + { + CORBA::Object_var tmp = obj->_set_policy_overrides (pl, CORBA::SET_OVERRIDE); + obj = T::_unchecked_narrow (tmp.in ()); + } + catch (...) + { + obj = T::_nil (); + } + + pl[0]->destroy (); +} int main (int argc, char *argv[]) { int i = 0; + ACE_LOG_MSG->set_flags (ACE_Log_Msg::VERBOSE_LITE); + ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { @@ -73,15 +106,42 @@ main (int argc, char *argv[]) "given name.\n"), -1); + const ACE_Time_Value RTT (20, 500000); + CORBA::Object *dummy = 0; +#if 1 + set_timeout_on_objref (orb.in (), + Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, + RTT, objref, dummy); + INS_var server = INS::_narrow (objref.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; +#else + set_timeout_on_objref (orb.in (), + TAO::CONNECTION_TIMEOUT_POLICY_TYPE, + RTT, objref, dummy); + INS_var server = INS::_narrow (objref.in () ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; +#endif + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) TRYING TO NARROW OBJREF AGAIN.\n")); + INS_var server2 = INS::_narrow (objref.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; ACE_DEBUG ((LM_DEBUG, "Resolved IOR for %s : %s\n", argv[i], orb->object_to_string (server.in ()))); +#if 0 + INS *d2; + set_timeout_on_objref (orb.in (), + Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, + RTT, server, d2); +#endif + CORBA::String_var test_ins_result = server->test_ins (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_TRY_CHECK; diff --git a/TAO/tests/InterOp-Naming/Makefile b/TAO/tests/InterOp-Naming/Makefile index b2d07af51a9..639bc862b56 100644 --- a/TAO/tests/InterOp-Naming/Makefile +++ b/TAO/tests/InterOp-Naming/Makefile @@ -61,7 +61,7 @@ INS_test_server:$(addprefix $(VDIR),$(SIMPLE_SERVER_OBJS)) $(LINK.cc) $(LDFLAGS) -o $@ $^ -lTAO_IORTable $(TAO_SRVR_LIBS) $(POSTLINK) INS_test_client:$(addprefix $(VDIR),$(SIMPLE_CLIENT_OBJS)) - $(LINK.cc) $(LDFLAGS) -o $@ $^ $(TAO_CLNT_LIBS) $(POSTLINK) + $(LINK.cc) $(LDFLAGS) -o $@ $^ -lTAO_Messaging $(TAO_CLNT_LIBS) $(POSTLINK) .PRECIOUS: $(foreach ext, $(IDL_EXT), INS$(ext)) |