From 97c99a7123f6ac555b1c726d9798d47466bbe56b Mon Sep 17 00:00:00 2001 From: bala Date: Wed, 4 Jul 2001 14:19:38 +0000 Subject: ChangeLogTag:Wed Jul 4 09:20:22 2001 Balachandran Natarajan --- .../orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp | 43 +------ .../orbsvcs/SSLIOP/SSLIOP_Connection_Handler.h | 12 +- TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.cpp | 135 ++------------------- TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.h | 15 ++- TAO/tao/Transport.h | 27 ++++- 5 files changed, 47 insertions(+), 185 deletions(-) diff --git a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp index 7134fecd93a..bd7301db559 100644 --- a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp +++ b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp @@ -255,7 +255,7 @@ TAO_SSLIOP_Connection_Handler::fetch_handle (void) int -TAO_IIOP_Connection_Handler::resume_handler (void) +TAO_SSLIOP_Connection_Handler::resume_handler (void) { return TAO_RESUMES_CONNECTION_HANDLER; } @@ -357,47 +357,6 @@ TAO_SSLIOP_Connection_Handler::handle_input (ACE_HANDLE h) } - -int -TAO_SSLIOP_Connection_Handler::handle_input_i (ACE_HANDLE, - ACE_Time_Value *max_wait_time) -{ - - - // Set up the SSLIOP::Current object. - TAO_SSL_State_Guard ssl_state_guard (this, - this->orb_core (), - result); - if (result == -1) - return -1; - - this->pending_upcalls_++; - - // Call the transport read the message - result = this->transport ()->read_process_message (max_wait_time); - - // Now the message has been read - if (result == -1 && TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("SSLIOP_Connection_Handler::read_message \n"))); - - } - - // The upcall is done. Bump down the reference count - if (--this->pending_upcalls_ <= 0) - result = -1; - - if (result == 0 || result == -1) - { - return result; - } - - return 0; -} - - int TAO_SSLIOP_Connection_Handler::setup_ssl_state (TAO_ORB_Core *orb_core) { diff --git a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.h b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.h index 7844e35bc69..34a4696cf52 100644 --- a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.h +++ b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.h @@ -100,6 +100,9 @@ public: /// Documented in ACE_Event_Handler virtual int handle_output (ACE_HANDLE); + /// Doumented in ACE_Event_Handler + virtual int resume_handle (void); + /// Add ourselves to Cache. int add_transport_to_cache (void); @@ -119,15 +122,8 @@ protected: /// Reads a message from the , dispatching and servicing it /// appropriately. - /// handle_input() just delegates on handle_input_i() which timeouts - /// after , this is used in thread-per-connection to - /// ensure that server threads eventually exit. - + /// handle_input() just delegates on handle_input_i(). virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); - virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Time_Value *max_wait_time = 0); - - protected: diff --git a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.cpp b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.cpp index e107f9c4585..b43fd9746c9 100644 --- a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.cpp +++ b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.cpp @@ -74,7 +74,18 @@ TAO_SSLIOP_Transport::handle_input_i (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, int block) { + int result = 0; + + // Set up the SSLIOP::Current object. + TAO_SSL_State_Guard ssl_state_guard (this, + this->orb_core (), + result); + if (result == -1) + return -1; + return TAO_Transport::handle_input_i (rh, + max_wait_time, + block); } ssize_t @@ -288,130 +299,6 @@ TAO_SSLIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) -int -TAO_SSLIOP_Transport::process_message (void) -{ - // Check whether we have messages for processing - int retval = - this->messaging_object_->more_messages (); - - if (retval <= 0) - return retval; - - // Get the that we have received - TAO_Pluggable_Message_Type t = - this->messaging_object_->message_type (); - - - int result = 0; - if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("Close Connection Message recd \n"))); - - this->tms_->connection_closed (); - } - else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) - { - if (this->messaging_object_->process_request_message (this, - this->orb_core ()) == -1) - return -1; - } - else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) - { - TAO_Pluggable_Reply_Params params (this->orb_core ()); - if (this->messaging_object_->process_reply_message (params) == -1) - { - - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("SSLIOP_Transport::process_message, process_reply_message ()"))); - - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - - result = - this->tms_->dispatch_reply (params); - - // @@ Somehow it seems dangerous to reset the state *after* - // dispatching the request, what if another threads receives - // another reply in the same connection? - // My guess is that it works as follows: - // - For the exclusive case there can be no such thread. - // - The the muxed case each thread has its own message_state. - // I'm pretty sure this comment is right. Could somebody else - // please look at it and confirm my guess? - - // @@ The above comment was found in the older versions of the - // code. The code was also written in such a way that, when - // the client thread on a call from handle_input () from the - // reactor a call would be made on the handle_client_input - // (). The implementation of handle_client_input () looked so - // flaky. It used to create a message state upon entry in to - // the function using the TMS and destroy that on exit. All - // this was fine _theoretically_ for multiple threads. But - // the flakiness was originating in the implementation of - // get_message_state () where we were creating message state - // only once and dishing it out for every thread till one of - // them destroy's it. So, it looked broken. That has been - // changed. Why?. To my knowledge, the reactor does not call - // handle_input () on two threads at the same time. So, IMHO - // that defeats the purpose of creating a message state for - // every thread. This is just my guess. If we run in to - // problems this place needs to be revisited. If someone else - // is going to take a look please contact bala@cs.wustl.edu - // for details on this-- Bala - - if (result == -1) - { - // Something really critical happened, we will forget about - // every reply on this connection. - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) : SSLIOP_Transport::") - ACE_TEXT ("process_message - ") - ACE_TEXT ("dispatch reply failed\n"))); - - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - if (result == 0) - { - this->messaging_object_->reset (); - - // The reply dispatcher was no longer registered. - // This can happened when the request/reply - // times out. - // To throw away all registered reply handlers is - // not the right thing, as there might be just one - // old reply coming in and several valid new ones - // pending. If we would invoke - // we would throw away also the valid ones. - //return 0; - } - - - // This is a NOOP for the Exclusive request case, but it actually - // destroys the stream in the muxed case. - //this->tms_->destroy_message_state (message_state); - } - else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) - { - return -1; - } - - return 1; -} - - void TAO_SSLIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails) { diff --git a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.h b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.h index eb316a425b1..3cf25a90dc2 100644 --- a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.h +++ b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.h @@ -96,11 +96,13 @@ protected: size_t len, const ACE_Time_Value *s = 0); - /// Read and process the message from the connection. The processing - /// of the message is done by delegating the work to the underlying - /// messaging object - virtual int read_process_message (ACE_Time_Value *max_time_value = 0, - int block =0); + /// Overload of the handle_input_i () in the TAO_Transport + /// class. This is required to set up the state guard. The + /// thread-per-connection and wait on RW strategies call this + /// handle_input_i (). + virtual int handle_input_i (TAO_Resume_Handle &rh, + ACE_Time_Value *max_time_value = 0, + int block =0); virtual int register_handler_i (void); @@ -142,9 +144,6 @@ public: private: - /// Process the message that we have read - int process_message (void); - /// Set the Bidirectional context info in the service context list void set_bidir_context_info (TAO_Operation_Details &opdetails); diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index d683667ee46..e60c63f1005 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -174,6 +174,30 @@ protected: * chunk of data in one shot. Data could trickle in byte by byte. * (c) Single read gives multiple messages * + * We solve the problems as follows + * + * (a) First do a read with the buffer on stack. Query the underlying + * messaging object whether the message has any incomplete + * portion. If so, we just grow the buffer for the missing size + * and read the rest of the message. We free the handle and then + * send the message to the higher layers of the ORB for + * processing. + * + * (b) If we block (ie. if we receive a EWOULDBLOCK) while trying to + * do the above (ie. trying to read after growing the buffer + * size) we put the message in a queue and return back to the + * reactor. The reactor would call us back when the handle + * becomes read ready. + * + * (c) If we get multiple messages (possible if the client connected + * to the server sends oneways or AMI requests), we parse and + * split the messages. Every message is put in the queue. Once + * the messages are queued, the thread picks up one message to + * send to the higher layers of the ORB. Before doing that, if + * it finds more messages, it sends a notify to the reactor + * without resuming the handle. The next thread picks up a + * message from the queue and processes that. Once the queue + * is drained the last thread resumes the handle. * * See Also: * @@ -689,9 +713,6 @@ public: int handle_timeout (const ACE_Time_Value ¤t_time, const void* act); - // @@ Bala : Add documentation - // int process_message (ACE_Message_Block &message_block) = 0; - private: /// Send some of the data in the queue. /** -- cgit v1.2.1