diff options
Diffstat (limited to 'TAO/tao/GIOP_Message_Base.cpp')
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 369 |
1 files changed, 58 insertions, 311 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index dfdd5db0474..0dc4d09e384 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -254,19 +254,18 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) // this particular environment and that isn't handled by the // networking infrastructure (e.g., IPSEC). - CORBA::ULong bodylen = ACE_static_cast (CORBA::ULong, - total_len - TAO_GIOP_MESSAGE_HEADER_LEN); + CORBA::ULong bodylen = + static_cast<CORBA::ULong> (total_len - TAO_GIOP_MESSAGE_HEADER_LEN); #if !defined (ACE_ENABLE_SWAP_ON_WRITE) - *ACE_reinterpret_cast (CORBA::ULong *, buf + - TAO_GIOP_MESSAGE_SIZE_OFFSET) = bodylen; + *reinterpret_cast<CORBA::ULong *> (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET) + = bodylen; #else if (!stream.do_byte_swap ()) - *ACE_reinterpret_cast (CORBA::ULong *, - buf + TAO_GIOP_MESSAGE_SIZE_OFFSET) = bodylen; + *reinterpret_cast<CORBA::ULong *> (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET) + = bodylen; else - ACE_CDR::swap_4 (ACE_reinterpret_cast (char *, - &bodylen), + ACE_CDR::swap_4 (reinterpret_cast<char *>(&bodylen), buf + TAO_GIOP_MESSAGE_SIZE_OFFSET); #endif /* ACE_ENABLE_SWAP_ON_WRITE */ @@ -284,8 +283,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) } /// this->dump_msg ("send", - ACE_reinterpret_cast (u_char *, - buf), + reinterpret_cast<u_char *>(buf), total_len); // @@ -334,264 +332,6 @@ TAO_GIOP_Message_Base::message_type ( } int -TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) -{ - - if (this->message_state_.parse_message_header (incoming) == -1) - { - return -1; - } - - return 0; -} - -ssize_t -TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) -{ - // Actual message size including the header.. - CORBA::ULong msg_size = - this->message_state_.message_size (); - - size_t len = incoming.length (); - - // If we have too many messages or if we have less than even a size - // of the GIOP header then .. - if (len > msg_size || - len < TAO_GIOP_MESSAGE_HEADER_LEN) - { - return -1; - } - else if (len == msg_size) - return 0; - - return msg_size - len; -} - - -int -TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd) -{ - TAO_GIOP_Message_State state (this->orb_core_, - this); - - if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) - { - if (incoming.length () > 0) - { - // Make a node which has a message block of the size of - // MESSAGE_HEADER_LEN. - qd = - this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); - - qd->msg_block_->copy (incoming.rd_ptr (), - incoming.length ()); - qd->missing_data_ = -1; - } - return 0; - } - - if (state.parse_message_header (incoming) == -1) - { - return -1; - } - - size_t copying_len = state.message_size (); - - qd = this->make_queued_data (copying_len); - - if (copying_len > incoming.length ()) - { - qd->missing_data_ = - copying_len - incoming.length (); - - copying_len = incoming.length (); - } - - qd->msg_block_->copy (incoming.rd_ptr (), - copying_len); - - incoming.rd_ptr (copying_len); - qd->byte_order_ = state.byte_order_; - qd->major_version_ = state.giop_version_.major; - qd->minor_version_ = state.giop_version_.minor; - qd->msg_type_ = this->message_type (state); - return 1; -} - -int -TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming) -{ - // Look to see whether we had atleast parsed the GIOP header ... - if (qd->missing_data_ == -1) - { - // The data length that has been stuck in there during the last - // read .... - size_t len = - qd->msg_block_->length (); - - // We know that we would have space for - // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data - // from the <incoming> into the message block in <qd> - qd->msg_block_->copy (incoming.rd_ptr (), - TAO_GIOP_MESSAGE_HEADER_LEN - len); - - // Move the rd_ptr () in the incoming message block.. - incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len); - - TAO_GIOP_Message_State state (this->orb_core_, - this); - - // Parse the message header now... - if (state.parse_message_header (*qd->msg_block_) == -1) - return -1; - - // Now grow the message block so that we can copy the rest of - // the data... - if (qd->msg_block_->space () < state.message_size ()) - { - ACE_CDR::grow (qd->msg_block_, - state.message_size ()); - } - - // Copy the pay load.. - // Calculate the bytes that needs to be copied in the queue... - size_t copy_len = - state.payload_size (); - - // If the data that needs to be copied is more than that is - // available to us .. - if (copy_len > incoming.length ()) - { - // Calculate the missing data.. - qd->missing_data_ = - copy_len - incoming.length (); - - // Set the actual possible copy_len that is available... - copy_len = incoming.length (); - } - else - { - qd->missing_data_ = 0; - } - - // ..now we are set to copy the right amount of data to the - // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); - - // Set the <rd_ptr> of the <incoming>.. - incoming.rd_ptr (copy_len); - - // Get the other details... - qd->byte_order_ = state.byte_order_; - qd->major_version_ = state.giop_version_.major; - qd->minor_version_ = state.giop_version_.minor; - qd->msg_type_ = this->message_type (state); - } - else - { - // @@todo: Need to abstract this out to a seperate method... - size_t copy_len = qd->missing_data_; - - if (copy_len > incoming.length ()) - { - // Calculate the missing data.. - qd->missing_data_ = - copy_len - incoming.length (); - - // Set the actual possible copy_len that is available... - copy_len = incoming.length (); - } - - // Copy the right amount of data in to the node... - // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); - - // Set the <rd_ptr> of the <incoming>.. - qd->msg_block_->rd_ptr (copy_len); - - } - - - return 0; -} - - -int -TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd) -{ - if (dqd->byte_order_ != sqd->byte_order_ - || dqd->major_version_ != sqd->major_version_ - || dqd->minor_version_ != sqd->minor_version_) - { - // Yes, print it out in all debug levels!. This is an error by - // CORBA 2.4 spec - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) incompatible fragments:") - ACE_TEXT ("different GIOP versions or byte order\n"))); - return -1; - } - - // Skip the header in the incoming message - sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - - // If we have a fragment header skip the header length too.. - if (sqd->minor_version_ == 2 && - sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER); - - // Get the length of the incoming message block.. - size_t incoming_length = - sqd->msg_block_->length (); - - // Increase the size of the destination message block if we need - // to. - ACE_Message_Block *mb = - dqd->msg_block_; - - // Check space before growing. - if (mb->space () < incoming_length) - { - ACE_CDR::grow (mb, - mb->length () + incoming_length); - } - - // Copy the data - dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (), - incoming_length); - return 0; -} - -void -TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) -{ - // Get the message information - qd->byte_order_ = - this->message_state_.byte_order_; - qd->major_version_ = - this->message_state_.giop_version_.major; - qd->minor_version_ = - this->message_state_.giop_version_.minor; - - //qd->more_fragments_ = this->message_state_.more_fragments_; - - if (this->message_state_.more_fragments_) - qd->more_fragments_ = 1; - else - qd->more_fragments_ = 0; - - qd->msg_type_= - this->message_type (this->message_state_); - - // Reset the message_state - this->message_state_.reset (); -} - -int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) @@ -641,8 +381,7 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, if (TAO_debug_level > 0) this->dump_msg ("recv", - ACE_reinterpret_cast (u_char *, - qd->msg_block_->rd_ptr ()), + reinterpret_cast<u_char *> (qd->msg_block_->rd_ptr ()), qd->msg_block_->length ()); @@ -735,8 +474,7 @@ TAO_GIOP_Message_Base::process_reply_message ( if (TAO_debug_level > 0) this->dump_msg ("recv", - ACE_reinterpret_cast (u_char *, - qd->msg_block_->rd_ptr ()), + reinterpret_cast<u_char *>(qd->msg_block_->rd_ptr ()), qd->msg_block_->length ()); @@ -901,7 +639,9 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, parse_error = parser->parse_request_header (request); - request.orb_core()->codeset_manager()->process_service_context(request); + TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); + if (csm) + csm->process_service_context(request); transport->assign_translators(&cdr,&output); // Throw an exception if the @@ -1347,6 +1087,7 @@ TAO_GIOP_Message_Base::set_state ( // orderly disconnect as provided by TCP. This quality of service is // required to write robust distributed systems.) +#if 0 void TAO_GIOP_Message_Base:: send_close_connection (const TAO_GIOP_Message_Version &version, @@ -1427,6 +1168,7 @@ TAO_GIOP_Message_Base:: transport-> id ())); } +#endif int @@ -1520,14 +1262,14 @@ TAO_GIOP_Message_Base::dump_msg (const char *label, #if !defined (ACE_DISABLE_SWAP_ON_READ) if (byte_order == TAO_ENCAP_BYTE_ORDER) { - id = ACE_reinterpret_cast (ACE_CDR::ULong*, tmp_id); + id = reinterpret_cast<ACE_CDR::ULong*> (tmp_id); } else { - ACE_CDR::swap_4 (tmp_id, ACE_reinterpret_cast (char*,id)); + ACE_CDR::swap_4 (tmp_id, reinterpret_cast<char*> (id)); } #else - id = ACE_reinterpret_cast(ACE_CDR::ULong*, tmp_id); + id = reinterpret_cast<ACE_CDR::ULong*> (tmp_id); #endif /* ACE_DISABLE_SWAP_ON_READ */ } @@ -1584,44 +1326,49 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) } -TAO_Queued_Data * -TAO_GIOP_Message_Base::make_queued_data (size_t sz) +void +TAO_GIOP_Message_Base::set_queued_data_from_message_header ( + TAO_Queued_Data *qd, + const ACE_Message_Block &mb + ) const { - // Get a node for the queue.. - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data ( - this->orb_core_->transport_message_buffer_allocator ()); - - // @@todo: We have a similar method in Transport.cpp. Need to see how - // we can factor them out.. - // Make a datablock for the size requested + something. The - // "something" is required because we are going to align the data - // block in the message block. During alignment we could loose some - // bytes. As we may not know how many bytes will be lost, we will - // allocate ACE_CDR::MAX_ALIGNMENT extra. - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (sz + - ACE_CDR::MAX_ALIGNMENT); - - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - ACE_Message_Block mb (db, - 0, - alloc); - - ACE_Message_Block *new_mb = mb.duplicate (); - - ACE_CDR::mb_align (new_mb); - - qd->msg_block_ = new_mb; - + // @@CJC: Try leaving out the declaration for this->message_state_ + // and see what pukes. I don't think we need it any more. + TAO_GIOP_Message_State state; + if (state.take_values_from_message_block (mb) == -1) + { + // what the heck do we do here?! + qd->current_state_ = TAO_Queued_Data::INVALID; + return; + } - return qd; + // It'd be nice to have an abstract base for GIOP_Message_State + // so that there could just be a line like: + // qd->take_values_from (state); + // Get the message information + qd->byte_order_ = state.byte_order (); + qd->major_version_ = state.giop_version ().major; + qd->minor_version_ = state.giop_version ().minor; + qd->more_fragments_ = state.more_fragments () ? 1 : 0; + qd->request_id_ = state.request_id_; + qd->msg_type_= message_type (state); + qd->missing_data_bytes_ = state.payload_size (); } -size_t -TAO_GIOP_Message_Base::header_length (void) const +int +TAO_GIOP_Message_Base::check_for_valid_header ( + const ACE_Message_Block &mb + ) const { - return TAO_GIOP_MESSAGE_HEADER_LEN; + // NOTE! We don't hardcode the length of the header b/c header_length should + // be eligible for inlining by pretty much any compiler, and it should return + // a constant. The rest of this method is hard-coded and hand-optimized because + // this method gets called A LOT. + if (mb.length () < this->header_length ()) + return -1; + + // Is finding that it's the right length and the magic bytes present + // enough to declare it a valid header? I think so... + register const char* h = mb.rd_ptr (); + return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P'); } |