Diffstat (limited to 'TAO/tao/Wait_On_Leader_Follower.cpp')
-rw-r--r--  TAO/tao/Wait_On_Leader_Follower.cpp  340
1 files changed, 334 insertions, 6 deletions
diff --git a/TAO/tao/Wait_On_Leader_Follower.cpp b/TAO/tao/Wait_On_Leader_Follower.cpp
index 422bc57b541..d614abbafbc 100644
--- a/TAO/tao/Wait_On_Leader_Follower.cpp
+++ b/TAO/tao/Wait_On_Leader_Follower.cpp
@@ -3,9 +3,8 @@
#include "tao/Wait_On_Leader_Follower.h"
#include "tao/ORB_Core.h"
#include "tao/Leader_Follower.h"
-#include "tao/Transport.h"
-#include "tao/Synch_Reply_Dispatcher.h"
#include "tao/debug.h"
+#include "Transport.h"
ACE_RCSID(tao, Wait_On_Leader_Follower, "$Id$")
@@ -48,11 +47,340 @@ TAO_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core,
int
TAO_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time,
- TAO_Synch_Reply_Dispatcher &rd)
+ int &reply_received)
{
+ // Cache the ORB core; it won't change and is used multiple times
+ // below:
+ TAO_ORB_Core* orb_core =
+ this->transport_->orb_core ();
+
+ TAO_Leader_Follower& leader_follower =
+ orb_core->leader_follower ();
+
+ // Obtain the lock.
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon,
+ leader_follower.lock (), -1);
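+ // Note: this is the same lock that reply_dispatched () and
+ // connection_closed () acquire before updating <reply_received>,
+ // so the flag is always updated under one mutex.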
+
+ // Optimize the first iteration [no access to errno]
+ int result = 1;
+
+ //
+ // Begin artificial scope for auto_ptr-like helpers calling:
+ // leader_follower.set_client_thread () and (maybe later on)
+ // leader_follower.set_client_leader_thread ().
+ //
+ {
+ // Calls leader_follower.set_client_thread () on construction and
+ // leader_follower.reset_client_thread () on destruction.
+ TAO_LF_Client_Thread_Helper client_thread_helper (leader_follower);
+ ACE_UNUSED_ARG (client_thread_helper);
+
+ ACE_Countdown_Time countdown (max_wait_time);
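+ // ACE_Countdown_Time decrements *max_wait_time by the elapsed
+ // time on each update (), so the remaining timeout budget is
+ // carried across successive waits.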
+
+ // Check if there is a leader. Note that it cannot be us since we
+ // gave up our leadership when we became a client.
+ if (leader_follower.leader_available ())
+ {
+ // = Wait as a follower.
+
+ // Grab the condition variable.
+ TAO_SYNCH_CONDITION* cond =
+ orb_core->leader_follower_condition_variable ();
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - wait (follower) on Transport <%x>, cond <%x>\n"),
+ this->transport_,
+ cond));
+
+ // Keep the entry on the stack.
+ TAO_Leader_Follower::TAO_Follower_Node node (cond);
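+ // The node may live on the stack because it remains valid for as
+ // long as this thread waits; reply_dispatched () and
+ // connection_closed () construct an equivalent node just to
+ // remove this entry from the follower set.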
+
+ while (!reply_received &&
+ leader_follower.leader_available ())
+ {
+ // Add ourselves to the list; do it every time we wake up
+ // from the CV loop, because:
+ //
+ // - The leader thread could have elected us as the new
+ // leader.
+ // - Before we can assume that role, another thread may
+ // become the leader.
+ // - But our condition variable could have been removed
+ // already; if we do not add it again we will never wake
+ // up.
+ //
+ // Notice that spurious wake ups can happen; in that case
+ // re-adding the follower results in an error that must be
+ // ignored.
+ // You may be tempted to leave the condition variable in
+ // the set in the code that sends the signal and remove it
+ // here instead, but that does not work either: the same
+ // condition variable may then be used both to
+ // - wake up because its reply arrived, and
+ // - wake up because it must become the leader,
+ // but only the first signal has any effect, so the leader
+ // is lost.
+ //
+
+ (void) leader_follower.add_follower (&node);
+
+ if (max_wait_time == 0)
+ {
+ if (cond == 0 || cond->wait () == -1)
+ {
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - wait (follower) on <%x> ")
+ ACE_TEXT ("cond == 0 || cond->wait () == -1 : cond = %d\n"),
+ this->transport_, (cond == 0) ? 0 : cond));
+
+ // @@ Michael: What is our error handling in this case?
+ // We could have been elected leader, and then no new leader would come in?
+ return -1;
+ }
+ }
+ else
+ {
+ countdown.update ();
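+ // The timed wait below takes an absolute deadline, so convert
+ // the remaining relative timeout into "now + remainder".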
+ ACE_Time_Value tv = ACE_OS::gettimeofday ();
+ tv += *max_wait_time;
+ if (cond == 0 || cond->wait (&tv) == -1)
+ {
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - wait (follower) on <%x> ")
+ ACE_TEXT ("cond == 0 || cond->wait (tv) == -1\n"),
+ this->transport_));
+
+ if (leader_follower.remove_follower (&node) == -1
+ && reply_received == 0)
+ {
+ // Remove follower can fail because either
+ // 1) the reply arrived, or
+ // 2) somebody elected us as leader, or
+ // 3) the connection got closed.
+ //
+ // reply_received is 1, if the reply arrived.
+ // reply_received is 0, if the reply did not arrive yet.
+ // reply_received is -1, if the connection got closed
+ //
+ // Therefore:
+ // If remove_follower fails and reply_received is 0, we know that
+ // we got elected as a leader. As we cannot be the leader (remember
+ // we got a timeout), we have to select a new leader.
+ //
+ // ACE_DEBUG ((LM_DEBUG,
+ // "TAO (%P|%t) TAO_Wait_On_Leader_Follower::wait - "
+ // "We got elected as leader, but have timeout\n"));
+
+ if (leader_follower.elect_new_leader () == -1)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) TAO_Wait_On_Leader_Follower::wait - "
+ "elect_new_leader failed\n"));
+
+ }
+ return -1;
+ }
+ }
+ }
+
+ countdown.update ();
+
+ // @@ Michael: This is an old comment explaining why we do
+ // not want to remove the follower here.
+ // We should not remove the follower here; we *must* remove
+ // it when we signal it, so that the same condition is not
+ // signalled both to wake up as a follower and to become the
+ // next leader.
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - done (follower) "
+ "on <%x>, reply_received %d\n"),
+ this->transport_, reply_received));
+
+ // Now somebody woke us up to become a leader or to handle our
+ // input. We are already removed from the follower queue.
+
+ if (reply_received == 1)
+ return 0;
+
+ // FALLTHROUGH
+ // We only get here if we woke up but the reply is not
+ // complete yet, time to assume the leader role....
+ // i.e. ACE_ASSERT (reply_received == 0);
+ }
+
+ // = Leader Code.
+
+ // The only way to reach this point is if we must become the
+ // leader: there is no leader, we have to take over from the
+ // current leader, or we are doing nested upcalls. In that case
+ // we increase the refcount on the leader in TAO_ORB_Core.
+
+ // Calls leader_follower.set_client_leader_thread () on
+ // construction and leader_follower.reset_client_leader_thread ()
+ // on destruction. Note that this may increase the refcount of
+ // the leader.
+ TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (leader_follower);
+ ACE_UNUSED_ARG (client_leader_thread_helper);
+
+ {
+ ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
+ leader_follower.reverse_lock (), -1);
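+ // ACE_Reverse_Lock inverts acquire/release: this guard *releases*
+ // leader_follower.lock () for the duration of the scope and
+ // re-acquires it on exit, so other threads can update the
+ // leader/follower state while this thread runs the reactor.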
+
+ // Become owner of the reactor.
+ ACE_Reactor *reactor = orb_core->reactor ();
+ reactor->owner (ACE_Thread::self ());
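+ // With the select-based reactors only the owner thread may drive
+ // handle_events (), hence the explicit ownership transfer.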
+
+ // Run the reactor event loop.
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - wait (leader):to enter reactor event loop on <%x>\n"),
+ this->transport_));
+
+ // If we got our reply, no need to run the event loop any
+ // further.
+ while (!reply_received)
+ {
+ // Run the event loop.
+ result = reactor->handle_events (max_wait_time);
+
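+ // Note: handle_events () decrements *max_wait_time by the time
+ // actually spent waiting, which is what makes the timeout check
+ // below work.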
+ // Did we timeout? If so, stop running the loop.
+ if (result == 0 &&
+ max_wait_time != 0 &&
+ *max_wait_time == ACE_Time_Value::zero)
+ break;
+
+ // Other errors? If so, stop running the loop.
+ if (result == -1)
+ break;
+
+ // Otherwise, keep going...
+ }
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - wait : (leader) : done with reactor event loop on <%x>\n"),
+ this->transport_));
+ }
+ }
+ //
+ // End artificial scope for auto_ptr-like helpers calling:
+ // leader_follower.reset_client_thread () and (maybe)
+ // leader_follower.reset_client_leader_thread ().
+ //
+
+ // Wake up the next leader. We cannot do that in handle_input,
+ // because the woken up thread would try to get into
+ // handle_events, which is still occupied by handle_input at that
+ // point. But do it before checking the error in <result>: even
+ // if there was an error in our input, the event loop should keep
+ // running in another thread.
+
+ if (leader_follower.elect_new_leader () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t):TAO_Wait_On_Leader_Follower::send_request: ")
+ ACE_TEXT ("Failed to unset the leader and wake up a new follower.\n")),
+ -1);
+
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t):TAO_Wait_On_Leader_Follower::wait: ")
+ ACE_TEXT ("handle_events failed.\n")),
+ -1);
+
+ // Return an error if there was a problem receiving the reply...
+ if (max_wait_time != 0)
+ {
+ if (reply_received != 1
+ && *max_wait_time == ACE_Time_Value::zero)
+ {
+ result = -1;
+ errno = ETIME;
+ }
+ else if (reply_received == -1)
+ {
+ // If the time did not expire yet, but we got a failure,
+ // e.g. the connection was closed, we should still return
+ // an error.
+ result = -1;
+ }
+ }
+ else
+ {
+ result = 0;
+ if (reply_received == -1)
+ {
+ result = -1;
+ }
+ }
+
+ return result;
+}
+
+TAO_SYNCH_CONDITION *
+TAO_Wait_On_Leader_Follower::leader_follower_condition_variable (void)
+{
+ return this->transport_->orb_core ()->leader_follower_condition_variable ();
+}
+
+int
+TAO_Wait_On_Leader_Follower::reply_dispatched (int &reply_received_flag,
+ TAO_SYNCH_CONDITION *condition)
+{
+ if (condition == 0)
+ return 0;
+
+ TAO_Leader_Follower& leader_follower =
+ this->transport_->orb_core ()->leader_follower ();
+
+ // Obtain the lock.
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon,
+ leader_follower.lock (),
+ -1);
+
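+ // Set the flag while holding the leader/follower lock, the same
+ // lock the follower loop in wait () holds while testing it.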
+ reply_received_flag = 1;
+
+ // The following works because the node is assumed to live on the
+ // stack for as long as the thread is alive.
+ TAO_Leader_Follower::TAO_Follower_Node node (condition);
+
+ // We *must* remove it when we signal it so the same condition
+ // is not signalled for both wake up as a follower and as the
+ // next leader.
+ // The follower may not be there if the reply is received while
+ // the consumer is not yet waiting for it (i.e. it sent the
+ // request but has not blocked to receive the reply yet).
+ // Ignore errors.
+ (void) leader_follower.remove_follower (&node);
+
+ if (condition->signal () == -1)
+ return -1;
+
+ return 0;
+}
+
+void
+TAO_Wait_On_Leader_Follower::connection_closed (int &reply_received_flag,
+ TAO_SYNCH_CONDITION *condition)
+{
+ if (condition == 0)
+ return;
+
TAO_Leader_Follower& leader_follower =
this->transport_->orb_core ()->leader_follower ();
- return leader_follower.wait_for_event (&rd,
- this->transport_,
- max_wait_time);
+
+ // Obtain the lock.
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock ());
+
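+ // A value of -1 tells the waiting thread that the connection was
+ // closed; wait () maps it into an error return.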
+ reply_received_flag = -1;
+
+ // The following works because the node is assumed to live on the
+ // stack for as long as the thread is alive.
+ TAO_Leader_Follower::TAO_Follower_Node node (condition);
+
+ (void) leader_follower.remove_follower (&node);
+
+ (void) condition->signal ();
}