summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Cleeland <chris.cleeland@gmail.com>2003-02-11 16:21:43 +0000
committerChris Cleeland <chris.cleeland@gmail.com>2003-02-11 16:21:43 +0000
commit7b8e4114b6e9121492ce7335458a31e1d123b94c (patch)
treefffde55e8cf648ac3953959d8c88868014b8960c
parent1a8adc0831aa22f0be77824b98da6787c752d28e (diff)
downloadATCD-7b8e4114b6e9121492ce7335458a31e1d123b94c.tar.gz
Brought over changes from 1.2 into this 1.3 branch. Still need to add a
ChangeLog entry. Everything that I expected to work works, and everything I expected to be broken is broken, for whatever that's worth.
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp51
-rw-r--r--TAO/tao/GIOP_Message_Base.h25
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_Impl.inl13
-rw-r--r--TAO/tao/GIOP_Message_Lite.cpp23
-rw-r--r--TAO/tao/GIOP_Message_Lite.h23
-rw-r--r--TAO/tao/GIOP_Message_State.cpp114
-rw-r--r--TAO/tao/GIOP_Message_State.h37
-rw-r--r--TAO/tao/GIOP_Message_State.inl18
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp352
-rw-r--r--TAO/tao/Incoming_Message_Queue.h140
-rw-r--r--TAO/tao/Incoming_Message_Queue.inl15
-rw-r--r--TAO/tao/Pluggable_Messaging.h21
-rw-r--r--TAO/tao/Transport.cpp411
-rw-r--r--TAO/tao/Transport.h40
-rwxr-xr-xTAO/tests/AMI/run_test.pl2
-rw-r--r--TAO/tests/InterOp-Naming/INS_test_client.cpp60
-rw-r--r--TAO/tests/InterOp-Naming/Makefile2
17 files changed, 1224 insertions, 123 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 9ac3f504ed7..1e3d72512d8 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -309,6 +309,7 @@ TAO_GIOP_Message_Base::message_type (
return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
}
+#if 0
int
TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
{
@@ -566,6 +567,7 @@ TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
// Reset the message_state
this->message_state_.reset ();
}
+#endif
int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
@@ -1532,6 +1534,7 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg)
}
+#if 0
TAO_Queued_Data *
TAO_GIOP_Message_Base::make_queued_data (size_t sz)
{
@@ -1567,9 +1570,57 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz)
return qd;
}
+#endif
+
+
size_t
TAO_GIOP_Message_Base::header_length (void) const
{
return TAO_GIOP_MESSAGE_HEADER_LEN;
}
+
+void
+TAO_GIOP_Message_Base::set_queued_data_from_message_header (
+ TAO_Queued_Data *qd,
+ const ACE_Message_Block &mb
+ ) const
+{
+ // @@CJC: Try leaving out the declaration for this->message_state_
+ // and see what pukes. I don't think we need it any more.
+ TAO_GIOP_Message_State state;
+ if (state.take_values_from_message_block (mb) == -1)
+ {
+ // what the heck do we do here?!
+ qd->current_state_ = TAO_Queued_Data::INVALID;
+ return;
+ }
+
+ // It'd be nice to have an abstract base for GIOP_Message_State
+ // so that there could just be a line like:
+ // qd->take_values_from (state);
+ // Get the message information
+ qd->byte_order_ = state.byte_order ();
+ qd->major_version_ = state.giop_version ().major;
+ qd->minor_version_ = state.giop_version ().minor;
+ qd->more_fragments_ = state.more_fragments () ? 1 : 0;
+ qd->msg_type_= message_type (state);
+ qd->missing_data_bytes_ = state.payload_size ();
+}
+
+int
+TAO_GIOP_Message_Base::check_for_valid_header (
+ const ACE_Message_Block &mb
+ ) const
+{
+ const char* magic_bytes = "GIOP";
+ ACE_ASSERT (ACE_OS::strlen (magic_bytes) < header_length ());
+ if (mb.length () < header_length ())
+ return 0;
+
+ // Is finding that it's the right length and the magic bytes present
+ // enough to declare it a valid header? I think so...
+ return (ACE_OS::memcmp (mb.rd_ptr (),
+ magic_bytes,
+ ACE_OS::strlen (magic_bytes)) == 0);
+}
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index 1ed66ef6b70..41845f24e22 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -93,6 +93,7 @@ public:
/// the message.
virtual int format_message (TAO_OutputCDR &cdr);
+#if 0
/// Parse the incoming messages..
virtual int parse_incoming_messages (ACE_Message_Block &message_block);
@@ -117,12 +118,34 @@ public:
/// @@Bala:Docu??
virtual int consolidate_fragments (TAO_Queued_Data *dqd,
const TAO_Queued_Data *sqd);
+#endif
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd);
+ /*!
+ \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message.
+
+ Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to
+ see if they look like the beginning of a message. Does
+ */
+ virtual int check_for_valid_header (const ACE_Message_Block &mb) const;
+
+ /*!
+ \brief Set fields in \param qd based on values derived from \param mb.
+
+ This function sets fields in \param qd based on values derived
+ from \param mb. It assumes that if the length of \param mb is
+ enough to hold a header, then the data in there can be trusted to
+ make sense.
+ */
+ virtual void set_queued_data_from_message_header (
+ TAO_Queued_Data *,
+ const ACE_Message_Block &mb) const;
+
+
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
@@ -172,7 +195,7 @@ protected:
/// TAO_PLUGGABLE_MESSAGE_REPLY,
/// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION,
/// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
- TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state);
+ static TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state);
private:
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
index 18bb7936ffd..bbb9f024c9d 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
+++ b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
@@ -5,9 +5,22 @@ TAO_GIOP_Message_Generator_Parser_Impl::
check_revision (CORBA::Octet incoming_major,
CORBA::Octet incoming_minor)
{
+#if 0
if (incoming_major > TAO_DEF_GIOP_MAJOR ||
incoming_minor > TAO_DEF_GIOP_MINOR)
return 0;
return 1;
+#endif
+ CORBA::Boolean ret = 0; // Assume that the revision is bogus
+ if (incoming_major == 1)
+ switch (incoming_minor)
+ {
+ case 0:
+ case 1:
+ case 2:
+ ret = 1;
+ }
+
+ return ret;
}
diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp
index 6395e1afa27..5d923762331 100644
--- a/TAO/tao/GIOP_Message_Lite.cpp
+++ b/TAO/tao/GIOP_Message_Lite.cpp
@@ -237,6 +237,7 @@ TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream)
}
+#if 0
int
TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block)
{
@@ -485,6 +486,7 @@ TAO_GIOP_Message_Lite::consolidate_fragments (TAO_Queued_Data * /*dqd*/,
// We dont know what fragments are???
return -1;
}
+#endif
int
TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport,
@@ -1596,6 +1598,7 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label,
}
}
+#if 0
TAO_Queued_Data *
TAO_GIOP_Message_Lite::make_queued_data (size_t sz)
{
@@ -1627,6 +1630,7 @@ TAO_GIOP_Message_Lite::make_queued_data (size_t sz)
return qd;
}
+#endif
int
TAO_GIOP_Message_Lite::generate_locate_reply_header (
@@ -1649,3 +1653,22 @@ TAO_GIOP_Message_Lite::header_length (void) const
{
return TAO_GIOP_LITE_HEADER_LEN;
}
+
+void
+TAO_GIOP_Message_Lite::set_queued_data_from_message_header (
+ TAO_Queued_Data *qd,
+ const ACE_Message_Block &mb
+ ) const
+{
+ ACE_UNUSED_ARG (qd);
+ ACE_UNUSED_ARG (mb);
+}
+
+int
+TAO_GIOP_Message_Lite::check_for_valid_header (
+ const ACE_Message_Block &mb
+ ) const
+{
+ ACE_UNUSED_ARG (mb);
+ return 0;
+}
diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h
index afe78e63136..7affd27e6b8 100644
--- a/TAO/tao/GIOP_Message_Lite.h
+++ b/TAO/tao/GIOP_Message_Lite.h
@@ -85,6 +85,7 @@ public:
/// the message.
virtual int format_message (TAO_OutputCDR &cdr);
+#if 0
/// Parse the incoming messages..
virtual int parse_incoming_messages (ACE_Message_Block &message_block);
@@ -118,12 +119,34 @@ public:
/// @@Bala: Docu???
virtual int consolidate_fragments (TAO_Queued_Data *dqd,
const TAO_Queued_Data *sqd);
+#endif
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd);
+
+ /*!
+ \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message.
+
+ Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to
+ see if they look like the beginning of a message. Does
+ */
+ virtual int check_for_valid_header (const ACE_Message_Block &mb) const;
+
+ /*!
+ \brief Set fields in \param qd based on values derived from \param mb.
+
+ This function sets fields in \param qd based on values derived
+ from \param mb. It assumes that if the length of \param mb is
+ enough to hold a header, then the data in there can be trusted to
+ make sense.
+ */
+ virtual void set_queued_data_from_message_header (
+ TAO_Queued_Data *,
+ const ACE_Message_Block &mb) const;
+
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
virtual int process_reply_message (
diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp
index 72d04c272f4..b86a456a140 100644
--- a/TAO/tao/GIOP_Message_State.cpp
+++ b/TAO/tao/GIOP_Message_State.cpp
@@ -11,13 +11,44 @@
# include "tao/GIOP_Message_State.inl"
#endif /* __ACE_INLINE__ */
+class TAO_Debug_Msg_Emitter_Guard
+{
+public:
+ TAO_Debug_Msg_Emitter_Guard (unsigned int debug_level, const char* msg)
+ : which_level_(debug_level)
+ {
+ this->msg_ = new char[ACE_OS::strlen (msg) + MAGIC_LENGTH ];
+ ACE_OS::strcpy (this->msg_, msg);
+ ACE_OS::strcat (this->msg_, " begin\n");
+ if (TAO_debug_level >= this->which_level_)
+ ACE_DEBUG ((LM_DEBUG, this->msg_ ));
+ }
+
+ ~TAO_Debug_Msg_Emitter_Guard ()
+ {
+ if (TAO_debug_level >= this->which_level_)
+ {
+ char* begin_start =
+ this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1;
+ ACE_OS::strcpy (begin_start, " end\n");
+ ACE_DEBUG ((LM_DEBUG, this->msg_));
+ }
+ delete[] this->msg_;
+ }
+
+private:
+ static const int MAGIC_LENGTH = 8; // " begin\n" + \000
+ unsigned int which_level_;
+ char* msg_;
+};
+
ACE_RCSID(tao, GIOP_Message_State, "$Id$")
TAO_GIOP_Message_State::TAO_GIOP_Message_State (
- TAO_ORB_Core * /*orb_core*/,
- TAO_GIOP_Message_Base *base)
- : base_ (base),
+ TAO_ORB_Core * /*orb_core*/,
+ TAO_GIOP_Message_Base * /*base*/)
+ : /*base_ (base),*/
giop_version_ (TAO_DEF_GIOP_MAJOR,
TAO_DEF_GIOP_MINOR),
byte_order_ (0),
@@ -29,7 +60,50 @@ TAO_GIOP_Message_State::TAO_GIOP_Message_State (
{
}
+// This doesn't check the message block's length, so that means that
+// the *caller* needs to do that first.
+int
+TAO_GIOP_Message_State::take_values_from_message_block (
+ const ACE_Message_Block& mb
+ )
+{
+ TAO_Debug_Msg_Emitter_Guard (8, "(%P|%t) GIOP_Message_State::take_values_from_message_block");
+
+ const char* buf = mb.rd_ptr ();
+
+ // Get the version information
+ if (this->set_version_info_from_buffer (buf) == -1)
+ return -1;
+
+ // Get the byte order information...
+ if (this->set_byte_order_info_from_buffer (buf) == -1)
+ return -1;
+
+ // Get the message type
+ this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
+
+ // Get the size of the message..
+ this->set_payload_size_from_buffer (buf);
+
+ if (this->message_size_ == 0)
+ {
+ if (this->message_type_ == TAO_GIOP_MESSAGERROR)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) GIOP_Message_State::take_values: GIOP_MESSAGE_ERROR rcv'd.\n"));
+ return 0;
+ }
+ else
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) GIOP_Message_State::take_values: Message of size zero rcv'd.\n"));
+ return -1;
+ }
+ }
+ return 0;
+}
+#if 0
int
TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming)
{
@@ -133,21 +207,16 @@ TAO_GIOP_Message_State::parse_magic_bytes (char *buf)
return 0;
}
+#endif
int
-TAO_GIOP_Message_State::get_version_info (char *buf)
+TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf)
{
- if (TAO_debug_level > 8)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_State::get_version_info\n"));
- }
+ TAO_Debug_Msg_Emitter_Guard (8, ACE_TEXT("TAO (%P|%t) GIOP_Message_State::set_version_info"));
// We have a GIOP message on hand. Get its revision numbers
- CORBA::Octet incoming_major =
- buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
- CORBA::Octet incoming_minor =
- buf[TAO_GIOP_VERSION_MINOR_OFFSET];
+ CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
+ CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
// Check the revision information
if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision (
@@ -157,7 +226,9 @@ TAO_GIOP_Message_State::get_version_info (char *buf)
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - bad version <%d.%d>\n"),
+ ACE_TEXT ("TAO (%P|%t) - ")
+ ACE_TEXT ("GIOP_Message_State::set_version_info_from_buffer:")
+ ACE_TEXT ("bad version <%d.%d>\n"),
incoming_major, incoming_minor));
}
@@ -172,15 +243,11 @@ TAO_GIOP_Message_State::get_version_info (char *buf)
}
int
-TAO_GIOP_Message_State::get_byte_order_info (char *buf)
+TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf)
{
- if (TAO_debug_level > 8)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_State::get_byte_order_info\n"));
- }
+ TAO_Debug_Msg_Emitter_Guard (8, "TAO (%P|%t) GIOP_Message_State::set_byte_order_info_from_buffer");
- // Let us be specific that this is for 1.0
+ // Let us be specific that this is for 1.0
if (this->giop_version_.minor == 0 &&
this->giop_version_.major == 1)
{
@@ -225,8 +292,9 @@ TAO_GIOP_Message_State::get_byte_order_info (char *buf)
}
void
-TAO_GIOP_Message_State::get_payload_size (char *rd_ptr)
+TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr)
{
+ TAO_Debug_Msg_Emitter_Guard (8, "(%P|%t) GIOP_Message_State::set_payload_size_from_buffer");
// Move the read pointer
rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
@@ -263,7 +331,7 @@ TAO_GIOP_Message_State::parse_fragment_header (char *buf,
}
CORBA::ULong
-TAO_GIOP_Message_State::read_ulong (char *rd_ptr)
+TAO_GIOP_Message_State::read_ulong (const char *rd_ptr)
{
CORBA::ULong x = 0;
diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h
index 594747dc4cc..319e37a3933 100644
--- a/TAO/tao/GIOP_Message_State.h
+++ b/TAO/tao/GIOP_Message_State.h
@@ -42,11 +42,15 @@ class TAO_Export TAO_GIOP_Message_State
public:
/// Ctor
- TAO_GIOP_Message_State (TAO_ORB_Core *orb_core,
- TAO_GIOP_Message_Base *base);
+ TAO_GIOP_Message_State (TAO_ORB_Core *orb_core = 0,
+ TAO_GIOP_Message_Base *base = 0);
+ int take_values_from_message_block (const ACE_Message_Block& mb);
+
+#if 0
/// Parse the message header.
int parse_message_header (ACE_Message_Block &incoming);
+#endif
/// Return the message size
CORBA::ULong message_size (void) const;
@@ -54,9 +58,24 @@ public:
/// Return the message size
CORBA::ULong payload_size (void) const;
- /// Return the byte order information
+ /*!
+ \brief Return the byte order information.
+ \return 0 big-endian
+ \return 1 little-endian
+ */
CORBA::Octet byte_order (void) const;
+ /*!
+ \brief Return GIOP version information.
+ */
+ const TAO_GIOP_Message_Version &giop_version () const;
+
+ /// (Requests and Replys)
+ CORBA::Octet more_fragments () const;
+
+ /// MsgType above
+ CORBA::Octet message_type () const;
+
/// Reset the state..
void reset (void);
@@ -64,25 +83,27 @@ private:
friend class TAO_GIOP_Message_Base;
+#if 0
/// Parse the message header.
int parse_message_header_i (ACE_Message_Block &incoming);
/// Checks for the magic word 'GIOP' in the start of the incoing
/// stream
int parse_magic_bytes (char *buf);
+#endif
/// Extracts the version information from the incoming
/// stream. Performs a check for whether the version information is
/// right and sets the information in the <state>
- int get_version_info (char *buf);
+ int set_version_info_from_buffer (const char *buf);
/// Extracts the byte order information from the incoming
/// stream. Performs a check for whether the byte order information
/// right and sets the information in the <state>
- int get_byte_order_info (char *buf);
+ int set_byte_order_info_from_buffer (const char *buf);
/// Gets the size of the payload and set the size in the <state>
- void get_payload_size (char *buf);
+ void set_payload_size_from_buffer (const char *buf);
/// Parses the GIOP FRAGMENT_HEADER information from the incoming
/// stream.
@@ -91,12 +112,14 @@ private:
/// Read the unsigned long from the buffer. The <buf> should just
/// point to the next 4 bytes data that represent the ULong
- CORBA::ULong read_ulong (char *buf);
+ CORBA::ULong read_ulong (const char *buf);
private:
+#if 0
/// The GIOP base class..
TAO_GIOP_Message_Base *base_;
+#endif
// GIOP version information..
TAO_GIOP_Message_Version giop_version_;
diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl
index fe076bee689..f2f0011a7f6 100644
--- a/TAO/tao/GIOP_Message_State.inl
+++ b/TAO/tao/GIOP_Message_State.inl
@@ -33,6 +33,24 @@ TAO_GIOP_Message_State::reset (void)
this->missing_data_ = 0;
}
+ACE_INLINE const TAO_GIOP_Message_Version &
+TAO_GIOP_Message_State::giop_version () const
+{
+ return this->giop_version_;
+}
+
+ACE_INLINE CORBA::Octet
+TAO_GIOP_Message_State::more_fragments () const
+{
+ return this->more_fragments_;
+}
+
+ACE_INLINE CORBA::Octet
+TAO_GIOP_Message_State::message_type () const
+{
+ return this->message_type_;
+}
+
#if 0
ACE_INLINE int
TAO_GIOP_Message_State::message_fragmented (void)
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index de087da0565..d42f14da589 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -1,3 +1,13 @@
+// We're doing this because we need to extend ACE_Message_Block, but
+// that interface is currently "frozen". Therefore, I'm using static
+// functions in this compilation unit that I would normally add as part
+// of the interface.
+#define private public
+#define protected public
+#include "ace/Message_Block.h"
+#undef private
+#undef public
+
#include "Incoming_Message_Queue.h"
#include "ORB_Core.h"
#include "debug.h"
@@ -41,13 +51,13 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
{
// Check to see if the length of the incoming block is less than
// that of the <missing_data_> of the tail.
- if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_)
+ if (block.length () <= this->queued_data_->missing_data_bytes_)
{
n = block.length ();
}
else
{
- n = this->queued_data_->missing_data_;
+ n = this->queued_data_->missing_data_bytes_;
}
// Do the copy
@@ -55,7 +65,7 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
n);
// Decerement the missing data
- this->queued_data_->missing_data_ -= n;
+ this->queued_data_->missing_data_bytes_ -= n;
}
return n;
@@ -132,48 +142,328 @@ TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
/************************************************************************/
TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
- : msg_block_ (0),
- missing_data_ (0),
- byte_order_ (0),
- major_version_ (0),
- minor_version_ (0),
- more_fragments_ (0),
- msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
- next_ (0),
- allocator_ (alloc)
+ : msg_block_ (0)
+ , current_state_ (INVALID)
+ , missing_data_bytes_ (0)
+ , byte_order_ (0)
+ , major_version_ (0)
+ , minor_version_ (0)
+ , more_fragments_ (0)
+ , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
+ , next_ (0)
+ , allocator_ (alloc)
{
}
TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb,
ACE_Allocator *alloc)
- : msg_block_ (mb),
- missing_data_ (0),
- byte_order_ (0),
- major_version_ (0),
- minor_version_ (0),
- more_fragments_ (0),
- msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
- next_ (0),
- allocator_ (alloc)
+ : msg_block_ (mb)
+ , current_state_ (INVALID)
+ , missing_data_bytes_ (0)
+ , byte_order_ (0)
+ , major_version_ (0)
+ , minor_version_ (0)
+ , more_fragments_ (0)
+ , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
+ , next_ (0)
+ , allocator_ (alloc)
{
}
TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
- : msg_block_ (qd.msg_block_->duplicate ()),
- missing_data_ (qd.missing_data_),
- byte_order_ (qd.byte_order_),
- major_version_ (qd.major_version_),
- minor_version_ (qd.minor_version_),
- more_fragments_ (qd.more_fragments_),
- msg_type_ (qd.msg_type_),
- next_ (0),
- allocator_ (qd.allocator_)
+ : msg_block_ (qd.msg_block_->duplicate ())
+ , current_state_ (qd.current_state_)
+ , missing_data_bytes_ (qd.missing_data_bytes_)
+ , byte_order_ (qd.byte_order_)
+ , major_version_ (qd.major_version_)
+ , minor_version_ (qd.minor_version_)
+ , more_fragments_ (qd.more_fragments_)
+ , msg_type_ (qd.msg_type_)
+ , next_ (0)
+ , allocator_ (qd.allocator_)
+{
+}
+
+/*static*/
+TAO_Queued_Data *
+TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb,
+ TAO_Pluggable_Messaging &msging_obj,
+ ACE_Allocator *alloc)
+{
+ TAO_Queued_Data *new_qd = 0;
+
+ // Validate arguments.
+ if (mb == 0)
+ goto failure;
+
+ new_qd = make_queued_data (alloc);
+ if (new_qd == 0)
+ goto failure;
+
+ // do we have enough bytes to make a complete header?
+ if (mb->length() >= msging_obj.header_length ())
+ {
+ // Since we have enough bytes to make a complete header,
+ // the header needs to be valid. Check that now, and punt
+ // if it's not valid.
+ if (! msging_obj.check_for_valid_header (*mb))
+ {
+ goto failure;
+ }
+ else
+ {
+ new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD;
+ new_qd->msg_block_ = mb;
+ msging_obj.set_queued_data_from_message_header (new_qd, *mb);
+ // missing_data_bytes_ now has the full GIOP message size, but
+ // there might still be stuff in mb. Therefore, we have to adjust
+ // missing_data_bytes_, i.e., decrease it by the number of "actual
+ // payload bytes" in mb.
+ //
+ // "actual payload bytes" :== length of mb (which included the header) - header length
+ new_qd->missing_data_bytes_ -= (mb->length () - msging_obj.header_length ());
+ //??? mb->rd_ptr (msging_obj.header_length ());
+ }
+ }
+ else
+ {
+ new_qd->current_state_ = WAITING_TO_COMPLETE_HEADER;
+ new_qd->msg_block_ = mb;
+ new_qd->missing_data_bytes_ = msging_obj.header_length () - mb->length ();
+ }
+
+ ACE_ASSERT (new_qd->current_state_ != INVALID);
+ if (TAO_debug_level > 7)
+ {
+ const char* s = "?unk?";
+ switch (new_qd->current_state_) {
+ case WAITING_TO_COMPLETE_HEADER: s = "WAITING_TO_COMPLETE_HEADER"; break;
+ case WAITING_TO_COMPLETE_PAYLOAD: s = "WAITING_TO_COMPLETE_PAYLOAD"; break;
+ case INVALID: s = "INVALID"; break;
+ case COMPLETED: s = "COMPLETED"; break;
+ }
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ")
+ ACE_TEXT ("made uncompleted message from %u bytes into qd=%-08x:")
+ ACE_TEXT ("state=%s,missing_data_bytes=%u\n"),
+ mb->length(), new_qd, s, new_qd->missing_data_bytes_));
+ }
+ return new_qd;
+
+failure:
+ if (TAO_debug_level > 7)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ")
+ ACE_TEXT ("failed to make uncompleted message: mb=%-08x, qd=%-08x\n"),
+ mb, new_qd));
+ }
+ TAO_Queued_Data::release (new_qd);
+ return 0;
+}
+
+/*!
+ \brief Act like ACE_Message_Block::clone, but only clone the part btw. rd_ptr and wr_ptr.
+ */
+// Follow #def's swiped from Message_Block.cpp
+#if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
+#define ACE_EXECUTION_TIME this->execution_time_
+#define ACE_DEADLINE_TIME this->deadline_time_
+#else
+#define ACE_EXECUTION_TIME ACE_Time_Value::zero
+#define ACE_DEADLINE_TIME ACE_Time_Value::max_time
+#endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
+
+static ACE_Data_Block*
+clone_span_nocopy (/*const*/ ACE_Data_Block *the_db,
+ const char* beg, size_t span,
+ ACE_Message_Block::Message_Flags mask = 0)
+{
+ // Allocate a new data block through the same allocator as 'the_db'
+ const ACE_Message_Block::Message_Flags always_clear =
+ ACE_Message_Block::DONT_DELETE;
+
+ ACE_Data_Block *nb;
+ ACE_Allocator *db_allocator = the_db->data_block_allocator ();
+
+ ACE_NEW_MALLOC_RETURN (nb,
+ ACE_static_cast(ACE_Data_Block*,
+ db_allocator->malloc (sizeof (ACE_Data_Block))),
+ ACE_Data_Block (span, // size
+ the_db->msg_type (), // type
+ 0, // data
+ the_db->allocator_strategy (), // allocator
+ the_db->locking_strategy (), // locking strategy
+ the_db->flags (), // flags
+ db_allocator),
+ 0);
+
+
+ // Set new flags minus the mask...
+ nb->clr_flags (mask | always_clear);
+
+ // Copy in the data, and set the pointer
+ // ACE_OS::memcpy (nb->base (), beg, span);
+
+ return nb;
+}
+
+static ACE_Message_Block*
+clone_span (/*const*/ ACE_Message_Block *the_mb, size_t span_size, ACE_Message_Block::Message_Flags mask = 0)
{
+ // Get a pointer to a "cloned" <ACE_Data_Block> (will copy the
+ // values rather than increment the reference count).
+ size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
+ ACE_Data_Block *db = clone_span_nocopy (the_mb->data_block (), the_mb->rd_ptr (), aligned_size, mask);
+ if (db == 0)
+ return 0;
+
+ ACE_Message_Block *nb;
+
+ if(the_mb->message_block_allocator_ == 0)
+ {
+ ACE_NEW_RETURN (nb,
+ ACE_Message_Block (0, // size
+ ACE_Message_Block::ACE_Message_Type (0), // type
+ 0, // cont
+ 0, // data
+ 0, // allocator
+ 0, // locking strategy
+ 0, // flags
+ the_mb->priority_, // priority
+ ACE_EXECUTION_TIME, // execution time
+ ACE_DEADLINE_TIME, // absolute time to deadline
+ // Get a pointer to a
+ // "duplicated" <ACE_Data_Block>
+ // (will simply increment the
+ // reference count).
+ db,
+ db->data_block_allocator (),
+ the_mb->message_block_allocator_),
+ 0);
+ }
+ else
+ {
+ // This is the ACE_NEW_MALLOC macro with the return check removed.
+ // We need to do it this way because if it fails we need to release
+ // the cloned data block that was created above. If we used
+ // ACE_NEW_MALLOC_RETURN, there would be a memory leak because the
+ // above db pointer would be left dangling.
+ nb = ACE_static_cast(ACE_Message_Block*,the_mb->message_block_allocator_->malloc (sizeof (ACE_Message_Block)));
+ if(nb != 0)
+ new (nb) ACE_Message_Block (0, // size
+ ACE_Message_Block::ACE_Message_Type (0), // type
+ 0, // cont
+ 0, // data
+ 0, // allocator
+ 0, // locking strategy
+ 0, // flags
+ the_mb->priority_, // priority
+ ACE_EXECUTION_TIME, // execution time
+ ACE_DEADLINE_TIME, // absolute time to deadline
+ db,
+ db->data_block_allocator (),
+ the_mb->message_block_allocator_);
+ }
+
+ if (nb == 0)
+ {
+ db->release ();
+ return 0;
+ }
+
+ ACE_CDR::mb_align (nb);
+ nb->copy (the_mb->rd_ptr(), span_size);
+
+ // Clone all the continuation messages if necessary.
+ if (the_mb->cont () != 0
+ && (nb->cont_ = the_mb->cont ()->clone (mask)) == 0)
+ {
+ nb->release ();
+ return 0;
+ }
+ return nb;
+}
+
+/*static*/
+TAO_Queued_Data *
+TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb,
+ TAO_Pluggable_Messaging &msging_obj,
+ ACE_Allocator *alloc)
+{
+ // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE!
+ // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN
+ // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS
+ // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT
+ // BEHAVES DIFFERENTLY FROM make_uncompleted_message.
+
+ // Validate arguments.
+ if (mb.length() < msging_obj.header_length ())
+ return 0;
+
+ size_t total_msg_len = 0;
+ TAO_Queued_Data *new_qd = make_queued_data (alloc);
+ if (new_qd == 0)
+ goto failure;
+
+ // We can assume that there are enough bytes for a header, so
+ // extract the header data. Don't assume that there's enough for
+ // the payload just yet.
+ new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD;
+ msging_obj.set_queued_data_from_message_header (new_qd, mb);
+ if (new_qd->current_state_ == INVALID)
+ goto failure;
+
+ // new_qd_->missing_data_bytes_ + protocol header length should be
+ // *at least* the length of the message. Verify that we have that
+ // many bytes in the message block and, if we don't, release the new
+ // qd and fail.
+ total_msg_len = new_qd->missing_data_bytes_ + msging_obj.header_length ();
+ if (total_msg_len > mb.length ())
+ goto failure;
+
+ // Make a copy of the relevant portion of mb and hang on to it
+ if ((new_qd->msg_block_ = clone_span (&mb, total_msg_len)) == 0)
+ goto failure;
+
+
+ // Update missing data and the current state
+ new_qd->missing_data_bytes_ = 0;
+ new_qd->current_state_ = COMPLETED;
+
+ // Advance the rd_ptr on the message block
+ mb.rd_ptr (total_msg_len);
+
+ if (TAO_debug_level > 7)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ")
+ ACE_TEXT ("extracted complete message (%u bytes incl hdr) from mblk=%-08x into qd=%-08x\n"),
+ total_msg_len, &mb, new_qd));
+ }
+
+ return new_qd;
+
+failure:
+ if (TAO_debug_level > 7)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ")
+ ACE_TEXT ("failed to find complete message in mblk=%-08x; leaving %u bytes in block\n"),
+ &mb, mb.length ()));
+ if (TAO_debug_level >= 10)
+ ACE_HEX_DUMP ((LM_DEBUG,
+ mb.rd_ptr (), mb.length (),
+ ACE_TEXT (" residual bytes in buffer")));
+
+ }
+ TAO_Queued_Data::release (new_qd);
+ return 0;
}
/*static*/
TAO_Queued_Data *
-TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc)
+TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc)
{
TAO_Queued_Data *qd = 0;
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index 52d3ea50d34..4999c45e7fa 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -14,7 +14,7 @@
#define TAO_INCOMING_MESSAGE_QUEUE_H
#include "ace/pre.h"
-#include "Pluggable_Messaging_Utils.h"
+#include "Pluggable_Messaging.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -74,14 +74,42 @@ public:
/// Return the length of the queue..
CORBA::ULong queue_length (void);
- /// Methods for sanity check. Checks to see whether the node on the
- /// head or tail is complete or not and ready for further
- /// processing.
+ /*!
+ @name Node Inspection Predicates
+
+ \brief These methods allow inspection of head and tail nodes for "completeness".
+
+ These methods check to see whether the node on the head or tail is
+ "complete" and ready for further processing. See each method's
+ documentation for its definition of "complete".
+ */
+ //@{
+ /*!
+ "complete" == the GIOP message at the tail is not missing any data (it may be a complete GIOP Fragment, though)
+
+ \return -1 queue is empty
+ \return 0 tail is not "complete"
+ \return 1 tail is "complete"
+ */
int is_tail_complete (void);
+
+ /*!
+
+ "complete" == the GIOP message at the head is not missing any data
+ AND, if it's the first message in a series of GIOP fragments, all
+ the fragments have been received, parsed, and placed into the
+ queue
+
+ \return -1 if queue is empty
+ \return 0 if head is not "complete"
+ \return 1 if head is "complete"
+ */
int is_head_complete (void);
+ //@}
- /// This method checks whether the last message that was queued up
- /// was fragmented...
+ /*!
+ \brief Check to see if the message at the tail (complete or incomplete) is a GIOP Fragment.
+ */
int is_tail_fragmented (void);
/// Return the size of data that is missing in tail of the queue.
@@ -92,11 +120,12 @@ private:
friend class TAO_Transport;
+#if 0
/// Make a node for the queue.
TAO_Queued_Data *get_node (void);
+#endif
private:
-
/// A linked listof messages that await processing
TAO_Queued_Data *queued_data_;
@@ -122,20 +151,69 @@ private:
class TAO_Export TAO_Queued_Data
{
-public:
+protected:
/// Default Constructor
TAO_Queued_Data (ACE_Allocator *alloc = 0);
/// Constructor.
TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0);
+public:
/// Copy constructor.
TAO_Queued_Data (const TAO_Queued_Data &qd);
- /// Creation and deletion of a node in the queue.
- static TAO_Queued_Data* get_queued_data (ACE_Allocator *alloc = 0);
+ /*!
+ \name Factory Methods
+
+ These methods manufacture instances of TAO_Queued_Data and return
+ them. These instances should be removed via TAO_Queued_Data::release.
+
+ Instances are initialized from data in the ACE_Message_Block,
+ interpreted according to rules defined in the
+ TAO_Pluggable_Messaging object.
+
+ The manufactured instance adopts the message block \em without
+ duplicating it; therefore, the caller must duplicate or orphan the
+ message block. The caller also must insure that the message block
+ can be released via ACE_Message_Block::release, and that its life
+ endures beyond the calling scope.
+
+ For the purposes of TAO_Queued_Data, a completed message is a
+ completely received message as defined by the messaging protocol
+ object. For GIOP, that means that the number of bytes specified
+ in the general GIOP header have been completely received. It
+ specifically DOES NOT mean that all \em fragments have been
+ received. Fragment reassembly is another matter altogether.
+ */
+ //@{
+ /*!
+ \brief Make and return an instance of TAO_Queued_Data suitable for use as an uncompleted message.
+ */
+ static TAO_Queued_Data* make_uncompleted_message (ACE_Message_Block *mb,
+ TAO_Pluggable_Messaging &msging_obj,
+ ACE_Allocator *alloc = 0);
+ /*!
+ \brief Make and return an instance of TAO_Queued_Data suitable for use as a completed message.
+ */
+ // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE!
+ // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN
+ // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS
+ // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT
+ // BEHAVES DIFFERENTLY FROM make_uncompleted_message.
+ static TAO_Queued_Data* make_completed_message (ACE_Message_Block &mb,
+ TAO_Pluggable_Messaging &msging_obj,
+ ACE_Allocator *alloc = 0);
+ /*!
+ \brief Creation and deletion of a node in the queue.
+ \todo Maybe this should be private?
+ */
+private:
+ static TAO_Queued_Data* make_queued_data (ACE_Allocator *alloc = 0);
+public:
+ //@}
static void release (TAO_Queued_Data *qd);
+ void release (void);
/// Duplicate ourselves. This creates a copy of ourselves on the
/// heap and returns a pointer to the duplicated node.
@@ -145,11 +223,43 @@ public:
/// The message block that contains the message.
ACE_Message_Block *msg_block_;
- /// Data missing in the above message that hasn't been read or
- /// processed yet.
- CORBA::Long missing_data_;
-
- /// The byte order of the message that is stored in the node..
+ /*!
+ @name Missing Data details
+
+ The \a missing_data_bytes_ member contains the number of bytes of
+ data missing from \a msg_block_. However, there can be two places
+ where data is missing: header and payload. We cannot know how
+ much data is missing from the payload until we have a complete
+ header. Fortunately, headers are a fixed length, so we can know
+ how much we're missing from the header.
+
+ We use \param current_state_ to indicate which portion of the message
+ \param missing_data_bytes_ refers to, as well as the general state of
+ the message.
+ */
+ //@{
+ /*!
+ Describes the meaning given to the number stored in \a missing_data_bytes_.
+ */
+ enum Queued_Data_State
+ {
+ INVALID = -1, //!< The queued data is in an invalid/uninitialized state, and no data should be trusted.
+ COMPLETED = 0, //!< Message is complete; \a missing_data_ should be zero.
+ WAITING_TO_COMPLETE_HEADER, //!< Value in \a missing_data_ indicates part of header is missing.
+ WAITING_TO_COMPLETE_PAYLOAD //!< Value in \a missing_data_ indicates part of payload is missing.
+ };
+
+ /*!
+ Indicates the current state of the message, including hints at
+ how to interpret the value stored in \a missing_data_bytes_.
+ */
+ Queued_Data_State current_state_;
+
+ /*! Data missing in the above message that hasn't been read or processed yet. */
+ CORBA::ULong missing_data_bytes_;
+ //@}
+
+ /*! The byte order of the message that is stored in the node. */
CORBA::Octet byte_order_;
/// Many protocols like GIOP have a major and minor version
diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl
index d67bd485383..d04512d81b0 100644
--- a/TAO/tao/Incoming_Message_Queue.inl
+++ b/TAO/tao/Incoming_Message_Queue.inl
@@ -18,7 +18,7 @@ TAO_Incoming_Message_Queue::is_tail_complete (void)
return -1;
if (this->size_ &&
- this->queued_data_->missing_data_ == 0)
+ this->queued_data_->missing_data_bytes_ == 0)
return 1;
return 0;
@@ -31,7 +31,7 @@ TAO_Incoming_Message_Queue::is_head_complete (void)
return -1;
if (this->size_ &&
- this->queued_data_->next_->missing_data_ == 0 &&
+ this->queued_data_->next_->missing_data_bytes_ == 0 &&
this->queued_data_->next_->more_fragments_ == 0)
return 1;
@@ -55,23 +55,30 @@ ACE_INLINE size_t
TAO_Incoming_Message_Queue::missing_data_tail (void) const
{
if (this->size_ != 0)
- return this->queued_data_->missing_data_;
+ return this->queued_data_->missing_data_bytes_;
return 0;
}
-
+#if 0
ACE_INLINE TAO_Queued_Data *
TAO_Incoming_Message_Queue::get_node (void)
{
return TAO_Queued_Data::get_queued_data ();
}
+#endif
/************************************************************************/
// Methods for TAO_Queued_Data
/************************************************************************/
+ACE_INLINE void
+TAO_Queued_Data::release (void)
+{
+ TAO_Queued_Data::release (this);
+}
+
/*static*/
ACE_INLINE void
TAO_Queued_Data::replace_data_block (ACE_Message_Block &mb)
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index 875429df5c1..2d460580d57 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -113,6 +113,7 @@ public:
virtual void init (CORBA::Octet major,
CORBA::Octet minor) = 0;
+#if 0
/// Parse the incoming messages..
virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0;
@@ -137,12 +138,32 @@ public:
/// @@Bala:Docu??
virtual int consolidate_fragments (TAO_Queued_Data *dqd,
const TAO_Queued_Data *sqd) = 0;
+#endif
/// Parse the request message, make an upcall and send the reply back
/// to the "request initiator"
virtual int process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd) = 0;
+ /*!
+ \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message.
+
+ Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to
+ see if they look like the beginning of a message. Does
+ */
+ virtual int check_for_valid_header (const ACE_Message_Block &mb) const = 0;
+
+ /*!
+ \brief Set fields in \param qd based on values derived from \param mb.
+
+ This function sets fields in \param qd based on values derived
+ from \param mb. It assumes that if the length of \param mb is
+ enough to hold a header, then the data in there can be trusted to
+ make sense.
+ */
+ virtual void set_queued_data_from_message_header (
+ TAO_Queued_Data *,
+ const ACE_Message_Block &mb) const = 0;
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 3801e66ebcc..1bb21950a6a 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -23,6 +23,7 @@
#include "Notify_Handler.h"
#include "ace/Message_Block.h"
#include "ace/Reactor.h"
+#include "ace/Min_Max.h"
#if !defined (__ACE_INLINE__)
# include "Transport.inl"
@@ -105,6 +106,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, head_ (0)
, tail_ (0)
, incoming_message_queue_ (orb_core)
+ , uncompleted_message_ (0)
, current_deadline_ (ACE_Time_Value::zero)
, flush_timer_id_ (-1)
, transport_timer_ (this)
@@ -1290,7 +1292,7 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
this->id ()));
}
- // First try to process messages of the head of the incoming queue.
+ // First try to process messages off the head of the incoming queue.
int retval = this->process_queue_head (rh);
if (retval <= 0)
@@ -1335,25 +1337,27 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
ACE_Message_Block::DONT_DELETE,
this->orb_core_->input_cdr_msgblock_allocator ());
+ // We'll loop trying to complete the message this number of times,
+ // and that's it.
+ unsigned int number_of_read_attempts = 2;
// Align the message block
ACE_CDR::mb_align (&message_block);
+read_from_the_connection:
size_t recv_size = 0;
if (this->orb_core_->orb_params ()->single_read_optimization ())
{
- recv_size =
- message_block.space ();
+ recv_size = message_block.space ();
}
else
{
- recv_size =
- this->messaging_object ()->header_length ();
+ recv_size = this->messaging_object ()->header_length ();
}
// Read the message into the message block that we have created on
// the stack.
- ssize_t n = this->recv (message_block.rd_ptr (),
+ ssize_t n = this->recv (message_block.wr_ptr (),
recv_size,
max_wait_time);
@@ -1371,7 +1375,7 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
if (TAO_debug_level > 2)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input_i, "
+ "TAO (%P|%t) - Transport[%d]::handle_input_i: "
"read %d bytes\n",
this->id (), n));
}
@@ -1379,47 +1383,381 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
// Set the write pointer in the stack buffer
message_block.wr_ptr (n);
- // Parse the message and try consolidating the message if
- // needed.
- retval = this->parse_consolidate_messages (message_block,
- rh,
- max_wait_time);
+ if (TAO_debug_level >= 10)
+ ACE_HEX_DUMP ((LM_DEBUG,
+ (const char *) message_block.rd_ptr (),
+ message_block.length (),
+ ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket")));
+
+ // **** BEGIN CJC PMB CHANGES ****
+ //
+ // - Does this properly handle GIOP FRAGMENT messages??
+ // @@ BALA: I doubt it.. You could get an incomplete message
+ // fragment or a complete message fragment but the message could
+ // incomplete.
+
+ // @@ BALA: How do you handle messages that didnt fit the message
+ // block here?
+ //
+ // @@ CJC: I'm not sure what you mean...If we need more space, then
+ // we allocate more.
+
+ // Check to see if we're still working to complete a message
+ if (this->uncompleted_message_)
+ {
+ // try to complete it
+
+ // on exit from this frame we have one of the following states:
+ //
+ // (a) an uncompleted message still in uncompleted_message_
+ // AND message_block is empty
+ //
+ // (b) uncompleted_message_ zero, the completed message at the
+ // tail of the incoming queue; message_block could be empty
+ // or still contain bytes
+
+ // ==> repeat
+ do
+ {
+ /*
+ * Append the "right number of bytes" to uncompleted_message_
+ */
+ // ==> right_number_of_bytes = MIN(bytes missing from
+ // uncompleted_message_, length of message_block);
+ size_t right_number_of_bytes =
+ ACE_MIN (this->uncompleted_message_->missing_data_bytes_,
+ message_block.length () );
- if (retval <= 0)
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "trying to use %u (of %u) "
+ "bytes to complete message missing %u bytes\n",
+ this->id (),
+ right_number_of_bytes,
+ message_block.length (),
+ this->uncompleted_message_->missing_data_bytes_));
+ }
+
+ // ==> append right_number_of_bytes from message_block
+ // to uncomplete_message_ & update read pointer of
+ // message_block;
+
+ // 1. we assume that uncompleted_message_.msg_block_'s
+ // wr_ptr is properly maintained
+ // 2. we presume that uncompleted_message_.msg_block was
+ // allocated with enough space to contain the *entire*
+ // expected GIOP message, so this copy shouldn't involve an
+ // additional allocation
+ this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (),
+ right_number_of_bytes);
+ this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
+ message_block.rd_ptr (right_number_of_bytes);
+
+ switch (this->uncompleted_message_->current_state_)
+ {
+ case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER:
+ if (this->messaging_object()->check_for_valid_header (
+ *this->uncompleted_message_->msg_block_))
+ {
+ // ==> update bytes missing from uncompleted_message_
+ // with size of message from valid header;
+ this->messaging_object()->set_queued_data_from_message_header (
+ this->uncompleted_message_,
+ *this->uncompleted_message_->msg_block_);
+ // ==> change state of uncompleted_event_ to
+ // WAITING_TO_COMPLETE_PAYLOAD;
+ this->uncompleted_message_->current_state_ =
+ TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD;
+
+ // ==> Resize the message block to have capacity for
+ // the rest of the incoming message
+ ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_;
+ mb.size (mb.size ()
+ + this->uncompleted_message_->missing_data_bytes_);
+
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "found a valid header in the message; "
+ "waiting for %u bytes to complete payload\n",
+ this->id (),
+ this->uncompleted_message_->missing_data_bytes_));
+ }
+
+ // Continue the loop...(?to where?)
+ continue;
+ }
+ else
+ {
+ // What the heck will we do with a bad header? Just
+ // better to close the connection and let things
+ // re-train from there.
+ return -1;
+#if 0 // I don't think I need this clause, but I'm leaving it just in case.
+ // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes;
+ this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
+ ACE_ASSERT (this->uncompleted_message->missing_data_bytes_ > 0);
+#endif
+ }
+ break;
+
+ case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD:
+ // ==> if (bytes missing from uncompleted_message_ == 0)
+ if (this->uncompleted_message_->missing_data_bytes_ == 0)
+ {
+ /*
+ * We completed the message! Hooray!
+ */
+ // ==> place uncompleted_message_ (which is now
+ // complete!) at the tail of the incoming message
+ // queue;
+
+ // ---> NOTE: whoever pulls this off the queue must delete it!
+ this->uncompleted_message_->current_state_
+ = TAO_Queued_Data::COMPLETED;
+ // @@CJC NEED TO CHECK RETURN VALUE HERE!
+ this->enqueue_incoming_message (this->uncompleted_message_);
+ // zero out uncompleted_message_;
+ this->uncompleted_message_ = 0;
+
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "completed and queued message for processing!\n",
+ this->id ()));
+ }
+
+ }
+ else
+ {
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "still need %u bytes to complete uncompleted message.\n",
+ this->id (),
+ this->uncompleted_message_->missing_data_bytes_));
+ }
+ }
+ break;
+
+ default:
+ // @@CJC What do we do here?!
+ ACE_ASSERT (! "Transport::handle_input_i: unexpected state"
+ "in uncompleted_message_");
+ }
+ }
+ // Does the order of the checks matter? In both (a) and (b),
+ // message_block is empty, but only in (b) is there no
+ // uncompleted_message_.
+ // ==> until (message_block is empty || there is no uncompleted_message_);
+ // or, rewritten in C++ looping constructs
+ // ==> while ( ! message_block is empty && there is an uncompleted_message_ );
+ while (message_block.length() != 0 && this->uncompleted_message_);
+ }
+
+ // If, at the end, we're still waiting to complete the message,
+ // we should effectively return.
+ //
+ // @@BALA: Returning here will reduce the throughput. We shold try
+ // reading again to see if we could get more data..
+ //
+ // @@CJC: Good point; maybe we can go to the top again, but only try
+ // to read at most 2-3 times before we just go on. Obviously, if we
+ // complete the message (i.e., there's no uncompleted message), then
+ // we go on to the next step.
+ if (this->uncompleted_message_)
+ {
+ if (number_of_read_attempts--)
+ {
+ // We try to read again just in case more data arrived while
+ // we were doing the stuff above. This way, we can increase
+ // throughput without much of a penalty.
+
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "still have an uncompleted message; "
+ "will try %d more times before letting "
+ "somebody else have a chance.\n",
+ this->id (),
+ number_of_read_attempts));
+ }
+
+ // Yes, this uses the much-maligned "goto"; get over it. TCP
+ // implementations use them, too, for just the same sort of
+ // situation.
+ goto read_from_the_connection;
+ }
+ else
+ {
+ // The queue should be empty because it should have been processed
+ // above. But I wonder if I should put a check in here anyway.
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "giving up reading for now and returning "
+ "with incoming queue length = %d\n",
+ this->id (),
+ this->incoming_message_queue_.queue_length ()));
+ if (this->uncompleted_message_)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "missing bytes from uncompleted message = %u\n",
+ this->uncompleted_message_->missing_data_bytes_));
+ }
+ return 1;
+ }
+ }
+
+ // At this point, there should be nothing in uncompleted_message_.
+ // We now need to chop up the bytes in message_block and store any
+ // complete messages in the incoming message queue.
+ //
+ // ==> if (message_block still has data)
+ if (message_block.length () != 0)
{
- if (retval == -1 && TAO_debug_level > 0)
+ TAO_Queued_Data *complete_message = 0;
+ do
{
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input_i, "
- "error while parsing and consolidating\n",
- this->id ()));
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ")
+ ACE_TEXT("extracting complete messages\n")));
+ ACE_HEX_DUMP ((LM_DEBUG,
+ message_block.rd_ptr (),
+ message_block.length (),
+ ACE_TEXT (" from this message buffer")));
+ }
+ complete_message =
+ TAO_Queued_Data::make_completed_message (
+ message_block, *this->messaging_object ());
+ if (complete_message)
+ this->enqueue_incoming_message (complete_message);
}
- return retval;
+ while (complete_message != 0);
+ // On exit from this frame we have one of the following states:
+ // (a) message_block is empty
+ // (b) message_block contains bytes from a partial message
}
- // Make a node of the message block..
- TAO_Queued_Data qd (&message_block,
- this->orb_core_->transport_message_buffer_allocator ());
+ // If, at this point, there's still data in message_block, it's
+ // an incomplete message. Therefore, we stuff it into the
+ // uncompleted_message_ and clear out message_block.
+ // ==> if (message_block still has data)
+ if (message_block.length () != 0)
+ {
+ // duplicate message_block remainder into this->uncompleted_message_
+ ACE_ASSERT (this->uncompleted_message_ == 0);
+ ACE_Message_Block *queueable_mb = message_block.clone ();
+ ACE_ASSERT (queueable_mb != 0);
+ message_block.rd_ptr (message_block.length ());
+ this->uncompleted_message_ =
+ TAO_Queued_Data::make_uncompleted_message (queueable_mb,
+ *this->messaging_object ());
+ }
- // Extract the data for the node..
- this->messaging_object ()->get_message_data (&qd);
+ // We should have consumed ALL the bytes by now.
+ ACE_ASSERT (message_block.length () == 0);
- // Check whether the message was fragmented..
- if (qd.more_fragments_ ||
- (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
- {
- // Duplicate the node that we have as the node is on stack..
- TAO_Queued_Data *nqd =
- TAO_Queued_Data::duplicate (qd);
+ // **** END CJC PMG CHANGES ****
- return this->consolidate_fragments (nqd, rh);
+ // Process the message
+ return this->process_queue_head (rh);
+}
+
+int
+TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message)
+{
+ // some notes...
+ //
+ // will probably need to modify process_queue_head to iterate to
+ // find the first unfragmented message in the queue
+
+ return this->incoming_message_queue_.enqueue_tail (queueable_message);
+
+#if 0
+ //
+ // This is the code that will deal with fragments
+ //
+
+ // If the message doesn't indicate that more fragments are on
+ // the way, then we just queue it up at the tail and go on.
+ if (! queueable_message->more_fragments_)
+ // NOTE: we might have to capture the return value from enqueue_tail()
+ // and convert it to an appropriate return value for this method.
+ return this->incoming_message_queue_.enqueue_tail (queueable_message);
+
+ // At this point we know that queueable_message is a fragment. What
+ // we don't know is what kind of fragment it is, or how to deal with
+ // it.
+ //
+ // Let's just explore the mechanics of how things would be done for
+ // the case of GIOP. When we really do this, we'll have to go a bit
+ // more abstract.
+ switch (queueable_message->giop_version)
+ {
+ case 1.0:
+ // Fragments aren't supported in 1.0. This is an error and
+ // we should reject it somehow. What do we do here? Do we throw
+ // an exception to the receiving side? Do we throw an exception
+ // to the sending side?
+ //
+ // At the very least, we need to log the fact that we received
+ // nonsense.
+ LOG_AND_RETURN_AN_ERROR;
+ break;
+
+ case 1.1:
+ // In 1.1, fragments kinda suck because they don't have they're
+ // own message-specific header. Therefore, we have to do the
+ // following:
+ fragment_message = search the incoming queue for a GIOP 1.1 message that has the more_fragments_ bit set;
+ add this queueable_message->msg_block_ as a continuation block to the last block in fragment_message continuation chain;
+
+ // One note is that TAO_Queued_Data contains version numbers,
+ // but doesn't indicate the actual protocol to which those
+ // version numbers refer. That's not a problem, though, because
+ // instances of TAO_Queued_Data live in a queue, and that queue
+ // lives in a particular instance of a Transport, and the
+ // transport instance has an association with a particular
+ // messaging_object. The concrete messaging object embodies a
+ // messaging protocol, and must cover all versions of that
+ // protocol. Therefore, we just need to cover the bases of all
+ // versions of that one protocol.
+ break;
+
+ case 1.2:
+ // In GIOP 1.2, we get a little more context. There's a
+ // FRAGMENT message-specific header, and inside that is the
+ // request id with which the fragment is associated.
+ //
+ // What this means is that we'll have to add request_id_ as one
+ // of the TOA_Queued_Data public data members.
+ fragment_id = extract id from fragment header;
+ fragment_message = search incoming_queue for a GIOP message with request_id==fragment_id;
+ // Check for bizarre error conditions first
+ if (! fragment_message->more_fragments_)
+ LOG_AND_RETURN_AN_ERROR;
+ if (fragment_message->GIOP_version != 1.2)
+ LOG_AND_RETURN_AN_ERROR;
+
+ add this queueable_message->msg_block_ as a continuation block to the last block in fragment_message continuation chain;
+ break;
}
- // Process the message
- return this->process_parsed_messages (&qd,
- rh);
+ return 0;
+#endif
}
+#if 0
int
TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
TAO_Resume_Handle &rh,
@@ -1902,6 +2240,7 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block
return this->process_queue_head (rh);
}
+#endif
int
TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
@@ -2013,6 +2352,7 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
return 0;
}
+#if 0
TAO_Queued_Data *
TAO_Transport::make_queued_data (ACE_Message_Block &incoming)
{
@@ -2063,6 +2403,7 @@ TAO_Transport::make_queued_data (ACE_Message_Block &incoming)
return qd;
}
+#endif
int
TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 31ebe4e18a9..dc542568f9b 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -649,6 +649,7 @@ protected:
*/
virtual int register_handler_i (void) = 0;
+#if 0
/// Called by the handle_input_i (). This method is used to parse
/// message read by the handle_input_i () call. It also decides
/// whether the message needs consolidation before processing.
@@ -696,16 +697,14 @@ protected:
/// them, atleast at the head of them.
int consolidate_extra_messages (ACE_Message_Block &incoming,
TAO_Resume_Handle &rh);
+#endif
/// Process the message by sending it to the higher layers of the
/// ORB.
int process_parsed_messages (TAO_Queued_Data *qd,
TAO_Resume_Handle &rh);
- /// Make a queued data from the <incoming> message block
- TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming);
-
- /// Implement send_message_shared() assuming the handler_lock_ is
+ /// Implement send_message_shared() assuming the handler_lock_ is
/// held.
int send_message_shared_i (TAO_Stub *stub,
int message_semantics,
@@ -763,6 +762,34 @@ public:
int handle_timeout (const ACE_Time_Value &current_time,
const void* act);
+
+ /*!
+ \name Incoming Queue Methods
+ */
+ //@{
+ /*!
+ \brief Queue up \a queueable_message as a completely-received incoming message.
+
+ This method queues up a completely-received queueable GIOP message
+ (i.e., it must be dynamically-allocated). It does not assemble a
+ complete GIOP message; that should be done prior to calling this
+ message, and is currently done in handle_input_i.
+
+ This does, however, assure that a completely-received GIOP
+ FRAGMENT gets associated with any previously-received related
+ fragments. It does this through collaboration with the messaging
+ object (since fragment reassembly is protocol specific).
+
+ \param queueable_message instance as returned by one of the TAO_Queued_Data::make_*_message that's been completely received
+
+ \return 0 successfully enqueued \a queueable_message
+
+ \return -1 failed to enqueue \a queueable_message
+ \todo How do we indicate \em what may have failed?
+ */
+ int enqueue_incoming_message (TAO_Queued_Data *queueable_message);
+ //@}
+
private:
/// Helper method that returns the Transport Cache Manager.
@@ -939,9 +966,12 @@ protected:
TAO_Queued_Message *head_;
TAO_Queued_Message *tail_;
- /// Queue of the incoming messages..
+ /// Queue of the completely-received incoming messages..
TAO_Incoming_Message_Queue incoming_message_queue_;
+ /// Place to hold a partially-received (waiting-to-be-completed) message
+ TAO_Queued_Data * uncompleted_message_;
+
/// The queue will start draining no later than <queing_deadline_>
/// *if* the deadline is
ACE_Time_Value current_deadline_;
diff --git a/TAO/tests/AMI/run_test.pl b/TAO/tests/AMI/run_test.pl
index d08ef80dc23..802c5794cd9 100755
--- a/TAO/tests/AMI/run_test.pl
+++ b/TAO/tests/AMI/run_test.pl
@@ -11,7 +11,7 @@ use PerlACE::Run_Test;
$client_conf = PerlACE::LocalFile ("muxed$PerlACE::svcconf_ext");
$debug_level = '0';
-$iterations = '1';
+$iterations = '10000';
foreach $i (@ARGV) {
if ($i eq '-mux') {
diff --git a/TAO/tests/InterOp-Naming/INS_test_client.cpp b/TAO/tests/InterOp-Naming/INS_test_client.cpp
index 8c543b717c5..18b9c9596a2 100644
--- a/TAO/tests/InterOp-Naming/INS_test_client.cpp
+++ b/TAO/tests/InterOp-Naming/INS_test_client.cpp
@@ -2,12 +2,45 @@
#include "INSC.h"
+#include <tao/Messaging/Messaging.h>
+#include <tao/TimeBaseC.h>
+
+template <class T>
+void set_timeout_on_objref (CORBA::ORB_ptr orb,
+ CORBA::PolicyType pt,
+ const ACE_Time_Value &timeout,
+ typename T::_var_type &obj,
+ T*)
+{
+ // TimeBase::TimeT rrtt = (timeout.msec() * 1000 + timeout.sec()) * 10000000;
+ TimeBase::TimeT rrtt = timeout.msec() * 10000;
+ CORBA::Any rrtt_any;
+ rrtt_any <<= rrtt;
+
+ CORBA::PolicyList pl;
+ pl.length (1);
+ pl[0] = orb->create_policy (pt, rrtt_any);
+
+ try
+ {
+ CORBA::Object_var tmp = obj->_set_policy_overrides (pl, CORBA::SET_OVERRIDE);
+ obj = T::_unchecked_narrow (tmp.in ());
+ }
+ catch (...)
+ {
+ obj = T::_nil ();
+ }
+
+ pl[0]->destroy ();
+}
int
main (int argc, char *argv[])
{
int i = 0;
+ ACE_LOG_MSG->set_flags (ACE_Log_Msg::VERBOSE_LITE);
+
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
@@ -73,15 +106,42 @@ main (int argc, char *argv[])
"given name.\n"),
-1);
+ const ACE_Time_Value RTT (20, 500000);
+ CORBA::Object *dummy = 0;
+#if 1
+ set_timeout_on_objref (orb.in (),
+ Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
+ RTT, objref, dummy);
+ INS_var server = INS::_narrow (objref.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+#else
+ set_timeout_on_objref (orb.in (),
+ TAO::CONNECTION_TIMEOUT_POLICY_TYPE,
+ RTT, objref, dummy);
+
INS_var server = INS::_narrow (objref.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
+#endif
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) TRYING TO NARROW OBJREF AGAIN.\n"));
+ INS_var server2 = INS::_narrow (objref.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG,
"Resolved IOR for %s : %s\n",
argv[i],
orb->object_to_string (server.in ())));
+#if 0
+ INS *d2;
+ set_timeout_on_objref (orb.in (),
+ Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
+ RTT, server, d2);
+#endif
+
CORBA::String_var test_ins_result =
server->test_ins (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
diff --git a/TAO/tests/InterOp-Naming/Makefile b/TAO/tests/InterOp-Naming/Makefile
index b2d07af51a9..639bc862b56 100644
--- a/TAO/tests/InterOp-Naming/Makefile
+++ b/TAO/tests/InterOp-Naming/Makefile
@@ -61,7 +61,7 @@ INS_test_server:$(addprefix $(VDIR),$(SIMPLE_SERVER_OBJS))
$(LINK.cc) $(LDFLAGS) -o $@ $^ -lTAO_IORTable $(TAO_SRVR_LIBS) $(POSTLINK)
INS_test_client:$(addprefix $(VDIR),$(SIMPLE_CLIENT_OBJS))
- $(LINK.cc) $(LDFLAGS) -o $@ $^ $(TAO_CLNT_LIBS) $(POSTLINK)
+ $(LINK.cc) $(LDFLAGS) -o $@ $^ -lTAO_Messaging $(TAO_CLNT_LIBS) $(POSTLINK)
.PRECIOUS: $(foreach ext, $(IDL_EXT), INS$(ext))