summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
authorbala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-07-06 04:41:00 +0000
committerbala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-07-06 04:41:00 +0000
commitd8ad30bbf6dbe53647040d40d2e53fbdf8edf4b8 (patch)
tree3874c3e46e81a1a1c5a6c459720e1c17cab62da2 /TAO/tao/Transport.cpp
parent08c2939a52133c144b5f17b7f8556b5dc046c0b0 (diff)
downloadATCD-d8ad30bbf6dbe53647040d40d2e53fbdf8edf4b8.tar.gz
ChangeLogTag: Thu Jul 5 23:30:07 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp674
1 files changed, 673 insertions, 1 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index eff531ec2df..250689c9a37 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -1,6 +1,7 @@
// -*- C++ -*-
// $Id$
+
#include "Transport.h"
#include "Exception.h"
@@ -17,6 +18,7 @@
#include "Flushing_Strategy.h"
#include "Transport_Cache_Manager.h"
#include "debug.h"
+#include "Resume_Handle.h"
#include "ace/Message_Block.h"
@@ -26,6 +28,7 @@
ACE_RCSID(tao, Transport, "$Id$")
+
TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount)
: ACE_Refcountable (refcount)
, refcount_lock_ (lock)
@@ -67,6 +70,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, bidirectional_flag_ (-1)
, head_ (0)
, tail_ (0)
+ , incoming_message_queue_ (orb_core)
, current_deadline_ (ACE_Time_Value::zero)
, flush_timer_id_ (-1)
, transport_timer_ (this)
@@ -121,7 +125,7 @@ TAO_Transport::~TAO_Transport (void)
}
int
-TAO_Transport::handle_output ()
+TAO_Transport::handle_output (void)
{
if (TAO_debug_level > 4)
{
@@ -783,6 +787,674 @@ TAO_Transport::generate_request_header (
}
int
+TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
+ ACE_Time_Value * max_wait_time,
+ int /*block*/)
+{
+ // First try to process messages of the head of the incoming queue.
+ int retval = this->process_queue_head (rh);
+
+ if (retval <= 0)
+ {
+ if (retval == -1)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) TAO::handle_input_i,"
+ "error while parsing the head of the queue \n"));
+
+ this->tms_->connection_closed ();
+ }
+ return retval;
+ }
+
+ // If there are no messages then we can go ahead to read from the
+ // handle for further reading..
+
+ // The buffer on the stack which will be used to hold the input
+ // messages
+ char buf [TAO_CONNECTION_HANDLER_STACK_BUF_SIZE];
+
+#if defined (ACE_HAS_PURIFY)
+ (void) ACE_OS::memset (buf,
+ '\0',
+ sizeof buf);
+#endif /* ACE_HAS_PURIFY */
+
+ // Create a data block
+ ACE_Data_Block db (sizeof (buf),
+ ACE_Message_Block::MB_DATA,
+ buf,
+ this->orb_core_->message_block_buffer_allocator (),
+ this->orb_core_->locking_strategy (),
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_dblock_allocator ());
+
+ // Create a message block
+ ACE_Message_Block message_block (&db,
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_msgblock_allocator ());
+
+
+ // Align the message block
+ ACE_CDR::mb_align (&message_block);
+
+
+ // Read the message into the message block that we have created on
+ // the stack.
+ ssize_t n = this->recv (message_block.rd_ptr (),
+ message_block.space (),
+ max_wait_time);
+
+ // If there is an error return to the reactor..
+ if (n <= 0)
+ {
+ if (n == -1)
+ this->tms_->connection_closed ();
+
+ return n;
+ }
+
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) Read [%d] bytes \n",
+ n));
+ }
+
+ // Set the write pointer in the stack buffer
+ message_block.wr_ptr (n);
+
+ // Parse the message and try consolidating the message if
+ // needed.
+ retval = this->parse_consolidate_messages (message_block,
+ rh,
+ max_wait_time);
+
+ if (retval <= 0)
+ {
+ if (retval == -1 && TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport::handle_input_i "
+ "error while parsing and consolidating \n"));
+ }
+ return retval;
+ }
+
+ // Make a node of the message block..
+ TAO_Queued_Data qd (&message_block);
+
+ // Extract the data for the node..
+ this->messaging_object ()->get_message_data (&qd);
+
+ // Resume before starting to process the request..
+ rh.resume_handle ();
+
+ // Process the message
+ return this->process_parsed_messages (&qd);
+}
+
+int
+TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ // Parse the incoming message for validity. The check needs to be
+ // performed by the messaging objects.
+ if (this->parse_incoming_messages (block) == -1)
+ return -1;
+
+ // Check whether we have a complete message for processing
+ ssize_t missing_data = this->missing_data (block);
+
+ if (missing_data < 0)
+ {
+ // If we have more than one message
+ return this->consolidate_extra_messages (block,
+ rh);
+ }
+ else if (missing_data > 0)
+ {
+ // If we have missing data then try doing a read or try queueing
+ // them.
+ return this->consolidate_message (block,
+ missing_data,
+ rh,
+ max_wait_time);
+ }
+
+ return 1;
+}
+
+int
+TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
+{
+ // If we have a queue and if the last message is not complete a
+ // complete one, then this read will get us the remaining data. So
+ // do not try to parse the header if we have an incomplete message
+ // in the queue.
+ if (this->incoming_message_queue_.is_tail_complete () != 0)
+ {
+ // As it looks like a new message has been read, process the
+ // message. Call the messaging object to do the parsing..
+ int retval =
+ this->messaging_object ()->parse_incoming_messages (block);
+
+ if (retval == -1)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - error in incoming message \n")));
+
+ this->tms_->connection_closed ();
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+
+size_t
+TAO_Transport::missing_data (ACE_Message_Block &incoming)
+{
+ // If we have a incomplete message in the queue then find out how
+ // much of data is required to get a complete message
+ if (this->incoming_message_queue_.is_tail_complete () == 0)
+ {
+ return this->incoming_message_queue_.missing_data_tail ();
+ }
+
+ return this->messaging_object ()->missing_data (incoming);
+}
+
+
+int
+TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ // Check whether the last message in the queue is complete..
+ if (this->incoming_message_queue_.is_tail_complete () == 0)
+ return this->consolidate_message_queue (incoming,
+ missing_data,
+ rh,
+ max_wait_time);
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n",
+ this->id ()));
+ }
+
+ // Calculate the actual length of the load that we are supposed to
+ // read which is equal to the <missing_data> + length of the buffer
+ // that we have..
+ size_t payload = missing_data + incoming.length ();
+
+ // Grow the buffer to the size of the message
+ ACE_CDR::grow (&incoming,
+ payload);
+
+ // .. do a read on the socket again.
+ ssize_t n = this->recv (incoming.wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ // If we got an EWOULDBLOCK or some other error..
+ if (n <= 0)
+ {
+ if (n == -1)
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Trasport::consolidate_message,"
+ "error while trying to consolidate \n"));
+ }
+ this->tms_->connection_closed ();
+ }
+
+ return n;
+ }
+
+ // Move the write pointer
+ incoming.wr_ptr (n);
+
+ // ..Decrement
+ missing_data -= n;
+
+ if (missing_data > 0)
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n"
+ "insufficient read, queueing up the message \n",
+ this->id ()));
+ }
+ // Get an instance of TAO_Queued_Data
+ TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data ();
+
+ // Add the missing data to the queue
+ qd->missing_data_ = missing_data;
+
+ // Duplicate the data block before putting it in the queue.
+ qd->msg_block_ = incoming.duplicate ();
+
+ // Get the rest of the messaging data
+ this->messaging_object ()->get_message_data (qd);
+
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
+
+ return 0;
+ }
+
+ // Check to see if we have messages in queue. AT this point we
+ // cannot have have semi-complete messages in the queue as they
+ // would have been taken care before
+ if (this->incoming_message_queue_.queue_length ())
+ {
+ // If we have messages in the queue, put the <incoming> in the
+ // queue
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n"
+ " queueing up the message \n",
+ this->id ()));
+ }
+
+ // Get an instance of TAO_Queued_Data
+ TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data ();
+
+ // Duplicate the data block before putting it in the queue.
+ qd->msg_block_ = incoming.duplicate ();
+
+ // Get the rest of the messaging data
+ this->messaging_object ()->get_message_data (qd);
+
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
+
+ // Process one on the head of the queue and return
+ return this->process_queue_head (rh);
+ }
+
+
+ // We dont have any missing data. Just make a queued_data node with
+ // the existing message block and send it to the higher layers of
+ // the ORB.
+ TAO_Queued_Data pqd (&incoming);
+ pqd.missing_data_ = missing_data;
+ this->messaging_object ()->get_message_data (&pqd);
+
+ // Resume the handle before processing the request
+ rh.resume_handle ();
+
+ // Now we have a full message in our buffer. Just go ahead and
+ // process that
+ return this->process_parsed_messages (&pqd);
+}
+
+int
+TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message_queue \n",
+ this->id ()));
+ }
+
+ // If the queue did not have a complete message put this piece of
+ // message in the queue. We kow it did not have a complete
+ // message. That is why we are here.
+ size_t n = this->incoming_message_queue_.copy_tail (incoming);
+
+ // Update the missing data...
+ missing_data = this->incoming_message_queue_.missing_data_tail ();
+
+ // Move the read pointer of the <incoming> message block to the end
+ // of the copied message and process the remaining portion...
+ incoming.rd_ptr (n);
+
+ // If we have some more information left in the message block..
+ if (incoming.length ())
+ {
+ // We may have to parse & consolidate. This part of the message
+ // doesn't seem to be part of the last message in the queue (as
+ // the copy () hasn't taken away this message).
+ int retval = this->parse_consolidate_messages (incoming,
+ rh,
+ max_wait_time);
+
+ // If there is an error return
+ if (retval == -1)
+ {
+ if (TAO_debug_level)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Error while consolidating... \n",
+ "TAO (%P|%t) - .. part of the read message \n"));
+ }
+ return retval;
+ }
+
+ // parse_consolidate_messages () would have processed one of the
+ // messages, so we better return as we dont want to starve other
+ // threads.
+ return 0;
+ }
+
+ // If we still have some missing data..
+ if (missing_data > 0)
+ {
+ // Get the last message from the Queue
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ // Try to do a read again. If we have some luck it would be
+ // great..
+ ssize_t n = this->recv (qd->msg_block_->wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ // Error...
+ if (n <= 0)
+ return n;
+
+ // Move the write pointer
+ qd->msg_block_->wr_ptr (n);
+
+ // Decrement the missing data
+ qd->missing_data_ -= n;
+
+ // Now put the TAO_Queued_Data back in the queue
+ this->incoming_message_queue_.enqueue_tail (qd);
+ }
+
+ // Process a message in the head of the queue if we have one..
+ return this->process_queue_head (rh);
+}
+
+
+int
+TAO_Transport::consolidate_extra_messages (ACE_Message_Block
+ &incoming,
+ TAO_Resume_Handle &rh)
+{
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n",
+ this->id ()));
+ }
+
+ // Pick the tail of the queue
+ TAO_Queued_Data *tail =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ if (tail)
+ {
+ // If we have a node in the tail, checek to see whether it needs
+ // consolidation. If so, just consolidate it.
+ if (this->messaging_object ()->consolidate_node (tail,
+ incoming) == -1)
+ return -1;
+
+ // .. put the tail back in queue..
+ this->incoming_message_queue_.enqueue_tail (tail);
+ }
+
+ int retval = 1;
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n"
+ "..............extracting extra messages \n",
+ this->id ()));
+ }
+
+ // Extract messages..
+ while (retval == 1)
+ {
+ TAO_Queued_Data *q_data = 0;
+
+ retval =
+ this->messaging_object ()->extract_next_message (incoming,
+ q_data);
+ if (q_data)
+ this->incoming_message_queue_.enqueue_tail (q_data);
+ }
+
+ // In case of error return..
+ if (retval == -1)
+ {
+ return retval;
+ }
+
+ return this->process_queue_head (rh);
+}
+
+int
+TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd)
+{
+ // Get the <message_type> that we have received
+ TAO_Pluggable_Message_Type t = qd->msg_type_;
+
+ int result = 0;
+ if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("Close Connection Message recd \n")));
+
+ // Close the TMS
+ this->tms_->connection_closed ();
+
+ // Return a "-1" so that the next stage can take care of
+ // closing connection and the necessary memory management.
+ return -1;
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
+ {
+ if (this->messaging_object ()->process_request_message (
+ this,
+ qd) == -1)
+ {
+ // Close the TMS
+ this->tms_->connection_closed ();
+
+ // Return a "-1" so that the next stage can take care of
+ // closing connection and the necessary memory management.
+ return -1;
+ }
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
+ {
+ // @@todo: Maybe the input_cdr can be constructed from the
+ // message_block
+ TAO_Pluggable_Reply_Params params (this->orb_core ());
+
+ if (this->messaging_object ()->process_reply_message (params,
+ qd) == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("IIOP_Transport::process_message, ")
+ ACE_TEXT ("process_reply_message ()")));
+
+ this->messaging_object ()->reset ();
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+ result = this->tms ()->dispatch_reply (params);
+
+ // @@ Somehow it seems dangerous to reset the state *after*
+ // dispatching the request, what if another threads receives
+ // another reply in the same connection?
+ // My guess is that it works as follows:
+ // - For the exclusive case there can be no such thread.
+ // - The the muxed case each thread has its own message_state.
+ // I'm pretty sure this comment is right. Could somebody else
+ // please look at it and confirm my guess?
+
+ // @@ The above comment was found in the older versions of the
+ // code. The code was also written in such a way that, when
+ // the client thread on a call from handle_input () from the
+ // reactor a call would be made on the handle_client_input
+ // (). The implementation of handle_client_input () looked so
+ // flaky. It used to create a message state upon entry in to
+ // the function using the TMS and destroy that on exit. All
+ // this was fine _theoretically_ for multiple threads. But
+ // the flakiness was originating in the implementation of
+ // get_message_state () where we were creating message state
+ // only once and dishing it out for every thread till one of
+ // them destroy's it. So, it looked broken. That has been
+ // changed. Why?. To my knowledge, the reactor does not call
+ // handle_input () on two threads at the same time. So, IMHO
+ // that defeats the purpose of creating a message state for
+ // every thread. This is just my guess. If we run in to
+ // problems this place needs to be revisited. If someone else
+ // is going to take a look please contact bala@cs.wustl.edu
+ // for details on this-- Bala
+
+ if (result == -1)
+ {
+ // Something really critical happened, we will forget about
+ // every reply on this connection.
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) : IIOP_Transport::")
+ ACE_TEXT ("process_message - ")
+ ACE_TEXT ("dispatch reply failed\n")));
+
+ this->messaging_object ()->reset ();
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+ if (result == 0)
+ {
+
+ this->messaging_object ()->reset ();
+
+ // The reply dispatcher was no longer registered.
+ // This can happened when the request/reply
+ // times out.
+ // To throw away all registered reply handlers is
+ // not the right thing, as there might be just one
+ // old reply coming in and several valid new ones
+ // pending. If we would invoke <connection_closed>
+ // we would throw away also the valid ones.
+ //return 0;
+ }
+
+
+ // This is a NOOP for the Exclusive request case, but it actually
+ // destroys the stream in the muxed case.
+ //this->tms_->destroy_message_state (message_state);
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
+ {
+ return -1;
+ }
+
+ // If not, just return back..
+ return 0;
+}
+
+int
+TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
+{
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::process_queue_head \n",
+ this->id ()));
+ }
+
+ // See if the message in the head of the queue is complete...
+ if (this->incoming_message_queue_.is_head_complete () == 1)
+ {
+ // Get the message on the head of the queue..
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_head ();
+
+ // Now that we have pulled out out one message out of the queue,
+ // check whether we have one more message in the queue...
+ if (this->incoming_message_queue_.is_head_complete () == 1)
+ {
+ // Get the event handler..
+ ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh == 0)
+ return -1;
+
+ // Get the reactor associated with the event handler
+ ACE_Reactor *reactor = eh->reactor ();
+ if (reactor == 0)
+ return -1;
+
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::notify to Reactor\n",
+ this->id ()));
+ }
+
+ // Let the class know that it doesn't need to resume the
+ // handle..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
+
+ // Send a notification to the reactor...
+ int retval = reactor->notify (eh,
+ ACE_Event_Handler::READ_MASK);
+
+ if (retval < 0 && TAO_debug_level > 2)
+ {
+ // @@todo: need to think about what is the action that
+ // we can take when we get here.
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Transport::process_queue_head ")
+ ACE_TEXT ("notify to the reactor failed.. \n")));
+ }
+
+ }
+ else
+ {
+ // As we are ready to process the last message just resume
+ // the handle. Set the flag incase someone had reset the flag..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
+ rh.resume_handle ();
+ }
+
+ // Process the message...
+ if (this->process_parsed_messages (qd) == -1)
+ return -1;
+
+ // Delete the Queued_Data..
+ TAO_Queued_Data::release (qd);
+
+ return 0;
+ }
+
+ return 1;
+}
+
+
+int
TAO_Transport::queue_is_empty (void)
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);