diff options
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 116 |
1 files changed, 82 insertions, 34 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index f3cb4c0360a..6fe0fb61808 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -888,6 +888,23 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, // Extract the data for the node.. this->messaging_object ()->get_message_data (&qd); + // Check whether the message was fragmented.. + if (qd.more_fragments_ || + (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + // Make a copy of the message that we have + ACE_Data_Block *ndb = + message_block.data_block ()->clone (); + + // Replace the underlying the datablock + message_block.replace_data_block (ndb); + + // Duplicate the node that we have as the node is on stack.. + TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd); + + return this->consolidate_fragments (nqd, rh); + } + // Resume before starting to process the request.. rh.resume_handle (); @@ -908,6 +925,7 @@ TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, // Check whether we have a complete message for processing ssize_t missing_data = this->missing_data (block); + if (missing_data < 0) { // If we have more than one message @@ -1027,13 +1045,13 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, // ..Decrement missing_data -= n; - if (missing_data > 0) + if (missing_data > 0 || this->incoming_message_queue_.queue_length ()) { if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n" - "insufficient read, queueing up the message \n", + "queueing up the message \n", this->id ())); } // Get an instance of TAO_Queued_Data @@ -1051,41 +1069,10 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, // Add it to the tail of the queue.. this->incoming_message_queue_.enqueue_tail (qd); - return 0; - } - - // Check to see if we have messages in queue. AT this point we - // cannot have have semi-complete messages in the queue as they - // would have been taken care before - if (this->incoming_message_queue_.queue_length ()) - { - // If we have messages in the queue, put the <incoming> in the - // queue - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n" - " queueing up the message \n", - this->id ())); - } - - // Get an instance of TAO_Queued_Data - TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data (); - - // Duplicate the data block before putting it in the queue. - qd->msg_block_ = incoming.duplicate (); - - // Get the rest of the messaging data - this->messaging_object ()->get_message_data (qd); - - // Add it to the tail of the queue.. - this->incoming_message_queue_.enqueue_tail (qd); - // Process one on the head of the queue and return return this->process_queue_head (rh); } - // We dont have any missing data. Just make a queued_data node with // the existing message block and send it to the higher layers of // the ORB. @@ -1093,6 +1080,17 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, pqd.missing_data_ = missing_data; this->messaging_object ()->get_message_data (&pqd); + // Check whether the message was fragmented and try to consolidate + // the fragments.. + if (pqd.more_fragments_ || + (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + // Duplicate the queued data as it is on stack.. + TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd); + + return this->consolidate_fragments (nqd, rh); + } + // Resume the handle before processing the request rh.resume_handle (); @@ -1102,6 +1100,45 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, } int +TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh) +{ + // If we have received a fragment message then we have to + // consolidate <qd> with the last message in queue + // @@todo: this piece of logic follows GIOP a bit... Need to revisit + // if we have protocols other than GIOP + + // @@todo: Fragments now have too much copying overhead. Need to get + // them out if we want to have some reasonable performance metrics + // in future.. Post 1.2 seems a nice time.. + if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + { + TAO_Queued_Data *tqd = + this->incoming_message_queue_.dequeue_tail (); + + tqd->more_fragments_ = qd->more_fragments_; + + if (this->messaging_object ()->consolidate_fragments (tqd, + qd) == -1) + return -1; + + TAO_Queued_Data::release (qd); + + this->incoming_message_queue_.enqueue_tail (tqd); + + this->process_queue_head (rh); + } + else + { + // if we dont have a fragment already in the queue just add it in + // the queue + this->incoming_message_queue_.enqueue_tail (qd); + } + + return 0; +} + +int TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, ssize_t missing_data, TAO_Resume_Handle &rh, @@ -1233,7 +1270,18 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block this->messaging_object ()->extract_next_message (incoming, q_data); if (q_data) - this->incoming_message_queue_.enqueue_tail (q_data); + { + // If we have read a framented message then... + if (q_data->more_fragments_ || + q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + { + this->consolidate_fragments (q_data, rh); + } + else + { + this->incoming_message_queue_.enqueue_tail (q_data); + } + } } // In case of error return.. |