diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-08-01 23:39:57 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-08-01 23:39:57 +0000 |
commit | 28f2b1eecfab765469d97d38b686bffdea53114c (patch) | |
tree | e11d110b2f6e69f07a780756276cc735be3e8fa9 /TAO/tao/Leader_Follower.cpp | |
parent | 545c5cb11c374d0333e7d0f1c88f12694ad2eaee (diff) | |
download | ATCD-28f2b1eecfab765469d97d38b686bffdea53114c.tar.gz |
ChangeLogTag:Wed Aug 1 16:05:36 2001 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'TAO/tao/Leader_Follower.cpp')
-rw-r--r-- | TAO/tao/Leader_Follower.cpp | 434 |
1 files changed, 292 insertions, 142 deletions
diff --git a/TAO/tao/Leader_Follower.cpp b/TAO/tao/Leader_Follower.cpp index 342271f87a5..c94e471e05d 100644 --- a/TAO/tao/Leader_Follower.cpp +++ b/TAO/tao/Leader_Follower.cpp @@ -1,122 +1,65 @@ // $Id$ -#include "Leader_Follower.h" -#include "Resource_Factory.h" +#include "tao/Leader_Follower.h" +#include "tao/Resource_Factory.h" +#include "tao/LF_Follower.h" +#include "tao/LF_Follower_Auto_Ptr.h" +#include "tao/LF_Follower_Auto_Adder.h" +#include "tao/LF_Event.h" +#include "tao/LF_Event_Binder.h" + +#include "tao/Transport.h" #include "ace/Reactor.h" #if !defined (__ACE_INLINE__) -# include "Leader_Follower.i" +# include "tao/Leader_Follower.i" #endif /* ! __ACE_INLINE__ */ ACE_RCSID(tao, Leader_Follower, "$Id$") TAO_Leader_Follower::~TAO_Leader_Follower (void) { + while (!this->follower_free_list_.empty ()) + { + TAO_LF_Follower *follower = + this->follower_free_list_.pop_front (); + delete follower; + } // Hand the reactor back to the resource factory. this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_); this->reactor_ = 0; } -TAO_Leader_Follower::TAO_Follower_Node::TAO_Follower_Node (TAO_SYNCH_CONDITION* follower_ptr) - : follower_ (follower_ptr), - next_ (0) +TAO_LF_Follower * +TAO_Leader_Follower::allocate_follower (void) { + if (!this->follower_free_list_.empty ()) + return this->follower_free_list_.pop_front (); + return new TAO_LF_Follower (*this); } - -TAO_Leader_Follower::TAO_Follower_Queue::TAO_Follower_Queue (void) - : head_ (0), - tail_ (0) -{ - -} - -int -TAO_Leader_Follower::TAO_Follower_Queue::insert (TAO_Follower_Node* node) +void +TAO_Leader_Follower::release_follower (TAO_LF_Follower *follower) { - if (this->head_ == 0) { - this->head_ = node; - this->tail_ = node; - // Make sure that we don't have garbage in the case when the same node - // is added a second time. This is necessary as the nodes are on the - // stack. - node->next_ = 0; - } - else - { - // Add the node to the tail and modify the pointers - TAO_Follower_Node* temp = this->tail_; - temp->next_ = node; - this->tail_ = node; - node->next_ = 0; - } - return 0; + this->follower_free_list_.push_front (follower); } int -TAO_Leader_Follower::TAO_Follower_Queue::remove (TAO_Follower_Node* node) -{ - TAO_Follower_Node* prev = 0; - TAO_Follower_Node* curr = 0; - - // No followers in queue, return - if (this->head_ == 0) - return -1; - - // Check is for whether we have the same condition variable on the - // queue rather than the same node structure which wraps it. - for (curr = this->head_; - curr != 0 && curr->follower_ != node->follower_; - curr = curr->next_) - { - prev = curr; - } - - // Entry not found in the queue - if (curr == 0) - return -1; - // Entry found at the head of the queue - else if (prev == 0) - this->head_ = this->head_->next_; - else - prev->next_ = curr->next_; - // Entry at the tail - if (curr->next_ == 0) - this->tail_ = prev; - - return 0; -} - - -TAO_SYNCH_CONDITION* -TAO_Leader_Follower::get_next_follower (void) +TAO_Leader_Follower::elect_new_leader_i (void) { - // If the queue is empty return - if (this->follower_set_.is_empty()) - return 0; - - TAO_Follower_Node* next_follower = this->follower_set_.head_; - - TAO_SYNCH_CONDITION *cond = next_follower->follower_; + TAO_LF_Follower* follower = + this->follower_set_.head (); #if defined (TAO_DEBUG_LEADER_FOLLOWER) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) LF::get_next_follower - " + "TAO (%P|%t) LF::elect_new_leader_i - " "follower is %x\n", - cond)); + follower)); #endif /* TAO_DEBUG_LEADER_FOLLOWER */ - // We *must* remove it when we signal it so the same condition is - // not signalled for both wake up as a follower and as the next - // leader. - // The follower may not be there if the reply is received while the - // consumer is not yet waiting for it (i.e. it send the request but - // has not blocked to receive the reply yet) - (void) this->remove_follower (next_follower); // Ignore errors.. - - return cond; + return follower->signal (); } int @@ -225,77 +168,284 @@ TAO_Leader_Follower::reset_client_thread (void) } } -TAO_LF_Strategy::TAO_LF_Strategy () +int +TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event, + TAO_Transport *transport, + ACE_Time_Value *max_wait_time) { -} + // Obtain the lock. + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1); + + // Optmize the first iteration [no access to errno] + int result = 1; + + { + // Calls this->set_client_thread () on construction and + // this->reset_client_thread () on destruction. + TAO_LF_Client_Thread_Helper client_thread_helper (*this); + ACE_UNUSED_ARG (client_thread_helper); + + ACE_Countdown_Time countdown (max_wait_time); + + // Check if there is a leader. Note that it cannot be us since we + // gave up our leadership when we became a client. + if (this->leader_available ()) + { + // = Wait as a follower. + + // Grab a follower: + TAO_LF_Follower_Auto_Ptr follower (*this); + if (follower.get () == 0) + return -1; + + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " (follower) on Transport <%d>, cond <%x>\n", + transport->id (), + follower.get ())); + + // Bound the follower and the LF_Event, this is important to + // get a signal when the event terminates + TAO_LF_Event_Binder event_binder (event, follower.get ()); + + while (event->keep_waiting () && + this->leader_available ()) + { + // Add ourselves to the list, do it everytime we wake up + // from the CV loop. Because: + // + // - The leader thread could have elected us as the new + // leader. + // - Before we can assume the role another thread becomes + // the leader + // - But our condition variable could have been removed + // already, if we don't add it again we will never wake + // up. + // + // Notice that we can have spurious wake ups, in that case + // adding the leader results in an error, that must be + // ignored. + // You may be thinking of not removing the condition + // variable in the code that sends the signal, but + // removing it here, that does not work either, in that + // case the condition variable may be used twice: + // + // - Wake up because its reply arrived + // - Wake up because it must become the leader + // + // but only the first one has any effect, so the leader is + // lost. + // + + TAO_LF_Follower_Auto_Adder auto_adder (*this, follower); + + if (max_wait_time == 0) + { + if (follower->wait (max_wait_time) == -1) + { + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait_for_event, " + " (follower) on <%d>" + " [no timer, cond failed]\n", + transport->id ())); + + // @@ Michael: What is our error handling in this case? + // We could be elected as leader and + // no leader would come in? + return -1; + } + } + else + { + countdown.update (); + ACE_Time_Value tv = ACE_OS::gettimeofday (); + tv += *max_wait_time; + if (follower->wait (&tv) == -1) + { + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait," + " (follower) on <%x> " + " [has timer, follower failed]\n", + transport->id ())); + + if (!event->successful ()) + { + // Remove follower can fail because either + // 1) the condition was satisfied (i.e. reply + // received or queue drained), or + // 2) somebody elected us as leader, or + // 3) the connection got closed. + // + // Therefore: + // If remove_follower fails and the condition + // was not satisfied, we know that we got + // elected as a leader. + // But we got a timeout, so we cannot become + // the leader, therefore, we have to select a + // new leader. + // + + if (this->elect_new_leader () == -1 + && TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " elect_new_leader failed\n")); + } + } + return -1; + } + } + } + + countdown.update (); + + // @@ Michael: This is an old comment why we do not want to + // remove the follower here. + // We should not remove the follower here, we *must* remove it when + // we signal it so the same condition is not signalled for + // both wake up as a follower and as the next leader. + + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " done (follower) on <%d>, successful %d\n", + transport->id (), + event->successful ())); + + // Now somebody woke us up to become a leader or to handle our + // input. We are already removed from the follower queue. + + if (event->successful ()) + return 0; + + if (event->error_detected ()) + return -1; + + // FALLTHROUGH + // We only get here if we woke up but the reply is not + // complete yet, time to assume the leader role.... + // i.e. ACE_ASSERT (event->successful () == 0); + } + + // = Leader Code. + + // The only way to reach this point is if we must become the + // leader, because there is no leader or we have to update to a + // leader or we are doing nested upcalls in this case we do + // increase the refcount on the leader in TAO_ORB_Core. + + // Calls this->set_client_leader_thread () on + // construction and this->reset_client_leader_thread () + // on destruction. Note that this may increase the refcount of + // the leader. + TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this); + ACE_UNUSED_ARG (client_leader_thread_helper); -TAO_LF_Strategy::~TAO_LF_Strategy () -{ -} + { + ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon, + this->reverse_lock (), -1); -TAO_Complete_LF_Strategy::TAO_Complete_LF_Strategy () -{ -} + // Become owner of the reactor. + ACE_Reactor *reactor = this->reactor_; + reactor->owner (ACE_Thread::self ()); -TAO_Complete_LF_Strategy::~TAO_Complete_LF_Strategy () -{ -} + // Run the reactor event loop. -void -TAO_Complete_LF_Strategy::set_upcall_thread (TAO_Leader_Follower &leader_follower) -{ - leader_follower.set_upcall_thread (); -} + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " (leader) enter reactor event loop on <%d>\n", + transport->id ())); -int -TAO_Complete_LF_Strategy::set_event_loop_thread (ACE_Time_Value *max_wait_time, - TAO_Leader_Follower &leader_follower) -{ - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock (), -1); + // If we got our event, no need to run the event loop any + // further. + while (event->keep_waiting ()) + { + // Run the event loop. + result = reactor->handle_events (max_wait_time); - return leader_follower.set_event_loop_thread (max_wait_time); -} + // Did we timeout? If so, stop running the loop. + if (result == 0 && + max_wait_time != 0 && + *max_wait_time == ACE_Time_Value::zero) + break; -void -TAO_Complete_LF_Strategy::reset_event_loop_thread_and_elect_new_leader (int call_reset, - TAO_Leader_Follower &leader_follower) -{ - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock ()); + // Other errors? If so, stop running the loop. + if (result == -1) + break; - if (call_reset) - leader_follower.reset_event_loop_thread (); + // Otherwise, keep going... + } - int result = leader_follower.elect_new_leader (); + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " (leader) exit reactor event loop on <%d>\n", + transport->id ())); + } + } + // + // End artificial scope for auto_ptr like helpers calling: + // this->reset_client_thread () and (maybe) + // this->reset_client_leader_thread (). + // + + // Wake up the next leader, we cannot do that in handle_input, + // because the woken up thread would try to get into handle_events, + // which is at the time in handle_input still occupied. But do it + // before checking the error in <result>, even if there is an error + // in our input we should continue running the loop in another + // thread. + + if (this->elect_new_leader () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " failed to elect new leader\n"), + -1); if (result == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) Failed to wake up ") - ACE_TEXT ("a follower thread\n"))); -} + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " handle_events failed\n"), + -1); -TAO_Null_LF_Strategy::TAO_Null_LF_Strategy () -{ + // Return an error if there was a problem receiving the reply... + if (max_wait_time != 0) + { + if (!event->successful () + && *max_wait_time == ACE_Time_Value::zero) + { + result = -1; + errno = ETIME; + } + else if (event->error_detected ()) + { + // If the time did not expire yet, but we get a failure, + // e.g. the connections closed, we should still return an error. + result = -1; + } + } + else + { + result = 0; + if (event->error_detected ()) + { + result = -1; + } + } + return result; } -TAO_Null_LF_Strategy::~TAO_Null_LF_Strategy () -{ -} +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -void -TAO_Null_LF_Strategy::set_upcall_thread (TAO_Leader_Follower &) -{ -} +template class ACE_Intrusive_List<TAO_LF_Follower>; -int -TAO_Null_LF_Strategy::set_event_loop_thread (ACE_Time_Value *, - TAO_Leader_Follower &) -{ - return 0; -} +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -void -TAO_Null_LF_Strategy::reset_event_loop_thread_and_elect_new_leader (int, - TAO_Leader_Follower &) -{ -} +#pragma instantiate ACE_Intrusive_List<TAO_LF_Follower> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |