summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2003-12-14 16:03:49 +0000
committerbala <balanatarajan@users.noreply.github.com>2003-12-14 16:03:49 +0000
commita7e10f0b431dc0e2006196751782cd8fa40b16c7 (patch)
tree7422441092fdcc31025f06c8f064c7209747eb21
parent20ded8e045eb9993cfc20bf788d70329c11feb62 (diff)
downloadATCD-a7e10f0b431dc0e2006196751782cd8fa40b16c7.tar.gz
ChangeLogTag:Sun Dec 14 09:56:37 2003 Balachandran Natarajan <bala@dre.vanderbilt.edu>
-rw-r--r--TAO/ChangeLog29
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp353
-rw-r--r--TAO/tao/GIOP_Message_Base.h52
-rw-r--r--TAO/tao/GIOP_Message_Base.i8
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_Impl.inl23
-rw-r--r--TAO/tao/GIOP_Message_Lite.cpp57
-rw-r--r--TAO/tao/GIOP_Message_Lite.h45
-rw-r--r--TAO/tao/GIOP_Message_State.cpp239
-rw-r--r--TAO/tao/GIOP_Message_State.h44
-rw-r--r--TAO/tao/GIOP_Message_State.inl31
-rw-r--r--TAO/tao/IIOP_Transport.cpp2
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp459
-rw-r--r--TAO/tao/Incoming_Message_Queue.h162
-rw-r--r--TAO/tao/Incoming_Message_Queue.inl23
-rw-r--r--TAO/tao/Pluggable_Messaging.h44
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.cpp229
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp2
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.h4
-rw-r--r--TAO/tao/Transport.cpp1208
-rw-r--r--TAO/tao/Transport.h104
20 files changed, 1357 insertions, 1761 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index 3cc88f2f838..1a42e91533a 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,32 @@
+Sun Dec 14 09:56:37 2003 Balachandran Natarajan <bala@dre.vanderbilt.edu>
+
+ * tao/GIOP_Message_Base.cpp:
+ * tao/GIOP_Message_Base.h:
+ * tao/GIOP_Message_Base.i:
+ * tao/GIOP_Message_Generator_Parser_Impl.inl:
+ * tao/GIOP_Message_Lite.cpp:
+ * tao/GIOP_Message_Lite.h:
+ * tao/GIOP_Message_State.cpp:
+ * tao/GIOP_Message_State.h:
+ * tao/GIOP_Message_State.inl:
+ * tao/IIOP_Transport.cpp:
+ * tao/Incoming_Message_Queue.cpp:
+ * tao/Incoming_Message_Queue.h:
+ * tao/Incoming_Message_Queue.inl:
+ * tao/Pluggable_Messaging.h:
+ * tao/Transport.cpp:
+ * tao/Transport.h:
+ * tao/Strategies/DIOP_Transport.cpp:
+ * tao/Strategies/SHMIOP_Transport.cpp:
+ * tao/Strategies/SHMIOP_Transport.h:
+
+ Reverting the change "Wed Dec 10 13:58:41 2003 Chris Cleeland
+ <cleeland_c@ociweb.com>" for PMB fixes. The fixes were right,
+ but it slowed down the ORB considerably due to the two
+ allocations along the critical path. We will try to get these
+ fixes in later once we have a way to get around the
+ allocations.
+
Sun Dec 14 08:47:03 2003 Balachandran Natarajan <bala@dre.vanderbilt.edu>
* tests/Bug_1639_Regression/test.mpc:
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 7ada6c903ee..76aeeaee51e 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -24,7 +24,7 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
size_t /*input_cdr_size*/)
: orb_core_ (orb_core)
, message_state_ (orb_core,
- this)
+ this)
, out_stream_ (this->buffer_,
sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */
TAO_ENCAP_BYTE_ORDER,
@@ -295,7 +295,6 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
TAO_Pluggable_Message_Type
TAO_GIOP_Message_Base::message_type (
TAO_GIOP_Message_State &msg_state)
- const
{
// Convert to the right type of Pluggable Messaging message type.
@@ -330,6 +329,264 @@ TAO_GIOP_Message_Base::message_type (
}
int
+TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
+{
+
+ if (this->message_state_.parse_message_header (incoming) == -1)
+ {
+ return -1;
+ }
+
+ return 0;
+}
+
+ssize_t
+TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
+{
+ // Actual message size including the header..
+ CORBA::ULong msg_size =
+ this->message_state_.message_size ();
+
+ size_t len = incoming.length ();
+
+ // If we have too many messages or if we have less than even a size
+ // of the GIOP header then ..
+ if (len > msg_size ||
+ len < TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ return -1;
+ }
+ else if (len == msg_size)
+ return 0;
+
+ return msg_size - len;
+}
+
+
+int
+TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *&qd)
+{
+ TAO_GIOP_Message_State state (this->orb_core_,
+ this);
+
+ if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ if (incoming.length () > 0)
+ {
+ // Make a node which has a message block of the size of
+ // MESSAGE_HEADER_LEN.
+ qd =
+ this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN);
+
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ incoming.length ());
+ qd->missing_data_ = -1;
+ }
+ return 0;
+ }
+
+ if (state.parse_message_header (incoming) == -1)
+ {
+ return -1;
+ }
+
+ size_t copying_len = state.message_size ();
+
+ qd = this->make_queued_data (copying_len);
+
+ if (copying_len > incoming.length ())
+ {
+ qd->missing_data_ =
+ copying_len - incoming.length ();
+
+ copying_len = incoming.length ();
+ }
+
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copying_len);
+
+ incoming.rd_ptr (copying_len);
+ qd->byte_order_ = state.byte_order_;
+ qd->major_version_ = state.giop_version_.major;
+ qd->minor_version_ = state.giop_version_.minor;
+ qd->msg_type_ = this->message_type (state);
+ return 1;
+}
+
+int
+TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
+ ACE_Message_Block &incoming)
+{
+ // Look to see whether we had atleast parsed the GIOP header ...
+ if (qd->missing_data_ == -1)
+ {
+ // The data length that has been stuck in there during the last
+ // read ....
+ size_t len =
+ qd->msg_block_->length ();
+
+ // We know that we would have space for
+ // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
+ // from the <incoming> into the message block in <qd>
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ TAO_GIOP_MESSAGE_HEADER_LEN - len);
+
+ // Move the rd_ptr () in the incoming message block..
+ incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len);
+
+ TAO_GIOP_Message_State state (this->orb_core_,
+ this);
+
+ // Parse the message header now...
+ if (state.parse_message_header (*qd->msg_block_) == -1)
+ return -1;
+
+ // Now grow the message block so that we can copy the rest of
+ // the data...
+ if (qd->msg_block_->space () < state.message_size ())
+ {
+ ACE_CDR::grow (qd->msg_block_,
+ state.message_size ());
+ }
+
+ // Copy the pay load..
+ // Calculate the bytes that needs to be copied in the queue...
+ size_t copy_len =
+ state.payload_size ();
+
+ // If the data that needs to be copied is more than that is
+ // available to us ..
+ if (copy_len > incoming.length ())
+ {
+ // Calculate the missing data..
+ qd->missing_data_ =
+ copy_len - incoming.length ();
+
+ // Set the actual possible copy_len that is available...
+ copy_len = incoming.length ();
+ }
+ else
+ {
+ qd->missing_data_ = 0;
+ }
+
+ // ..now we are set to copy the right amount of data to the
+ // node..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len);
+
+ // Set the <rd_ptr> of the <incoming>..
+ incoming.rd_ptr (copy_len);
+
+ // Get the other details...
+ qd->byte_order_ = state.byte_order_;
+ qd->major_version_ = state.giop_version_.major;
+ qd->minor_version_ = state.giop_version_.minor;
+ qd->msg_type_ = this->message_type (state);
+ }
+ else
+ {
+ // @@todo: Need to abstract this out to a seperate method...
+ size_t copy_len = qd->missing_data_;
+
+ if (copy_len > incoming.length ())
+ {
+ // Calculate the missing data..
+ qd->missing_data_ =
+ copy_len - incoming.length ();
+
+ // Set the actual possible copy_len that is available...
+ copy_len = incoming.length ();
+ }
+
+ // Copy the right amount of data in to the node...
+ // node..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len);
+
+ // Set the <rd_ptr> of the <incoming>..
+ qd->msg_block_->rd_ptr (copy_len);
+
+ }
+
+
+ return 0;
+}
+
+
+int
+TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd,
+ const TAO_Queued_Data *sqd)
+{
+ if (dqd->byte_order_ != sqd->byte_order_
+ || dqd->major_version_ != sqd->major_version_
+ || dqd->minor_version_ != sqd->minor_version_)
+ {
+ // Yes, print it out in all debug levels!. This is an error by
+ // CORBA 2.4 spec
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) incompatible fragments:")
+ ACE_TEXT ("different GIOP versions or byte order\n")));
+ return -1;
+ }
+
+ // Skip the header in the incoming message
+ sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
+
+ // If we have a fragment header skip the header length too..
+ if (sqd->minor_version_ == 2 &&
+ sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
+
+ // Get the length of the incoming message block..
+ size_t incoming_length =
+ sqd->msg_block_->length ();
+
+ // Increase the size of the destination message block if we need
+ // to.
+ ACE_Message_Block *mb =
+ dqd->msg_block_;
+
+ // Check space before growing.
+ if (mb->space () < incoming_length)
+ {
+ ACE_CDR::grow (mb,
+ mb->length () + incoming_length);
+ }
+
+ // Copy the data
+ dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (),
+ incoming_length);
+ return 0;
+}
+
+void
+TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
+{
+ // Get the message information
+ qd->byte_order_ =
+ this->message_state_.byte_order_;
+ qd->major_version_ =
+ this->message_state_.giop_version_.major;
+ qd->minor_version_ =
+ this->message_state_.giop_version_.minor;
+
+ //qd->more_fragments_ = this->message_state_.more_fragments_;
+
+ if (this->message_state_.more_fragments_)
+ qd->more_fragments_ = 1;
+ else
+ qd->more_fragments_ = 0;
+
+ qd->msg_type_=
+ this->message_type (this->message_state_);
+
+ // Reset the message_state
+ this->message_state_.reset ();
+}
+
+int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd)
@@ -507,13 +764,13 @@ TAO_GIOP_Message_Base::process_reply_message (
// Should be taken care by the state specific parsing
retval =
generator_parser->parse_reply (input_cdr,
- params);
+ params);
break;
case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
retval =
generator_parser->parse_locate_reply (input_cdr,
- params);
+ params);
break;
default:
retval = -1;
@@ -641,9 +898,7 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
parse_error =
parser->parse_request_header (request);
- TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager();
- if (csm)
- csm->process_service_context(request);
+ request.orb_core()->codeset_manager()->process_service_context(request);
transport->assign_translators(&cdr,&output);
// Throw an exception if the
@@ -843,9 +1098,9 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
}
TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
- locate_request.object_key ().length (),
- locate_request.object_key ().get_buffer (),
- 0);
+ locate_request.object_key ().length (),
+ locate_request.object_key ().get_buffer (),
+ 0);
// Set it to an error state
parse_error = 1;
@@ -1069,7 +1324,6 @@ TAO_GIOP_Message_Base::set_state (
}
-#if 0
// Server sends an "I'm shutting down now, any requests you've sent me
// can be retried" message to the server. The message is prefab, for
// simplicity.
@@ -1165,7 +1419,7 @@ TAO_GIOP_Message_Base::
transport-> id ()));
}
-#endif
+
int
TAO_GIOP_Message_Base::send_reply_exception (
@@ -1322,49 +1576,44 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg)
}
-void
-TAO_GIOP_Message_Base::set_queued_data_from_message_header (
- TAO_Queued_Data *qd,
- const ACE_Message_Block &mb
- ) const
+TAO_Queued_Data *
+TAO_GIOP_Message_Base::make_queued_data (size_t sz)
{
- // @@CJC: Try leaving out the declaration for this->message_state_
- // and see what pukes. I don't think we need it any more.
- TAO_GIOP_Message_State state;
- if (state.take_values_from_message_block (mb) == -1)
- {
- // what the heck do we do here?!
- qd->current_state_ = TAO_Queued_Data::INVALID;
- return;
- }
+ // Get a node for the queue..
+ TAO_Queued_Data *qd =
+ TAO_Queued_Data::get_queued_data (
+ this->orb_core_->transport_message_buffer_allocator ());
- // It'd be nice to have an abstract base for GIOP_Message_State
- // so that there could just be a line like:
- // qd->take_values_from (state);
- // Get the message information
- qd->byte_order_ = state.byte_order ();
- qd->major_version_ = state.giop_version ().major;
- qd->minor_version_ = state.giop_version ().minor;
- qd->more_fragments_ = state.more_fragments () ? 1 : 0;
- qd->request_id_ = state.request_id_;
- qd->msg_type_= message_type (state);
- qd->missing_data_bytes_ = state.payload_size ();
+ // @@todo: We have a similar method in Transport.cpp. Need to see how
+ // we can factor them out..
+ // Make a datablock for the size requested + something. The
+ // "something" is required because we are going to align the data
+ // block in the message block. During alignment we could loose some
+ // bytes. As we may not know how many bytes will be lost, we will
+ // allocate ACE_CDR::MAX_ALIGNMENT extra.
+ ACE_Data_Block *db =
+ this->orb_core_->create_input_cdr_data_block (sz +
+ ACE_CDR::MAX_ALIGNMENT);
+
+ ACE_Allocator *alloc =
+ this->orb_core_->input_cdr_msgblock_allocator ();
+
+ ACE_Message_Block mb (db,
+ 0,
+ alloc);
+
+ ACE_Message_Block *new_mb = mb.duplicate ();
+
+ ACE_CDR::mb_align (new_mb);
+
+ qd->msg_block_ = new_mb;
+
+
+ return qd;
}
-int
-TAO_GIOP_Message_Base::check_for_valid_header (
- const ACE_Message_Block &mb
- ) const
+size_t
+TAO_GIOP_Message_Base::header_length (void) const
{
- // NOTE! We don't hardcode the length of the header b/c header_length should
- // be eligible for inlining by pretty much any compiler, and it should return
- // a constant. The rest of this method is hard-coded and hand-optimized because
- // this method gets called A LOT.
- if (mb.length () < this->header_length ())
- return -1;
-
- // Is finding that it's the right length and the magic bytes present
- // enough to declare it a valid header? I think so...
- register const char* h = mb.rd_ptr ();
- return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P') ? 1 : 0;
+ return TAO_GIOP_MESSAGE_HEADER_LEN;
}
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index 7bf61e340f7..461673bd359 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -94,37 +94,35 @@ public:
/// the message.
virtual int format_message (TAO_OutputCDR &cdr);
- /// Process the request message that we have received on the
- /// connection
- virtual int process_request_message (TAO_Transport *transport,
- TAO_Queued_Data *qd);
-
- /*!
- \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message.
+ /// Parse the incoming messages..
+ virtual int parse_incoming_messages (ACE_Message_Block &message_block);
- Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to
- see if they look like the beginning of a message. If \code mb does not
- contain less than \code header_length() bytes, this method cannot make a
- complete evaluation, and returns a commensurate value.
+ /// Calculate the amount of data that is missing in the <incoming>
+ /// message block.
+ virtual ssize_t missing_data (ACE_Message_Block &message_block);
- \return 1 \code header_length() bytes found, and constitute a valid header
- \return 0 \code header_length() bytes found, and do not constitute a valid header
- \return -1 not enough bytes available to make a determination of header validity
+ /* Extract the details of the next message from the <incoming>
+ * through <qd>. Returns 1 if there are more messages and returns a
+ * 0 if there are no more messages in <incoming>.
*/
- virtual int check_for_valid_header (const ACE_Message_Block &mb) const;
+ virtual int extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *&qd);
- /*!
- \brief Set fields in \param qd based on values derived from \param mb.
+ /// Check whether the node <qd> needs consolidation from <incoming>
+ virtual int consolidate_node (TAO_Queued_Data *qd,
+ ACE_Message_Block &incoming);
- This function sets fields in \param qd based on values derived
- from \param mb. It assumes that if the length of \param mb is
- enough to hold a header, then the data in there can be trusted to
- make sense.
- */
- virtual void set_queued_data_from_message_header (
- TAO_Queued_Data *,
- const ACE_Message_Block &mb) const;
+ /// Get the details of the message parsed through the <qd>.
+ virtual void get_message_data (TAO_Queued_Data *qd);
+ /// @@Bala:Docu??
+ virtual int consolidate_fragments (TAO_Queued_Data *dqd,
+ const TAO_Queued_Data *sqd);
+
+ /// Process the request message that we have received on the
+ /// connection
+ virtual int process_request_message (TAO_Transport *transport,
+ TAO_Queued_Data *qd);
/// Parse the reply message that we received and return the reply
@@ -176,7 +174,7 @@ protected:
/// TAO_PLUGGABLE_MESSAGE_REPLY,
/// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION,
/// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
- TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state) const;
+ TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state);
private:
@@ -199,12 +197,10 @@ private:
/// Send error messages
int send_error (TAO_Transport *transport);
-#if 0
/// Close a connection, first sending GIOP::CloseConnection.
void send_close_connection (const TAO_GIOP_Message_Version &version,
TAO_Transport *transport,
void *ctx);
-#endif
/// We must send a LocateReply through <transport>, this request
/// resulted in some kind of exception.
diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i
index f5e39d9aa54..a589447a413 100644
--- a/TAO/tao/GIOP_Message_Base.i
+++ b/TAO/tao/GIOP_Message_Base.i
@@ -4,11 +4,3 @@
//
// GIOP_Message_Base
//
-
-
-ACE_INLINE size_t
-TAO_GIOP_Message_Base::header_length (void) const
-{
- return TAO_GIOP_MESSAGE_HEADER_LEN;
-}
-
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
index 47e4730befb..18bb7936ffd 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
+++ b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
@@ -5,24 +5,9 @@ TAO_GIOP_Message_Generator_Parser_Impl::
check_revision (CORBA::Octet incoming_major,
CORBA::Octet incoming_minor)
{
- CORBA::UShort version_as_whole_num = incoming_major << 8 | incoming_minor;
- CORBA::UShort max_allowable_version = TAO_DEF_GIOP_MAJOR << 8 | TAO_DEF_GIOP_MINOR;
-
- CORBA::Boolean ret = 0;
-
- // If it's greater than the max, we know it's not allowed.
- if (version_as_whole_num > max_allowable_version)
+ if (incoming_major > TAO_DEF_GIOP_MAJOR ||
+ incoming_minor > TAO_DEF_GIOP_MINOR)
return 0;
-
- // If it's less than the max, though, we still have to check for
- // each explicit version and only allow the ones we know work.
- switch (version_as_whole_num)
- {
- case 0x0100:
- case 0x0101:
- case 0x0102:
- ret = 1;
- }
-
- return ret;
+
+ return 1;
}
diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp
index 3ff0683729a..462266dd9bf 100644
--- a/TAO/tao/GIOP_Message_Lite.cpp
+++ b/TAO/tao/GIOP_Message_Lite.cpp
@@ -243,7 +243,6 @@ TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream)
}
-#if 0
int
TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block)
{
@@ -502,7 +501,6 @@ TAO_GIOP_Message_Lite::consolidate_fragments (TAO_Queued_Data * /*dqd*/,
// We dont know what fragments are???
return -1;
}
-#endif
int
TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport,
@@ -730,9 +728,7 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport,
parse_error =
this->parse_request_header (request);
- TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager();
- if (csm)
- csm->process_service_context(request);
+ request.orb_core()->codeset_manager()->process_service_context(request);
transport->assign_translators(&cdr,&output);
// Throw an exception if the
@@ -1630,6 +1626,38 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label,
}
}
+TAO_Queued_Data *
+TAO_GIOP_Message_Lite::make_queued_data (size_t sz)
+{
+ // Get a node for the queue..
+ TAO_Queued_Data *qd =
+ TAO_Queued_Data::get_queued_data ();
+
+ // Make a datablock for the size requested + something. The
+ // "something" is required because we are going to align the data
+ // block in the message block. During alignment we could loose some
+ // bytes. As we may not know how many bytes will be lost, we will
+ // allocate ACE_CDR::MAX_ALIGNMENT extra.
+ ACE_Data_Block *db =
+ this->orb_core_->create_input_cdr_data_block (sz +
+ ACE_CDR::MAX_ALIGNMENT);
+
+ ACE_Allocator *alloc =
+ this->orb_core_->input_cdr_msgblock_allocator ();
+
+ ACE_Message_Block mb (db,
+ 0,
+ alloc);
+
+ ACE_Message_Block *new_mb = mb.duplicate ();
+
+ ACE_CDR::mb_align (new_mb);
+
+ qd->msg_block_ = new_mb;
+
+ return qd;
+}
+
int
TAO_GIOP_Message_Lite::generate_locate_reply_header (
TAO_OutputCDR & /*cdr*/,
@@ -1651,22 +1679,3 @@ TAO_GIOP_Message_Lite::header_length (void) const
{
return TAO_GIOP_LITE_HEADER_LEN;
}
-
-void
-TAO_GIOP_Message_Lite::set_queued_data_from_message_header (
- TAO_Queued_Data *qd,
- const ACE_Message_Block &mb
- ) const
-{
- ACE_UNUSED_ARG (qd);
- ACE_UNUSED_ARG (mb);
-}
-
-int
-TAO_GIOP_Message_Lite::check_for_valid_header (
- const ACE_Message_Block &mb
- ) const
-{
- ACE_UNUSED_ARG (mb);
- return 0;
-}
diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h
index bfc7f5f01ad..b10ef17982c 100644
--- a/TAO/tao/GIOP_Message_Lite.h
+++ b/TAO/tao/GIOP_Message_Lite.h
@@ -88,11 +88,8 @@ public:
/// the message.
virtual int format_message (TAO_OutputCDR &cdr);
- /// Process the request message that we have received on the
- /// connection
- virtual int process_request_message (TAO_Transport *transport,
- TAO_Queued_Data *qd);
-
+ /// Parse the incoming messages..
+ virtual int parse_incoming_messages (ACE_Message_Block &message_block);
/// Get the message type. The return value would be one of the
/// following:
@@ -102,25 +99,33 @@ public:
/// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
TAO_Pluggable_Message_Type message_type (void);
- /*!
- \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message.
- Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to
- see if they look like the beginning of a message. Does
+ /// Calculate the amount of data that is missing in the <incoming>
+ /// message block.
+ virtual ssize_t missing_data (ACE_Message_Block &message_block);
+
+ /* Extract the details of the next message from the <incoming>
+ * through <qd>. Returns 1 if there are more messages and returns a
+ * 0 if there are no more messages in <incoming>.
*/
- virtual int check_for_valid_header (const ACE_Message_Block &mb) const;
+ virtual int extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *&qd);
- /*!
- \brief Set fields in \param qd based on values derived from \param mb.
+ /// Check whether the node <qd> needs consolidation from <incoming>
+ virtual int consolidate_node (TAO_Queued_Data *qd,
+ ACE_Message_Block &incoming);
- This function sets fields in \param qd based on values derived
- from \param mb. It assumes that if the length of \param mb is
- enough to hold a header, then the data in there can be trusted to
- make sense.
- */
- virtual void set_queued_data_from_message_header (
- TAO_Queued_Data *,
- const ACE_Message_Block &mb) const;
+ /// Get the details of the message parsed through the <qd>.
+ virtual void get_message_data (TAO_Queued_Data *qd);
+
+ /// @@Bala: Docu???
+ virtual int consolidate_fragments (TAO_Queued_Data *dqd,
+ const TAO_Queued_Data *sqd);
+
+ /// Process the request message that we have received on the
+ /// connection
+ virtual int process_request_message (TAO_Transport *transport,
+ TAO_Queued_Data *qd);
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp
index f9c7fbeda1a..e3a3ca20bf3 100644
--- a/TAO/tao/GIOP_Message_State.cpp
+++ b/TAO/tao/GIOP_Message_State.cpp
@@ -5,65 +5,20 @@
#include "tao/GIOP_Message_Base.h"
#include "ace/Log_Msg.h"
-#include "ace/OS_NS_string.h"
#if !defined (__ACE_INLINE__)
# include "tao/GIOP_Message_State.inl"
#endif /* __ACE_INLINE__ */
-
-class TAO_Debug_Msg_Emitter_Guard
-{
-public:
- TAO_Debug_Msg_Emitter_Guard (unsigned int debug_level, const char* msg)
- : which_level_(debug_level)
- {
- if (TAO_debug_level < this->which_level_)
- {
- msg_ = 0;
- return;
- }
-
- this->msg_ = new char[ACE_OS::strlen (msg) + MAGIC_LENGTH ];
- ACE_OS::strcpy (this->msg_, msg);
- ACE_OS::strcat (this->msg_, " begin\n");
-
- if (TAO_debug_level >= this->which_level_)
- {
- ACE_DEBUG ((LM_DEBUG, this->msg_ ));
- }
- }
-
- ~TAO_Debug_Msg_Emitter_Guard ()
- {
- if (this->msg_)
- {
- if (TAO_debug_level >= this->which_level_)
- {
- char* begin_start =
- this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1;
- ACE_OS::strcpy (begin_start, " end\n");
- ACE_DEBUG ((LM_DEBUG, this->msg_));
- }
-
- delete[] this->msg_;
- }
- }
-
-private:
- static const int MAGIC_LENGTH;
- unsigned int which_level_;
- char* msg_;
-};
-
-const int TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH = 8; // " begin\n" + \000
-
-ACE_RCSID(tao, GIOP_Message_State, "$Id$")
+ACE_RCSID (tao,
+ GIOP_Message_State,
+ "$Id$")
TAO_GIOP_Message_State::TAO_GIOP_Message_State (
TAO_ORB_Core * /*orb_core*/,
- TAO_GIOP_Message_Base * /*base*/)
- : giop_version_ (TAO_DEF_GIOP_MAJOR,
+ TAO_GIOP_Message_Base *base)
+ : base_ (base),
+ giop_version_ (TAO_DEF_GIOP_MAJOR,
TAO_DEF_GIOP_MINOR),
byte_order_ (0),
message_type_ (0),
@@ -74,79 +29,125 @@ TAO_GIOP_Message_State::TAO_GIOP_Message_State (
{
}
-// This doesn't check the message block's length, so that means that
-// the *caller* needs to do that first.
+
int
-TAO_GIOP_Message_State::take_values_from_message_block (
- const ACE_Message_Block& mb
- )
+TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming)
{
- const char* buf = mb.rd_ptr ();
+ if (incoming.length () >= TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ // Parse the GIOP header
+ if (this->parse_message_header_i (incoming) == -1)
+ return -1;
+ }
- // Get the version information
- if (this->set_version_info_from_buffer (buf) == -1)
+ return 0;
+}
+
+int
+TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming)
+{
+ if (TAO_debug_level > 8)
{
- return -1;
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - GIOP_Message_State::parse_message_header_i\n"
+ ));
}
- // Get the byte order information...
- if (this->set_byte_order_info_from_buffer (buf) == -1)
+ // Grab the rd_ptr_ from the message block..
+ char *buf = incoming.rd_ptr ();
+
+ // Parse the magic bytes first
+ if (this->parse_magic_bytes (buf) == -1)
{
return -1;
}
+ // Get the version information
+ if (this->get_version_info (buf) == -1)
+ return -1;
+
+ // Get the byte order information...
+ if (this->get_byte_order_info (buf) == -1)
+ return -1;
+
// Get the message type
this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
- // Get the size of the message..
- this->set_payload_size_from_buffer (buf);
- // Get the request id
- this->parse_fragment_header (buf, mb.length ());
+ // Get the size of the message..
+ this->get_payload_size (buf);
if (this->message_size_ == 0)
{
- const char* msgname = 0;
-
- switch (this->message_type_)
- {
- case TAO_GIOP_MESSAGERROR:
- msgname = "GIOP_MESSAGE_ERROR"; break;
- case TAO_GIOP_CLOSECONNECTION:
- msgname = "GIOP_CLOSE_CONNECTION"; break;
- }
- if (msgname != 0)
+ if (this->message_type_ == TAO_GIOP_MESSAGERROR)
{
if (TAO_debug_level > 0)
{
- ACE_DEBUG ((
- LM_DEBUG,
- "(%P|%t) GIOP_Message_State::take_values: %s rcv'd.\n",
- msgname
- ));
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) -"
+ "GIOP_MESSAGE_ERROR received \n"));
}
+ return 0;
}
else
{
if (TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) GIOP_Message_State::take_values: "
- "Message of size zero rcv'd.\n"));
- }
-
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - "
+ "Message of size zero recd. \n"));
return -1;
}
}
+
+ if (this->more_fragments_)
+ {
+ (void) this->parse_fragment_header (buf,
+ incoming.length ());
+ }
+
return 0;
}
+
+
+
+int
+TAO_GIOP_Message_State::parse_magic_bytes (char *buf)
+{
+ // The values are hard-coded to support non-ASCII platforms.
+ if (!(buf [0] == 0x47 // 'G'
+ && buf [1] == 0x49 // 'I'
+ && buf [2] == 0x4f // 'O'
+ && buf [3] == 0x50)) // 'P'
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_LIB_TEXT ("TAO (%P|%t) - bad header, ")
+ ACE_LIB_TEXT ("magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"),
+ buf[0],
+ buf[1],
+ buf[2],
+ buf[3]));
+ return -1;
+ }
+
+ return 0;
+}
+
int
-TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf)
+TAO_GIOP_Message_State::get_version_info (char *buf)
{
+ if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - GIOP_Message_State::get_version_info\n"));
+ }
+
// We have a GIOP message on hand. Get its revision numbers
- CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
- CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
+ CORBA::Octet incoming_major =
+ buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
+ CORBA::Octet incoming_minor =
+ buf[TAO_GIOP_VERSION_MINOR_OFFSET];
// Check the revision information
if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision (
@@ -156,9 +157,7 @@ TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf)
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - ")
- ACE_TEXT ("GIOP_Message_State::set_version_info_from_buffer:")
- ACE_TEXT ("bad version <%d.%d>\n"),
+ ACE_TEXT ("TAO (%P|%t) - bad version <%d.%d>\n"),
incoming_major, incoming_minor));
}
@@ -173,9 +172,15 @@ TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf)
}
int
-TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf)
+TAO_GIOP_Message_State::get_byte_order_info (char *buf)
{
- // Let us be specific that this is for 1.0
+ if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - GIOP_Message_State::get_byte_order_info\n"));
+ }
+
+ // Let us be specific that this is for 1.0
if (this->giop_version_.minor == 0 &&
this->giop_version_.major == 1)
{
@@ -186,14 +191,10 @@ TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf)
this->byte_order_ != 1)
{
if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_State::"
- "get_byte_order_info, "
- "invalid byte order <%d> for version <1.0>\n",
- this->byte_order_));
- }
-
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - GIOP_Message_State::get_byte_order_info, "
+ "invalid byte order <%d> for version <1.0>\n",
+ this->byte_order_));
return -1;
}
}
@@ -210,15 +211,12 @@ TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf)
if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0)
{
if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - invalid flags for <%d>")
- ACE_TEXT (" for version <%d %d> \n"),
- buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET],
- this->giop_version_.major,
- this->giop_version_.minor));
- }
-
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - invalid flags for <%d>")
+ ACE_TEXT (" for version <%d %d> \n"),
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET],
+ this->giop_version_.major,
+ this->giop_version_.minor));
return -1;
}
}
@@ -226,10 +224,19 @@ TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf)
return 0;
}
+void
+TAO_GIOP_Message_State::get_payload_size (char *rd_ptr)
+{
+ // Move the read pointer
+ rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
+
+ this->message_size_ = this->read_ulong (rd_ptr);
+}
+
int
-TAO_GIOP_Message_State::parse_fragment_header (const char *buf,
+TAO_GIOP_Message_State::parse_fragment_header (char *buf,
size_t length)
{
size_t len =
@@ -239,7 +246,9 @@ TAO_GIOP_Message_State::parse_fragment_header (const char *buf,
// By this point we are doubly sure that we have a more or less
// valid GIOP message with a valid major revision number.
- if (this->giop_version_.minor >= 2 && length > len)
+ if (this->giop_version_.minor == 2 &&
+ this->message_type_ == TAO_GIOP_FRAGMENT &&
+ length > len)
{
// Fragmented message in GIOP 1.2 should have a fragment header
// following the GIOP header. Grab the rd_ptr to get that
@@ -254,7 +263,7 @@ TAO_GIOP_Message_State::parse_fragment_header (const char *buf,
}
CORBA::ULong
-TAO_GIOP_Message_State::read_ulong (const char *rd_ptr)
+TAO_GIOP_Message_State::read_ulong (char *rd_ptr)
{
CORBA::ULong x = 0;
diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h
index 9ae3db82230..f902fa03a0e 100644
--- a/TAO/tao/GIOP_Message_State.h
+++ b/TAO/tao/GIOP_Message_State.h
@@ -41,10 +41,11 @@ class TAO_Export TAO_GIOP_Message_State
public:
/// Ctor
- TAO_GIOP_Message_State (TAO_ORB_Core *orb_core = 0,
- TAO_GIOP_Message_Base *base = 0);
+ TAO_GIOP_Message_State (TAO_ORB_Core *orb_core,
+ TAO_GIOP_Message_Base *base);
- int take_values_from_message_block (const ACE_Message_Block& mb);
+ /// Parse the message header.
+ int parse_message_header (ACE_Message_Block &incoming);
/// Return the message size
CORBA::ULong message_size (void) const;
@@ -52,24 +53,9 @@ public:
/// Return the message size
CORBA::ULong payload_size (void) const;
- /*!
- \brief Return the byte order information.
- \return 0 big-endian
- \return 1 little-endian
- */
+ /// Return the byte order information
CORBA::Octet byte_order (void) const;
- /*!
- \brief Return GIOP version information.
- */
- const TAO_GIOP_Message_Version &giop_version () const;
-
- /// (Requests and Replys)
- CORBA::Octet more_fragments () const;
-
- /// MsgType above
- CORBA::Octet message_type () const;
-
/// Reset the state..
void reset (void);
@@ -77,30 +63,40 @@ private:
friend class TAO_GIOP_Message_Base;
+ /// Parse the message header.
+ int parse_message_header_i (ACE_Message_Block &incoming);
+
+ /// Checks for the magic word 'GIOP' in the start of the incoing
+ /// stream
+ int parse_magic_bytes (char *buf);
+
/// Extracts the version information from the incoming
/// stream. Performs a check for whether the version information is
/// right and sets the information in the <state>
- int set_version_info_from_buffer (const char *buf);
+ int get_version_info (char *buf);
/// Extracts the byte order information from the incoming
/// stream. Performs a check for whether the byte order information
/// right and sets the information in the <state>
- int set_byte_order_info_from_buffer (const char *buf);
+ int get_byte_order_info (char *buf);
/// Gets the size of the payload and set the size in the <state>
- void set_payload_size_from_buffer (const char *buf);
+ void get_payload_size (char *buf);
/// Parses the GIOP FRAGMENT_HEADER information from the incoming
/// stream.
- int parse_fragment_header (const char *buf,
+ int parse_fragment_header (char *buf,
size_t length);
/// Read the unsigned long from the buffer. The <buf> should just
/// point to the next 4 bytes data that represent the ULong
- CORBA::ULong read_ulong (const char *buf);
+ CORBA::ULong read_ulong (char *buf);
private:
+ /// The GIOP base class..
+ TAO_GIOP_Message_Base *base_;
+
// GIOP version information..
TAO_GIOP_Message_Version giop_version_;
diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl
index 80d421c7340..fe076bee689 100644
--- a/TAO/tao/GIOP_Message_State.inl
+++ b/TAO/tao/GIOP_Message_State.inl
@@ -33,29 +33,22 @@ TAO_GIOP_Message_State::reset (void)
this->missing_data_ = 0;
}
-ACE_INLINE const TAO_GIOP_Message_Version &
-TAO_GIOP_Message_State::giop_version () const
+#if 0
+ACE_INLINE int
+TAO_GIOP_Message_State::message_fragmented (void)
{
- return this->giop_version_;
-}
+ if (this->more_fragments)
+ return 1;
-ACE_INLINE CORBA::Octet
-TAO_GIOP_Message_State::more_fragments () const
-{
- return this->more_fragments_;
+ return 0;
}
-ACE_INLINE CORBA::Octet
-TAO_GIOP_Message_State::message_type () const
-{
- return this->message_type_;
-}
-ACE_INLINE void
-TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr)
-{
- // Move the read pointer
- rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
- this->message_size_ = this->read_ulong (rd_ptr);
+ACE_INLINE CORBA::Boolean
+TAO_GIOP_Message_State::header_received (void) const
+{
+ return this->message_size != 0;
}
+
+#endif
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index 25b16308ff1..1d802aa25c7 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -95,7 +95,7 @@ TAO_IIOP_Transport::recv (char *buf,
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv, ")
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv_i, ")
ACE_TEXT ("read failure - %m\n"),
this->id ()));
}
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index a510be13f4b..14970106b50 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -1,8 +1,7 @@
#include "Incoming_Message_Queue.h"
-#include "Pluggable_Messaging.h"
#include "debug.h"
-#include "ace/Malloc_T.h"
-#include "ace/Message_Block.h"
+
+#include "ace/Log_Msg.h"
#if !defined (__ACE_INLINE__)
# include "Incoming_Message_Queue.inl"
@@ -13,7 +12,7 @@ ACE_RCSID (tao,
"$Id$")
TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core)
- : last_added_ (0),
+ : queued_data_ (0),
size_ (0),
orb_core_ (orb_core)
{
@@ -43,21 +42,21 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
{
// Check to see if the length of the incoming block is less than
// that of the <missing_data_> of the tail.
- if (block.length () <= this->last_added_->missing_data_bytes_)
+ if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_)
{
n = block.length ();
}
else
{
- n = this->last_added_->missing_data_bytes_;
+ n = this->queued_data_->missing_data_;
}
// Do the copy
- this->last_added_->msg_block_->copy (block.rd_ptr (),
+ this->queued_data_->msg_block_->copy (block.rd_ptr (),
n);
// Decerement the missing data
- this->last_added_->missing_data_bytes_ -= n;
+ this->queued_data_->missing_data_ -= n;
}
return n;
@@ -66,20 +65,17 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
TAO_Queued_Data *
TAO_Incoming_Message_Queue::dequeue_head (void)
{
- if (this->size_ == 0)
- return 0;
-
// Get the node on the head of the queue...
- TAO_Queued_Data *head = this->last_added_->next_;
+ TAO_Queued_Data *tmp =
+ this->queued_data_->next_;
// Reset the head node..
- this->last_added_->next_ = head->next_;
-
- // Decrease the size and reset last_added_ if empty
- if (--this->size_ == 0)
- this->last_added_ = 0;
+ this->queued_data_->next_ = tmp->next_;
+
+ // Decrease the size
+ --this->size_;
- return head;
+ return tmp;
}
TAO_Queued_Data *
@@ -90,412 +86,95 @@ TAO_Incoming_Message_Queue::dequeue_tail (void)
return 0;
// Get the node on the head of the queue...
- TAO_Queued_Data *head =
- this->last_added_->next_;
+ TAO_Queued_Data *tmp =
+ this->queued_data_->next_;
- while (head->next_ != this->last_added_)
+ while (tmp->next_ != this->queued_data_)
{
- head = head->next_;
+ tmp = tmp->next_;
}
// Put the head in tmp.
- head->next_ = this->last_added_->next_;
+ tmp->next_ = this->queued_data_->next_;
- TAO_Queued_Data *ret_qd = this->last_added_;
+ TAO_Queued_Data *ret_qd = this->queued_data_;
- this->last_added_ = head;
+ this->queued_data_ = tmp;
// Decrease the size
- if (--this->size_ == 0)
- this->last_added_ = 0;
+ --this->size_;
return ret_qd;
}
+
int
TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
{
if (this->size_ == 0)
{
- this->last_added_ = nd;
- this->last_added_->next_ = this->last_added_;
+ this->queued_data_ = nd;
+ this->queued_data_->next_ = this->queued_data_;
}
else
{
- nd->next_ = this->last_added_->next_;
- this->last_added_->next_ = nd;
- this->last_added_ = nd;
+ nd->next_ = this->queued_data_->next_;
+ this->queued_data_->next_ = nd;
+ this->queued_data_ = nd;
}
++ this->size_;
return 0;
}
-TAO_Queued_Data *
-TAO_Incoming_Message_Queue::find_fragment (CORBA::Octet major,
- CORBA::Octet minor) const
-{
- TAO_Queued_Data *found = 0;
- if (this->last_added_ != 0)
- {
- TAO_Queued_Data *qd = this->last_added_->next_;
-
- do {
- if (qd->more_fragments_ &&
- qd->major_version_ == major && qd->minor_version_ == minor)
- {
- found = qd;
- }
- else
- {
- qd = qd->next_;
- }
- } while (found == 0 && qd != this->last_added_->next_);
- }
-
- return found;
-}
-
-TAO_Queued_Data *
-TAO_Incoming_Message_Queue::find_fragment (CORBA::ULong request_id) const
-{
- TAO_Queued_Data *found = 0;
- if (this->last_added_ != 0)
- {
- TAO_Queued_Data *qd = this->last_added_->next_;
-
- do {
- if (qd->more_fragments_ && qd->request_id_ == request_id)
- {
- found = qd;
- }
- else
- {
- qd = qd->next_;
- }
- } while (found == 0 && qd != this->last_added_->next_);
- }
-
- return found;
-}
-
/************************************************************************/
// Methods for TAO_Queued_Data
/************************************************************************/
TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
- : msg_block_ (0)
- , current_state_ (INVALID)
- , missing_data_bytes_ (0)
- , byte_order_ (0)
- , major_version_ (0)
- , minor_version_ (0)
- , more_fragments_ (0)
- , request_id_ (0)
- , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
- , next_ (0)
- , allocator_ (alloc)
+ : msg_block_ (0),
+ missing_data_ (0),
+ byte_order_ (0),
+ major_version_ (0),
+ minor_version_ (0),
+ more_fragments_ (0),
+ msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
+ next_ (0),
+ allocator_ (alloc)
{
}
TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb,
ACE_Allocator *alloc)
- : msg_block_ (mb)
- , current_state_ (INVALID)
- , missing_data_bytes_ (0)
- , byte_order_ (0)
- , major_version_ (0)
- , minor_version_ (0)
- , more_fragments_ (0)
- , request_id_ (0)
- , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
- , next_ (0)
- , allocator_ (alloc)
+ : msg_block_ (mb),
+ missing_data_ (0),
+ byte_order_ (0),
+ major_version_ (0),
+ minor_version_ (0),
+ more_fragments_ (0),
+ msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
+ next_ (0),
+ allocator_ (alloc)
{
}
TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
- : msg_block_ (qd.msg_block_->duplicate ())
- , current_state_ (qd.current_state_)
- , missing_data_bytes_ (qd.missing_data_bytes_)
- , byte_order_ (qd.byte_order_)
- , major_version_ (qd.major_version_)
- , minor_version_ (qd.minor_version_)
- , more_fragments_ (qd.more_fragments_)
- , request_id_ (qd.request_id_)
- , msg_type_ (qd.msg_type_)
- , next_ (0)
- , allocator_ (qd.allocator_)
+ : msg_block_ (qd.msg_block_->duplicate ()),
+ missing_data_ (qd.missing_data_),
+ byte_order_ (qd.byte_order_),
+ major_version_ (qd.major_version_),
+ minor_version_ (qd.minor_version_),
+ more_fragments_ (qd.more_fragments_),
+ msg_type_ (qd.msg_type_),
+ next_ (0),
+ allocator_ (qd.allocator_)
{
}
-
-/*!
- \brief Allocate and return a new empty message block of size \a new_size mimicking parameters of \a mb.
-
- This function allocates a new aligned message block using the same
- allocators and flags as found in \a mb. The size of the new message
- block is at least \a new_size; the size may be adjusted up in order
- to accomodate alignment requirements and still fit \a new_size bytes
- into the aligned buffer.
-
- \param mb message block whose parameters should be mimicked
- \param new_size size of the new message block (will be adjusted for proper alignment)
- \return an aligned message block with rd_ptr sitting at correct alignment spot, 0 on failure
-
- \author Thanks to Rich Seibel for helping implement with the public API for ACE_Message_Block!
- */
-static ACE_Message_Block*
-clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size)
-{
- // Calculate the required size of the cloned block with alignment
- size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
-
- // Get the allocators
- ACE_Allocator *data_allocator;
- ACE_Allocator *data_block_allocator;
- ACE_Allocator *message_block_allocator;
- mb->access_allocators (data_allocator,
- data_block_allocator,
- message_block_allocator);
-
- // Create a new Message Block
- ACE_Message_Block *nb;
- ACE_NEW_MALLOC_RETURN (nb,
- ACE_static_cast(ACE_Message_Block*,
- message_block_allocator->malloc (
- sizeof (ACE_Message_Block))),
- ACE_Message_Block(aligned_size,
- mb->msg_type(),
- mb->cont(),
- 0, //we want the data block created
- data_allocator,
- mb->locking_strategy(),
- mb->msg_priority(),
- mb->msg_execution_time (),
- mb->msg_deadline_time (),
- data_block_allocator,
- message_block_allocator),
- 0);
-
- ACE_CDR::mb_align (nb);
-
- // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since
- // we just dynamically allocated the two things.
- nb->set_flags (mb->flags());
- nb->clr_flags (ACE_Message_Block::DONT_DELETE);
-
- return nb;
-}
-
-/*!
- \brief Copy data from \a src->rd_ptr to \a dst->wr_ptr, of at most \a span_size bytes.
-
- (This is similar to memcpy, although with message blocks we can be a
- little smarter.) This function assumes that \a dst has enough space
- for \a span_size bytes, and that \a src has at least \a span_size
- bytes available to copy. When everything is copied \a dst->wr_ptr
- gets updated accordingly, but \a src->rd_ptr is left to the caller
- to update.
-
- \param dst the destination message block
- \param src the source message block
- \param span_size size of the maximum span of bytes to be copied
- \return 0 on failure, otherwise \a dst
- */
-static ACE_Message_Block*
-copy_mb_span (ACE_Message_Block *dst, ACE_Message_Block *src, size_t span_size)
-{
- // @todo check for enough space in dst, and src contains at least span_size
-
- if (src == 0 || dst == 0)
- return 0;
-
- if (span_size == 0)
- return dst;
-
- dst->copy (src->rd_ptr (), span_size);
- return dst;
-}
-
/*static*/
TAO_Queued_Data *
-TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb,
- TAO_Pluggable_Messaging &msging_obj,
- ACE_Allocator *alloc)
-{
- register TAO_Queued_Data *new_qd = 0;
- register const size_t HDR_LEN = msging_obj.header_length (); /* COMPUTE ONCE! */
- register const size_t MB_LEN = mb->length (); /* COMPUTE ONCE! */
-
- // Validate arguments.
- if (mb == 0)
- goto failure;
-
- new_qd = make_queued_data (alloc);
- if (new_qd == 0)
- goto failure;
-
- // do we have enough bytes to make a complete header?
- if (MB_LEN >= HDR_LEN)
- {
- // Since we have enough bytes to make a complete header,
- // the header needs to be valid. Check that now, and punt
- // if it's not valid.
- if (! msging_obj.check_for_valid_header (*mb))
- {
- goto failure;
- }
- else
- {
- new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD;
- msging_obj.set_queued_data_from_message_header (new_qd, *mb);
- if (new_qd->current_state_ == INVALID)
- goto failure;
-
- // missing_data_bytes_ now has the full GIOP message size, so we allocate
- // a new message block of that size, plus the header.
- new_qd->msg_block_ = clone_mb_nocopy_size (mb,
- new_qd->missing_data_bytes_ +
- HDR_LEN);
- // Of course, we don't have the whole message (if we did, we
- // wouldn't be here!), so we copy only what we've got, i.e., whatever's
- // in the message block.
- if (copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0)
- goto failure;
-
- // missing_data_bytes_ now has the full GIOP message size, but
- // there might still be stuff in mb. Therefore, we have to adjust
- // missing_data_bytes_, i.e., decrease it by the number of "actual
- // payload bytes" in mb.
- //
- // "actual payload bytes" :== length of mb (which included the header) - header length
- new_qd->missing_data_bytes_ -= (MB_LEN - HDR_LEN);
- mb->rd_ptr (MB_LEN);
- }
- }
- else
- {
- new_qd->current_state_ = WAITING_TO_COMPLETE_HEADER;
- new_qd->msg_block_ = clone_mb_nocopy_size (mb, HDR_LEN);
- if (new_qd->msg_block_ == 0 ||
- copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0)
- goto failure;
- new_qd->missing_data_bytes_ = HDR_LEN - MB_LEN;
- mb->rd_ptr (MB_LEN);
- }
-
- ACE_ASSERT (new_qd->current_state_ != INVALID);
- if (TAO_debug_level > 7)
- {
- const char* s = "?unk?";
- switch (new_qd->current_state_)
- {
- case WAITING_TO_COMPLETE_HEADER: s = "WAITING_TO_COMPLETE_HEADER"; break;
- case WAITING_TO_COMPLETE_PAYLOAD: s = "WAITING_TO_COMPLETE_PAYLOAD"; break;
- case INVALID: s = "INVALID"; break;
- case COMPLETED: s = "COMPLETED"; break;
- }
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ")
- ACE_TEXT ("made uncompleted message from %u bytes into qd=%-08x:")
- ACE_TEXT ("state=%s,missing_data_bytes=%u\n"),
- new_qd->msg_block_->length(), new_qd, s, new_qd->missing_data_bytes_));
- }
- return new_qd;
-
-failure:
- if (TAO_debug_level > 7)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ")
- ACE_TEXT ("failed to make uncompleted message: mb=%-08x, qd=%-08x\n"),
- mb, new_qd));
- }
- TAO_Queued_Data::release (new_qd);
- return 0;
-}
-
-
-/*static*/
-TAO_Queued_Data *
-TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb,
- TAO_Pluggable_Messaging &msging_obj,
- ACE_Allocator *alloc)
-{
- register const size_t HDR_LEN = msging_obj.header_length ();
- register const size_t MB_LEN = mb.length ();
-
- // Validate arguments.
- if (MB_LEN < HDR_LEN)
- return 0;
-
- size_t total_msg_len = 0;
- register TAO_Queued_Data *new_qd = make_queued_data (alloc);
- if (new_qd == 0)
- goto failure;
-
- // We can assume that there are enough bytes for a header, so
- // extract the header data. Don't assume that there's enough for
- // the payload just yet.
- new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD;
- msging_obj.set_queued_data_from_message_header (new_qd, mb);
- if (new_qd->current_state_ == INVALID)
- goto failure;
-
- // new_qd_->missing_data_bytes_ + protocol header length should be
- // *at least* the length of the message. Verify that we have that
- // many bytes in the message block and, if we don't, release the new
- // qd and fail.
- total_msg_len = new_qd->missing_data_bytes_ + HDR_LEN;
- if (total_msg_len > MB_LEN)
- goto failure;
-
- // Make a copy of the relevant portion of mb and hang on to it
- if ((new_qd->msg_block_ = clone_mb_nocopy_size (&mb, total_msg_len)) == 0)
- goto failure;
-
- if (copy_mb_span (new_qd->msg_block_, &mb, total_msg_len) == 0)
- goto failure;
-
- // Update missing data and the current state
- new_qd->missing_data_bytes_ = 0;
- new_qd->current_state_ = COMPLETED;
-
- // Advance the rd_ptr on the message block
- mb.rd_ptr (total_msg_len);
-
- if (TAO_debug_level > 7)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ")
- ACE_TEXT ("extracted complete message (%u bytes incl hdr) from mblk=%-08x into qd=%-08x\n"),
- total_msg_len, &mb, new_qd));
- }
-
- return new_qd;
-
-failure:
- if (TAO_debug_level > 7)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ")
- ACE_TEXT ("failed to find complete message in mblk=%-08x; leaving %u bytes in block\n"),
- &mb, MB_LEN));
- if (TAO_debug_level >= 10)
- ACE_HEX_DUMP ((LM_DEBUG,
- mb.rd_ptr (), MB_LEN,
- ACE_TEXT (" residual bytes in buffer")));
-
- }
- TAO_Queued_Data::release (new_qd);
- return 0;
-}
-
-/*static*/
-TAO_Queued_Data *
-TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc)
+TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc)
{
TAO_Queued_Data *qd = 0;
@@ -602,29 +281,3 @@ TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd)
return qd;
}
-
-void
-TAO_Queued_Data::consolidate (void)
-{
- // Is this a chain of fragments?
- if (this->more_fragments_ && this->msg_block_->cont () != 0)
- {
- // Create a message block big enough to hold the entire chain
- ACE_Message_Block *dest = clone_mb_nocopy_size (
- this->msg_block_,
- this->msg_block_->total_length ());
- // Reset the cont() parameter
- dest->cont (0);
-
- // Use ACE_CDR to consolidate the chain for us
- ACE_CDR::consolidate (dest, this->msg_block_);
-
- // free the original message block chain
- this->msg_block_->release ();
-
- // Set the message block to the new consolidated message block
- this->msg_block_ = dest;
- this->more_fragments_ = 0;
- }
-}
-
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index 660a207090b..3adfc3d24ac 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -27,7 +27,6 @@ class ACE_Allocator;
class TAO_ORB_Core;
class TAO_Queued_Data;
class TAO_Transport;
-class TAO_Pluggable_Messaging;
/**
* @class TAO_Incoming_Message_Queue
@@ -76,68 +75,31 @@ public:
/// Return the length of the queue..
CORBA::ULong queue_length (void);
- /*!
- @name Node Inspection Predicates
-
- \brief These methods allow inspection of head and tail nodes for "completeness".
-
- These methods check to see whether the node on the head or tail is
- "complete" and ready for further processing. See each method's
- documentation for its definition of "complete".
- */
- //@{
- /*!
- "complete" == the GIOP message at the tail is not missing any data (it may be a complete GIOP Fragment, though)
-
- \return -1 queue is empty
- \return 0 tail is not "complete"
- \return 1 tail is "complete"
- */
+ /// Methods for sanity check. Checks to see whether the node on the
+ /// head or tail is complete or not and ready for further
+ /// processing.
int is_tail_complete (void);
-
- /*!
-
- "complete" == the GIOP message at the head is not missing any data
- AND, if it's the first message in a series of GIOP fragments, all
- the fragments have been received, parsed, and placed into the
- queue
-
- \return -1 if queue is empty
- \return 0 if head is not "complete"
- \return 1 if head is "complete"
- */
int is_head_complete (void);
- //@}
- /*!
- \brief Check to see if the message at the tail (complete or incomplete) is a GIOP Fragment.
- */
+ /// This method checks whether the last message that was queued up
+ /// was fragmented...
int is_tail_fragmented (void);
/// Return the size of data that is missing in tail of the queue.
size_t missing_data_tail (void) const;
/// void missing_data (size_t data);
- /// Find the first fragment that matches the GIOP version
- TAO_Queued_Data *find_fragment (CORBA::Octet major,
- CORBA::Octet minor) const;
-
- /// Find the first fragment that matches the request id
- TAO_Queued_Data *find_fragment (CORBA::ULong request_id) const;
-
private:
friend class TAO_Transport;
+ /// Make a node for the queue.
+ TAO_Queued_Data *get_node (void);
+
private:
- /*!
- \brief A circular linked list of messages awaiting processing.
- \a last_message_added_ points to the most recent message added to
- the list. The earliest addition can be easily accessed via
- \a last_message_added_->next_.
- */
- TAO_Queued_Data *last_added_;
+ /// A linked listof messages that await processing
+ TAO_Queued_Data *queued_data_;
/// The size of the queue
CORBA::ULong size_;
@@ -161,73 +123,20 @@ private:
class TAO_Export TAO_Queued_Data
{
-protected:
+public:
/// Default Constructor
TAO_Queued_Data (ACE_Allocator *alloc = 0);
/// Constructor.
TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0);
-public:
/// Copy constructor.
TAO_Queued_Data (const TAO_Queued_Data &qd);
- /*!
- \name Factory Methods
-
- These methods manufacture instances of TAO_Queued_Data and return
- them. These instances should be removed via TAO_Queued_Data::release.
-
- Instances are initialized from data in the ACE_Message_Block,
- interpreted according to rules defined in the
- TAO_Pluggable_Messaging object.
-
- The manufactured instance adopts the message block \em without
- duplicating it; therefore, the caller must duplicate or orphan the
- message block. The caller also must insure that the message block
- can be released via ACE_Message_Block::release, and that its life
- endures beyond the calling scope.
-
- For the purposes of TAO_Queued_Data, a completed message is a
- completely received message as defined by the messaging protocol
- object. For GIOP, that means that the number of bytes specified
- in the general GIOP header have been completely received. It
- specifically DOES NOT mean that all \em fragments have been
- received. Fragment reassembly is another matter altogether.
- */
- //@{
- /*!
- \brief Make and return an instance of TAO_Queued_Data suitable for use as an uncompleted message.
- */
- static TAO_Queued_Data* make_uncompleted_message (ACE_Message_Block *mb,
- TAO_Pluggable_Messaging &msging_obj,
- ACE_Allocator *alloc = 0);
- /*!
- \brief Make and return an instance of TAO_Queued_Data suitable for use as a completed message.
- */
- // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE!
- // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN
- // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS
- // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT
- // BEHAVES DIFFERENTLY FROM make_uncompleted_message.
- static TAO_Queued_Data* make_completed_message (ACE_Message_Block &mb,
- TAO_Pluggable_Messaging &msging_obj,
- ACE_Allocator *alloc = 0);
-
- /// Consolidate this fragments chained message blocks into one.
- void consolidate (void);
-
- /*!
- \brief Creation and deletion of a node in the queue.
- \todo Maybe this should be private?
- */
-private:
- static TAO_Queued_Data* make_queued_data (ACE_Allocator *alloc = 0);
-public:
- //@}
+ /// Creation and deletion of a node in the queue.
+ static TAO_Queued_Data* get_queued_data (ACE_Allocator *alloc = 0);
static void release (TAO_Queued_Data *qd);
- void release (void);
/// Duplicate ourselves. This creates a copy of ourselves on the
/// heap and returns a pointer to the duplicated node.
@@ -237,43 +146,11 @@ public:
/// The message block that contains the message.
ACE_Message_Block *msg_block_;
- /*!
- @name Missing Data details
-
- The \a missing_data_bytes_ member contains the number of bytes of
- data missing from \a msg_block_. However, there can be two places
- where data is missing: header and payload. We cannot know how
- much data is missing from the payload until we have a complete
- header. Fortunately, headers are a fixed length, so we can know
- how much we're missing from the header.
-
- We use \param current_state_ to indicate which portion of the message
- \param missing_data_bytes_ refers to, as well as the general state of
- the message.
- */
- //@{
- /*!
- Describes the meaning given to the number stored in \a missing_data_bytes_.
- */
- enum Queued_Data_State
- {
- INVALID = -1, //!< The queued data is in an invalid/uninitialized state, and no data should be trusted.
- COMPLETED = 0, //!< Message is complete; \a missing_data_bytes_ should be zero.
- WAITING_TO_COMPLETE_HEADER, //!< Value in \a missing_data_bytes_ indicates part of header is missing.
- WAITING_TO_COMPLETE_PAYLOAD //!< Value in \a missing_data_bytes_ indicates part of payload is missing.
- };
-
- /*!
- Indicates the current state of the message, including hints at
- how to interpret the value stored in \a missing_data_bytes_.
- */
- Queued_Data_State current_state_;
-
- /*! Data missing in the above message that hasn't been read or processed yet. */
- size_t missing_data_bytes_;
- //@}
-
- /*! The byte order of the message that is stored in the node. */
+ /// Data missing in the above message that hasn't been read or
+ /// processed yet.
+ CORBA::Long missing_data_;
+
+ /// The byte order of the message that is stored in the node..
CORBA::Octet byte_order_;
/// Many protocols like GIOP have a major and minor version
@@ -288,9 +165,6 @@ public:
/// queue already has more fragments that is missing..
CORBA::Octet more_fragments_;
- /// The fragment request id
- CORBA::ULong request_id_;
-
/// The message type of the message
TAO_Pluggable_Message_Type msg_type_;
diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl
index f82267b5cea..d67bd485383 100644
--- a/TAO/tao/Incoming_Message_Queue.inl
+++ b/TAO/tao/Incoming_Message_Queue.inl
@@ -18,7 +18,7 @@ TAO_Incoming_Message_Queue::is_tail_complete (void)
return -1;
if (this->size_ &&
- this->last_added_->missing_data_bytes_ == 0)
+ this->queued_data_->missing_data_ == 0)
return 1;
return 0;
@@ -31,8 +31,8 @@ TAO_Incoming_Message_Queue::is_head_complete (void)
return -1;
if (this->size_ &&
- this->last_added_->next_->missing_data_bytes_ == 0 &&
- this->last_added_->next_->more_fragments_ == 0)
+ this->queued_data_->next_->missing_data_ == 0 &&
+ this->queued_data_->next_->more_fragments_ == 0)
return 1;
return 0;
@@ -45,7 +45,7 @@ TAO_Incoming_Message_Queue::is_tail_fragmented (void)
return 0;
if (this->size_ &&
- this->last_added_->more_fragments_ == 1)
+ this->queued_data_->more_fragments_ == 1)
return 1;
return 0;
@@ -55,22 +55,23 @@ ACE_INLINE size_t
TAO_Incoming_Message_Queue::missing_data_tail (void) const
{
if (this->size_ != 0)
- return this->last_added_->missing_data_bytes_;
+ return this->queued_data_->missing_data_;
return 0;
}
-/************************************************************************/
-// Methods for TAO_Queued_Data
-/************************************************************************/
-ACE_INLINE void
-TAO_Queued_Data::release (void)
+ACE_INLINE TAO_Queued_Data *
+TAO_Incoming_Message_Queue::get_node (void)
{
- TAO_Queued_Data::release (this);
+ return TAO_Queued_Data::get_queued_data ();
}
+/************************************************************************/
+// Methods for TAO_Queued_Data
+/************************************************************************/
+
/*static*/
ACE_INLINE void
TAO_Queued_Data::replace_data_block (ACE_Message_Block &mb)
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index 3e84635d767..8ac38de01e0 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -121,30 +121,36 @@ public:
virtual void init (CORBA::Octet major,
CORBA::Octet minor) = 0;
+ /// Parse the incoming messages..
+ virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0;
+
+ /// Calculate the amount of data that is missing in the <incoming>
+ /// message block.
+ virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0;
+
+ /// Get the details of the message parsed through the <qd>.
+ virtual void get_message_data (TAO_Queued_Data *qd) = 0;
+
+ /* Extract the details of the next message from the <incoming>
+ * through <qd>. Returns 1 if there are more messages and returns a
+ * 0 if there are no more messages in <incoming>.
+ */
+ virtual int extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *&qd) = 0;
+
+ /// Check whether the node <qd> needs consolidation from <incoming>
+ virtual int consolidate_node (TAO_Queued_Data *qd,
+ ACE_Message_Block &incoming) = 0;
+
+ /// @@Bala:Docu??
+ virtual int consolidate_fragments (TAO_Queued_Data *dqd,
+ const TAO_Queued_Data *sqd) = 0;
+
/// Parse the request message, make an upcall and send the reply back
/// to the "request initiator"
virtual int process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd) = 0;
- /*!
- \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message.
-
- Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to
- see if they look like the beginning of a message. Does
- */
- virtual int check_for_valid_header (const ACE_Message_Block &mb) const = 0;
-
- /*!
- \brief Set fields in \param qd based on values derived from \param mb.
-
- This function sets fields in \param qd based on values derived
- from \param mb. It assumes that if the length of \param mb is
- enough to hold a header, then the data in there can be trusted to
- make sense.
- */
- virtual void set_queued_data_from_message_header (
- TAO_Queued_Data *,
- const ACE_Message_Block &mb) const = 0;
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp
index aa5a569892b..d4edd4c8c80 100644
--- a/TAO/tao/Strategies/DIOP_Transport.cpp
+++ b/TAO/tao/Strategies/DIOP_Transport.cpp
@@ -88,19 +88,12 @@ TAO_DIOP_Transport::send (iovec *iov, int iovcnt,
for (int i = 0; i < iovcnt; i++)
bytes_to_send += iov[i].iov_len;
- ssize_t n = this->connection_handler_->dgram ().send (iov,
- iovcnt,
- addr);
+ this->connection_handler_->dgram ().send (iov,
+ iovcnt,
+ addr);
// @@ Michael:
// Always return a positive number of bytes sent, as we do
// not handle sending errors in DIOP.
- if (n == -1 && TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO: (%P|%t|%N|%l) Send of %d bytes failed %p\n"),
- bytes_to_send,
- ACE_TEXT ("send_i ()\n")));
- }
bytes_transferred = bytes_to_send;
@@ -198,7 +191,7 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
// Read the message into the message block that we have created on
// the stack.
- ssize_t n = this->recv (message_block.wr_ptr (),
+ ssize_t n = this->recv (message_block.rd_ptr (),
message_block.space (),
max_wait_time);
@@ -206,7 +199,6 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
if (n <= 0)
{
if (n == -1)
- // @@ Why not send_connection_closed_notifications() ?
this->tms_->connection_closed ();
return n;
@@ -215,43 +207,23 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
// Set the write pointer in the stack buffer
message_block.wr_ptr (n);
- // Check the incoming message for validity. The check needs to be
+ // Parse the incoming message for validity. The check needs to be
// performed by the messaging objects.
- if (this->messaging_object ()->check_for_valid_header (message_block) == 0)
- {
- if (TAO_debug_level)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO: (%P|%t|%N|%l) failed to find a valid header on transport %d after fault %p\n"),
- this->id (),
- ACE_TEXT ("handle_input_i ()\n")));
- }
-
- return -1;
- }
+ if (this->parse_incoming_messages (message_block) == -1)
+ return -1;
// NOTE: We are not performing any queueing nor any checking for
- // missing data. We are assuming that ALL the data arrives in a
+ // missing data. We are assuming that ALL the data would be got in a
// single read.
// Make a node of the message block..
- //
- // We could make this more efficient by having a fixed Queued Data
- // allocator, i.e., it always gave back the same thing. Actually,
- // we *could* create an allocator that took a stack-allocated object
- // as an argument and returned that when asked an allocation is
- // done. Something to contemplate...
- TAO_Queued_Data* qd =
- TAO_Queued_Data::make_completed_message (message_block,
- *this->messaging_object ());
- int retval = -1;
- if (qd)
- {
- // Process the message
- retval = this->process_parsed_messages (qd, rh);
- TAO_Queued_Data::release (qd);
- }
- return retval;
+ TAO_Queued_Data qd (&message_block);
+
+ // Extract the data for the node..
+ this->messaging_object ()->get_message_data (&qd);
+
+ // Process the message
+ return this->process_parsed_messages (&qd, rh);
}
@@ -338,175 +310,4 @@ TAO_DIOP_Transport::messaging_init (CORBA::Octet major,
return 1;
}
-// @@ Frank: Hopefully DIOP doesn't need this
-/*
-int
-TAO_DIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr)
-{
- CORBA::Boolean byte_order;
- if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
- return -1;
-
- cdr.reset_byte_order (ACE_static_cast(int,byte_order));
-
- DIOP::ListenPointList listen_list;
- if ((cdr >> listen_list) == 0)
- return -1;
-
- // As we have received a bidirectional information, set the flag to
- // 1
- this->bidirectional_flag (1);
- return this->connection_handler_->process_listen_point_list (listen_list);
-}
-*/
-
-
-
-// @@ Frank: Hopefully DIOP doesn't need this
-/*
-void
-TAO_DIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails)
-{
-
- // Get a handle on to the acceptor registry
- TAO_Acceptor_Registry * ar =
- this->orb_core ()->acceptor_registry ();
-
-
- // Get the first acceptor in the registry
- TAO_AcceptorSetIterator acceptor = ar->begin ();
-
- DIOP::ListenPointList listen_point_list;
-
- for (;
- acceptor != ar->end ();
- acceptor++)
- {
- // Check whether it is a DIOP acceptor
- if ((*acceptor)->tag () == TAO_TAG_UDP_PROFILE)
- {
- this->get_listen_point (listen_point_list,
- *acceptor);
- }
- }
-
- // We have the ListenPointList at this point. Create a output CDR
- // stream at this point
- TAO_OutputCDR cdr;
-
- // Marshall the information into the stream
- if ((cdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER)== 0)
- || (cdr << listen_point_list) == 0)
- return;
-
- // Add this info in to the svc_list
- opdetails.service_context ().set_context (IOP::BI_DIR_DIOP,
- cdr);
-
- return;
-}
-
-
-int
-TAO_DIOP_Transport::get_listen_point (
- DIOP::ListenPointList &listen_point_list,
- TAO_Acceptor *acceptor)
-{
- TAO_DIOP_Acceptor *iiop_acceptor =
- ACE_dynamic_cast (TAO_DIOP_Acceptor *,
- acceptor );
-
- // Get the array of endpoints serviced by <iiop_acceptor>
- const ACE_INET_Addr *endpoint_addr =
- iiop_acceptor->endpoints ();
-
- // Get the count
- size_t count =
- iiop_acceptor->endpoint_count ();
-
- // Get the local address of the connection
- ACE_INET_Addr local_addr;
-
- if (this->connection_handler_->peer ().get_local_addr (local_addr)
- == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%P|%t) Could not resolve local host")
- ACE_TEXT (" address in set_bidir_context_info () \n")),
- -1);
- }
-
-
- // Note: Looks like there is no point in sending the list of
- // endpoints on interfaces on which this connection has not
- // been established. If this is wrong, please correct me.
- char *local_interface = 0;
-
- // Get the hostname for the local address
- if (iiop_acceptor->hostname (this->orb_core_,
- local_addr,
- local_interface) == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%P|%t) Could not resolve local host")
- ACE_TEXT (" name \n")),
- -1);
- }
-
- ACE_INET_Addr *tmp_addr = ACE_const_cast (ACE_INET_Addr *,
- endpoint_addr);
-
- for (size_t index = 0;
- index <= count;
- index++)
- {
- // Get the listen point on that acceptor if it has the same
- // interface on which this connection is established
- char *acceptor_interface = 0;
-
- if (iiop_acceptor->hostname (this->orb_core_,
- tmp_addr[index],
- acceptor_interface) == -1)
- continue;
-
- // @@ This is very bad for performance, but it is a one time
- // affair
- if (ACE_OS::strcmp (local_interface,
- acceptor_interface) == 0)
- {
- // We have the connection and the acceptor endpoint on the
- // same interface
- DIOP::ListenPoint point;
- point.host = CORBA::string_dup (local_interface);
- point.port = endpoint_addr[index].get_port_number ();
-
- // Get the count of the number of elements
- CORBA::ULong len = listen_point_list.length ();
-
- // Increase the length by 1
- listen_point_list.length (len + 1);
-
- // Add the new length to the list
- listen_point_list[len] = point;
- }
-
- // @@ This is bad....
- CORBA::string_free (acceptor_interface);
- }
-
- CORBA::string_free (local_interface);
- return 1;
-}
-*/
-
-#if 0
-TAO_Connection_Handler *
-TAO_DIOP_Transport::invalidate_event_handler_i (void)
-{
- TAO_Connection_Handler * eh = this->connection_handler_;
- this->connection_handler_ = 0;
- return eh;
-}
-#endif
-
#endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index c7f845eee81..aaebc6860cf 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -136,7 +136,6 @@ TAO_SHMIOP_Transport::recv (char *buf,
}
-#if 0
int
TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming,
ssize_t missing_data,
@@ -192,7 +191,6 @@ TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming,
// process that
return this->process_parsed_messages (&pqd, rh);
}
-#endif
int
TAO_SHMIOP_Transport::send_request (TAO_Stub *stub,
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.h b/TAO/tao/Strategies/SHMIOP_Transport.h
index 8b54426aaa7..02c67c63116 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.h
+++ b/TAO/tao/Strategies/SHMIOP_Transport.h
@@ -82,14 +82,10 @@ protected:
size_t len,
const ACE_Time_Value *s = 0);
-#if 0
- // This no longer exists with the PMB-change flow. Not sure how to deal with that,
- // so for now we ditch the method and see if things work.
virtual int consolidate_message (ACE_Message_Block &incoming,
ssize_t missing_data,
TAO_Resume_Handle &rh,
ACE_Time_Value *max_wait_time);
-#endif
//@}
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 1640644edf9..9923ff1f895 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -19,9 +19,7 @@
#include "Resume_Handle.h"
#include "Codeset_Manager.h"
#include "Codeset_Translator_Factory.h"
-#include "GIOP_Message_State.h"
#include "ace/OS_NS_sys_time.h"
-#include "ace/Message_Block.h"
#include "ace/Reactor.h"
@@ -112,15 +110,12 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, head_ (0)
, tail_ (0)
, incoming_message_queue_ (orb_core)
- , uncompleted_message_ (0)
, current_deadline_ (ACE_Time_Value::zero)
, flush_timer_id_ (-1)
, transport_timer_ (this)
, handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
, id_ ((size_t) this)
, purging_order_ (0)
- , recv_buffer_size_ (0)
- , sent_byte_count_ (0)
, char_translator_ (0)
, wchar_translator_ (0)
, tcs_set_ (0)
@@ -241,9 +236,13 @@ TAO_Transport::generate_request_header (
{
// codeset service context is only supposed to be sent in the first request
// on a particular connection.
- TAO_Codeset_Manager *csm = this->orb_core()->codeset_manager();
- if (csm && this->first_request_)
- csm->generate_service_context( opdetails, *this );
+ if (this->first_request_)
+ {
+ this->orb_core ()->codeset_manager ()->generate_service_context (
+ opdetails,
+ *this
+ );
+ }
if (this->messaging_object ()->generate_request_header (opdetails,
spec,
@@ -605,12 +604,7 @@ int
TAO_Transport::schedule_output_i (void)
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh == 0)
- return -1;
-
ACE_Reactor *reactor = eh->reactor ();
- if (reactor == 0)
- return -1;
if (TAO_debug_level > 3)
{
@@ -626,12 +620,7 @@ int
TAO_Transport::cancel_output_i (void)
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh == 0)
- return -1;
-
ACE_Reactor *reactor = eh->reactor ();
- if (reactor == 0)
- return -1;
if (TAO_debug_level > 3)
{
@@ -750,9 +739,6 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
// no bytes are sent send() can only return 0 or -1
ACE_ASSERT (byte_count != 0);
- // Total no. of bytes sent for a send call
- this->sent_byte_count_ += byte_count;
-
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
@@ -776,10 +762,6 @@ TAO_Transport::drain_queue_i (void)
// We loop over all the elements in the queue ...
TAO_Queued_Message *i = this->head_;
- // reset the value so that the counting is done for each new send
- // call.
- this->sent_byte_count_ = 0;
-
while (i != 0)
{
// ... each element fills the iovector ...
@@ -838,14 +820,8 @@ TAO_Transport::drain_queue_i (void)
if (this->flush_timer_pending ())
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh != 0)
- {
- ACE_Reactor *reactor = eh->reactor ();
- if (reactor != 0)
- {
- (void) reactor->cancel_timer (this->flush_timer_id_);
- }
- }
+ ACE_Reactor *reactor = eh->reactor ();
+ reactor->cancel_timer (this->flush_timer_id_);
this->reset_flush_timer ();
}
@@ -922,25 +898,20 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub,
if (set_timer)
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh != 0)
- {
- ACE_Reactor *reactor = eh->reactor ();
- if (reactor != 0)
- {
- this->current_deadline_ = new_deadline;
- ACE_Time_Value delay =
- new_deadline - ACE_OS::gettimeofday ();
+ ACE_Reactor *reactor = eh->reactor ();
+ this->current_deadline_ = new_deadline;
+ ACE_Time_Value delay =
+ new_deadline - ACE_OS::gettimeofday ();
- if (this->flush_timer_pending ())
- {
- (void) reactor->cancel_timer (this->flush_timer_id_);
- }
- this->flush_timer_id_ =
- reactor->schedule_timer (&this->transport_timer_,
- &this->current_deadline_,
- delay);
- }
+ if (this->flush_timer_pending ())
+ {
+ reactor->cancel_timer (this->flush_timer_id_);
}
+
+ this->flush_timer_id_ =
+ reactor->schedule_timer (&this->transport_timer_,
+ &this->current_deadline_,
+ delay);
}
return constraints_reached;
@@ -1147,18 +1118,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
return 0;
}
-
-class CTHack
-{
-public:
- CTHack() { enter(); }
- ~CTHack() { leave(); }
-private:
- void enter() { x = 1; }
- void leave() { x = 0; }
- int x;
-};
-
/*
*
* All the methods relevant to the incoming data path of the ORB are
@@ -1170,8 +1129,6 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
ACE_Time_Value * max_wait_time,
int /*block*/)
{
- CTHack cthack;
-
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
@@ -1179,8 +1136,9 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
this->id ()));
}
- // First try to process messages off the head of the incoming queue.
+ // First try to process messages of the head of the incoming queue.
int retval = this->process_queue_head (rh);
+
if (retval <= 0)
{
if (retval == -1)
@@ -1191,6 +1149,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
"error while parsing the head of the queue\n",
this->id()));
}
+
return retval;
}
@@ -1199,7 +1158,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
// The buffer on the stack which will be used to hold the input
// messages
- char buf[TAO_MAXBUFSIZE];
+ char buf [TAO_MAXBUFSIZE];
#if defined (ACE_HAS_PURIFY)
(void) ACE_OS::memset (buf,
@@ -1221,35 +1180,26 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
ACE_Message_Block::DONT_DELETE,
this->orb_core_->input_cdr_msgblock_allocator ());
- // We'll loop trying to complete the message this number of times,
- // and that's it.
- unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS;
-
- unsigned int did_queue_message = 0;
// Align the message block
ACE_CDR::mb_align (&message_block);
size_t recv_size = 0;
+
if (this->orb_core_->orb_params ()->single_read_optimization ())
{
- recv_size = message_block.space ();
+ recv_size =
+ message_block.space ();
}
else
{
- recv_size = this->messaging_object ()->header_length ();
+ recv_size =
+ this->messaging_object ()->header_length ();
}
- // Saving the size of the received buffer in case any one needs to
- // get the size of the message thats received in the
- // context. Obviously the value will be changed for each recv call
- // and the user is supposed to invoke the accessor only in the
- // invocation context to get meaningful information.
- this->recv_buffer_size_ = recv_size;
-
// Read the message into the message block that we have created on
// the stack.
- ssize_t n = this->recv (message_block.wr_ptr (),
+ ssize_t n = this->recv (message_block.rd_ptr (),
recv_size,
max_wait_time);
@@ -1262,7 +1212,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
if (TAO_debug_level > 2)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input_i: "
+ "TAO (%P|%t) - Transport[%d]::handle_input_i, "
"read %d bytes\n",
this->id (), n));
}
@@ -1270,372 +1220,172 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
// Set the write pointer in the stack buffer
message_block.wr_ptr (n);
- if (TAO_debug_level >= 10)
- ACE_HEX_DUMP ((LM_DEBUG,
- (const char *) message_block.rd_ptr (),
- message_block.length (),
- ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket")));
-
+ // Parse the message and try consolidating the message if
+ // needed.
+ retval = this->parse_consolidate_messages (message_block,
+ rh,
+ max_wait_time);
-complete_message_and_possibly_enqueue:
- // Check to see if we're still working to complete a message
- if (this->uncompleted_message_)
+ if (retval <= 0)
{
- // try to complete it
+ if (retval == -1 && TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::handle_input_i, "
+ "error while parsing and consolidating\n",
+ this->id ()));
+ }
+ return retval;
+ }
- // on exit from this frame we have one of the following states:
- //
- // (a) an uncompleted message still in uncompleted_message_
- // AND message_block is empty
- //
- // (b) uncompleted_message_ zero, the completed message at the
- // tail of the incoming queue; message_block could be empty
- // or still contain bytes
+ // Make a node of the message block..
+ TAO_Queued_Data qd (&message_block,
+ this->orb_core_->transport_message_buffer_allocator ());
- // ==> repeat
- do
- {
- /*
- * Append the "right number of bytes" to uncompleted_message_
- */
- // ==> right_number_of_bytes = MIN(bytes missing from
- // uncompleted_message_, length of message_block);
- size_t right_number_of_bytes =
- ACE_MIN (this->uncompleted_message_->missing_data_bytes_,
- message_block.length () );
+ // Extract the data for the node..
+ this->messaging_object ()->get_message_data (&qd);
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "trying to use %u (of %u) "
- "bytes to complete message missing %u bytes\n",
- this->id (),
- right_number_of_bytes,
- message_block.length (),
- this->uncompleted_message_->missing_data_bytes_));
- }
+ // Check whether the message was fragmented..
+ if (qd.more_fragments_ ||
+ (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ // Duplicate the node that we have as the node is on stack..
+ TAO_Queued_Data *nqd =
+ TAO_Queued_Data::duplicate (qd);
- // ==> append right_number_of_bytes from message_block
- // to uncomplete_message_ & update read pointer of
- // message_block;
-
- // 1. we assume that uncompleted_message_.msg_block_'s
- // wr_ptr is properly maintained
- // 2. we presume that uncompleted_message_.msg_block was
- // allocated with enough space to contain the *entire*
- // expected GIOP message, so this copy shouldn't involve an
- // additional allocation
- this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (),
- right_number_of_bytes);
- this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
- message_block.rd_ptr (right_number_of_bytes);
-
- switch (this->uncompleted_message_->current_state_)
- {
- case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER:
- {
- int hdrvalidity = this->messaging_object()->check_for_valid_header (
- *this->uncompleted_message_->msg_block_);
- if (hdrvalidity == 0)
- {
- // According to the spec, Section 15.4.8, we should send
- // the MessageError GIOP message on receipt of "any message...whose
- // header is not properly formed (e.g., has the wrong magic value)".
- //
- // So, rather than returning -1, what we REALLY need to do is
- // send a MessageError in reply.
- //
- // I'm not sure what the best way to trigger that is...probably to
- // queue up a special internal-only COMPLETED message that, when
- // processed, sends the MessageError as part of its processing.
- return -1;
- }
- else if (hdrvalidity == 1)
- {
- // ==> update bytes missing from uncompleted_message_
- // with size of message from valid header;
- this->messaging_object()->set_queued_data_from_message_header (
- this->uncompleted_message_,
- *this->uncompleted_message_->msg_block_);
- // ==> change state of uncompleted_event_ to
- // WAITING_TO_COMPLETE_PAYLOAD;
- this->uncompleted_message_->current_state_ =
- TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD;
-
- // ==> Resize the message block to have capacity for
- // the rest of the incoming message
- ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_;
- ACE_CDR::grow (&mb,
- mb.size ()
- + this->uncompleted_message_->missing_data_bytes_);
-
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "found a valid header in the message; "
- "waiting for %u bytes to complete payload\n",
- this->id (),
- this->uncompleted_message_->missing_data_bytes_));
- }
-
- // Continue the loop...
- continue;
- }
- // In the case where we don't have enough information (hdrvalidity == -1),
- // we just have to fall through and collect more.
-#if 0
- else
- {
- // What the heck will we do with a bad header? Just
- // better to close the connection and let things
- // re-train from there.
- if (this->uncompleted_message_->msg_block_->length () ==
- this->messaging_object()->header_length())
- return -1;
-
-#if 0 // I don't think I need this clause, but I'm leaving it just in case.
- // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes;
- this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
- ACE_ASSERT (this->uncompleted_message_->missing_data_bytes_ > 0);
-#endif
- }
-#endif
- }
- break;
-
- case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD:
- // Here we have an opportunity to try to finish reading the
- // uncompleted message. This is a Good Idea(TM) because there are
- // good odds that either more data came available since the last
- // time we read, or that we simply didn't read the whole message on
- // the first read. So, we try to read again.
- //
- // NOTE! this changes this->uncompleted_message_!
- this->try_to_complete (max_wait_time);
+ return this->consolidate_fragments (nqd, rh);
+ }
- // ==> if (bytes missing from uncompleted_message_ == 0)
- if (this->uncompleted_message_->missing_data_bytes_ == 0)
- {
- /*
- * We completed the message! Hooray!
- */
- // ==> place uncompleted_message_ (which is now
- // complete!) at the tail of the incoming message
- // queue;
-
- // ---> NOTE: whoever pulls this off the queue must delete it!
- this->uncompleted_message_->current_state_
- = TAO_Queued_Data::COMPLETED;
-
- // @@CJC NEED TO CHECK RETURN VALUE HERE!
- this->enqueue_incoming_message (this->uncompleted_message_);
- did_queue_message = 1;
- // zero out uncompleted_message_;
- this->uncompleted_message_ = 0;
-
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "completed and queued message for processing!\n",
- this->id ()));
- }
+ // Process the message
+ return this->process_parsed_messages (&qd,
+ rh);
+}
- }
- else
- {
+int
+TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ // Parse the incoming message for validity. The check needs to be
+ // performed by the messaging objects.
+ if (this->parse_incoming_messages (block) == -1)
+ {
+ return -1;
+ }
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "still need %u bytes to complete uncompleted message.\n",
- this->id (),
- this->uncompleted_message_->missing_data_bytes_));
- }
- }
- break;
+ // Check whether we have a complete message for processing
+ ssize_t missing_data = this->missing_data (block);
- default:
- // @@CJC What do we do here?!
- ACE_ASSERT (! "Transport::handle_input_i: unexpected state"
- "in uncompleted_message_");
- }
- }
- // Does the order of the checks matter? In both (a) and (b),
- // message_block is empty, but only in (b) is there no
- // uncompleted_message_.
- // ==> until (message_block is empty || there is no uncompleted_message_);
- // or, rewritten in C++ looping constructs
- // ==> while ( ! message_block is empty && there is an uncompleted_message_ );
- while (message_block.length() != 0 && this->uncompleted_message_);
- }
-
- // *****************************
- // @@ CJC
- //
- // Once upon a time we tried to complete reading the uncompleted
- // message here, but testing found that completing later worked
- // better.
- // *****************************
-
-
- // At this point, there should be nothing in uncompleted_message_.
- // We now need to chop up the bytes in message_block and store any
- // complete messages in the incoming message queue.
- //
- // ==> if (message_block still has data)
- if (message_block.length () != 0)
- {
- TAO_Queued_Data *complete_message = 0;
- do
- {
- if (TAO_debug_level >= 10)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ")
- ACE_TEXT("extracting complete messages\n")));
- ACE_HEX_DUMP ((LM_DEBUG,
- message_block.rd_ptr (),
- message_block.length (),
- ACE_TEXT (" from this message buffer")));
- }
- complete_message =
- TAO_Queued_Data::make_completed_message (
- message_block, *this->messaging_object ());
- if (complete_message)
- {
- this->enqueue_incoming_message (complete_message);
- did_queue_message = 1;
- }
- }
- while (complete_message != 0);
- // On exit from this frame we have one of the following states:
- // (a) message_block is empty
- // (b) message_block contains bytes from a partial message
- }
-
- // If, at this point, there's still data in message_block, it's
- // an incomplete message. Therefore, we stuff it into the
- // uncompleted_message_ and clear out message_block.
- // ==> if (message_block still has data)
- if (message_block.length () != 0)
- {
- // duplicate message_block remainder into this->uncompleted_message_
- ACE_ASSERT (this->uncompleted_message_ == 0);
- this->uncompleted_message_ =
- TAO_Queued_Data::make_uncompleted_message (&message_block,
- *this->messaging_object ());
- ACE_ASSERT (this->uncompleted_message_ != 0);
-
- // In a debug build, we won't reach this point if we couldn't
- // create an uncompleted message because the above ASSERT will
- // trip. However, in an optimized build, the ASSERT isn't
- // there, so we'll go past here.
- //
- // We could put a check in here similar to the ASSERT condition,
- // but doing that would terminate this loop early and result in
- // our never processing any completed messages that were received
- // in this trip to handle_input_i.
- //
- // Maybe we could instead queue up a special completed message that,
- // when processed, causes the connection to get closed in a non-graceful
- // termination scenario.
- }
-
- // We should have consumed ALL the bytes by now.
- ACE_ASSERT (message_block.length () == 0);
-
- //
- // We don't want to try to re-read earlier because we may not have
- // an uncompleted message until we get to this point. So, if we did
- // it earlier, we could have missed the opportunity to complete it
- // and dispatch.
- //
- // Thanks to Bala <bala@cse.wustl.edu> for the idea to read again
- // to increase throughput!
-
- if (this->uncompleted_message_)
- {
- if (number_of_read_attempts--)
- {
- // We try to read again just in case more data arrived while
- // we were doing the stuff above. This way, we can increase
- // throughput without much of a penalty.
+ if (missing_data < 0)
+ {
+ // If we have more than one message
+ return this->consolidate_extra_messages (block,
+ rh);
+ }
+ else if (missing_data > 0)
+ {
+ // If we have missing data then try doing a read or try queueing
+ // them.
+ return this->consolidate_message (block,
+ missing_data,
+ rh,
+ max_wait_time);
+ }
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "still have an uncompleted message; "
- "will try %d more times before letting "
- "somebody else have a chance.\n",
- this->id (),
- number_of_read_attempts));
- }
+ return 1;
+}
- // We only bother trying to complete payload, not header, because the
- // retry only happens in the complete-the-payload clause above.
- if (this->uncompleted_message_->current_state_ ==
- TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD)
- goto complete_message_and_possibly_enqueue;
- }
- else
+int
+TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
+{
+ // If we have a queue and if the last message is not complete a
+ // complete one, then this read will get us the remaining data. So
+ // do not try to parse the header if we have an incomplete message
+ // in the queue.
+ if (this->incoming_message_queue_.is_tail_complete () != 0)
+ {
+ // As it looks like a new message has been read, process the
+ // message. Call the messaging object to do the parsing..
+ int retval =
+ this->messaging_object ()->parse_incoming_messages (block);
+
+ if (retval == -1)
{
- // The queue should be empty because it should have been processed
- // above. But I wonder if I should put a check in here anyway.
if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "giving up reading for now and returning "
- "with incoming queue length = %d\n",
- this->id (),
- this->incoming_message_queue_.queue_length ()));
- if (this->uncompleted_message_)
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "missing bytes from uncompleted message = %u\n",
- this->id (),
- this->uncompleted_message_->missing_data_bytes_));
- }
- return 1;
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, "
+ "error in incoming message\n",
+ this->id ()));
+
+ return -1;
}
}
- // **** END CJC PMG CHANGES ****
+ return 0;
+}
+
+
+size_t
+TAO_Transport::missing_data (ACE_Message_Block &incoming)
+{
+ // If we have a incomplete message in the queue then find out how
+ // much of data is required to get a complete message.
+ if (this->incoming_message_queue_.is_tail_complete () == 0)
+ {
+ return this->incoming_message_queue_.missing_data_tail ();
+ }
- return did_queue_message ? this->process_queue_head (rh) : 1;
+ return this->messaging_object ()->missing_data (incoming);
}
-void
-TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time)
+int
+TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
{
- if (this->uncompleted_message_ == 0)
- return;
+ // Check whether the last message in the queue is complete..
+ if (this->incoming_message_queue_.is_tail_complete () == 0)
+ {
+ return this->consolidate_message_queue (incoming,
+ missing_data,
+ rh,
+ max_wait_time);
+ }
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message\n",
+ this->id ()));
+ }
+
+ // Calculate the actual length of the load that we are supposed to
+ // read which is equal to the <missing_data> + length of the buffer
+ // that we have..
+ size_t payload = missing_data + incoming.size ();
+
+ // Grow the buffer to the size of the message
+ ACE_CDR::grow (&incoming,
+ payload);
ssize_t n = 0;
- size_t &missing_data = this->uncompleted_message_->missing_data_bytes_;
- ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_;
- // Try to complete this until we error or block right here...
- for (ssize_t bytes = missing_data;
- bytes != 0;
- bytes -= n)
+ // As this used for transports where things are available in one
+ // shot this looping should not create any problems.
+ for (ssize_t bytes = missing_data; bytes != 0; bytes -= n)
{
// .. do a read on the socket again.
- n = this->recv (mb.wr_ptr (),
+ n = this->recv (incoming.wr_ptr (),
bytes,
max_wait_time);
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input_i, "
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
"read %d bytes on attempt\n",
this->id(), n));
}
@@ -1645,168 +1395,375 @@ TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time)
break;
}
- mb.wr_ptr (n);
+ incoming.wr_ptr (n);
missing_data -= n;
}
+
+ // If we got an error..
+ if (n == -1)
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Trasport[%d]::consolidate_message, "
+ "error while trying to consolidate\n",
+ this->id ()));
+ }
+
+ return -1;
+ }
+
+ // If we had gotten a EWOULDBLOCK n would be equal to zero. But we
+ // have to put the message in the queue anyway. So let us proceed
+ // to do that and return...
+
+ // Check to see if we have messages in queue or if we have missing
+ // data . AT this point we cannot have have semi-complete messages
+ // in the queue as they would have been taken care before. Put
+ // ourselves in the queue and then try processing one of the
+ // messages..
+ if ((missing_data > 0
+ ||this->incoming_message_queue_.queue_length ())
+ && this->incoming_message_queue_.is_tail_fragmented () == 0)
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "queueing up the message\n",
+ this->id ()));
+ }
+
+ // Get a queued data
+ TAO_Queued_Data *qd =
+ this->make_queued_data (incoming);
+
+ // Add the missing data to the queue
+ qd->missing_data_ = missing_data;
+
+ // Get the rest of the messaging data
+ this->messaging_object ()->get_message_data (qd);
+
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
+
+ if (this->incoming_message_queue_.is_head_complete ())
+ {
+ return this->process_queue_head (rh);
+ }
+
+ return 0;
+ }
+
+ // We dont have any missing data. Just make a queued_data node with
+ // the existing message block and send it to the higher layers of
+ // the ORB.
+ TAO_Queued_Data pqd (&incoming,
+ this->orb_core_->transport_message_buffer_allocator ());
+ pqd.missing_data_ = missing_data;
+ this->messaging_object ()->get_message_data (&pqd);
+
+ // Check whether the message was fragmented and try to consolidate
+ // the fragments..
+ if (pqd.more_fragments_ ||
+ (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ // Duplicate the queued data as it is on stack..
+ TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd);
+
+ return this->consolidate_fragments (nqd, rh);
+ }
+
+ // Now we have a full message in our buffer. Just go ahead and
+ // process that
+ return this->process_parsed_messages (&pqd,
+ rh);
}
+int
+TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd,
+ TAO_Resume_Handle &rh)
+{
+ // If we have received a fragment message then we have to
+ // consolidate <qd> with the last message in queue
+ // @@todo: this piece of logic follows GIOP a bit... Need to revisit
+ // if we have protocols other than GIOP
+
+ // @@todo: Fragments now have too much copying overhead. Need to get
+ // them out if we want to have some reasonable performance metrics
+ // in future.. Post 1.2 seems a nice time..
+ if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ {
+ TAO_Queued_Data *tqd =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ tqd->more_fragments_ = qd->more_fragments_;
+ tqd->missing_data_ = qd->missing_data_;
+
+ if (this->messaging_object ()->consolidate_fragments (tqd, qd) == -1)
+ {
+ return -1;
+ }
+
+ TAO_Queued_Data::release (qd);
+ this->incoming_message_queue_.enqueue_tail (tqd);
+ this->process_queue_head (rh);
+ }
+ else
+ {
+ // if we dont have a fragment already in the queue just add it in
+ // the queue
+ this->incoming_message_queue_.enqueue_tail (qd);
+ }
+
+ return 0;
+}
int
-TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message)
+TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
{
- // Get the GIOP version
- CORBA::Octet major = queueable_message->major_version_;
- CORBA::Octet minor = queueable_message->minor_version_;
- CORBA::UShort whole = major << 8 | minor;
-
- // Set up a couple of pointers that are shared by the code
- // for the different GIOP versions.
- ACE_Message_Block *mb = 0;
- TAO_Queued_Data *fragment_message = 0;
-
- switch(whole)
- {
- case 0x0100: // GIOP 1.0
- if (!queueable_message->more_fragments_)
- return this->incoming_message_queue_.enqueue_tail (
- queueable_message);
-
- // Fragments aren't supported in 1.0. This is an error and
- // we should reject it somehow. What do we do here? Do we throw
- // an exception to the receiving side? Do we throw an exception
- // to the sending side?
- //
- // At the very least, we need to log the fact that we received
- // nonsense.
- ACE_ERROR_RETURN ((LM_ERROR,
- "TAO (%P|%t) - "
- "TAO_Transport::enqueue_incoming_message "
- "detected a fragmented GIOP 1.0 message\n"),
- -1);
- break;
- case 0x0101: // GIOP 1.1
- // In 1.1, fragments kinda suck because they don't have they're
- // own message-specific header. Therefore, we have to do the
- // following:
- fragment_message =
- this->incoming_message_queue_.find_fragment (major, minor);
-
- // No fragment was found
- if (fragment_message == 0)
- return this->incoming_message_queue_.enqueue_tail (
- queueable_message);
-
- if (queueable_message->more_fragments_)
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n",
+ this->id ()));
+ }
+
+ // If the queue did not have a complete message put this piece of
+ // message in the queue. We know it did not have a complete
+ // message. That is why we are here.
+ size_t n =
+ this->incoming_message_queue_.copy_tail (incoming);
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "copied [%d] bytes to the tail\n",
+ this->id (),
+ n));
+ }
+
+ // Update the missing data...
+ missing_data =
+ this->incoming_message_queue_.missing_data_tail ();
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "missing [%d] bytes in the tail message\n",
+ this->id (),
+ missing_data));
+ }
+
+ // Move the read pointer of the <incoming> message block to the end
+ // of the copied message and process the remaining portion...
+ incoming.rd_ptr (n);
+
+ // If we have some more information left in the message block..
+ if (incoming.length ())
+ {
+ // We may have to parse & consolidate. This part of the message
+ // doesn't seem to be part of the last message in the queue (as
+ // the copy () hasn't taken away this message).
+ int retval = this->parse_consolidate_messages (incoming,
+ rh,
+ max_wait_time);
+
+ // If there is an error return
+ if (retval == -1)
{
- // Find the last message block in the continuation
- mb = fragment_message->msg_block_;
- while (mb->cont () != 0)
- mb = mb->cont ();
-
- // Add the current message block to the end of the chain
- // after adjusting the read pointer to skip the GIOP header
- queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN);
- mb->cont (queueable_message->msg_block_);
-
- // Get rid of the queuable message but save the message block
- queueable_message->msg_block_ = 0;
- queueable_message->release ();
-
- // One note is that TAO_Queued_Data contains version numbers,
- // but doesn't indicate the actual protocol to which those
- // version numbers refer. That's not a problem, though, because
- // instances of TAO_Queued_Data live in a queue, and that queue
- // lives in a particular instance of a Transport, and the
- // transport instance has an association with a particular
- // messaging_object. The concrete messaging object embodies a
- // messaging protocol, and must cover all versions of that
- // protocol. Therefore, we just need to cover the bases of all
- // versions of that one protocol.
+ if (TAO_debug_level)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "error while consolidating, part of the read message\n",
+ this->id ()));
+ }
+ return retval;
}
- else
+ else if (retval == 1)
{
- // There is a complete chain of fragments
- fragment_message->consolidate ();
+ // If the message in the <incoming> message block has only
+ // one message left we need to process that seperately.
+
+ // Get a queued data
+ TAO_Queued_Data *qd = this->make_queued_data (incoming);
+
+ // Get the rest of the message data
+ this->messaging_object ()->get_message_data (qd);
+
+ // Add the missing data to the queue
+ qd->missing_data_ = 0;
- // Go ahead and enqueue this message
- return this->incoming_message_queue_.enqueue_tail (
- queueable_message);
+ // Check whether the message was fragmented and try to consolidate
+ // the fragments..
+ if (qd->more_fragments_ ||
+ (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ return this->consolidate_fragments (qd, rh);
+ }
+
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
+
+ // We should surely have a message in queue now. So just
+ // process that.
+ return this->process_queue_head (rh);
}
- break;
- case 0x0102: // GIOP 1.2
- // In 1.2, we get a little more context. There's a
- // FRAGMENT message-specific header, and inside that is the
- // request id with which the fragment is associated.
- fragment_message =
- this->incoming_message_queue_.find_fragment (
- queueable_message->request_id_);
-
- // No fragment was found
- if (fragment_message == 0)
- return this->incoming_message_queue_.enqueue_tail (
- queueable_message);
-
- if (fragment_message->major_version_ != major ||
- fragment_message->minor_version_ != minor)
- ACE_ERROR_RETURN ((LM_ERROR,
- "TAO (%P|%t) - "
- "TAO_Transport::enqueue_incoming_message "
- "GIOP versions do not match "
- "(%d.%d != %d.%d\n",
- fragment_message->major_version_,
- fragment_message->minor_version_,
- major, minor),
- -1);
-
- // Find the last message block in the continuation
- mb = fragment_message->msg_block_;
- while (mb->cont () != 0)
- mb = mb->cont ();
-
- // Add the current message block to the end of the chain
- // after adjusting the read pointer to skip the GIOP header
- queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN +
- TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
- mb->cont (queueable_message->msg_block_);
-
- // Remove our reference to the message block. At this point
- // the message block of the fragment head owns it as part of a
- // chain
- queueable_message->msg_block_ = 0;
-
- if (!queueable_message->more_fragments_)
+
+ // parse_consolidate_messages () would have processed one of the
+ // messages, so we better return as we dont want to starve other
+ // threads.
+ return 0;
+ }
+
+ // If we still have some missing data..
+ if (missing_data > 0)
+ {
+ // Get the last message from the Queue
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ if (TAO_debug_level > 5)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "trying recv, again\n",
+ this->id ()));
+ }
+
+ // Try to do a read again. If we have some luck it would be
+ // great..
+ ssize_t n = this->recv (qd->msg_block_->wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ if (TAO_debug_level > 5)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "recv retval [%d]\n",
+ this->id (),
+ n));
+ }
+
+ // Error...
+ if (n < 0)
+ {
+ return n;
+ }
+
+ // If we get a EWOULDBLOCK ie. n==0, we should anyway put the
+ // message in queue before returning..
+ // Move the write pointer
+ qd->msg_block_->wr_ptr (n);
+
+ // Decrement the missing data
+ qd->missing_data_ -= n;
+
+ // Now put the TAO_Queued_Data back in the queue
+ this->incoming_message_queue_.enqueue_tail (qd);
+
+ // Any way as we have come this far and are about to return,
+ // just try to process a message if it is there in the queue.
+ if (this->incoming_message_queue_.is_head_complete ())
{
- // This is the end of the fragments for this request
- fragment_message->consolidate ();
+ return this->process_queue_head (rh);
}
- // Get rid of the queuable message
- queueable_message->release ();
- break;
- default:
- if (!queueable_message->more_fragments_)
- return this->incoming_message_queue_.enqueue_tail (
- queueable_message);
- // This is an unknown GIOP version
- ACE_ERROR_RETURN ((LM_ERROR,
- "TAO (%P|%t) - "
- "TAO_Transport::enqueue_incoming_message "
- "can not handle a fragmented GIOP %d.%d "
- "message\n", major, minor),
- -1);
+ return 0;
}
- return 0;
+ // Process a message in the head of the queue if we have one..
+ return this->process_queue_head (rh);
}
int
+TAO_Transport::consolidate_extra_messages (ACE_Message_Block
+ &incoming,
+ TAO_Resume_Handle &rh)
+{
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n",
+ this->id ()));
+ }
+
+ // Pick the tail of the queue
+ TAO_Queued_Data *tail =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ if (tail)
+ {
+ // If we have a node in the tail, checek to see whether it needs
+ // consolidation. If so, just consolidate it.
+ if (this->messaging_object ()->consolidate_node (tail, incoming) == -1)
+ {
+ return -1;
+ }
+
+ // .. put the tail back in queue..
+ this->incoming_message_queue_.enqueue_tail (tail);
+ }
+
+ int retval = 1;
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, "
+ "extracting extra messages\n",
+ this->id ()));
+ }
+
+ // Extract messages..
+ while (retval == 1)
+ {
+ TAO_Queued_Data *q_data = 0;
+
+ retval =
+ this->messaging_object ()->extract_next_message (incoming,
+ q_data);
+ if (q_data)
+ {
+ // If we have read a framented message then...
+ if (q_data->more_fragments_ ||
+ q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ {
+ this->consolidate_fragments (q_data, rh);
+ }
+ else
+ {
+ this->incoming_message_queue_.enqueue_tail (q_data);
+ }
+ }
+ }
+
+ // In case of error return..
+ if (retval == -1)
+ {
+ return retval;
+ }
+
+ return this->process_queue_head (rh);
+}
+
+int
TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
TAO_Resume_Handle &rh)
{
// Get the <message_type> that we have received
TAO_Pluggable_Message_Type t = qd->msg_type_;
+ // int result = 0;
+
if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
{
if (TAO_debug_level > 0)
@@ -1871,69 +1828,126 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
return 0;
}
-int
-TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
+TAO_Queued_Data *
+TAO_Transport::make_queued_data (ACE_Message_Block &incoming)
{
- if (TAO_debug_level > 3)
+ // Get an instance of TAO_Queued_Data
+ TAO_Queued_Data *qd =
+ TAO_Queued_Data::get_queued_data (
+ this->orb_core_->transport_message_buffer_allocator ());
+
+ // Get the flag for the details of the data block...
+ ACE_Message_Block::Message_Flags flg =
+ incoming.self_flags ();
+
+ if (ACE_BIT_DISABLED (flg,
+ ACE_Message_Block::DONT_DELETE))
{
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_queue_head\n",
- this->id ()));
+ // Duplicate the data block before putting it in the queue.
+ qd->msg_block_ = ACE_Message_Block::duplicate (&incoming);
}
+ else
+ {
+ // As we are in CORBA mode, all the data blocks would be aligned
+ // on an 8 byte boundary. Hence create a data block for more
+ // than the actual length
+ ACE_Data_Block *db =
+ this->orb_core_->create_input_cdr_data_block (incoming.length ()+
+ ACE_CDR::MAX_ALIGNMENT);
- if (this->incoming_message_queue_.is_head_complete () != 1)
- return 1;
+ // Get the allocator..
+ ACE_Allocator *alloc =
+ this->orb_core_->input_cdr_msgblock_allocator ();
- // Get the message on the head of the queue..
- TAO_Queued_Data *qd =
- this->incoming_message_queue_.dequeue_head ();
+ // Make message block..
+ ACE_Message_Block mb (db,
+ 0,
+ alloc);
+
+ // Duplicate the block..
+ qd->msg_block_ = mb.duplicate ();
+
+ // Align the message block
+ ACE_CDR::mb_align (qd->msg_block_);
+
+ // Copy the data..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ incoming.length ());
+ }
+
+
+ return qd;
+}
+int
+TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
+{
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_queue_head, "
- "the size of the queue is [%d]\n",
- this->id (),
- this->incoming_message_queue_.queue_length()));
+ "TAO (%P|%t) - Transport[%d]::process_queue_head\n",
+ this->id ()));
}
- // Now that we have pulled out out one message out of the queue,
- // check whether we have one more message in the queue...
- if (this->incoming_message_queue_.queue_length () > 0)
+
+ // See if the message in the head of the queue is complete...
+ if (this->incoming_message_queue_.is_head_complete () > 0)
{
- if (TAO_debug_level > 0)
+ // Get the message on the head of the queue..
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_head ();
+
+ if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport[%d]::process_queue_head, "
- "notify reactor\n",
- this->id ()));
+ "the size of the queue is [%d]\n",
+ this->id (),
+ this->incoming_message_queue_.queue_length()));
+ }
+ // Now that we have pulled out out one message out of the queue,
+ // check whether we have one more message in the queue...
+ if (this->incoming_message_queue_.is_head_complete () > 0)
+ {
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::process_queue_head, "
+ "notify reactor\n",
+ this->id ()));
+ }
+ int retval =
+ this->notify_reactor ();
+
+ if (retval == 1)
+ {
+ // Let the class know that it doesn't need to resume the
+ // handle..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
+ }
+ else if (retval < 0)
+ return -1;
+ }
+ else
+ {
+ // As we are ready to process the last message just resume
+ // the handle. Set the flag incase someone had reset the flag..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
}
- int retval =
- this->notify_reactor ();
- if (retval == 1)
+ // Process the message...
+ if (this->process_parsed_messages (qd, rh) == -1)
{
- // Let the class know that it doesn't need to resume the
- // handle..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
+ return -1;
}
- else if (retval < 0)
- return -1;
- }
- else
- {
- // As we are ready to process the last message just resume
- // the handle. Set the flag incase someone had reset the flag..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
- }
- // Process the message...
- int retval = this->process_parsed_messages (qd, rh);
+ // Delete the Queued_Data..
+ TAO_Queued_Data::release (qd);
- // Delete the Queued_Data..
- TAO_Queued_Data::release (qd);
+ return 0;
+ }
- return (retval == -1) ? -1 : 0;
+ return 1;
}
int
@@ -1945,8 +1959,6 @@ TAO_Transport::notify_reactor (void)
}
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh == 0)
- return -1;
// Get the reactor associated with the event handler
ACE_Reactor *reactor = this->orb_core ()->reactor ();
@@ -1983,18 +1995,6 @@ TAO_Transport::transport_cache_manager (void)
return this->orb_core_->lane_resources ().transport_cache ();
}
-size_t
-TAO_Transport::recv_buffer_size (void)
-{
- return this->recv_buffer_size_;
-}
-
-size_t
-TAO_Transport::sent_byte_count (void)
-{
- return this->sent_byte_count_;
-}
-
void
TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
{
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 4b885eb40e5..b6059b211fe 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -447,6 +447,7 @@ public:
virtual ACE_Event_Handler * event_handler_i (void) = 0;
protected:
+
virtual TAO_Connection_Handler * connection_handler_i (void) = 0;
public:
@@ -488,7 +489,6 @@ public:
virtual int handle_input (TAO_Resume_Handle &rh,
ACE_Time_Value *max_wait_time = 0,
int block = 0);
- void try_to_complete (ACE_Time_Value *max_wait_time);
enum
{
@@ -568,11 +568,60 @@ public:
protected:
+ /// Called by the handle_input_i (). This method is used to parse
+ /// message read by the handle_input_i () call. It also decides
+ /// whether the message needs consolidation before processing.
+ int parse_consolidate_messages (ACE_Message_Block &bl,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *time = 0);
+
+
+ /// Method does parsing of the message if we have a fresh message in
+ /// the <message_block> or just returns if we have read part of the
+ /// previously stored message.
+ int parse_incoming_messages (ACE_Message_Block &message_block);
+
+ /// Return if we have any missing data in the queue of messages
+ /// or determine if we have more information left out in the
+ /// presently read message to make it complete.
+ size_t missing_data (ACE_Message_Block &message_block);
+
+ /// Consolidate the currently read message or consolidate the last
+ /// message in the queue. The consolidation of the last message in
+ /// the queue is done by calling consolidate_message_queue ().
+ virtual int consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time);
+
+ /// @@Bala: Docu???
+ int consolidate_fragments (TAO_Queued_Data *qd,
+ TAO_Resume_Handle &rh);
+
+ /// First consolidate the message queue. If the message is still not
+ /// complete, try to read from the handle again to make it
+ /// complete. If these dont help put the message back in the queue
+ /// and try to check the queue if we have message to process. (the
+ /// thread needs to do some work anyway :-))
+ int consolidate_message_queue (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time);
+
+ /// Called by parse_consolidate_message () if we have more messages
+ /// in one read. Queue up the messages and try to process one of
+ /// them, atleast at the head of them.
+ int consolidate_extra_messages (ACE_Message_Block &incoming,
+ TAO_Resume_Handle &rh);
+
/// Process the message by sending it to the higher layers of the
/// ORB.
int process_parsed_messages (TAO_Queued_Data *qd,
TAO_Resume_Handle &rh);
+ /// Make a queued data from the <incoming> message block
+ TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming);
+
/// Implement send_message_shared() assuming the handler_lock_ is
/// held.
int send_message_shared_i (TAO_Stub *stub,
@@ -620,40 +669,6 @@ public:
int handle_timeout (const ACE_Time_Value &current_time,
const void* act);
- /// Accessor to recv_buffer_size_
- size_t recv_buffer_size (void);
-
- /// Accessor to sent_byte_count_
- size_t sent_byte_count (void);
-
-
- /*!
- \name Incoming Queue Methods
- */
- //@{
- /*!
- \brief Queue up \a queueable_message as a completely-received incoming message.
-
- This method queues up a completely-received queueable GIOP message
- (i.e., it must be dynamically-allocated). It does not assemble a
- complete GIOP message; that should be done prior to calling this
- message, and is currently done in handle_input_i.
-
- This does, however, assure that a completely-received GIOP
- FRAGMENT gets associated with any previously-received related
- fragments. It does this through collaboration with the messaging
- object (since fragment reassembly is protocol specific).
-
- \param queueable_message instance as returned by one of the TAO_Queued_Data::make_*_message that's been completely received
-
- \return 0 successfully enqueued \a queueable_message
-
- \return -1 failed to enqueue \a queueable_message
- \todo How do we indicate \em what may have failed?
- */
- int enqueue_incoming_message (TAO_Queued_Data *queueable_message);
- //@}
-
/// CodeSet Negotiation - Get the char codeset translator factory
///
TAO_Codeset_Translator_Factory *char_translator (void) const;
@@ -777,14 +792,10 @@ private:
/// Print out error messages if the event handler is not valid
void report_invalid_event_handler (const char *caller);
- /**
+ /*
* Process the message that is in the head of the incoming queue.
* If there are more messages in the queue, this method calls
- * this->notify_reactor () to wake up a thread.
- *
- * \return -1 An error occurred; occurs independent presence of messages in the queue.
- * \return 1 No messages in the queue to process; nothing processed.
- * \return 0 Messages were in the queue to process and one got processed.
+ * this->notify_reactor () to wake up a thread
*/
int process_queue_head (TAO_Resume_Handle &rh);
@@ -845,12 +856,9 @@ protected:
TAO_Queued_Message *head_;
TAO_Queued_Message *tail_;
- /// Queue of the completely-received incoming messages..
+ /// Queue of the incoming messages..
TAO_Incoming_Message_Queue incoming_message_queue_;
- /// Place to hold a partially-received (waiting-to-be-completed) message
- TAO_Queued_Data * uncompleted_message_;
-
/// The queue will start draining no later than <queing_deadline_>
/// *if* the deadline is
ACE_Time_Value current_deadline_;
@@ -885,11 +893,8 @@ protected:
/// Used by the LRU, LFU and FIFO Connection Purging Strategies.
unsigned long purging_order_;
- /// Size of the buffer received.
- size_t recv_buffer_size_;
+private:
- /// Number of bytes sent.
- size_t sent_byte_count_;
/// @@Phil, I think it would be nice if we could think of a way to
/// do the following.
/// We have been trying to use the transport for marking about
@@ -902,7 +907,6 @@ protected:
/// we can move this to the connection_handler and it may more sense
/// with the DSCP stuff around there. Do you agree?
-private:
/// Additional member values required to support codeset translation
TAO_Codeset_Translator_Factory *char_translator_;
TAO_Codeset_Translator_Factory *wchar_translator_;