summaryrefslogtreecommitdiff
path: root/TAO/tao/Leader_Follower.cpp
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-08-01 23:39:57 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-08-01 23:39:57 +0000
commit28f2b1eecfab765469d97d38b686bffdea53114c (patch)
treee11d110b2f6e69f07a780756276cc735be3e8fa9 /TAO/tao/Leader_Follower.cpp
parent545c5cb11c374d0333e7d0f1c88f12694ad2eaee (diff)
downloadATCD-28f2b1eecfab765469d97d38b686bffdea53114c.tar.gz
ChangeLogTag:Wed Aug 1 16:05:36 2001 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'TAO/tao/Leader_Follower.cpp')
-rw-r--r--TAO/tao/Leader_Follower.cpp434
1 files changed, 292 insertions, 142 deletions
diff --git a/TAO/tao/Leader_Follower.cpp b/TAO/tao/Leader_Follower.cpp
index 342271f87a5..c94e471e05d 100644
--- a/TAO/tao/Leader_Follower.cpp
+++ b/TAO/tao/Leader_Follower.cpp
@@ -1,122 +1,65 @@
// $Id$
-#include "Leader_Follower.h"
-#include "Resource_Factory.h"
+#include "tao/Leader_Follower.h"
+#include "tao/Resource_Factory.h"
+#include "tao/LF_Follower.h"
+#include "tao/LF_Follower_Auto_Ptr.h"
+#include "tao/LF_Follower_Auto_Adder.h"
+#include "tao/LF_Event.h"
+#include "tao/LF_Event_Binder.h"
+
+#include "tao/Transport.h"
#include "ace/Reactor.h"
#if !defined (__ACE_INLINE__)
-# include "Leader_Follower.i"
+# include "tao/Leader_Follower.i"
#endif /* ! __ACE_INLINE__ */
ACE_RCSID(tao, Leader_Follower, "$Id$")
TAO_Leader_Follower::~TAO_Leader_Follower (void)
{
+ while (!this->follower_free_list_.empty ())
+ {
+ TAO_LF_Follower *follower =
+ this->follower_free_list_.pop_front ();
+ delete follower;
+ }
// Hand the reactor back to the resource factory.
this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_);
this->reactor_ = 0;
}
-TAO_Leader_Follower::TAO_Follower_Node::TAO_Follower_Node (TAO_SYNCH_CONDITION* follower_ptr)
- : follower_ (follower_ptr),
- next_ (0)
+TAO_LF_Follower *
+TAO_Leader_Follower::allocate_follower (void)
{
+ if (!this->follower_free_list_.empty ())
+ return this->follower_free_list_.pop_front ();
+ return new TAO_LF_Follower (*this);
}
-
-TAO_Leader_Follower::TAO_Follower_Queue::TAO_Follower_Queue (void)
- : head_ (0),
- tail_ (0)
-{
-
-}
-
-int
-TAO_Leader_Follower::TAO_Follower_Queue::insert (TAO_Follower_Node* node)
+void
+TAO_Leader_Follower::release_follower (TAO_LF_Follower *follower)
{
- if (this->head_ == 0) {
- this->head_ = node;
- this->tail_ = node;
- // Make sure that we don't have garbage in the case when the same node
- // is added a second time. This is necessary as the nodes are on the
- // stack.
- node->next_ = 0;
- }
- else
- {
- // Add the node to the tail and modify the pointers
- TAO_Follower_Node* temp = this->tail_;
- temp->next_ = node;
- this->tail_ = node;
- node->next_ = 0;
- }
- return 0;
+ this->follower_free_list_.push_front (follower);
}
int
-TAO_Leader_Follower::TAO_Follower_Queue::remove (TAO_Follower_Node* node)
-{
- TAO_Follower_Node* prev = 0;
- TAO_Follower_Node* curr = 0;
-
- // No followers in queue, return
- if (this->head_ == 0)
- return -1;
-
- // Check is for whether we have the same condition variable on the
- // queue rather than the same node structure which wraps it.
- for (curr = this->head_;
- curr != 0 && curr->follower_ != node->follower_;
- curr = curr->next_)
- {
- prev = curr;
- }
-
- // Entry not found in the queue
- if (curr == 0)
- return -1;
- // Entry found at the head of the queue
- else if (prev == 0)
- this->head_ = this->head_->next_;
- else
- prev->next_ = curr->next_;
- // Entry at the tail
- if (curr->next_ == 0)
- this->tail_ = prev;
-
- return 0;
-}
-
-
-TAO_SYNCH_CONDITION*
-TAO_Leader_Follower::get_next_follower (void)
+TAO_Leader_Follower::elect_new_leader_i (void)
{
- // If the queue is empty return
- if (this->follower_set_.is_empty())
- return 0;
-
- TAO_Follower_Node* next_follower = this->follower_set_.head_;
-
- TAO_SYNCH_CONDITION *cond = next_follower->follower_;
+ TAO_LF_Follower* follower =
+ this->follower_set_.head ();
#if defined (TAO_DEBUG_LEADER_FOLLOWER)
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) LF::get_next_follower - "
+ "TAO (%P|%t) LF::elect_new_leader_i - "
"follower is %x\n",
- cond));
+ follower));
#endif /* TAO_DEBUG_LEADER_FOLLOWER */
- // We *must* remove it when we signal it so the same condition is
- // not signalled for both wake up as a follower and as the next
- // leader.
- // The follower may not be there if the reply is received while the
- // consumer is not yet waiting for it (i.e. it send the request but
- // has not blocked to receive the reply yet)
- (void) this->remove_follower (next_follower); // Ignore errors..
-
- return cond;
+ return follower->signal ();
}
int
@@ -225,77 +168,284 @@ TAO_Leader_Follower::reset_client_thread (void)
}
}
-TAO_LF_Strategy::TAO_LF_Strategy ()
+int
+TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
+ TAO_Transport *transport,
+ ACE_Time_Value *max_wait_time)
{
-}
+ // Obtain the lock.
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
+
+ // Optmize the first iteration [no access to errno]
+ int result = 1;
+
+ {
+ // Calls this->set_client_thread () on construction and
+ // this->reset_client_thread () on destruction.
+ TAO_LF_Client_Thread_Helper client_thread_helper (*this);
+ ACE_UNUSED_ARG (client_thread_helper);
+
+ ACE_Countdown_Time countdown (max_wait_time);
+
+ // Check if there is a leader. Note that it cannot be us since we
+ // gave up our leadership when we became a client.
+ if (this->leader_available ())
+ {
+ // = Wait as a follower.
+
+ // Grab a follower:
+ TAO_LF_Follower_Auto_Ptr follower (*this);
+ if (follower.get () == 0)
+ return -1;
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower::wait_for_event,"
+ " (follower) on Transport <%d>, cond <%x>\n",
+ transport->id (),
+ follower.get ()));
+
+ // Bound the follower and the LF_Event, this is important to
+ // get a signal when the event terminates
+ TAO_LF_Event_Binder event_binder (event, follower.get ());
+
+ while (event->keep_waiting () &&
+ this->leader_available ())
+ {
+ // Add ourselves to the list, do it everytime we wake up
+ // from the CV loop. Because:
+ //
+ // - The leader thread could have elected us as the new
+ // leader.
+ // - Before we can assume the role another thread becomes
+ // the leader
+ // - But our condition variable could have been removed
+ // already, if we don't add it again we will never wake
+ // up.
+ //
+ // Notice that we can have spurious wake ups, in that case
+ // adding the leader results in an error, that must be
+ // ignored.
+ // You may be thinking of not removing the condition
+ // variable in the code that sends the signal, but
+ // removing it here, that does not work either, in that
+ // case the condition variable may be used twice:
+ //
+ // - Wake up because its reply arrived
+ // - Wake up because it must become the leader
+ //
+ // but only the first one has any effect, so the leader is
+ // lost.
+ //
+
+ TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
+
+ if (max_wait_time == 0)
+ {
+ if (follower->wait (max_wait_time) == -1)
+ {
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower::wait_for_event, "
+ " (follower) on <%d>"
+ " [no timer, cond failed]\n",
+ transport->id ()));
+
+ // @@ Michael: What is our error handling in this case?
+ // We could be elected as leader and
+ // no leader would come in?
+ return -1;
+ }
+ }
+ else
+ {
+ countdown.update ();
+ ACE_Time_Value tv = ACE_OS::gettimeofday ();
+ tv += *max_wait_time;
+ if (follower->wait (&tv) == -1)
+ {
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower::wait,"
+ " (follower) on <%x> "
+ " [has timer, follower failed]\n",
+ transport->id ()));
+
+ if (!event->successful ())
+ {
+ // Remove follower can fail because either
+ // 1) the condition was satisfied (i.e. reply
+ // received or queue drained), or
+ // 2) somebody elected us as leader, or
+ // 3) the connection got closed.
+ //
+ // Therefore:
+ // If remove_follower fails and the condition
+ // was not satisfied, we know that we got
+ // elected as a leader.
+ // But we got a timeout, so we cannot become
+ // the leader, therefore, we have to select a
+ // new leader.
+ //
+
+ if (this->elect_new_leader () == -1
+ && TAO_debug_level > 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - Leader_Follower::wait_for_event,"
+ " elect_new_leader failed\n"));
+ }
+ }
+ return -1;
+ }
+ }
+ }
+
+ countdown.update ();
+
+ // @@ Michael: This is an old comment why we do not want to
+ // remove the follower here.
+ // We should not remove the follower here, we *must* remove it when
+ // we signal it so the same condition is not signalled for
+ // both wake up as a follower and as the next leader.
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower::wait_for_event,"
+ " done (follower) on <%d>, successful %d\n",
+ transport->id (),
+ event->successful ()));
+
+ // Now somebody woke us up to become a leader or to handle our
+ // input. We are already removed from the follower queue.
+
+ if (event->successful ())
+ return 0;
+
+ if (event->error_detected ())
+ return -1;
+
+ // FALLTHROUGH
+ // We only get here if we woke up but the reply is not
+ // complete yet, time to assume the leader role....
+ // i.e. ACE_ASSERT (event->successful () == 0);
+ }
+
+ // = Leader Code.
+
+ // The only way to reach this point is if we must become the
+ // leader, because there is no leader or we have to update to a
+ // leader or we are doing nested upcalls in this case we do
+ // increase the refcount on the leader in TAO_ORB_Core.
+
+ // Calls this->set_client_leader_thread () on
+ // construction and this->reset_client_leader_thread ()
+ // on destruction. Note that this may increase the refcount of
+ // the leader.
+ TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
+ ACE_UNUSED_ARG (client_leader_thread_helper);
-TAO_LF_Strategy::~TAO_LF_Strategy ()
-{
-}
+ {
+ ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
+ this->reverse_lock (), -1);
-TAO_Complete_LF_Strategy::TAO_Complete_LF_Strategy ()
-{
-}
+ // Become owner of the reactor.
+ ACE_Reactor *reactor = this->reactor_;
+ reactor->owner (ACE_Thread::self ());
-TAO_Complete_LF_Strategy::~TAO_Complete_LF_Strategy ()
-{
-}
+ // Run the reactor event loop.
-void
-TAO_Complete_LF_Strategy::set_upcall_thread (TAO_Leader_Follower &leader_follower)
-{
- leader_follower.set_upcall_thread ();
-}
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower::wait_for_event,"
+ " (leader) enter reactor event loop on <%d>\n",
+ transport->id ()));
-int
-TAO_Complete_LF_Strategy::set_event_loop_thread (ACE_Time_Value *max_wait_time,
- TAO_Leader_Follower &leader_follower)
-{
- ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock (), -1);
+ // If we got our event, no need to run the event loop any
+ // further.
+ while (event->keep_waiting ())
+ {
+ // Run the event loop.
+ result = reactor->handle_events (max_wait_time);
- return leader_follower.set_event_loop_thread (max_wait_time);
-}
+ // Did we timeout? If so, stop running the loop.
+ if (result == 0 &&
+ max_wait_time != 0 &&
+ *max_wait_time == ACE_Time_Value::zero)
+ break;
-void
-TAO_Complete_LF_Strategy::reset_event_loop_thread_and_elect_new_leader (int call_reset,
- TAO_Leader_Follower &leader_follower)
-{
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock ());
+ // Other errors? If so, stop running the loop.
+ if (result == -1)
+ break;
- if (call_reset)
- leader_follower.reset_event_loop_thread ();
+ // Otherwise, keep going...
+ }
- int result = leader_follower.elect_new_leader ();
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower::wait_for_event,"
+ " (leader) exit reactor event loop on <%d>\n",
+ transport->id ()));
+ }
+ }
+ //
+ // End artificial scope for auto_ptr like helpers calling:
+ // this->reset_client_thread () and (maybe)
+ // this->reset_client_leader_thread ().
+ //
+
+ // Wake up the next leader, we cannot do that in handle_input,
+ // because the woken up thread would try to get into handle_events,
+ // which is at the time in handle_input still occupied. But do it
+ // before checking the error in <result>, even if there is an error
+ // in our input we should continue running the loop in another
+ // thread.
+
+ if (this->elect_new_leader () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Leader_Follower::wait_for_event,"
+ " failed to elect new leader\n"),
+ -1);
if (result == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) Failed to wake up ")
- ACE_TEXT ("a follower thread\n")));
-}
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Leader_Follower::wait_for_event,"
+ " handle_events failed\n"),
+ -1);
-TAO_Null_LF_Strategy::TAO_Null_LF_Strategy ()
-{
+ // Return an error if there was a problem receiving the reply...
+ if (max_wait_time != 0)
+ {
+ if (!event->successful ()
+ && *max_wait_time == ACE_Time_Value::zero)
+ {
+ result = -1;
+ errno = ETIME;
+ }
+ else if (event->error_detected ())
+ {
+ // If the time did not expire yet, but we get a failure,
+ // e.g. the connections closed, we should still return an error.
+ result = -1;
+ }
+ }
+ else
+ {
+ result = 0;
+ if (event->error_detected ())
+ {
+ result = -1;
+ }
+ }
+ return result;
}
-TAO_Null_LF_Strategy::~TAO_Null_LF_Strategy ()
-{
-}
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-void
-TAO_Null_LF_Strategy::set_upcall_thread (TAO_Leader_Follower &)
-{
-}
+template class ACE_Intrusive_List<TAO_LF_Follower>;
-int
-TAO_Null_LF_Strategy::set_event_loop_thread (ACE_Time_Value *,
- TAO_Leader_Follower &)
-{
- return 0;
-}
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-void
-TAO_Null_LF_Strategy::reset_event_loop_thread_and_elect_new_leader (int,
- TAO_Leader_Follower &)
-{
-}
+#pragma instantiate ACE_Intrusive_List<TAO_LF_Follower>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */