summaryrefslogtreecommitdiff
path: root/TAO/tao/Leader_Follower.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Leader_Follower.cpp')
-rw-r--r--TAO/tao/Leader_Follower.cpp434
1 files changed, 142 insertions, 292 deletions
diff --git a/TAO/tao/Leader_Follower.cpp b/TAO/tao/Leader_Follower.cpp
index ab245232d58..342271f87a5 100644
--- a/TAO/tao/Leader_Follower.cpp
+++ b/TAO/tao/Leader_Follower.cpp
@@ -1,65 +1,122 @@
// $Id$
-#include "tao/Leader_Follower.h"
-#include "tao/Resource_Factory.h"
-#include "tao/LF_Follower.h"
-#include "tao/LF_Follower_Auto_Ptr.h"
-#include "tao/LF_Follower_Auto_Adder.h"
-#include "tao/LF_Event.h"
-#include "tao/LF_Event_Binder.h"
-#include "tao/debug.h"
-#include "tao/Transport.h"
+#include "Leader_Follower.h"
+#include "Resource_Factory.h"
#include "ace/Reactor.h"
#if !defined (__ACE_INLINE__)
-# include "tao/Leader_Follower.i"
+# include "Leader_Follower.i"
#endif /* ! __ACE_INLINE__ */
ACE_RCSID(tao, Leader_Follower, "$Id$")
TAO_Leader_Follower::~TAO_Leader_Follower (void)
{
- while (!this->follower_free_list_.empty ())
- {
- TAO_LF_Follower *follower =
- this->follower_free_list_.pop_front ();
- delete follower;
- }
// Hand the reactor back to the resource factory.
this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_);
this->reactor_ = 0;
}
-TAO_LF_Follower *
-TAO_Leader_Follower::allocate_follower (void)
+TAO_Leader_Follower::TAO_Follower_Node::TAO_Follower_Node (TAO_SYNCH_CONDITION* follower_ptr)
+ : follower_ (follower_ptr),
+ next_ (0)
{
- if (!this->follower_free_list_.empty ())
- return this->follower_free_list_.pop_front ();
- return new TAO_LF_Follower (*this);
}
-void
-TAO_Leader_Follower::release_follower (TAO_LF_Follower *follower)
+
+TAO_Leader_Follower::TAO_Follower_Queue::TAO_Follower_Queue (void)
+ : head_ (0),
+ tail_ (0)
+{
+
+}
+
+int
+TAO_Leader_Follower::TAO_Follower_Queue::insert (TAO_Follower_Node* node)
{
- this->follower_free_list_.push_front (follower);
+ if (this->head_ == 0) {
+ this->head_ = node;
+ this->tail_ = node;
+ // Make sure that we don't have garbage in the case when the same node
+ // is added a second time. This is necessary as the nodes are on the
+ // stack.
+ node->next_ = 0;
+ }
+ else
+ {
+ // Add the node to the tail and modify the pointers
+ TAO_Follower_Node* temp = this->tail_;
+ temp->next_ = node;
+ this->tail_ = node;
+ node->next_ = 0;
+ }
+ return 0;
}
int
-TAO_Leader_Follower::elect_new_leader_i (void)
+TAO_Leader_Follower::TAO_Follower_Queue::remove (TAO_Follower_Node* node)
+{
+ TAO_Follower_Node* prev = 0;
+ TAO_Follower_Node* curr = 0;
+
+ // No followers in queue, return
+ if (this->head_ == 0)
+ return -1;
+
+ // Check is for whether we have the same condition variable on the
+ // queue rather than the same node structure which wraps it.
+ for (curr = this->head_;
+ curr != 0 && curr->follower_ != node->follower_;
+ curr = curr->next_)
+ {
+ prev = curr;
+ }
+
+ // Entry not found in the queue
+ if (curr == 0)
+ return -1;
+ // Entry found at the head of the queue
+ else if (prev == 0)
+ this->head_ = this->head_->next_;
+ else
+ prev->next_ = curr->next_;
+ // Entry at the tail
+ if (curr->next_ == 0)
+ this->tail_ = prev;
+
+ return 0;
+}
+
+
+TAO_SYNCH_CONDITION*
+TAO_Leader_Follower::get_next_follower (void)
{
- TAO_LF_Follower* follower =
- this->follower_set_.head ();
+ // If the queue is empty return
+ if (this->follower_set_.is_empty())
+ return 0;
+
+ TAO_Follower_Node* next_follower = this->follower_set_.head_;
+
+ TAO_SYNCH_CONDITION *cond = next_follower->follower_;
#if defined (TAO_DEBUG_LEADER_FOLLOWER)
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) LF::elect_new_leader_i - "
+ "TAO (%P|%t) LF::get_next_follower - "
"follower is %x\n",
- follower));
+ cond));
#endif /* TAO_DEBUG_LEADER_FOLLOWER */
- return follower->signal ();
+ // We *must* remove it when we signal it so the same condition is
+ // not signalled for both wake up as a follower and as the next
+ // leader.
+ // The follower may not be there if the reply is received while the
+ // consumer is not yet waiting for it (i.e. it send the request but
+ // has not blocked to receive the reply yet)
+ (void) this->remove_follower (next_follower); // Ignore errors..
+
+ return cond;
}
int
@@ -168,284 +225,77 @@ TAO_Leader_Follower::reset_client_thread (void)
}
}
-int
-TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
- TAO_Transport *transport,
- ACE_Time_Value *max_wait_time)
+TAO_LF_Strategy::TAO_LF_Strategy ()
{
- // Obtain the lock.
- ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
-
- // Optmize the first iteration [no access to errno]
- int result = 1;
-
- {
- // Calls this->set_client_thread () on construction and
- // this->reset_client_thread () on destruction.
- TAO_LF_Client_Thread_Helper client_thread_helper (*this);
- ACE_UNUSED_ARG (client_thread_helper);
-
- ACE_Countdown_Time countdown (max_wait_time);
-
- // Check if there is a leader. Note that it cannot be us since we
- // gave up our leadership when we became a client.
- if (this->leader_available ())
- {
- // = Wait as a follower.
-
- // Grab a follower:
- TAO_LF_Follower_Auto_Ptr follower (*this);
- if (follower.get () == 0)
- return -1;
-
- if (TAO_debug_level >= 5)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Leader_Follower::wait_for_event,"
- " (follower) on Transport <%d>, cond <%x>\n",
- transport->id (),
- follower.get ()));
-
- // Bound the follower and the LF_Event, this is important to
- // get a signal when the event terminates
- TAO_LF_Event_Binder event_binder (event, follower.get ());
-
- while (event->keep_waiting () &&
- this->leader_available ())
- {
- // Add ourselves to the list, do it everytime we wake up
- // from the CV loop. Because:
- //
- // - The leader thread could have elected us as the new
- // leader.
- // - Before we can assume the role another thread becomes
- // the leader
- // - But our condition variable could have been removed
- // already, if we don't add it again we will never wake
- // up.
- //
- // Notice that we can have spurious wake ups, in that case
- // adding the leader results in an error, that must be
- // ignored.
- // You may be thinking of not removing the condition
- // variable in the code that sends the signal, but
- // removing it here, that does not work either, in that
- // case the condition variable may be used twice:
- //
- // - Wake up because its reply arrived
- // - Wake up because it must become the leader
- //
- // but only the first one has any effect, so the leader is
- // lost.
- //
-
- TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
-
- if (max_wait_time == 0)
- {
- if (follower->wait (max_wait_time) == -1)
- {
- if (TAO_debug_level >= 5)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Leader_Follower::wait_for_event, "
- " (follower) on <%d>"
- " [no timer, cond failed]\n",
- transport->id ()));
-
- // @@ Michael: What is our error handling in this case?
- // We could be elected as leader and
- // no leader would come in?
- return -1;
- }
- }
- else
- {
- countdown.update ();
- ACE_Time_Value tv = ACE_OS::gettimeofday ();
- tv += *max_wait_time;
- if (follower->wait (&tv) == -1)
- {
- if (TAO_debug_level >= 5)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Leader_Follower::wait,"
- " (follower) on <%x> "
- " [has timer, follower failed]\n",
- transport->id ()));
-
- if (!event->successful ())
- {
- // Remove follower can fail because either
- // 1) the condition was satisfied (i.e. reply
- // received or queue drained), or
- // 2) somebody elected us as leader, or
- // 3) the connection got closed.
- //
- // Therefore:
- // If remove_follower fails and the condition
- // was not satisfied, we know that we got
- // elected as a leader.
- // But we got a timeout, so we cannot become
- // the leader, therefore, we have to select a
- // new leader.
- //
-
- if (this->elect_new_leader () == -1
- && TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Leader_Follower::wait_for_event,"
- " elect_new_leader failed\n"));
- }
- }
- return -1;
- }
- }
- }
-
- countdown.update ();
-
- // @@ Michael: This is an old comment why we do not want to
- // remove the follower here.
- // We should not remove the follower here, we *must* remove it when
- // we signal it so the same condition is not signalled for
- // both wake up as a follower and as the next leader.
-
- if (TAO_debug_level >= 5)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Leader_Follower::wait_for_event,"
- " done (follower) on <%d>, successful %d\n",
- transport->id (),
- event->successful ()));
-
- // Now somebody woke us up to become a leader or to handle our
- // input. We are already removed from the follower queue.
-
- if (event->successful ())
- return 0;
-
- if (event->error_detected ())
- return -1;
-
- // FALLTHROUGH
- // We only get here if we woke up but the reply is not
- // complete yet, time to assume the leader role....
- // i.e. ACE_ASSERT (event->successful () == 0);
- }
-
- // = Leader Code.
-
- // The only way to reach this point is if we must become the
- // leader, because there is no leader or we have to update to a
- // leader or we are doing nested upcalls in this case we do
- // increase the refcount on the leader in TAO_ORB_Core.
-
- // Calls this->set_client_leader_thread () on
- // construction and this->reset_client_leader_thread ()
- // on destruction. Note that this may increase the refcount of
- // the leader.
- TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
- ACE_UNUSED_ARG (client_leader_thread_helper);
+}
- {
- ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
- this->reverse_lock (), -1);
+TAO_LF_Strategy::~TAO_LF_Strategy ()
+{
+}
- // Become owner of the reactor.
- ACE_Reactor *reactor = this->reactor_;
- reactor->owner (ACE_Thread::self ());
+TAO_Complete_LF_Strategy::TAO_Complete_LF_Strategy ()
+{
+}
- // Run the reactor event loop.
+TAO_Complete_LF_Strategy::~TAO_Complete_LF_Strategy ()
+{
+}
- if (TAO_debug_level >= 5)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Leader_Follower::wait_for_event,"
- " (leader) enter reactor event loop on <%d>\n",
- transport->id ()));
+void
+TAO_Complete_LF_Strategy::set_upcall_thread (TAO_Leader_Follower &leader_follower)
+{
+ leader_follower.set_upcall_thread ();
+}
- // If we got our event, no need to run the event loop any
- // further.
- while (event->keep_waiting ())
- {
- // Run the event loop.
- result = reactor->handle_events (max_wait_time);
+int
+TAO_Complete_LF_Strategy::set_event_loop_thread (ACE_Time_Value *max_wait_time,
+ TAO_Leader_Follower &leader_follower)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock (), -1);
- // Did we timeout? If so, stop running the loop.
- if (result == 0 &&
- max_wait_time != 0 &&
- *max_wait_time == ACE_Time_Value::zero)
- break;
+ return leader_follower.set_event_loop_thread (max_wait_time);
+}
- // Other errors? If so, stop running the loop.
- if (result == -1)
- break;
+void
+TAO_Complete_LF_Strategy::reset_event_loop_thread_and_elect_new_leader (int call_reset,
+ TAO_Leader_Follower &leader_follower)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock ());
- // Otherwise, keep going...
- }
+ if (call_reset)
+ leader_follower.reset_event_loop_thread ();
- if (TAO_debug_level >= 5)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Leader_Follower::wait_for_event,"
- " (leader) exit reactor event loop on <%d>\n",
- transport->id ()));
- }
- }
- //
- // End artificial scope for auto_ptr like helpers calling:
- // this->reset_client_thread () and (maybe)
- // this->reset_client_leader_thread ().
- //
-
- // Wake up the next leader, we cannot do that in handle_input,
- // because the woken up thread would try to get into handle_events,
- // which is at the time in handle_input still occupied. But do it
- // before checking the error in <result>, even if there is an error
- // in our input we should continue running the loop in another
- // thread.
-
- if (this->elect_new_leader () == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "TAO (%P|%t) - Leader_Follower::wait_for_event,"
- " failed to elect new leader\n"),
- -1);
+ int result = leader_follower.elect_new_leader ();
if (result == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "TAO (%P|%t) - Leader_Follower::wait_for_event,"
- " handle_events failed\n"),
- -1);
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) Failed to wake up ")
+ ACE_TEXT ("a follower thread\n")));
+}
- // Return an error if there was a problem receiving the reply...
- if (max_wait_time != 0)
- {
- if (!event->successful ()
- && *max_wait_time == ACE_Time_Value::zero)
- {
- result = -1;
- errno = ETIME;
- }
- else if (event->error_detected ())
- {
- // If the time did not expire yet, but we get a failure,
- // e.g. the connections closed, we should still return an error.
- result = -1;
- }
- }
- else
- {
- result = 0;
- if (event->error_detected ())
- {
- result = -1;
- }
- }
- return result;
+TAO_Null_LF_Strategy::TAO_Null_LF_Strategy ()
+{
}
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+TAO_Null_LF_Strategy::~TAO_Null_LF_Strategy ()
+{
+}
-template class ACE_Intrusive_List<TAO_LF_Follower>;
+void
+TAO_Null_LF_Strategy::set_upcall_thread (TAO_Leader_Follower &)
+{
+}
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+int
+TAO_Null_LF_Strategy::set_event_loop_thread (ACE_Time_Value *,
+ TAO_Leader_Follower &)
+{
+ return 0;
+}
-#pragma instantiate ACE_Intrusive_List<TAO_LF_Follower>
+void
+TAO_Null_LF_Strategy::reset_event_loop_thread_and_elect_new_leader (int,
+ TAO_Leader_Follower &)
+{
+}
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */