diff options
Diffstat (limited to 'TAO/tao/Leader_Follower.cpp')
-rw-r--r-- | TAO/tao/Leader_Follower.cpp | 422 |
1 files changed, 272 insertions, 150 deletions
diff --git a/TAO/tao/Leader_Follower.cpp b/TAO/tao/Leader_Follower.cpp index 9bf50ba6fa3..427374ee726 100644 --- a/TAO/tao/Leader_Follower.cpp +++ b/TAO/tao/Leader_Follower.cpp @@ -3,6 +3,7 @@ #include "ace/Countdown_Time.h" #include "ace/OS_NS_sys_time.h" #include "ace/Reactor.h" +#include "ace/Auto_Ptr.h" #include "tao/Leader_Follower.h" #include "tao/LF_Follower_Auto_Ptr.h" @@ -19,8 +20,25 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL +TAO_Leader_Follower::Deferred_Event::Deferred_Event (ACE_Event_Handler* h) +: eh_ (h) +{ + h->add_reference (); +} + + +ACE_Event_Handler* TAO_Leader_Follower::Deferred_Event::handler () const +{ + return this->eh_.handler (); +} + TAO_Leader_Follower::~TAO_Leader_Follower (void) { + while (!this->deferred_event_set_.is_empty ()) + { + Deferred_Event *event = this->deferred_event_set_.pop_front (); + delete event; + } while (!this->follower_free_list_.is_empty ()) { TAO_LF_Follower *follower = this->follower_free_list_.pop_front (); @@ -185,6 +203,49 @@ TAO_Leader_Follower::reset_client_thread (void) } int +TAO_Leader_Follower::defer_event (ACE_Event_Handler* eh) +{ + // Obtain the lock. + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1); + + if (TAO_debug_level > 7) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Leader_Follower::defer_event, " + "deferring event handler[%d]\n", + eh->get_handle ())); + Deferred_Event* ptr = 0; + ACE_NEW_RETURN (ptr, + Deferred_Event (eh), + -1); + this->deferred_event_set_.push_back (ptr); + return 0; +} + +void +TAO_Leader_Follower::resume_events () +{ + // not need to obtain the lock, only called when holding the lock + while (!this->deferred_event_set_.is_empty ()) + { + ACE_Auto_Ptr<Deferred_Event> event (this->deferred_event_set_.pop_front ()); + // Send a notification to the reactor to cause the awakening of a new + // follower, if there is one already available. + ACE_Reactor *reactor = this->orb_core_->reactor (); + int const retval = reactor->notify (event->handler (), ACE_Event_Handler::READ_MASK); + if (TAO_debug_level > 2) + { + // @@todo: need to think about what is the action that + // we can take when we get here with an error?! + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO_Leader_Follower::resume_events, ") + ACE_TEXT ("an event handler[%d] has been resumed, ") + ACE_TEXT ("notified the reactor, retval=%d.\n"), + event->handler ()->get_handle (), retval)); + } + } +} + +int TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event, TAO_Transport *transport, ACE_Time_Value *max_wait_time) @@ -207,20 +268,32 @@ TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event, size_t t_id = 0; if (TAO_debug_level && transport != 0) - { - t_id = transport->id (); - } - { + t_id = transport->id (); + } + + { // Scope #1: All threads inside here are client threads // Calls this->set_client_thread () on construction and // this->reset_client_thread () on destruction. TAO_LF_Client_Thread_Helper client_thread_helper (*this); ACE_UNUSED_ARG (client_thread_helper); - // Check if there is a leader. Note that it cannot be us since we - // gave up our leadership when we became a client. - if (this->leader_available ()) - { + // The loop here is for when we get elected (client) leader and + // then later relinquish the leader position and our event has + // still not completed (and we haven't run out of time). + // All the conditions below are basically the various ways the + // leader loop below can end, other than the event being complete + while (event->keep_waiting () + && !(result == 0 && + max_wait_time != 0 && + *max_wait_time == ACE_Time_Value::zero) + && result != -1) + { // Scope #2: threads here alternate between being leader/followers + + // Check if there is a leader. Note that it cannot be us since we + // gave up our leadership when we became a client. + if (this->leader_available ()) + { // Scope #3: threads here are followers // = Wait as a follower. // Grab a follower: @@ -240,101 +313,101 @@ TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event, while (event->keep_waiting () && this->leader_available ()) + { // Scope #4: this loop handles spurious wake-ups + // Add ourselves to the list, do it everytime we wake up + // from the CV loop. Because: + // + // - The leader thread could have elected us as the new + // leader. + // - Before we can assume the role another thread becomes + // the leader + // - But our condition variable could have been removed + // already, if we don't add it again we will never wake + // up. + // + // Notice that we can have spurious wake ups, in that case + // adding the leader results in an error, that must be + // ignored. + // You may be thinking of not removing the condition + // variable in the code that sends the signal, but + // removing it here, that does not work either, in that + // case the condition variable may be used twice: + // + // - Wake up because its reply arrived + // - Wake up because it must become the leader + // + // but only the first one has any effect, so the leader is + // lost. + TAO_LF_Follower_Auto_Adder auto_adder (*this, follower); + + if (max_wait_time == 0) { - // Add ourselves to the list, do it everytime we wake up - // from the CV loop. Because: - // - // - The leader thread could have elected us as the new - // leader. - // - Before we can assume the role another thread becomes - // the leader - // - But our condition variable could have been removed - // already, if we don't add it again we will never wake - // up. - // - // Notice that we can have spurious wake ups, in that case - // adding the leader results in an error, that must be - // ignored. - // You may be thinking of not removing the condition - // variable in the code that sends the signal, but - // removing it here, that does not work either, in that - // case the condition variable may be used twice: - // - // - Wake up because its reply arrived - // - Wake up because it must become the leader - // - // but only the first one has any effect, so the leader is - // lost. - TAO_LF_Follower_Auto_Adder auto_adder (*this, follower); - - if (max_wait_time == 0) + if (follower->wait (max_wait_time) == -1) { - if (follower->wait (max_wait_time) == -1) - { - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, " - " (follower) [no timer, cond failed]\n", - t_id)); - - // @@ Michael: What is our error handling in this case? - // We could be elected as leader and - // no leader would come in? - return -1; - } + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, " + " (follower) [no timer, cond failed]\n", + t_id)); + + // @@ Michael: What is our error handling in this case? + // We could be elected as leader and + // no leader would come in? + return -1; } - else + } + else + { + countdown.update (); + ACE_Time_Value tv = ACE_OS::gettimeofday (); + tv += *max_wait_time; + if (follower->wait (&tv) == -1) { - countdown.update (); - ACE_Time_Value tv = ACE_OS::gettimeofday (); - tv += *max_wait_time; - if (follower->wait (&tv) == -1) + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower[%d]::wait, " + "(follower) [has timer, follower failed]\n", + t_id )); + + // If we have timedout set the state in the + // LF_Event. We call the non-locking, + // no-signalling method on LF_Event. + if (errno == ETIME) + // We have timedout + event->set_state (TAO_LF_Event::LFS_TIMEOUT); + + if (!event->successful ()) + { + // Remove follower can fail because either + // 1) the condition was satisfied (i.e. reply + // received or queue drained), or + // 2) somebody elected us as leader, or + // 3) the connection got closed. + // + // Therefore: + // If remove_follower fails and the condition + // was not satisfied, we know that we got + // elected as a leader. + // But we got a timeout, so we cannot become + // the leader, therefore, we have to select a + // new leader. + // + + if (this->elect_new_leader () == -1 + && TAO_debug_level > 0) { - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Leader_Follower[%d]::wait, " - "(follower) [has timer, follower failed]\n", - t_id )); - - // If we have timedout set the state in the - // LF_Event. We call the non-locking, - // no-signalling method on LF_Event. - if (errno == ETIME) - // We have timedout - event->set_state (TAO_LF_Event::LFS_TIMEOUT); - - if (!event->successful ()) - { - // Remove follower can fail because either - // 1) the condition was satisfied (i.e. reply - // received or queue drained), or - // 2) somebody elected us as leader, or - // 3) the connection got closed. - // - // Therefore: - // If remove_follower fails and the condition - // was not satisfied, we know that we got - // elected as a leader. - // But we got a timeout, so we cannot become - // the leader, therefore, we have to select a - // new leader. - // - - if (this->elect_new_leader () == -1 - && TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, " - "elect_new_leader failed\n", - t_id )); - } - } - - - return -1; + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, " + "elect_new_leader failed\n", + t_id )); } - } + } + + + return -1; + } } + } // End Scope #4: loop to handle spurious wakeups countdown.update (); @@ -364,69 +437,118 @@ TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event, // We only get here if we woke up but the reply is not // complete yet, time to assume the leader role.... // i.e. ACE_ASSERT (event->successful () == 0); - } + } // End Scope #3: we are no longer a follower + + // = Leader Code. + + // The only way to reach this point is if we must become the + // leader, because there is no leader or we have to update to a + // leader or we are doing nested upcalls in this case we do + // increase the refcount on the leader in TAO_ORB_Core. + + // Calls this->set_client_leader_thread () on + // construction and this->reset_client_leader_thread () + // on destruction. Note that this may increase the refcount of + // the leader. + { // Scope #5: We are now the client-leader + TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this); + ACE_UNUSED_ARG (client_leader_thread_helper); + + { // Scope #6: release the lock via a reverse lock + ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon, + this->reverse_lock (), -1); + + // Become owner of the reactor. + ACE_Reactor *reactor = this->reactor_; + reactor->owner (ACE_Thread::self ()); + + // Run the reactor event loop. + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," + " (leader) enter reactor event loop\n", + t_id)); + + // If we got our event, no need to run the event loop any + // further. + while (event->keep_waiting ()) + { + // Run the event loop. + result = reactor->handle_events (max_wait_time); + + // Did we timeout? If so, stop running the loop. + if (result == 0 && + max_wait_time != 0 && + *max_wait_time == ACE_Time_Value::zero) + break; + + // Other errors? If so, stop running the loop. + if (result == -1) + break; + + // Has an event loop thread become available to take over? + // Yes, we are checking this without the lock, however, if + // we get a false reading we'll just circle around and + // become leader again... + if (this->event_loop_threads_waiting_) + break; + // Did we give up leadership? + if (!this->is_client_leader_thread ()) + break; + // Otherwise, keep going... + } - // = Leader Code. + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," + " (leader) exit reactor event loop\n", + t_id)); + } // End Scope #6: we should now hold the lock again - // The only way to reach this point is if we must become the - // leader, because there is no leader or we have to update to a - // leader or we are doing nested upcalls in this case we do - // increase the refcount on the leader in TAO_ORB_Core. + // End artificial scope for auto_ptr like helpers calling: + // this->reset_client_leader_thread (). - // Calls this->set_client_leader_thread () on - // construction and this->reset_client_leader_thread () - // on destruction. Note that this may increase the refcount of - // the leader. - TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this); - ACE_UNUSED_ARG (client_leader_thread_helper); + } // End Scope #5: we are no longer a client-leader + // We only get here if we were the client leader and either our + // event completed or an event loop thread has become available to + // become leader. - { - ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon, - this->reverse_lock (), -1); - - // Become owner of the reactor. - ACE_Reactor *reactor = this->reactor_; - reactor->owner (ACE_Thread::self ()); - - // Run the reactor event loop. - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," - " (leader) enter reactor event loop\n", - t_id)); - - // If we got our event, no need to run the event loop any - // further. - while (event->keep_waiting ()) - { - // Run the event loop. - result = reactor->handle_events (max_wait_time); + // resume any deferred events before we switch to a new leader thread + this->resume_events (); - // Did we timeout? If so, stop running the loop. - if (result == 0 && - max_wait_time != 0 && - *max_wait_time == ACE_Time_Value::zero) - break; + // Wake and yield to any event loop threads that may be waiting to + // take leadership - otherwise we will just loop around and take + // leadership again (because we hold the lock). + if (this->event_loop_threads_waiting_ && !this->leader_available ()) + { + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," + " (client) waking and yielding to allow event thread leadership\n", + t_id)); + + // Wake up the next leader (in case not yet done) + if (this->elect_new_leader () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," + " failed to elect new leader\n", + t_id), + -1); + + // Yield, providing the event thread some time to grab leadership + ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon, + this->reverse_lock (), -1); + ACE_OS::thr_yield (); + } - // Other errors? If so, stop running the loop. - if (result == -1) - break; + } // End Scope #2: we loop here if our event is incomplete - // Otherwise, keep going... - } - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," - " (leader) exit reactor event loop\n", - t_id)); - } - } - // // End artificial scope for auto_ptr like helpers calling: - // this->reset_client_thread () and (maybe) - // this->reset_client_leader_thread (). - // + // this->reset_client_thread () + + // We should only get here when our event is complete or timed-out + } // End Scope #1 // Wake up the next leader, we cannot do that in handle_input, // because the woken up thread would try to get into handle_events, |