From aa8cdced83cdfdf41381ed556543afc87658ec24 Mon Sep 17 00:00:00 2001 From: Chris Cleeland Date: Mon, 15 Dec 2003 22:31:47 +0000 Subject: Tag: pmb_integration Started work on performance enhancements for PMB. --- TAO/tao/GIOP_Message_Base.cpp | 353 +++++++----------------------------------- 1 file changed, 52 insertions(+), 301 deletions(-) diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 76aeeaee51e..7ada6c903ee 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -24,7 +24,7 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, size_t /*input_cdr_size*/) : orb_core_ (orb_core) , message_state_ (orb_core, - this) + this) , out_stream_ (this->buffer_, sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */ TAO_ENCAP_BYTE_ORDER, @@ -295,6 +295,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) TAO_Pluggable_Message_Type TAO_GIOP_Message_Base::message_type ( TAO_GIOP_Message_State &msg_state) + const { // Convert to the right type of Pluggable Messaging message type. @@ -328,264 +329,6 @@ TAO_GIOP_Message_Base::message_type ( return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; } -int -TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) -{ - - if (this->message_state_.parse_message_header (incoming) == -1) - { - return -1; - } - - return 0; -} - -ssize_t -TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) -{ - // Actual message size including the header.. - CORBA::ULong msg_size = - this->message_state_.message_size (); - - size_t len = incoming.length (); - - // If we have too many messages or if we have less than even a size - // of the GIOP header then .. - if (len > msg_size || - len < TAO_GIOP_MESSAGE_HEADER_LEN) - { - return -1; - } - else if (len == msg_size) - return 0; - - return msg_size - len; -} - - -int -TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd) -{ - TAO_GIOP_Message_State state (this->orb_core_, - this); - - if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) - { - if (incoming.length () > 0) - { - // Make a node which has a message block of the size of - // MESSAGE_HEADER_LEN. - qd = - this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); - - qd->msg_block_->copy (incoming.rd_ptr (), - incoming.length ()); - qd->missing_data_ = -1; - } - return 0; - } - - if (state.parse_message_header (incoming) == -1) - { - return -1; - } - - size_t copying_len = state.message_size (); - - qd = this->make_queued_data (copying_len); - - if (copying_len > incoming.length ()) - { - qd->missing_data_ = - copying_len - incoming.length (); - - copying_len = incoming.length (); - } - - qd->msg_block_->copy (incoming.rd_ptr (), - copying_len); - - incoming.rd_ptr (copying_len); - qd->byte_order_ = state.byte_order_; - qd->major_version_ = state.giop_version_.major; - qd->minor_version_ = state.giop_version_.minor; - qd->msg_type_ = this->message_type (state); - return 1; -} - -int -TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming) -{ - // Look to see whether we had atleast parsed the GIOP header ... - if (qd->missing_data_ == -1) - { - // The data length that has been stuck in there during the last - // read .... - size_t len = - qd->msg_block_->length (); - - // We know that we would have space for - // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data - // from the into the message block in - qd->msg_block_->copy (incoming.rd_ptr (), - TAO_GIOP_MESSAGE_HEADER_LEN - len); - - // Move the rd_ptr () in the incoming message block.. - incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len); - - TAO_GIOP_Message_State state (this->orb_core_, - this); - - // Parse the message header now... - if (state.parse_message_header (*qd->msg_block_) == -1) - return -1; - - // Now grow the message block so that we can copy the rest of - // the data... - if (qd->msg_block_->space () < state.message_size ()) - { - ACE_CDR::grow (qd->msg_block_, - state.message_size ()); - } - - // Copy the pay load.. - // Calculate the bytes that needs to be copied in the queue... - size_t copy_len = - state.payload_size (); - - // If the data that needs to be copied is more than that is - // available to us .. - if (copy_len > incoming.length ()) - { - // Calculate the missing data.. - qd->missing_data_ = - copy_len - incoming.length (); - - // Set the actual possible copy_len that is available... - copy_len = incoming.length (); - } - else - { - qd->missing_data_ = 0; - } - - // ..now we are set to copy the right amount of data to the - // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); - - // Set the of the .. - incoming.rd_ptr (copy_len); - - // Get the other details... - qd->byte_order_ = state.byte_order_; - qd->major_version_ = state.giop_version_.major; - qd->minor_version_ = state.giop_version_.minor; - qd->msg_type_ = this->message_type (state); - } - else - { - // @@todo: Need to abstract this out to a seperate method... - size_t copy_len = qd->missing_data_; - - if (copy_len > incoming.length ()) - { - // Calculate the missing data.. - qd->missing_data_ = - copy_len - incoming.length (); - - // Set the actual possible copy_len that is available... - copy_len = incoming.length (); - } - - // Copy the right amount of data in to the node... - // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); - - // Set the of the .. - qd->msg_block_->rd_ptr (copy_len); - - } - - - return 0; -} - - -int -TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd) -{ - if (dqd->byte_order_ != sqd->byte_order_ - || dqd->major_version_ != sqd->major_version_ - || dqd->minor_version_ != sqd->minor_version_) - { - // Yes, print it out in all debug levels!. This is an error by - // CORBA 2.4 spec - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) incompatible fragments:") - ACE_TEXT ("different GIOP versions or byte order\n"))); - return -1; - } - - // Skip the header in the incoming message - sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - - // If we have a fragment header skip the header length too.. - if (sqd->minor_version_ == 2 && - sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER); - - // Get the length of the incoming message block.. - size_t incoming_length = - sqd->msg_block_->length (); - - // Increase the size of the destination message block if we need - // to. - ACE_Message_Block *mb = - dqd->msg_block_; - - // Check space before growing. - if (mb->space () < incoming_length) - { - ACE_CDR::grow (mb, - mb->length () + incoming_length); - } - - // Copy the data - dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (), - incoming_length); - return 0; -} - -void -TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) -{ - // Get the message information - qd->byte_order_ = - this->message_state_.byte_order_; - qd->major_version_ = - this->message_state_.giop_version_.major; - qd->minor_version_ = - this->message_state_.giop_version_.minor; - - //qd->more_fragments_ = this->message_state_.more_fragments_; - - if (this->message_state_.more_fragments_) - qd->more_fragments_ = 1; - else - qd->more_fragments_ = 0; - - qd->msg_type_= - this->message_type (this->message_state_); - - // Reset the message_state - this->message_state_.reset (); -} - int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) @@ -764,13 +507,13 @@ TAO_GIOP_Message_Base::process_reply_message ( // Should be taken care by the state specific parsing retval = generator_parser->parse_reply (input_cdr, - params); + params); break; case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY: retval = generator_parser->parse_locate_reply (input_cdr, - params); + params); break; default: retval = -1; @@ -898,7 +641,9 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, parse_error = parser->parse_request_header (request); - request.orb_core()->codeset_manager()->process_service_context(request); + TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); + if (csm) + csm->process_service_context(request); transport->assign_translators(&cdr,&output); // Throw an exception if the @@ -1098,9 +843,9 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, } TAO::ObjectKey tmp_key (locate_request.object_key ().length (), - locate_request.object_key ().length (), - locate_request.object_key ().get_buffer (), - 0); + locate_request.object_key ().length (), + locate_request.object_key ().get_buffer (), + 0); // Set it to an error state parse_error = 1; @@ -1324,6 +1069,7 @@ TAO_GIOP_Message_Base::set_state ( } +#if 0 // Server sends an "I'm shutting down now, any requests you've sent me // can be retried" message to the server. The message is prefab, for // simplicity. @@ -1419,7 +1165,7 @@ TAO_GIOP_Message_Base:: transport-> id ())); } - +#endif int TAO_GIOP_Message_Base::send_reply_exception ( @@ -1576,44 +1322,49 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) } -TAO_Queued_Data * -TAO_GIOP_Message_Base::make_queued_data (size_t sz) +void +TAO_GIOP_Message_Base::set_queued_data_from_message_header ( + TAO_Queued_Data *qd, + const ACE_Message_Block &mb + ) const { - // Get a node for the queue.. - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data ( - this->orb_core_->transport_message_buffer_allocator ()); - - // @@todo: We have a similar method in Transport.cpp. Need to see how - // we can factor them out.. - // Make a datablock for the size requested + something. The - // "something" is required because we are going to align the data - // block in the message block. During alignment we could loose some - // bytes. As we may not know how many bytes will be lost, we will - // allocate ACE_CDR::MAX_ALIGNMENT extra. - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (sz + - ACE_CDR::MAX_ALIGNMENT); - - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - ACE_Message_Block mb (db, - 0, - alloc); - - ACE_Message_Block *new_mb = mb.duplicate (); - - ACE_CDR::mb_align (new_mb); - - qd->msg_block_ = new_mb; - + // @@CJC: Try leaving out the declaration for this->message_state_ + // and see what pukes. I don't think we need it any more. + TAO_GIOP_Message_State state; + if (state.take_values_from_message_block (mb) == -1) + { + // what the heck do we do here?! + qd->current_state_ = TAO_Queued_Data::INVALID; + return; + } - return qd; + // It'd be nice to have an abstract base for GIOP_Message_State + // so that there could just be a line like: + // qd->take_values_from (state); + // Get the message information + qd->byte_order_ = state.byte_order (); + qd->major_version_ = state.giop_version ().major; + qd->minor_version_ = state.giop_version ().minor; + qd->more_fragments_ = state.more_fragments () ? 1 : 0; + qd->request_id_ = state.request_id_; + qd->msg_type_= message_type (state); + qd->missing_data_bytes_ = state.payload_size (); } -size_t -TAO_GIOP_Message_Base::header_length (void) const +int +TAO_GIOP_Message_Base::check_for_valid_header ( + const ACE_Message_Block &mb + ) const { - return TAO_GIOP_MESSAGE_HEADER_LEN; + // NOTE! We don't hardcode the length of the header b/c header_length should + // be eligible for inlining by pretty much any compiler, and it should return + // a constant. The rest of this method is hard-coded and hand-optimized because + // this method gets called A LOT. + if (mb.length () < this->header_length ()) + return -1; + + // Is finding that it's the right length and the magic bytes present + // enough to declare it a valid header? I think so... + register const char* h = mb.rd_ptr (); + return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P') ? 1 : 0; } -- cgit v1.2.1