From 838d1b07dd512b7d383084a5e2dda149444f3eb2 Mon Sep 17 00:00:00 2001 From: bala Date: Tue, 3 Jul 2001 20:47:05 +0000 Subject: ChangeLogTag:Tue Jul 03 03:45:03 2001 Balachandran Natarajan --- TAO/tao/Resume_Handle.cpp | 3 +- .../Strategies/GIOP_Message_NonReactive_Base.cpp | 28 ++- TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h | 23 +- .../Strategies/GIOP_Message_NonReactive_Handler.h | 3 +- TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp | 37 +--- TAO/tao/Strategies/SHMIOP_Connection_Handler.h | 2 - TAO/tao/Strategies/SHMIOP_Transport.cpp | 237 ++++++++------------- TAO/tao/Strategies/SHMIOP_Transport.h | 13 +- TAO/tao/Strategies/TAO_Strategies.dsp | 16 -- TAO/tao/Transport.h | 10 +- 10 files changed, 144 insertions(+), 228 deletions(-) diff --git a/TAO/tao/Resume_Handle.cpp b/TAO/tao/Resume_Handle.cpp index d5c9b1e9b24..af94eefa37e 100644 --- a/TAO/tao/Resume_Handle.cpp +++ b/TAO/tao/Resume_Handle.cpp @@ -13,7 +13,8 @@ TAO_Resume_Handle::resume_handle (void) { // If we have a complete message, just resume the handler // Resume the handler. - if (this->orb_core_->reactor ()->resumable_handler () && + if (this->orb_core_ && + this->orb_core_->reactor ()->resumable_handler () && this->flag_ == TAO_HANDLE_RESUMABLE && this->handle_ != ACE_INVALID_HANDLE) this->orb_core_->reactor ()->resume_handler (this->handle_); diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp index 84f5a560fc2..1c4f044eebd 100644 --- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp +++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp @@ -8,10 +8,9 @@ ACE_RCSID (Strategies, GIOP_Message_NonReactive_Base, "$Id$") -TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core, - size_t buf_size) - : TAO_GIOP_Message_Base (orb_core, buf_size), - message_handler_ (orb_core, this, buf_size) +TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core) + + : TAO_GIOP_Message_Base (orb_core) { } @@ -19,14 +18,18 @@ TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Co int TAO_GIOP_Message_NonReactive_Base::read_message (TAO_Transport *transport, + ACE_Message_Block &block, int /*block*/, ACE_Time_Value *max_wait_time) { // Call the handler to read and do a simple parse of the header of // the message. int retval = - this->message_handler_.read_parse_message (transport, - max_wait_time); + this->read_data (transport, + max_wait_time); + + // Before we do this let us reset the + char *buf = this->input_cdr_.rd_ptr (); // Error in the message that was received @@ -61,6 +64,19 @@ TAO_GIOP_Message_NonReactive_Base::read_message (TAO_Transport *transport, return 2; } + +size_t +TAO_GIOP_Message_NonReactive_Base::read_data (TAO_Transport *transport, + ACE_Time_Value *time) +{ + + transport->recv (buf, + n, + max_wait_time); + +} + + TAO_Pluggable_Message_Type TAO_GIOP_Message_NonReactive_Base::message_type (void) { diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h index d6e8b434ecd..c7ddb348117 100644 --- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h +++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h @@ -28,21 +28,23 @@ class TAO_Pluggable_Reply_Params; /** * @class TAO_GIOP_Message_NonReactive_Base * - * @brief Uses the NonReactive handler class for reading messages. + * @brief Uses the NonReactive mechanism to read and process + * messages. * - * This class uses the TAO_GIOP_Message_NonReactive_Handler class to - * read and parse messages. This class derives from - * TAO_GIOP_Message_Base. It just redirects most of the functions to - * the base class but just acts as a sort of place holder for the - * NonReactive handler class. + * Some protocols based on shared memory cannot make use of the + * reactor as other protocols based on TCP/IP. This class is a relief + * for such protocols. This effectively does the following + * - reads the GIOP header out of the transport + * - processes the header to determine the length of the message + * and other details. + * - reads the body of the message from the transport + * - passes the data to the base class for making the upcall. */ class TAO_Strategies_Export TAO_GIOP_Message_NonReactive_Base :public TAO_GIOP_Message_Base { public: - friend class TAO_GIOP_Message_NonReactive_Handler; - /// Constructor TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core, size_t cdr_size = ACE_CDR::DEFAULT_BUFSIZE); @@ -82,9 +84,8 @@ public: private: - /// Thr message handler object that does reading and parsing of the - /// incoming messages - TAO_GIOP_Message_NonReactive_Handler message_handler_; + /// The input cdr stream in which the incoming data is stored. + TAO_InputCDR input_cdr_; }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h b/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h index 02b93da7d47..a8d9642a1b1 100644 --- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h +++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h @@ -77,8 +77,7 @@ private: /// Our Message base TAO_GIOP_Message_NonReactive_Base *mesg_base_; - /// The input cdr stream in which the incoming data is stored. - TAO_InputCDR input_cdr_; + }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp index fd7fb9db6e6..6ab063aa2e8 100644 --- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp @@ -10,12 +10,11 @@ #include "tao/ORB.h" #include "tao/CDR.h" #include "tao/Messaging_Policy_i.h" -#include "tao/GIOP_Message_Base.h" -#include "tao/GIOP_Message_Lite.h" #include "tao/Server_Strategy_Factory.h" #include "tao/Base_Transport_Property.h" #include "tao/Transport_Cache_Manager.h" #include "SHMIOP_Endpoint.h" +#include "tao/Resume_Handle.h" #if !defined (__ACE_INLINE__) # include "SHMIOP_Connection_Handler.inl" @@ -251,42 +250,26 @@ TAO_SHMIOP_Connection_Handler::add_transport_to_cache (void) int TAO_SHMIOP_Connection_Handler::handle_input (ACE_HANDLE h) { - return this->handle_input_i (h); -} - - -int -TAO_SHMIOP_Connection_Handler::handle_input_i (ACE_HANDLE, - ACE_Time_Value *max_wait_time) -{ + // Increase the reference count on the upcall that have passed us. this->pending_upcalls_++; - // Call the transport read the message - int result = this->transport ()->read_process_message (max_wait_time); - - // Now the message has been read - if (result == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("SHMIOP_Connection_Handler::read_message \n"))); + TAO_Resume_Handle resume_handle (this->orb_core (), + h); - } + int retval = this->transport ()->handle_input_i (resume_handle); // The upcall is done. Bump down the reference count if (--this->pending_upcalls_ <= 0) - result = -1; - - if (result == 0 || result == -1) - { - return result; - } + retval = -1; - return 0; + return retval; } + + + // **************************************************************** #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h index cf9c2fe3549..59a0469cdbc 100644 --- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h @@ -104,8 +104,6 @@ protected: /// ensure that server threads eventually exit. virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); - virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Time_Value *max_wait_time = 0); private: diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index 81c803e0a41..8e3fd644f43 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -15,9 +15,9 @@ #include "tao/Stub.h" #include "tao/ORB_Core.h" #include "tao/debug.h" +#include "tao/Resume_Handle.h" +#include "tao/GIOP_Message_Base.h" -#include "tao/GIOP_Message_Lite.h" -#include "GIOP_Message_NonReactive_Base.h" #if !defined (__ACE_INLINE__) # include "SHMIOP_Transport.i" @@ -33,6 +33,7 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl connection_handler_ (handler), messaging_object_ (0) { +#if 0 if (flag) { // Use the lite version of the protocol @@ -40,10 +41,11 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl TAO_GIOP_Message_Lite (orb_core)); } else +#endif /*#if 0 */ { // Use the normal GIOP object ACE_NEW (this->messaging_object_, - TAO_GIOP_Message_NonReactive_Base (orb_core)); + TAO_GIOP_Message_Base (orb_core)); } } @@ -91,42 +93,100 @@ TAO_SHMIOP_Transport::recv_i (char *buf, size_t len, const ACE_Time_Value *max_wait_time) { - return this->connection_handler_->peer ().recv (buf, - len, - max_wait_time); + ssize_t n = 0; + + int read_break = 0; + + while (!read_break) + { + n = this->connection_handler_->peer ().recv (buf, + len, + max_wait_time); + + // If we get a EWOULBLOCK we try to read again. + if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + n = 0; + continue; + } + + // If there is anything else we just drop out of the loop. + read_break = 1; + } + + if (n == 0 || n == -1) + { + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p \n"), + ACE_TEXT ("TAO - read message failure ") + ACE_TEXT ("recv_i () \n"))); + } + } + + return n; + } + int -TAO_SHMIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, - int block) +TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) { - // Read the message of the socket - int result = this->messaging_object_->read_message (this, - block, - max_wait_time); - - if (result == -1) + // Calculate the actual length of the load that we are supposed to + // read which is equal to the + length of the buffer + // that we have.. + size_t payload = missing_data + incoming.length (); + + // Grow the buffer to the size of the message + ACE_CDR::grow (&incoming, + payload); + + // .. do a read on the socket again. + ssize_t bytes = 0; + + // As this used for transports where things are available in one + // shot this looping should not create any problems. + for (size_t n = payload; + n != 0; + n -= bytes) { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("SHMIOP_Transport::read_message, failure in read_message ()"))); + // We would have liked to use something like a recv_n () + // here. But at the time when the code was written, the MEM_Stream + // classes had poor support for recv_n (). Till a day when we + // get proper recv_n (), let us stick with this. The other + // argument that can be said against this is that, this is the + // bad layer in which this is being done ie. recv_n is + // simulated. But... + bytes = this->recv (incoming.wr_ptr (), + missing_data, + max_wait_time); + + if (bytes == 0 || + bytes == -1) + { + return -1; + } - this->tms_->connection_closed (); - return -1; + incoming.wr_ptr (bytes); } - if (result < 2) - return result; - // Now we know that we have been able to read the complete message - // here.. We loop here to see whether we have read more than one - // message in our read. + TAO_Queued_Data pqd (&incoming); - // See we use the reactor semantics again - result = this->process_message (); + // With SHMIOP we would not have any missing data... + pqd.missing_data_ = 0; + this->messaging_object ()->get_message_data (&pqd); - return result; + // Resume the handle before processing the request + rh.resume_handle (); + + // Now we have a full message in our buffer. Just go ahead and + // process that + return this->process_parsed_messages (&pqd); } @@ -220,127 +280,6 @@ TAO_SHMIOP_Transport::messaging_init (CORBA::Octet major, return 1; } -int -TAO_SHMIOP_Transport::process_message (void) -{ - // Check whether we have messages for processing - int retval = - this->messaging_object_->more_messages (); - - // The messages are fragmented, so we go back to the reactor. - if (retval <= 0) - return retval; - - // Get the that we have received - TAO_Pluggable_Message_Type t = - this->messaging_object_->message_type (); - - - int result = 0; - if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("Close Connection Message recd \n"))); - - this->tms_->connection_closed (); - } - else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) - { - if (this->messaging_object_->process_request_message (this, - this->orb_core ()) == -1) - return -1; - } - else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) - { - TAO_Pluggable_Reply_Params params (this->orb_core ()); - if (this->messaging_object_->process_reply_message (params) == -1) - { - - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("SHMIOP_Transport::process_message, process_reply_message ()"))); - - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - - result = - this->tms_->dispatch_reply (params); - - // @@ Somehow it seems dangerous to reset the state *after* - // dispatching the request, what if another threads receives - // another reply in the same connection? - // My guess is that it works as follows: - // - For the exclusive case there can be no such thread. - // - The the muxed case each thread has its own message_state. - // I'm pretty sure this comment is right. Could somebody else - // please look at it and confirm my guess? - - // @@ The above comment was found in the older versions of the - // code. The code was also written in such a way that, when - // the client thread on a call from handle_input () from the - // reactor a call would be made on the handle_client_input - // (). The implementation of handle_client_input () looked so - // flaky. It used to create a message state upon entry in to - // the function using the TMS and destroy that on exit. All - // this was fine _theoretically_ for multiple threads. But - // the flakiness was originating in the implementation of - // get_message_state () where we were creating message state - // only once and dishing it out for every thread till one of - // them destroy's it. So, it looked broken. That has been - // changed. Why?. To my knowledge, the reactor does not call - // handle_input () on two threads at the same time. So, IMHO - // that defeats the purpose of creating a message state for - // every thread. This is just my guess. If we run in to - // problems this place needs to be revisited. If someone else - // is going to take a look please contact bala@cs.wustl.edu - // for details on this-- Bala - - if (result == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) : SHMIOP_Transport::") - ACE_TEXT ("process_message - ") - ACE_TEXT ("dispatch reply failed\n"))); - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - if (result == 0) - { - this->messaging_object_->reset (); - - // The reply dispatcher was no longer registered. - // This can happened when the request/reply - // times out. - // To throw away all registered reply handlers is - // not the right thing, as there might be just one - // old reply coming in and several valid new ones - // pending. If we would invoke - // we would throw away also the valid ones. - //return 0; - } - - - // This is a NOOP for the Exclusive request case, but it actually - // destroys the stream in the muxed case. - //this->tms_->destroy_message_state (message_state); - } - else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) - { - return -1; - } - - return 0; -} - void TAO_SHMIOP_Transport::transition_handler_state_i (void) { diff --git a/TAO/tao/Strategies/SHMIOP_Transport.h b/TAO/tao/Strategies/SHMIOP_Transport.h index 60812057b5b..61785ef6215 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.h +++ b/TAO/tao/Strategies/SHMIOP_Transport.h @@ -78,11 +78,10 @@ protected: size_t len, const ACE_Time_Value *s = 0); - /// Read and process the message from the connection. The processing - /// of the message is done by delegating the work to the underlying - /// messaging object - virtual int read_process_message (ACE_Time_Value *max_time_value = 0, - int block =0); + virtual int consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time); virtual int register_handler_i (void); @@ -108,10 +107,6 @@ public: virtual int messaging_init (CORBA::Octet major, CORBA::Octet minor); -private: - /// Process the message that we have read - int process_message (void); - private: /// The connection service handler used for accessing lower layer /// communication protocols. diff --git a/TAO/tao/Strategies/TAO_Strategies.dsp b/TAO/tao/Strategies/TAO_Strategies.dsp index 3b836f5bd2e..8056375a565 100644 --- a/TAO/tao/Strategies/TAO_Strategies.dsp +++ b/TAO/tao/Strategies/TAO_Strategies.dsp @@ -102,14 +102,6 @@ SOURCE=.\FIFO_Connection_Purging_Strategy.cpp # End Source File # Begin Source File -SOURCE=.\GIOP_Message_NonReactive_Base.cpp -# End Source File -# Begin Source File - -SOURCE=.\GIOP_Message_NonReactive_Handler.cpp -# End Source File -# Begin Source File - SOURCE=.\LFU_Connection_Purging_Strategy.cpp # End Source File # Begin Source File @@ -198,14 +190,6 @@ SOURCE=.\advanced_resource.h # End Source File # Begin Source File -SOURCE=.\GIOP_Message_NonReactive_Base.h -# End Source File -# Begin Source File - -SOURCE=.\GIOP_Message_NonReactive_Handler.h -# End Source File -# Begin Source File - SOURCE=.\Reactor_Per_Priority.h # End Source File # Begin Source File diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 98ac3bdb8d1..d683667ee46 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -599,10 +599,10 @@ protected: int check_message_integrity (ACE_Message_Block &message_block); - int consolidate_message (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time); + virtual int consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time); int consolidate_message_queue (ACE_Message_Block &incoming, ssize_t missing_data, @@ -613,7 +613,7 @@ protected: TAO_Resume_Handle &rh); /// @@ Bala: Documentation - virtual int process_parsed_messages (TAO_Queued_Data *qd); + int process_parsed_messages (TAO_Queued_Data *qd); public: /// Method for the connection handler to signify that it -- cgit v1.2.1