diff options
author | Chris Cleeland <chris.cleeland@gmail.com> | 2003-03-18 18:46:51 +0000 |
---|---|---|
committer | Chris Cleeland <chris.cleeland@gmail.com> | 2003-03-18 18:46:51 +0000 |
commit | ec2d0ee6d02253a14e88337b6575a3d679ae5f82 (patch) | |
tree | 034e1b26b680a589ec9f838b0c8a6402f50fefe3 | |
parent | 8dd085b29748a224a8ca5b425cfcd80c497c63e5 (diff) | |
download | ATCD-ec2d0ee6d02253a14e88337b6575a3d679ae5f82.tar.gz |
Code cleanup (removal of #if0 stuff), performance improvements, etc.
-rw-r--r-- | TAO/PMBChangeLog | 351 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 319 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.h | 27 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.i | 7 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Lite.cpp | 34 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Lite.h | 36 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.cpp | 142 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.h | 19 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.inl | 21 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.cpp | 222 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.h | 15 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.inl | 18 | ||||
-rw-r--r-- | TAO/tao/Pluggable_Messaging.h | 27 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 813 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 51 |
15 files changed, 435 insertions, 1667 deletions
diff --git a/TAO/PMBChangeLog b/TAO/PMBChangeLog index 06d916b5f15..5cf756fe720 100644 --- a/TAO/PMBChangeLog +++ b/TAO/PMBChangeLog @@ -1,266 +1,343 @@ +Mon Mar 17 11:09:00 2003 Chris Cleeland <cleeland_c@ociweb.com> + + * tao/GIOP_Message_Base.cpp (check_for_valid_header): Optimized + implementation. + + * tao/GIOP_Message_Base.* (header_length): Moved to inline for + optimization. + + * tao/GIOP_Message_State.cpp (TAO_Debug_Msg_Emitter_Guard): + Improved implementation so it didn't make copies of the message + even when it was never going to output anything due to debug + level being lower than the value that would cause debug output. + + * tao/GIOP_Message_State.cpp (take_values_from_message_block): + * tao/GIOP_Message_State.cpp (set_version_info_from_buffer): + * tao/GIOP_Message_State.cpp (set_byte_order_info_from_buffer): + * tao/GIOP_Message_State.cpp (set_payload_size_from_buffer): + + Removed use of TAO_Debug_Msg_Emitter_Guard. + + * tao/GIOP_Message_State.inl (set_payload_size_from_buffer): + + Moved to inline file. + + * tao/Incoming_Message_Queue.h (TAO_Incoming_Message_Queue): + Changed name of member 'queued_data_' to 'last_added_' to be + more clear about what the pointer actually points to. + + Added documentation that describes the structure of the linked + list so that hopefully future maintainers don't have to go + through the same learning curve I did. + + * tao/Incoming_Message_Queue.*: Updates to reflect last_added_ + member name change. + + * tao/Incoming_Message_Queue.cpp (dequeue_head): Add check to not + allow dequeuing when size is zero, plus we now zero-out the + last_added_ pointer after we've dequeued the last item in the + list. + + * tao/Incoming_Message_Queue.cpp (clone_mb_nocopy_size): Fix + setting and copying of flags on the message and data blocks so + that the DONT_DELETE flag specifically doesn't get copied. This + cured a HUGE memory leak that was causing substantial + performance problems. + + * tao/Incoming_Message_Queue.cpp (make_uncompleted_message): + * tao/Incoming_Message_Queue.cpp (make_completed_message): + + Cache values used over and over again within these methods such + as header and message block lengths + + * tao/Transport.cpp (handle_input_i): + + Added flag noting that we enqueued a message. + + Changed how re-reading is performed; see entry for + try_to_complete for information. + + Used ACE_CDR::grow to grow the uncompleted_message message block + once the payload size is known. This insures that the growth + happens with proper alignment constraints. + + * tao/Transport.* (try_to_complete): Added this method which + tries to complete whatever partial message is held in + uncompleted_message_. The difference between doing this and + simply revisiting the top of handle_input_i is that this will + only try to read enough to complete the message rather than also + read another partial. + + * tao/Transport.cpp (process_queue_head): Added check to insure + that we don't try to do any processing when there's nothing in + the queue. Also, removed old code leftover from when the + incoming queue could have partial GIOP messages in it. Finally, + insured that the return value matched previous requirements. + + Thu Mar 6 15:27:14 2003 Chris Cleeland <cleeland_c@ociweb.com> * tao/Incoming_Message_Queue.cpp: Removed Shattering Encapsulation - hack around ace/Message_Block.h inclusion to make this link - properly on VC6. + hack around ace/Message_Block.h inclusion to make this link + properly on VC6. * tao/GIOP_Message_State.cpp: Moved initialization of - TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH outside the class - declaration. + TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH outside the class + declaration. Wed Mar 5 13:08:37 2003 Chris Cleeland <cleeland_c@ociweb.com> * tao/GIOP_Message_Generator_Parser_Impl.inl (check_revision): - Modified to only explicitly allow versions of GIOP that TAO - supports, and no others. Note that this change means that - whenever a new version of GIOP gets added to TAO, this function - must be updated, too. Hopefully that won't happen so often that - this becomes horribly burdensome. Thanks to Chad Elliott and Paul - Calabrese for ideas in getting this right. + Modified to only explicitly allow versions of GIOP that TAO + supports, and no others. Note that this change means that + whenever a new version of GIOP gets added to TAO, this function + must be updated, too. Hopefully that won't happen so often that + this becomes horribly burdensome. Thanks to Chad Elliott and + Paul Calabrese for ideas in getting this right. * tao/Incoming_Message_Queue.* (make_uncompleted_message): - This now behaves similar to make_completed_message, i.e., it - allocates the message block held in TAO_Queued_Data and advnaced - the rd_ptr past the bytes it consumes. + This now behaves similar to make_completed_message, i.e., it + allocates the message block held in TAO_Queued_Data and advnaced + the rd_ptr past the bytes it consumes. * tao/Incoming_Message_Queue.cpp: Eliminated hacks on - ACE_Message_Block and replaced with functions - clone_mb_nocopy_size() and copy_mb_span() that utilize ONLY the - public API. make_completed_message() and - make_uncompleted_message() now use these. + ACE_Message_Block and replaced with functions + clone_mb_nocopy_size() and copy_mb_span() that utilize ONLY the + public API. make_completed_message() and + make_uncompleted_message() now use these. * tao/Transport.cpp (handle_input_i): Enlarged the buffer size and - replaced the magic number for the maximum number of re-read - attempts with the manifest constant - TAO_MAX_TRANSPORT_REREAD_ATTEMPTS, now defined in orbconf.h. + replaced the magic number for the maximum number of re-read + attempts with the manifest constant + TAO_MAX_TRANSPORT_REREAD_ATTEMPTS, now defined in orbconf.h. - Changed uses of make_uncompleted_message() to be consistent with - changes outlined above. + Changed uses of make_uncompleted_message() to be consistent with + changes outlined above. - Corrected a bug where, upon re-reading after completely emptying - the stack-allocated message block, I forgot to reset the - rd_ptr/wr_ptr in the message block. + Corrected a bug where, upon re-reading after completely emptying + the stack-allocated message block, I forgot to reset the + rd_ptr/wr_ptr in the message block. * tao/orbconf.h: Added new manifest constant - TAO_MAX_TRANSPORT_REREAD_ATTEMPTS. Documentation is in the file. + TAO_MAX_TRANSPORT_REREAD_ATTEMPTS. Documentation is in the file. Wed Feb 26 21:44:34 2003 Chris Cleeland <cleeland_c@ociweb.com> * TAO/tao/GIOP_Message_Base.cpp: - Eliminate parse_incoming_messages(), missing_data(), - extract_next_message(), consolidate_node(), - consolidate_fragments(), get_message_data(), make_queued_data(). + Eliminate parse_incoming_messages(), missing_data(), + extract_next_message(), consolidate_node(), + consolidate_fragments(), get_message_data(), make_queued_data(). - Add new methods set_queued_data_from_message_header() and - check_for_valid_header(). + Add new methods set_queued_data_from_message_header() and + check_for_valid_header(). * TAO/tao/GIOP_Message_Base.h: - Eliminate #if0 parse_incoming_messages(), missing_data(), - extract_next_message(), consolidate_node(), - consolidate_fragments(), get_message_data(), and - make_queued_data() declarations. + Eliminate #if0 parse_incoming_messages(), missing_data(), + extract_next_message(), consolidate_node(), + consolidate_fragments(), get_message_data(), and + make_queued_data() declarations. - Add declarations for set_queued_data_from_message_header() and - check_for_valid_header(). + Add declarations for set_queued_data_from_message_header() and + check_for_valid_header(). - Make message_type() static. + Make message_type() static. * TAO/tao/GIOP_Message_Generator_Parser_Impl.inl: - Reimplement check_revision() to be exact about which GIOP - revisions are okay. This is different from before where it - loosely assumed that everything was valid unless it want higher - than a particular range. + Reimplement check_revision() to be exact about which GIOP + revisions are okay. This is different from before where it + loosely assumed that everything was valid unless it want higher + than a particular range. * TAO/tao/GIOP_Message_List.cpp: - Remove parse_incoming_messages() thru consolidate_fragments(); - remove make_queued_data(). + Remove parse_incoming_messages() thru consolidate_fragments(); + remove make_queued_data(). - Add set_queued_data_from_message_header() and - check_for_valid_header(). + Add set_queued_data_from_message_header() and + check_for_valid_header(). * TAO/tao/GIOP_Message_Lite.h: - Remove declarations for parse_incoming_messages() thru - consolidate_fragments(); remove make_queued_data(). + Remove declarations for parse_incoming_messages() thru + consolidate_fragments(); remove make_queued_data(). - Add declarations for set_queued_data_from_message_header() and - check_for_valid_header(). + Add declarations for set_queued_data_from_message_header() and + check_for_valid_header(). * TAO/tao/GIOP_Message_State.cpp: - Add TAO_Debug_Message_Emitter_Guard class within the compilation - unit. + Add TAO_Debug_Message_Emitter_Guard class within the compilation + unit. - Change CTOR not to use the 2nd argument. + Change CTOR not to use the 2nd argument. - Change CTOR not to initialize this->base_with the 2nd arg. + Change CTOR not to initialize this->base_with the 2nd arg. - Add take_values_from_message_block(). + Add take_values_from_message_block(). - Remove parse_magic_bytes(), parse_message_header_i(), - parse_message_header(). + Remove parse_magic_bytes(), parse_message_header_i(), + parse_message_header(). - Rename get_version_info() to set_version_info_from_buffer(); - change argument type to 'const char*'. + Rename get_version_info() to set_version_info_from_buffer(); + change argument type to 'const char*'. - Rename get_byte_order_info() to set_byte_order_info_from_buffer(); - change argument type to 'const char*'. + Rename get_byte_order_info() to set_byte_order_info_from_buffer(); + change argument type to 'const char*'. - Rename get_payload_size() to set_payload_size_from_buffer(); - change argument type to 'const char*'. + Rename get_payload_size() to set_payload_size_from_buffer(); + change argument type to 'const char*'. - Change argument type for read_ulong() to 'const char*'. + Change argument type for read_ulong() to 'const char*'. * TAO/tao/GIOP_Message_State.h: - Add default value for CTOR arguments so we don't have to pass - them. + Add default value for CTOR arguments so we don't have to pass + them. - Add declaration for take_values_from_message_block(). + Add declaration for take_values_from_message_block(). - #if0 parse_message_header() declaration. + #if0 parse_message_header() declaration. - Add documentation for byte_order(). + Add documentation for byte_order(). - Add new GIOP information accessors: giop_version(), - more_fragments(), and message_type(). + Add new GIOP information accessors: giop_version(), + more_fragments(), and message_type(). - #if0 declaration for parse_message_header_i() and - parse_magic_bytes(). + #if0 declaration for parse_message_header_i() and + parse_magic_bytes(). - Rename delcarations for get_version_info(), get_byte_order_info(), - and get_payload_size(). + Rename delcarations for get_version_info(), get_byte_order_info(), + and get_payload_size(). - #if0 base_ member variable. + #if0 base_ member variable. * TAO/tao/GIOP_Message_State.inl: - Add definitions for giop_version(), more_fragments(), and - message_type(). + Add definitions for giop_version(), more_fragments(), and + message_type(). * TAO/tao/Incoming_Message_Queue.cpp: - Add #include for ace/Message_Block.h, but shatter encapsulation - and effectively extend its interface so that we can clone spans of - message blocks. + Add #include for ace/Message_Block.h, but shatter encapsulation + and effectively extend its interface so that we can clone spans of + message blocks. - Modify copy_tail() to use new TAO_Queued_Data member names. + Modify copy_tail() to use new TAO_Queued_Data member names. * TAO/tao/Incoming_Message_Queue.cpp (class TAO_Queued_Data): - Update CTORs to initialize new this->current_state_ data member. + Update CTORs to initialize new this->current_state_ data member. - Add new make_uncompleted_message(), ACE_Message_Block hack - "extensions", make_completed_message(). + Add new make_uncompleted_message(), ACE_Message_Block hack + "extensions", make_completed_message(). - Rename get_queued_data() to make_queued_data(). + Rename get_queued_data() to make_queued_data(). * TAO/tao/Incoming_Message_Queue.h: - Changed #include <Pluggable_Messaging_Utils.h> to #include - <Pluggable_Message.h> + Changed #include <Pluggable_Messaging_Utils.h> to #include + <Pluggable_Message.h> - Documentation for is_tail_complete() and is_head_complete() and - is_tail_fragmented(). + Documentation for is_tail_complete() and is_head_complete() and + is_tail_fragmented(). - #if0 get_node(). + #if0 get_node(). * TAO/tao/Incoming_Message_Queue.h (class TAO_Queued_Data): - Make CTORs protected so creation can only go through new factory - methods, i.e., make_completed_message() and - make_uncompleted_message(). + Make CTORs protected so creation can only go through new factory + methods, i.e., make_completed_message() and + make_uncompleted_message(). - Add new factory methods make_completed_message() and - make_uncompleted_message(). + Add new factory methods make_completed_message() and + make_uncompleted_message(). - Add release() method [in addition to static of same name]. + Add release() method [in addition to static of same name]. - Add explicit definition of various states and a member to hold the - current state (this->current_state_). + Add explicit definition of various states and a member to hold the + current state (this->current_state_). - Rename this->missing_data_ flag/value with - this->missing_data_bytes_ count. + Rename this->missing_data_ flag/value with + this->missing_data_bytes_ count. - Documentation for this->byte_order_. + Documentation for this->byte_order_. * TAO/tao/Incoming_Message_Queue.inl: - Replace uses of this->missing_data_ with - this->missing_data_bytes_. + Replace uses of this->missing_data_ with + this->missing_data_bytes_. - #if0 get_node(). + #if0 get_node(). - Add release() method. + Add release() method. * TAO/tao/Pluggable_Message.h: - #if0 parse_incoming_messages() through consolidate_fragments(). + #if0 parse_incoming_messages() through consolidate_fragments(). - Add declarations for check_for_valid_header() and - set_queued_data_from_message_header(). + Add declarations for check_for_valid_header() and + set_queued_data_from_message_header(). * TAO/tao/Transport.cpp: - Add #include <ace/Min_Max.h> + Add #include <ace/Min_Max.h> - Add uncompleted_message_ to initialization list. + Add uncompleted_message_ to initialization list. - Modify what happens when reading in handle_input_i(): + Modify what happens when reading in handle_input_i(): - - number_of_read_attempts is how many times we'll try to read - again to complete a message before we give up. + - number_of_read_attempts is how many times we'll try to read + again to complete a message before we give up. - - read_from_the_connection label added + - read_from_the_connection label added - - correct 1st argument to this->recv() to be wr_ptr() rather than - rd_ptr(). + - correct 1st argument to this->recv() to be wr_ptr() rather than + rd_ptr(). - - lots of changes that are substantially documented in the code. + - lots of changes that are substantially documented in the code. - #if0 parse_consolidate_messages(), parse_incoming_messages(), - missing_data(), consolidate_message(), consolidate_fragments(), - consolidate_message_queue(), consolidate_extra_messages(), and - make_queued_data(). + #if0 parse_consolidate_messages(), parse_incoming_messages(), + missing_data(), consolidate_message(), consolidate_fragments(), + consolidate_message_queue(), consolidate_extra_messages(), and + make_queued_data(). * TAO/tao/Transport.h: - #if0 declarations for parse_consolidate_messages(), parse_incoming_messages(), - missing_data(), consolidate_message(), consolidate_fragments(), - consolidate_message_queue(), consolidate_extra_messages(), and - make_queued_data(). + #if0 declarations for parse_consolidate_messages(), parse_incoming_messages(), + missing_data(), consolidate_message(), consolidate_fragments(), + consolidate_message_queue(), consolidate_extra_messages(), and + make_queued_data(). - Add this->uncompleted_message_ data member. + Add this->uncompleted_message_ data member. * TAO/tao/Strategies/DIOP_Transport.cpp: - Change 1st arg of this->recv() in handle_input_i() to wr_ptr() - from rd_ptr(). + Change 1st arg of this->recv() in handle_input_i() to wr_ptr() + from rd_ptr(). * TAO/tao/Strategies/SHMIOP_Transport.cpp: - #if0 consolidate_message(). + #if0 consolidate_message(). * TAO/tao/Strategies/SHMIOP_Transport.h: - #if0 declaration for consolidate_message(). + #if0 declaration for consolidate_message(). * TAO/tests/AMI/run_test.pl: - Increase default number of iterations from 1 to 10000. Earlier - versions of the PMB changes succeeded just fine with low numbers - of iterations, but began to fail miserably when the number of - iterations climbed. + Increase default number of iterations from 1 to 10000. Earlier + versions of the PMB changes succeeded just fine with low numbers + of iterations, but began to fail miserably when the number of + iterations climbed. * TAO/tests/BiDirectional/run_test.pl: - Added level 10 debug. + Added level 10 debug. * TAO/tests/InterOp-Naming/INS_test_client.cpp: - Changes that don't appear to be related in any way to PMB... + Changes that don't appear to be related in any way to PMB... diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 1e3d72512d8..10f852d3656 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -309,266 +309,6 @@ TAO_GIOP_Message_Base::message_type ( return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; } -#if 0 -int -TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) -{ - - if (this->message_state_.parse_message_header (incoming) == -1) - { - return -1; - } - - return 0; -} - -ssize_t -TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) -{ - // Actual message size including the header.. - CORBA::ULong msg_size = - this->message_state_.message_size (); - - size_t len = incoming.length (); - - // If we have too many messages or if we have less than even a size - // of the GIOP header then .. - if (len > msg_size || - len < TAO_GIOP_MESSAGE_HEADER_LEN) - { - return -1; - } - else if (len == msg_size) - return 0; - - return msg_size - len; -} - - -int -TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd) -{ - TAO_GIOP_Message_State state (this->orb_core_, - this); - - if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) - { - if (incoming.length () > 0) - { - // Make a node which has a message block of the size of - // MESSAGE_HEADER_LEN. - qd = - this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); - - qd->msg_block_->copy (incoming.rd_ptr (), - incoming.length ()); - qd->missing_data_ = -1; - } - return 0; - } - - if (state.parse_message_header (incoming) == -1) - { - return -1; - } - - size_t copying_len = state.message_size (); - - qd = this->make_queued_data (copying_len); - - if (copying_len > incoming.length ()) - { - qd->missing_data_ = - copying_len - incoming.length (); - - copying_len = incoming.length (); - } - - qd->msg_block_->copy (incoming.rd_ptr (), - copying_len); - - incoming.rd_ptr (copying_len); - qd->byte_order_ = state.byte_order_; - qd->major_version_ = state.giop_version_.major; - qd->minor_version_ = state.giop_version_.minor; - qd->msg_type_ = this->message_type (state); - return 1; -} - -int -TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming) -{ - // Look to see whether we had atleast parsed the GIOP header ... - if (qd->missing_data_ == -1) - { - // The data length that has been stuck in there during the last - // read .... - size_t len = - qd->msg_block_->length (); - - // We know that we would have space for - // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data - // from the <incoming> into the message block in <qd> - qd->msg_block_->copy (incoming.rd_ptr (), - TAO_GIOP_MESSAGE_HEADER_LEN - len); - - // Move the rd_ptr () in the incoming message block.. - incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len); - - TAO_GIOP_Message_State state (this->orb_core_, - this); - - // Parse the message header now... - if (state.parse_message_header (*qd->msg_block_) == -1) - return -1; - - // Now grow the message block so that we can copy the rest of - // the data... - if (qd->msg_block_->space () < state.message_size ()) - { - ACE_CDR::grow (qd->msg_block_, - state.message_size ()); - } - - // Copy the pay load.. - // Calculate the bytes that needs to be copied in the queue... - size_t copy_len = - state.payload_size (); - - // If the data that needs to be copied is more than that is - // available to us .. - if (copy_len > incoming.length ()) - { - // Calculate the missing data.. - qd->missing_data_ = - copy_len - incoming.length (); - - // Set the actual possible copy_len that is available... - copy_len = incoming.length (); - } - else - { - qd->missing_data_ = 0; - } - - // ..now we are set to copy the right amount of data to the - // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); - - // Set the <rd_ptr> of the <incoming>.. - incoming.rd_ptr (copy_len); - - // Get the other details... - qd->byte_order_ = state.byte_order_; - qd->major_version_ = state.giop_version_.major; - qd->minor_version_ = state.giop_version_.minor; - qd->msg_type_ = this->message_type (state); - } - else - { - // @@todo: Need to abstract this out to a seperate method... - size_t copy_len = qd->missing_data_; - - if (copy_len > incoming.length ()) - { - // Calculate the missing data.. - qd->missing_data_ = - copy_len - incoming.length (); - - // Set the actual possible copy_len that is available... - copy_len = incoming.length (); - } - - // Copy the right amount of data in to the node... - // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); - - // Set the <rd_ptr> of the <incoming>.. - qd->msg_block_->rd_ptr (copy_len); - - } - - - return 0; -} - - -int -TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd) -{ - if (dqd->byte_order_ != sqd->byte_order_ - || dqd->major_version_ != sqd->major_version_ - || dqd->minor_version_ != sqd->minor_version_) - { - // Yes, print it out in all debug levels!. This is an error by - // CORBA 2.4 spec - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) incompatible fragments:") - ACE_TEXT ("different GIOP versions or byte order\n"))); - return -1; - } - - // Skip the header in the incoming message - sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - - // If we have a fragment header skip the header length too.. - if (sqd->minor_version_ == 2 && - sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER); - - // Get the length of the incoming message block.. - size_t incoming_length = - sqd->msg_block_->length (); - - // Increase the size of the destination message block if we need - // to. - ACE_Message_Block *mb = - dqd->msg_block_; - - // Check space before growing. - if (mb->space () < incoming_length) - { - ACE_CDR::grow (mb, - mb->length () + incoming_length); - } - - // Copy the data - dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (), - incoming_length); - return 0; -} - -void -TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) -{ - // Get the message information - qd->byte_order_ = - this->message_state_.byte_order_; - qd->major_version_ = - this->message_state_.giop_version_.major; - qd->minor_version_ = - this->message_state_.giop_version_.minor; - - //qd->more_fragments_ = this->message_state_.more_fragments_; - - if (this->message_state_.more_fragments_) - qd->more_fragments_ = 1; - else - qd->more_fragments_ = 0; - - qd->msg_type_= - this->message_type (this->message_state_); - - // Reset the message_state - this->message_state_.reset (); -} -#endif - int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) @@ -1534,52 +1274,6 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) } -#if 0 -TAO_Queued_Data * -TAO_GIOP_Message_Base::make_queued_data (size_t sz) -{ - // Get a node for the queue.. - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data ( - this->orb_core_->transport_message_buffer_allocator ()); - - // @@todo: We have a similar method in Transport.cpp. Need to see how - // we can factor them out.. - // Make a datablock for the size requested + something. The - // "something" is required because we are going to align the data - // block in the message block. During alignment we could loose some - // bytes. As we may not know how many bytes will be lost, we will - // allocate ACE_CDR::MAX_ALIGNMENT extra. - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (sz + - ACE_CDR::MAX_ALIGNMENT); - - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - ACE_Message_Block mb (db, - 0, - alloc); - - ACE_Message_Block *new_mb = mb.duplicate (); - - ACE_CDR::mb_align (new_mb); - - qd->msg_block_ = new_mb; - - - return qd; -} -#endif - - - -size_t -TAO_GIOP_Message_Base::header_length (void) const -{ - return TAO_GIOP_MESSAGE_HEADER_LEN; -} - void TAO_GIOP_Message_Base::set_queued_data_from_message_header ( TAO_Queued_Data *qd, @@ -1613,14 +1307,15 @@ TAO_GIOP_Message_Base::check_for_valid_header ( const ACE_Message_Block &mb ) const { - const char* magic_bytes = "GIOP"; - ACE_ASSERT (ACE_OS::strlen (magic_bytes) < header_length ()); - if (mb.length () < header_length ()) + // NOTE! We don't hardcode the length of the header b/c header_length should + // be eligible for inlining by pretty much any compiler, and it should return + // a constant. The rest of this method is hard-coded and hand-optimized because + // this method gets called A LOT. + if (mb.length () < this->header_length ()) return 0; // Is finding that it's the right length and the magic bytes present // enough to declare it a valid header? I think so... - return (ACE_OS::memcmp (mb.rd_ptr (), - magic_bytes, - ACE_OS::strlen (magic_bytes)) == 0); + register const char* h = mb.rd_ptr (); + return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P'); } diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index 41845f24e22..139e0d10334 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -93,33 +93,6 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); -#if 0 - /// Parse the incoming messages.. - virtual int parse_incoming_messages (ACE_Message_Block &message_block); - - /// Calculate the amount of data that is missing in the <incoming> - /// message block. - virtual ssize_t missing_data (ACE_Message_Block &message_block); - - /* Extract the details of the next message from the <incoming> - * through <qd>. Returns 1 if there are more messages and returns a - * 0 if there are no more messages in <incoming>. - */ - virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd); - - /// Check whether the node <qd> needs consolidation from <incoming> - virtual int consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming); - - /// Get the details of the message parsed through the <qd>. - virtual void get_message_data (TAO_Queued_Data *qd); - - /// @@Bala:Docu?? - virtual int consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd); -#endif - /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i index a589447a413..25a644bc313 100644 --- a/TAO/tao/GIOP_Message_Base.i +++ b/TAO/tao/GIOP_Message_Base.i @@ -4,3 +4,10 @@ // // GIOP_Message_Base // + +ACE_INLINE size_t +TAO_GIOP_Message_Base::header_length (void) const +{ + return TAO_GIOP_MESSAGE_HEADER_LEN; +} + diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp index 5d923762331..5509ec8a81b 100644 --- a/TAO/tao/GIOP_Message_Lite.cpp +++ b/TAO/tao/GIOP_Message_Lite.cpp @@ -1598,40 +1598,6 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label, } } -#if 0 -TAO_Queued_Data * -TAO_GIOP_Message_Lite::make_queued_data (size_t sz) -{ - // Get a node for the queue.. - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data (); - - // Make a datablock for the size requested + something. The - // "something" is required because we are going to align the data - // block in the message block. During alignment we could loose some - // bytes. As we may not know how many bytes will be lost, we will - // allocate ACE_CDR::MAX_ALIGNMENT extra. - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (sz + - ACE_CDR::MAX_ALIGNMENT); - - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - ACE_Message_Block mb (db, - 0, - alloc); - - ACE_Message_Block *new_mb = mb.duplicate (); - - ACE_CDR::mb_align (new_mb); - - qd->msg_block_ = new_mb; - - return qd; -} -#endif - int TAO_GIOP_Message_Lite::generate_locate_reply_header ( TAO_OutputCDR & /*cdr*/, diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h index 7affd27e6b8..c7abea1ba8d 100644 --- a/TAO/tao/GIOP_Message_Lite.h +++ b/TAO/tao/GIOP_Message_Lite.h @@ -85,42 +85,6 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); -#if 0 - /// Parse the incoming messages.. - virtual int parse_incoming_messages (ACE_Message_Block &message_block); - - /// Get the message type. The return value would be one of the - /// following: - /// TAO_PLUGGABLE_MESSAGE_REQUEST, - /// TAO_PLUGGABLE_MESSAGE_REPLY, - /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, - /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. - TAO_Pluggable_Message_Type message_type (void); - - - /// Calculate the amount of data that is missing in the <incoming> - /// message block. - virtual ssize_t missing_data (ACE_Message_Block &message_block); - - /* Extract the details of the next message from the <incoming> - * through <qd>. Returns 1 if there are more messages and returns a - * 0 if there are no more messages in <incoming>. - */ - virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd); - - /// Check whether the node <qd> needs consolidation from <incoming> - virtual int consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming); - - /// Get the details of the message parsed through the <qd>. - virtual void get_message_data (TAO_Queued_Data *qd); - - /// @@Bala: Docu??? - virtual int consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd); -#endif - /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp index ef6cd7ce057..2eb62611184 100644 --- a/TAO/tao/GIOP_Message_State.cpp +++ b/TAO/tao/GIOP_Message_State.cpp @@ -17,6 +17,11 @@ public: TAO_Debug_Msg_Emitter_Guard (unsigned int debug_level, const char* msg) : which_level_(debug_level) { + if (TAO_debug_level < this->which_level_) + { + msg_ = 0; + return; + } this->msg_ = new char[ACE_OS::strlen (msg) + MAGIC_LENGTH ]; ACE_OS::strcpy (this->msg_, msg); ACE_OS::strcat (this->msg_, " begin\n"); @@ -26,14 +31,17 @@ public: ~TAO_Debug_Msg_Emitter_Guard () { - if (TAO_debug_level >= this->which_level_) + if (this->msg_) { - char* begin_start = - this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1; - ACE_OS::strcpy (begin_start, " end\n"); - ACE_DEBUG ((LM_DEBUG, this->msg_)); + if (TAO_debug_level >= this->which_level_) + { + char* begin_start = + this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1; + ACE_OS::strcpy (begin_start, " end\n"); + ACE_DEBUG ((LM_DEBUG, this->msg_)); + } + delete[] this->msg_; } - delete[] this->msg_; } private: @@ -68,8 +76,6 @@ TAO_GIOP_Message_State::take_values_from_message_block ( const ACE_Message_Block& mb ) { - TAO_Debug_Msg_Emitter_Guard (8, "(%P|%t) GIOP_Message_State::take_values_from_message_block"); - const char* buf = mb.rd_ptr (); // Get the version information @@ -104,117 +110,9 @@ TAO_GIOP_Message_State::take_values_from_message_block ( return 0; } -#if 0 -int -TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming) -{ - if (incoming.length () >= TAO_GIOP_MESSAGE_HEADER_LEN) - { - // Parse the GIOP header - if (this->parse_message_header_i (incoming) == -1) - return -1; - } - - return 0; -} - -int -TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming) -{ - if (TAO_debug_level > 8) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::parse_message_header_i\n" - )); - } - - // Grab the rd_ptr_ from the message block.. - char *buf = incoming.rd_ptr (); - - // Parse the magic bytes first - if (this->parse_magic_bytes (buf) == -1) - { - return -1; - } - - // Get the version information - if (this->get_version_info (buf) == -1) - return -1; - - // Get the byte order information... - if (this->get_byte_order_info (buf) == -1) - return -1; - - // Get the message type - this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; - - - // Get the size of the message.. - this->get_payload_size (buf); - - if (this->message_size_ == 0) - { - if (this->message_type_ == TAO_GIOP_MESSAGERROR) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) -" - "GIOP_MESSAGE_ERROR received \n")); - } - return 0; - } - else - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - " - "Message of size zero recd. \n")); - return -1; - } - } - - if (this->more_fragments_) - { - (void) this->parse_fragment_header (buf, - incoming.length ()); - } - - return 0; -} - - - - -int -TAO_GIOP_Message_State::parse_magic_bytes (char *buf) -{ - // The values are hard-coded to support non-ASCII platforms. - if (!(buf [0] == 0x47 // 'G' - && buf [1] == 0x49 // 'I' - && buf [2] == 0x4f // 'O' - && buf [3] == 0x50)) // 'P' - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("TAO (%P|%t) - bad header, ") - ACE_LIB_TEXT ("magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"), - buf[0], - buf[1], - buf[2], - buf[3])); - return -1; - } - - return 0; -} -#endif - int TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf) { - TAO_Debug_Msg_Emitter_Guard (8, ACE_TEXT("TAO (%P|%t) GIOP_Message_State::set_version_info")); - // We have a GIOP message on hand. Get its revision numbers CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET]; @@ -246,8 +144,6 @@ TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf) int TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf) { - TAO_Debug_Msg_Emitter_Guard (8, "TAO (%P|%t) GIOP_Message_State::set_byte_order_info_from_buffer"); - // Let us be specific that this is for 1.0 if (this->giop_version_.minor == 0 && this->giop_version_.major == 1) @@ -292,16 +188,6 @@ TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf) return 0; } -void -TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr) -{ - TAO_Debug_Msg_Emitter_Guard (8, "(%P|%t) GIOP_Message_State::set_payload_size_from_buffer"); - // Move the read pointer - rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; - - this->message_size_ = this->read_ulong (rd_ptr); -} - int diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h index 319e37a3933..d6263022d0c 100644 --- a/TAO/tao/GIOP_Message_State.h +++ b/TAO/tao/GIOP_Message_State.h @@ -47,11 +47,6 @@ public: int take_values_from_message_block (const ACE_Message_Block& mb); -#if 0 - /// Parse the message header. - int parse_message_header (ACE_Message_Block &incoming); -#endif - /// Return the message size CORBA::ULong message_size (void) const; @@ -83,15 +78,6 @@ private: friend class TAO_GIOP_Message_Base; -#if 0 - /// Parse the message header. - int parse_message_header_i (ACE_Message_Block &incoming); - - /// Checks for the magic word 'GIOP' in the start of the incoing - /// stream - int parse_magic_bytes (char *buf); -#endif - /// Extracts the version information from the incoming /// stream. Performs a check for whether the version information is /// right and sets the information in the <state> @@ -116,11 +102,6 @@ private: private: -#if 0 - /// The GIOP base class.. - TAO_GIOP_Message_Base *base_; -#endif - // GIOP version information.. TAO_GIOP_Message_Version giop_version_; diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl index f2f0011a7f6..80d421c7340 100644 --- a/TAO/tao/GIOP_Message_State.inl +++ b/TAO/tao/GIOP_Message_State.inl @@ -51,22 +51,11 @@ TAO_GIOP_Message_State::message_type () const return this->message_type_; } -#if 0 -ACE_INLINE int -TAO_GIOP_Message_State::message_fragmented (void) +ACE_INLINE void +TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr) { - if (this->more_fragments) - return 1; + // Move the read pointer + rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; - return 0; + this->message_size_ = this->read_ulong (rd_ptr); } - - - -ACE_INLINE CORBA::Boolean -TAO_GIOP_Message_State::header_received (void) const -{ - return this->message_size != 0; -} - -#endif diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index 028491a8a09..19e29aaeff4 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -12,7 +12,7 @@ ACE_RCSID (tao, Incoming_Message_Queue, "$Id$") TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core) - : queued_data_ (0), + : last_added_ (0), size_ (0), orb_core_ (orb_core) { @@ -42,21 +42,21 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block) { // Check to see if the length of the incoming block is less than // that of the <missing_data_> of the tail. - if (block.length () <= this->queued_data_->missing_data_bytes_) + if (block.length () <= this->last_added_->missing_data_bytes_) { n = block.length (); } else { - n = this->queued_data_->missing_data_bytes_; + n = this->last_added_->missing_data_bytes_; } // Do the copy - this->queued_data_->msg_block_->copy (block.rd_ptr (), + this->last_added_->msg_block_->copy (block.rd_ptr (), n); // Decerement the missing data - this->queued_data_->missing_data_bytes_ -= n; + this->last_added_->missing_data_bytes_ -= n; } return n; @@ -65,17 +65,20 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block) TAO_Queued_Data * TAO_Incoming_Message_Queue::dequeue_head (void) { + if (this->size_ == 0) + return 0; + // Get the node on the head of the queue... - TAO_Queued_Data *tmp = - this->queued_data_->next_; + TAO_Queued_Data *head = this->last_added_->next_; // Reset the head node.. - this->queued_data_->next_ = tmp->next_; - - // Decrease the size - --this->size_; + this->last_added_->next_ = head->next_; + + // Decrease the size and reset last_added_ if empty + if (--this->size_ == 0) + this->last_added_ = 0; - return tmp; + return head; } TAO_Queued_Data * @@ -86,23 +89,24 @@ TAO_Incoming_Message_Queue::dequeue_tail (void) return 0; // Get the node on the head of the queue... - TAO_Queued_Data *tmp = - this->queued_data_->next_; + TAO_Queued_Data *head = + this->last_added_->next_; - while (tmp->next_ != this->queued_data_) + while (head->next_ != this->last_added_) { - tmp = tmp->next_; + head = head->next_; } // Put the head in tmp. - tmp->next_ = this->queued_data_->next_; + head->next_ = this->last_added_->next_; - TAO_Queued_Data *ret_qd = this->queued_data_; + TAO_Queued_Data *ret_qd = this->last_added_; - this->queued_data_ = tmp; + this->last_added_ = head; // Decrease the size - --this->size_; + if (--this->size_ == 0) + this->last_added_ = 0; return ret_qd; } @@ -113,14 +117,14 @@ TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd) { if (this->size_ == 0) { - this->queued_data_ = nd; - this->queued_data_->next_ = this->queued_data_; + this->last_added_ = nd; + this->last_added_->next_ = this->last_added_; } else { - nd->next_ = this->queued_data_->next_; - this->queued_data_->next_ = nd; - this->queued_data_ = nd; + nd->next_ = this->last_added_->next_; + this->last_added_->next_ = nd; + this->last_added_ = nd; } ++ this->size_; @@ -226,10 +230,10 @@ clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size) ACE_CDR::mb_align (nb); - // Do whatever with the flags + // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since + // we just dynamically allocated the two things. nb->set_flags (mb->flags()); - nb->set_self_flags (mb->self_flags()); - // nb->clr_flags (mask); + nb->clr_flags (ACE_Message_Block::DONT_DELETE); return nb; } @@ -270,8 +274,9 @@ TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb, TAO_Pluggable_Messaging &msging_obj, ACE_Allocator *alloc) { - TAO_Queued_Data *new_qd = 0; - const size_t HDR_LEN = msging_obj.header_length (); + register TAO_Queued_Data *new_qd = 0; + register const size_t HDR_LEN = msging_obj.header_length (); /* COMPUTE ONCE! */ + register const size_t MB_LEN = mb->length (); /* COMPUTE ONCE! */ // Validate arguments. if (mb == 0) @@ -282,7 +287,7 @@ TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb, goto failure; // do we have enough bytes to make a complete header? - if (mb->length() >= msging_obj.header_length ()) + if (MB_LEN >= HDR_LEN) { // Since we have enough bytes to make a complete header, // the header needs to be valid. Check that now, and punt @@ -304,7 +309,7 @@ TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb, // Of course, we don't have the whole message (if we did, we // wouldn't be here!), so we copy only what we've got, i.e., whatever's // in the message block. - if (copy_mb_span (new_qd->msg_block_, mb, mb->length ()) == 0) + if (copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0) goto failure; // missing_data_bytes_ now has the full GIOP message size, but @@ -313,8 +318,8 @@ TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb, // payload bytes" in mb. // // "actual payload bytes" :== length of mb (which included the header) - header length - new_qd->missing_data_bytes_ -= (mb->length () - HDR_LEN); - mb->rd_ptr (mb->length ()); + new_qd->missing_data_bytes_ -= (MB_LEN - HDR_LEN); + mb->rd_ptr (MB_LEN); } } else @@ -322,10 +327,10 @@ TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb, new_qd->current_state_ = WAITING_TO_COMPLETE_HEADER; new_qd->msg_block_ = clone_mb_nocopy_size (mb, HDR_LEN); if (new_qd->msg_block_ == 0 || - copy_mb_span (new_qd->msg_block_, mb, mb->length ()) == 0) + copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0) goto failure; - new_qd->missing_data_bytes_ = msging_obj.header_length () - mb->length (); - mb->rd_ptr (mb->length ()); + new_qd->missing_data_bytes_ = HDR_LEN - MB_LEN; + mb->rd_ptr (MB_LEN); } ACE_ASSERT (new_qd->current_state_ != INVALID); @@ -359,130 +364,6 @@ failure: return 0; } -#if 0 -/*! - \brief Act like ACE_Message_Block::clone, but only clone the part btw. rd_ptr and wr_ptr. - */ -// Follow #def's swiped from Message_Block.cpp -#if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS) -#define ACE_EXECUTION_TIME this->execution_time_ -#define ACE_DEADLINE_TIME this->deadline_time_ -#else -#define ACE_EXECUTION_TIME ACE_Time_Value::zero -#define ACE_DEADLINE_TIME ACE_Time_Value::max_time -#endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */ - -static ACE_Data_Block* -clone_span_nocopy (/*const*/ ACE_Data_Block *the_db, - const char* beg, size_t span, - ACE_Message_Block::Message_Flags mask = 0) -{ - // Allocate a new data block through the same allocator as 'the_db' - const ACE_Message_Block::Message_Flags always_clear = - ACE_Message_Block::DONT_DELETE; - - ACE_Data_Block *nb; - ACE_Allocator *db_allocator = the_db->data_block_allocator (); - - ACE_NEW_MALLOC_RETURN (nb, - ACE_static_cast(ACE_Data_Block*, - db_allocator->malloc (sizeof (ACE_Data_Block))), - ACE_Data_Block (span, // size - the_db->msg_type (), // type - 0, // data - the_db->allocator_strategy (), // allocator - the_db->locking_strategy (), // locking strategy - the_db->flags (), // flags - db_allocator), - 0); - - - // Set new flags minus the mask... - nb->clr_flags (mask | always_clear); - - // Copy in the data, and set the pointer - // ACE_OS::memcpy (nb->base (), beg, span); - - return nb; -} - -static ACE_Message_Block* -clone_span (/*const*/ ACE_Message_Block *the_mb, size_t span_size, ACE_Message_Block::Message_Flags mask = 0) -{ - // Get a pointer to a "cloned" <ACE_Data_Block> (will copy the - // values rather than increment the reference count). - size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT); - ACE_Data_Block *db = clone_span_nocopy (the_mb->data_block (), the_mb->rd_ptr (), aligned_size, mask); - if (db == 0) - return 0; - - ACE_Message_Block *nb; - - if(the_mb->message_block_allocator_ == 0) - { - ACE_NEW_RETURN (nb, - ACE_Message_Block (0, // size - ACE_Message_Block::ACE_Message_Type (0), // type - 0, // cont - 0, // data - 0, // allocator - 0, // locking strategy - 0, // flags - the_mb->priority_, // priority - ACE_EXECUTION_TIME, // execution time - ACE_DEADLINE_TIME, // absolute time to deadline - // Get a pointer to a - // "duplicated" <ACE_Data_Block> - // (will simply increment the - // reference count). - db, - db->data_block_allocator (), - the_mb->message_block_allocator_), - 0); - } - else - { - // This is the ACE_NEW_MALLOC macro with the return check removed. - // We need to do it this way because if it fails we need to release - // the cloned data block that was created above. If we used - // ACE_NEW_MALLOC_RETURN, there would be a memory leak because the - // above db pointer would be left dangling. - nb = ACE_static_cast(ACE_Message_Block*,the_mb->message_block_allocator_->malloc (sizeof (ACE_Message_Block))); - if(nb != 0) - new (nb) ACE_Message_Block (0, // size - ACE_Message_Block::ACE_Message_Type (0), // type - 0, // cont - 0, // data - 0, // allocator - 0, // locking strategy - 0, // flags - the_mb->priority_, // priority - ACE_EXECUTION_TIME, // execution time - ACE_DEADLINE_TIME, // absolute time to deadline - db, - db->data_block_allocator (), - the_mb->message_block_allocator_); - } - - if (nb == 0) - { - db->release (); - return 0; - } - - ACE_CDR::mb_align (nb); - nb->copy (the_mb->rd_ptr(), span_size); - - // Clone all the continuation messages if necessary. - if (the_mb->cont () != 0 - && (nb->cont_ = the_mb->cont ()->clone (mask)) == 0) - { - nb->release (); - return 0; - } - return nb; -} -#endif /*static*/ TAO_Queued_Data * @@ -490,18 +371,15 @@ TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb, TAO_Pluggable_Messaging &msging_obj, ACE_Allocator *alloc) { - // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE! - // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN - // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS - // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT - // BEHAVES DIFFERENTLY FROM make_uncompleted_message. + register const size_t HDR_LEN = msging_obj.header_length (); + register const size_t MB_LEN = mb.length (); // Validate arguments. - if (mb.length() < msging_obj.header_length ()) + if (MB_LEN < HDR_LEN) return 0; size_t total_msg_len = 0; - TAO_Queued_Data *new_qd = make_queued_data (alloc); + register TAO_Queued_Data *new_qd = make_queued_data (alloc); if (new_qd == 0) goto failure; @@ -517,8 +395,8 @@ TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb, // *at least* the length of the message. Verify that we have that // many bytes in the message block and, if we don't, release the new // qd and fail. - total_msg_len = new_qd->missing_data_bytes_ + msging_obj.header_length (); - if (total_msg_len > mb.length ()) + total_msg_len = new_qd->missing_data_bytes_ + HDR_LEN; + if (total_msg_len > MB_LEN) goto failure; // Make a copy of the relevant portion of mb and hang on to it @@ -551,10 +429,10 @@ failure: ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ") ACE_TEXT ("failed to find complete message in mblk=%-08x; leaving %u bytes in block\n"), - &mb, mb.length ())); + &mb, MB_LEN)); if (TAO_debug_level >= 10) ACE_HEX_DUMP ((LM_DEBUG, - mb.rd_ptr (), mb.length (), + mb.rd_ptr (), MB_LEN, ACE_TEXT (" residual bytes in buffer"))); } diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index d4a830e10ba..6733c382264 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -120,14 +120,15 @@ private: friend class TAO_Transport; -#if 0 - /// Make a node for the queue. - TAO_Queued_Data *get_node (void); -#endif - private: - /// A linked listof messages that await processing - TAO_Queued_Data *queued_data_; + /*! + \brief A circular linked list of messages awaiting processing. + + \a last_message_added_ points to the most recent message added to + the list. The earliest addition can be easily accessed via + \a last_message_added_->next_. + */ + TAO_Queued_Data *last_added_; /// The size of the queue CORBA::ULong size_; diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl index d04512d81b0..f82267b5cea 100644 --- a/TAO/tao/Incoming_Message_Queue.inl +++ b/TAO/tao/Incoming_Message_Queue.inl @@ -18,7 +18,7 @@ TAO_Incoming_Message_Queue::is_tail_complete (void) return -1; if (this->size_ && - this->queued_data_->missing_data_bytes_ == 0) + this->last_added_->missing_data_bytes_ == 0) return 1; return 0; @@ -31,8 +31,8 @@ TAO_Incoming_Message_Queue::is_head_complete (void) return -1; if (this->size_ && - this->queued_data_->next_->missing_data_bytes_ == 0 && - this->queued_data_->next_->more_fragments_ == 0) + this->last_added_->next_->missing_data_bytes_ == 0 && + this->last_added_->next_->more_fragments_ == 0) return 1; return 0; @@ -45,7 +45,7 @@ TAO_Incoming_Message_Queue::is_tail_fragmented (void) return 0; if (this->size_ && - this->queued_data_->more_fragments_ == 1) + this->last_added_->more_fragments_ == 1) return 1; return 0; @@ -55,20 +55,12 @@ ACE_INLINE size_t TAO_Incoming_Message_Queue::missing_data_tail (void) const { if (this->size_ != 0) - return this->queued_data_->missing_data_bytes_; + return this->last_added_->missing_data_bytes_; return 0; } -#if 0 -ACE_INLINE TAO_Queued_Data * -TAO_Incoming_Message_Queue::get_node (void) -{ - return TAO_Queued_Data::get_queued_data (); -} -#endif - /************************************************************************/ // Methods for TAO_Queued_Data /************************************************************************/ diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index 2d460580d57..0d4acf4e485 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -113,33 +113,6 @@ public: virtual void init (CORBA::Octet major, CORBA::Octet minor) = 0; -#if 0 - /// Parse the incoming messages.. - virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0; - - /// Calculate the amount of data that is missing in the <incoming> - /// message block. - virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0; - - /// Get the details of the message parsed through the <qd>. - virtual void get_message_data (TAO_Queued_Data *qd) = 0; - - /* Extract the details of the next message from the <incoming> - * through <qd>. Returns 1 if there are more messages and returns a - * 0 if there are no more messages in <incoming>. - */ - virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd) = 0; - - /// Check whether the node <qd> needs consolidation from <incoming> - virtual int consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming) = 0; - - /// @@Bala:Docu?? - virtual int consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd) = 0; -#endif - /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index af00665aaca..3a76879c67a 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -1273,7 +1273,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub, } - /* * * All the methods relevant to the incoming data path of the ORB are @@ -1294,7 +1293,6 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, // First try to process messages off the head of the incoming queue. int retval = this->process_queue_head (rh); - if (retval <= 0) { if (retval == -1) @@ -1315,8 +1313,7 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, // The buffer on the stack which will be used to hold the input // messages - // char buf [TAO_MAXBUFSIZE]; - char buf[2 * TAO_MAXBUFSIZE]; + char buf[TAO_MAXBUFSIZE]; #if defined (ACE_HAS_PURIFY) (void) ACE_OS::memset (buf, @@ -1342,10 +1339,11 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, // and that's it. unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS; + unsigned int did_queue_message = 0; + // Align the message block ACE_CDR::mb_align (&message_block); -read_from_the_connection: size_t recv_size = 0; if (this->orb_core_->orb_params ()->single_read_optimization ()) { @@ -1366,9 +1364,7 @@ read_from_the_connection: if (n <= 0) { if (n == -1) - { - this->send_connection_closed_notifications (); - } + this->send_connection_closed_notifications (); return n; } @@ -1390,19 +1386,8 @@ read_from_the_connection: message_block.length (), ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket"))); - // **** BEGIN CJC PMB CHANGES **** - // - // - Does this properly handle GIOP FRAGMENT messages?? - // @@ BALA: I doubt it.. You could get an incomplete message - // fragment or a complete message fragment but the message could - // incomplete. - - // @@ BALA: How do you handle messages that didnt fit the message - // block here? - // - // @@ CJC: I'm not sure what you mean...If we need more space, then - // we allocate more. +complete_message_and_possibly_enqueue: // Check to see if we're still working to complete a message if (this->uncompleted_message_) { @@ -1475,8 +1460,9 @@ read_from_the_connection: // ==> Resize the message block to have capacity for // the rest of the incoming message ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_; - mb.size (mb.size () - + this->uncompleted_message_->missing_data_bytes_); + ACE_CDR::grow (&mb, + mb.size () + + this->uncompleted_message_->missing_data_bytes_); if (TAO_debug_level > 2) { @@ -1506,6 +1492,15 @@ read_from_the_connection: break; case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD: + // Here we have an opportunity to try to finish reading the + // uncompleted message. This is a Good Idea(TM) because there are + // good odds that either more data came available since the last + // time we read, or that we simply didn't read the whole message on + // the first read. So, we try to read again. + // + // NOTE! this changes this->uncompleted_message_! + this->try_to_complete (max_wait_time); + // ==> if (bytes missing from uncompleted_message_ == 0) if (this->uncompleted_message_->missing_data_bytes_ == 0) { @@ -1519,8 +1514,10 @@ read_from_the_connection: // ---> NOTE: whoever pulls this off the queue must delete it! this->uncompleted_message_->current_state_ = TAO_Queued_Data::COMPLETED; + // @@CJC NEED TO CHECK RETURN VALUE HERE! this->enqueue_incoming_message (this->uncompleted_message_); + did_queue_message = 1; // zero out uncompleted_message_; this->uncompleted_message_ = 0; @@ -1535,6 +1532,7 @@ read_from_the_connection: } else { + if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, @@ -1561,69 +1559,14 @@ read_from_the_connection: while (message_block.length() != 0 && this->uncompleted_message_); } -#if !defined(LOOP_READ_OPTIMIZATION_LATE) - // If, at the end, we're still waiting to complete the message, - // we should effectively return. - // - // @@BALA: Returning here will reduce the throughput. We shold try - // reading again to see if we could get more data.. + // ***************************** + // @@ CJC // - // @@CJC: Good point; maybe we can go to the top again, but only try - // to read at most 2-3 times before we just go on. Obviously, if we - // complete the message (i.e., there's no uncompleted message), then - // we go on to the next step. - if (this->uncompleted_message_) - { - if (number_of_read_attempts--) - { - // We try to read again just in case more data arrived while - // we were doing the stuff above. This way, we can increase - // throughput without much of a penalty. - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "still have an uncompleted message; " - "will try %d more times before letting " - "somebody else have a chance.\n", - this->id (), - number_of_read_attempts)); - } + // Once upon a time we tried to complete reading the uncompleted + // message here, but testing found that completing later worked + // better. + // ***************************** - // We're about to go back and try to read, but we need to insure - // that there's space available in the message block to read! - // We'll reset the message block... - ACE_ASSERT (message_block.space() == 0); - message_block.reset (); - - // Yes, this uses the much-maligned "goto"; get over it. TCP - // implementations use them, too, for just the same sort of - // situation. - goto read_from_the_connection; - } - else - { - // The queue should be empty because it should have been processed - // above. But I wonder if I should put a check in here anyway. - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "giving up reading for now and returning " - "with incoming queue length = %d\n", - this->id (), - this->incoming_message_queue_.queue_length ())); - if (this->uncompleted_message_) - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "missing bytes from uncompleted message = %u\n", - this->uncompleted_message_->missing_data_bytes_)); - } - return 1; - } - } -#endif // At this point, there should be nothing in uncompleted_message_. // We now need to chop up the bytes in message_block and store any @@ -1645,11 +1588,15 @@ read_from_the_connection: message_block.length (), ACE_TEXT (" from this message buffer"))); } + complete_message = TAO_Queued_Data::make_completed_message ( message_block, *this->messaging_object ()); if (complete_message) - this->enqueue_incoming_message (complete_message); + { + this->enqueue_incoming_message (complete_message); + did_queue_message = 1; + } } while (complete_message != 0); // On exit from this frame we have one of the following states: @@ -1674,17 +1621,15 @@ read_from_the_connection: // We should have consumed ALL the bytes by now. ACE_ASSERT (message_block.length () == 0); -#if defined(LOOP_READ_OPTIMIZATION_LATE) - // If, at the end, we're still waiting to complete the message, - // we should effectively return. // - // @@BALA: Returning here will reduce the throughput. We shold try - // reading again to see if we could get more data.. + // We don't want to try to re-read earlier because we may not have + // an uncompleted message until we get to this point. So, if we did + // it earlier, we could have missed the opportunity to complete it + // and dispatch. // - // @@CJC: Good point; maybe we can go to the top again, but only try - // to read at most 2-3 times before we just go on. Obviously, if we - // complete the message (i.e., there's no uncompleted message), then - // we go on to the next step. + // Thanks to Bala <bala@cse.wustl.edu> for the idea to read again + // to increase throughput! + if (this->uncompleted_message_) { if (number_of_read_attempts--) @@ -1704,16 +1649,7 @@ read_from_the_connection: number_of_read_attempts)); } - // We're about to go back and try to read, but we need to insure - // that there's space available in the message block to read! - // We'll reset the message block... - ACE_ASSERT (message_block.space() == 0); - message_block.reset (); - - // Yes, this uses the much-maligned "goto"; get over it. TCP - // implementations use them, too, for just the same sort of - // situation. - goto read_from_the_connection; + goto complete_message_and_possibly_enqueue; } else { @@ -1736,13 +1672,52 @@ read_from_the_connection: return 1; } } -#endif + // **** END CJC PMG CHANGES **** - // Process the message - return this->process_queue_head (rh); + return did_queue_message ? this->process_queue_head (rh) : 1; } + +void +TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time) +{ + if (this->uncompleted_message_ == 0) + return; + + ssize_t n = 0; + size_t &missing_data = this->uncompleted_message_->missing_data_bytes_; + ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_; + + // Try to complete this until we error or block right here... + for (ssize_t bytes = missing_data; + bytes != 0; + bytes -= n) + { + // .. do a read on the socket again. + n = this->recv (mb.wr_ptr (), + bytes, + max_wait_time); + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "read %d bytes on attempt\n", + this->id(), n)); + } + + if (n == 0 || n == -1) + { + break; + } + + mb.wr_ptr (n); + missing_data -= n; + } +} + + int TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message) { @@ -1827,490 +1802,6 @@ TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message) #endif } -#if 0 -int -TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) -{ - // Parse the incoming message for validity. The check needs to be - // performed by the messaging objects. - if (this->parse_incoming_messages (block) == -1) - return -1; - - // Check whether we have a complete message for processing - ssize_t missing_data = this->missing_data (block); - - - if (missing_data < 0) - { - // If we have more than one message - return this->consolidate_extra_messages (block, - rh); - } - else if (missing_data > 0) - { - // If we have missing data then try doing a read or try queueing - // them. - return this->consolidate_message (block, - missing_data, - rh, - max_wait_time); - } - - return 1; -} - -int -TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) -{ - // If we have a queue and if the last message is not complete a - // complete one, then this read will get us the remaining data. So - // do not try to parse the header if we have an incomplete message - // in the queue. - if (this->incoming_message_queue_.is_tail_complete () != 0) - { - // As it looks like a new message has been read, process the - // message. Call the messaging object to do the parsing.. - int retval = - this->messaging_object ()->parse_incoming_messages (block); - - if (retval == -1) - { - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, " - "error in incoming message\n", - this->id ())); - - this->send_connection_closed_notifications (); - return -1; - } - } - - return 0; -} - - -size_t -TAO_Transport::missing_data (ACE_Message_Block &incoming) -{ - // If we have a incomplete message in the queue then find out how - // much of data is required to get a complete message. - if (this->incoming_message_queue_.is_tail_complete () == 0) - { - return this->incoming_message_queue_.missing_data_tail (); - } - - return this->messaging_object ()->missing_data (incoming); -} - - -int -TAO_Transport::consolidate_message (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) -{ - // Check whether the last message in the queue is complete.. - if (this->incoming_message_queue_.is_tail_complete () == 0) - return this->consolidate_message_queue (incoming, - missing_data, - rh, - max_wait_time); - - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message\n", - this->id ())); - } - - // Calculate the actual length of the load that we are supposed to - // read which is equal to the <missing_data> + length of the buffer - // that we have.. - size_t payload = missing_data + incoming.size (); - - // Grow the buffer to the size of the message - ACE_CDR::grow (&incoming, - payload); - - ssize_t n = 0; - - // As this used for transports where things are available in one - // shot this looping should not create any problems. - for (ssize_t bytes = missing_data; - bytes != 0; - bytes -= n) - { - // .. do a read on the socket again. - n = this->recv (incoming.wr_ptr (), - bytes, - max_wait_time); - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message, " - "read %d bytes on attempt\n", - this->id(), n)); - } - - if (n == 0 || n == -1) - { - break; - } - - incoming.wr_ptr (n); - missing_data -= n; - } - - // If we got an error.. - if (n == -1) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Trasport[%d]::consolidate_message, " - "error while trying to consolidate\n", - this->id ())); - } - this->send_connection_closed_notifications (); - return -1; - } - - // If we had gotten a EWOULDBLOCK n would be equal to zero. But we - // have to put the message in the queue anyway. So let us proceed - // to do that and return... - - // Check to see if we have messages in queue or if we have missing - // data . AT this point we cannot have have semi-complete messages - // in the queue as they would have been taken care before. Put - // ourselves in the queue and then try processing one of the - // messages.. - if ((missing_data > 0 - ||this->incoming_message_queue_.queue_length ()) - && this->incoming_message_queue_.is_tail_fragmented () == 0) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message, " - "queueing up the message\n", - this->id ())); - } - - // Get a queued data - TAO_Queued_Data *qd = - this->make_queued_data (incoming); - - // Add the missing data to the queue - qd->missing_data_ = missing_data; - - // Get the rest of the messaging data - this->messaging_object ()->get_message_data (qd); - - // Add it to the tail of the queue.. - this->incoming_message_queue_.enqueue_tail (qd); - - if (this->incoming_message_queue_.is_head_complete ()) - return this->process_queue_head (rh); - - return 0; - } - - // We dont have any missing data. Just make a queued_data node with - // the existing message block and send it to the higher layers of - // the ORB. - TAO_Queued_Data pqd (&incoming, - this->orb_core_->transport_message_buffer_allocator ()); - pqd.missing_data_ = missing_data; - this->messaging_object ()->get_message_data (&pqd); - - // Check whether the message was fragmented and try to consolidate - // the fragments.. - if (pqd.more_fragments_ || - (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) - { - // Duplicate the queued data as it is on stack.. - TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd); - - return this->consolidate_fragments (nqd, rh); - } - - // Now we have a full message in our buffer. Just go ahead and - // process that - return this->process_parsed_messages (&pqd, - rh); -} - -int -TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd, - TAO_Resume_Handle &rh) -{ - // If we have received a fragment message then we have to - // consolidate <qd> with the last message in queue - // @@todo: this piece of logic follows GIOP a bit... Need to revisit - // if we have protocols other than GIOP - - // @@todo: Fragments now have too much copying overhead. Need to get - // them out if we want to have some reasonable performance metrics - // in future.. Post 1.2 seems a nice time.. - if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - { - TAO_Queued_Data *tqd = - this->incoming_message_queue_.dequeue_tail (); - - tqd->more_fragments_ = qd->more_fragments_; - tqd->missing_data_ = qd->missing_data_; - - if (this->messaging_object ()->consolidate_fragments (tqd, - qd) == -1) - return -1; - - - TAO_Queued_Data::release (qd); - - this->incoming_message_queue_.enqueue_tail (tqd); - - this->process_queue_head (rh); - } - else - { - // if we dont have a fragment already in the queue just add it in - // the queue - this->incoming_message_queue_.enqueue_tail (qd); - } - - return 0; -} - -int -TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) -{ - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n", - this->id ())); - } - - // If the queue did not have a complete message put this piece of - // message in the queue. We kow it did not have a complete - // message. That is why we are here. - size_t n = - this->incoming_message_queue_.copy_tail (incoming); - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) TAO_Transport[%d]::consolidate_message_queue", - "copied [%d] bytes to the tail \n", - this->id (), - n)); - } - - // Update the missing data... - missing_data = - this->incoming_message_queue_.missing_data_tail (); - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) TAO_Transport[%d]::consolidate_message_queue", - "missing [%d] bytes in the tail messahe \n", - this->id (), - missing_data)); - } - - // Move the read pointer of the <incoming> message block to the end - // of the copied message and process the remaining portion... - incoming.rd_ptr (n); - - // If we have some more information left in the message block.. - if (incoming.length ()) - { - // We may have to parse & consolidate. This part of the message - // doesn't seem to be part of the last message in the queue (as - // the copy () hasn't taken away this message). - int retval = this->parse_consolidate_messages (incoming, - rh, - max_wait_time); - - // If there is an error return - if (retval == -1) - { - if (TAO_debug_level) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "error while consolidating, part of the read message\n", - this->id ())); - } - return retval; - } - else if (retval == 1) - { - // If the message in the <incoming> message block has only - // one message left we need to process that seperately. - - // Get a queued data - TAO_Queued_Data *qd = this->make_queued_data (incoming); - - // Get the rest of the message data - this->messaging_object ()->get_message_data (qd); - - // Add the missing data to the queue - qd->missing_data_ = 0; - - // Check whether the message was fragmented and try to consolidate - // the fragments.. - if (qd->more_fragments_ || - (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) - { - return this->consolidate_fragments (qd, rh); - } - - // Add it to the tail of the queue.. - this->incoming_message_queue_.enqueue_tail (qd); - - // We should surely have a message in queue now. So just - // process that. - return this->process_queue_head (rh); - } - - // parse_consolidate_messages () would have processed one of the - // messages, so we better return as we dont want to starve other - // threads. - return 0; - } - - // If we still have some missing data.. - if (missing_data > 0) - { - // Get the last message from the Queue - TAO_Queued_Data *qd = - this->incoming_message_queue_.dequeue_tail (); - - if (TAO_debug_level > 5) - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) TAO_Transport[%d]::consolidate_message_queue", - " trying recv, again \n", - this->id ())); - - // Try to do a read again. If we have some luck it would be - // great.. - ssize_t n = this->recv (qd->msg_block_->wr_ptr (), - missing_data, - max_wait_time); - - if (TAO_debug_level > 5) - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) TAO_Transport[%d]::consolidate_message_queue", - " recv retval [%d] \n", - this->id (), - n)); - // Error... - if (n < 0) - return n; - - // If we get a EWOULDBLOCK ie. n==0, we should anyway put the - // message in queue before returning.. - // Move the write pointer - qd->msg_block_->wr_ptr (n); - - // Decrement the missing data - qd->missing_data_ -= n; - - // Now put the TAO_Queued_Data back in the queue - this->incoming_message_queue_.enqueue_tail (qd); - - // Any way as we have come this far and are about to return, - // just try to process a message if it is there in the queue. - if (this->incoming_message_queue_.is_head_complete ()) - return this->process_queue_head (rh); - - return 0; - } - - // Process a message in the head of the queue if we have one.. - return this->process_queue_head (rh); -} - - -int -TAO_Transport::consolidate_extra_messages (ACE_Message_Block - &incoming, - TAO_Resume_Handle &rh) -{ - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n", - this->id ())); - } - - // Pick the tail of the queue - TAO_Queued_Data *tail = - this->incoming_message_queue_.dequeue_tail (); - - if (tail) - { - // If we have a node in the tail, checek to see whether it needs - // consolidation. If so, just consolidate it. - if (this->messaging_object ()->consolidate_node (tail, - incoming) == -1) - return -1; - - // .. put the tail back in queue.. - this->incoming_message_queue_.enqueue_tail (tail); - } - - int retval = 1; - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, " - "extracting extra messages\n", - this->id ())); - } - - // Extract messages.. - while (retval == 1) - { - TAO_Queued_Data *q_data = 0; - - retval = - this->messaging_object ()->extract_next_message (incoming, - q_data); - if (q_data) - { - // If we have read a framented message then... - if (q_data->more_fragments_ || - q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - { - this->consolidate_fragments (q_data, rh); - } - else - { - this->incoming_message_queue_.enqueue_tail (q_data); - } - } - } - - // In case of error return.. - if (retval == -1) - { - return retval; - } - - return this->process_queue_head (rh); -} -#endif int TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, @@ -2422,59 +1913,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, return 0; } -#if 0 -TAO_Queued_Data * -TAO_Transport::make_queued_data (ACE_Message_Block &incoming) -{ - // Get an instance of TAO_Queued_Data - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data ( - this->orb_core_->transport_message_buffer_allocator ()); - - // Get the flag for the details of the data block... - ACE_Message_Block::Message_Flags flg = - incoming.self_flags (); - - if (ACE_BIT_DISABLED (flg, - ACE_Message_Block::DONT_DELETE)) - { - // Duplicate the data block before putting it in the queue. - qd->msg_block_ = ACE_Message_Block::duplicate (&incoming); - } - else - { - // As we are in CORBA mode, all the data blocks would be aligned - // on an 8 byte boundary. Hence create a data block for more - // than the actual length - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (incoming.length ()+ - ACE_CDR::MAX_ALIGNMENT); - - // Get the allocator.. - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - // Make message block.. - ACE_Message_Block mb (db, - 0, - alloc); - - // Duplicate the block.. - qd->msg_block_ = mb.duplicate (); - - // Align the message block - ACE_CDR::mb_align (qd->msg_block_); - - // Copy the data.. - qd->msg_block_->copy (incoming.rd_ptr (), - incoming.length ()); - } - - - return qd; -} -#endif - int TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) { @@ -2484,65 +1922,62 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) "TAO (%P|%t) - Transport[%d]::process_queue_head\n", this->id ())); } + + if (this->incoming_message_queue_.queue_length () == 0) + return 1; - // See if the message in the head of the queue is complete... - if (this->incoming_message_queue_.is_head_complete () > 0) - { - // Get the message on the head of the queue.. - TAO_Queued_Data *qd = - this->incoming_message_queue_.dequeue_head (); + // Get the message on the head of the queue.. + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_head (); - if (TAO_debug_level > 3) + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::process_queue_head, " + "the size of the queue is [%d]\n", + this->id (), + this->incoming_message_queue_.queue_length())); + } + // Now that we have pulled out out one message out of the queue, + // check whether we have one more message in the queue... + if (this->incoming_message_queue_.queue_length () > 0) + { + if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "the size of the queue is [%d]\n", - this->id (), - this->incoming_message_queue_.queue_length())); - } - // Now that we have pulled out out one message out of the queue, - // check whether we have one more message in the queue... - if (this->incoming_message_queue_.is_head_complete () > 0) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "notify reactor\n", - this->id ())); - - } - int retval = - this->notify_reactor (); + "notify reactor\n", + this->id ())); - if (retval == 1) - { - // Let the class know that it doesn't need to resume the - // handle.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); - } - else if (retval < 0) - return -1; } - else + int retval = + this->notify_reactor (); + + if (retval == 1) { - // As we are ready to process the last message just resume - // the handle. Set the flag incase someone had reset the flag.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); + // Let the class know that it doesn't need to resume the + // handle.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); } - - // Process the message... - if (this->process_parsed_messages (qd, - rh) == -1) + else if (retval < 0) return -1; + } + else + { + // As we are ready to process the last message just resume + // the handle. Set the flag incase someone had reset the flag.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); + } - // Delete the Queued_Data.. - TAO_Queued_Data::release (qd); + // Process the message... + if (this->process_parsed_messages (qd, + rh) == -1) + return -1; - return 0; - } + // Delete the Queued_Data.. + TAO_Queued_Data::release (qd); - return 1; + return 0; } int diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index dc542568f9b..51bec616bc2 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -555,6 +555,7 @@ public: virtual int handle_input_i (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time = 0, int block = 0); + void try_to_complete (ACE_Time_Value *max_wait_time); enum @@ -649,56 +650,6 @@ protected: */ virtual int register_handler_i (void) = 0; -#if 0 - /// Called by the handle_input_i (). This method is used to parse - /// message read by the handle_input_i () call. It also decides - /// whether the message needs consolidation before processing. - int parse_consolidate_messages (ACE_Message_Block &bl, - TAO_Resume_Handle &rh, - ACE_Time_Value *time = 0); - - - /// Method does parsing of the message if we have a fresh message in - /// the <message_block> or just returns if we have read part of the - /// previously stored message. - int parse_incoming_messages (ACE_Message_Block &message_block); - - /// Return if we have any missing data in the queue of messages - /// or determine if we have more information left out in the - /// presently read message to make it complete. - size_t missing_data (ACE_Message_Block &message_block); - - /// Consolidate the currently read message or consolidate the last - /// message in the queue. The consolidation of the last message in - /// the queue is done by calling consolidate_message_queue (). - virtual int consolidate_message (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time); - - /// @@Bala: Docu??? - int consolidate_fragments (TAO_Queued_Data *qd, - TAO_Resume_Handle &rh); - - - - /// First consolidate the message queue. If the message is still not - /// complete, try to read from the handle again to make it - /// complete. If these dont help put the message back in the queue - /// and try to check the queue if we have message to process. (the - /// thread needs to do some work anyway :-)) - int consolidate_message_queue (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time); - - /// Called by parse_consolidate_message () if we have more messages - /// in one read. Queue up the messages and try to process one of - /// them, atleast at the head of them. - int consolidate_extra_messages (ACE_Message_Block &incoming, - TAO_Resume_Handle &rh); -#endif - /// Process the message by sending it to the higher layers of the /// ORB. int process_parsed_messages (TAO_Queued_Data *qd, |