diff options
author | Chris Cleeland <chris.cleeland@gmail.com> | 2003-12-10 20:03:57 +0000 |
---|---|---|
committer | Chris Cleeland <chris.cleeland@gmail.com> | 2003-12-10 20:03:57 +0000 |
commit | fc9c046e4dd9b6f63bc0a1dc7d5251b36c734465 (patch) | |
tree | 3c58d861979da2c37d0a8c221b4740888c57752f | |
parent | c869905174c8b01084f5c7583211a84dc3219327 (diff) | |
download | ATCD-fc9c046e4dd9b6f63bc0a1dc7d5251b36c734465.tar.gz |
Wed Dec 10 13:58:41 2003 Chris Cleeland <cleeland_c@ociweb.com>
PMB changes and minor related changes.
-rw-r--r-- | TAO/ChangeLog | 81 | ||||
-rw-r--r-- | TAO/ChangeLogs/PMBChangeLog | 381 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 353 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.h | 52 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.i | 8 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Generator_Parser_Impl.inl | 23 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Lite.cpp | 57 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Lite.h | 45 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.cpp | 197 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.h | 44 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.inl | 31 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.cpp | 459 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.h | 162 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.inl | 23 | ||||
-rw-r--r-- | TAO/tao/Pluggable_Messaging.h | 44 | ||||
-rw-r--r-- | TAO/tao/Strategies/DIOP_Transport.cpp | 229 | ||||
-rw-r--r-- | TAO/tao/Strategies/SHMIOP_Transport.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/Strategies/SHMIOP_Transport.h | 4 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 1208 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 104 | ||||
-rw-r--r-- | TAO/tao/default_client.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/orbconf.h | 19 |
23 files changed, 2208 insertions, 1322 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index a3aad3e0413..687635a1f78 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,84 @@ +Wed Dec 10 13:58:41 2003 Chris Cleeland <cleeland_c@ociweb.com> + + * tao/default_client.cpp (parse_args): Corrected erroneous option + name printed when -ORBConnectStrategy option parsing hits an + error. + + * tao/IIOP_Transport.cpp (recv): Corrected method name printed in + debug message. + + * tao/GIOP_Message_Base.cpp: + * tao/GIOP_Message_Base.h: + * tao/GIOP_Message_Base.i: + * tao/GIOP_Message_Generator_Parser_Impl.inl: + * tao/GIOP_Message_Lite.cpp: + * tao/GIOP_Message_Lite.h: + * tao/GIOP_Message_State.cpp: + * tao/GIOP_Message_State.h: + * tao/GIOP_Message_State.inl: + * tao/Incoming_Message_Queue.cpp: + * tao/Incoming_Message_Queue.h: + * tao/Incoming_Message_Queue.inl: + * tao/Pluggable_Messaging.h: + * tao/Transport.cpp: + * tao/Transport.h: + * tao/orbconf.h: + * tao/Strategies/DIOP_Transport.cpp + * tao/Strategies/SHMIOP_Transport.cpp + * tao/Strategies/SHMIOP_Transport.h + + + Integrated aggregate fix to infamous Parse Magic Bytes problem. + The fix was originally developed in OCI's repository and + migrated here. An overall description of the problem and + solution follow, and a detailed changelog that follows the + development of this solution is found in + ChangeLogs/PMBChangeLog. + + The old way of handling input naively assumed that a header would + never be broken across multiple read()s from the socket. Back in + the old days, this assumption was okay, because TAO performed a + recv_n() specifically for the header, and thus insured that it + wouldn't get less than a full header. However, when it was + changed to use a single read, that no longer worked. + + Further complicating matters was the incoming message queue, and + the myriad of methods that manipulated elements on the message + queue. + + The PMB changes rework things in the following ways: + + 1. The incoming message queue now holds only completely-read + GIOP messages awaiting processing. Each GIOP message is held in + a heap-allocated ACE_Message_Block. When the message has been + processed, the processing code is responsible for calling + release on the queued message to return the memory to the heap. + + 2. Since GIOP, by definition, assumes an ordered byte stream, + there can be only one partially read, i.e., uncompleted, message; + this is now held explicitly in TAO_Transport::uncompleted_message_ + as a pointer to a heap-allocated ACE_Message_Block of the exact + correct size for the GIOP message. + + 3. States for reading from the transport have been explicitly + defined rather than implied from values of other data members. + + This change also correctly handles correct receipt of GIOP + fragments: + + a. In the case of a GIOP 1.2 fragment, each fragment is a complete + GIOP message. It is, however, unable to be processed until all + fragments have arrived. + + b. Unlike a TCP segment, a GIOP fragment is not stateless; all + GIOP 1.2 fragments for a specific request must be sent in-order. + However, with transport multiplexing, other messages--including + fragments for other requests--may interleave these fragments. + + c. By design, there is no way to know the total number of payload + bytes across all GIOP fragments until the final fragment is + received. + Wed Dec 10 15:12:34 UTC 2003 Johnny Willemsen <jwillemsen@remedy.nl> * tests/Smart_Proxies/Collocation/Smart_Proxy_Impl.cpp: diff --git a/TAO/ChangeLogs/PMBChangeLog b/TAO/ChangeLogs/PMBChangeLog new file mode 100644 index 00000000000..d93c29dae06 --- /dev/null +++ b/TAO/ChangeLogs/PMBChangeLog @@ -0,0 +1,381 @@ +Thu Mar 20 12:34:24 2003 Chris Cleeland <cleeland_c@ociweb.com> + + * all: First merge back to OCI. + +Thu Mar 20 12:17:23 2003 Chris Cleeland <cleeland_c@ociweb.com> + + * tao/Strategies/DIOP_Transport.cpp (handle_input_i): + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp + (handle_input_i): + + Update to use new parsing and message processing methods + consistent with PMB changes. + + * tao/Transport.cpp (process_queue_head): Corrected a possible memory + leak where, if process_parsed_messages returned with an error, + the TAO_Queued_Data instance wouldn't be released. + +Tue Mar 18 14:57:07 2003 Chris Cleeland <cleeland_c@ociweb.com> + + * VERSION: + * ChangeLog: + * tao/tao.mpc: + * tao/Version.h: + * tao/IIOP_Transport.cpp: + * tao/MCAST_Parser.cpp: + * tao/RTCORBA/Linear_Priority_Mapping.cpp: + * tao/RTCORBA/RT_ORBInitializer.cpp: + * tao/RTCORBA/RT_ORBInitializer.h: + * tao/RTCORBA/RT_ORB_Loader.cpp: + * tao/RTCORBA/RT_Protocols_Hooks.cpp: + + Merged in changes from TAO 1.3.1. The actual tag for the merge + was pmb_branch_mainline_mergeout_1. The command was + + $ cvs update -jpmb_branch_start -jpmb_branch_mainline_mergeout_1 + + after manually determining which files needed updates. + +Mon Mar 17 11:09:00 2003 Chris Cleeland <cleeland_c@ociweb.com> + + * tao/GIOP_Message_Base.cpp (check_for_valid_header): Optimized + implementation. + + * tao/GIOP_Message_Base.* (header_length): Moved to inline for + optimization. + + * tao/GIOP_Message_State.cpp (TAO_Debug_Msg_Emitter_Guard): + Improved implementation so it didn't make copies of the message + even when it was never going to output anything due to debug + level being lower than the value that would cause debug output. + + * tao/GIOP_Message_State.cpp (take_values_from_message_block): + * tao/GIOP_Message_State.cpp (set_version_info_from_buffer): + * tao/GIOP_Message_State.cpp (set_byte_order_info_from_buffer): + * tao/GIOP_Message_State.cpp (set_payload_size_from_buffer): + + Removed use of TAO_Debug_Msg_Emitter_Guard. + + * tao/GIOP_Message_State.inl (set_payload_size_from_buffer): + + Moved to inline file. + + * tao/Incoming_Message_Queue.h (TAO_Incoming_Message_Queue): + Changed name of member 'queued_data_' to 'last_added_' to be + more clear about what the pointer actually points to. + + Added documentation that describes the structure of the linked + list so that hopefully future maintainers don't have to go + through the same learning curve I did. + + * tao/Incoming_Message_Queue.*: Updates to reflect last_added_ + member name change. + + * tao/Incoming_Message_Queue.cpp (dequeue_head): Add check to not + allow dequeuing when size is zero, plus we now zero-out the + last_added_ pointer after we've dequeued the last item in the + list. + + * tao/Incoming_Message_Queue.cpp (clone_mb_nocopy_size): Fix + setting and copying of flags on the message and data blocks so + that the DONT_DELETE flag specifically doesn't get copied. This + cured a HUGE memory leak that was causing substantial + performance problems. + + * tao/Incoming_Message_Queue.cpp (make_uncompleted_message): + * tao/Incoming_Message_Queue.cpp (make_completed_message): + + Cache values used over and over again within these methods such + as header and message block lengths + + * tao/Transport.cpp (handle_input_i): + + Added flag noting that we enqueued a message. + + Changed how re-reading is performed; see entry for + try_to_complete for information. + + Used ACE_CDR::grow to grow the uncompleted_message message block + once the payload size is known. This insures that the growth + happens with proper alignment constraints. + + * tao/Transport.* (try_to_complete): Added this method which + tries to complete whatever partial message is held in + uncompleted_message_. The difference between doing this and + simply revisiting the top of handle_input_i is that this will + only try to read enough to complete the message rather than also + read another partial. + + * tao/Transport.cpp (process_queue_head): Added check to insure + that we don't try to do any processing when there's nothing in + the queue. Also, removed old code leftover from when the + incoming queue could have partial GIOP messages in it. Finally, + insured that the return value matched previous requirements. + + +Thu Mar 6 15:27:14 2003 Chris Cleeland <cleeland_c@ociweb.com> + + * tao/Incoming_Message_Queue.cpp: Removed Shattering Encapsulation + hack around ace/Message_Block.h inclusion to make this link + properly on VC6. + + * tao/GIOP_Message_State.cpp: Moved initialization of + TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH outside the class + declaration. + +Wed Mar 5 13:08:37 2003 Chris Cleeland <cleeland_c@ociweb.com> + + * tao/GIOP_Message_Generator_Parser_Impl.inl (check_revision): + Modified to only explicitly allow versions of GIOP that TAO + supports, and no others. Note that this change means that + whenever a new version of GIOP gets added to TAO, this function + must be updated, too. Hopefully that won't happen so often that + this becomes horribly burdensome. Thanks to Chad Elliott and + Paul Calabrese for ideas in getting this right. + + * tao/Incoming_Message_Queue.* (make_uncompleted_message): + + This now behaves similar to make_completed_message, i.e., it + allocates the message block held in TAO_Queued_Data and advnaced + the rd_ptr past the bytes it consumes. + + * tao/Incoming_Message_Queue.cpp: Eliminated hacks on + ACE_Message_Block and replaced with functions + clone_mb_nocopy_size() and copy_mb_span() that utilize ONLY the + public API. make_completed_message() and + make_uncompleted_message() now use these. + + * tao/Transport.cpp (handle_input_i): Enlarged the buffer size and + replaced the magic number for the maximum number of re-read + attempts with the manifest constant + TAO_MAX_TRANSPORT_REREAD_ATTEMPTS, now defined in orbconf.h. + + Changed uses of make_uncompleted_message() to be consistent with + changes outlined above. + + Corrected a bug where, upon re-reading after completely emptying + the stack-allocated message block, I forgot to reset the + rd_ptr/wr_ptr in the message block. + + * tao/orbconf.h: Added new manifest constant + TAO_MAX_TRANSPORT_REREAD_ATTEMPTS. Documentation is in the file. + +Wed Feb 26 21:44:34 2003 Chris Cleeland <cleeland_c@ociweb.com> + + * TAO/tao/GIOP_Message_Base.cpp: + + Eliminate parse_incoming_messages(), missing_data(), + extract_next_message(), consolidate_node(), + consolidate_fragments(), get_message_data(), make_queued_data(). + + Add new methods set_queued_data_from_message_header() and + check_for_valid_header(). + + * TAO/tao/GIOP_Message_Base.h: + + Eliminate #if0 parse_incoming_messages(), missing_data(), + extract_next_message(), consolidate_node(), + consolidate_fragments(), get_message_data(), and + make_queued_data() declarations. + + Add declarations for set_queued_data_from_message_header() and + check_for_valid_header(). + + Make message_type() static. + + * TAO/tao/GIOP_Message_Generator_Parser_Impl.inl: + + Reimplement check_revision() to be exact about which GIOP + revisions are okay. This is different from before where it + loosely assumed that everything was valid unless it want higher + than a particular range. + + * TAO/tao/GIOP_Message_Lite.cpp: + + Remove parse_incoming_messages() thru consolidate_fragments(); + remove make_queued_data(). + + Add set_queued_data_from_message_header() and + check_for_valid_header(). + + * TAO/tao/GIOP_Message_Lite.h: + + Remove declarations for parse_incoming_messages() thru + consolidate_fragments(); remove make_queued_data(). + + Add declarations for set_queued_data_from_message_header() and + check_for_valid_header(). + + * TAO/tao/GIOP_Message_State.cpp: + + Add TAO_Debug_Message_Emitter_Guard class within the compilation + unit. + + Change CTOR not to use the 2nd argument. + + Change CTOR not to initialize this->base_with the 2nd arg. + + Add take_values_from_message_block(). + + Remove parse_magic_bytes(), parse_message_header_i(), + parse_message_header(). + + Rename get_version_info() to set_version_info_from_buffer(); + change argument type to 'const char*'. + + Rename get_byte_order_info() to set_byte_order_info_from_buffer(); + change argument type to 'const char*'. + + Rename get_payload_size() to set_payload_size_from_buffer(); + change argument type to 'const char*'. + + Change argument type for read_ulong() to 'const char*'. + + * TAO/tao/GIOP_Message_State.h: + + Add default value for CTOR arguments so we don't have to pass + them. + + Add declaration for take_values_from_message_block(). + + #if0 parse_message_header() declaration. + + Add documentation for byte_order(). + + Add new GIOP information accessors: giop_version(), + more_fragments(), and message_type(). + + #if0 declaration for parse_message_header_i() and + parse_magic_bytes(). + + Rename delcarations for get_version_info(), get_byte_order_info(), + and get_payload_size(). + + #if0 base_ member variable. + + * TAO/tao/GIOP_Message_State.inl: + + Add definitions for giop_version(), more_fragments(), and + message_type(). + + * TAO/tao/Incoming_Message_Queue.cpp: + + Add #include for ace/Message_Block.h, but shatter encapsulation + and effectively extend its interface so that we can clone spans of + message blocks. + + Modify copy_tail() to use new TAO_Queued_Data member names. + + * TAO/tao/Incoming_Message_Queue.cpp (class TAO_Queued_Data): + + Update CTORs to initialize new this->current_state_ data member. + + Add new make_uncompleted_message(), ACE_Message_Block hack + "extensions", make_completed_message(). + + Rename get_queued_data() to make_queued_data(). + + * TAO/tao/Incoming_Message_Queue.h: + + Changed #include <Pluggable_Messaging_Utils.h> to #include + <Pluggable_Messaging.h> + + Documentation for is_tail_complete() and is_head_complete() and + is_tail_fragmented(). + + #if0 get_node(). + + * TAO/tao/Incoming_Message_Queue.h (class TAO_Queued_Data): + + Make CTORs protected so creation can only go through new factory + methods, i.e., make_completed_message() and + make_uncompleted_message(). + + Add new factory methods make_completed_message() and + make_uncompleted_message(). + + Add release() method [in addition to static of same name]. + + Add explicit definition of various states and a member to hold the + current state (this->current_state_). + + Rename this->missing_data_ flag/value with + this->missing_data_bytes_ count. + + Documentation for this->byte_order_. + + * TAO/tao/Incoming_Message_Queue.inl: + + Replace uses of this->missing_data_ with + this->missing_data_bytes_. + + #if0 get_node(). + + Add release() method. + + * TAO/tao/Pluggable_Messaging.h: + + #if0 parse_incoming_messages() through consolidate_fragments(). + + Add declarations for check_for_valid_header() and + set_queued_data_from_message_header(). + + * TAO/tao/Transport.cpp: + + Add #include <ace/Min_Max.h> + + Add uncompleted_message_ to initialization list. + + Modify what happens when reading in handle_input_i(): + + - number_of_read_attempts is how many times we'll try to read + again to complete a message before we give up. + + - read_from_the_connection label added + + - correct 1st argument to this->recv() to be wr_ptr() rather than + rd_ptr(). + + - lots of changes that are substantially documented in the code. + + #if0 parse_consolidate_messages(), parse_incoming_messages(), + missing_data(), consolidate_message(), consolidate_fragments(), + consolidate_message_queue(), consolidate_extra_messages(), and + make_queued_data(). + + * TAO/tao/Transport.h: + + #if0 declarations for parse_consolidate_messages(), parse_incoming_messages(), + missing_data(), consolidate_message(), consolidate_fragments(), + consolidate_message_queue(), consolidate_extra_messages(), and + make_queued_data(). + + Add this->uncompleted_message_ data member. + + * TAO/tao/Strategies/DIOP_Transport.cpp: + + Change 1st arg of this->recv() in handle_input_i() to wr_ptr() + from rd_ptr(). + + * TAO/tao/Strategies/SHMIOP_Transport.cpp: + + #if0 consolidate_message(). + + * TAO/tao/Strategies/SHMIOP_Transport.h: + + #if0 declaration for consolidate_message(). + + * TAO/tests/AMI/run_test.pl: + + Increase default number of iterations from 1 to 10000. Earlier + versions of the PMB changes succeeded just fine with low numbers + of iterations, but began to fail miserably when the number of + iterations climbed. + + * TAO/tests/BiDirectional/run_test.pl: + + Added level 10 debug. + + * TAO/tests/InterOp-Naming/INS_test_client.cpp: + + Changes that don't appear to be related in any way to PMB... diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 76aeeaee51e..7ada6c903ee 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -24,7 +24,7 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, size_t /*input_cdr_size*/) : orb_core_ (orb_core) , message_state_ (orb_core, - this) + this) , out_stream_ (this->buffer_, sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */ TAO_ENCAP_BYTE_ORDER, @@ -295,6 +295,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) TAO_Pluggable_Message_Type TAO_GIOP_Message_Base::message_type ( TAO_GIOP_Message_State &msg_state) + const { // Convert to the right type of Pluggable Messaging message type. @@ -329,264 +330,6 @@ TAO_GIOP_Message_Base::message_type ( } int -TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) -{ - - if (this->message_state_.parse_message_header (incoming) == -1) - { - return -1; - } - - return 0; -} - -ssize_t -TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) -{ - // Actual message size including the header.. - CORBA::ULong msg_size = - this->message_state_.message_size (); - - size_t len = incoming.length (); - - // If we have too many messages or if we have less than even a size - // of the GIOP header then .. - if (len > msg_size || - len < TAO_GIOP_MESSAGE_HEADER_LEN) - { - return -1; - } - else if (len == msg_size) - return 0; - - return msg_size - len; -} - - -int -TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd) -{ - TAO_GIOP_Message_State state (this->orb_core_, - this); - - if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) - { - if (incoming.length () > 0) - { - // Make a node which has a message block of the size of - // MESSAGE_HEADER_LEN. - qd = - this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); - - qd->msg_block_->copy (incoming.rd_ptr (), - incoming.length ()); - qd->missing_data_ = -1; - } - return 0; - } - - if (state.parse_message_header (incoming) == -1) - { - return -1; - } - - size_t copying_len = state.message_size (); - - qd = this->make_queued_data (copying_len); - - if (copying_len > incoming.length ()) - { - qd->missing_data_ = - copying_len - incoming.length (); - - copying_len = incoming.length (); - } - - qd->msg_block_->copy (incoming.rd_ptr (), - copying_len); - - incoming.rd_ptr (copying_len); - qd->byte_order_ = state.byte_order_; - qd->major_version_ = state.giop_version_.major; - qd->minor_version_ = state.giop_version_.minor; - qd->msg_type_ = this->message_type (state); - return 1; -} - -int -TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming) -{ - // Look to see whether we had atleast parsed the GIOP header ... - if (qd->missing_data_ == -1) - { - // The data length that has been stuck in there during the last - // read .... - size_t len = - qd->msg_block_->length (); - - // We know that we would have space for - // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data - // from the <incoming> into the message block in <qd> - qd->msg_block_->copy (incoming.rd_ptr (), - TAO_GIOP_MESSAGE_HEADER_LEN - len); - - // Move the rd_ptr () in the incoming message block.. - incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len); - - TAO_GIOP_Message_State state (this->orb_core_, - this); - - // Parse the message header now... - if (state.parse_message_header (*qd->msg_block_) == -1) - return -1; - - // Now grow the message block so that we can copy the rest of - // the data... - if (qd->msg_block_->space () < state.message_size ()) - { - ACE_CDR::grow (qd->msg_block_, - state.message_size ()); - } - - // Copy the pay load.. - // Calculate the bytes that needs to be copied in the queue... - size_t copy_len = - state.payload_size (); - - // If the data that needs to be copied is more than that is - // available to us .. - if (copy_len > incoming.length ()) - { - // Calculate the missing data.. - qd->missing_data_ = - copy_len - incoming.length (); - - // Set the actual possible copy_len that is available... - copy_len = incoming.length (); - } - else - { - qd->missing_data_ = 0; - } - - // ..now we are set to copy the right amount of data to the - // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); - - // Set the <rd_ptr> of the <incoming>.. - incoming.rd_ptr (copy_len); - - // Get the other details... - qd->byte_order_ = state.byte_order_; - qd->major_version_ = state.giop_version_.major; - qd->minor_version_ = state.giop_version_.minor; - qd->msg_type_ = this->message_type (state); - } - else - { - // @@todo: Need to abstract this out to a seperate method... - size_t copy_len = qd->missing_data_; - - if (copy_len > incoming.length ()) - { - // Calculate the missing data.. - qd->missing_data_ = - copy_len - incoming.length (); - - // Set the actual possible copy_len that is available... - copy_len = incoming.length (); - } - - // Copy the right amount of data in to the node... - // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); - - // Set the <rd_ptr> of the <incoming>.. - qd->msg_block_->rd_ptr (copy_len); - - } - - - return 0; -} - - -int -TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd) -{ - if (dqd->byte_order_ != sqd->byte_order_ - || dqd->major_version_ != sqd->major_version_ - || dqd->minor_version_ != sqd->minor_version_) - { - // Yes, print it out in all debug levels!. This is an error by - // CORBA 2.4 spec - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) incompatible fragments:") - ACE_TEXT ("different GIOP versions or byte order\n"))); - return -1; - } - - // Skip the header in the incoming message - sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - - // If we have a fragment header skip the header length too.. - if (sqd->minor_version_ == 2 && - sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER); - - // Get the length of the incoming message block.. - size_t incoming_length = - sqd->msg_block_->length (); - - // Increase the size of the destination message block if we need - // to. - ACE_Message_Block *mb = - dqd->msg_block_; - - // Check space before growing. - if (mb->space () < incoming_length) - { - ACE_CDR::grow (mb, - mb->length () + incoming_length); - } - - // Copy the data - dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (), - incoming_length); - return 0; -} - -void -TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) -{ - // Get the message information - qd->byte_order_ = - this->message_state_.byte_order_; - qd->major_version_ = - this->message_state_.giop_version_.major; - qd->minor_version_ = - this->message_state_.giop_version_.minor; - - //qd->more_fragments_ = this->message_state_.more_fragments_; - - if (this->message_state_.more_fragments_) - qd->more_fragments_ = 1; - else - qd->more_fragments_ = 0; - - qd->msg_type_= - this->message_type (this->message_state_); - - // Reset the message_state - this->message_state_.reset (); -} - -int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) @@ -764,13 +507,13 @@ TAO_GIOP_Message_Base::process_reply_message ( // Should be taken care by the state specific parsing retval = generator_parser->parse_reply (input_cdr, - params); + params); break; case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY: retval = generator_parser->parse_locate_reply (input_cdr, - params); + params); break; default: retval = -1; @@ -898,7 +641,9 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, parse_error = parser->parse_request_header (request); - request.orb_core()->codeset_manager()->process_service_context(request); + TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); + if (csm) + csm->process_service_context(request); transport->assign_translators(&cdr,&output); // Throw an exception if the @@ -1098,9 +843,9 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, } TAO::ObjectKey tmp_key (locate_request.object_key ().length (), - locate_request.object_key ().length (), - locate_request.object_key ().get_buffer (), - 0); + locate_request.object_key ().length (), + locate_request.object_key ().get_buffer (), + 0); // Set it to an error state parse_error = 1; @@ -1324,6 +1069,7 @@ TAO_GIOP_Message_Base::set_state ( } +#if 0 // Server sends an "I'm shutting down now, any requests you've sent me // can be retried" message to the server. The message is prefab, for // simplicity. @@ -1419,7 +1165,7 @@ TAO_GIOP_Message_Base:: transport-> id ())); } - +#endif int TAO_GIOP_Message_Base::send_reply_exception ( @@ -1576,44 +1322,49 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) } -TAO_Queued_Data * -TAO_GIOP_Message_Base::make_queued_data (size_t sz) +void +TAO_GIOP_Message_Base::set_queued_data_from_message_header ( + TAO_Queued_Data *qd, + const ACE_Message_Block &mb + ) const { - // Get a node for the queue.. - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data ( - this->orb_core_->transport_message_buffer_allocator ()); - - // @@todo: We have a similar method in Transport.cpp. Need to see how - // we can factor them out.. - // Make a datablock for the size requested + something. The - // "something" is required because we are going to align the data - // block in the message block. During alignment we could loose some - // bytes. As we may not know how many bytes will be lost, we will - // allocate ACE_CDR::MAX_ALIGNMENT extra. - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (sz + - ACE_CDR::MAX_ALIGNMENT); - - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - ACE_Message_Block mb (db, - 0, - alloc); - - ACE_Message_Block *new_mb = mb.duplicate (); - - ACE_CDR::mb_align (new_mb); - - qd->msg_block_ = new_mb; - + // @@CJC: Try leaving out the declaration for this->message_state_ + // and see what pukes. I don't think we need it any more. + TAO_GIOP_Message_State state; + if (state.take_values_from_message_block (mb) == -1) + { + // what the heck do we do here?! + qd->current_state_ = TAO_Queued_Data::INVALID; + return; + } - return qd; + // It'd be nice to have an abstract base for GIOP_Message_State + // so that there could just be a line like: + // qd->take_values_from (state); + // Get the message information + qd->byte_order_ = state.byte_order (); + qd->major_version_ = state.giop_version ().major; + qd->minor_version_ = state.giop_version ().minor; + qd->more_fragments_ = state.more_fragments () ? 1 : 0; + qd->request_id_ = state.request_id_; + qd->msg_type_= message_type (state); + qd->missing_data_bytes_ = state.payload_size (); } -size_t -TAO_GIOP_Message_Base::header_length (void) const +int +TAO_GIOP_Message_Base::check_for_valid_header ( + const ACE_Message_Block &mb + ) const { - return TAO_GIOP_MESSAGE_HEADER_LEN; + // NOTE! We don't hardcode the length of the header b/c header_length should + // be eligible for inlining by pretty much any compiler, and it should return + // a constant. The rest of this method is hard-coded and hand-optimized because + // this method gets called A LOT. + if (mb.length () < this->header_length ()) + return -1; + + // Is finding that it's the right length and the magic bytes present + // enough to declare it a valid header? I think so... + register const char* h = mb.rd_ptr (); + return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P') ? 1 : 0; } diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index 461673bd359..7bf61e340f7 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -94,35 +94,37 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); - /// Parse the incoming messages.. - virtual int parse_incoming_messages (ACE_Message_Block &message_block); + /// Process the request message that we have received on the + /// connection + virtual int process_request_message (TAO_Transport *transport, + TAO_Queued_Data *qd); - /// Calculate the amount of data that is missing in the <incoming> - /// message block. - virtual ssize_t missing_data (ACE_Message_Block &message_block); + /*! + \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. - /* Extract the details of the next message from the <incoming> - * through <qd>. Returns 1 if there are more messages and returns a - * 0 if there are no more messages in <incoming>. - */ - virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd); + Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to + see if they look like the beginning of a message. If \code mb does not + contain less than \code header_length() bytes, this method cannot make a + complete evaluation, and returns a commensurate value. - /// Check whether the node <qd> needs consolidation from <incoming> - virtual int consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming); + \return 1 \code header_length() bytes found, and constitute a valid header + \return 0 \code header_length() bytes found, and do not constitute a valid header + \return -1 not enough bytes available to make a determination of header validity + */ + virtual int check_for_valid_header (const ACE_Message_Block &mb) const; - /// Get the details of the message parsed through the <qd>. - virtual void get_message_data (TAO_Queued_Data *qd); + /*! + \brief Set fields in \param qd based on values derived from \param mb. - /// @@Bala:Docu?? - virtual int consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd); + This function sets fields in \param qd based on values derived + from \param mb. It assumes that if the length of \param mb is + enough to hold a header, then the data in there can be trusted to + make sense. + */ + virtual void set_queued_data_from_message_header ( + TAO_Queued_Data *, + const ACE_Message_Block &mb) const; - /// Process the request message that we have received on the - /// connection - virtual int process_request_message (TAO_Transport *transport, - TAO_Queued_Data *qd); /// Parse the reply message that we received and return the reply @@ -174,7 +176,7 @@ protected: /// TAO_PLUGGABLE_MESSAGE_REPLY, /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. - TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state); + TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state) const; private: @@ -197,10 +199,12 @@ private: /// Send error messages int send_error (TAO_Transport *transport); +#if 0 /// Close a connection, first sending GIOP::CloseConnection. void send_close_connection (const TAO_GIOP_Message_Version &version, TAO_Transport *transport, void *ctx); +#endif /// We must send a LocateReply through <transport>, this request /// resulted in some kind of exception. diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i index a589447a413..f5e39d9aa54 100644 --- a/TAO/tao/GIOP_Message_Base.i +++ b/TAO/tao/GIOP_Message_Base.i @@ -4,3 +4,11 @@ // // GIOP_Message_Base // + + +ACE_INLINE size_t +TAO_GIOP_Message_Base::header_length (void) const +{ + return TAO_GIOP_MESSAGE_HEADER_LEN; +} + diff --git a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl index 18bb7936ffd..47e4730befb 100644 --- a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl +++ b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl @@ -5,9 +5,24 @@ TAO_GIOP_Message_Generator_Parser_Impl:: check_revision (CORBA::Octet incoming_major, CORBA::Octet incoming_minor) { - if (incoming_major > TAO_DEF_GIOP_MAJOR || - incoming_minor > TAO_DEF_GIOP_MINOR) + CORBA::UShort version_as_whole_num = incoming_major << 8 | incoming_minor; + CORBA::UShort max_allowable_version = TAO_DEF_GIOP_MAJOR << 8 | TAO_DEF_GIOP_MINOR; + + CORBA::Boolean ret = 0; + + // If it's greater than the max, we know it's not allowed. + if (version_as_whole_num > max_allowable_version) return 0; - - return 1; + + // If it's less than the max, though, we still have to check for + // each explicit version and only allow the ones we know work. + switch (version_as_whole_num) + { + case 0x0100: + case 0x0101: + case 0x0102: + ret = 1; + } + + return ret; } diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp index 462266dd9bf..3ff0683729a 100644 --- a/TAO/tao/GIOP_Message_Lite.cpp +++ b/TAO/tao/GIOP_Message_Lite.cpp @@ -243,6 +243,7 @@ TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream) } +#if 0 int TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block) { @@ -501,6 +502,7 @@ TAO_GIOP_Message_Lite::consolidate_fragments (TAO_Queued_Data * /*dqd*/, // We dont know what fragments are??? return -1; } +#endif int TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport, @@ -728,7 +730,9 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport, parse_error = this->parse_request_header (request); - request.orb_core()->codeset_manager()->process_service_context(request); + TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); + if (csm) + csm->process_service_context(request); transport->assign_translators(&cdr,&output); // Throw an exception if the @@ -1626,38 +1630,6 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label, } } -TAO_Queued_Data * -TAO_GIOP_Message_Lite::make_queued_data (size_t sz) -{ - // Get a node for the queue.. - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data (); - - // Make a datablock for the size requested + something. The - // "something" is required because we are going to align the data - // block in the message block. During alignment we could loose some - // bytes. As we may not know how many bytes will be lost, we will - // allocate ACE_CDR::MAX_ALIGNMENT extra. - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (sz + - ACE_CDR::MAX_ALIGNMENT); - - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - ACE_Message_Block mb (db, - 0, - alloc); - - ACE_Message_Block *new_mb = mb.duplicate (); - - ACE_CDR::mb_align (new_mb); - - qd->msg_block_ = new_mb; - - return qd; -} - int TAO_GIOP_Message_Lite::generate_locate_reply_header ( TAO_OutputCDR & /*cdr*/, @@ -1679,3 +1651,22 @@ TAO_GIOP_Message_Lite::header_length (void) const { return TAO_GIOP_LITE_HEADER_LEN; } + +void +TAO_GIOP_Message_Lite::set_queued_data_from_message_header ( + TAO_Queued_Data *qd, + const ACE_Message_Block &mb + ) const +{ + ACE_UNUSED_ARG (qd); + ACE_UNUSED_ARG (mb); +} + +int +TAO_GIOP_Message_Lite::check_for_valid_header ( + const ACE_Message_Block &mb + ) const +{ + ACE_UNUSED_ARG (mb); + return 0; +} diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h index b10ef17982c..bfc7f5f01ad 100644 --- a/TAO/tao/GIOP_Message_Lite.h +++ b/TAO/tao/GIOP_Message_Lite.h @@ -88,8 +88,11 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); - /// Parse the incoming messages.. - virtual int parse_incoming_messages (ACE_Message_Block &message_block); + /// Process the request message that we have received on the + /// connection + virtual int process_request_message (TAO_Transport *transport, + TAO_Queued_Data *qd); + /// Get the message type. The return value would be one of the /// following: @@ -99,33 +102,25 @@ public: /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. TAO_Pluggable_Message_Type message_type (void); + /*! + \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. - /// Calculate the amount of data that is missing in the <incoming> - /// message block. - virtual ssize_t missing_data (ACE_Message_Block &message_block); - - /* Extract the details of the next message from the <incoming> - * through <qd>. Returns 1 if there are more messages and returns a - * 0 if there are no more messages in <incoming>. + Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to + see if they look like the beginning of a message. Does */ - virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd); - - /// Check whether the node <qd> needs consolidation from <incoming> - virtual int consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming); + virtual int check_for_valid_header (const ACE_Message_Block &mb) const; - /// Get the details of the message parsed through the <qd>. - virtual void get_message_data (TAO_Queued_Data *qd); + /*! + \brief Set fields in \param qd based on values derived from \param mb. - /// @@Bala: Docu??? - virtual int consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd); - - /// Process the request message that we have received on the - /// connection - virtual int process_request_message (TAO_Transport *transport, - TAO_Queued_Data *qd); + This function sets fields in \param qd based on values derived + from \param mb. It assumes that if the length of \param mb is + enough to hold a header, then the data in there can be trusted to + make sense. + */ + virtual void set_queued_data_from_message_header ( + TAO_Queued_Data *, + const ACE_Message_Block &mb) const; /// Parse the reply message that we received and return the reply /// information though <reply_info> diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp index e3a3ca20bf3..1d923ecb45f 100644 --- a/TAO/tao/GIOP_Message_State.cpp +++ b/TAO/tao/GIOP_Message_State.cpp @@ -10,15 +10,54 @@ # include "tao/GIOP_Message_State.inl" #endif /* __ACE_INLINE__ */ -ACE_RCSID (tao, - GIOP_Message_State, - "$Id$") + +class TAO_Debug_Msg_Emitter_Guard +{ +public: + TAO_Debug_Msg_Emitter_Guard (unsigned int debug_level, const char* msg) + : which_level_(debug_level) + { + if (TAO_debug_level < this->which_level_) + { + msg_ = 0; + return; + } + this->msg_ = new char[ACE_OS::strlen (msg) + MAGIC_LENGTH ]; + ACE_OS::strcpy (this->msg_, msg); + ACE_OS::strcat (this->msg_, " begin\n"); + if (TAO_debug_level >= this->which_level_) + ACE_DEBUG ((LM_DEBUG, this->msg_ )); + } + + ~TAO_Debug_Msg_Emitter_Guard () + { + if (this->msg_) + { + if (TAO_debug_level >= this->which_level_) + { + char* begin_start = + this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1; + ACE_OS::strcpy (begin_start, " end\n"); + ACE_DEBUG ((LM_DEBUG, this->msg_)); + } + delete[] this->msg_; + } + } + +private: + static const int MAGIC_LENGTH; + unsigned int which_level_; + char* msg_; +}; + +const int TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH = 8; // " begin\n" + \000 + +ACE_RCSID(tao, GIOP_Message_State, "$Id$") TAO_GIOP_Message_State::TAO_GIOP_Message_State ( TAO_ORB_Core * /*orb_core*/, - TAO_GIOP_Message_Base *base) - : base_ (base), - giop_version_ (TAO_DEF_GIOP_MAJOR, + TAO_GIOP_Message_Base * /*base*/) + : giop_version_ (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR), byte_order_ (0), message_type_ (0), @@ -29,125 +68,64 @@ TAO_GIOP_Message_State::TAO_GIOP_Message_State ( { } - +// This doesn't check the message block's length, so that means that +// the *caller* needs to do that first. int -TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming) +TAO_GIOP_Message_State::take_values_from_message_block ( + const ACE_Message_Block& mb + ) { - if (incoming.length () >= TAO_GIOP_MESSAGE_HEADER_LEN) - { - // Parse the GIOP header - if (this->parse_message_header_i (incoming) == -1) - return -1; - } - - return 0; -} - -int -TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming) -{ - if (TAO_debug_level > 8) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::parse_message_header_i\n" - )); - } - - // Grab the rd_ptr_ from the message block.. - char *buf = incoming.rd_ptr (); - - // Parse the magic bytes first - if (this->parse_magic_bytes (buf) == -1) - { - return -1; - } + const char* buf = mb.rd_ptr (); // Get the version information - if (this->get_version_info (buf) == -1) + if (this->set_version_info_from_buffer (buf) == -1) return -1; // Get the byte order information... - if (this->get_byte_order_info (buf) == -1) + if (this->set_byte_order_info_from_buffer (buf) == -1) return -1; // Get the message type this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; - // Get the size of the message.. - this->get_payload_size (buf); + this->set_payload_size_from_buffer (buf); + + // Get the request id + this->parse_fragment_header (buf, mb.length ()); if (this->message_size_ == 0) { - if (this->message_type_ == TAO_GIOP_MESSAGERROR) + const char* msgname = 0; + switch (this->message_type_) + { + case TAO_GIOP_MESSAGERROR: + msgname = "GIOP_MESSAGE_ERROR"; break; + case TAO_GIOP_CLOSECONNECTION: + msgname = "GIOP_CLOSE_CONNECTION"; break; + } + if (msgname != 0) { if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) -" - "GIOP_MESSAGE_ERROR received \n")); - } - return 0; + ACE_DEBUG ((LM_DEBUG, "(%P|%t) GIOP_Message_State::take_values: %s rcv'd.\n", + msgname)); } else { if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - " - "Message of size zero recd. \n")); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) GIOP_Message_State::take_values: Message of size zero rcv'd.\n")); return -1; } } - - if (this->more_fragments_) - { - (void) this->parse_fragment_header (buf, - incoming.length ()); - } - return 0; } - - - -int -TAO_GIOP_Message_State::parse_magic_bytes (char *buf) -{ - // The values are hard-coded to support non-ASCII platforms. - if (!(buf [0] == 0x47 // 'G' - && buf [1] == 0x49 // 'I' - && buf [2] == 0x4f // 'O' - && buf [3] == 0x50)) // 'P' - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("TAO (%P|%t) - bad header, ") - ACE_LIB_TEXT ("magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"), - buf[0], - buf[1], - buf[2], - buf[3])); - return -1; - } - - return 0; -} - int -TAO_GIOP_Message_State::get_version_info (char *buf) +TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf) { - if (TAO_debug_level > 8) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::get_version_info\n")); - } - // We have a GIOP message on hand. Get its revision numbers - CORBA::Octet incoming_major = - buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; - CORBA::Octet incoming_minor = - buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET]; // Check the revision information if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision ( @@ -157,7 +135,9 @@ TAO_GIOP_Message_State::get_version_info (char *buf) if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - bad version <%d.%d>\n"), + ACE_TEXT ("TAO (%P|%t) - ") + ACE_TEXT ("GIOP_Message_State::set_version_info_from_buffer:") + ACE_TEXT ("bad version <%d.%d>\n"), incoming_major, incoming_minor)); } @@ -172,15 +152,9 @@ TAO_GIOP_Message_State::get_version_info (char *buf) } int -TAO_GIOP_Message_State::get_byte_order_info (char *buf) +TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf) { - if (TAO_debug_level > 8) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::get_byte_order_info\n")); - } - - // Let us be specific that this is for 1.0 + // Let us be specific that this is for 1.0 if (this->giop_version_.minor == 0 && this->giop_version_.major == 1) { @@ -224,19 +198,10 @@ TAO_GIOP_Message_State::get_byte_order_info (char *buf) return 0; } -void -TAO_GIOP_Message_State::get_payload_size (char *rd_ptr) -{ - // Move the read pointer - rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; - - this->message_size_ = this->read_ulong (rd_ptr); -} - int -TAO_GIOP_Message_State::parse_fragment_header (char *buf, +TAO_GIOP_Message_State::parse_fragment_header (const char *buf, size_t length) { size_t len = @@ -246,9 +211,7 @@ TAO_GIOP_Message_State::parse_fragment_header (char *buf, // By this point we are doubly sure that we have a more or less // valid GIOP message with a valid major revision number. - if (this->giop_version_.minor == 2 && - this->message_type_ == TAO_GIOP_FRAGMENT && - length > len) + if (this->giop_version_.minor >= 2 && length > len) { // Fragmented message in GIOP 1.2 should have a fragment header // following the GIOP header. Grab the rd_ptr to get that @@ -263,7 +226,7 @@ TAO_GIOP_Message_State::parse_fragment_header (char *buf, } CORBA::ULong -TAO_GIOP_Message_State::read_ulong (char *rd_ptr) +TAO_GIOP_Message_State::read_ulong (const char *rd_ptr) { CORBA::ULong x = 0; diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h index f902fa03a0e..9ae3db82230 100644 --- a/TAO/tao/GIOP_Message_State.h +++ b/TAO/tao/GIOP_Message_State.h @@ -41,11 +41,10 @@ class TAO_Export TAO_GIOP_Message_State public: /// Ctor - TAO_GIOP_Message_State (TAO_ORB_Core *orb_core, - TAO_GIOP_Message_Base *base); + TAO_GIOP_Message_State (TAO_ORB_Core *orb_core = 0, + TAO_GIOP_Message_Base *base = 0); - /// Parse the message header. - int parse_message_header (ACE_Message_Block &incoming); + int take_values_from_message_block (const ACE_Message_Block& mb); /// Return the message size CORBA::ULong message_size (void) const; @@ -53,9 +52,24 @@ public: /// Return the message size CORBA::ULong payload_size (void) const; - /// Return the byte order information + /*! + \brief Return the byte order information. + \return 0 big-endian + \return 1 little-endian + */ CORBA::Octet byte_order (void) const; + /*! + \brief Return GIOP version information. + */ + const TAO_GIOP_Message_Version &giop_version () const; + + /// (Requests and Replys) + CORBA::Octet more_fragments () const; + + /// MsgType above + CORBA::Octet message_type () const; + /// Reset the state.. void reset (void); @@ -63,40 +77,30 @@ private: friend class TAO_GIOP_Message_Base; - /// Parse the message header. - int parse_message_header_i (ACE_Message_Block &incoming); - - /// Checks for the magic word 'GIOP' in the start of the incoing - /// stream - int parse_magic_bytes (char *buf); - /// Extracts the version information from the incoming /// stream. Performs a check for whether the version information is /// right and sets the information in the <state> - int get_version_info (char *buf); + int set_version_info_from_buffer (const char *buf); /// Extracts the byte order information from the incoming /// stream. Performs a check for whether the byte order information /// right and sets the information in the <state> - int get_byte_order_info (char *buf); + int set_byte_order_info_from_buffer (const char *buf); /// Gets the size of the payload and set the size in the <state> - void get_payload_size (char *buf); + void set_payload_size_from_buffer (const char *buf); /// Parses the GIOP FRAGMENT_HEADER information from the incoming /// stream. - int parse_fragment_header (char *buf, + int parse_fragment_header (const char *buf, size_t length); /// Read the unsigned long from the buffer. The <buf> should just /// point to the next 4 bytes data that represent the ULong - CORBA::ULong read_ulong (char *buf); + CORBA::ULong read_ulong (const char *buf); private: - /// The GIOP base class.. - TAO_GIOP_Message_Base *base_; - // GIOP version information.. TAO_GIOP_Message_Version giop_version_; diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl index fe076bee689..80d421c7340 100644 --- a/TAO/tao/GIOP_Message_State.inl +++ b/TAO/tao/GIOP_Message_State.inl @@ -33,22 +33,29 @@ TAO_GIOP_Message_State::reset (void) this->missing_data_ = 0; } -#if 0 -ACE_INLINE int -TAO_GIOP_Message_State::message_fragmented (void) +ACE_INLINE const TAO_GIOP_Message_Version & +TAO_GIOP_Message_State::giop_version () const { - if (this->more_fragments) - return 1; - - return 0; + return this->giop_version_; } +ACE_INLINE CORBA::Octet +TAO_GIOP_Message_State::more_fragments () const +{ + return this->more_fragments_; +} - -ACE_INLINE CORBA::Boolean -TAO_GIOP_Message_State::header_received (void) const +ACE_INLINE CORBA::Octet +TAO_GIOP_Message_State::message_type () const { - return this->message_size != 0; + return this->message_type_; } -#endif +ACE_INLINE void +TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr) +{ + // Move the read pointer + rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; + + this->message_size_ = this->read_ulong (rd_ptr); +} diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 1d802aa25c7..25b16308ff1 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -95,7 +95,7 @@ TAO_IIOP_Transport::recv (char *buf, { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv_i, ") + ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv, ") ACE_TEXT ("read failure - %m\n"), this->id ())); } diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index 14970106b50..a510be13f4b 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -1,7 +1,8 @@ #include "Incoming_Message_Queue.h" +#include "Pluggable_Messaging.h" #include "debug.h" - -#include "ace/Log_Msg.h" +#include "ace/Malloc_T.h" +#include "ace/Message_Block.h" #if !defined (__ACE_INLINE__) # include "Incoming_Message_Queue.inl" @@ -12,7 +13,7 @@ ACE_RCSID (tao, "$Id$") TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core) - : queued_data_ (0), + : last_added_ (0), size_ (0), orb_core_ (orb_core) { @@ -42,21 +43,21 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block) { // Check to see if the length of the incoming block is less than // that of the <missing_data_> of the tail. - if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_) + if (block.length () <= this->last_added_->missing_data_bytes_) { n = block.length (); } else { - n = this->queued_data_->missing_data_; + n = this->last_added_->missing_data_bytes_; } // Do the copy - this->queued_data_->msg_block_->copy (block.rd_ptr (), + this->last_added_->msg_block_->copy (block.rd_ptr (), n); // Decerement the missing data - this->queued_data_->missing_data_ -= n; + this->last_added_->missing_data_bytes_ -= n; } return n; @@ -65,17 +66,20 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block) TAO_Queued_Data * TAO_Incoming_Message_Queue::dequeue_head (void) { + if (this->size_ == 0) + return 0; + // Get the node on the head of the queue... - TAO_Queued_Data *tmp = - this->queued_data_->next_; + TAO_Queued_Data *head = this->last_added_->next_; // Reset the head node.. - this->queued_data_->next_ = tmp->next_; - - // Decrease the size - --this->size_; + this->last_added_->next_ = head->next_; + + // Decrease the size and reset last_added_ if empty + if (--this->size_ == 0) + this->last_added_ = 0; - return tmp; + return head; } TAO_Queued_Data * @@ -86,95 +90,412 @@ TAO_Incoming_Message_Queue::dequeue_tail (void) return 0; // Get the node on the head of the queue... - TAO_Queued_Data *tmp = - this->queued_data_->next_; + TAO_Queued_Data *head = + this->last_added_->next_; - while (tmp->next_ != this->queued_data_) + while (head->next_ != this->last_added_) { - tmp = tmp->next_; + head = head->next_; } // Put the head in tmp. - tmp->next_ = this->queued_data_->next_; + head->next_ = this->last_added_->next_; - TAO_Queued_Data *ret_qd = this->queued_data_; + TAO_Queued_Data *ret_qd = this->last_added_; - this->queued_data_ = tmp; + this->last_added_ = head; // Decrease the size - --this->size_; + if (--this->size_ == 0) + this->last_added_ = 0; return ret_qd; } - int TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd) { if (this->size_ == 0) { - this->queued_data_ = nd; - this->queued_data_->next_ = this->queued_data_; + this->last_added_ = nd; + this->last_added_->next_ = this->last_added_; } else { - nd->next_ = this->queued_data_->next_; - this->queued_data_->next_ = nd; - this->queued_data_ = nd; + nd->next_ = this->last_added_->next_; + this->last_added_->next_ = nd; + this->last_added_ = nd; } ++ this->size_; return 0; } +TAO_Queued_Data * +TAO_Incoming_Message_Queue::find_fragment (CORBA::Octet major, + CORBA::Octet minor) const +{ + TAO_Queued_Data *found = 0; + if (this->last_added_ != 0) + { + TAO_Queued_Data *qd = this->last_added_->next_; + + do { + if (qd->more_fragments_ && + qd->major_version_ == major && qd->minor_version_ == minor) + { + found = qd; + } + else + { + qd = qd->next_; + } + } while (found == 0 && qd != this->last_added_->next_); + } + + return found; +} + +TAO_Queued_Data * +TAO_Incoming_Message_Queue::find_fragment (CORBA::ULong request_id) const +{ + TAO_Queued_Data *found = 0; + if (this->last_added_ != 0) + { + TAO_Queued_Data *qd = this->last_added_->next_; + + do { + if (qd->more_fragments_ && qd->request_id_ == request_id) + { + found = qd; + } + else + { + qd = qd->next_; + } + } while (found == 0 && qd != this->last_added_->next_); + } + + return found; +} + /************************************************************************/ // Methods for TAO_Queued_Data /************************************************************************/ TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc) - : msg_block_ (0), - missing_data_ (0), - byte_order_ (0), - major_version_ (0), - minor_version_ (0), - more_fragments_ (0), - msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), - next_ (0), - allocator_ (alloc) + : msg_block_ (0) + , current_state_ (INVALID) + , missing_data_bytes_ (0) + , byte_order_ (0) + , major_version_ (0) + , minor_version_ (0) + , more_fragments_ (0) + , request_id_ (0) + , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + , next_ (0) + , allocator_ (alloc) { } TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc) - : msg_block_ (mb), - missing_data_ (0), - byte_order_ (0), - major_version_ (0), - minor_version_ (0), - more_fragments_ (0), - msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), - next_ (0), - allocator_ (alloc) + : msg_block_ (mb) + , current_state_ (INVALID) + , missing_data_bytes_ (0) + , byte_order_ (0) + , major_version_ (0) + , minor_version_ (0) + , more_fragments_ (0) + , request_id_ (0) + , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + , next_ (0) + , allocator_ (alloc) { } TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd) - : msg_block_ (qd.msg_block_->duplicate ()), - missing_data_ (qd.missing_data_), - byte_order_ (qd.byte_order_), - major_version_ (qd.major_version_), - minor_version_ (qd.minor_version_), - more_fragments_ (qd.more_fragments_), - msg_type_ (qd.msg_type_), - next_ (0), - allocator_ (qd.allocator_) + : msg_block_ (qd.msg_block_->duplicate ()) + , current_state_ (qd.current_state_) + , missing_data_bytes_ (qd.missing_data_bytes_) + , byte_order_ (qd.byte_order_) + , major_version_ (qd.major_version_) + , minor_version_ (qd.minor_version_) + , more_fragments_ (qd.more_fragments_) + , request_id_ (qd.request_id_) + , msg_type_ (qd.msg_type_) + , next_ (0) + , allocator_ (qd.allocator_) { } + +/*! + \brief Allocate and return a new empty message block of size \a new_size mimicking parameters of \a mb. + + This function allocates a new aligned message block using the same + allocators and flags as found in \a mb. The size of the new message + block is at least \a new_size; the size may be adjusted up in order + to accomodate alignment requirements and still fit \a new_size bytes + into the aligned buffer. + + \param mb message block whose parameters should be mimicked + \param new_size size of the new message block (will be adjusted for proper alignment) + \return an aligned message block with rd_ptr sitting at correct alignment spot, 0 on failure + + \author Thanks to Rich Seibel for helping implement with the public API for ACE_Message_Block! + */ +static ACE_Message_Block* +clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size) +{ + // Calculate the required size of the cloned block with alignment + size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT); + + // Get the allocators + ACE_Allocator *data_allocator; + ACE_Allocator *data_block_allocator; + ACE_Allocator *message_block_allocator; + mb->access_allocators (data_allocator, + data_block_allocator, + message_block_allocator); + + // Create a new Message Block + ACE_Message_Block *nb; + ACE_NEW_MALLOC_RETURN (nb, + ACE_static_cast(ACE_Message_Block*, + message_block_allocator->malloc ( + sizeof (ACE_Message_Block))), + ACE_Message_Block(aligned_size, + mb->msg_type(), + mb->cont(), + 0, //we want the data block created + data_allocator, + mb->locking_strategy(), + mb->msg_priority(), + mb->msg_execution_time (), + mb->msg_deadline_time (), + data_block_allocator, + message_block_allocator), + 0); + + ACE_CDR::mb_align (nb); + + // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since + // we just dynamically allocated the two things. + nb->set_flags (mb->flags()); + nb->clr_flags (ACE_Message_Block::DONT_DELETE); + + return nb; +} + +/*! + \brief Copy data from \a src->rd_ptr to \a dst->wr_ptr, of at most \a span_size bytes. + + (This is similar to memcpy, although with message blocks we can be a + little smarter.) This function assumes that \a dst has enough space + for \a span_size bytes, and that \a src has at least \a span_size + bytes available to copy. When everything is copied \a dst->wr_ptr + gets updated accordingly, but \a src->rd_ptr is left to the caller + to update. + + \param dst the destination message block + \param src the source message block + \param span_size size of the maximum span of bytes to be copied + \return 0 on failure, otherwise \a dst + */ +static ACE_Message_Block* +copy_mb_span (ACE_Message_Block *dst, ACE_Message_Block *src, size_t span_size) +{ + // @todo check for enough space in dst, and src contains at least span_size + + if (src == 0 || dst == 0) + return 0; + + if (span_size == 0) + return dst; + + dst->copy (src->rd_ptr (), span_size); + return dst; +} + /*static*/ TAO_Queued_Data * -TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc) +TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc) +{ + register TAO_Queued_Data *new_qd = 0; + register const size_t HDR_LEN = msging_obj.header_length (); /* COMPUTE ONCE! */ + register const size_t MB_LEN = mb->length (); /* COMPUTE ONCE! */ + + // Validate arguments. + if (mb == 0) + goto failure; + + new_qd = make_queued_data (alloc); + if (new_qd == 0) + goto failure; + + // do we have enough bytes to make a complete header? + if (MB_LEN >= HDR_LEN) + { + // Since we have enough bytes to make a complete header, + // the header needs to be valid. Check that now, and punt + // if it's not valid. + if (! msging_obj.check_for_valid_header (*mb)) + { + goto failure; + } + else + { + new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD; + msging_obj.set_queued_data_from_message_header (new_qd, *mb); + if (new_qd->current_state_ == INVALID) + goto failure; + + // missing_data_bytes_ now has the full GIOP message size, so we allocate + // a new message block of that size, plus the header. + new_qd->msg_block_ = clone_mb_nocopy_size (mb, + new_qd->missing_data_bytes_ + + HDR_LEN); + // Of course, we don't have the whole message (if we did, we + // wouldn't be here!), so we copy only what we've got, i.e., whatever's + // in the message block. + if (copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0) + goto failure; + + // missing_data_bytes_ now has the full GIOP message size, but + // there might still be stuff in mb. Therefore, we have to adjust + // missing_data_bytes_, i.e., decrease it by the number of "actual + // payload bytes" in mb. + // + // "actual payload bytes" :== length of mb (which included the header) - header length + new_qd->missing_data_bytes_ -= (MB_LEN - HDR_LEN); + mb->rd_ptr (MB_LEN); + } + } + else + { + new_qd->current_state_ = WAITING_TO_COMPLETE_HEADER; + new_qd->msg_block_ = clone_mb_nocopy_size (mb, HDR_LEN); + if (new_qd->msg_block_ == 0 || + copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0) + goto failure; + new_qd->missing_data_bytes_ = HDR_LEN - MB_LEN; + mb->rd_ptr (MB_LEN); + } + + ACE_ASSERT (new_qd->current_state_ != INVALID); + if (TAO_debug_level > 7) + { + const char* s = "?unk?"; + switch (new_qd->current_state_) + { + case WAITING_TO_COMPLETE_HEADER: s = "WAITING_TO_COMPLETE_HEADER"; break; + case WAITING_TO_COMPLETE_PAYLOAD: s = "WAITING_TO_COMPLETE_PAYLOAD"; break; + case INVALID: s = "INVALID"; break; + case COMPLETED: s = "COMPLETED"; break; + } + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ") + ACE_TEXT ("made uncompleted message from %u bytes into qd=%-08x:") + ACE_TEXT ("state=%s,missing_data_bytes=%u\n"), + new_qd->msg_block_->length(), new_qd, s, new_qd->missing_data_bytes_)); + } + return new_qd; + +failure: + if (TAO_debug_level > 7) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ") + ACE_TEXT ("failed to make uncompleted message: mb=%-08x, qd=%-08x\n"), + mb, new_qd)); + } + TAO_Queued_Data::release (new_qd); + return 0; +} + + +/*static*/ +TAO_Queued_Data * +TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc) +{ + register const size_t HDR_LEN = msging_obj.header_length (); + register const size_t MB_LEN = mb.length (); + + // Validate arguments. + if (MB_LEN < HDR_LEN) + return 0; + + size_t total_msg_len = 0; + register TAO_Queued_Data *new_qd = make_queued_data (alloc); + if (new_qd == 0) + goto failure; + + // We can assume that there are enough bytes for a header, so + // extract the header data. Don't assume that there's enough for + // the payload just yet. + new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD; + msging_obj.set_queued_data_from_message_header (new_qd, mb); + if (new_qd->current_state_ == INVALID) + goto failure; + + // new_qd_->missing_data_bytes_ + protocol header length should be + // *at least* the length of the message. Verify that we have that + // many bytes in the message block and, if we don't, release the new + // qd and fail. + total_msg_len = new_qd->missing_data_bytes_ + HDR_LEN; + if (total_msg_len > MB_LEN) + goto failure; + + // Make a copy of the relevant portion of mb and hang on to it + if ((new_qd->msg_block_ = clone_mb_nocopy_size (&mb, total_msg_len)) == 0) + goto failure; + + if (copy_mb_span (new_qd->msg_block_, &mb, total_msg_len) == 0) + goto failure; + + // Update missing data and the current state + new_qd->missing_data_bytes_ = 0; + new_qd->current_state_ = COMPLETED; + + // Advance the rd_ptr on the message block + mb.rd_ptr (total_msg_len); + + if (TAO_debug_level > 7) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ") + ACE_TEXT ("extracted complete message (%u bytes incl hdr) from mblk=%-08x into qd=%-08x\n"), + total_msg_len, &mb, new_qd)); + } + + return new_qd; + +failure: + if (TAO_debug_level > 7) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ") + ACE_TEXT ("failed to find complete message in mblk=%-08x; leaving %u bytes in block\n"), + &mb, MB_LEN)); + if (TAO_debug_level >= 10) + ACE_HEX_DUMP ((LM_DEBUG, + mb.rd_ptr (), MB_LEN, + ACE_TEXT (" residual bytes in buffer"))); + + } + TAO_Queued_Data::release (new_qd); + return 0; +} + +/*static*/ +TAO_Queued_Data * +TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc) { TAO_Queued_Data *qd = 0; @@ -281,3 +602,29 @@ TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd) return qd; } + +void +TAO_Queued_Data::consolidate (void) +{ + // Is this a chain of fragments? + if (this->more_fragments_ && this->msg_block_->cont () != 0) + { + // Create a message block big enough to hold the entire chain + ACE_Message_Block *dest = clone_mb_nocopy_size ( + this->msg_block_, + this->msg_block_->total_length ()); + // Reset the cont() parameter + dest->cont (0); + + // Use ACE_CDR to consolidate the chain for us + ACE_CDR::consolidate (dest, this->msg_block_); + + // free the original message block chain + this->msg_block_->release (); + + // Set the message block to the new consolidated message block + this->msg_block_ = dest; + this->more_fragments_ = 0; + } +} + diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index 3adfc3d24ac..660a207090b 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -27,6 +27,7 @@ class ACE_Allocator; class TAO_ORB_Core; class TAO_Queued_Data; class TAO_Transport; +class TAO_Pluggable_Messaging; /** * @class TAO_Incoming_Message_Queue @@ -75,31 +76,68 @@ public: /// Return the length of the queue.. CORBA::ULong queue_length (void); - /// Methods for sanity check. Checks to see whether the node on the - /// head or tail is complete or not and ready for further - /// processing. + /*! + @name Node Inspection Predicates + + \brief These methods allow inspection of head and tail nodes for "completeness". + + These methods check to see whether the node on the head or tail is + "complete" and ready for further processing. See each method's + documentation for its definition of "complete". + */ + //@{ + /*! + "complete" == the GIOP message at the tail is not missing any data (it may be a complete GIOP Fragment, though) + + \return -1 queue is empty + \return 0 tail is not "complete" + \return 1 tail is "complete" + */ int is_tail_complete (void); + + /*! + + "complete" == the GIOP message at the head is not missing any data + AND, if it's the first message in a series of GIOP fragments, all + the fragments have been received, parsed, and placed into the + queue + + \return -1 if queue is empty + \return 0 if head is not "complete" + \return 1 if head is "complete" + */ int is_head_complete (void); + //@} - /// This method checks whether the last message that was queued up - /// was fragmented... + /*! + \brief Check to see if the message at the tail (complete or incomplete) is a GIOP Fragment. + */ int is_tail_fragmented (void); /// Return the size of data that is missing in tail of the queue. size_t missing_data_tail (void) const; /// void missing_data (size_t data); + /// Find the first fragment that matches the GIOP version + TAO_Queued_Data *find_fragment (CORBA::Octet major, + CORBA::Octet minor) const; + + /// Find the first fragment that matches the request id + TAO_Queued_Data *find_fragment (CORBA::ULong request_id) const; + private: friend class TAO_Transport; - /// Make a node for the queue. - TAO_Queued_Data *get_node (void); - private: + /*! + \brief A circular linked list of messages awaiting processing. - /// A linked listof messages that await processing - TAO_Queued_Data *queued_data_; + \a last_message_added_ points to the most recent message added to + the list. The earliest addition can be easily accessed via + \a last_message_added_->next_. + */ + TAO_Queued_Data *last_added_; /// The size of the queue CORBA::ULong size_; @@ -123,20 +161,73 @@ private: class TAO_Export TAO_Queued_Data { -public: +protected: /// Default Constructor TAO_Queued_Data (ACE_Allocator *alloc = 0); /// Constructor. TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0); +public: /// Copy constructor. TAO_Queued_Data (const TAO_Queued_Data &qd); - /// Creation and deletion of a node in the queue. - static TAO_Queued_Data* get_queued_data (ACE_Allocator *alloc = 0); + /*! + \name Factory Methods + + These methods manufacture instances of TAO_Queued_Data and return + them. These instances should be removed via TAO_Queued_Data::release. + + Instances are initialized from data in the ACE_Message_Block, + interpreted according to rules defined in the + TAO_Pluggable_Messaging object. + + The manufactured instance adopts the message block \em without + duplicating it; therefore, the caller must duplicate or orphan the + message block. The caller also must insure that the message block + can be released via ACE_Message_Block::release, and that its life + endures beyond the calling scope. + + For the purposes of TAO_Queued_Data, a completed message is a + completely received message as defined by the messaging protocol + object. For GIOP, that means that the number of bytes specified + in the general GIOP header have been completely received. It + specifically DOES NOT mean that all \em fragments have been + received. Fragment reassembly is another matter altogether. + */ + //@{ + /*! + \brief Make and return an instance of TAO_Queued_Data suitable for use as an uncompleted message. + */ + static TAO_Queued_Data* make_uncompleted_message (ACE_Message_Block *mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc = 0); + /*! + \brief Make and return an instance of TAO_Queued_Data suitable for use as a completed message. + */ + // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE! + // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN + // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS + // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT + // BEHAVES DIFFERENTLY FROM make_uncompleted_message. + static TAO_Queued_Data* make_completed_message (ACE_Message_Block &mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc = 0); + + /// Consolidate this fragments chained message blocks into one. + void consolidate (void); + + /*! + \brief Creation and deletion of a node in the queue. + \todo Maybe this should be private? + */ +private: + static TAO_Queued_Data* make_queued_data (ACE_Allocator *alloc = 0); +public: + //@} static void release (TAO_Queued_Data *qd); + void release (void); /// Duplicate ourselves. This creates a copy of ourselves on the /// heap and returns a pointer to the duplicated node. @@ -146,11 +237,43 @@ public: /// The message block that contains the message. ACE_Message_Block *msg_block_; - /// Data missing in the above message that hasn't been read or - /// processed yet. - CORBA::Long missing_data_; - - /// The byte order of the message that is stored in the node.. + /*! + @name Missing Data details + + The \a missing_data_bytes_ member contains the number of bytes of + data missing from \a msg_block_. However, there can be two places + where data is missing: header and payload. We cannot know how + much data is missing from the payload until we have a complete + header. Fortunately, headers are a fixed length, so we can know + how much we're missing from the header. + + We use \param current_state_ to indicate which portion of the message + \param missing_data_bytes_ refers to, as well as the general state of + the message. + */ + //@{ + /*! + Describes the meaning given to the number stored in \a missing_data_bytes_. + */ + enum Queued_Data_State + { + INVALID = -1, //!< The queued data is in an invalid/uninitialized state, and no data should be trusted. + COMPLETED = 0, //!< Message is complete; \a missing_data_bytes_ should be zero. + WAITING_TO_COMPLETE_HEADER, //!< Value in \a missing_data_bytes_ indicates part of header is missing. + WAITING_TO_COMPLETE_PAYLOAD //!< Value in \a missing_data_bytes_ indicates part of payload is missing. + }; + + /*! + Indicates the current state of the message, including hints at + how to interpret the value stored in \a missing_data_bytes_. + */ + Queued_Data_State current_state_; + + /*! Data missing in the above message that hasn't been read or processed yet. */ + size_t missing_data_bytes_; + //@} + + /*! The byte order of the message that is stored in the node. */ CORBA::Octet byte_order_; /// Many protocols like GIOP have a major and minor version @@ -165,6 +288,9 @@ public: /// queue already has more fragments that is missing.. CORBA::Octet more_fragments_; + /// The fragment request id + CORBA::ULong request_id_; + /// The message type of the message TAO_Pluggable_Message_Type msg_type_; diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl index d67bd485383..f82267b5cea 100644 --- a/TAO/tao/Incoming_Message_Queue.inl +++ b/TAO/tao/Incoming_Message_Queue.inl @@ -18,7 +18,7 @@ TAO_Incoming_Message_Queue::is_tail_complete (void) return -1; if (this->size_ && - this->queued_data_->missing_data_ == 0) + this->last_added_->missing_data_bytes_ == 0) return 1; return 0; @@ -31,8 +31,8 @@ TAO_Incoming_Message_Queue::is_head_complete (void) return -1; if (this->size_ && - this->queued_data_->next_->missing_data_ == 0 && - this->queued_data_->next_->more_fragments_ == 0) + this->last_added_->next_->missing_data_bytes_ == 0 && + this->last_added_->next_->more_fragments_ == 0) return 1; return 0; @@ -45,7 +45,7 @@ TAO_Incoming_Message_Queue::is_tail_fragmented (void) return 0; if (this->size_ && - this->queued_data_->more_fragments_ == 1) + this->last_added_->more_fragments_ == 1) return 1; return 0; @@ -55,23 +55,22 @@ ACE_INLINE size_t TAO_Incoming_Message_Queue::missing_data_tail (void) const { if (this->size_ != 0) - return this->queued_data_->missing_data_; + return this->last_added_->missing_data_bytes_; return 0; } - -ACE_INLINE TAO_Queued_Data * -TAO_Incoming_Message_Queue::get_node (void) -{ - return TAO_Queued_Data::get_queued_data (); -} - /************************************************************************/ // Methods for TAO_Queued_Data /************************************************************************/ +ACE_INLINE void +TAO_Queued_Data::release (void) +{ + TAO_Queued_Data::release (this); +} + /*static*/ ACE_INLINE void TAO_Queued_Data::replace_data_block (ACE_Message_Block &mb) diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index 8ac38de01e0..3e84635d767 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -121,36 +121,30 @@ public: virtual void init (CORBA::Octet major, CORBA::Octet minor) = 0; - /// Parse the incoming messages.. - virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0; - - /// Calculate the amount of data that is missing in the <incoming> - /// message block. - virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0; - - /// Get the details of the message parsed through the <qd>. - virtual void get_message_data (TAO_Queued_Data *qd) = 0; - - /* Extract the details of the next message from the <incoming> - * through <qd>. Returns 1 if there are more messages and returns a - * 0 if there are no more messages in <incoming>. - */ - virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd) = 0; - - /// Check whether the node <qd> needs consolidation from <incoming> - virtual int consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming) = 0; - - /// @@Bala:Docu?? - virtual int consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd) = 0; - /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) = 0; + /*! + \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. + + Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to + see if they look like the beginning of a message. Does + */ + virtual int check_for_valid_header (const ACE_Message_Block &mb) const = 0; + + /*! + \brief Set fields in \param qd based on values derived from \param mb. + + This function sets fields in \param qd based on values derived + from \param mb. It assumes that if the length of \param mb is + enough to hold a header, then the data in there can be trusted to + make sense. + */ + virtual void set_queued_data_from_message_header ( + TAO_Queued_Data *, + const ACE_Message_Block &mb) const = 0; /// Parse the reply message that we received and return the reply /// information though <reply_info> diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp index d4edd4c8c80..aa5a569892b 100644 --- a/TAO/tao/Strategies/DIOP_Transport.cpp +++ b/TAO/tao/Strategies/DIOP_Transport.cpp @@ -88,12 +88,19 @@ TAO_DIOP_Transport::send (iovec *iov, int iovcnt, for (int i = 0; i < iovcnt; i++) bytes_to_send += iov[i].iov_len; - this->connection_handler_->dgram ().send (iov, - iovcnt, - addr); + ssize_t n = this->connection_handler_->dgram ().send (iov, + iovcnt, + addr); // @@ Michael: // Always return a positive number of bytes sent, as we do // not handle sending errors in DIOP. + if (n == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO: (%P|%t|%N|%l) Send of %d bytes failed %p\n"), + bytes_to_send, + ACE_TEXT ("send_i ()\n"))); + } bytes_transferred = bytes_to_send; @@ -191,7 +198,7 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh, // Read the message into the message block that we have created on // the stack. - ssize_t n = this->recv (message_block.rd_ptr (), + ssize_t n = this->recv (message_block.wr_ptr (), message_block.space (), max_wait_time); @@ -199,6 +206,7 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh, if (n <= 0) { if (n == -1) + // @@ Why not send_connection_closed_notifications() ? this->tms_->connection_closed (); return n; @@ -207,23 +215,43 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh, // Set the write pointer in the stack buffer message_block.wr_ptr (n); - // Parse the incoming message for validity. The check needs to be + // Check the incoming message for validity. The check needs to be // performed by the messaging objects. - if (this->parse_incoming_messages (message_block) == -1) - return -1; + if (this->messaging_object ()->check_for_valid_header (message_block) == 0) + { + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO: (%P|%t|%N|%l) failed to find a valid header on transport %d after fault %p\n"), + this->id (), + ACE_TEXT ("handle_input_i ()\n"))); + } + + return -1; + } // NOTE: We are not performing any queueing nor any checking for - // missing data. We are assuming that ALL the data would be got in a + // missing data. We are assuming that ALL the data arrives in a // single read. // Make a node of the message block.. - TAO_Queued_Data qd (&message_block); - - // Extract the data for the node.. - this->messaging_object ()->get_message_data (&qd); - - // Process the message - return this->process_parsed_messages (&qd, rh); + // + // We could make this more efficient by having a fixed Queued Data + // allocator, i.e., it always gave back the same thing. Actually, + // we *could* create an allocator that took a stack-allocated object + // as an argument and returned that when asked an allocation is + // done. Something to contemplate... + TAO_Queued_Data* qd = + TAO_Queued_Data::make_completed_message (message_block, + *this->messaging_object ()); + int retval = -1; + if (qd) + { + // Process the message + retval = this->process_parsed_messages (qd, rh); + TAO_Queued_Data::release (qd); + } + return retval; } @@ -310,4 +338,175 @@ TAO_DIOP_Transport::messaging_init (CORBA::Octet major, return 1; } +// @@ Frank: Hopefully DIOP doesn't need this +/* +int +TAO_DIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) +{ + CORBA::Boolean byte_order; + if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0) + return -1; + + cdr.reset_byte_order (ACE_static_cast(int,byte_order)); + + DIOP::ListenPointList listen_list; + if ((cdr >> listen_list) == 0) + return -1; + + // As we have received a bidirectional information, set the flag to + // 1 + this->bidirectional_flag (1); + return this->connection_handler_->process_listen_point_list (listen_list); +} +*/ + + + +// @@ Frank: Hopefully DIOP doesn't need this +/* +void +TAO_DIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails) +{ + + // Get a handle on to the acceptor registry + TAO_Acceptor_Registry * ar = + this->orb_core ()->acceptor_registry (); + + + // Get the first acceptor in the registry + TAO_AcceptorSetIterator acceptor = ar->begin (); + + DIOP::ListenPointList listen_point_list; + + for (; + acceptor != ar->end (); + acceptor++) + { + // Check whether it is a DIOP acceptor + if ((*acceptor)->tag () == TAO_TAG_UDP_PROFILE) + { + this->get_listen_point (listen_point_list, + *acceptor); + } + } + + // We have the ListenPointList at this point. Create a output CDR + // stream at this point + TAO_OutputCDR cdr; + + // Marshall the information into the stream + if ((cdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER)== 0) + || (cdr << listen_point_list) == 0) + return; + + // Add this info in to the svc_list + opdetails.service_context ().set_context (IOP::BI_DIR_DIOP, + cdr); + + return; +} + + +int +TAO_DIOP_Transport::get_listen_point ( + DIOP::ListenPointList &listen_point_list, + TAO_Acceptor *acceptor) +{ + TAO_DIOP_Acceptor *iiop_acceptor = + ACE_dynamic_cast (TAO_DIOP_Acceptor *, + acceptor ); + + // Get the array of endpoints serviced by <iiop_acceptor> + const ACE_INET_Addr *endpoint_addr = + iiop_acceptor->endpoints (); + + // Get the count + size_t count = + iiop_acceptor->endpoint_count (); + + // Get the local address of the connection + ACE_INET_Addr local_addr; + + if (this->connection_handler_->peer ().get_local_addr (local_addr) + == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Could not resolve local host") + ACE_TEXT (" address in set_bidir_context_info () \n")), + -1); + } + + + // Note: Looks like there is no point in sending the list of + // endpoints on interfaces on which this connection has not + // been established. If this is wrong, please correct me. + char *local_interface = 0; + + // Get the hostname for the local address + if (iiop_acceptor->hostname (this->orb_core_, + local_addr, + local_interface) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Could not resolve local host") + ACE_TEXT (" name \n")), + -1); + } + + ACE_INET_Addr *tmp_addr = ACE_const_cast (ACE_INET_Addr *, + endpoint_addr); + + for (size_t index = 0; + index <= count; + index++) + { + // Get the listen point on that acceptor if it has the same + // interface on which this connection is established + char *acceptor_interface = 0; + + if (iiop_acceptor->hostname (this->orb_core_, + tmp_addr[index], + acceptor_interface) == -1) + continue; + + // @@ This is very bad for performance, but it is a one time + // affair + if (ACE_OS::strcmp (local_interface, + acceptor_interface) == 0) + { + // We have the connection and the acceptor endpoint on the + // same interface + DIOP::ListenPoint point; + point.host = CORBA::string_dup (local_interface); + point.port = endpoint_addr[index].get_port_number (); + + // Get the count of the number of elements + CORBA::ULong len = listen_point_list.length (); + + // Increase the length by 1 + listen_point_list.length (len + 1); + + // Add the new length to the list + listen_point_list[len] = point; + } + + // @@ This is bad.... + CORBA::string_free (acceptor_interface); + } + + CORBA::string_free (local_interface); + return 1; +} +*/ + +#if 0 +TAO_Connection_Handler * +TAO_DIOP_Transport::invalidate_event_handler_i (void) +{ + TAO_Connection_Handler * eh = this->connection_handler_; + this->connection_handler_ = 0; + return eh; +} +#endif + #endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */ diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index aaebc6860cf..c7f845eee81 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -136,6 +136,7 @@ TAO_SHMIOP_Transport::recv (char *buf, } +#if 0 int TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming, ssize_t missing_data, @@ -191,6 +192,7 @@ TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming, // process that return this->process_parsed_messages (&pqd, rh); } +#endif int TAO_SHMIOP_Transport::send_request (TAO_Stub *stub, diff --git a/TAO/tao/Strategies/SHMIOP_Transport.h b/TAO/tao/Strategies/SHMIOP_Transport.h index 02c67c63116..8b54426aaa7 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.h +++ b/TAO/tao/Strategies/SHMIOP_Transport.h @@ -82,10 +82,14 @@ protected: size_t len, const ACE_Time_Value *s = 0); +#if 0 + // This no longer exists with the PMB-change flow. Not sure how to deal with that, + // so for now we ditch the method and see if things work. virtual int consolidate_message (ACE_Message_Block &incoming, ssize_t missing_data, TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time); +#endif //@} diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 9923ff1f895..1640644edf9 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -19,7 +19,9 @@ #include "Resume_Handle.h" #include "Codeset_Manager.h" #include "Codeset_Translator_Factory.h" +#include "GIOP_Message_State.h" #include "ace/OS_NS_sys_time.h" +#include "ace/Message_Block.h" #include "ace/Reactor.h" @@ -110,12 +112,15 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , head_ (0) , tail_ (0) , incoming_message_queue_ (orb_core) + , uncompleted_message_ (0) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) , id_ ((size_t) this) , purging_order_ (0) + , recv_buffer_size_ (0) + , sent_byte_count_ (0) , char_translator_ (0) , wchar_translator_ (0) , tcs_set_ (0) @@ -236,13 +241,9 @@ TAO_Transport::generate_request_header ( { // codeset service context is only supposed to be sent in the first request // on a particular connection. - if (this->first_request_) - { - this->orb_core ()->codeset_manager ()->generate_service_context ( - opdetails, - *this - ); - } + TAO_Codeset_Manager *csm = this->orb_core()->codeset_manager(); + if (csm && this->first_request_) + csm->generate_service_context( opdetails, *this ); if (this->messaging_object ()->generate_request_header (opdetails, spec, @@ -604,7 +605,12 @@ int TAO_Transport::schedule_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; if (TAO_debug_level > 3) { @@ -620,7 +626,12 @@ int TAO_Transport::cancel_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; if (TAO_debug_level > 3) { @@ -739,6 +750,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) // no bytes are sent send() can only return 0 or -1 ACE_ASSERT (byte_count != 0); + // Total no. of bytes sent for a send call + this->sent_byte_count_ += byte_count; + if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, @@ -762,6 +776,10 @@ TAO_Transport::drain_queue_i (void) // We loop over all the elements in the queue ... TAO_Queued_Message *i = this->head_; + // reset the value so that the counting is done for each new send + // call. + this->sent_byte_count_ = 0; + while (i != 0) { // ... each element fills the iovector ... @@ -820,8 +838,14 @@ TAO_Transport::drain_queue_i (void) if (this->flush_timer_pending ()) { ACE_Event_Handler *eh = this->event_handler_i (); - ACE_Reactor *reactor = eh->reactor (); - reactor->cancel_timer (this->flush_timer_id_); + if (eh != 0) + { + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + } this->reset_flush_timer (); } @@ -898,20 +922,25 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, if (set_timer) { ACE_Event_Handler *eh = this->event_handler_i (); - ACE_Reactor *reactor = eh->reactor (); - this->current_deadline_ = new_deadline; - ACE_Time_Value delay = - new_deadline - ACE_OS::gettimeofday (); - - if (this->flush_timer_pending ()) + if (eh != 0) { - reactor->cancel_timer (this->flush_timer_id_); - } + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + this->current_deadline_ = new_deadline; + ACE_Time_Value delay = + new_deadline - ACE_OS::gettimeofday (); - this->flush_timer_id_ = - reactor->schedule_timer (&this->transport_timer_, - &this->current_deadline_, - delay); + if (this->flush_timer_pending ()) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + this->flush_timer_id_ = + reactor->schedule_timer (&this->transport_timer_, + &this->current_deadline_, + delay); + } + } } return constraints_reached; @@ -1118,6 +1147,18 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub, return 0; } + +class CTHack +{ +public: + CTHack() { enter(); } + ~CTHack() { leave(); } +private: + void enter() { x = 1; } + void leave() { x = 0; } + int x; +}; + /* * * All the methods relevant to the incoming data path of the ORB are @@ -1129,6 +1170,8 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Time_Value * max_wait_time, int /*block*/) { + CTHack cthack; + if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, @@ -1136,9 +1179,8 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, this->id ())); } - // First try to process messages of the head of the incoming queue. + // First try to process messages off the head of the incoming queue. int retval = this->process_queue_head (rh); - if (retval <= 0) { if (retval == -1) @@ -1149,7 +1191,6 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, "error while parsing the head of the queue\n", this->id())); } - return retval; } @@ -1158,7 +1199,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, // The buffer on the stack which will be used to hold the input // messages - char buf [TAO_MAXBUFSIZE]; + char buf[TAO_MAXBUFSIZE]; #if defined (ACE_HAS_PURIFY) (void) ACE_OS::memset (buf, @@ -1180,26 +1221,35 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Message_Block::DONT_DELETE, this->orb_core_->input_cdr_msgblock_allocator ()); + // We'll loop trying to complete the message this number of times, + // and that's it. + unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS; + + unsigned int did_queue_message = 0; // Align the message block ACE_CDR::mb_align (&message_block); size_t recv_size = 0; - if (this->orb_core_->orb_params ()->single_read_optimization ()) { - recv_size = - message_block.space (); + recv_size = message_block.space (); } else { - recv_size = - this->messaging_object ()->header_length (); + recv_size = this->messaging_object ()->header_length (); } + // Saving the size of the received buffer in case any one needs to + // get the size of the message thats received in the + // context. Obviously the value will be changed for each recv call + // and the user is supposed to invoke the accessor only in the + // invocation context to get meaningful information. + this->recv_buffer_size_ = recv_size; + // Read the message into the message block that we have created on // the stack. - ssize_t n = this->recv (message_block.rd_ptr (), + ssize_t n = this->recv (message_block.wr_ptr (), recv_size, max_wait_time); @@ -1212,7 +1262,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "TAO (%P|%t) - Transport[%d]::handle_input_i: " "read %d bytes\n", this->id (), n)); } @@ -1220,172 +1270,372 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, // Set the write pointer in the stack buffer message_block.wr_ptr (n); - // Parse the message and try consolidating the message if - // needed. - retval = this->parse_consolidate_messages (message_block, - rh, - max_wait_time); + if (TAO_debug_level >= 10) + ACE_HEX_DUMP ((LM_DEBUG, + (const char *) message_block.rd_ptr (), + message_block.length (), + ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket"))); - if (retval <= 0) - { - if (retval == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i, " - "error while parsing and consolidating\n", - this->id ())); - } - return retval; - } - - // Make a node of the message block.. - TAO_Queued_Data qd (&message_block, - this->orb_core_->transport_message_buffer_allocator ()); - - // Extract the data for the node.. - this->messaging_object ()->get_message_data (&qd); - // Check whether the message was fragmented.. - if (qd.more_fragments_ || - (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) +complete_message_and_possibly_enqueue: + // Check to see if we're still working to complete a message + if (this->uncompleted_message_) { - // Duplicate the node that we have as the node is on stack.. - TAO_Queued_Data *nqd = - TAO_Queued_Data::duplicate (qd); + // try to complete it - return this->consolidate_fragments (nqd, rh); - } + // on exit from this frame we have one of the following states: + // + // (a) an uncompleted message still in uncompleted_message_ + // AND message_block is empty + // + // (b) uncompleted_message_ zero, the completed message at the + // tail of the incoming queue; message_block could be empty + // or still contain bytes - // Process the message - return this->process_parsed_messages (&qd, - rh); -} + // ==> repeat + do + { + /* + * Append the "right number of bytes" to uncompleted_message_ + */ + // ==> right_number_of_bytes = MIN(bytes missing from + // uncompleted_message_, length of message_block); + size_t right_number_of_bytes = + ACE_MIN (this->uncompleted_message_->missing_data_bytes_, + message_block.length () ); -int -TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) -{ - // Parse the incoming message for validity. The check needs to be - // performed by the messaging objects. - if (this->parse_incoming_messages (block) == -1) - { - return -1; - } + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "trying to use %u (of %u) " + "bytes to complete message missing %u bytes\n", + this->id (), + right_number_of_bytes, + message_block.length (), + this->uncompleted_message_->missing_data_bytes_)); + } - // Check whether we have a complete message for processing - ssize_t missing_data = this->missing_data (block); + // ==> append right_number_of_bytes from message_block + // to uncomplete_message_ & update read pointer of + // message_block; + + // 1. we assume that uncompleted_message_.msg_block_'s + // wr_ptr is properly maintained + // 2. we presume that uncompleted_message_.msg_block was + // allocated with enough space to contain the *entire* + // expected GIOP message, so this copy shouldn't involve an + // additional allocation + this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (), + right_number_of_bytes); + this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; + message_block.rd_ptr (right_number_of_bytes); + + switch (this->uncompleted_message_->current_state_) + { + case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER: + { + int hdrvalidity = this->messaging_object()->check_for_valid_header ( + *this->uncompleted_message_->msg_block_); + if (hdrvalidity == 0) + { + // According to the spec, Section 15.4.8, we should send + // the MessageError GIOP message on receipt of "any message...whose + // header is not properly formed (e.g., has the wrong magic value)". + // + // So, rather than returning -1, what we REALLY need to do is + // send a MessageError in reply. + // + // I'm not sure what the best way to trigger that is...probably to + // queue up a special internal-only COMPLETED message that, when + // processed, sends the MessageError as part of its processing. + return -1; + } + else if (hdrvalidity == 1) + { + // ==> update bytes missing from uncompleted_message_ + // with size of message from valid header; + this->messaging_object()->set_queued_data_from_message_header ( + this->uncompleted_message_, + *this->uncompleted_message_->msg_block_); + // ==> change state of uncompleted_event_ to + // WAITING_TO_COMPLETE_PAYLOAD; + this->uncompleted_message_->current_state_ = + TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD; + + // ==> Resize the message block to have capacity for + // the rest of the incoming message + ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_; + ACE_CDR::grow (&mb, + mb.size () + + this->uncompleted_message_->missing_data_bytes_); + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "found a valid header in the message; " + "waiting for %u bytes to complete payload\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } + + // Continue the loop... + continue; + } + // In the case where we don't have enough information (hdrvalidity == -1), + // we just have to fall through and collect more. +#if 0 + else + { + // What the heck will we do with a bad header? Just + // better to close the connection and let things + // re-train from there. + if (this->uncompleted_message_->msg_block_->length () == + this->messaging_object()->header_length()) + return -1; + +#if 0 // I don't think I need this clause, but I'm leaving it just in case. + // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes; + this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; + ACE_ASSERT (this->uncompleted_message_->missing_data_bytes_ > 0); +#endif + } +#endif + } + break; + + case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD: + // Here we have an opportunity to try to finish reading the + // uncompleted message. This is a Good Idea(TM) because there are + // good odds that either more data came available since the last + // time we read, or that we simply didn't read the whole message on + // the first read. So, we try to read again. + // + // NOTE! this changes this->uncompleted_message_! + this->try_to_complete (max_wait_time); + // ==> if (bytes missing from uncompleted_message_ == 0) + if (this->uncompleted_message_->missing_data_bytes_ == 0) + { + /* + * We completed the message! Hooray! + */ + // ==> place uncompleted_message_ (which is now + // complete!) at the tail of the incoming message + // queue; + + // ---> NOTE: whoever pulls this off the queue must delete it! + this->uncompleted_message_->current_state_ + = TAO_Queued_Data::COMPLETED; + + // @@CJC NEED TO CHECK RETURN VALUE HERE! + this->enqueue_incoming_message (this->uncompleted_message_); + did_queue_message = 1; + // zero out uncompleted_message_; + this->uncompleted_message_ = 0; + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "completed and queued message for processing!\n", + this->id ())); + } - if (missing_data < 0) - { - // If we have more than one message - return this->consolidate_extra_messages (block, - rh); - } - else if (missing_data > 0) - { - // If we have missing data then try doing a read or try queueing - // them. - return this->consolidate_message (block, - missing_data, - rh, - max_wait_time); - } + } + else + { - return 1; -} + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "still need %u bytes to complete uncompleted message.\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } + } + break; -int -TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) -{ - // If we have a queue and if the last message is not complete a - // complete one, then this read will get us the remaining data. So - // do not try to parse the header if we have an incomplete message - // in the queue. - if (this->incoming_message_queue_.is_tail_complete () != 0) - { - // As it looks like a new message has been read, process the - // message. Call the messaging object to do the parsing.. - int retval = - this->messaging_object ()->parse_incoming_messages (block); + default: + // @@CJC What do we do here?! + ACE_ASSERT (! "Transport::handle_input_i: unexpected state" + "in uncompleted_message_"); + } + } + // Does the order of the checks matter? In both (a) and (b), + // message_block is empty, but only in (b) is there no + // uncompleted_message_. + // ==> until (message_block is empty || there is no uncompleted_message_); + // or, rewritten in C++ looping constructs + // ==> while ( ! message_block is empty && there is an uncompleted_message_ ); + while (message_block.length() != 0 && this->uncompleted_message_); + } + + // ***************************** + // @@ CJC + // + // Once upon a time we tried to complete reading the uncompleted + // message here, but testing found that completing later worked + // better. + // ***************************** + + + // At this point, there should be nothing in uncompleted_message_. + // We now need to chop up the bytes in message_block and store any + // complete messages in the incoming message queue. + // + // ==> if (message_block still has data) + if (message_block.length () != 0) + { + TAO_Queued_Data *complete_message = 0; + do + { + if (TAO_debug_level >= 10) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ") + ACE_TEXT("extracting complete messages\n"))); + ACE_HEX_DUMP ((LM_DEBUG, + message_block.rd_ptr (), + message_block.length (), + ACE_TEXT (" from this message buffer"))); + } - if (retval == -1) + complete_message = + TAO_Queued_Data::make_completed_message ( + message_block, *this->messaging_object ()); + if (complete_message) + { + this->enqueue_incoming_message (complete_message); + did_queue_message = 1; + } + } + while (complete_message != 0); + // On exit from this frame we have one of the following states: + // (a) message_block is empty + // (b) message_block contains bytes from a partial message + } + + // If, at this point, there's still data in message_block, it's + // an incomplete message. Therefore, we stuff it into the + // uncompleted_message_ and clear out message_block. + // ==> if (message_block still has data) + if (message_block.length () != 0) + { + // duplicate message_block remainder into this->uncompleted_message_ + ACE_ASSERT (this->uncompleted_message_ == 0); + this->uncompleted_message_ = + TAO_Queued_Data::make_uncompleted_message (&message_block, + *this->messaging_object ()); + ACE_ASSERT (this->uncompleted_message_ != 0); + + // In a debug build, we won't reach this point if we couldn't + // create an uncompleted message because the above ASSERT will + // trip. However, in an optimized build, the ASSERT isn't + // there, so we'll go past here. + // + // We could put a check in here similar to the ASSERT condition, + // but doing that would terminate this loop early and result in + // our never processing any completed messages that were received + // in this trip to handle_input_i. + // + // Maybe we could instead queue up a special completed message that, + // when processed, causes the connection to get closed in a non-graceful + // termination scenario. + } + + // We should have consumed ALL the bytes by now. + ACE_ASSERT (message_block.length () == 0); + + // + // We don't want to try to re-read earlier because we may not have + // an uncompleted message until we get to this point. So, if we did + // it earlier, we could have missed the opportunity to complete it + // and dispatch. + // + // Thanks to Bala <bala@cse.wustl.edu> for the idea to read again + // to increase throughput! + + if (this->uncompleted_message_) + { + if (number_of_read_attempts--) { + // We try to read again just in case more data arrived while + // we were doing the stuff above. This way, we can increase + // throughput without much of a penalty. + if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, " - "error in incoming message\n", - this->id ())); + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "still have an uncompleted message; " + "will try %d more times before letting " + "somebody else have a chance.\n", + this->id (), + number_of_read_attempts)); + } - return -1; + // We only bother trying to complete payload, not header, because the + // retry only happens in the complete-the-payload clause above. + if (this->uncompleted_message_->current_state_ == + TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD) + goto complete_message_and_possibly_enqueue; + } + else + { + // The queue should be empty because it should have been processed + // above. But I wonder if I should put a check in here anyway. + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "giving up reading for now and returning " + "with incoming queue length = %d\n", + this->id (), + this->incoming_message_queue_.queue_length ())); + if (this->uncompleted_message_) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "missing bytes from uncompleted message = %u\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } + return 1; } } - return 0; -} - - -size_t -TAO_Transport::missing_data (ACE_Message_Block &incoming) -{ - // If we have a incomplete message in the queue then find out how - // much of data is required to get a complete message. - if (this->incoming_message_queue_.is_tail_complete () == 0) - { - return this->incoming_message_queue_.missing_data_tail (); - } + // **** END CJC PMG CHANGES **** - return this->messaging_object ()->missing_data (incoming); + return did_queue_message ? this->process_queue_head (rh) : 1; } -int -TAO_Transport::consolidate_message (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) +void +TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time) { - // Check whether the last message in the queue is complete.. - if (this->incoming_message_queue_.is_tail_complete () == 0) - { - return this->consolidate_message_queue (incoming, - missing_data, - rh, - max_wait_time); - } - - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message\n", - this->id ())); - } - - // Calculate the actual length of the load that we are supposed to - // read which is equal to the <missing_data> + length of the buffer - // that we have.. - size_t payload = missing_data + incoming.size (); - - // Grow the buffer to the size of the message - ACE_CDR::grow (&incoming, - payload); + if (this->uncompleted_message_ == 0) + return; ssize_t n = 0; + size_t &missing_data = this->uncompleted_message_->missing_data_bytes_; + ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_; - // As this used for transports where things are available in one - // shot this looping should not create any problems. - for (ssize_t bytes = missing_data; bytes != 0; bytes -= n) + // Try to complete this until we error or block right here... + for (ssize_t bytes = missing_data; + bytes != 0; + bytes -= n) { // .. do a read on the socket again. - n = this->recv (incoming.wr_ptr (), + n = this->recv (mb.wr_ptr (), bytes, max_wait_time); if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "TAO (%P|%t) - Transport[%d]::handle_input_i, " "read %d bytes on attempt\n", this->id(), n)); } @@ -1395,375 +1645,168 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, break; } - incoming.wr_ptr (n); + mb.wr_ptr (n); missing_data -= n; } - - // If we got an error.. - if (n == -1) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Trasport[%d]::consolidate_message, " - "error while trying to consolidate\n", - this->id ())); - } - - return -1; - } - - // If we had gotten a EWOULDBLOCK n would be equal to zero. But we - // have to put the message in the queue anyway. So let us proceed - // to do that and return... - - // Check to see if we have messages in queue or if we have missing - // data . AT this point we cannot have have semi-complete messages - // in the queue as they would have been taken care before. Put - // ourselves in the queue and then try processing one of the - // messages.. - if ((missing_data > 0 - ||this->incoming_message_queue_.queue_length ()) - && this->incoming_message_queue_.is_tail_fragmented () == 0) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message, " - "queueing up the message\n", - this->id ())); - } - - // Get a queued data - TAO_Queued_Data *qd = - this->make_queued_data (incoming); - - // Add the missing data to the queue - qd->missing_data_ = missing_data; - - // Get the rest of the messaging data - this->messaging_object ()->get_message_data (qd); - - // Add it to the tail of the queue.. - this->incoming_message_queue_.enqueue_tail (qd); - - if (this->incoming_message_queue_.is_head_complete ()) - { - return this->process_queue_head (rh); - } - - return 0; - } - - // We dont have any missing data. Just make a queued_data node with - // the existing message block and send it to the higher layers of - // the ORB. - TAO_Queued_Data pqd (&incoming, - this->orb_core_->transport_message_buffer_allocator ()); - pqd.missing_data_ = missing_data; - this->messaging_object ()->get_message_data (&pqd); - - // Check whether the message was fragmented and try to consolidate - // the fragments.. - if (pqd.more_fragments_ || - (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) - { - // Duplicate the queued data as it is on stack.. - TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd); - - return this->consolidate_fragments (nqd, rh); - } - - // Now we have a full message in our buffer. Just go ahead and - // process that - return this->process_parsed_messages (&pqd, - rh); } -int -TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd, - TAO_Resume_Handle &rh) -{ - // If we have received a fragment message then we have to - // consolidate <qd> with the last message in queue - // @@todo: this piece of logic follows GIOP a bit... Need to revisit - // if we have protocols other than GIOP - - // @@todo: Fragments now have too much copying overhead. Need to get - // them out if we want to have some reasonable performance metrics - // in future.. Post 1.2 seems a nice time.. - if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - { - TAO_Queued_Data *tqd = - this->incoming_message_queue_.dequeue_tail (); - - tqd->more_fragments_ = qd->more_fragments_; - tqd->missing_data_ = qd->missing_data_; - - if (this->messaging_object ()->consolidate_fragments (tqd, qd) == -1) - { - return -1; - } - - TAO_Queued_Data::release (qd); - this->incoming_message_queue_.enqueue_tail (tqd); - this->process_queue_head (rh); - } - else - { - // if we dont have a fragment already in the queue just add it in - // the queue - this->incoming_message_queue_.enqueue_tail (qd); - } - - return 0; -} int -TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) +TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message) { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n", - this->id ())); - } - - // If the queue did not have a complete message put this piece of - // message in the queue. We know it did not have a complete - // message. That is why we are here. - size_t n = - this->incoming_message_queue_.copy_tail (incoming); - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "copied [%d] bytes to the tail\n", - this->id (), - n)); - } - - // Update the missing data... - missing_data = - this->incoming_message_queue_.missing_data_tail (); - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "missing [%d] bytes in the tail message\n", - this->id (), - missing_data)); - } - - // Move the read pointer of the <incoming> message block to the end - // of the copied message and process the remaining portion... - incoming.rd_ptr (n); - - // If we have some more information left in the message block.. - if (incoming.length ()) - { - // We may have to parse & consolidate. This part of the message - // doesn't seem to be part of the last message in the queue (as - // the copy () hasn't taken away this message). - int retval = this->parse_consolidate_messages (incoming, - rh, - max_wait_time); - - // If there is an error return - if (retval == -1) + // Get the GIOP version + CORBA::Octet major = queueable_message->major_version_; + CORBA::Octet minor = queueable_message->minor_version_; + CORBA::UShort whole = major << 8 | minor; + + // Set up a couple of pointers that are shared by the code + // for the different GIOP versions. + ACE_Message_Block *mb = 0; + TAO_Queued_Data *fragment_message = 0; + + switch(whole) + { + case 0x0100: // GIOP 1.0 + if (!queueable_message->more_fragments_) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + + // Fragments aren't supported in 1.0. This is an error and + // we should reject it somehow. What do we do here? Do we throw + // an exception to the receiving side? Do we throw an exception + // to the sending side? + // + // At the very least, we need to log the fact that we received + // nonsense. + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - " + "TAO_Transport::enqueue_incoming_message " + "detected a fragmented GIOP 1.0 message\n"), + -1); + break; + case 0x0101: // GIOP 1.1 + // In 1.1, fragments kinda suck because they don't have they're + // own message-specific header. Therefore, we have to do the + // following: + fragment_message = + this->incoming_message_queue_.find_fragment (major, minor); + + // No fragment was found + if (fragment_message == 0) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + + if (queueable_message->more_fragments_) { - if (TAO_debug_level) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "error while consolidating, part of the read message\n", - this->id ())); - } - return retval; + // Find the last message block in the continuation + mb = fragment_message->msg_block_; + while (mb->cont () != 0) + mb = mb->cont (); + + // Add the current message block to the end of the chain + // after adjusting the read pointer to skip the GIOP header + queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN); + mb->cont (queueable_message->msg_block_); + + // Get rid of the queuable message but save the message block + queueable_message->msg_block_ = 0; + queueable_message->release (); + + // One note is that TAO_Queued_Data contains version numbers, + // but doesn't indicate the actual protocol to which those + // version numbers refer. That's not a problem, though, because + // instances of TAO_Queued_Data live in a queue, and that queue + // lives in a particular instance of a Transport, and the + // transport instance has an association with a particular + // messaging_object. The concrete messaging object embodies a + // messaging protocol, and must cover all versions of that + // protocol. Therefore, we just need to cover the bases of all + // versions of that one protocol. } - else if (retval == 1) - { - // If the message in the <incoming> message block has only - // one message left we need to process that seperately. - - // Get a queued data - TAO_Queued_Data *qd = this->make_queued_data (incoming); - - // Get the rest of the message data - this->messaging_object ()->get_message_data (qd); - - // Add the missing data to the queue - qd->missing_data_ = 0; - - // Check whether the message was fragmented and try to consolidate - // the fragments.. - if (qd->more_fragments_ || - (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) - { - return this->consolidate_fragments (qd, rh); - } - - // Add it to the tail of the queue.. - this->incoming_message_queue_.enqueue_tail (qd); - - // We should surely have a message in queue now. So just - // process that. - return this->process_queue_head (rh); - } - - // parse_consolidate_messages () would have processed one of the - // messages, so we better return as we dont want to starve other - // threads. - return 0; - } - - // If we still have some missing data.. - if (missing_data > 0) - { - // Get the last message from the Queue - TAO_Queued_Data *qd = - this->incoming_message_queue_.dequeue_tail (); - - if (TAO_debug_level > 5) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "trying recv, again\n", - this->id ())); - } - - // Try to do a read again. If we have some luck it would be - // great.. - ssize_t n = this->recv (qd->msg_block_->wr_ptr (), - missing_data, - max_wait_time); - - if (TAO_debug_level > 5) + else { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "recv retval [%d]\n", - this->id (), - n)); - } + // There is a complete chain of fragments + fragment_message->consolidate (); - // Error... - if (n < 0) - { - return n; + // Go ahead and enqueue this message + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); } - - // If we get a EWOULDBLOCK ie. n==0, we should anyway put the - // message in queue before returning.. - // Move the write pointer - qd->msg_block_->wr_ptr (n); - - // Decrement the missing data - qd->missing_data_ -= n; - - // Now put the TAO_Queued_Data back in the queue - this->incoming_message_queue_.enqueue_tail (qd); - - // Any way as we have come this far and are about to return, - // just try to process a message if it is there in the queue. - if (this->incoming_message_queue_.is_head_complete ()) + break; + case 0x0102: // GIOP 1.2 + // In 1.2, we get a little more context. There's a + // FRAGMENT message-specific header, and inside that is the + // request id with which the fragment is associated. + fragment_message = + this->incoming_message_queue_.find_fragment ( + queueable_message->request_id_); + + // No fragment was found + if (fragment_message == 0) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + + if (fragment_message->major_version_ != major || + fragment_message->minor_version_ != minor) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - " + "TAO_Transport::enqueue_incoming_message " + "GIOP versions do not match " + "(%d.%d != %d.%d\n", + fragment_message->major_version_, + fragment_message->minor_version_, + major, minor), + -1); + + // Find the last message block in the continuation + mb = fragment_message->msg_block_; + while (mb->cont () != 0) + mb = mb->cont (); + + // Add the current message block to the end of the chain + // after adjusting the read pointer to skip the GIOP header + queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN + + TAO_GIOP_MESSAGE_FRAGMENT_HEADER); + mb->cont (queueable_message->msg_block_); + + // Remove our reference to the message block. At this point + // the message block of the fragment head owns it as part of a + // chain + queueable_message->msg_block_ = 0; + + if (!queueable_message->more_fragments_) { - return this->process_queue_head (rh); + // This is the end of the fragments for this request + fragment_message->consolidate (); } - return 0; + // Get rid of the queuable message + queueable_message->release (); + break; + default: + if (!queueable_message->more_fragments_) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + // This is an unknown GIOP version + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - " + "TAO_Transport::enqueue_incoming_message " + "can not handle a fragmented GIOP %d.%d " + "message\n", major, minor), + -1); } - // Process a message in the head of the queue if we have one.. - return this->process_queue_head (rh); + return 0; } int -TAO_Transport::consolidate_extra_messages (ACE_Message_Block - &incoming, - TAO_Resume_Handle &rh) -{ - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n", - this->id ())); - } - - // Pick the tail of the queue - TAO_Queued_Data *tail = - this->incoming_message_queue_.dequeue_tail (); - - if (tail) - { - // If we have a node in the tail, checek to see whether it needs - // consolidation. If so, just consolidate it. - if (this->messaging_object ()->consolidate_node (tail, incoming) == -1) - { - return -1; - } - - // .. put the tail back in queue.. - this->incoming_message_queue_.enqueue_tail (tail); - } - - int retval = 1; - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, " - "extracting extra messages\n", - this->id ())); - } - - // Extract messages.. - while (retval == 1) - { - TAO_Queued_Data *q_data = 0; - - retval = - this->messaging_object ()->extract_next_message (incoming, - q_data); - if (q_data) - { - // If we have read a framented message then... - if (q_data->more_fragments_ || - q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - { - this->consolidate_fragments (q_data, rh); - } - else - { - this->incoming_message_queue_.enqueue_tail (q_data); - } - } - } - - // In case of error return.. - if (retval == -1) - { - return retval; - } - - return this->process_queue_head (rh); -} - -int TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) { // Get the <message_type> that we have received TAO_Pluggable_Message_Type t = qd->msg_type_; - // int result = 0; - if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) { if (TAO_debug_level > 0) @@ -1828,57 +1871,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, return 0; } -TAO_Queued_Data * -TAO_Transport::make_queued_data (ACE_Message_Block &incoming) -{ - // Get an instance of TAO_Queued_Data - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data ( - this->orb_core_->transport_message_buffer_allocator ()); - - // Get the flag for the details of the data block... - ACE_Message_Block::Message_Flags flg = - incoming.self_flags (); - - if (ACE_BIT_DISABLED (flg, - ACE_Message_Block::DONT_DELETE)) - { - // Duplicate the data block before putting it in the queue. - qd->msg_block_ = ACE_Message_Block::duplicate (&incoming); - } - else - { - // As we are in CORBA mode, all the data blocks would be aligned - // on an 8 byte boundary. Hence create a data block for more - // than the actual length - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (incoming.length ()+ - ACE_CDR::MAX_ALIGNMENT); - - // Get the allocator.. - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - // Make message block.. - ACE_Message_Block mb (db, - 0, - alloc); - - // Duplicate the block.. - qd->msg_block_ = mb.duplicate (); - - // Align the message block - ACE_CDR::mb_align (qd->msg_block_); - - // Copy the data.. - qd->msg_block_->copy (incoming.rd_ptr (), - incoming.length ()); - } - - - return qd; -} - int TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) { @@ -1889,65 +1881,59 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) this->id ())); } - // See if the message in the head of the queue is complete... - if (this->incoming_message_queue_.is_head_complete () > 0) - { - // Get the message on the head of the queue.. - TAO_Queued_Data *qd = - this->incoming_message_queue_.dequeue_head (); + if (this->incoming_message_queue_.is_head_complete () != 1) + return 1; - if (TAO_debug_level > 3) + // Get the message on the head of the queue.. + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_head (); + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::process_queue_head, " + "the size of the queue is [%d]\n", + this->id (), + this->incoming_message_queue_.queue_length())); + } + // Now that we have pulled out out one message out of the queue, + // check whether we have one more message in the queue... + if (this->incoming_message_queue_.queue_length () > 0) + { + if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "the size of the queue is [%d]\n", - this->id (), - this->incoming_message_queue_.queue_length())); - } - // Now that we have pulled out out one message out of the queue, - // check whether we have one more message in the queue... - if (this->incoming_message_queue_.is_head_complete () > 0) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "notify reactor\n", - this->id ())); - - } - int retval = - this->notify_reactor (); + "notify reactor\n", + this->id ())); - if (retval == 1) - { - // Let the class know that it doesn't need to resume the - // handle.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); - } - else if (retval < 0) - return -1; - } - else - { - // As we are ready to process the last message just resume - // the handle. Set the flag incase someone had reset the flag.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); } + int retval = + this->notify_reactor (); - // Process the message... - if (this->process_parsed_messages (qd, rh) == -1) + if (retval == 1) { - return -1; + // Let the class know that it doesn't need to resume the + // handle.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); } + else if (retval < 0) + return -1; + } + else + { + // As we are ready to process the last message just resume + // the handle. Set the flag incase someone had reset the flag.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); + } - // Delete the Queued_Data.. - TAO_Queued_Data::release (qd); + // Process the message... + int retval = this->process_parsed_messages (qd, rh); - return 0; - } + // Delete the Queued_Data.. + TAO_Queued_Data::release (qd); - return 1; + return (retval == -1) ? -1 : 0; } int @@ -1959,6 +1945,8 @@ TAO_Transport::notify_reactor (void) } ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; // Get the reactor associated with the event handler ACE_Reactor *reactor = this->orb_core ()->reactor (); @@ -1995,6 +1983,18 @@ TAO_Transport::transport_cache_manager (void) return this->orb_core_->lane_resources ().transport_cache (); } +size_t +TAO_Transport::recv_buffer_size (void) +{ + return this->recv_buffer_size_; +} + +size_t +TAO_Transport::sent_byte_count (void) +{ + return this->sent_byte_count_; +} + void TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp) { diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index b6059b211fe..4b885eb40e5 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -447,7 +447,6 @@ public: virtual ACE_Event_Handler * event_handler_i (void) = 0; protected: - virtual TAO_Connection_Handler * connection_handler_i (void) = 0; public: @@ -489,6 +488,7 @@ public: virtual int handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time = 0, int block = 0); + void try_to_complete (ACE_Time_Value *max_wait_time); enum { @@ -568,60 +568,11 @@ public: protected: - /// Called by the handle_input_i (). This method is used to parse - /// message read by the handle_input_i () call. It also decides - /// whether the message needs consolidation before processing. - int parse_consolidate_messages (ACE_Message_Block &bl, - TAO_Resume_Handle &rh, - ACE_Time_Value *time = 0); - - - /// Method does parsing of the message if we have a fresh message in - /// the <message_block> or just returns if we have read part of the - /// previously stored message. - int parse_incoming_messages (ACE_Message_Block &message_block); - - /// Return if we have any missing data in the queue of messages - /// or determine if we have more information left out in the - /// presently read message to make it complete. - size_t missing_data (ACE_Message_Block &message_block); - - /// Consolidate the currently read message or consolidate the last - /// message in the queue. The consolidation of the last message in - /// the queue is done by calling consolidate_message_queue (). - virtual int consolidate_message (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time); - - /// @@Bala: Docu??? - int consolidate_fragments (TAO_Queued_Data *qd, - TAO_Resume_Handle &rh); - - /// First consolidate the message queue. If the message is still not - /// complete, try to read from the handle again to make it - /// complete. If these dont help put the message back in the queue - /// and try to check the queue if we have message to process. (the - /// thread needs to do some work anyway :-)) - int consolidate_message_queue (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time); - - /// Called by parse_consolidate_message () if we have more messages - /// in one read. Queue up the messages and try to process one of - /// them, atleast at the head of them. - int consolidate_extra_messages (ACE_Message_Block &incoming, - TAO_Resume_Handle &rh); - /// Process the message by sending it to the higher layers of the /// ORB. int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh); - /// Make a queued data from the <incoming> message block - TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming); - /// Implement send_message_shared() assuming the handler_lock_ is /// held. int send_message_shared_i (TAO_Stub *stub, @@ -669,6 +620,40 @@ public: int handle_timeout (const ACE_Time_Value ¤t_time, const void* act); + /// Accessor to recv_buffer_size_ + size_t recv_buffer_size (void); + + /// Accessor to sent_byte_count_ + size_t sent_byte_count (void); + + + /*! + \name Incoming Queue Methods + */ + //@{ + /*! + \brief Queue up \a queueable_message as a completely-received incoming message. + + This method queues up a completely-received queueable GIOP message + (i.e., it must be dynamically-allocated). It does not assemble a + complete GIOP message; that should be done prior to calling this + message, and is currently done in handle_input_i. + + This does, however, assure that a completely-received GIOP + FRAGMENT gets associated with any previously-received related + fragments. It does this through collaboration with the messaging + object (since fragment reassembly is protocol specific). + + \param queueable_message instance as returned by one of the TAO_Queued_Data::make_*_message that's been completely received + + \return 0 successfully enqueued \a queueable_message + + \return -1 failed to enqueue \a queueable_message + \todo How do we indicate \em what may have failed? + */ + int enqueue_incoming_message (TAO_Queued_Data *queueable_message); + //@} + /// CodeSet Negotiation - Get the char codeset translator factory /// TAO_Codeset_Translator_Factory *char_translator (void) const; @@ -792,10 +777,14 @@ private: /// Print out error messages if the event handler is not valid void report_invalid_event_handler (const char *caller); - /* + /** * Process the message that is in the head of the incoming queue. * If there are more messages in the queue, this method calls - * this->notify_reactor () to wake up a thread + * this->notify_reactor () to wake up a thread. + * + * \return -1 An error occurred; occurs independent presence of messages in the queue. + * \return 1 No messages in the queue to process; nothing processed. + * \return 0 Messages were in the queue to process and one got processed. */ int process_queue_head (TAO_Resume_Handle &rh); @@ -856,9 +845,12 @@ protected: TAO_Queued_Message *head_; TAO_Queued_Message *tail_; - /// Queue of the incoming messages.. + /// Queue of the completely-received incoming messages.. TAO_Incoming_Message_Queue incoming_message_queue_; + /// Place to hold a partially-received (waiting-to-be-completed) message + TAO_Queued_Data * uncompleted_message_; + /// The queue will start draining no later than <queing_deadline_> /// *if* the deadline is ACE_Time_Value current_deadline_; @@ -893,8 +885,11 @@ protected: /// Used by the LRU, LFU and FIFO Connection Purging Strategies. unsigned long purging_order_; -private: + /// Size of the buffer received. + size_t recv_buffer_size_; + /// Number of bytes sent. + size_t sent_byte_count_; /// @@Phil, I think it would be nice if we could think of a way to /// do the following. /// We have been trying to use the transport for marking about @@ -907,6 +902,7 @@ private: /// we can move this to the connection_handler and it may more sense /// with the DSCP stuff around there. Do you agree? +private: /// Additional member values required to support codeset translation TAO_Codeset_Translator_Factory *char_translator_; TAO_Codeset_Translator_Factory *wchar_translator_; diff --git a/TAO/tao/default_client.cpp b/TAO/tao/default_client.cpp index 0af1359c670..9d6cc79e0b8 100644 --- a/TAO/tao/default_client.cpp +++ b/TAO/tao/default_client.cpp @@ -179,7 +179,7 @@ TAO_Default_Client_Strategy_Factory::parse_args (int argc, ACE_TCHAR* argv[]) ACE_LIB_TEXT("LF")) == 0) this->connect_strategy_ = TAO_LEADER_FOLLOWER_CONNECT; else - this->report_option_value_error (ACE_LIB_TEXT("-ORBTransportMuxStrategy"), name); + this->report_option_value_error (ACE_LIB_TEXT("-ORBConnectStrategy"), name); } } else if (ACE_OS::strcmp (argv[curarg], diff --git a/TAO/tao/orbconf.h b/TAO/tao/orbconf.h index 0abdc639a0e..f2b8d6a583b 100644 --- a/TAO/tao/orbconf.h +++ b/TAO/tao/orbconf.h @@ -146,6 +146,25 @@ const size_t TAO_DEFAULT_VALUE_FACTORY_TABLE_SIZE = 128; #define TAO_MAXBUFSIZE 1024 #endif /* TAO_MAXBUFSIZE */ +/*! + + The number of times the transport will try to re-read before + returning control to the reactor when it has an uncompleted + message (see TAO_Transport::handle_input()). + + The idea behind re-reading is that more data may have arrived + while the transport was busy deciding what to do with the bytes + it got, so we should probably try to re-read. + + This value shouldn't be too large, lest the transport starve + out other transports while trying to complete its message. + + When choosing a value, think of the type of this as 'unsigned int'. + */ +#if !defined(TAO_MAX_TRANSPORT_REREAD_ATTEMPTS) +#define TAO_MAX_TRANSPORT_REREAD_ATTEMPTS 2 +#endif + // This controls the alignment for TAO structs. It supports built-in // types up to and including 16 bytes (128 bits) in size. #if !defined (TAO_MAXIMUM_NATIVE_TYPE_SIZE) |