diff options
Diffstat (limited to 'TAO/tao/Wait_Strategy.cpp')
-rw-r--r-- | TAO/tao/Wait_Strategy.cpp | 183 |
1 files changed, 121 insertions, 62 deletions
diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp index 93b34738f1d..f7f5e19ac58 100644 --- a/TAO/tao/Wait_Strategy.cpp +++ b/TAO/tao/Wait_Strategy.cpp @@ -3,6 +3,7 @@ #include "tao/Wait_Strategy.h" #include "tao/Pluggable.h" #include "tao/ORB_Core.h" +#include "tao/debug.h" ACE_RCSID(tao, Wait_Strategy, "$Id$") @@ -72,7 +73,7 @@ TAO_Wait_On_Reactor::wait (void) result = reactor->handle_events (/* timeout */); } - if (result == -1) + if (result == -1 || this->reply_received_ == -1) return -1; return 0; @@ -83,12 +84,19 @@ TAO_Wait_On_Reactor::handle_input (void) { int result = this->transport_->handle_client_input (0); + if (result == 1) + this->reply_received_ = 1; + if (result == -1) return -1; - if (result == 1) - this->reply_received_ = 1; + return 0; +} +int +TAO_Wait_On_Reactor::handle_close (void) +{ + this->reply_received_ = -1; return 0; } @@ -139,33 +147,48 @@ TAO_Wait_On_Leader_Follower::send_request (TAO_ORB_Core *orb_core, stream, two_way); } - else - { - // = Two way call. - // @@ Should we do here that checking for the difference in the - // Reactor used??? (Alex). + // = Two way call. - // Register the handler. - this->transport_->register_handler (); - // @@ Carlos: We do this only if the reactor is different right? - // (Alex) + // @@ Should we do here that checking for the difference in the + // Reactor used??? (Alex). - // Obtain the lock. - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, - orb_core->leader_follower_lock (), -1); + // Register the handler. + this->transport_->register_handler (); + // @@ Carlos: We do this only if the reactor is different right? + // (Alex) + // @@ Alex: that is taken care of in + // IIOP_Transport::register_handler, but maybe we shouldn't do this + // checking everytime, I recall that there was a problem (sometime + // ago) about using the wrong ORB core, but that may have been + // fixed... - // Set the state so that we know we're looking for a response. - this->expecting_response_ = 1; + // Obtain the lock. + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, + orb_core->leader_follower_lock (), -1); - // remember in which thread the client connection handler was running - this->calling_thread_ = ACE_Thread::self (); + // The last request may have left this unitialized + this->reply_received_ = 0; - // Send the request - return TAO_Wait_Strategy::send_request (orb_core, - stream, - two_way); - } + // Set the state so that we know we're looking for a response. + this->expecting_response_ = 1; + + // remember in which thread the client connection handler was running + this->calling_thread_ = ACE_Thread::self (); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Wait_On_LF::send_request " + "expecting reply for <%x:%d>\n", + this, this->transport_->handle ())); + + } + + // Send the request + return TAO_Wait_Strategy::send_request (orb_core, + stream, + two_way); } int @@ -223,7 +246,8 @@ TAO_Wait_On_Leader_Follower::wait (void) if (orb_core->add_follower (cond) == -1) ACE_ERROR ((LM_ERROR, "TAO:%N:%l:(%P|%t):TAO_Wait_On_Leader_Follower::wait: " - "Failed to add a follower thread\n")); + "Failed to add a follower <%x>\n", + cond)); while (!this->reply_received_ && orb_core->leader_available ()) { @@ -269,6 +293,9 @@ TAO_Wait_On_Leader_Follower::wait (void) // This might increase the refcount of the leader. orb_core->set_leader_thread (); + // ACE_DEBUG ((LM_DEBUG, + // "TAO (%P|%t) - become the leader\n")); + // Release the lock. if (ace_mon.release () == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -286,12 +313,22 @@ TAO_Wait_On_Leader_Follower::wait (void) while (result >= 0 && this->reply_received_ == 0) result = orb_core->reactor ()->handle_events (); + // Re-acquire the lock. + if (ace_mon.acquire () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::wait: " + "Failed to acquire the lock.\n"), + -1); + // Wake up the next leader, we cannot do that in handle_input, // because the woken up thread would try to get into // handle_events, which is at the time in handle_input still // occupied. But do it before checking the error in <result>, even // if there is an error in our input we should continue running the - // loop in anothe rthread. + // loop in another thread. + + // ACE_DEBUG ((LM_DEBUG, + // "TAO (%P|%t) - elect a follower\n")); if (orb_core->unset_leader_wake_up_follower () == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -337,10 +374,21 @@ TAO_Wait_On_Leader_Follower::handle_input (void) // @@ Alex: this could be a CloseConnection message or something // similar, has to be handled... if (!this->expecting_response_) - return -1; + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Wait_On_LF::handle_input, " + "unexpected <%x:%d>\n", + this, this->transport_->handle ())); + return -1; + } // Receive any data that is available, without blocking... int result = this->transport_->handle_client_input (0); + if (result == -1 && TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Wait_On_LF::handle_input, " + "handle_client_input == -1\n")); // Data was read, but there the reply has not been completely // received... @@ -348,50 +396,28 @@ TAO_Wait_On_Leader_Follower::handle_input (void) return 0; // Severe error, abort.... - if (result == -1) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - L-F error while waiting on %d\n", - this->transport_->handle ())); - - this->reply_received_ = -1; - } - else + if (result == 1) { - // All the data is here! this->reply_received_ = 1; result = 0; } - if (ACE_OS::thr_equal (this->calling_thread_, ACE_Thread::self ())) - { - // We are the leader thread, simply return 0 to terminate the - // event loop.... - return 0; - } - - // We are not the leader thread, but we have our data, wake up - // ourselves and then return 0 so the leader thread can continue - // doing its job.... - - // At this point we might fail to remove the follower, because - // it has been already chosen to become the leader, so it is - // awake and will get this too. - ACE_SYNCH_CONDITION* cond = - this->cond_response_available (); - - // Ignore any errors, may have been removed by another thread... - (void) orb_core->remove_follower (cond); - - if (cond == 0 || cond->signal () == -1) - { - // Yikes, what do we do here???? - return result; - } + this->wake_up (); return result; } +int +TAO_Wait_On_Leader_Follower::handle_close (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, + this->transport_->orb_core ()->leader_follower_lock (), + -1); + this->reply_received_ = -1; + this->wake_up (); + return 0; +} + // Register the handler. int TAO_Wait_On_Leader_Follower::register_handler (void) @@ -421,6 +447,33 @@ TAO_Wait_On_Leader_Follower::cond_response_available (void) return this->cond_response_available_; } +void +TAO_Wait_On_Leader_Follower::wake_up (void) +{ + if (ACE_OS::thr_equal (this->calling_thread_, ACE_Thread::self ())) + { + // We are the leader thread, simply return 0, handle_events() + // will return because there was at least one event (this one!) + return; + } + + // We are not the leader thread, but we have our data, wake up + // ourselves and then return 0 so the leader thread can continue + // doing its job.... + + // At this point we might fail to remove the follower, because + // it has been already chosen to become the leader, so it is + // awake and will get this too. + ACE_SYNCH_CONDITION* cond = + this->cond_response_available (); + + // Ignore any errors, may have been removed by another thread... + (void) this->transport_->orb_core ()->remove_follower (cond); + + if (cond != 0) + (void) cond->signal (); +} + // ********************************************************************* // Constructor. @@ -459,6 +512,12 @@ TAO_Wait_On_Read::handle_input (void) return this->transport_->handle_client_input (1); } +int +TAO_Wait_On_Read::handle_close (void) +{ + return 0; +} + // No-op. int TAO_Wait_On_Read::register_handler (void) |