summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Cleeland <chris.cleeland@gmail.com>2003-03-18 18:46:51 +0000
committerChris Cleeland <chris.cleeland@gmail.com>2003-03-18 18:46:51 +0000
commitec2d0ee6d02253a14e88337b6575a3d679ae5f82 (patch)
tree034e1b26b680a589ec9f838b0c8a6402f50fefe3
parent8dd085b29748a224a8ca5b425cfcd80c497c63e5 (diff)
downloadATCD-ec2d0ee6d02253a14e88337b6575a3d679ae5f82.tar.gz
Code cleanup (removal of #if0 stuff), performance improvements, etc.
-rw-r--r--TAO/PMBChangeLog351
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp319
-rw-r--r--TAO/tao/GIOP_Message_Base.h27
-rw-r--r--TAO/tao/GIOP_Message_Base.i7
-rw-r--r--TAO/tao/GIOP_Message_Lite.cpp34
-rw-r--r--TAO/tao/GIOP_Message_Lite.h36
-rw-r--r--TAO/tao/GIOP_Message_State.cpp142
-rw-r--r--TAO/tao/GIOP_Message_State.h19
-rw-r--r--TAO/tao/GIOP_Message_State.inl21
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp222
-rw-r--r--TAO/tao/Incoming_Message_Queue.h15
-rw-r--r--TAO/tao/Incoming_Message_Queue.inl18
-rw-r--r--TAO/tao/Pluggable_Messaging.h27
-rw-r--r--TAO/tao/Transport.cpp813
-rw-r--r--TAO/tao/Transport.h51
15 files changed, 435 insertions, 1667 deletions
diff --git a/TAO/PMBChangeLog b/TAO/PMBChangeLog
index 06d916b5f15..5cf756fe720 100644
--- a/TAO/PMBChangeLog
+++ b/TAO/PMBChangeLog
@@ -1,266 +1,343 @@
+Mon Mar 17 11:09:00 2003 Chris Cleeland <cleeland_c@ociweb.com>
+
+ * tao/GIOP_Message_Base.cpp (check_for_valid_header): Optimized
+ implementation.
+
+ * tao/GIOP_Message_Base.* (header_length): Moved to inline for
+ optimization.
+
+ * tao/GIOP_Message_State.cpp (TAO_Debug_Msg_Emitter_Guard):
+ Improved implementation so it didn't make copies of the message
+ even when it was never going to output anything due to debug
+ level being lower than the value that would cause debug output.
+
+ * tao/GIOP_Message_State.cpp (take_values_from_message_block):
+ * tao/GIOP_Message_State.cpp (set_version_info_from_buffer):
+ * tao/GIOP_Message_State.cpp (set_byte_order_info_from_buffer):
+ * tao/GIOP_Message_State.cpp (set_payload_size_from_buffer):
+
+ Removed use of TAO_Debug_Msg_Emitter_Guard.
+
+ * tao/GIOP_Message_State.inl (set_payload_size_from_buffer):
+
+ Moved to inline file.
+
+ * tao/Incoming_Message_Queue.h (TAO_Incoming_Message_Queue):
+ Changed name of member 'queued_data_' to 'last_added_' to be
+ more clear about what the pointer actually points to.
+
+ Added documentation that describes the structure of the linked
+ list so that hopefully future maintainers don't have to go
+ through the same learning curve I did.
+
+ * tao/Incoming_Message_Queue.*: Updates to reflect last_added_
+ member name change.
+
+ * tao/Incoming_Message_Queue.cpp (dequeue_head): Add check to not
+ allow dequeuing when size is zero, plus we now zero-out the
+ last_added_ pointer after we've dequeued the last item in the
+ list.
+
+ * tao/Incoming_Message_Queue.cpp (clone_mb_nocopy_size): Fix
+ setting and copying of flags on the message and data blocks so
+ that the DONT_DELETE flag specifically doesn't get copied. This
+ cured a HUGE memory leak that was causing substantial
+ performance problems.
+
+ * tao/Incoming_Message_Queue.cpp (make_uncompleted_message):
+ * tao/Incoming_Message_Queue.cpp (make_completed_message):
+
+ Cache values used over and over again within these methods such
+ as header and message block lengths
+
+ * tao/Transport.cpp (handle_input_i):
+
+ Added flag noting that we enqueued a message.
+
+ Changed how re-reading is performed; see entry for
+ try_to_complete for information.
+
+ Used ACE_CDR::grow to grow the uncompleted_message message block
+ once the payload size is known. This insures that the growth
+ happens with proper alignment constraints.
+
+ * tao/Transport.* (try_to_complete): Added this method which
+ tries to complete whatever partial message is held in
+ uncompleted_message_. The difference between doing this and
+ simply revisiting the top of handle_input_i is that this will
+ only try to read enough to complete the message rather than also
+ read another partial.
+
+ * tao/Transport.cpp (process_queue_head): Added check to insure
+ that we don't try to do any processing when there's nothing in
+ the queue. Also, removed old code leftover from when the
+ incoming queue could have partial GIOP messages in it. Finally,
+ insured that the return value matched previous requirements.
+
+
Thu Mar 6 15:27:14 2003 Chris Cleeland <cleeland_c@ociweb.com>
* tao/Incoming_Message_Queue.cpp: Removed Shattering Encapsulation
- hack around ace/Message_Block.h inclusion to make this link
- properly on VC6.
+ hack around ace/Message_Block.h inclusion to make this link
+ properly on VC6.
* tao/GIOP_Message_State.cpp: Moved initialization of
- TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH outside the class
- declaration.
+ TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH outside the class
+ declaration.
Wed Mar 5 13:08:37 2003 Chris Cleeland <cleeland_c@ociweb.com>
* tao/GIOP_Message_Generator_Parser_Impl.inl (check_revision):
- Modified to only explicitly allow versions of GIOP that TAO
- supports, and no others. Note that this change means that
- whenever a new version of GIOP gets added to TAO, this function
- must be updated, too. Hopefully that won't happen so often that
- this becomes horribly burdensome. Thanks to Chad Elliott and Paul
- Calabrese for ideas in getting this right.
+ Modified to only explicitly allow versions of GIOP that TAO
+ supports, and no others. Note that this change means that
+ whenever a new version of GIOP gets added to TAO, this function
+ must be updated, too. Hopefully that won't happen so often that
+ this becomes horribly burdensome. Thanks to Chad Elliott and
+ Paul Calabrese for ideas in getting this right.
* tao/Incoming_Message_Queue.* (make_uncompleted_message):
- This now behaves similar to make_completed_message, i.e., it
- allocates the message block held in TAO_Queued_Data and advnaced
- the rd_ptr past the bytes it consumes.
+ This now behaves similar to make_completed_message, i.e., it
+ allocates the message block held in TAO_Queued_Data and advnaced
+ the rd_ptr past the bytes it consumes.
* tao/Incoming_Message_Queue.cpp: Eliminated hacks on
- ACE_Message_Block and replaced with functions
- clone_mb_nocopy_size() and copy_mb_span() that utilize ONLY the
- public API. make_completed_message() and
- make_uncompleted_message() now use these.
+ ACE_Message_Block and replaced with functions
+ clone_mb_nocopy_size() and copy_mb_span() that utilize ONLY the
+ public API. make_completed_message() and
+ make_uncompleted_message() now use these.
* tao/Transport.cpp (handle_input_i): Enlarged the buffer size and
- replaced the magic number for the maximum number of re-read
- attempts with the manifest constant
- TAO_MAX_TRANSPORT_REREAD_ATTEMPTS, now defined in orbconf.h.
+ replaced the magic number for the maximum number of re-read
+ attempts with the manifest constant
+ TAO_MAX_TRANSPORT_REREAD_ATTEMPTS, now defined in orbconf.h.
- Changed uses of make_uncompleted_message() to be consistent with
- changes outlined above.
+ Changed uses of make_uncompleted_message() to be consistent with
+ changes outlined above.
- Corrected a bug where, upon re-reading after completely emptying
- the stack-allocated message block, I forgot to reset the
- rd_ptr/wr_ptr in the message block.
+ Corrected a bug where, upon re-reading after completely emptying
+ the stack-allocated message block, I forgot to reset the
+ rd_ptr/wr_ptr in the message block.
* tao/orbconf.h: Added new manifest constant
- TAO_MAX_TRANSPORT_REREAD_ATTEMPTS. Documentation is in the file.
+ TAO_MAX_TRANSPORT_REREAD_ATTEMPTS. Documentation is in the file.
Wed Feb 26 21:44:34 2003 Chris Cleeland <cleeland_c@ociweb.com>
* TAO/tao/GIOP_Message_Base.cpp:
- Eliminate parse_incoming_messages(), missing_data(),
- extract_next_message(), consolidate_node(),
- consolidate_fragments(), get_message_data(), make_queued_data().
+ Eliminate parse_incoming_messages(), missing_data(),
+ extract_next_message(), consolidate_node(),
+ consolidate_fragments(), get_message_data(), make_queued_data().
- Add new methods set_queued_data_from_message_header() and
- check_for_valid_header().
+ Add new methods set_queued_data_from_message_header() and
+ check_for_valid_header().
* TAO/tao/GIOP_Message_Base.h:
- Eliminate #if0 parse_incoming_messages(), missing_data(),
- extract_next_message(), consolidate_node(),
- consolidate_fragments(), get_message_data(), and
- make_queued_data() declarations.
+ Eliminate #if0 parse_incoming_messages(), missing_data(),
+ extract_next_message(), consolidate_node(),
+ consolidate_fragments(), get_message_data(), and
+ make_queued_data() declarations.
- Add declarations for set_queued_data_from_message_header() and
- check_for_valid_header().
+ Add declarations for set_queued_data_from_message_header() and
+ check_for_valid_header().
- Make message_type() static.
+ Make message_type() static.
* TAO/tao/GIOP_Message_Generator_Parser_Impl.inl:
- Reimplement check_revision() to be exact about which GIOP
- revisions are okay. This is different from before where it
- loosely assumed that everything was valid unless it want higher
- than a particular range.
+ Reimplement check_revision() to be exact about which GIOP
+ revisions are okay. This is different from before where it
+ loosely assumed that everything was valid unless it want higher
+ than a particular range.
* TAO/tao/GIOP_Message_List.cpp:
- Remove parse_incoming_messages() thru consolidate_fragments();
- remove make_queued_data().
+ Remove parse_incoming_messages() thru consolidate_fragments();
+ remove make_queued_data().
- Add set_queued_data_from_message_header() and
- check_for_valid_header().
+ Add set_queued_data_from_message_header() and
+ check_for_valid_header().
* TAO/tao/GIOP_Message_Lite.h:
- Remove declarations for parse_incoming_messages() thru
- consolidate_fragments(); remove make_queued_data().
+ Remove declarations for parse_incoming_messages() thru
+ consolidate_fragments(); remove make_queued_data().
- Add declarations for set_queued_data_from_message_header() and
- check_for_valid_header().
+ Add declarations for set_queued_data_from_message_header() and
+ check_for_valid_header().
* TAO/tao/GIOP_Message_State.cpp:
- Add TAO_Debug_Message_Emitter_Guard class within the compilation
- unit.
+ Add TAO_Debug_Message_Emitter_Guard class within the compilation
+ unit.
- Change CTOR not to use the 2nd argument.
+ Change CTOR not to use the 2nd argument.
- Change CTOR not to initialize this->base_with the 2nd arg.
+ Change CTOR not to initialize this->base_with the 2nd arg.
- Add take_values_from_message_block().
+ Add take_values_from_message_block().
- Remove parse_magic_bytes(), parse_message_header_i(),
- parse_message_header().
+ Remove parse_magic_bytes(), parse_message_header_i(),
+ parse_message_header().
- Rename get_version_info() to set_version_info_from_buffer();
- change argument type to 'const char*'.
+ Rename get_version_info() to set_version_info_from_buffer();
+ change argument type to 'const char*'.
- Rename get_byte_order_info() to set_byte_order_info_from_buffer();
- change argument type to 'const char*'.
+ Rename get_byte_order_info() to set_byte_order_info_from_buffer();
+ change argument type to 'const char*'.
- Rename get_payload_size() to set_payload_size_from_buffer();
- change argument type to 'const char*'.
+ Rename get_payload_size() to set_payload_size_from_buffer();
+ change argument type to 'const char*'.
- Change argument type for read_ulong() to 'const char*'.
+ Change argument type for read_ulong() to 'const char*'.
* TAO/tao/GIOP_Message_State.h:
- Add default value for CTOR arguments so we don't have to pass
- them.
+ Add default value for CTOR arguments so we don't have to pass
+ them.
- Add declaration for take_values_from_message_block().
+ Add declaration for take_values_from_message_block().
- #if0 parse_message_header() declaration.
+ #if0 parse_message_header() declaration.
- Add documentation for byte_order().
+ Add documentation for byte_order().
- Add new GIOP information accessors: giop_version(),
- more_fragments(), and message_type().
+ Add new GIOP information accessors: giop_version(),
+ more_fragments(), and message_type().
- #if0 declaration for parse_message_header_i() and
- parse_magic_bytes().
+ #if0 declaration for parse_message_header_i() and
+ parse_magic_bytes().
- Rename delcarations for get_version_info(), get_byte_order_info(),
- and get_payload_size().
+ Rename delcarations for get_version_info(), get_byte_order_info(),
+ and get_payload_size().
- #if0 base_ member variable.
+ #if0 base_ member variable.
* TAO/tao/GIOP_Message_State.inl:
- Add definitions for giop_version(), more_fragments(), and
- message_type().
+ Add definitions for giop_version(), more_fragments(), and
+ message_type().
* TAO/tao/Incoming_Message_Queue.cpp:
- Add #include for ace/Message_Block.h, but shatter encapsulation
- and effectively extend its interface so that we can clone spans of
- message blocks.
+ Add #include for ace/Message_Block.h, but shatter encapsulation
+ and effectively extend its interface so that we can clone spans of
+ message blocks.
- Modify copy_tail() to use new TAO_Queued_Data member names.
+ Modify copy_tail() to use new TAO_Queued_Data member names.
* TAO/tao/Incoming_Message_Queue.cpp (class TAO_Queued_Data):
- Update CTORs to initialize new this->current_state_ data member.
+ Update CTORs to initialize new this->current_state_ data member.
- Add new make_uncompleted_message(), ACE_Message_Block hack
- "extensions", make_completed_message().
+ Add new make_uncompleted_message(), ACE_Message_Block hack
+ "extensions", make_completed_message().
- Rename get_queued_data() to make_queued_data().
+ Rename get_queued_data() to make_queued_data().
* TAO/tao/Incoming_Message_Queue.h:
- Changed #include <Pluggable_Messaging_Utils.h> to #include
- <Pluggable_Message.h>
+ Changed #include <Pluggable_Messaging_Utils.h> to #include
+ <Pluggable_Message.h>
- Documentation for is_tail_complete() and is_head_complete() and
- is_tail_fragmented().
+ Documentation for is_tail_complete() and is_head_complete() and
+ is_tail_fragmented().
- #if0 get_node().
+ #if0 get_node().
* TAO/tao/Incoming_Message_Queue.h (class TAO_Queued_Data):
- Make CTORs protected so creation can only go through new factory
- methods, i.e., make_completed_message() and
- make_uncompleted_message().
+ Make CTORs protected so creation can only go through new factory
+ methods, i.e., make_completed_message() and
+ make_uncompleted_message().
- Add new factory methods make_completed_message() and
- make_uncompleted_message().
+ Add new factory methods make_completed_message() and
+ make_uncompleted_message().
- Add release() method [in addition to static of same name].
+ Add release() method [in addition to static of same name].
- Add explicit definition of various states and a member to hold the
- current state (this->current_state_).
+ Add explicit definition of various states and a member to hold the
+ current state (this->current_state_).
- Rename this->missing_data_ flag/value with
- this->missing_data_bytes_ count.
+ Rename this->missing_data_ flag/value with
+ this->missing_data_bytes_ count.
- Documentation for this->byte_order_.
+ Documentation for this->byte_order_.
* TAO/tao/Incoming_Message_Queue.inl:
- Replace uses of this->missing_data_ with
- this->missing_data_bytes_.
+ Replace uses of this->missing_data_ with
+ this->missing_data_bytes_.
- #if0 get_node().
+ #if0 get_node().
- Add release() method.
+ Add release() method.
* TAO/tao/Pluggable_Message.h:
- #if0 parse_incoming_messages() through consolidate_fragments().
+ #if0 parse_incoming_messages() through consolidate_fragments().
- Add declarations for check_for_valid_header() and
- set_queued_data_from_message_header().
+ Add declarations for check_for_valid_header() and
+ set_queued_data_from_message_header().
* TAO/tao/Transport.cpp:
- Add #include <ace/Min_Max.h>
+ Add #include <ace/Min_Max.h>
- Add uncompleted_message_ to initialization list.
+ Add uncompleted_message_ to initialization list.
- Modify what happens when reading in handle_input_i():
+ Modify what happens when reading in handle_input_i():
- - number_of_read_attempts is how many times we'll try to read
- again to complete a message before we give up.
+ - number_of_read_attempts is how many times we'll try to read
+ again to complete a message before we give up.
- - read_from_the_connection label added
+ - read_from_the_connection label added
- - correct 1st argument to this->recv() to be wr_ptr() rather than
- rd_ptr().
+ - correct 1st argument to this->recv() to be wr_ptr() rather than
+ rd_ptr().
- - lots of changes that are substantially documented in the code.
+ - lots of changes that are substantially documented in the code.
- #if0 parse_consolidate_messages(), parse_incoming_messages(),
- missing_data(), consolidate_message(), consolidate_fragments(),
- consolidate_message_queue(), consolidate_extra_messages(), and
- make_queued_data().
+ #if0 parse_consolidate_messages(), parse_incoming_messages(),
+ missing_data(), consolidate_message(), consolidate_fragments(),
+ consolidate_message_queue(), consolidate_extra_messages(), and
+ make_queued_data().
* TAO/tao/Transport.h:
- #if0 declarations for parse_consolidate_messages(), parse_incoming_messages(),
- missing_data(), consolidate_message(), consolidate_fragments(),
- consolidate_message_queue(), consolidate_extra_messages(), and
- make_queued_data().
+ #if0 declarations for parse_consolidate_messages(), parse_incoming_messages(),
+ missing_data(), consolidate_message(), consolidate_fragments(),
+ consolidate_message_queue(), consolidate_extra_messages(), and
+ make_queued_data().
- Add this->uncompleted_message_ data member.
+ Add this->uncompleted_message_ data member.
* TAO/tao/Strategies/DIOP_Transport.cpp:
- Change 1st arg of this->recv() in handle_input_i() to wr_ptr()
- from rd_ptr().
+ Change 1st arg of this->recv() in handle_input_i() to wr_ptr()
+ from rd_ptr().
* TAO/tao/Strategies/SHMIOP_Transport.cpp:
- #if0 consolidate_message().
+ #if0 consolidate_message().
* TAO/tao/Strategies/SHMIOP_Transport.h:
- #if0 declaration for consolidate_message().
+ #if0 declaration for consolidate_message().
* TAO/tests/AMI/run_test.pl:
- Increase default number of iterations from 1 to 10000. Earlier
- versions of the PMB changes succeeded just fine with low numbers
- of iterations, but began to fail miserably when the number of
- iterations climbed.
+ Increase default number of iterations from 1 to 10000. Earlier
+ versions of the PMB changes succeeded just fine with low numbers
+ of iterations, but began to fail miserably when the number of
+ iterations climbed.
* TAO/tests/BiDirectional/run_test.pl:
- Added level 10 debug.
+ Added level 10 debug.
* TAO/tests/InterOp-Naming/INS_test_client.cpp:
- Changes that don't appear to be related in any way to PMB...
+ Changes that don't appear to be related in any way to PMB...
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 1e3d72512d8..10f852d3656 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -309,266 +309,6 @@ TAO_GIOP_Message_Base::message_type (
return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
}
-#if 0
-int
-TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
-{
-
- if (this->message_state_.parse_message_header (incoming) == -1)
- {
- return -1;
- }
-
- return 0;
-}
-
-ssize_t
-TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
-{
- // Actual message size including the header..
- CORBA::ULong msg_size =
- this->message_state_.message_size ();
-
- size_t len = incoming.length ();
-
- // If we have too many messages or if we have less than even a size
- // of the GIOP header then ..
- if (len > msg_size ||
- len < TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- return -1;
- }
- else if (len == msg_size)
- return 0;
-
- return msg_size - len;
-}
-
-
-int
-TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
- TAO_Queued_Data *&qd)
-{
- TAO_GIOP_Message_State state (this->orb_core_,
- this);
-
- if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- if (incoming.length () > 0)
- {
- // Make a node which has a message block of the size of
- // MESSAGE_HEADER_LEN.
- qd =
- this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- qd->msg_block_->copy (incoming.rd_ptr (),
- incoming.length ());
- qd->missing_data_ = -1;
- }
- return 0;
- }
-
- if (state.parse_message_header (incoming) == -1)
- {
- return -1;
- }
-
- size_t copying_len = state.message_size ();
-
- qd = this->make_queued_data (copying_len);
-
- if (copying_len > incoming.length ())
- {
- qd->missing_data_ =
- copying_len - incoming.length ();
-
- copying_len = incoming.length ();
- }
-
- qd->msg_block_->copy (incoming.rd_ptr (),
- copying_len);
-
- incoming.rd_ptr (copying_len);
- qd->byte_order_ = state.byte_order_;
- qd->major_version_ = state.giop_version_.major;
- qd->minor_version_ = state.giop_version_.minor;
- qd->msg_type_ = this->message_type (state);
- return 1;
-}
-
-int
-TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
- ACE_Message_Block &incoming)
-{
- // Look to see whether we had atleast parsed the GIOP header ...
- if (qd->missing_data_ == -1)
- {
- // The data length that has been stuck in there during the last
- // read ....
- size_t len =
- qd->msg_block_->length ();
-
- // We know that we would have space for
- // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
- // from the <incoming> into the message block in <qd>
- qd->msg_block_->copy (incoming.rd_ptr (),
- TAO_GIOP_MESSAGE_HEADER_LEN - len);
-
- // Move the rd_ptr () in the incoming message block..
- incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len);
-
- TAO_GIOP_Message_State state (this->orb_core_,
- this);
-
- // Parse the message header now...
- if (state.parse_message_header (*qd->msg_block_) == -1)
- return -1;
-
- // Now grow the message block so that we can copy the rest of
- // the data...
- if (qd->msg_block_->space () < state.message_size ())
- {
- ACE_CDR::grow (qd->msg_block_,
- state.message_size ());
- }
-
- // Copy the pay load..
- // Calculate the bytes that needs to be copied in the queue...
- size_t copy_len =
- state.payload_size ();
-
- // If the data that needs to be copied is more than that is
- // available to us ..
- if (copy_len > incoming.length ())
- {
- // Calculate the missing data..
- qd->missing_data_ =
- copy_len - incoming.length ();
-
- // Set the actual possible copy_len that is available...
- copy_len = incoming.length ();
- }
- else
- {
- qd->missing_data_ = 0;
- }
-
- // ..now we are set to copy the right amount of data to the
- // node..
- qd->msg_block_->copy (incoming.rd_ptr (),
- copy_len);
-
- // Set the <rd_ptr> of the <incoming>..
- incoming.rd_ptr (copy_len);
-
- // Get the other details...
- qd->byte_order_ = state.byte_order_;
- qd->major_version_ = state.giop_version_.major;
- qd->minor_version_ = state.giop_version_.minor;
- qd->msg_type_ = this->message_type (state);
- }
- else
- {
- // @@todo: Need to abstract this out to a seperate method...
- size_t copy_len = qd->missing_data_;
-
- if (copy_len > incoming.length ())
- {
- // Calculate the missing data..
- qd->missing_data_ =
- copy_len - incoming.length ();
-
- // Set the actual possible copy_len that is available...
- copy_len = incoming.length ();
- }
-
- // Copy the right amount of data in to the node...
- // node..
- qd->msg_block_->copy (incoming.rd_ptr (),
- copy_len);
-
- // Set the <rd_ptr> of the <incoming>..
- qd->msg_block_->rd_ptr (copy_len);
-
- }
-
-
- return 0;
-}
-
-
-int
-TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd)
-{
- if (dqd->byte_order_ != sqd->byte_order_
- || dqd->major_version_ != sqd->major_version_
- || dqd->minor_version_ != sqd->minor_version_)
- {
- // Yes, print it out in all debug levels!. This is an error by
- // CORBA 2.4 spec
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) incompatible fragments:")
- ACE_TEXT ("different GIOP versions or byte order\n")));
- return -1;
- }
-
- // Skip the header in the incoming message
- sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- // If we have a fragment header skip the header length too..
- if (sqd->minor_version_ == 2 &&
- sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
-
- // Get the length of the incoming message block..
- size_t incoming_length =
- sqd->msg_block_->length ();
-
- // Increase the size of the destination message block if we need
- // to.
- ACE_Message_Block *mb =
- dqd->msg_block_;
-
- // Check space before growing.
- if (mb->space () < incoming_length)
- {
- ACE_CDR::grow (mb,
- mb->length () + incoming_length);
- }
-
- // Copy the data
- dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (),
- incoming_length);
- return 0;
-}
-
-void
-TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
-{
- // Get the message information
- qd->byte_order_ =
- this->message_state_.byte_order_;
- qd->major_version_ =
- this->message_state_.giop_version_.major;
- qd->minor_version_ =
- this->message_state_.giop_version_.minor;
-
- //qd->more_fragments_ = this->message_state_.more_fragments_;
-
- if (this->message_state_.more_fragments_)
- qd->more_fragments_ = 1;
- else
- qd->more_fragments_ = 0;
-
- qd->msg_type_=
- this->message_type (this->message_state_);
-
- // Reset the message_state
- this->message_state_.reset ();
-}
-#endif
-
int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd)
@@ -1534,52 +1274,6 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg)
}
-#if 0
-TAO_Queued_Data *
-TAO_GIOP_Message_Base::make_queued_data (size_t sz)
-{
- // Get a node for the queue..
- TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data (
- this->orb_core_->transport_message_buffer_allocator ());
-
- // @@todo: We have a similar method in Transport.cpp. Need to see how
- // we can factor them out..
- // Make a datablock for the size requested + something. The
- // "something" is required because we are going to align the data
- // block in the message block. During alignment we could loose some
- // bytes. As we may not know how many bytes will be lost, we will
- // allocate ACE_CDR::MAX_ALIGNMENT extra.
- ACE_Data_Block *db =
- this->orb_core_->create_input_cdr_data_block (sz +
- ACE_CDR::MAX_ALIGNMENT);
-
- ACE_Allocator *alloc =
- this->orb_core_->input_cdr_msgblock_allocator ();
-
- ACE_Message_Block mb (db,
- 0,
- alloc);
-
- ACE_Message_Block *new_mb = mb.duplicate ();
-
- ACE_CDR::mb_align (new_mb);
-
- qd->msg_block_ = new_mb;
-
-
- return qd;
-}
-#endif
-
-
-
-size_t
-TAO_GIOP_Message_Base::header_length (void) const
-{
- return TAO_GIOP_MESSAGE_HEADER_LEN;
-}
-
void
TAO_GIOP_Message_Base::set_queued_data_from_message_header (
TAO_Queued_Data *qd,
@@ -1613,14 +1307,15 @@ TAO_GIOP_Message_Base::check_for_valid_header (
const ACE_Message_Block &mb
) const
{
- const char* magic_bytes = "GIOP";
- ACE_ASSERT (ACE_OS::strlen (magic_bytes) < header_length ());
- if (mb.length () < header_length ())
+ // NOTE! We don't hardcode the length of the header b/c header_length should
+ // be eligible for inlining by pretty much any compiler, and it should return
+ // a constant. The rest of this method is hard-coded and hand-optimized because
+ // this method gets called A LOT.
+ if (mb.length () < this->header_length ())
return 0;
// Is finding that it's the right length and the magic bytes present
// enough to declare it a valid header? I think so...
- return (ACE_OS::memcmp (mb.rd_ptr (),
- magic_bytes,
- ACE_OS::strlen (magic_bytes)) == 0);
+ register const char* h = mb.rd_ptr ();
+ return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P');
}
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index 41845f24e22..139e0d10334 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -93,33 +93,6 @@ public:
/// the message.
virtual int format_message (TAO_OutputCDR &cdr);
-#if 0
- /// Parse the incoming messages..
- virtual int parse_incoming_messages (ACE_Message_Block &message_block);
-
- /// Calculate the amount of data that is missing in the <incoming>
- /// message block.
- virtual ssize_t missing_data (ACE_Message_Block &message_block);
-
- /* Extract the details of the next message from the <incoming>
- * through <qd>. Returns 1 if there are more messages and returns a
- * 0 if there are no more messages in <incoming>.
- */
- virtual int extract_next_message (ACE_Message_Block &incoming,
- TAO_Queued_Data *&qd);
-
- /// Check whether the node <qd> needs consolidation from <incoming>
- virtual int consolidate_node (TAO_Queued_Data *qd,
- ACE_Message_Block &incoming);
-
- /// Get the details of the message parsed through the <qd>.
- virtual void get_message_data (TAO_Queued_Data *qd);
-
- /// @@Bala:Docu??
- virtual int consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd);
-#endif
-
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i
index a589447a413..25a644bc313 100644
--- a/TAO/tao/GIOP_Message_Base.i
+++ b/TAO/tao/GIOP_Message_Base.i
@@ -4,3 +4,10 @@
//
// GIOP_Message_Base
//
+
+ACE_INLINE size_t
+TAO_GIOP_Message_Base::header_length (void) const
+{
+ return TAO_GIOP_MESSAGE_HEADER_LEN;
+}
+
diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp
index 5d923762331..5509ec8a81b 100644
--- a/TAO/tao/GIOP_Message_Lite.cpp
+++ b/TAO/tao/GIOP_Message_Lite.cpp
@@ -1598,40 +1598,6 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label,
}
}
-#if 0
-TAO_Queued_Data *
-TAO_GIOP_Message_Lite::make_queued_data (size_t sz)
-{
- // Get a node for the queue..
- TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data ();
-
- // Make a datablock for the size requested + something. The
- // "something" is required because we are going to align the data
- // block in the message block. During alignment we could loose some
- // bytes. As we may not know how many bytes will be lost, we will
- // allocate ACE_CDR::MAX_ALIGNMENT extra.
- ACE_Data_Block *db =
- this->orb_core_->create_input_cdr_data_block (sz +
- ACE_CDR::MAX_ALIGNMENT);
-
- ACE_Allocator *alloc =
- this->orb_core_->input_cdr_msgblock_allocator ();
-
- ACE_Message_Block mb (db,
- 0,
- alloc);
-
- ACE_Message_Block *new_mb = mb.duplicate ();
-
- ACE_CDR::mb_align (new_mb);
-
- qd->msg_block_ = new_mb;
-
- return qd;
-}
-#endif
-
int
TAO_GIOP_Message_Lite::generate_locate_reply_header (
TAO_OutputCDR & /*cdr*/,
diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h
index 7affd27e6b8..c7abea1ba8d 100644
--- a/TAO/tao/GIOP_Message_Lite.h
+++ b/TAO/tao/GIOP_Message_Lite.h
@@ -85,42 +85,6 @@ public:
/// the message.
virtual int format_message (TAO_OutputCDR &cdr);
-#if 0
- /// Parse the incoming messages..
- virtual int parse_incoming_messages (ACE_Message_Block &message_block);
-
- /// Get the message type. The return value would be one of the
- /// following:
- /// TAO_PLUGGABLE_MESSAGE_REQUEST,
- /// TAO_PLUGGABLE_MESSAGE_REPLY,
- /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION,
- /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
- TAO_Pluggable_Message_Type message_type (void);
-
-
- /// Calculate the amount of data that is missing in the <incoming>
- /// message block.
- virtual ssize_t missing_data (ACE_Message_Block &message_block);
-
- /* Extract the details of the next message from the <incoming>
- * through <qd>. Returns 1 if there are more messages and returns a
- * 0 if there are no more messages in <incoming>.
- */
- virtual int extract_next_message (ACE_Message_Block &incoming,
- TAO_Queued_Data *&qd);
-
- /// Check whether the node <qd> needs consolidation from <incoming>
- virtual int consolidate_node (TAO_Queued_Data *qd,
- ACE_Message_Block &incoming);
-
- /// Get the details of the message parsed through the <qd>.
- virtual void get_message_data (TAO_Queued_Data *qd);
-
- /// @@Bala: Docu???
- virtual int consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd);
-#endif
-
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp
index ef6cd7ce057..2eb62611184 100644
--- a/TAO/tao/GIOP_Message_State.cpp
+++ b/TAO/tao/GIOP_Message_State.cpp
@@ -17,6 +17,11 @@ public:
TAO_Debug_Msg_Emitter_Guard (unsigned int debug_level, const char* msg)
: which_level_(debug_level)
{
+ if (TAO_debug_level < this->which_level_)
+ {
+ msg_ = 0;
+ return;
+ }
this->msg_ = new char[ACE_OS::strlen (msg) + MAGIC_LENGTH ];
ACE_OS::strcpy (this->msg_, msg);
ACE_OS::strcat (this->msg_, " begin\n");
@@ -26,14 +31,17 @@ public:
~TAO_Debug_Msg_Emitter_Guard ()
{
- if (TAO_debug_level >= this->which_level_)
+ if (this->msg_)
{
- char* begin_start =
- this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1;
- ACE_OS::strcpy (begin_start, " end\n");
- ACE_DEBUG ((LM_DEBUG, this->msg_));
+ if (TAO_debug_level >= this->which_level_)
+ {
+ char* begin_start =
+ this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1;
+ ACE_OS::strcpy (begin_start, " end\n");
+ ACE_DEBUG ((LM_DEBUG, this->msg_));
+ }
+ delete[] this->msg_;
}
- delete[] this->msg_;
}
private:
@@ -68,8 +76,6 @@ TAO_GIOP_Message_State::take_values_from_message_block (
const ACE_Message_Block& mb
)
{
- TAO_Debug_Msg_Emitter_Guard (8, "(%P|%t) GIOP_Message_State::take_values_from_message_block");
-
const char* buf = mb.rd_ptr ();
// Get the version information
@@ -104,117 +110,9 @@ TAO_GIOP_Message_State::take_values_from_message_block (
return 0;
}
-#if 0
-int
-TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming)
-{
- if (incoming.length () >= TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- // Parse the GIOP header
- if (this->parse_message_header_i (incoming) == -1)
- return -1;
- }
-
- return 0;
-}
-
-int
-TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming)
-{
- if (TAO_debug_level > 8)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_State::parse_message_header_i\n"
- ));
- }
-
- // Grab the rd_ptr_ from the message block..
- char *buf = incoming.rd_ptr ();
-
- // Parse the magic bytes first
- if (this->parse_magic_bytes (buf) == -1)
- {
- return -1;
- }
-
- // Get the version information
- if (this->get_version_info (buf) == -1)
- return -1;
-
- // Get the byte order information...
- if (this->get_byte_order_info (buf) == -1)
- return -1;
-
- // Get the message type
- this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
-
-
- // Get the size of the message..
- this->get_payload_size (buf);
-
- if (this->message_size_ == 0)
- {
- if (this->message_type_ == TAO_GIOP_MESSAGERROR)
- {
- if (TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) -"
- "GIOP_MESSAGE_ERROR received \n"));
- }
- return 0;
- }
- else
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - "
- "Message of size zero recd. \n"));
- return -1;
- }
- }
-
- if (this->more_fragments_)
- {
- (void) this->parse_fragment_header (buf,
- incoming.length ());
- }
-
- return 0;
-}
-
-
-
-
-int
-TAO_GIOP_Message_State::parse_magic_bytes (char *buf)
-{
- // The values are hard-coded to support non-ASCII platforms.
- if (!(buf [0] == 0x47 // 'G'
- && buf [1] == 0x49 // 'I'
- && buf [2] == 0x4f // 'O'
- && buf [3] == 0x50)) // 'P'
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("TAO (%P|%t) - bad header, ")
- ACE_LIB_TEXT ("magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"),
- buf[0],
- buf[1],
- buf[2],
- buf[3]));
- return -1;
- }
-
- return 0;
-}
-#endif
-
int
TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf)
{
- TAO_Debug_Msg_Emitter_Guard (8, ACE_TEXT("TAO (%P|%t) GIOP_Message_State::set_version_info"));
-
// We have a GIOP message on hand. Get its revision numbers
CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
@@ -246,8 +144,6 @@ TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf)
int
TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf)
{
- TAO_Debug_Msg_Emitter_Guard (8, "TAO (%P|%t) GIOP_Message_State::set_byte_order_info_from_buffer");
-
// Let us be specific that this is for 1.0
if (this->giop_version_.minor == 0 &&
this->giop_version_.major == 1)
@@ -292,16 +188,6 @@ TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf)
return 0;
}
-void
-TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr)
-{
- TAO_Debug_Msg_Emitter_Guard (8, "(%P|%t) GIOP_Message_State::set_payload_size_from_buffer");
- // Move the read pointer
- rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
-
- this->message_size_ = this->read_ulong (rd_ptr);
-}
-
int
diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h
index 319e37a3933..d6263022d0c 100644
--- a/TAO/tao/GIOP_Message_State.h
+++ b/TAO/tao/GIOP_Message_State.h
@@ -47,11 +47,6 @@ public:
int take_values_from_message_block (const ACE_Message_Block& mb);
-#if 0
- /// Parse the message header.
- int parse_message_header (ACE_Message_Block &incoming);
-#endif
-
/// Return the message size
CORBA::ULong message_size (void) const;
@@ -83,15 +78,6 @@ private:
friend class TAO_GIOP_Message_Base;
-#if 0
- /// Parse the message header.
- int parse_message_header_i (ACE_Message_Block &incoming);
-
- /// Checks for the magic word 'GIOP' in the start of the incoing
- /// stream
- int parse_magic_bytes (char *buf);
-#endif
-
/// Extracts the version information from the incoming
/// stream. Performs a check for whether the version information is
/// right and sets the information in the <state>
@@ -116,11 +102,6 @@ private:
private:
-#if 0
- /// The GIOP base class..
- TAO_GIOP_Message_Base *base_;
-#endif
-
// GIOP version information..
TAO_GIOP_Message_Version giop_version_;
diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl
index f2f0011a7f6..80d421c7340 100644
--- a/TAO/tao/GIOP_Message_State.inl
+++ b/TAO/tao/GIOP_Message_State.inl
@@ -51,22 +51,11 @@ TAO_GIOP_Message_State::message_type () const
return this->message_type_;
}
-#if 0
-ACE_INLINE int
-TAO_GIOP_Message_State::message_fragmented (void)
+ACE_INLINE void
+TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr)
{
- if (this->more_fragments)
- return 1;
+ // Move the read pointer
+ rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
- return 0;
+ this->message_size_ = this->read_ulong (rd_ptr);
}
-
-
-
-ACE_INLINE CORBA::Boolean
-TAO_GIOP_Message_State::header_received (void) const
-{
- return this->message_size != 0;
-}
-
-#endif
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index 028491a8a09..19e29aaeff4 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -12,7 +12,7 @@ ACE_RCSID (tao, Incoming_Message_Queue, "$Id$")
TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core)
- : queued_data_ (0),
+ : last_added_ (0),
size_ (0),
orb_core_ (orb_core)
{
@@ -42,21 +42,21 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
{
// Check to see if the length of the incoming block is less than
// that of the <missing_data_> of the tail.
- if (block.length () <= this->queued_data_->missing_data_bytes_)
+ if (block.length () <= this->last_added_->missing_data_bytes_)
{
n = block.length ();
}
else
{
- n = this->queued_data_->missing_data_bytes_;
+ n = this->last_added_->missing_data_bytes_;
}
// Do the copy
- this->queued_data_->msg_block_->copy (block.rd_ptr (),
+ this->last_added_->msg_block_->copy (block.rd_ptr (),
n);
// Decerement the missing data
- this->queued_data_->missing_data_bytes_ -= n;
+ this->last_added_->missing_data_bytes_ -= n;
}
return n;
@@ -65,17 +65,20 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
TAO_Queued_Data *
TAO_Incoming_Message_Queue::dequeue_head (void)
{
+ if (this->size_ == 0)
+ return 0;
+
// Get the node on the head of the queue...
- TAO_Queued_Data *tmp =
- this->queued_data_->next_;
+ TAO_Queued_Data *head = this->last_added_->next_;
// Reset the head node..
- this->queued_data_->next_ = tmp->next_;
-
- // Decrease the size
- --this->size_;
+ this->last_added_->next_ = head->next_;
+
+ // Decrease the size and reset last_added_ if empty
+ if (--this->size_ == 0)
+ this->last_added_ = 0;
- return tmp;
+ return head;
}
TAO_Queued_Data *
@@ -86,23 +89,24 @@ TAO_Incoming_Message_Queue::dequeue_tail (void)
return 0;
// Get the node on the head of the queue...
- TAO_Queued_Data *tmp =
- this->queued_data_->next_;
+ TAO_Queued_Data *head =
+ this->last_added_->next_;
- while (tmp->next_ != this->queued_data_)
+ while (head->next_ != this->last_added_)
{
- tmp = tmp->next_;
+ head = head->next_;
}
// Put the head in tmp.
- tmp->next_ = this->queued_data_->next_;
+ head->next_ = this->last_added_->next_;
- TAO_Queued_Data *ret_qd = this->queued_data_;
+ TAO_Queued_Data *ret_qd = this->last_added_;
- this->queued_data_ = tmp;
+ this->last_added_ = head;
// Decrease the size
- --this->size_;
+ if (--this->size_ == 0)
+ this->last_added_ = 0;
return ret_qd;
}
@@ -113,14 +117,14 @@ TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
{
if (this->size_ == 0)
{
- this->queued_data_ = nd;
- this->queued_data_->next_ = this->queued_data_;
+ this->last_added_ = nd;
+ this->last_added_->next_ = this->last_added_;
}
else
{
- nd->next_ = this->queued_data_->next_;
- this->queued_data_->next_ = nd;
- this->queued_data_ = nd;
+ nd->next_ = this->last_added_->next_;
+ this->last_added_->next_ = nd;
+ this->last_added_ = nd;
}
++ this->size_;
@@ -226,10 +230,10 @@ clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size)
ACE_CDR::mb_align (nb);
- // Do whatever with the flags
+ // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since
+ // we just dynamically allocated the two things.
nb->set_flags (mb->flags());
- nb->set_self_flags (mb->self_flags());
- // nb->clr_flags (mask);
+ nb->clr_flags (ACE_Message_Block::DONT_DELETE);
return nb;
}
@@ -270,8 +274,9 @@ TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb,
TAO_Pluggable_Messaging &msging_obj,
ACE_Allocator *alloc)
{
- TAO_Queued_Data *new_qd = 0;
- const size_t HDR_LEN = msging_obj.header_length ();
+ register TAO_Queued_Data *new_qd = 0;
+ register const size_t HDR_LEN = msging_obj.header_length (); /* COMPUTE ONCE! */
+ register const size_t MB_LEN = mb->length (); /* COMPUTE ONCE! */
// Validate arguments.
if (mb == 0)
@@ -282,7 +287,7 @@ TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb,
goto failure;
// do we have enough bytes to make a complete header?
- if (mb->length() >= msging_obj.header_length ())
+ if (MB_LEN >= HDR_LEN)
{
// Since we have enough bytes to make a complete header,
// the header needs to be valid. Check that now, and punt
@@ -304,7 +309,7 @@ TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb,
// Of course, we don't have the whole message (if we did, we
// wouldn't be here!), so we copy only what we've got, i.e., whatever's
// in the message block.
- if (copy_mb_span (new_qd->msg_block_, mb, mb->length ()) == 0)
+ if (copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0)
goto failure;
// missing_data_bytes_ now has the full GIOP message size, but
@@ -313,8 +318,8 @@ TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb,
// payload bytes" in mb.
//
// "actual payload bytes" :== length of mb (which included the header) - header length
- new_qd->missing_data_bytes_ -= (mb->length () - HDR_LEN);
- mb->rd_ptr (mb->length ());
+ new_qd->missing_data_bytes_ -= (MB_LEN - HDR_LEN);
+ mb->rd_ptr (MB_LEN);
}
}
else
@@ -322,10 +327,10 @@ TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb,
new_qd->current_state_ = WAITING_TO_COMPLETE_HEADER;
new_qd->msg_block_ = clone_mb_nocopy_size (mb, HDR_LEN);
if (new_qd->msg_block_ == 0 ||
- copy_mb_span (new_qd->msg_block_, mb, mb->length ()) == 0)
+ copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0)
goto failure;
- new_qd->missing_data_bytes_ = msging_obj.header_length () - mb->length ();
- mb->rd_ptr (mb->length ());
+ new_qd->missing_data_bytes_ = HDR_LEN - MB_LEN;
+ mb->rd_ptr (MB_LEN);
}
ACE_ASSERT (new_qd->current_state_ != INVALID);
@@ -359,130 +364,6 @@ failure:
return 0;
}
-#if 0
-/*!
- \brief Act like ACE_Message_Block::clone, but only clone the part btw. rd_ptr and wr_ptr.
- */
-// Follow #def's swiped from Message_Block.cpp
-#if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
-#define ACE_EXECUTION_TIME this->execution_time_
-#define ACE_DEADLINE_TIME this->deadline_time_
-#else
-#define ACE_EXECUTION_TIME ACE_Time_Value::zero
-#define ACE_DEADLINE_TIME ACE_Time_Value::max_time
-#endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
-
-static ACE_Data_Block*
-clone_span_nocopy (/*const*/ ACE_Data_Block *the_db,
- const char* beg, size_t span,
- ACE_Message_Block::Message_Flags mask = 0)
-{
- // Allocate a new data block through the same allocator as 'the_db'
- const ACE_Message_Block::Message_Flags always_clear =
- ACE_Message_Block::DONT_DELETE;
-
- ACE_Data_Block *nb;
- ACE_Allocator *db_allocator = the_db->data_block_allocator ();
-
- ACE_NEW_MALLOC_RETURN (nb,
- ACE_static_cast(ACE_Data_Block*,
- db_allocator->malloc (sizeof (ACE_Data_Block))),
- ACE_Data_Block (span, // size
- the_db->msg_type (), // type
- 0, // data
- the_db->allocator_strategy (), // allocator
- the_db->locking_strategy (), // locking strategy
- the_db->flags (), // flags
- db_allocator),
- 0);
-
-
- // Set new flags minus the mask...
- nb->clr_flags (mask | always_clear);
-
- // Copy in the data, and set the pointer
- // ACE_OS::memcpy (nb->base (), beg, span);
-
- return nb;
-}
-
-static ACE_Message_Block*
-clone_span (/*const*/ ACE_Message_Block *the_mb, size_t span_size, ACE_Message_Block::Message_Flags mask = 0)
-{
- // Get a pointer to a "cloned" <ACE_Data_Block> (will copy the
- // values rather than increment the reference count).
- size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
- ACE_Data_Block *db = clone_span_nocopy (the_mb->data_block (), the_mb->rd_ptr (), aligned_size, mask);
- if (db == 0)
- return 0;
-
- ACE_Message_Block *nb;
-
- if(the_mb->message_block_allocator_ == 0)
- {
- ACE_NEW_RETURN (nb,
- ACE_Message_Block (0, // size
- ACE_Message_Block::ACE_Message_Type (0), // type
- 0, // cont
- 0, // data
- 0, // allocator
- 0, // locking strategy
- 0, // flags
- the_mb->priority_, // priority
- ACE_EXECUTION_TIME, // execution time
- ACE_DEADLINE_TIME, // absolute time to deadline
- // Get a pointer to a
- // "duplicated" <ACE_Data_Block>
- // (will simply increment the
- // reference count).
- db,
- db->data_block_allocator (),
- the_mb->message_block_allocator_),
- 0);
- }
- else
- {
- // This is the ACE_NEW_MALLOC macro with the return check removed.
- // We need to do it this way because if it fails we need to release
- // the cloned data block that was created above. If we used
- // ACE_NEW_MALLOC_RETURN, there would be a memory leak because the
- // above db pointer would be left dangling.
- nb = ACE_static_cast(ACE_Message_Block*,the_mb->message_block_allocator_->malloc (sizeof (ACE_Message_Block)));
- if(nb != 0)
- new (nb) ACE_Message_Block (0, // size
- ACE_Message_Block::ACE_Message_Type (0), // type
- 0, // cont
- 0, // data
- 0, // allocator
- 0, // locking strategy
- 0, // flags
- the_mb->priority_, // priority
- ACE_EXECUTION_TIME, // execution time
- ACE_DEADLINE_TIME, // absolute time to deadline
- db,
- db->data_block_allocator (),
- the_mb->message_block_allocator_);
- }
-
- if (nb == 0)
- {
- db->release ();
- return 0;
- }
-
- ACE_CDR::mb_align (nb);
- nb->copy (the_mb->rd_ptr(), span_size);
-
- // Clone all the continuation messages if necessary.
- if (the_mb->cont () != 0
- && (nb->cont_ = the_mb->cont ()->clone (mask)) == 0)
- {
- nb->release ();
- return 0;
- }
- return nb;
-}
-#endif
/*static*/
TAO_Queued_Data *
@@ -490,18 +371,15 @@ TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb,
TAO_Pluggable_Messaging &msging_obj,
ACE_Allocator *alloc)
{
- // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE!
- // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN
- // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS
- // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT
- // BEHAVES DIFFERENTLY FROM make_uncompleted_message.
+ register const size_t HDR_LEN = msging_obj.header_length ();
+ register const size_t MB_LEN = mb.length ();
// Validate arguments.
- if (mb.length() < msging_obj.header_length ())
+ if (MB_LEN < HDR_LEN)
return 0;
size_t total_msg_len = 0;
- TAO_Queued_Data *new_qd = make_queued_data (alloc);
+ register TAO_Queued_Data *new_qd = make_queued_data (alloc);
if (new_qd == 0)
goto failure;
@@ -517,8 +395,8 @@ TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb,
// *at least* the length of the message. Verify that we have that
// many bytes in the message block and, if we don't, release the new
// qd and fail.
- total_msg_len = new_qd->missing_data_bytes_ + msging_obj.header_length ();
- if (total_msg_len > mb.length ())
+ total_msg_len = new_qd->missing_data_bytes_ + HDR_LEN;
+ if (total_msg_len > MB_LEN)
goto failure;
// Make a copy of the relevant portion of mb and hang on to it
@@ -551,10 +429,10 @@ failure:
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ")
ACE_TEXT ("failed to find complete message in mblk=%-08x; leaving %u bytes in block\n"),
- &mb, mb.length ()));
+ &mb, MB_LEN));
if (TAO_debug_level >= 10)
ACE_HEX_DUMP ((LM_DEBUG,
- mb.rd_ptr (), mb.length (),
+ mb.rd_ptr (), MB_LEN,
ACE_TEXT (" residual bytes in buffer")));
}
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index d4a830e10ba..6733c382264 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -120,14 +120,15 @@ private:
friend class TAO_Transport;
-#if 0
- /// Make a node for the queue.
- TAO_Queued_Data *get_node (void);
-#endif
-
private:
- /// A linked listof messages that await processing
- TAO_Queued_Data *queued_data_;
+ /*!
+ \brief A circular linked list of messages awaiting processing.
+
+ \a last_message_added_ points to the most recent message added to
+ the list. The earliest addition can be easily accessed via
+ \a last_message_added_->next_.
+ */
+ TAO_Queued_Data *last_added_;
/// The size of the queue
CORBA::ULong size_;
diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl
index d04512d81b0..f82267b5cea 100644
--- a/TAO/tao/Incoming_Message_Queue.inl
+++ b/TAO/tao/Incoming_Message_Queue.inl
@@ -18,7 +18,7 @@ TAO_Incoming_Message_Queue::is_tail_complete (void)
return -1;
if (this->size_ &&
- this->queued_data_->missing_data_bytes_ == 0)
+ this->last_added_->missing_data_bytes_ == 0)
return 1;
return 0;
@@ -31,8 +31,8 @@ TAO_Incoming_Message_Queue::is_head_complete (void)
return -1;
if (this->size_ &&
- this->queued_data_->next_->missing_data_bytes_ == 0 &&
- this->queued_data_->next_->more_fragments_ == 0)
+ this->last_added_->next_->missing_data_bytes_ == 0 &&
+ this->last_added_->next_->more_fragments_ == 0)
return 1;
return 0;
@@ -45,7 +45,7 @@ TAO_Incoming_Message_Queue::is_tail_fragmented (void)
return 0;
if (this->size_ &&
- this->queued_data_->more_fragments_ == 1)
+ this->last_added_->more_fragments_ == 1)
return 1;
return 0;
@@ -55,20 +55,12 @@ ACE_INLINE size_t
TAO_Incoming_Message_Queue::missing_data_tail (void) const
{
if (this->size_ != 0)
- return this->queued_data_->missing_data_bytes_;
+ return this->last_added_->missing_data_bytes_;
return 0;
}
-#if 0
-ACE_INLINE TAO_Queued_Data *
-TAO_Incoming_Message_Queue::get_node (void)
-{
- return TAO_Queued_Data::get_queued_data ();
-}
-#endif
-
/************************************************************************/
// Methods for TAO_Queued_Data
/************************************************************************/
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index 2d460580d57..0d4acf4e485 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -113,33 +113,6 @@ public:
virtual void init (CORBA::Octet major,
CORBA::Octet minor) = 0;
-#if 0
- /// Parse the incoming messages..
- virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0;
-
- /// Calculate the amount of data that is missing in the <incoming>
- /// message block.
- virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0;
-
- /// Get the details of the message parsed through the <qd>.
- virtual void get_message_data (TAO_Queued_Data *qd) = 0;
-
- /* Extract the details of the next message from the <incoming>
- * through <qd>. Returns 1 if there are more messages and returns a
- * 0 if there are no more messages in <incoming>.
- */
- virtual int extract_next_message (ACE_Message_Block &incoming,
- TAO_Queued_Data *&qd) = 0;
-
- /// Check whether the node <qd> needs consolidation from <incoming>
- virtual int consolidate_node (TAO_Queued_Data *qd,
- ACE_Message_Block &incoming) = 0;
-
- /// @@Bala:Docu??
- virtual int consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd) = 0;
-#endif
-
/// Parse the request message, make an upcall and send the reply back
/// to the "request initiator"
virtual int process_request_message (TAO_Transport *transport,
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index af00665aaca..3a76879c67a 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -1273,7 +1273,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
}
-
/*
*
* All the methods relevant to the incoming data path of the ORB are
@@ -1294,7 +1293,6 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
// First try to process messages off the head of the incoming queue.
int retval = this->process_queue_head (rh);
-
if (retval <= 0)
{
if (retval == -1)
@@ -1315,8 +1313,7 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
// The buffer on the stack which will be used to hold the input
// messages
- // char buf [TAO_MAXBUFSIZE];
- char buf[2 * TAO_MAXBUFSIZE];
+ char buf[TAO_MAXBUFSIZE];
#if defined (ACE_HAS_PURIFY)
(void) ACE_OS::memset (buf,
@@ -1342,10 +1339,11 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
// and that's it.
unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS;
+ unsigned int did_queue_message = 0;
+
// Align the message block
ACE_CDR::mb_align (&message_block);
-read_from_the_connection:
size_t recv_size = 0;
if (this->orb_core_->orb_params ()->single_read_optimization ())
{
@@ -1366,9 +1364,7 @@ read_from_the_connection:
if (n <= 0)
{
if (n == -1)
- {
- this->send_connection_closed_notifications ();
- }
+ this->send_connection_closed_notifications ();
return n;
}
@@ -1390,19 +1386,8 @@ read_from_the_connection:
message_block.length (),
ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket")));
- // **** BEGIN CJC PMB CHANGES ****
- //
- // - Does this properly handle GIOP FRAGMENT messages??
- // @@ BALA: I doubt it.. You could get an incomplete message
- // fragment or a complete message fragment but the message could
- // incomplete.
-
- // @@ BALA: How do you handle messages that didnt fit the message
- // block here?
- //
- // @@ CJC: I'm not sure what you mean...If we need more space, then
- // we allocate more.
+complete_message_and_possibly_enqueue:
// Check to see if we're still working to complete a message
if (this->uncompleted_message_)
{
@@ -1475,8 +1460,9 @@ read_from_the_connection:
// ==> Resize the message block to have capacity for
// the rest of the incoming message
ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_;
- mb.size (mb.size ()
- + this->uncompleted_message_->missing_data_bytes_);
+ ACE_CDR::grow (&mb,
+ mb.size ()
+ + this->uncompleted_message_->missing_data_bytes_);
if (TAO_debug_level > 2)
{
@@ -1506,6 +1492,15 @@ read_from_the_connection:
break;
case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD:
+ // Here we have an opportunity to try to finish reading the
+ // uncompleted message. This is a Good Idea(TM) because there are
+ // good odds that either more data came available since the last
+ // time we read, or that we simply didn't read the whole message on
+ // the first read. So, we try to read again.
+ //
+ // NOTE! this changes this->uncompleted_message_!
+ this->try_to_complete (max_wait_time);
+
// ==> if (bytes missing from uncompleted_message_ == 0)
if (this->uncompleted_message_->missing_data_bytes_ == 0)
{
@@ -1519,8 +1514,10 @@ read_from_the_connection:
// ---> NOTE: whoever pulls this off the queue must delete it!
this->uncompleted_message_->current_state_
= TAO_Queued_Data::COMPLETED;
+
// @@CJC NEED TO CHECK RETURN VALUE HERE!
this->enqueue_incoming_message (this->uncompleted_message_);
+ did_queue_message = 1;
// zero out uncompleted_message_;
this->uncompleted_message_ = 0;
@@ -1535,6 +1532,7 @@ read_from_the_connection:
}
else
{
+
if (TAO_debug_level > 2)
{
ACE_DEBUG ((LM_DEBUG,
@@ -1561,69 +1559,14 @@ read_from_the_connection:
while (message_block.length() != 0 && this->uncompleted_message_);
}
-#if !defined(LOOP_READ_OPTIMIZATION_LATE)
- // If, at the end, we're still waiting to complete the message,
- // we should effectively return.
- //
- // @@BALA: Returning here will reduce the throughput. We shold try
- // reading again to see if we could get more data..
+ // *****************************
+ // @@ CJC
//
- // @@CJC: Good point; maybe we can go to the top again, but only try
- // to read at most 2-3 times before we just go on. Obviously, if we
- // complete the message (i.e., there's no uncompleted message), then
- // we go on to the next step.
- if (this->uncompleted_message_)
- {
- if (number_of_read_attempts--)
- {
- // We try to read again just in case more data arrived while
- // we were doing the stuff above. This way, we can increase
- // throughput without much of a penalty.
-
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "still have an uncompleted message; "
- "will try %d more times before letting "
- "somebody else have a chance.\n",
- this->id (),
- number_of_read_attempts));
- }
+ // Once upon a time we tried to complete reading the uncompleted
+ // message here, but testing found that completing later worked
+ // better.
+ // *****************************
- // We're about to go back and try to read, but we need to insure
- // that there's space available in the message block to read!
- // We'll reset the message block...
- ACE_ASSERT (message_block.space() == 0);
- message_block.reset ();
-
- // Yes, this uses the much-maligned "goto"; get over it. TCP
- // implementations use them, too, for just the same sort of
- // situation.
- goto read_from_the_connection;
- }
- else
- {
- // The queue should be empty because it should have been processed
- // above. But I wonder if I should put a check in here anyway.
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "giving up reading for now and returning "
- "with incoming queue length = %d\n",
- this->id (),
- this->incoming_message_queue_.queue_length ()));
- if (this->uncompleted_message_)
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "missing bytes from uncompleted message = %u\n",
- this->uncompleted_message_->missing_data_bytes_));
- }
- return 1;
- }
- }
-#endif
// At this point, there should be nothing in uncompleted_message_.
// We now need to chop up the bytes in message_block and store any
@@ -1645,11 +1588,15 @@ read_from_the_connection:
message_block.length (),
ACE_TEXT (" from this message buffer")));
}
+
complete_message =
TAO_Queued_Data::make_completed_message (
message_block, *this->messaging_object ());
if (complete_message)
- this->enqueue_incoming_message (complete_message);
+ {
+ this->enqueue_incoming_message (complete_message);
+ did_queue_message = 1;
+ }
}
while (complete_message != 0);
// On exit from this frame we have one of the following states:
@@ -1674,17 +1621,15 @@ read_from_the_connection:
// We should have consumed ALL the bytes by now.
ACE_ASSERT (message_block.length () == 0);
-#if defined(LOOP_READ_OPTIMIZATION_LATE)
- // If, at the end, we're still waiting to complete the message,
- // we should effectively return.
//
- // @@BALA: Returning here will reduce the throughput. We shold try
- // reading again to see if we could get more data..
+ // We don't want to try to re-read earlier because we may not have
+ // an uncompleted message until we get to this point. So, if we did
+ // it earlier, we could have missed the opportunity to complete it
+ // and dispatch.
//
- // @@CJC: Good point; maybe we can go to the top again, but only try
- // to read at most 2-3 times before we just go on. Obviously, if we
- // complete the message (i.e., there's no uncompleted message), then
- // we go on to the next step.
+ // Thanks to Bala <bala@cse.wustl.edu> for the idea to read again
+ // to increase throughput!
+
if (this->uncompleted_message_)
{
if (number_of_read_attempts--)
@@ -1704,16 +1649,7 @@ read_from_the_connection:
number_of_read_attempts));
}
- // We're about to go back and try to read, but we need to insure
- // that there's space available in the message block to read!
- // We'll reset the message block...
- ACE_ASSERT (message_block.space() == 0);
- message_block.reset ();
-
- // Yes, this uses the much-maligned "goto"; get over it. TCP
- // implementations use them, too, for just the same sort of
- // situation.
- goto read_from_the_connection;
+ goto complete_message_and_possibly_enqueue;
}
else
{
@@ -1736,13 +1672,52 @@ read_from_the_connection:
return 1;
}
}
-#endif
+
// **** END CJC PMG CHANGES ****
- // Process the message
- return this->process_queue_head (rh);
+ return did_queue_message ? this->process_queue_head (rh) : 1;
}
+
+void
+TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time)
+{
+ if (this->uncompleted_message_ == 0)
+ return;
+
+ ssize_t n = 0;
+ size_t &missing_data = this->uncompleted_message_->missing_data_bytes_;
+ ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_;
+
+ // Try to complete this until we error or block right here...
+ for (ssize_t bytes = missing_data;
+ bytes != 0;
+ bytes -= n)
+ {
+ // .. do a read on the socket again.
+ n = this->recv (mb.wr_ptr (),
+ bytes,
+ max_wait_time);
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::handle_input_i, "
+ "read %d bytes on attempt\n",
+ this->id(), n));
+ }
+
+ if (n == 0 || n == -1)
+ {
+ break;
+ }
+
+ mb.wr_ptr (n);
+ missing_data -= n;
+ }
+}
+
+
int
TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message)
{
@@ -1827,490 +1802,6 @@ TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message)
#endif
}
-#if 0
-int
-TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time)
-{
- // Parse the incoming message for validity. The check needs to be
- // performed by the messaging objects.
- if (this->parse_incoming_messages (block) == -1)
- return -1;
-
- // Check whether we have a complete message for processing
- ssize_t missing_data = this->missing_data (block);
-
-
- if (missing_data < 0)
- {
- // If we have more than one message
- return this->consolidate_extra_messages (block,
- rh);
- }
- else if (missing_data > 0)
- {
- // If we have missing data then try doing a read or try queueing
- // them.
- return this->consolidate_message (block,
- missing_data,
- rh,
- max_wait_time);
- }
-
- return 1;
-}
-
-int
-TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
-{
- // If we have a queue and if the last message is not complete a
- // complete one, then this read will get us the remaining data. So
- // do not try to parse the header if we have an incomplete message
- // in the queue.
- if (this->incoming_message_queue_.is_tail_complete () != 0)
- {
- // As it looks like a new message has been read, process the
- // message. Call the messaging object to do the parsing..
- int retval =
- this->messaging_object ()->parse_incoming_messages (block);
-
- if (retval == -1)
- {
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, "
- "error in incoming message\n",
- this->id ()));
-
- this->send_connection_closed_notifications ();
- return -1;
- }
- }
-
- return 0;
-}
-
-
-size_t
-TAO_Transport::missing_data (ACE_Message_Block &incoming)
-{
- // If we have a incomplete message in the queue then find out how
- // much of data is required to get a complete message.
- if (this->incoming_message_queue_.is_tail_complete () == 0)
- {
- return this->incoming_message_queue_.missing_data_tail ();
- }
-
- return this->messaging_object ()->missing_data (incoming);
-}
-
-
-int
-TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time)
-{
- // Check whether the last message in the queue is complete..
- if (this->incoming_message_queue_.is_tail_complete () == 0)
- return this->consolidate_message_queue (incoming,
- missing_data,
- rh,
- max_wait_time);
-
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message\n",
- this->id ()));
- }
-
- // Calculate the actual length of the load that we are supposed to
- // read which is equal to the <missing_data> + length of the buffer
- // that we have..
- size_t payload = missing_data + incoming.size ();
-
- // Grow the buffer to the size of the message
- ACE_CDR::grow (&incoming,
- payload);
-
- ssize_t n = 0;
-
- // As this used for transports where things are available in one
- // shot this looping should not create any problems.
- for (ssize_t bytes = missing_data;
- bytes != 0;
- bytes -= n)
- {
- // .. do a read on the socket again.
- n = this->recv (incoming.wr_ptr (),
- bytes,
- max_wait_time);
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message, "
- "read %d bytes on attempt\n",
- this->id(), n));
- }
-
- if (n == 0 || n == -1)
- {
- break;
- }
-
- incoming.wr_ptr (n);
- missing_data -= n;
- }
-
- // If we got an error..
- if (n == -1)
- {
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Trasport[%d]::consolidate_message, "
- "error while trying to consolidate\n",
- this->id ()));
- }
- this->send_connection_closed_notifications ();
- return -1;
- }
-
- // If we had gotten a EWOULDBLOCK n would be equal to zero. But we
- // have to put the message in the queue anyway. So let us proceed
- // to do that and return...
-
- // Check to see if we have messages in queue or if we have missing
- // data . AT this point we cannot have have semi-complete messages
- // in the queue as they would have been taken care before. Put
- // ourselves in the queue and then try processing one of the
- // messages..
- if ((missing_data > 0
- ||this->incoming_message_queue_.queue_length ())
- && this->incoming_message_queue_.is_tail_fragmented () == 0)
- {
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message, "
- "queueing up the message\n",
- this->id ()));
- }
-
- // Get a queued data
- TAO_Queued_Data *qd =
- this->make_queued_data (incoming);
-
- // Add the missing data to the queue
- qd->missing_data_ = missing_data;
-
- // Get the rest of the messaging data
- this->messaging_object ()->get_message_data (qd);
-
- // Add it to the tail of the queue..
- this->incoming_message_queue_.enqueue_tail (qd);
-
- if (this->incoming_message_queue_.is_head_complete ())
- return this->process_queue_head (rh);
-
- return 0;
- }
-
- // We dont have any missing data. Just make a queued_data node with
- // the existing message block and send it to the higher layers of
- // the ORB.
- TAO_Queued_Data pqd (&incoming,
- this->orb_core_->transport_message_buffer_allocator ());
- pqd.missing_data_ = missing_data;
- this->messaging_object ()->get_message_data (&pqd);
-
- // Check whether the message was fragmented and try to consolidate
- // the fragments..
- if (pqd.more_fragments_ ||
- (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
- {
- // Duplicate the queued data as it is on stack..
- TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd);
-
- return this->consolidate_fragments (nqd, rh);
- }
-
- // Now we have a full message in our buffer. Just go ahead and
- // process that
- return this->process_parsed_messages (&pqd,
- rh);
-}
-
-int
-TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd,
- TAO_Resume_Handle &rh)
-{
- // If we have received a fragment message then we have to
- // consolidate <qd> with the last message in queue
- // @@todo: this piece of logic follows GIOP a bit... Need to revisit
- // if we have protocols other than GIOP
-
- // @@todo: Fragments now have too much copying overhead. Need to get
- // them out if we want to have some reasonable performance metrics
- // in future.. Post 1.2 seems a nice time..
- if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- {
- TAO_Queued_Data *tqd =
- this->incoming_message_queue_.dequeue_tail ();
-
- tqd->more_fragments_ = qd->more_fragments_;
- tqd->missing_data_ = qd->missing_data_;
-
- if (this->messaging_object ()->consolidate_fragments (tqd,
- qd) == -1)
- return -1;
-
-
- TAO_Queued_Data::release (qd);
-
- this->incoming_message_queue_.enqueue_tail (tqd);
-
- this->process_queue_head (rh);
- }
- else
- {
- // if we dont have a fragment already in the queue just add it in
- // the queue
- this->incoming_message_queue_.enqueue_tail (qd);
- }
-
- return 0;
-}
-
-int
-TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time)
-{
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n",
- this->id ()));
- }
-
- // If the queue did not have a complete message put this piece of
- // message in the queue. We kow it did not have a complete
- // message. That is why we are here.
- size_t n =
- this->incoming_message_queue_.copy_tail (incoming);
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) TAO_Transport[%d]::consolidate_message_queue",
- "copied [%d] bytes to the tail \n",
- this->id (),
- n));
- }
-
- // Update the missing data...
- missing_data =
- this->incoming_message_queue_.missing_data_tail ();
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) TAO_Transport[%d]::consolidate_message_queue",
- "missing [%d] bytes in the tail messahe \n",
- this->id (),
- missing_data));
- }
-
- // Move the read pointer of the <incoming> message block to the end
- // of the copied message and process the remaining portion...
- incoming.rd_ptr (n);
-
- // If we have some more information left in the message block..
- if (incoming.length ())
- {
- // We may have to parse & consolidate. This part of the message
- // doesn't seem to be part of the last message in the queue (as
- // the copy () hasn't taken away this message).
- int retval = this->parse_consolidate_messages (incoming,
- rh,
- max_wait_time);
-
- // If there is an error return
- if (retval == -1)
- {
- if (TAO_debug_level)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "error while consolidating, part of the read message\n",
- this->id ()));
- }
- return retval;
- }
- else if (retval == 1)
- {
- // If the message in the <incoming> message block has only
- // one message left we need to process that seperately.
-
- // Get a queued data
- TAO_Queued_Data *qd = this->make_queued_data (incoming);
-
- // Get the rest of the message data
- this->messaging_object ()->get_message_data (qd);
-
- // Add the missing data to the queue
- qd->missing_data_ = 0;
-
- // Check whether the message was fragmented and try to consolidate
- // the fragments..
- if (qd->more_fragments_ ||
- (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
- {
- return this->consolidate_fragments (qd, rh);
- }
-
- // Add it to the tail of the queue..
- this->incoming_message_queue_.enqueue_tail (qd);
-
- // We should surely have a message in queue now. So just
- // process that.
- return this->process_queue_head (rh);
- }
-
- // parse_consolidate_messages () would have processed one of the
- // messages, so we better return as we dont want to starve other
- // threads.
- return 0;
- }
-
- // If we still have some missing data..
- if (missing_data > 0)
- {
- // Get the last message from the Queue
- TAO_Queued_Data *qd =
- this->incoming_message_queue_.dequeue_tail ();
-
- if (TAO_debug_level > 5)
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) TAO_Transport[%d]::consolidate_message_queue",
- " trying recv, again \n",
- this->id ()));
-
- // Try to do a read again. If we have some luck it would be
- // great..
- ssize_t n = this->recv (qd->msg_block_->wr_ptr (),
- missing_data,
- max_wait_time);
-
- if (TAO_debug_level > 5)
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) TAO_Transport[%d]::consolidate_message_queue",
- " recv retval [%d] \n",
- this->id (),
- n));
- // Error...
- if (n < 0)
- return n;
-
- // If we get a EWOULDBLOCK ie. n==0, we should anyway put the
- // message in queue before returning..
- // Move the write pointer
- qd->msg_block_->wr_ptr (n);
-
- // Decrement the missing data
- qd->missing_data_ -= n;
-
- // Now put the TAO_Queued_Data back in the queue
- this->incoming_message_queue_.enqueue_tail (qd);
-
- // Any way as we have come this far and are about to return,
- // just try to process a message if it is there in the queue.
- if (this->incoming_message_queue_.is_head_complete ())
- return this->process_queue_head (rh);
-
- return 0;
- }
-
- // Process a message in the head of the queue if we have one..
- return this->process_queue_head (rh);
-}
-
-
-int
-TAO_Transport::consolidate_extra_messages (ACE_Message_Block
- &incoming,
- TAO_Resume_Handle &rh)
-{
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n",
- this->id ()));
- }
-
- // Pick the tail of the queue
- TAO_Queued_Data *tail =
- this->incoming_message_queue_.dequeue_tail ();
-
- if (tail)
- {
- // If we have a node in the tail, checek to see whether it needs
- // consolidation. If so, just consolidate it.
- if (this->messaging_object ()->consolidate_node (tail,
- incoming) == -1)
- return -1;
-
- // .. put the tail back in queue..
- this->incoming_message_queue_.enqueue_tail (tail);
- }
-
- int retval = 1;
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, "
- "extracting extra messages\n",
- this->id ()));
- }
-
- // Extract messages..
- while (retval == 1)
- {
- TAO_Queued_Data *q_data = 0;
-
- retval =
- this->messaging_object ()->extract_next_message (incoming,
- q_data);
- if (q_data)
- {
- // If we have read a framented message then...
- if (q_data->more_fragments_ ||
- q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- {
- this->consolidate_fragments (q_data, rh);
- }
- else
- {
- this->incoming_message_queue_.enqueue_tail (q_data);
- }
- }
- }
-
- // In case of error return..
- if (retval == -1)
- {
- return retval;
- }
-
- return this->process_queue_head (rh);
-}
-#endif
int
TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
@@ -2422,59 +1913,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
return 0;
}
-#if 0
-TAO_Queued_Data *
-TAO_Transport::make_queued_data (ACE_Message_Block &incoming)
-{
- // Get an instance of TAO_Queued_Data
- TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data (
- this->orb_core_->transport_message_buffer_allocator ());
-
- // Get the flag for the details of the data block...
- ACE_Message_Block::Message_Flags flg =
- incoming.self_flags ();
-
- if (ACE_BIT_DISABLED (flg,
- ACE_Message_Block::DONT_DELETE))
- {
- // Duplicate the data block before putting it in the queue.
- qd->msg_block_ = ACE_Message_Block::duplicate (&incoming);
- }
- else
- {
- // As we are in CORBA mode, all the data blocks would be aligned
- // on an 8 byte boundary. Hence create a data block for more
- // than the actual length
- ACE_Data_Block *db =
- this->orb_core_->create_input_cdr_data_block (incoming.length ()+
- ACE_CDR::MAX_ALIGNMENT);
-
- // Get the allocator..
- ACE_Allocator *alloc =
- this->orb_core_->input_cdr_msgblock_allocator ();
-
- // Make message block..
- ACE_Message_Block mb (db,
- 0,
- alloc);
-
- // Duplicate the block..
- qd->msg_block_ = mb.duplicate ();
-
- // Align the message block
- ACE_CDR::mb_align (qd->msg_block_);
-
- // Copy the data..
- qd->msg_block_->copy (incoming.rd_ptr (),
- incoming.length ());
- }
-
-
- return qd;
-}
-#endif
-
int
TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
{
@@ -2484,65 +1922,62 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
"TAO (%P|%t) - Transport[%d]::process_queue_head\n",
this->id ()));
}
+
+ if (this->incoming_message_queue_.queue_length () == 0)
+ return 1;
- // See if the message in the head of the queue is complete...
- if (this->incoming_message_queue_.is_head_complete () > 0)
- {
- // Get the message on the head of the queue..
- TAO_Queued_Data *qd =
- this->incoming_message_queue_.dequeue_head ();
+ // Get the message on the head of the queue..
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_head ();
- if (TAO_debug_level > 3)
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::process_queue_head, "
+ "the size of the queue is [%d]\n",
+ this->id (),
+ this->incoming_message_queue_.queue_length()));
+ }
+ // Now that we have pulled out out one message out of the queue,
+ // check whether we have one more message in the queue...
+ if (this->incoming_message_queue_.queue_length () > 0)
+ {
+ if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport[%d]::process_queue_head, "
- "the size of the queue is [%d]\n",
- this->id (),
- this->incoming_message_queue_.queue_length()));
- }
- // Now that we have pulled out out one message out of the queue,
- // check whether we have one more message in the queue...
- if (this->incoming_message_queue_.is_head_complete () > 0)
- {
- if (TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_queue_head, "
- "notify reactor\n",
- this->id ()));
-
- }
- int retval =
- this->notify_reactor ();
+ "notify reactor\n",
+ this->id ()));
- if (retval == 1)
- {
- // Let the class know that it doesn't need to resume the
- // handle..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
- }
- else if (retval < 0)
- return -1;
}
- else
+ int retval =
+ this->notify_reactor ();
+
+ if (retval == 1)
{
- // As we are ready to process the last message just resume
- // the handle. Set the flag incase someone had reset the flag..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
+ // Let the class know that it doesn't need to resume the
+ // handle..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
}
-
- // Process the message...
- if (this->process_parsed_messages (qd,
- rh) == -1)
+ else if (retval < 0)
return -1;
+ }
+ else
+ {
+ // As we are ready to process the last message just resume
+ // the handle. Set the flag incase someone had reset the flag..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
+ }
- // Delete the Queued_Data..
- TAO_Queued_Data::release (qd);
+ // Process the message...
+ if (this->process_parsed_messages (qd,
+ rh) == -1)
+ return -1;
- return 0;
- }
+ // Delete the Queued_Data..
+ TAO_Queued_Data::release (qd);
- return 1;
+ return 0;
}
int
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index dc542568f9b..51bec616bc2 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -555,6 +555,7 @@ public:
virtual int handle_input_i (TAO_Resume_Handle &rh,
ACE_Time_Value *max_wait_time = 0,
int block = 0);
+ void try_to_complete (ACE_Time_Value *max_wait_time);
enum
@@ -649,56 +650,6 @@ protected:
*/
virtual int register_handler_i (void) = 0;
-#if 0
- /// Called by the handle_input_i (). This method is used to parse
- /// message read by the handle_input_i () call. It also decides
- /// whether the message needs consolidation before processing.
- int parse_consolidate_messages (ACE_Message_Block &bl,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *time = 0);
-
-
- /// Method does parsing of the message if we have a fresh message in
- /// the <message_block> or just returns if we have read part of the
- /// previously stored message.
- int parse_incoming_messages (ACE_Message_Block &message_block);
-
- /// Return if we have any missing data in the queue of messages
- /// or determine if we have more information left out in the
- /// presently read message to make it complete.
- size_t missing_data (ACE_Message_Block &message_block);
-
- /// Consolidate the currently read message or consolidate the last
- /// message in the queue. The consolidation of the last message in
- /// the queue is done by calling consolidate_message_queue ().
- virtual int consolidate_message (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time);
-
- /// @@Bala: Docu???
- int consolidate_fragments (TAO_Queued_Data *qd,
- TAO_Resume_Handle &rh);
-
-
-
- /// First consolidate the message queue. If the message is still not
- /// complete, try to read from the handle again to make it
- /// complete. If these dont help put the message back in the queue
- /// and try to check the queue if we have message to process. (the
- /// thread needs to do some work anyway :-))
- int consolidate_message_queue (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time);
-
- /// Called by parse_consolidate_message () if we have more messages
- /// in one read. Queue up the messages and try to process one of
- /// them, atleast at the head of them.
- int consolidate_extra_messages (ACE_Message_Block &incoming,
- TAO_Resume_Handle &rh);
-#endif
-
/// Process the message by sending it to the higher layers of the
/// ORB.
int process_parsed_messages (TAO_Queued_Data *qd,