summary | refs | log | tree | commit | diff
path: root/TAO/tao/Leader_Follower.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Leader_Follower.cpp')
-rw-r--r-- TAO/tao/Leader_Follower.cpp 422
1 files changed, 272 insertions, 150 deletions
diff --git a/TAO/tao/Leader_Follower.cpp b/TAO/tao/Leader_Follower.cpp
index 9bf50ba6fa3..427374ee726 100644
--- a/TAO/tao/Leader_Follower.cpp
+++ b/TAO/tao/Leader_Follower.cpp
@@ -3,6 +3,7 @@
#include "ace/Countdown_Time.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/Reactor.h"
+#include "ace/Auto_Ptr.h"
#include "tao/Leader_Follower.h"
#include "tao/LF_Follower_Auto_Ptr.h"
@@ -19,8 +20,25 @@
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+TAO_Leader_Follower::Deferred_Event::Deferred_Event (ACE_Event_Handler* h)
+: eh_ (h)
+{
+ h->add_reference ();
+}
+
+
+ACE_Event_Handler* TAO_Leader_Follower::Deferred_Event::handler () const
+{
+ return this->eh_.handler ();
+}
+
TAO_Leader_Follower::~TAO_Leader_Follower (void)
{
+ while (!this->deferred_event_set_.is_empty ())
+ {
+ Deferred_Event *event = this->deferred_event_set_.pop_front ();
+ delete event;
+ }
while (!this->follower_free_list_.is_empty ())
{
TAO_LF_Follower *follower = this->follower_free_list_.pop_front ();
@@ -185,6 +203,49 @@ TAO_Leader_Follower::reset_client_thread (void)
}
int
+TAO_Leader_Follower::defer_event (ACE_Event_Handler* eh)
+{
+ // Obtain the lock.
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
+
+ if (TAO_debug_level > 7)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Leader_Follower::defer_event, "
+ "deferring event handler[%d]\n",
+ eh->get_handle ()));
+ Deferred_Event* ptr = 0;
+ ACE_NEW_RETURN (ptr,
+ Deferred_Event (eh),
+ -1);
+ this->deferred_event_set_.push_back (ptr);
+ return 0;
+}
+
+void
+TAO_Leader_Follower::resume_events ()
+{
+ // No need to obtain the lock: this is only called while holding the lock.
+ while (!this->deferred_event_set_.is_empty ())
+ {
+ ACE_Auto_Ptr<Deferred_Event> event (this->deferred_event_set_.pop_front ());
+ // Send a notification to the reactor for the deferred event handler
+ // (READ_MASK), so that the reactor dispatches the handler again.
+ ACE_Reactor *reactor = this->orb_core_->reactor ();
+ int const retval = reactor->notify (event->handler (), ACE_Event_Handler::READ_MASK);
+ if (TAO_debug_level > 2)
+ {
+ // @@todo: need to think about what is the action that
+ // we can take when we get here with an error?!
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - TAO_Leader_Follower::resume_events, ")
+ ACE_TEXT ("an event handler[%d] has been resumed, ")
+ ACE_TEXT ("notified the reactor, retval=%d.\n"),
+ event->handler ()->get_handle (), retval));
+ }
+ }
+}
+
+int
TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
TAO_Transport *transport,
ACE_Time_Value *max_wait_time)
@@ -207,20 +268,32 @@ TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
size_t t_id = 0;
if (TAO_debug_level && transport != 0)
- {
- t_id = transport->id ();
- }
-
{
+ t_id = transport->id ();
+ }
+
+ { // Scope #1: All threads inside here are client threads
// Calls this->set_client_thread () on construction and
// this->reset_client_thread () on destruction.
TAO_LF_Client_Thread_Helper client_thread_helper (*this);
ACE_UNUSED_ARG (client_thread_helper);
- // Check if there is a leader. Note that it cannot be us since we
- // gave up our leadership when we became a client.
- if (this->leader_available ())
- {
+ // The loop here is for when we get elected (client) leader and
+ // then later relinquish the leader position and our event has
+ // still not completed (and we haven't run out of time).
+ // All the conditions below are basically the various ways the
+ // leader loop below can end, other than the event being complete
+ while (event->keep_waiting ()
+ && !(result == 0 &&
+ max_wait_time != 0 &&
+ *max_wait_time == ACE_Time_Value::zero)
+ && result != -1)
+ { // Scope #2: threads here alternate between being leader/followers
+
+ // Check if there is a leader. Note that it cannot be us since we
+ // gave up our leadership when we became a client.
+ if (this->leader_available ())
+ { // Scope #3: threads here are followers
// = Wait as a follower.
// Grab a follower:
@@ -240,101 +313,101 @@ TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
while (event->keep_waiting () &&
this->leader_available ())
+ { // Scope #4: this loop handles spurious wake-ups
+ // Add ourselves to the list, do it every time we wake up
+ // from the CV loop. Because:
+ //
+ // - The leader thread could have elected us as the new
+ // leader.
+ // - Before we can assume the role another thread becomes
+ // the leader
+ // - But our condition variable could have been removed
+ // already, if we don't add it again we will never wake
+ // up.
+ //
+ // Notice that we can have spurious wake ups, in that case
+ // adding the leader results in an error, that must be
+ // ignored.
+ // You may be thinking of not removing the condition
+ // variable in the code that sends the signal, but
+ // removing it here, that does not work either, in that
+ // case the condition variable may be used twice:
+ //
+ // - Wake up because its reply arrived
+ // - Wake up because it must become the leader
+ //
+ // but only the first one has any effect, so the leader is
+ // lost.
+ TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
+
+ if (max_wait_time == 0)
{
- // Add ourselves to the list, do it everytime we wake up
- // from the CV loop. Because:
- //
- // - The leader thread could have elected us as the new
- // leader.
- // - Before we can assume the role another thread becomes
- // the leader
- // - But our condition variable could have been removed
- // already, if we don't add it again we will never wake
- // up.
- //
- // Notice that we can have spurious wake ups, in that case
- // adding the leader results in an error, that must be
- // ignored.
- // You may be thinking of not removing the condition
- // variable in the code that sends the signal, but
- // removing it here, that does not work either, in that
- // case the condition variable may be used twice:
- //
- // - Wake up because its reply arrived
- // - Wake up because it must become the leader
- //
- // but only the first one has any effect, so the leader is
- // lost.
- TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
-
- if (max_wait_time == 0)
+ if (follower->wait (max_wait_time) == -1)
{
- if (follower->wait (max_wait_time) == -1)
- {
- if (TAO_debug_level >= 5)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
- " (follower) [no timer, cond failed]\n",
- t_id));
-
- // @@ Michael: What is our error handling in this case?
- // We could be elected as leader and
- // no leader would come in?
- return -1;
- }
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
+ " (follower) [no timer, cond failed]\n",
+ t_id));
+
+ // @@ Michael: What is our error handling in this case?
+ // We could be elected as leader and
+ // no leader would come in?
+ return -1;
}
- else
+ }
+ else
+ {
+ countdown.update ();
+ ACE_Time_Value tv = ACE_OS::gettimeofday ();
+ tv += *max_wait_time;
+ if (follower->wait (&tv) == -1)
{
- countdown.update ();
- ACE_Time_Value tv = ACE_OS::gettimeofday ();
- tv += *max_wait_time;
- if (follower->wait (&tv) == -1)
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower[%d]::wait, "
+ "(follower) [has timer, follower failed]\n",
+ t_id ));
+
+ // If we have timed out, set the state in the
+ // LF_Event. We call the non-locking,
+ // no-signalling method on LF_Event.
+ if (errno == ETIME)
+ // We have timed out
+ event->set_state (TAO_LF_Event::LFS_TIMEOUT);
+
+ if (!event->successful ())
+ {
+ // Remove follower can fail because either
+ // 1) the condition was satisfied (i.e. reply
+ // received or queue drained), or
+ // 2) somebody elected us as leader, or
+ // 3) the connection got closed.
+ //
+ // Therefore:
+ // If remove_follower fails and the condition
+ // was not satisfied, we know that we got
+ // elected as a leader.
+ // But we got a timeout, so we cannot become
+ // the leader, therefore, we have to select a
+ // new leader.
+ //
+
+ if (this->elect_new_leader () == -1
+ && TAO_debug_level > 0)
{
- if (TAO_debug_level >= 5)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Leader_Follower[%d]::wait, "
- "(follower) [has timer, follower failed]\n",
- t_id ));
-
- // If we have timedout set the state in the
- // LF_Event. We call the non-locking,
- // no-signalling method on LF_Event.
- if (errno == ETIME)
- // We have timedout
- event->set_state (TAO_LF_Event::LFS_TIMEOUT);
-
- if (!event->successful ())
- {
- // Remove follower can fail because either
- // 1) the condition was satisfied (i.e. reply
- // received or queue drained), or
- // 2) somebody elected us as leader, or
- // 3) the connection got closed.
- //
- // Therefore:
- // If remove_follower fails and the condition
- // was not satisfied, we know that we got
- // elected as a leader.
- // But we got a timeout, so we cannot become
- // the leader, therefore, we have to select a
- // new leader.
- //
-
- if (this->elect_new_leader () == -1
- && TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
- "elect_new_leader failed\n",
- t_id ));
- }
- }
-
-
- return -1;
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
+ "elect_new_leader failed\n",
+ t_id ));
}
- }
+ }
+
+
+ return -1;
+ }
}
+ } // End Scope #4: loop to handle spurious wakeups
countdown.update ();
@@ -364,69 +437,118 @@ TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
// We only get here if we woke up but the reply is not
// complete yet, time to assume the leader role....
// i.e. ACE_ASSERT (event->successful () == 0);
- }
+ } // End Scope #3: we are no longer a follower
+
+ // = Leader Code.
+
+ // The only way to reach this point is if we must become the
+ // leader, because there is no leader, or we have to update to a
+ // leader, or we are doing nested upcalls; in this case we do
+ // increase the refcount on the leader in TAO_ORB_Core.
+
+ // Calls this->set_client_leader_thread () on
+ // construction and this->reset_client_leader_thread ()
+ // on destruction. Note that this may increase the refcount of
+ // the leader.
+ { // Scope #5: We are now the client-leader
+ TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
+ ACE_UNUSED_ARG (client_leader_thread_helper);
+
+ { // Scope #6: release the lock via a reverse lock
+ ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
+ this->reverse_lock (), -1);
+
+ // Become owner of the reactor.
+ ACE_Reactor *reactor = this->reactor_;
+ reactor->owner (ACE_Thread::self ());
+
+ // Run the reactor event loop.
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
+ " (leader) enter reactor event loop\n",
+ t_id));
+
+ // If we got our event, no need to run the event loop any
+ // further.
+ while (event->keep_waiting ())
+ {
+ // Run the event loop.
+ result = reactor->handle_events (max_wait_time);
+
+ // Did we timeout? If so, stop running the loop.
+ if (result == 0 &&
+ max_wait_time != 0 &&
+ *max_wait_time == ACE_Time_Value::zero)
+ break;
+
+ // Other errors? If so, stop running the loop.
+ if (result == -1)
+ break;
+
+ // Has an event loop thread become available to take over?
+ // Yes, we are checking this without the lock, however, if
+ // we get a false reading we'll just circle around and
+ // become leader again...
+ if (this->event_loop_threads_waiting_)
+ break;
+ // Did we give up leadership?
+ if (!this->is_client_leader_thread ())
+ break;
+ // Otherwise, keep going...
+ }
- // = Leader Code.
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
+ " (leader) exit reactor event loop\n",
+ t_id));
+ } // End Scope #6: we should now hold the lock again
- // The only way to reach this point is if we must become the
- // leader, because there is no leader or we have to update to a
- // leader or we are doing nested upcalls in this case we do
- // increase the refcount on the leader in TAO_ORB_Core.
+ // End artificial scope for auto_ptr like helpers calling:
+ // this->reset_client_leader_thread ().
- // Calls this->set_client_leader_thread () on
- // construction and this->reset_client_leader_thread ()
- // on destruction. Note that this may increase the refcount of
- // the leader.
- TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
- ACE_UNUSED_ARG (client_leader_thread_helper);
+ } // End Scope #5: we are no longer a client-leader
+ // We only get here if we were the client leader and either our
+ // event completed or an event loop thread has become available to
+ // become leader.
- {
- ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
- this->reverse_lock (), -1);
-
- // Become owner of the reactor.
- ACE_Reactor *reactor = this->reactor_;
- reactor->owner (ACE_Thread::self ());
-
- // Run the reactor event loop.
- if (TAO_debug_level >= 5)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
- " (leader) enter reactor event loop\n",
- t_id));
-
- // If we got our event, no need to run the event loop any
- // further.
- while (event->keep_waiting ())
- {
- // Run the event loop.
- result = reactor->handle_events (max_wait_time);
+ // resume any deferred events before we switch to a new leader thread
+ this->resume_events ();
- // Did we timeout? If so, stop running the loop.
- if (result == 0 &&
- max_wait_time != 0 &&
- *max_wait_time == ACE_Time_Value::zero)
- break;
+ // Wake and yield to any event loop threads that may be waiting to
+ // take leadership - otherwise we will just loop around and take
+ // leadership again (because we hold the lock).
+ if (this->event_loop_threads_waiting_ && !this->leader_available ())
+ {
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
+ " (client) waking and yielding to allow event thread leadership\n",
+ t_id));
+
+ // Wake up the next leader (in case not yet done)
+ if (this->elect_new_leader () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
+ " failed to elect new leader\n",
+ t_id),
+ -1);
+
+ // Yield, providing the event thread some time to grab leadership
+ ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
+ this->reverse_lock (), -1);
+ ACE_OS::thr_yield ();
+ }
- // Other errors? If so, stop running the loop.
- if (result == -1)
- break;
+ } // End Scope #2: we loop here if our event is incomplete
- // Otherwise, keep going...
- }
- if (TAO_debug_level >= 5)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
- " (leader) exit reactor event loop\n",
- t_id));
- }
- }
- //
// End artificial scope for auto_ptr like helpers calling:
- // this->reset_client_thread () and (maybe)
- // this->reset_client_leader_thread ().
- //
+ // this->reset_client_thread ()
+
+ // We should only get here when our event is complete or timed-out
+ } // End Scope #1
// Wake up the next leader, we cannot do that in handle_input,
// because the woken up thread would try to get into handle_events,