diff options
Diffstat (limited to 'TAO/tao/GIOP_Message_Base.cpp')
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 353 |
1 files changed, 301 insertions, 52 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 7ada6c903ee..76aeeaee51e 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -24,7 +24,7 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, size_t /*input_cdr_size*/) : orb_core_ (orb_core) , message_state_ (orb_core, - this) + this) , out_stream_ (this->buffer_, sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */ TAO_ENCAP_BYTE_ORDER, @@ -295,7 +295,6 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) TAO_Pluggable_Message_Type TAO_GIOP_Message_Base::message_type ( TAO_GIOP_Message_State &msg_state) - const { // Convert to the right type of Pluggable Messaging message type. @@ -330,6 +329,264 @@ TAO_GIOP_Message_Base::message_type ( } int +TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) +{ + + if (this->message_state_.parse_message_header (incoming) == -1) + { + return -1; + } + + return 0; +} + +ssize_t +TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) +{ + // Actual message size including the header.. + CORBA::ULong msg_size = + this->message_state_.message_size (); + + size_t len = incoming.length (); + + // If we have too many messages or if we have less than even a size + // of the GIOP header then .. + if (len > msg_size || + len < TAO_GIOP_MESSAGE_HEADER_LEN) + { + return -1; + } + else if (len == msg_size) + return 0; + + return msg_size - len; +} + + +int +TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd) +{ + TAO_GIOP_Message_State state (this->orb_core_, + this); + + if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) + { + if (incoming.length () > 0) + { + // Make a node which has a message block of the size of + // MESSAGE_HEADER_LEN. + qd = + this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); + + qd->msg_block_->copy (incoming.rd_ptr (), + incoming.length ()); + qd->missing_data_ = -1; + } + return 0; + } + + if (state.parse_message_header (incoming) == -1) + { + return -1; + } + + size_t copying_len = state.message_size (); + + qd = this->make_queued_data (copying_len); + + if (copying_len > incoming.length ()) + { + qd->missing_data_ = + copying_len - incoming.length (); + + copying_len = incoming.length (); + } + + qd->msg_block_->copy (incoming.rd_ptr (), + copying_len); + + incoming.rd_ptr (copying_len); + qd->byte_order_ = state.byte_order_; + qd->major_version_ = state.giop_version_.major; + qd->minor_version_ = state.giop_version_.minor; + qd->msg_type_ = this->message_type (state); + return 1; +} + +int +TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming) +{ + // Look to see whether we had atleast parsed the GIOP header ... + if (qd->missing_data_ == -1) + { + // The data length that has been stuck in there during the last + // read .... + size_t len = + qd->msg_block_->length (); + + // We know that we would have space for + // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data + // from the <incoming> into the message block in <qd> + qd->msg_block_->copy (incoming.rd_ptr (), + TAO_GIOP_MESSAGE_HEADER_LEN - len); + + // Move the rd_ptr () in the incoming message block.. + incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len); + + TAO_GIOP_Message_State state (this->orb_core_, + this); + + // Parse the message header now... + if (state.parse_message_header (*qd->msg_block_) == -1) + return -1; + + // Now grow the message block so that we can copy the rest of + // the data... + if (qd->msg_block_->space () < state.message_size ()) + { + ACE_CDR::grow (qd->msg_block_, + state.message_size ()); + } + + // Copy the pay load.. + // Calculate the bytes that needs to be copied in the queue... + size_t copy_len = + state.payload_size (); + + // If the data that needs to be copied is more than that is + // available to us .. + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = + copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + else + { + qd->missing_data_ = 0; + } + + // ..now we are set to copy the right amount of data to the + // node.. + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); + + // Set the <rd_ptr> of the <incoming>.. + incoming.rd_ptr (copy_len); + + // Get the other details... + qd->byte_order_ = state.byte_order_; + qd->major_version_ = state.giop_version_.major; + qd->minor_version_ = state.giop_version_.minor; + qd->msg_type_ = this->message_type (state); + } + else + { + // @@todo: Need to abstract this out to a seperate method... + size_t copy_len = qd->missing_data_; + + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = + copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + + // Copy the right amount of data in to the node... + // node.. + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); + + // Set the <rd_ptr> of the <incoming>.. + qd->msg_block_->rd_ptr (copy_len); + + } + + + return 0; +} + + +int +TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd, + const TAO_Queued_Data *sqd) +{ + if (dqd->byte_order_ != sqd->byte_order_ + || dqd->major_version_ != sqd->major_version_ + || dqd->minor_version_ != sqd->minor_version_) + { + // Yes, print it out in all debug levels!. This is an error by + // CORBA 2.4 spec + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) incompatible fragments:") + ACE_TEXT ("different GIOP versions or byte order\n"))); + return -1; + } + + // Skip the header in the incoming message + sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); + + // If we have a fragment header skip the header length too.. + if (sqd->minor_version_ == 2 && + sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER); + + // Get the length of the incoming message block.. + size_t incoming_length = + sqd->msg_block_->length (); + + // Increase the size of the destination message block if we need + // to. + ACE_Message_Block *mb = + dqd->msg_block_; + + // Check space before growing. + if (mb->space () < incoming_length) + { + ACE_CDR::grow (mb, + mb->length () + incoming_length); + } + + // Copy the data + dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (), + incoming_length); + return 0; +} + +void +TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) +{ + // Get the message information + qd->byte_order_ = + this->message_state_.byte_order_; + qd->major_version_ = + this->message_state_.giop_version_.major; + qd->minor_version_ = + this->message_state_.giop_version_.minor; + + //qd->more_fragments_ = this->message_state_.more_fragments_; + + if (this->message_state_.more_fragments_) + qd->more_fragments_ = 1; + else + qd->more_fragments_ = 0; + + qd->msg_type_= + this->message_type (this->message_state_); + + // Reset the message_state + this->message_state_.reset (); +} + +int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) @@ -507,13 +764,13 @@ TAO_GIOP_Message_Base::process_reply_message ( // Should be taken care by the state specific parsing retval = generator_parser->parse_reply (input_cdr, - params); + params); break; case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY: retval = generator_parser->parse_locate_reply (input_cdr, - params); + params); break; default: retval = -1; @@ -641,9 +898,7 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, parse_error = parser->parse_request_header (request); - TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); - if (csm) - csm->process_service_context(request); + request.orb_core()->codeset_manager()->process_service_context(request); transport->assign_translators(&cdr,&output); // Throw an exception if the @@ -843,9 +1098,9 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, } TAO::ObjectKey tmp_key (locate_request.object_key ().length (), - locate_request.object_key ().length (), - locate_request.object_key ().get_buffer (), - 0); + locate_request.object_key ().length (), + locate_request.object_key ().get_buffer (), + 0); // Set it to an error state parse_error = 1; @@ -1069,7 +1324,6 @@ TAO_GIOP_Message_Base::set_state ( } -#if 0 // Server sends an "I'm shutting down now, any requests you've sent me // can be retried" message to the server. The message is prefab, for // simplicity. @@ -1165,7 +1419,7 @@ TAO_GIOP_Message_Base:: transport-> id ())); } -#endif + int TAO_GIOP_Message_Base::send_reply_exception ( @@ -1322,49 +1576,44 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) } -void -TAO_GIOP_Message_Base::set_queued_data_from_message_header ( - TAO_Queued_Data *qd, - const ACE_Message_Block &mb - ) const +TAO_Queued_Data * +TAO_GIOP_Message_Base::make_queued_data (size_t sz) { - // @@CJC: Try leaving out the declaration for this->message_state_ - // and see what pukes. I don't think we need it any more. - TAO_GIOP_Message_State state; - if (state.take_values_from_message_block (mb) == -1) - { - // what the heck do we do here?! - qd->current_state_ = TAO_Queued_Data::INVALID; - return; - } + // Get a node for the queue.. + TAO_Queued_Data *qd = + TAO_Queued_Data::get_queued_data ( + this->orb_core_->transport_message_buffer_allocator ()); - // It'd be nice to have an abstract base for GIOP_Message_State - // so that there could just be a line like: - // qd->take_values_from (state); - // Get the message information - qd->byte_order_ = state.byte_order (); - qd->major_version_ = state.giop_version ().major; - qd->minor_version_ = state.giop_version ().minor; - qd->more_fragments_ = state.more_fragments () ? 1 : 0; - qd->request_id_ = state.request_id_; - qd->msg_type_= message_type (state); - qd->missing_data_bytes_ = state.payload_size (); + // @@todo: We have a similar method in Transport.cpp. Need to see how + // we can factor them out.. + // Make a datablock for the size requested + something. The + // "something" is required because we are going to align the data + // block in the message block. During alignment we could loose some + // bytes. As we may not know how many bytes will be lost, we will + // allocate ACE_CDR::MAX_ALIGNMENT extra. + ACE_Data_Block *db = + this->orb_core_->create_input_cdr_data_block (sz + + ACE_CDR::MAX_ALIGNMENT); + + ACE_Allocator *alloc = + this->orb_core_->input_cdr_msgblock_allocator (); + + ACE_Message_Block mb (db, + 0, + alloc); + + ACE_Message_Block *new_mb = mb.duplicate (); + + ACE_CDR::mb_align (new_mb); + + qd->msg_block_ = new_mb; + + + return qd; } -int -TAO_GIOP_Message_Base::check_for_valid_header ( - const ACE_Message_Block &mb - ) const +size_t +TAO_GIOP_Message_Base::header_length (void) const { - // NOTE! We don't hardcode the length of the header b/c header_length should - // be eligible for inlining by pretty much any compiler, and it should return - // a constant. The rest of this method is hard-coded and hand-optimized because - // this method gets called A LOT. - if (mb.length () < this->header_length ()) - return -1; - - // Is finding that it's the right length and the magic bytes present - // enough to declare it a valid header? I think so... - register const char* h = mb.rd_ptr (); - return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P') ? 1 : 0; + return TAO_GIOP_MESSAGE_HEADER_LEN; } |