diff options
author | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-07-06 04:41:00 +0000 |
---|---|---|
committer | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-07-06 04:41:00 +0000 |
commit | d8ad30bbf6dbe53647040d40d2e53fbdf8edf4b8 (patch) | |
tree | 3874c3e46e81a1a1c5a6c459720e1c17cab62da2 /TAO/tao/Transport.cpp | |
parent | 08c2939a52133c144b5f17b7f8556b5dc046c0b0 (diff) | |
download | ATCD-d8ad30bbf6dbe53647040d40d2e53fbdf8edf4b8.tar.gz |
ChangeLogTag: Thu Jul 5 23:30:07 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 674 |
1 files changed, 673 insertions, 1 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index eff531ec2df..250689c9a37 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -1,6 +1,7 @@ // -*- C++ -*- // $Id$ + #include "Transport.h" #include "Exception.h" @@ -17,6 +18,7 @@ #include "Flushing_Strategy.h" #include "Transport_Cache_Manager.h" #include "debug.h" +#include "Resume_Handle.h" #include "ace/Message_Block.h" @@ -26,6 +28,7 @@ ACE_RCSID(tao, Transport, "$Id$") + TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount) : ACE_Refcountable (refcount) , refcount_lock_ (lock) @@ -67,6 +70,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , bidirectional_flag_ (-1) , head_ (0) , tail_ (0) + , incoming_message_queue_ (orb_core) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) @@ -121,7 +125,7 @@ TAO_Transport::~TAO_Transport (void) } int -TAO_Transport::handle_output () +TAO_Transport::handle_output (void) { if (TAO_debug_level > 4) { @@ -783,6 +787,674 @@ TAO_Transport::generate_request_header ( } int +TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, + ACE_Time_Value * max_wait_time, + int /*block*/) +{ + // First try to process messages of the head of the incoming queue. + int retval = this->process_queue_head (rh); + + if (retval <= 0) + { + if (retval == -1) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) TAO::handle_input_i," + "error while parsing the head of the queue \n")); + + this->tms_->connection_closed (); + } + return retval; + } + + // If there are no messages then we can go ahead to read from the + // handle for further reading.. + + // The buffer on the stack which will be used to hold the input + // messages + char buf [TAO_CONNECTION_HANDLER_STACK_BUF_SIZE]; + +#if defined (ACE_HAS_PURIFY) + (void) ACE_OS::memset (buf, + '\0', + sizeof buf); +#endif /* ACE_HAS_PURIFY */ + + // Create a data block + ACE_Data_Block db (sizeof (buf), + ACE_Message_Block::MB_DATA, + buf, + this->orb_core_->message_block_buffer_allocator (), + this->orb_core_->locking_strategy (), + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_dblock_allocator ()); + + // Create a message block + ACE_Message_Block message_block (&db, + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_msgblock_allocator ()); + + + // Align the message block + ACE_CDR::mb_align (&message_block); + + + // Read the message into the message block that we have created on + // the stack. + ssize_t n = this->recv (message_block.rd_ptr (), + message_block.space (), + max_wait_time); + + // If there is an error return to the reactor.. + if (n <= 0) + { + if (n == -1) + this->tms_->connection_closed (); + + return n; + } + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) Read [%d] bytes \n", + n)); + } + + // Set the write pointer in the stack buffer + message_block.wr_ptr (n); + + // Parse the message and try consolidating the message if + // needed. + retval = this->parse_consolidate_messages (message_block, + rh, + max_wait_time); + + if (retval <= 0) + { + if (retval == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport::handle_input_i " + "error while parsing and consolidating \n")); + } + return retval; + } + + // Make a node of the message block.. + TAO_Queued_Data qd (&message_block); + + // Extract the data for the node.. + this->messaging_object ()->get_message_data (&qd); + + // Resume before starting to process the request.. + rh.resume_handle (); + + // Process the message + return this->process_parsed_messages (&qd); +} + +int +TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + // Parse the incoming message for validity. The check needs to be + // performed by the messaging objects. + if (this->parse_incoming_messages (block) == -1) + return -1; + + // Check whether we have a complete message for processing + ssize_t missing_data = this->missing_data (block); + + if (missing_data < 0) + { + // If we have more than one message + return this->consolidate_extra_messages (block, + rh); + } + else if (missing_data > 0) + { + // If we have missing data then try doing a read or try queueing + // them. + return this->consolidate_message (block, + missing_data, + rh, + max_wait_time); + } + + return 1; +} + +int +TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) +{ + // If we have a queue and if the last message is not complete a + // complete one, then this read will get us the remaining data. So + // do not try to parse the header if we have an incomplete message + // in the queue. + if (this->incoming_message_queue_.is_tail_complete () != 0) + { + // As it looks like a new message has been read, process the + // message. Call the messaging object to do the parsing.. + int retval = + this->messaging_object ()->parse_incoming_messages (block); + + if (retval == -1) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - error in incoming message \n"))); + + this->tms_->connection_closed (); + return -1; + } + } + + return 0; +} + + +size_t +TAO_Transport::missing_data (ACE_Message_Block &incoming) +{ + // If we have a incomplete message in the queue then find out how + // much of data is required to get a complete message + if (this->incoming_message_queue_.is_tail_complete () == 0) + { + return this->incoming_message_queue_.missing_data_tail (); + } + + return this->messaging_object ()->missing_data (incoming); +} + + +int +TAO_Transport::consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + // Check whether the last message in the queue is complete.. + if (this->incoming_message_queue_.is_tail_complete () == 0) + return this->consolidate_message_queue (incoming, + missing_data, + rh, + max_wait_time); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n", + this->id ())); + } + + // Calculate the actual length of the load that we are supposed to + // read which is equal to the <missing_data> + length of the buffer + // that we have.. + size_t payload = missing_data + incoming.length (); + + // Grow the buffer to the size of the message + ACE_CDR::grow (&incoming, + payload); + + // .. do a read on the socket again. + ssize_t n = this->recv (incoming.wr_ptr (), + missing_data, + max_wait_time); + + // If we got an EWOULDBLOCK or some other error.. + if (n <= 0) + { + if (n == -1) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Trasport::consolidate_message," + "error while trying to consolidate \n")); + } + this->tms_->connection_closed (); + } + + return n; + } + + // Move the write pointer + incoming.wr_ptr (n); + + // ..Decrement + missing_data -= n; + + if (missing_data > 0) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n" + "insufficient read, queueing up the message \n", + this->id ())); + } + // Get an instance of TAO_Queued_Data + TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data (); + + // Add the missing data to the queue + qd->missing_data_ = missing_data; + + // Duplicate the data block before putting it in the queue. + qd->msg_block_ = incoming.duplicate (); + + // Get the rest of the messaging data + this->messaging_object ()->get_message_data (qd); + + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); + + return 0; + } + + // Check to see if we have messages in queue. AT this point we + // cannot have have semi-complete messages in the queue as they + // would have been taken care before + if (this->incoming_message_queue_.queue_length ()) + { + // If we have messages in the queue, put the <incoming> in the + // queue + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n" + " queueing up the message \n", + this->id ())); + } + + // Get an instance of TAO_Queued_Data + TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data (); + + // Duplicate the data block before putting it in the queue. + qd->msg_block_ = incoming.duplicate (); + + // Get the rest of the messaging data + this->messaging_object ()->get_message_data (qd); + + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); + + // Process one on the head of the queue and return + return this->process_queue_head (rh); + } + + + // We dont have any missing data. Just make a queued_data node with + // the existing message block and send it to the higher layers of + // the ORB. + TAO_Queued_Data pqd (&incoming); + pqd.missing_data_ = missing_data; + this->messaging_object ()->get_message_data (&pqd); + + // Resume the handle before processing the request + rh.resume_handle (); + + // Now we have a full message in our buffer. Just go ahead and + // process that + return this->process_parsed_messages (&pqd); +} + +int +TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message_queue \n", + this->id ())); + } + + // If the queue did not have a complete message put this piece of + // message in the queue. We kow it did not have a complete + // message. That is why we are here. + size_t n = this->incoming_message_queue_.copy_tail (incoming); + + // Update the missing data... + missing_data = this->incoming_message_queue_.missing_data_tail (); + + // Move the read pointer of the <incoming> message block to the end + // of the copied message and process the remaining portion... + incoming.rd_ptr (n); + + // If we have some more information left in the message block.. + if (incoming.length ()) + { + // We may have to parse & consolidate. This part of the message + // doesn't seem to be part of the last message in the queue (as + // the copy () hasn't taken away this message). + int retval = this->parse_consolidate_messages (incoming, + rh, + max_wait_time); + + // If there is an error return + if (retval == -1) + { + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Error while consolidating... \n", + "TAO (%P|%t) - .. part of the read message \n")); + } + return retval; + } + + // parse_consolidate_messages () would have processed one of the + // messages, so we better return as we dont want to starve other + // threads. + return 0; + } + + // If we still have some missing data.. + if (missing_data > 0) + { + // Get the last message from the Queue + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_tail (); + + // Try to do a read again. If we have some luck it would be + // great.. + ssize_t n = this->recv (qd->msg_block_->wr_ptr (), + missing_data, + max_wait_time); + + // Error... + if (n <= 0) + return n; + + // Move the write pointer + qd->msg_block_->wr_ptr (n); + + // Decrement the missing data + qd->missing_data_ -= n; + + // Now put the TAO_Queued_Data back in the queue + this->incoming_message_queue_.enqueue_tail (qd); + } + + // Process a message in the head of the queue if we have one.. + return this->process_queue_head (rh); +} + + +int +TAO_Transport::consolidate_extra_messages (ACE_Message_Block + &incoming, + TAO_Resume_Handle &rh) +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n", + this->id ())); + } + + // Pick the tail of the queue + TAO_Queued_Data *tail = + this->incoming_message_queue_.dequeue_tail (); + + if (tail) + { + // If we have a node in the tail, checek to see whether it needs + // consolidation. If so, just consolidate it. + if (this->messaging_object ()->consolidate_node (tail, + incoming) == -1) + return -1; + + // .. put the tail back in queue.. + this->incoming_message_queue_.enqueue_tail (tail); + } + + int retval = 1; + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n" + "..............extracting extra messages \n", + this->id ())); + } + + // Extract messages.. + while (retval == 1) + { + TAO_Queued_Data *q_data = 0; + + retval = + this->messaging_object ()->extract_next_message (incoming, + q_data); + if (q_data) + this->incoming_message_queue_.enqueue_tail (q_data); + } + + // In case of error return.. + if (retval == -1) + { + return retval; + } + + return this->process_queue_head (rh); +} + +int +TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd) +{ + // Get the <message_type> that we have received + TAO_Pluggable_Message_Type t = qd->msg_type_; + + int result = 0; + if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("Close Connection Message recd \n"))); + + // Close the TMS + this->tms_->connection_closed (); + + // Return a "-1" so that the next stage can take care of + // closing connection and the necessary memory management. + return -1; + } + else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) + { + if (this->messaging_object ()->process_request_message ( + this, + qd) == -1) + { + // Close the TMS + this->tms_->connection_closed (); + + // Return a "-1" so that the next stage can take care of + // closing connection and the necessary memory management. + return -1; + } + } + else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) + { + // @@todo: Maybe the input_cdr can be constructed from the + // message_block + TAO_Pluggable_Reply_Params params (this->orb_core ()); + + if (this->messaging_object ()->process_reply_message (params, + qd) == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("IIOP_Transport::process_message, ") + ACE_TEXT ("process_reply_message ()"))); + + this->messaging_object ()->reset (); + this->tms_->connection_closed (); + return -1; + } + + result = this->tms ()->dispatch_reply (params); + + // @@ Somehow it seems dangerous to reset the state *after* + // dispatching the request, what if another threads receives + // another reply in the same connection? + // My guess is that it works as follows: + // - For the exclusive case there can be no such thread. + // - The the muxed case each thread has its own message_state. + // I'm pretty sure this comment is right. Could somebody else + // please look at it and confirm my guess? + + // @@ The above comment was found in the older versions of the + // code. The code was also written in such a way that, when + // the client thread on a call from handle_input () from the + // reactor a call would be made on the handle_client_input + // (). The implementation of handle_client_input () looked so + // flaky. It used to create a message state upon entry in to + // the function using the TMS and destroy that on exit. All + // this was fine _theoretically_ for multiple threads. But + // the flakiness was originating in the implementation of + // get_message_state () where we were creating message state + // only once and dishing it out for every thread till one of + // them destroy's it. So, it looked broken. That has been + // changed. Why?. To my knowledge, the reactor does not call + // handle_input () on two threads at the same time. So, IMHO + // that defeats the purpose of creating a message state for + // every thread. This is just my guess. If we run in to + // problems this place needs to be revisited. If someone else + // is going to take a look please contact bala@cs.wustl.edu + // for details on this-- Bala + + if (result == -1) + { + // Something really critical happened, we will forget about + // every reply on this connection. + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) : IIOP_Transport::") + ACE_TEXT ("process_message - ") + ACE_TEXT ("dispatch reply failed\n"))); + + this->messaging_object ()->reset (); + this->tms_->connection_closed (); + return -1; + } + + if (result == 0) + { + + this->messaging_object ()->reset (); + + // The reply dispatcher was no longer registered. + // This can happened when the request/reply + // times out. + // To throw away all registered reply handlers is + // not the right thing, as there might be just one + // old reply coming in and several valid new ones + // pending. If we would invoke <connection_closed> + // we would throw away also the valid ones. + //return 0; + } + + + // This is a NOOP for the Exclusive request case, but it actually + // destroys the stream in the muxed case. + //this->tms_->destroy_message_state (message_state); + } + else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + { + return -1; + } + + // If not, just return back.. + return 0; +} + +int +TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) +{ + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::process_queue_head \n", + this->id ())); + } + + // See if the message in the head of the queue is complete... + if (this->incoming_message_queue_.is_head_complete () == 1) + { + // Get the message on the head of the queue.. + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_head (); + + // Now that we have pulled out out one message out of the queue, + // check whether we have one more message in the queue... + if (this->incoming_message_queue_.is_head_complete () == 1) + { + // Get the event handler.. + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + + // Get the reactor associated with the event handler + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::notify to Reactor\n", + this->id ())); + } + + // Let the class know that it doesn't need to resume the + // handle.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); + + // Send a notification to the reactor... + int retval = reactor->notify (eh, + ACE_Event_Handler::READ_MASK); + + if (retval < 0 && TAO_debug_level > 2) + { + // @@todo: need to think about what is the action that + // we can take when we get here. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport::process_queue_head ") + ACE_TEXT ("notify to the reactor failed.. \n"))); + } + + } + else + { + // As we are ready to process the last message just resume + // the handle. Set the flag incase someone had reset the flag.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); + rh.resume_handle (); + } + + // Process the message... + if (this->process_parsed_messages (qd) == -1) + return -1; + + // Delete the Queued_Data.. + TAO_Queued_Data::release (qd); + + return 0; + } + + return 1; +} + + +int TAO_Transport::queue_is_empty (void) { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); |