diff options
author | bala <balanatarajan@users.noreply.github.com> | 2001-06-29 23:40:37 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2001-06-29 23:40:37 +0000 |
commit | b6582bce44f6ef6c80c085a93aa643822571bccc (patch) | |
tree | e96e87b04c73907c716a2861ae7dd214d8826c74 | |
parent | 61767d4431a8a8c1e6a880afef613456eae44671 (diff) | |
download | ATCD-b6582bce44f6ef6c80c085a93aa643822571bccc.tar.gz |
ChangeLogTag:Thu Jun 29 18:30:03 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 8 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.cpp | 9 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.h | 2 | ||||
-rw-r--r-- | TAO/tao/LIST_OF_TODO | 5 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 137 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 6 |
6 files changed, 132 insertions, 35 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 65493035213..84166c92089 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -355,6 +355,9 @@ int TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, TAO_Queued_Data *&qd) { + //ACE_DEBUG ((LM_DEBUG, + // "TAO (%P|%t) Extracting extra messages... \n")); + TAO_GIOP_Message_State state (this->orb_core_, this); @@ -381,6 +384,9 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, size_t copying_len = state.message_size (); + // ACE_DEBUG ((LM_DEBUG, + // "TAO (%P|%t) ... queueing messages.. \n")); + qd = this->make_queued_data (copying_len); if (copying_len > incoming.length ()) @@ -388,7 +394,7 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, qd->missing_data_ = copying_len - incoming.length (); - copying_len -= incoming.length (); + copying_len = incoming.length (); } qd->msg_block_->copy (incoming.rd_ptr (), diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index 3582b151019..727d2e29961 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -22,13 +22,13 @@ TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void) // Need to delete all the unused data-blocks } -void +size_t TAO_Incoming_Message_Queue::copy_message (ACE_Message_Block &block) { + size_t n = 0; + if (this->size_ > 0) { - size_t n = 0; - if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_) { n = block.length (); @@ -40,8 +40,11 @@ TAO_Incoming_Message_Queue::copy_message (ACE_Message_Block &block) this->queued_data_->msg_block_->copy (block.rd_ptr (), n); + this->queued_data_->missing_data_ -= n; } + + return n; } TAO_Queued_Data * diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index 91427672e2a..3bb2c25a3cf 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -46,7 +46,7 @@ public: ssize_t missing_data, CORBA::Octet byte_order);*/ - void copy_message (ACE_Message_Block &block); + size_t copy_message (ACE_Message_Block &block); CORBA::ULong queue_length (void); diff --git a/TAO/tao/LIST_OF_TODO b/TAO/tao/LIST_OF_TODO index c51df858af8..3f7cd661c94 100644 --- a/TAO/tao/LIST_OF_TODO +++ b/TAO/tao/LIST_OF_TODO @@ -9,11 +9,10 @@ - AMI tests - run purify quantify - DSI_Gateway tests -- Remove the ORB_Core from the signature of a number of methods of - GIOP_Message_Base class - Go through the code again & again... - Put tms_->close_connection () wherever we get -1 as a retval.. - Looks for a memory leak when we delete a node.. - Change the Output CDR to be on stack.. - Dont we want a cached transport on the server side... -- BIG_REPLY test, needs to be trimmed down a bit...
\ No newline at end of file +- BIG_REPLY test, needs to be trimmed down a bit... +- Look at purge_pending_notifications ()..
\ No newline at end of file diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 319132a151a..1e7f9d61844 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -125,7 +125,7 @@ TAO_Transport::~TAO_Transport (void) } int -TAO_Transport::handle_output () +TAO_Transport::handle_output (void) { if (TAO_debug_level > 4) { @@ -827,47 +827,89 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, max_wait_time); + if (n <= 0) - return n; + { + // Even if we have read failure just try to see if we have a + // message on the head of the queue and try to process that... + this->process_queue_head (rh); + + if (this->incoming_message_queue_.queue_length () == 0) + return n; + return 0; + } + + /* if (TAO_debug_level > 2) + {*/ + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) Read [%d] bytes \n", + n)); + // } // Set the write pointer in the stack buffer message_block.wr_ptr (n); - if (this->parse_incoming_messages (message_block) == -1) + int retval = this->parse_consolidate_messages (message_block, + rh, + max_wait_time); + + if (retval <= 0) + { + if (retval == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport::handle_input_i [%d] " + "Error while parsing and consolidating \n", + n)); + } + return retval; + } + + // Make a node of the message block.. + TAO_Queued_Data qd; + qd.msg_block_ = &message_block; + + // Extract the data for the node.. + this->messaging_object ()->get_message_data (&qd); + + // Process the message + return this->process_parsed_messages (&qd, + rh); +} + +int +TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + // Parse the incoming message for validity. The check needs to be + // performed by the messaging objects. + if (this->parse_incoming_messages (block) == -1) return -1; // Check whether we have a complete message for processing - ssize_t missing_data = - this->missing_data (message_block); + ssize_t missing_data = this->missing_data (block); if (missing_data < 0) { - return this->consolidate_extra_messages (message_block, + // If we have read too many messages.. + return this->consolidate_extra_messages (block, rh); } else if (missing_data > 0) { - return this->consolidate_message (message_block, + // If we have missing data then try doing a read or try queueing + // them. + return this->consolidate_message (block, missing_data, rh, max_wait_time); } - - TAO_Queued_Data qd; - qd.msg_block_ = &message_block; - qd.missing_data_ = missing_data; - - this->messaging_object ()->get_message_data (&qd); - - // @@Bala: - return this->process_parsed_messages (&qd, - rh); + return 1; } - - int TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block) { @@ -990,17 +1032,39 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, // If the queue did not have a complete message put this piece of // message in the queue. We kow it did not have a complete // message. That is why we are here. - this->incoming_message_queue_.copy_message (incoming); + size_t n = this->incoming_message_queue_.copy_message (incoming); + + // Update the missing data... missing_data = this->incoming_message_queue_.missing_data (); + // Move the read pointer of the <incoming> message block to the end + // of the copied message and process the remaining portion... + incoming.rd_ptr (n); - // @@todo: What will happen if we have a part of the next message in - // the incoming message block? If that is a one-way call we handle - // it differently. We will be in soup if the next message is a - // two-way call. We need to process that too.... Can we call - // process_messages () with rd_ptr () of teh incoming_message (), - // moved? + // If we have some more information left in the message block.. + if (incoming.length ()) + { + // We may have to parse & consolidate. This part of the message + // doesn't seem to be part of the last message in the queue (as + // the copy () hasn't taken away this message). + int retval = this->parse_consolidate_messages (incoming, + rh, + max_wait_time); + + // If there is an error return + if (retval == -1) + { + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Error while consolidating... \n", + "TAO (%P|%t) - .. part of the read message \n")); + } + return retval; + } + } + // If we still have some missing data.. if (missing_data > 0) { // Get the last message from the Queue @@ -1010,6 +1074,8 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, ACE_Message_Block *mb = qd->msg_block_; + // Try to do a read again. If we have some luck it would be + // great.. ssize_t n = this->recv (mb->wr_ptr (), missing_data, max_wait_time); @@ -1028,6 +1094,7 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, this->incoming_message_queue_.enqueue_tail (qd); } + // Process a message in the head of the queue if we have one.. return this->process_queue_head (rh); } @@ -1037,6 +1104,8 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block &incoming, TAO_Resume_Handle &rh) { + //ACE_DEBUG ((LM_DEBUG, + // "TAO (%P|%t) Consolidating multiple messages.. \n")); // Take a message from the tail.. TAO_Queued_Data *tail = this->incoming_message_queue_.dequeue_tail (); @@ -1051,6 +1120,10 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block } int retval = 1; + + //ACE_DEBUG ((LM_DEBUG, + // "TAO (%P|%t) Extracting multiple messages.. \n")); + while (retval == 1) { TAO_Queued_Data *q_data = 0; @@ -1209,6 +1282,13 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, int TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) { + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::process_queue_head \n", + this->id ())); + } + // See if the message in the head of the queue is complete... if (this->incoming_message_queue_.is_head_complete () == 1) { @@ -1237,6 +1317,7 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) } // Send a notification to the reactor... + // @@ What do we do if the notify to the reactor fails?? reactor->notify (eh, ACE_Event_Handler::READ_MASK); } @@ -1250,9 +1331,11 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) // Delete the Queued_Data.. delete qd; + + return 0; } - return 0; + return -1; } diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index f7fe3f6f3f2..965547d3369 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -568,7 +568,13 @@ protected: */ virtual void transition_handler_state_i (void) = 0; + /// @@ Bala: Documentation + + int parse_consolidate_messages (ACE_Message_Block &bl, + TAO_Resume_Handle &rh, + ACE_Time_Value *time); + int parse_incoming_messages (ACE_Message_Block &message_block); size_t missing_data (ACE_Message_Block &message_block); |