summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp116
1 files changed, 82 insertions, 34 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index f3cb4c0360a..6fe0fb61808 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -888,6 +888,23 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
// Extract the data for the node..
this->messaging_object ()->get_message_data (&qd);
+ // Check whether the message was fragmented..
+ if (qd.more_fragments_ ||
+ (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ // Make a copy of the message that we have
+ ACE_Data_Block *ndb =
+ message_block.data_block ()->clone ();
+
+ // Replace the underlying the datablock
+ message_block.replace_data_block (ndb);
+
+ // Duplicate the node that we have as the node is on stack..
+ TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd);
+
+ return this->consolidate_fragments (nqd, rh);
+ }
+
// Resume before starting to process the request..
rh.resume_handle ();
@@ -908,6 +925,7 @@ TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
// Check whether we have a complete message for processing
ssize_t missing_data = this->missing_data (block);
+
if (missing_data < 0)
{
// If we have more than one message
@@ -1027,13 +1045,13 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
// ..Decrement
missing_data -= n;
- if (missing_data > 0)
+ if (missing_data > 0 || this->incoming_message_queue_.queue_length ())
{
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n"
- "insufficient read, queueing up the message \n",
+ "queueing up the message \n",
this->id ()));
}
// Get an instance of TAO_Queued_Data
@@ -1051,41 +1069,10 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
// Add it to the tail of the queue..
this->incoming_message_queue_.enqueue_tail (qd);
- return 0;
- }
-
- // Check to see if we have messages in queue. AT this point we
- // cannot have have semi-complete messages in the queue as they
- // would have been taken care before
- if (this->incoming_message_queue_.queue_length ())
- {
- // If we have messages in the queue, put the <incoming> in the
- // queue
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n"
- " queueing up the message \n",
- this->id ()));
- }
-
- // Get an instance of TAO_Queued_Data
- TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data ();
-
- // Duplicate the data block before putting it in the queue.
- qd->msg_block_ = incoming.duplicate ();
-
- // Get the rest of the messaging data
- this->messaging_object ()->get_message_data (qd);
-
- // Add it to the tail of the queue..
- this->incoming_message_queue_.enqueue_tail (qd);
-
// Process one on the head of the queue and return
return this->process_queue_head (rh);
}
-
// We dont have any missing data. Just make a queued_data node with
// the existing message block and send it to the higher layers of
// the ORB.
@@ -1093,6 +1080,17 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
pqd.missing_data_ = missing_data;
this->messaging_object ()->get_message_data (&pqd);
+ // Check whether the message was fragmented and try to consolidate
+ // the fragments..
+ if (pqd.more_fragments_ ||
+ (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ // Duplicate the queued data as it is on stack..
+ TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd);
+
+ return this->consolidate_fragments (nqd, rh);
+ }
+
// Resume the handle before processing the request
rh.resume_handle ();
@@ -1102,6 +1100,45 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
}
int
+TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd,
+ TAO_Resume_Handle &rh)
+{
+ // If we have received a fragment message then we have to
+ // consolidate <qd> with the last message in queue
+ // @@todo: this piece of logic follows GIOP a bit... Need to revisit
+ // if we have protocols other than GIOP
+
+ // @@todo: Fragments now have too much copying overhead. Need to get
+ // them out if we want to have some reasonable performance metrics
+ // in future.. Post 1.2 seems a nice time..
+ if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ {
+ TAO_Queued_Data *tqd =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ tqd->more_fragments_ = qd->more_fragments_;
+
+ if (this->messaging_object ()->consolidate_fragments (tqd,
+ qd) == -1)
+ return -1;
+
+ TAO_Queued_Data::release (qd);
+
+ this->incoming_message_queue_.enqueue_tail (tqd);
+
+ this->process_queue_head (rh);
+ }
+ else
+ {
+ // if we dont have a fragment already in the queue just add it in
+ // the queue
+ this->incoming_message_queue_.enqueue_tail (qd);
+ }
+
+ return 0;
+}
+
+int
TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
ssize_t missing_data,
TAO_Resume_Handle &rh,
@@ -1233,7 +1270,18 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block
this->messaging_object ()->extract_next_message (incoming,
q_data);
if (q_data)
- this->incoming_message_queue_.enqueue_tail (q_data);
+ {
+ // If we have read a framented message then...
+ if (q_data->more_fragments_ ||
+ q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ {
+ this->consolidate_fragments (q_data, rh);
+ }
+ else
+ {
+ this->incoming_message_queue_.enqueue_tail (q_data);
+ }
+ }
}
// In case of error return..