From 1323714cde8a8dbf37f829315e0f45146a8408dd Mon Sep 17 00:00:00 2001 From: coryan Date: Sun, 30 May 1999 04:01:04 +0000 Subject: ChangeLogTag:Sat May 29 22:49:10 1999 Carlos O'Ryan --- TAO/ChangeLog-99c | 43 +++++++++ TAO/tao/Connect.cpp | 50 +++++++---- TAO/tao/IIOP_Transport.cpp | 38 +++++++- TAO/tao/IIOP_Transport.h | 3 + TAO/tao/Invocation.cpp | 19 ++-- TAO/tao/ORB_Core.cpp | 4 +- TAO/tao/Pluggable.cpp | 12 +++ TAO/tao/Pluggable.h | 52 ++++++----- TAO/tao/Request_Mux_Strategy.cpp | 4 +- TAO/tao/Request_Mux_Strategy.h | 6 +- TAO/tao/Wait_Strategy.cpp | 183 ++++++++++++++++++++++++++------------- TAO/tao/Wait_Strategy.h | 22 +++-- 12 files changed, 308 insertions(+), 128 deletions(-) diff --git a/TAO/ChangeLog-99c b/TAO/ChangeLog-99c index d733b4ab15e..b1ce73cd857 100644 --- a/TAO/ChangeLog-99c +++ b/TAO/ChangeLog-99c @@ -1,3 +1,46 @@ +Sat May 29 22:49:10 1999 Carlos O'Ryan + + * tao/Wait_Strategy.h: + * tao/Wait_Strategy.cpp: + The Wait_On_Reactor strategy was not passing up errors to the + user. + Don't hold the leader-follower lock while sending the request. + Don't forget to reset reply_received when sending a request. + Added a *lot* of debugging messages trying to fix the LF + strategy, they should go away on the production version. + New helper method for handle_input() and handle_close(), it is + used to decide if we should wake up any sleeping threads. + Removed obsolete @@ comments. + + * tao/Connect.cpp: + Re-organized some of the debugging code so it will be easier to + remove/compile out. + Inform the Transport when the connection closes, for example. + + * tao/Pluggable.h: + * tao/Pluggable.cpp: + * tao/IIOP_Transport.h: + * tao/IIOP_Transport.cpp: + New handle_close() method used to detect a connection going + down, the transport can use that to raise exceptions on all + waiting threads (or Reply_Handlers). + More debugging messages. + Changed signature of destroy_cdr_stream() + + * tao/Invocation.cpp: + If send() fails then cleanup the transport ASAP. + Bind the request id, dispatcher and CDR stream *before* sending + the request. + If send() fails report back to the user. + + * tao/ORB_Core.cpp: + The leader-follower lock is held by the user during the new + leader election, otherwise some nasty race conditions creep in. + + * tao/Request_Mux_Strategy.h: + * tao/Request_Mux_Strategy.cpp: + Changed signature of destroy_cdr_stream() + Sat May 29 22:35:38 1999 Carlos O'Ryan * tests/NestedUpcall/Reactor/Makefile: diff --git a/TAO/tao/Connect.cpp b/TAO/tao/Connect.cpp index aac5e5382b6..0646f05b37b 100644 --- a/TAO/tao/Connect.cpp +++ b/TAO/tao/Connect.cpp @@ -107,13 +107,6 @@ TAO_Server_Connection_Handler::transport (void) int TAO_Server_Connection_Handler::open (void*) { - // Called by the when the handler is completely - // connected. - ACE_INET_Addr addr; - - if (this->peer ().get_remote_addr (addr) == -1) - return -1; - #if !defined (ACE_LACKS_SOCKET_BUFSIZ) int sndbufsize = this->orb_core_->orb_params ()->sock_sndbuf_size (); @@ -148,15 +141,21 @@ TAO_Server_Connection_Handler::open (void*) // operation fails we are out of luck (some platforms do not support // it and return -1). - char client[MAXHOSTNAMELEN + 1]; + // Called by the when the handler is completely + // connected. + ACE_INET_Addr addr; - if (addr.get_host_name (client, MAXHOSTNAMELEN) == -1) - addr.addr_to_string (client, sizeof (client)); + if (this->peer ().get_remote_addr (addr) == -1) + return -1; - if (TAO_orbdebug) + char client[MAXHOSTNAMELEN + 16]; + + (void) addr.addr_to_string (client, sizeof (client)); + + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%P|%t) connection from client %s\n", - client)); + "(%P|%t) connection from client <%s> on %d\n", + client, this->peer ().get_handle ())); return 0; } @@ -883,6 +882,22 @@ TAO_Client_Connection_Handler::open (void *) // operation fails we are out of luck (some platforms do not support // it and return -1). + // Called by the when the handler is completely + // connected. + ACE_INET_Addr addr; + + if (this->peer ().get_remote_addr (addr) == -1) + return -1; + + char server[MAXHOSTNAMELEN + 16]; + + (void) addr.addr_to_string (server, sizeof (server)); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) connection to server <%s> on %d\n", + server, this->peer ().get_handle ())); + // Register the handler with the Reactor if necessary. return this->transport ()->wait_strategy ()->register_handler (); } @@ -905,11 +920,10 @@ TAO_Client_Connection_Handler::handle_close (ACE_HANDLE handle, // in turn take appropiate action (such as sending exceptions to // all waiting reply handlers). - if (TAO_orbdebug) + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%P|%t) TAO_Client_Connection_Handler::handle_close (%d, %d)\n", - handle, - rm)); + "(%P|%t) TAO_Client_Connection_Handler::" + "handle_close (%d, %d)\n", handle, rm)); if (this->recycler ()) this->recycler ()->mark_as_closed (this->recycling_act ()); @@ -929,6 +943,8 @@ TAO_Client_Connection_Handler::handle_close (ACE_HANDLE handle, this->peer ().close (); + this->transport ()->handle_close (); + return 0; } diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index b76c4bdeeee..abdb9918af2 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -245,10 +245,18 @@ TAO_IIOP_Client_Transport::handle_client_input (int block) case TAO_GIOP::MessageError: // Handle errors like these. // @@ this->reply_handler_->error (); + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) %N:%l handle_client_input: " + "error on stream.\n"), + -1); return -1; case TAO_GIOP::Fragment: // Handle this. + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) %N:%l handle_client_input: " + "fragment.\n"), + -1); return -1; case TAO_GIOP::Request: @@ -257,6 +265,10 @@ TAO_IIOP_Client_Transport::handle_client_input (int block) // on the firt iteration, leave it for the nearby future... // ERROR too. // @@ this->reply_handler_->error (); + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) %N:%l handle_client_input: " + "request.\n"), + -1); return -1; case TAO_GIOP::CancelRequest: @@ -264,6 +276,10 @@ TAO_IIOP_Client_Transport::handle_client_input (int block) case TAO_GIOP::CloseConnection: // @@ Errors for the time being. // @@ this->reply_handler_->error (); + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) %N:%l handle_client_input: " + "wrong message.\n"), + -1); return -1; case TAO_GIOP::LocateReply: @@ -324,11 +340,16 @@ TAO_IIOP_Client_Transport::handle_client_input (int block) // Handle the reply. if (reply_dispatcher->dispatch_reply () == -1) - return -1; - + { + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) %N:%l handle_client_input: " + "dispatch reply.\n"), + -1); + return -1; + } // This is a NOOP for the Exclusive request case, but it actually // destroys the stream in the muxed case. - this->rms_->destroy_cdr_stream (); + this->destroy_cdr_stream (cdr); // Return something to indicate the reply is received. return 1; @@ -359,6 +380,14 @@ TAO_IIOP_Client_Transport::resume_handler (void) (this->client_handler ()); } +int +TAO_IIOP_Client_Transport::handle_close (void) +{ + this->wait_strategy ()->handle_close (); + // @@ Should we? : this->rms_->handle_close (); + return 0; +} + int TAO_IIOP_Client_Transport::check_unexpected_data (void) { @@ -386,6 +415,9 @@ TAO_IIOP_Client_Transport::check_unexpected_data (void) // Both will result in us returning -1 and this connection // getting closed // + // if (errno == EWOULDBLOCK) + // return 0; + if (TAO_debug_level) ACE_DEBUG ((LM_WARNING, "TAO_IIOP_Client_Transport::check_unexpected_data: " diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index 210cffdb6ba..01bafcec28a 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -161,6 +161,9 @@ public: // Resume the handler from the reactor. This will be called by the // Wait Strategy if Reactor is used for that strategy. + virtual int handle_close (void); + // The connection was closed, let everybody know about it.... + protected: int check_unexpected_data (void); // This method checks for unexpected data. diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index f279997ea0e..0544cffce70 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -393,6 +393,8 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_roundtrip, { // send_request () closed the connection, we just have to forget // about the hint. + this->transport_ = 0; + this->profile_->reset_hint (); return TAO_INVOKE_RESTART; @@ -683,10 +685,6 @@ int TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - // Just send the request, without trying to wait for the reply. - int retval = TAO_GIOP_Invocation::invoke (1, ACE_TRY_ENV); - ACE_CHECK_RETURN (retval); - // Give the CDR stream for reading the input. this->transport_->input_cdr_stream (&this->inp_stream_); @@ -697,17 +695,24 @@ TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) this->rd_.request_id (this->request_id_); // Bind. - retval = this->transport_->bind_reply_dispatcher (this->request_id_, - &this->rd_); + int retval = this->transport_->bind_reply_dispatcher (this->request_id_, + &this->rd_); if (retval == -1) { // @@ What is the right way to handle this error? this->close_connection (); ACE_THROW_RETURN (CORBA::INTERNAL (TAO_DEFAULT_MINOR_CODE, - CORBA::COMPLETED_MAYBE), + CORBA::COMPLETED_NO), TAO_INVOKE_EXCEPTION); } + // Just send the request, without trying to wait for the reply. + retval = TAO_GIOP_Invocation::invoke (1, ACE_TRY_ENV); + ACE_CHECK_RETURN (retval); + + if (retval != TAO_INVOKE_OK) + return retval; + // This blocks until the response is read. In the current version, // there is only one client thread that ever uses this connection, // so most response messages are illegal. diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp index 0b6985da2dc..d7e34d93ce1 100644 --- a/TAO/tao/ORB_Core.cpp +++ b/TAO/tao/ORB_Core.cpp @@ -999,8 +999,8 @@ int TAO_ORB_Core::unset_leader_wake_up_follower (void) // sets the leader_available flag to false and tries to wake up a follower { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, - this->leader_follower_lock (), -1); + // ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, + // this->leader_follower_lock (), -1); this->unset_leader_thread (); diff --git a/TAO/tao/Pluggable.cpp b/TAO/tao/Pluggable.cpp index 1286009e1d1..287f1d678b8 100644 --- a/TAO/tao/Pluggable.cpp +++ b/TAO/tao/Pluggable.cpp @@ -63,6 +63,12 @@ TAO_Transport::input_cdr_stream (void) const return this->rms_->get_cdr_stream (); } +void +TAO_Transport::destroy_cdr_stream (TAO_InputCDR *cdr) const +{ + this->rms_->destroy_cdr_stream (cdr); +} + // Set the total size of the incoming message. (This does not // include the header size). void @@ -185,6 +191,12 @@ TAO_Transport::resume_handler (void) ACE_NOTSUP_RETURN (-1); } +int +TAO_Transport::handle_close (void) +{ + ACE_NOTSUP_RETURN (-1); +} + int TAO_Transport::wait_for_reply (void) { diff --git a/TAO/tao/Pluggable.h b/TAO/tao/Pluggable.h index 278d86bbcbc..b99bba7616a 100644 --- a/TAO/tao/Pluggable.h +++ b/TAO/tao/Pluggable.h @@ -147,30 +147,33 @@ public: void input_cdr_stream (TAO_InputCDR *cdr); // Set the CDR stream for reading the input message. - + TAO_InputCDR *input_cdr_stream (void) const; // Get the CDR stream for reading the input message. + void destroy_cdr_stream (TAO_InputCDR *) const; + // Release a CDR stream, simply pass it to the RMS... + // = State of the incoming message. - + void message_size (CORBA::ULong message_size); // Set the total size of the incoming message. (This does not // include the header size). This inits the setting - // it to zero. - + // it to zero. + CORBA::ULong message_size (void) const; // Get the total size of the incoming message. - + CORBA::ULong message_offset (void) const; // Get the current offset of the incoming message. - + int incr_message_offset (CORBA::Long bytes_transferred); // Update the offset of the incoming message. Returns 0 on success // -1 on failure. void message_received (int received); // Set the flag to indicate whether the input message was read fully - // or no. + // or no. int message_received (void) const; // Get the flag. @@ -179,7 +182,7 @@ public: // void orb_core (TAO_ORB_Core *orb_core); // Set it. - + TAO_ORB_Core *orb_core (void) const; // Get it. @@ -187,20 +190,20 @@ public: // void rms (TAO_Request_Mux_Strategy *rms); // Set the RMS object. - + TAO_Request_Mux_Strategy * rms (void) const; // Get the RMS used by this Transport object. - + TAO_Wait_Strategy *wait_strategy (void) const; // Return the Wait strategy used by the Transport. CORBA::ULong request_id (void); - // Get request id for the current invocation from the RMS object. - + // Get request id for the current invocation from the RMS object. + int bind_reply_dispatcher (CORBA::ULong request_id, TAO_Reply_Dispatcher *rd); // Bind the reply dispatcher with the RMS object. - + virtual int wait_for_reply (void); // Wait for the reply depending on the strategy. @@ -210,7 +213,7 @@ public: // Read and handle the reply. Returns 0 when there is Short Read on // the connection. Returns 1 when the full reply is read and // handled. Returns -1 on errors. - // If is 1, then reply is read in a blocking manner. + // If is 1, then reply is read in a blocking manner. virtual int register_handler (void); // Register the handler with the reactor. Will be called by the Wait @@ -218,40 +221,43 @@ public: // implementation out here returns -1 setting to ENOTSUP. virtual int suspend_handler (void); - // Suspend the handler from the reactor. Will be called by the Wait + // Suspend the handler from the reactor. Will be called by the Wait // Strategy if Reactor is used for that strategy. Default // implementation out here returns -1 setting to ENOTSUP. virtual int resume_handler (void); // Resume the handler from the reactor. This will be called by the // Wait Strategies, if Reactor is used in the strategy. Default - // implementation out here returns -1 setting to ENOTSUP. + // implementation out here returns -1 setting to ENOTSUP. + + virtual int handle_close (void); + // The connection was closed, let everybody know about it.... protected: // = States for the input message. - + CORBA::ULong message_size_; // Total length of the whole message. This does not include the // header length. - + CORBA::ULong message_offset_; // Current offset of the input message. int message_received_; // Flag to indicate whether the input message has been received // fully or not. - - + + TAO_Request_Mux_Strategy *rms_; // Strategy to decide whether multiple requests can be sent over the // same connection or the connection is exclusive for a request. - + TAO_Wait_Strategy *ws_; - // Strategy for waiting for the reply after sending the request. + // Strategy for waiting for the reply after sending the request. TAO_ORB_Core *orb_core_; // Global orbcore resource. - + TAO_GIOP_Version version_; // Version information found in the incoming message. }; diff --git a/TAO/tao/Request_Mux_Strategy.cpp b/TAO/tao/Request_Mux_Strategy.cpp index d7836021186..4cd55076f00 100644 --- a/TAO/tao/Request_Mux_Strategy.cpp +++ b/TAO/tao/Request_Mux_Strategy.cpp @@ -70,7 +70,7 @@ TAO_Muxed_RMS::set_cdr_stream (TAO_InputCDR *Cdr) void -TAO_Muxed_RMS::destroy_cdr_stream (void) +TAO_Muxed_RMS::destroy_cdr_stream (TAO_InputCDR *) { // @@ Implement. // delete cdr; @@ -132,7 +132,7 @@ TAO_Exclusive_RMS::set_cdr_stream (TAO_InputCDR *cdr) // NOOP function. void -TAO_Exclusive_RMS::destroy_cdr_stream (void) +TAO_Exclusive_RMS::destroy_cdr_stream (TAO_InputCDR *) { this->cdr_ = 0; } diff --git a/TAO/tao/Request_Mux_Strategy.h b/TAO/tao/Request_Mux_Strategy.h index 363b6539873..900f63a300e 100644 --- a/TAO/tao/Request_Mux_Strategy.h +++ b/TAO/tao/Request_Mux_Strategy.h @@ -69,7 +69,7 @@ public: virtual TAO_InputCDR *get_cdr_stream (void); // Get the CDR stream. - virtual void destroy_cdr_stream (void) = 0; + virtual void destroy_cdr_stream (TAO_InputCDR *) = 0; // Destroy the CDR stream. protected: @@ -115,7 +115,7 @@ public: // virtual TAO_InputCDR *cdr_stream (void); // Get the CDR stream. - virtual void destroy_cdr_stream (void); + virtual void destroy_cdr_stream (TAO_InputCDR *); // Delete the cdr stream. protected: @@ -159,7 +159,7 @@ public: // virtual TAO_InputCDR *cdr_stream (void); // Get the CDR stream. - virtual void destroy_cdr_stream (void); + virtual void destroy_cdr_stream (TAO_InputCDR *); // NO-OP function. protected: diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp index 93b34738f1d..f7f5e19ac58 100644 --- a/TAO/tao/Wait_Strategy.cpp +++ b/TAO/tao/Wait_Strategy.cpp @@ -3,6 +3,7 @@ #include "tao/Wait_Strategy.h" #include "tao/Pluggable.h" #include "tao/ORB_Core.h" +#include "tao/debug.h" ACE_RCSID(tao, Wait_Strategy, "$Id$") @@ -72,7 +73,7 @@ TAO_Wait_On_Reactor::wait (void) result = reactor->handle_events (/* timeout */); } - if (result == -1) + if (result == -1 || this->reply_received_ == -1) return -1; return 0; @@ -83,12 +84,19 @@ TAO_Wait_On_Reactor::handle_input (void) { int result = this->transport_->handle_client_input (0); + if (result == 1) + this->reply_received_ = 1; + if (result == -1) return -1; - if (result == 1) - this->reply_received_ = 1; + return 0; +} +int +TAO_Wait_On_Reactor::handle_close (void) +{ + this->reply_received_ = -1; return 0; } @@ -139,33 +147,48 @@ TAO_Wait_On_Leader_Follower::send_request (TAO_ORB_Core *orb_core, stream, two_way); } - else - { - // = Two way call. - // @@ Should we do here that checking for the difference in the - // Reactor used??? (Alex). + // = Two way call. - // Register the handler. - this->transport_->register_handler (); - // @@ Carlos: We do this only if the reactor is different right? - // (Alex) + // @@ Should we do here that checking for the difference in the + // Reactor used??? (Alex). - // Obtain the lock. - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, - orb_core->leader_follower_lock (), -1); + // Register the handler. + this->transport_->register_handler (); + // @@ Carlos: We do this only if the reactor is different right? + // (Alex) + // @@ Alex: that is taken care of in + // IIOP_Transport::register_handler, but maybe we shouldn't do this + // checking everytime, I recall that there was a problem (sometime + // ago) about using the wrong ORB core, but that may have been + // fixed... - // Set the state so that we know we're looking for a response. - this->expecting_response_ = 1; + // Obtain the lock. + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, + orb_core->leader_follower_lock (), -1); - // remember in which thread the client connection handler was running - this->calling_thread_ = ACE_Thread::self (); + // The last request may have left this unitialized + this->reply_received_ = 0; - // Send the request - return TAO_Wait_Strategy::send_request (orb_core, - stream, - two_way); - } + // Set the state so that we know we're looking for a response. + this->expecting_response_ = 1; + + // remember in which thread the client connection handler was running + this->calling_thread_ = ACE_Thread::self (); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Wait_On_LF::send_request " + "expecting reply for <%x:%d>\n", + this, this->transport_->handle ())); + + } + + // Send the request + return TAO_Wait_Strategy::send_request (orb_core, + stream, + two_way); } int @@ -223,7 +246,8 @@ TAO_Wait_On_Leader_Follower::wait (void) if (orb_core->add_follower (cond) == -1) ACE_ERROR ((LM_ERROR, "TAO:%N:%l:(%P|%t):TAO_Wait_On_Leader_Follower::wait: " - "Failed to add a follower thread\n")); + "Failed to add a follower <%x>\n", + cond)); while (!this->reply_received_ && orb_core->leader_available ()) { @@ -269,6 +293,9 @@ TAO_Wait_On_Leader_Follower::wait (void) // This might increase the refcount of the leader. orb_core->set_leader_thread (); + // ACE_DEBUG ((LM_DEBUG, + // "TAO (%P|%t) - become the leader\n")); + // Release the lock. if (ace_mon.release () == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -286,12 +313,22 @@ TAO_Wait_On_Leader_Follower::wait (void) while (result >= 0 && this->reply_received_ == 0) result = orb_core->reactor ()->handle_events (); + // Re-acquire the lock. + if (ace_mon.acquire () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::wait: " + "Failed to acquire the lock.\n"), + -1); + // Wake up the next leader, we cannot do that in handle_input, // because the woken up thread would try to get into // handle_events, which is at the time in handle_input still // occupied. But do it before checking the error in , even // if there is an error in our input we should continue running the - // loop in anothe rthread. + // loop in another thread. + + // ACE_DEBUG ((LM_DEBUG, + // "TAO (%P|%t) - elect a follower\n")); if (orb_core->unset_leader_wake_up_follower () == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -337,10 +374,21 @@ TAO_Wait_On_Leader_Follower::handle_input (void) // @@ Alex: this could be a CloseConnection message or something // similar, has to be handled... if (!this->expecting_response_) - return -1; + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Wait_On_LF::handle_input, " + "unexpected <%x:%d>\n", + this, this->transport_->handle ())); + return -1; + } // Receive any data that is available, without blocking... int result = this->transport_->handle_client_input (0); + if (result == -1 && TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Wait_On_LF::handle_input, " + "handle_client_input == -1\n")); // Data was read, but there the reply has not been completely // received... @@ -348,50 +396,28 @@ TAO_Wait_On_Leader_Follower::handle_input (void) return 0; // Severe error, abort.... - if (result == -1) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - L-F error while waiting on %d\n", - this->transport_->handle ())); - - this->reply_received_ = -1; - } - else + if (result == 1) { - // All the data is here! this->reply_received_ = 1; result = 0; } - if (ACE_OS::thr_equal (this->calling_thread_, ACE_Thread::self ())) - { - // We are the leader thread, simply return 0 to terminate the - // event loop.... - return 0; - } - - // We are not the leader thread, but we have our data, wake up - // ourselves and then return 0 so the leader thread can continue - // doing its job.... - - // At this point we might fail to remove the follower, because - // it has been already chosen to become the leader, so it is - // awake and will get this too. - ACE_SYNCH_CONDITION* cond = - this->cond_response_available (); - - // Ignore any errors, may have been removed by another thread... - (void) orb_core->remove_follower (cond); - - if (cond == 0 || cond->signal () == -1) - { - // Yikes, what do we do here???? - return result; - } + this->wake_up (); return result; } +int +TAO_Wait_On_Leader_Follower::handle_close (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, + this->transport_->orb_core ()->leader_follower_lock (), + -1); + this->reply_received_ = -1; + this->wake_up (); + return 0; +} + // Register the handler. int TAO_Wait_On_Leader_Follower::register_handler (void) @@ -421,6 +447,33 @@ TAO_Wait_On_Leader_Follower::cond_response_available (void) return this->cond_response_available_; } +void +TAO_Wait_On_Leader_Follower::wake_up (void) +{ + if (ACE_OS::thr_equal (this->calling_thread_, ACE_Thread::self ())) + { + // We are the leader thread, simply return 0, handle_events() + // will return because there was at least one event (this one!) + return; + } + + // We are not the leader thread, but we have our data, wake up + // ourselves and then return 0 so the leader thread can continue + // doing its job.... + + // At this point we might fail to remove the follower, because + // it has been already chosen to become the leader, so it is + // awake and will get this too. + ACE_SYNCH_CONDITION* cond = + this->cond_response_available (); + + // Ignore any errors, may have been removed by another thread... + (void) this->transport_->orb_core ()->remove_follower (cond); + + if (cond != 0) + (void) cond->signal (); +} + // ********************************************************************* // Constructor. @@ -459,6 +512,12 @@ TAO_Wait_On_Read::handle_input (void) return this->transport_->handle_client_input (1); } +int +TAO_Wait_On_Read::handle_close (void) +{ + return 0; +} + // No-op. int TAO_Wait_On_Read::register_handler (void) diff --git a/TAO/tao/Wait_Strategy.h b/TAO/tao/Wait_Strategy.h index 14e02c4edde..a842ac536a7 100644 --- a/TAO/tao/Wait_Strategy.h +++ b/TAO/tao/Wait_Strategy.h @@ -59,9 +59,8 @@ public: virtual int handle_input (void) = 0; // Handle the input. - // @@ Alex: this class should *not* depend on the IIOP_Handlers, - // can't you use TAO_Transport for this? After all it returns an - // Event_Handler if you need one... + virtual int handle_close (void) = 0; + // The connection was closed, take appropiate action... virtual int register_handler (void) = 0; // Register the handler with the Reactor if it makes sense for the @@ -103,9 +102,8 @@ public: // Handle the input. Delegate this job to Transport object. Before // that suspend the handler in the Reactor. - // @@ Alex: this class should *not* depend on the IIOP_Handlers, - // can't you use TAO_Transport for this? After all it returns an - // Event_Handler if you need one... + virtual int handle_close (void); + // The connection was closed, take appropiate action... virtual int register_handler (void); // Register the handler with the Reactor. @@ -115,7 +113,7 @@ public: private: int reply_received_; - // This flag indicates if a *complete* reply has been received. Used + // This flag indicates if a *complete* reply has been received. Used // to exit the event loop. }; @@ -149,7 +147,8 @@ public: // Handle the input. Delegate this job to Transport object. Before // that, suspend the handler in the Reactor. - // @@ Alex: another use of IIOP_Handler... + virtual int handle_close (void); + // The connection was closed, take appropiate action... virtual int register_handler (void); // Register the handler with the Reactor. @@ -161,6 +160,10 @@ protected: ACE_SYNCH_CONDITION* cond_response_available (void); // Return the cond_response_available, initializing it if necessary. + void wake_up (void); + // Helper method to wake us up when we are a follower... + +protected: ACE_thread_t calling_thread_; // the thread ID of the thread we were running in. @@ -200,7 +203,8 @@ public: virtual int handle_input (void); // Handle the input. Delegate this job to Transport object. - // @@ Alex: another use of IIOP_Handler... + virtual int handle_close (void); + // The connection was closed, take appropiate action... virtual int register_handler (void); // No-op. Return 0. -- cgit v1.2.1