diff options
Diffstat (limited to 'TAO/tao/Strategies/SHMIOP_Transport.cpp')
-rw-r--r-- | TAO/tao/Strategies/SHMIOP_Transport.cpp | 233 |
1 files changed, 149 insertions, 84 deletions
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index 6cd79c5620f..81c803e0a41 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -15,10 +15,9 @@ #include "tao/Stub.h" #include "tao/ORB_Core.h" #include "tao/debug.h" -#include "tao/Resume_Handle.h" -#include "tao/GIOP_Message_Base.h" -#include "tao/GIOP_Message_Lite.h" +#include "tao/GIOP_Message_Lite.h" +#include "GIOP_Message_NonReactive_Base.h" #if !defined (__ACE_INLINE__) # include "SHMIOP_Transport.i" @@ -44,7 +43,7 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl { // Use the normal GIOP object ACE_NEW (this->messaging_object_, - TAO_GIOP_Message_Base (orb_core)); + TAO_GIOP_Message_NonReactive_Base (orb_core)); } } @@ -92,97 +91,42 @@ TAO_SHMIOP_Transport::recv_i (char *buf, size_t len, const ACE_Time_Value *max_wait_time) { - ssize_t n = 0; - - int read_break = 0; - - while (!read_break) - { - n = this->connection_handler_->peer ().recv (buf, - len, - max_wait_time); - - // If we get a EWOULBLOCK we try to read again. - if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) - { - n = 0; - continue; - } - - // If there is anything else we just drop out of the loop. - read_break = 1; - } - - if (n == 0 || n == -1) - { - if (TAO_debug_level > 3) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p \n"), - ACE_TEXT ("TAO - read message failure ") - ACE_TEXT ("recv_i () \n"))); - } - } - - return n; - + return this->connection_handler_->peer ().recv (buf, + len, + max_wait_time); } - int -TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) +TAO_SHMIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, + int block) { - // Calculate the actual length of the load that we are supposed to - // read which is equal to the <missing_data> + length of the buffer - // that we have.. - size_t payload = missing_data + incoming.length (); - - // Grow the buffer to the size of the message - ACE_CDR::grow (&incoming, - payload); - - // .. do a read on the socket again. - ssize_t bytes = 0; - - // As this used for transports where things are available in one - // shot this looping should not create any problems. - for (size_t n = missing_data; - n != 0; - n -= bytes) + // Read the message of the socket + int result = this->messaging_object_->read_message (this, + block, + max_wait_time); + + if (result == -1) { - // We would have liked to use something like a recv_n () - // here. But at the time when the code was written, the MEM_Stream - // classes had poor support for recv_n (). Till a day when we - // get proper recv_n (), let us stick with this. The other - // argument that can be said against this is that, this is the - // bad layer in which this is being done ie. recv_n is - // simulated. But... - bytes = this->recv (incoming.wr_ptr (), - n, - max_wait_time); - - if (bytes == 0 || - bytes == -1) - { - return -1; - } + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("SHMIOP_Transport::read_message, failure in read_message ()"))); - incoming.wr_ptr (bytes); + this->tms_->connection_closed (); + return -1; } + if (result < 2) + return result; - TAO_Queued_Data pqd (&incoming); + // Now we know that we have been able to read the complete message + // here.. We loop here to see whether we have read more than one + // message in our read. - // With SHMIOP we would not have any missing data... - pqd.missing_data_ = 0; + // See we use the reactor semantics again + result = this->process_message (); - this->messaging_object ()->get_message_data (&pqd); - // Now we have a full message in our buffer. Just go ahead and - // process that - return this->process_parsed_messages (&pqd, rh); + return result; } @@ -276,6 +220,127 @@ TAO_SHMIOP_Transport::messaging_init (CORBA::Octet major, return 1; } +int +TAO_SHMIOP_Transport::process_message (void) +{ + // Check whether we have messages for processing + int retval = + this->messaging_object_->more_messages (); + + // The messages are fragmented, so we go back to the reactor. + if (retval <= 0) + return retval; + + // Get the <message_type> that we have received + TAO_Pluggable_Message_Type t = + this->messaging_object_->message_type (); + + + int result = 0; + if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("Close Connection Message recd \n"))); + + this->tms_->connection_closed (); + } + else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) + { + if (this->messaging_object_->process_request_message (this, + this->orb_core ()) == -1) + return -1; + } + else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) + { + TAO_Pluggable_Reply_Params params (this->orb_core ()); + if (this->messaging_object_->process_reply_message (params) == -1) + { + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("SHMIOP_Transport::process_message, process_reply_message ()"))); + + this->messaging_object_->reset (); + this->tms_->connection_closed (); + return -1; + } + + + result = + this->tms_->dispatch_reply (params); + + // @@ Somehow it seems dangerous to reset the state *after* + // dispatching the request, what if another threads receives + // another reply in the same connection? + // My guess is that it works as follows: + // - For the exclusive case there can be no such thread. + // - The the muxed case each thread has its own message_state. + // I'm pretty sure this comment is right. Could somebody else + // please look at it and confirm my guess? + + // @@ The above comment was found in the older versions of the + // code. The code was also written in such a way that, when + // the client thread on a call from handle_input () from the + // reactor a call would be made on the handle_client_input + // (). The implementation of handle_client_input () looked so + // flaky. It used to create a message state upon entry in to + // the function using the TMS and destroy that on exit. All + // this was fine _theoretically_ for multiple threads. But + // the flakiness was originating in the implementation of + // get_message_state () where we were creating message state + // only once and dishing it out for every thread till one of + // them destroy's it. So, it looked broken. That has been + // changed. Why?. To my knowledge, the reactor does not call + // handle_input () on two threads at the same time. So, IMHO + // that defeats the purpose of creating a message state for + // every thread. This is just my guess. If we run in to + // problems this place needs to be revisited. If someone else + // is going to take a look please contact bala@cs.wustl.edu + // for details on this-- Bala + + if (result == -1) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) : SHMIOP_Transport::") + ACE_TEXT ("process_message - ") + ACE_TEXT ("dispatch reply failed\n"))); + this->messaging_object_->reset (); + this->tms_->connection_closed (); + return -1; + } + + if (result == 0) + { + this->messaging_object_->reset (); + + // The reply dispatcher was no longer registered. + // This can happened when the request/reply + // times out. + // To throw away all registered reply handlers is + // not the right thing, as there might be just one + // old reply coming in and several valid new ones + // pending. If we would invoke <connection_closed> + // we would throw away also the valid ones. + //return 0; + } + + + // This is a NOOP for the Exclusive request case, but it actually + // destroys the stream in the muxed case. + //this->tms_->destroy_message_state (message_state); + } + else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + { + return -1; + } + + return 0; +} + void TAO_SHMIOP_Transport::transition_handler_state_i (void) { |