diff options
Diffstat (limited to 'TAO/tao/Leader_Follower.cpp')
-rw-r--r-- | TAO/tao/Leader_Follower.cpp | 434 |
1 files changed, 142 insertions, 292 deletions
diff --git a/TAO/tao/Leader_Follower.cpp b/TAO/tao/Leader_Follower.cpp index ab245232d58..342271f87a5 100644 --- a/TAO/tao/Leader_Follower.cpp +++ b/TAO/tao/Leader_Follower.cpp @@ -1,65 +1,122 @@ // $Id$ -#include "tao/Leader_Follower.h" -#include "tao/Resource_Factory.h" -#include "tao/LF_Follower.h" -#include "tao/LF_Follower_Auto_Ptr.h" -#include "tao/LF_Follower_Auto_Adder.h" -#include "tao/LF_Event.h" -#include "tao/LF_Event_Binder.h" -#include "tao/debug.h" -#include "tao/Transport.h" +#include "Leader_Follower.h" +#include "Resource_Factory.h" #include "ace/Reactor.h" #if !defined (__ACE_INLINE__) -# include "tao/Leader_Follower.i" +# include "Leader_Follower.i" #endif /* ! __ACE_INLINE__ */ ACE_RCSID(tao, Leader_Follower, "$Id$") TAO_Leader_Follower::~TAO_Leader_Follower (void) { - while (!this->follower_free_list_.empty ()) - { - TAO_LF_Follower *follower = - this->follower_free_list_.pop_front (); - delete follower; - } // Hand the reactor back to the resource factory. this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_); this->reactor_ = 0; } -TAO_LF_Follower * -TAO_Leader_Follower::allocate_follower (void) +TAO_Leader_Follower::TAO_Follower_Node::TAO_Follower_Node (TAO_SYNCH_CONDITION* follower_ptr) + : follower_ (follower_ptr), + next_ (0) { - if (!this->follower_free_list_.empty ()) - return this->follower_free_list_.pop_front (); - return new TAO_LF_Follower (*this); } -void -TAO_Leader_Follower::release_follower (TAO_LF_Follower *follower) + +TAO_Leader_Follower::TAO_Follower_Queue::TAO_Follower_Queue (void) + : head_ (0), + tail_ (0) +{ + +} + +int +TAO_Leader_Follower::TAO_Follower_Queue::insert (TAO_Follower_Node* node) { - this->follower_free_list_.push_front (follower); + if (this->head_ == 0) { + this->head_ = node; + this->tail_ = node; + // Make sure that we don't have garbage in the case when the same node + // is added a second time. This is necessary as the nodes are on the + // stack. + node->next_ = 0; + } + else + { + // Add the node to the tail and modify the pointers + TAO_Follower_Node* temp = this->tail_; + temp->next_ = node; + this->tail_ = node; + node->next_ = 0; + } + return 0; } int -TAO_Leader_Follower::elect_new_leader_i (void) +TAO_Leader_Follower::TAO_Follower_Queue::remove (TAO_Follower_Node* node) +{ + TAO_Follower_Node* prev = 0; + TAO_Follower_Node* curr = 0; + + // No followers in queue, return + if (this->head_ == 0) + return -1; + + // Check is for whether we have the same condition variable on the + // queue rather than the same node structure which wraps it. + for (curr = this->head_; + curr != 0 && curr->follower_ != node->follower_; + curr = curr->next_) + { + prev = curr; + } + + // Entry not found in the queue + if (curr == 0) + return -1; + // Entry found at the head of the queue + else if (prev == 0) + this->head_ = this->head_->next_; + else + prev->next_ = curr->next_; + // Entry at the tail + if (curr->next_ == 0) + this->tail_ = prev; + + return 0; +} + + +TAO_SYNCH_CONDITION* +TAO_Leader_Follower::get_next_follower (void) { - TAO_LF_Follower* follower = - this->follower_set_.head (); + // If the queue is empty return + if (this->follower_set_.is_empty()) + return 0; + + TAO_Follower_Node* next_follower = this->follower_set_.head_; + + TAO_SYNCH_CONDITION *cond = next_follower->follower_; #if defined (TAO_DEBUG_LEADER_FOLLOWER) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) LF::elect_new_leader_i - " + "TAO (%P|%t) LF::get_next_follower - " "follower is %x\n", - follower)); + cond)); #endif /* TAO_DEBUG_LEADER_FOLLOWER */ - return follower->signal (); + // We *must* remove it when we signal it so the same condition is + // not signalled for both wake up as a follower and as the next + // leader. + // The follower may not be there if the reply is received while the + // consumer is not yet waiting for it (i.e. it send the request but + // has not blocked to receive the reply yet) + (void) this->remove_follower (next_follower); // Ignore errors.. + + return cond; } int @@ -168,284 +225,77 @@ TAO_Leader_Follower::reset_client_thread (void) } } -int -TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event, - TAO_Transport *transport, - ACE_Time_Value *max_wait_time) +TAO_LF_Strategy::TAO_LF_Strategy () { - // Obtain the lock. - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1); - - // Optmize the first iteration [no access to errno] - int result = 1; - - { - // Calls this->set_client_thread () on construction and - // this->reset_client_thread () on destruction. - TAO_LF_Client_Thread_Helper client_thread_helper (*this); - ACE_UNUSED_ARG (client_thread_helper); - - ACE_Countdown_Time countdown (max_wait_time); - - // Check if there is a leader. Note that it cannot be us since we - // gave up our leadership when we became a client. - if (this->leader_available ()) - { - // = Wait as a follower. - - // Grab a follower: - TAO_LF_Follower_Auto_Ptr follower (*this); - if (follower.get () == 0) - return -1; - - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Leader_Follower::wait_for_event," - " (follower) on Transport <%d>, cond <%x>\n", - transport->id (), - follower.get ())); - - // Bound the follower and the LF_Event, this is important to - // get a signal when the event terminates - TAO_LF_Event_Binder event_binder (event, follower.get ()); - - while (event->keep_waiting () && - this->leader_available ()) - { - // Add ourselves to the list, do it everytime we wake up - // from the CV loop. Because: - // - // - The leader thread could have elected us as the new - // leader. - // - Before we can assume the role another thread becomes - // the leader - // - But our condition variable could have been removed - // already, if we don't add it again we will never wake - // up. - // - // Notice that we can have spurious wake ups, in that case - // adding the leader results in an error, that must be - // ignored. - // You may be thinking of not removing the condition - // variable in the code that sends the signal, but - // removing it here, that does not work either, in that - // case the condition variable may be used twice: - // - // - Wake up because its reply arrived - // - Wake up because it must become the leader - // - // but only the first one has any effect, so the leader is - // lost. - // - - TAO_LF_Follower_Auto_Adder auto_adder (*this, follower); - - if (max_wait_time == 0) - { - if (follower->wait (max_wait_time) == -1) - { - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Leader_Follower::wait_for_event, " - " (follower) on <%d>" - " [no timer, cond failed]\n", - transport->id ())); - - // @@ Michael: What is our error handling in this case? - // We could be elected as leader and - // no leader would come in? - return -1; - } - } - else - { - countdown.update (); - ACE_Time_Value tv = ACE_OS::gettimeofday (); - tv += *max_wait_time; - if (follower->wait (&tv) == -1) - { - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Leader_Follower::wait," - " (follower) on <%x> " - " [has timer, follower failed]\n", - transport->id ())); - - if (!event->successful ()) - { - // Remove follower can fail because either - // 1) the condition was satisfied (i.e. reply - // received or queue drained), or - // 2) somebody elected us as leader, or - // 3) the connection got closed. - // - // Therefore: - // If remove_follower fails and the condition - // was not satisfied, we know that we got - // elected as a leader. - // But we got a timeout, so we cannot become - // the leader, therefore, we have to select a - // new leader. - // - - if (this->elect_new_leader () == -1 - && TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Leader_Follower::wait_for_event," - " elect_new_leader failed\n")); - } - } - return -1; - } - } - } - - countdown.update (); - - // @@ Michael: This is an old comment why we do not want to - // remove the follower here. - // We should not remove the follower here, we *must* remove it when - // we signal it so the same condition is not signalled for - // both wake up as a follower and as the next leader. - - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Leader_Follower::wait_for_event," - " done (follower) on <%d>, successful %d\n", - transport->id (), - event->successful ())); - - // Now somebody woke us up to become a leader or to handle our - // input. We are already removed from the follower queue. - - if (event->successful ()) - return 0; - - if (event->error_detected ()) - return -1; - - // FALLTHROUGH - // We only get here if we woke up but the reply is not - // complete yet, time to assume the leader role.... - // i.e. ACE_ASSERT (event->successful () == 0); - } - - // = Leader Code. - - // The only way to reach this point is if we must become the - // leader, because there is no leader or we have to update to a - // leader or we are doing nested upcalls in this case we do - // increase the refcount on the leader in TAO_ORB_Core. - - // Calls this->set_client_leader_thread () on - // construction and this->reset_client_leader_thread () - // on destruction. Note that this may increase the refcount of - // the leader. - TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this); - ACE_UNUSED_ARG (client_leader_thread_helper); +} - { - ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon, - this->reverse_lock (), -1); +TAO_LF_Strategy::~TAO_LF_Strategy () +{ +} - // Become owner of the reactor. - ACE_Reactor *reactor = this->reactor_; - reactor->owner (ACE_Thread::self ()); +TAO_Complete_LF_Strategy::TAO_Complete_LF_Strategy () +{ +} - // Run the reactor event loop. +TAO_Complete_LF_Strategy::~TAO_Complete_LF_Strategy () +{ +} - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Leader_Follower::wait_for_event," - " (leader) enter reactor event loop on <%d>\n", - transport->id ())); +void +TAO_Complete_LF_Strategy::set_upcall_thread (TAO_Leader_Follower &leader_follower) +{ + leader_follower.set_upcall_thread (); +} - // If we got our event, no need to run the event loop any - // further. - while (event->keep_waiting ()) - { - // Run the event loop. - result = reactor->handle_events (max_wait_time); +int +TAO_Complete_LF_Strategy::set_event_loop_thread (ACE_Time_Value *max_wait_time, + TAO_Leader_Follower &leader_follower) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock (), -1); - // Did we timeout? If so, stop running the loop. - if (result == 0 && - max_wait_time != 0 && - *max_wait_time == ACE_Time_Value::zero) - break; + return leader_follower.set_event_loop_thread (max_wait_time); +} - // Other errors? If so, stop running the loop. - if (result == -1) - break; +void +TAO_Complete_LF_Strategy::reset_event_loop_thread_and_elect_new_leader (int call_reset, + TAO_Leader_Follower &leader_follower) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock ()); - // Otherwise, keep going... - } + if (call_reset) + leader_follower.reset_event_loop_thread (); - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Leader_Follower::wait_for_event," - " (leader) exit reactor event loop on <%d>\n", - transport->id ())); - } - } - // - // End artificial scope for auto_ptr like helpers calling: - // this->reset_client_thread () and (maybe) - // this->reset_client_leader_thread (). - // - - // Wake up the next leader, we cannot do that in handle_input, - // because the woken up thread would try to get into handle_events, - // which is at the time in handle_input still occupied. But do it - // before checking the error in <result>, even if there is an error - // in our input we should continue running the loop in another - // thread. - - if (this->elect_new_leader () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "TAO (%P|%t) - Leader_Follower::wait_for_event," - " failed to elect new leader\n"), - -1); + int result = leader_follower.elect_new_leader (); if (result == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "TAO (%P|%t) - Leader_Follower::wait_for_event," - " handle_events failed\n"), - -1); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) Failed to wake up ") + ACE_TEXT ("a follower thread\n"))); +} - // Return an error if there was a problem receiving the reply... - if (max_wait_time != 0) - { - if (!event->successful () - && *max_wait_time == ACE_Time_Value::zero) - { - result = -1; - errno = ETIME; - } - else if (event->error_detected ()) - { - // If the time did not expire yet, but we get a failure, - // e.g. the connections closed, we should still return an error. - result = -1; - } - } - else - { - result = 0; - if (event->error_detected ()) - { - result = -1; - } - } - return result; +TAO_Null_LF_Strategy::TAO_Null_LF_Strategy () +{ } -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +TAO_Null_LF_Strategy::~TAO_Null_LF_Strategy () +{ +} -template class ACE_Intrusive_List<TAO_LF_Follower>; +void +TAO_Null_LF_Strategy::set_upcall_thread (TAO_Leader_Follower &) +{ +} -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +int +TAO_Null_LF_Strategy::set_event_loop_thread (ACE_Time_Value *, + TAO_Leader_Follower &) +{ + return 0; +} -#pragma instantiate ACE_Intrusive_List<TAO_LF_Follower> +void +TAO_Null_LF_Strategy::reset_event_loop_thread_and_elect_new_leader (int, + TAO_Leader_Follower &) +{ +} -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |