author     coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>  1999-08-04 17:03:52 +0000
committer  coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>  1999-08-04 17:03:52 +0000
commit     f9f0e9cbef284346dd4baf9484c65fba1c046fa1 (patch)
tree       cc581037c280c97ef1c233d0ce2d3bbb0b89a77f /TAO/tao/Wait_Strategy.cpp
parent     6d27093d1ac6828621a28997610ac8d5a4fa9520 (diff)
download   ATCD-f9f0e9cbef284346dd4baf9484c65fba1c046fa1.tar.gz
ChangeLogTag: Wed Aug 4 12:02:45 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
Diffstat (limited to 'TAO/tao/Wait_Strategy.cpp')
-rw-r--r--  TAO/tao/Wait_Strategy.cpp  361
1 file changed, 331 insertions(+), 30 deletions(-)
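The hunks below split TAO_Wait_On_Leader_Follower into an exclusive and a muxed variant, and change every wait strategy so the reply status is passed to wait() by reference instead of being kept in a per-strategy reply_received_ member. As a rough sketch only (the real declarations live in TAO/tao/Wait_Strategy.h, which is not part of this diff; the pure-virtual choices and member layout shown here are assumptions for illustration), the reworked base interface implied by these hunks looks roughly like this:

    // Hypothetical sketch of the post-change interface, inferred from the
    // definitions in this diff; not the actual Wait_Strategy.h.
    class TAO_Wait_Strategy
    {
    public:
      TAO_Wait_Strategy (TAO_Transport *transport);
      virtual ~TAO_Wait_Strategy (void);

      // Default hook invoked before a request is sent; a no-op returning 0.
      virtual int sending_request (TAO_ORB_Core *orb_core, int two_way);

      // The reply status is now an out-parameter owned by the caller,
      // replacing the old <reply_received_> member (assumed pure virtual).
      virtual int wait (ACE_Time_Value *max_wait_time, int &reply_received) = 0;

      // New hook: the base class returns 0; the muxed leader/follower
      // strategy delegates to the ORB core's condition variable.
      virtual ACE_SYNCH_CONDITION *leader_follower_condition_variable (void);

    protected:
      TAO_Transport *transport_;
    };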
diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp
index 8fb141e5e94..215f60e951e 100644
--- a/TAO/tao/Wait_Strategy.cpp
+++ b/TAO/tao/Wait_Strategy.cpp
@@ -20,7 +20,13 @@ TAO_Wait_Strategy::~TAO_Wait_Strategy (void)
int
TAO_Wait_Strategy::sending_request (TAO_ORB_Core * /* orb_core */,
- int /* two_way */)
+ int /* two_way */)
+{
+ return 0;
+}
+
+ACE_SYNCH_CONDITION *
+TAO_Wait_Strategy::leader_follower_condition_variable (void)
{
return 0;
}
@@ -29,8 +35,8 @@ TAO_Wait_Strategy::sending_request (TAO_ORB_Core * /* orb_core */,
// Constructor.
TAO_Wait_On_Reactor::TAO_Wait_On_Reactor (TAO_Transport *transport)
- : TAO_Wait_Strategy (transport),
- reply_received_ (0)
+ : TAO_Wait_Strategy (transport)
+ // reply_received_ (0)
{
}
@@ -40,7 +46,8 @@ TAO_Wait_On_Reactor::~TAO_Wait_On_Reactor (void)
}
int
-TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time)
+TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time,
+ int &reply_received)
{
// Reactor does not change inside the loop.
ACE_Reactor* reactor =
@@ -49,8 +56,7 @@ TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time)
// Do the event loop, till we fully receive a reply.
int result = 1; // Optimize the first iteration [no access to errno]
- this->reply_received_ = 0;
- while (this->reply_received_ == 0
+ while (reply_received == 0
&& (result > 0
|| (result == 0
&& max_wait_time != 0
@@ -59,14 +65,14 @@ TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time)
result = reactor->handle_events (max_wait_time);
}
- if (result == -1 || this->reply_received_ == -1)
+ if (result == -1 || reply_received == -1)
return -1;
- // Return an error if there was a problem receiving the reply...
+ // Return an error if there was a problem receiving the reply.
if (max_wait_time != 0)
{
- if (this->reply_received_ != 1
- && *max_wait_time == ACE_Time_Value::zero)
+ if (reply_received != 1 &&
+ *max_wait_time == ACE_Time_Value::zero)
{
result = -1;
errno = ETIME;
@@ -75,7 +81,7 @@ TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time)
else
{
result = 0;
- if (this->reply_received_ == -1)
+ if (reply_received == -1)
result = -1;
}
@@ -89,14 +95,14 @@ TAO_Wait_On_Reactor::handle_input (void)
if (result == 1)
{
- this->reply_received_ = 1;
+ // this->reply_received_ = 1;
result = 0;
}
- if (result == -1)
- this->reply_received_ = -1;
+ // if (result == -1)
+ // reply_received = -1;
- return result;
+ return result;
}
// Register the handler with the Reactor.
@@ -110,7 +116,27 @@ TAO_Wait_On_Reactor::register_handler (void)
// Constructor.
TAO_Wait_On_Leader_Follower::TAO_Wait_On_Leader_Follower (TAO_Transport *transport)
- : TAO_Wait_Strategy (transport),
+ : TAO_Wait_Strategy (transport)
+{
+}
+
+// Destructor.
+TAO_Wait_On_Leader_Follower::~TAO_Wait_On_Leader_Follower (void)
+{
+}
+
+// Register the handler.
+int
+TAO_Wait_On_Leader_Follower::register_handler (void)
+{
+ return this->transport_->register_handler ();
+}
+
+// *********************************************************************
+
+// Constructor.
+TAO_Exclusive_Wait_On_Leader_Follower::TAO_Exclusive_Wait_On_Leader_Follower (TAO_Transport *transport)
+ : TAO_Wait_On_Leader_Follower (transport),
calling_thread_ (ACE_OS::NULL_thread),
cond_response_available_ (0),
expecting_response_ (0),
@@ -119,7 +145,7 @@ TAO_Wait_On_Leader_Follower::TAO_Wait_On_Leader_Follower (TAO_Transport *transpo
}
// Destructor.
-TAO_Wait_On_Leader_Follower::~TAO_Wait_On_Leader_Follower (void)
+TAO_Exclusive_Wait_On_Leader_Follower::~TAO_Exclusive_Wait_On_Leader_Follower (void)
{
delete this->cond_response_available_;
this->cond_response_available_ = 0;
@@ -130,7 +156,7 @@ TAO_Wait_On_Leader_Follower::~TAO_Wait_On_Leader_Follower (void)
// with the <Transport> object and <two_way> flag won't make sense
// at this level since this is common for AMI also. (Alex).
int
-TAO_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core,
+TAO_Exclusive_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core,
int two_way)
{
{
@@ -175,7 +201,8 @@ TAO_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core,
}
int
-TAO_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time)
+TAO_Exclusive_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time,
+ int &)
{
// Cache the ORB core, it won't change and is used multiple times
// below:
@@ -400,7 +427,7 @@ TAO_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time)
// Handle the input. Return -1 on error, 0 on success.
int
-TAO_Wait_On_Leader_Follower::handle_input (void)
+TAO_Exclusive_Wait_On_Leader_Follower::handle_input (void)
{
TAO_ORB_Core* orb_core =
this->transport_->orb_core ();
@@ -460,15 +487,8 @@ TAO_Wait_On_Leader_Follower::handle_input (void)
return result;
}
-// Register the handler.
-int
-TAO_Wait_On_Leader_Follower::register_handler (void)
-{
- return this->transport_->register_handler ();
-}
-
ACE_SYNCH_CONDITION *
-TAO_Wait_On_Leader_Follower::cond_response_available (void)
+TAO_Exclusive_Wait_On_Leader_Follower::cond_response_available (void)
{
// @@ TODO This condition variable should be per-ORB-per-thread, not
// per-connection; it is a waste to have more than one of these in
@@ -485,7 +505,7 @@ TAO_Wait_On_Leader_Follower::cond_response_available (void)
}
void
-TAO_Wait_On_Leader_Follower::wake_up (void)
+TAO_Exclusive_Wait_On_Leader_Follower::wake_up (void)
{
if (ACE_OS::thr_equal (this->calling_thread_, ACE_Thread::self ()))
{
@@ -534,6 +554,286 @@ TAO_Wait_On_Leader_Follower::wake_up (void)
// *********************************************************************
// Constructor.
+TAO_Muxed_Wait_On_Leader_Follower::TAO_Muxed_Wait_On_Leader_Follower (TAO_Transport *transport)
+ : TAO_Wait_On_Leader_Follower (transport)
+{
+}
+
+// Destructor.
+TAO_Muxed_Wait_On_Leader_Follower::~TAO_Muxed_Wait_On_Leader_Follower (void)
+{
+}
+
+int
+TAO_Muxed_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core,
+ int two_way)
+{
+ // Register the handler.
+ // @@ We could probably move this somewhere else, and remove this
+ // function totally. (Alex).
+ this->transport_->register_handler ();
+
+ // Send the request.
+ return this->TAO_Wait_Strategy::sending_request (orb_core,
+ two_way);
+}
+
+int
+TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time,
+ int &reply_received)
+{
+ // Cache the ORB core, it won't change and is used multiple times
+ // below:
+ TAO_ORB_Core* orb_core =
+ this->transport_->orb_core ();
+
+ TAO_Leader_Follower& leader_follower =
+ orb_core->leader_follower ();
+
+ // Obtain the lock.
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon,
+ leader_follower.lock (), -1);
+
+ leader_follower.set_client_thread ();
+
+ ACE_Countdown_Time countdown (max_wait_time);
+
+ // Check if there is a leader, but the leader is not us
+ if (leader_follower.leader_available ()
+ && !leader_follower.is_leader_thread ())
+ {
+ // = Wait as a follower.
+
+ // Grab the condition variable.
+ ACE_SYNCH_CONDITION* cond =
+ orb_core->leader_follower_condition_variable ();
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - wait (follower) on Transport <%x>, cond <%x>\n",
+ this->transport_,
+ cond));
+
+ // Add ourselves to the list, do it only once because we can
+ // wake up multiple times from the CV loop. And only do it if
+ // the reply has not been received (it could have arrived while
+ // we were preparing to receive it).
+
+ if (!reply_received
+ && leader_follower.leader_available ())
+ {
+ if (leader_follower.add_follower (cond) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) TAO_Muxed_Wait_On_Leader_Follower::wait - "
+ "add_follower failed for <%x>\n",
+ cond));
+ }
+
+
+ while (!reply_received &&
+ leader_follower.leader_available ())
+ {
+ if (max_wait_time == 0)
+ {
+ if (cond == 0 || cond->wait () == -1)
+ {
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - wait (follower) on <%x> "
+ "cond == 0 || cond->wait () == -1 : cond = %d\n",
+ this->transport_, (cond == 0) ? 0 : cond));
+ return -1;
+ }
+ }
+ else
+ {
+ countdown.update ();
+ ACE_Time_Value tv = ACE_OS::gettimeofday ();
+ tv += *max_wait_time;
+ if (cond == 0 || cond->wait (&tv) == -1)
+ {
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - wait (follower) on <%x> "
+ "cond == 0 || cond->wait (tv) == -1\n",
+ this->transport_));
+ return -1;
+ }
+ }
+ }
+
+ countdown.update ();
+
+#if 0
+ // Cannot remove the follower here, we *must* remove it when we
+ // signal it so the same condition is not signalled for both
+ // wake up as a follower and as the next leader.
+ if (leader_follower.remove_follower (cond) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) TAO_Muxed_Wait_On_Leader_Follower::wait - "
+ "remove_follower failed for <%x>\n", cond));
+#endif /* 0 */
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - done (follower) on <%x>, reply_received %d\n",
+ this->transport_, reply_received));
+
+ // Now somebody woke us up to become a leader or to handle
+ // our input. We are already removed from the follower queue.
+
+ if (reply_received == 1)
+ return 0;
+
+ // FALLTHROUGH
+ // We only get here if we woke up but the reply is not complete
+ // yet, time to assume the leader role....
+ // i.e. ACE_ASSERT (this->reply_received_ == 0);
+ }
+
+ // = Leader Code.
+
+ // The only way to reach this point is if we must become the leader:
+ // either there is no leader, or we have to update to a leader, or we
+ // are doing nested upcalls. In this case we do increase the refcount
+ // on the leader in TAO_ORB_Core.
+
+ // This might increase the refcount of the leader.
+ leader_follower.set_leader_thread ();
+
+ int result = 1;
+
+ {
+ ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_SYNCH_MUTEX>, rev_mon,
+ leader_follower.reverse_lock (), -1);
+
+ // @@ Do we need to do this?
+ // Become owner of the reactor.
+ orb_core->reactor ()->owner (ACE_Thread::self ());
+
+ // Run the reactor event loop.
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - wait (leader):to enter reactor event loop on <%x>\n",
+ this->transport_));
+
+ while (result > 0 && reply_received == 0)
+ result = orb_core->reactor ()->handle_events (max_wait_time);
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - wait : (leader) : done with reactor event loop on <%x>\n",
+ this->transport_));
+ }
+
+ // Wake up the next leader; we cannot do that in handle_input,
+ // because the woken-up thread would try to get into
+ // handle_events, which is still occupied while we are in
+ // handle_input. But do it before checking the error in <result>:
+ // even if there is an error in our input we should continue running
+ // the loop in another thread.
+
+ leader_follower.reset_leader_thread ();
+ leader_follower.reset_client_thread ();
+
+ if (leader_follower.elect_new_leader () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO:%N:%l:(%P|%t):TAO_Muxed_Wait_On_Leader_Follower::send_request: "
+ "Failed to unset the leader and wake up a new follower.\n"),
+ -1);
+
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO:%N:%l:(%P|%t):TAO_Muxed_Wait_On_Leader_Follower::wait: "
+ "handle_events failed.\n"),
+ -1);
+
+ // Return an error if there was a problem receiving the reply...
+ if (max_wait_time != 0)
+ {
+ if (reply_received != 1
+ && *max_wait_time == ACE_Time_Value::zero)
+ {
+ result = -1;
+ errno = ETIME;
+ }
+ }
+ else
+ {
+ result = 0;
+ if (reply_received == -1)
+ {
+ result = -1;
+ }
+ }
+
+ return result;
+}
+
+// Handle the input. Return -1 on error, 0 on success.
+int
+TAO_Muxed_Wait_On_Leader_Follower::handle_input (void)
+{
+ // Cache the ORB core, it won't change and is used multiple times
+ // below:
+ TAO_ORB_Core* orb_core =
+ this->transport_->orb_core ();
+
+ // Obtain the lock.
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon,
+ orb_core->leader_follower ().lock (),
+ -1);
+
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - reading reply on <%x>\n",
+ this->transport_));
+
+ // Receive any data that is available, without blocking...
+ int result = this->transport_->handle_client_input (0);
+
+ // Data was read, but the reply has not been completely
+ // received...
+ if (result == 0)
+ return 0;
+
+ if (result == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Wait_On_LF::handle_input, "
+ "handle_client_input == -1\n"));
+ // this->reply_received_ = -1;
+ }
+
+ if (result == 1)
+ {
+ // Change the result value to something that the Reactor can
+ // understand
+ result = 0;
+
+ // reply_received_ = 1;
+ // This would have been done by the dispatch already.
+ }
+
+ // Wake up any threads waiting for this message, either because the
+ // message failed or because we really received it.
+ // this->wake_up ();
+ // <wake_up> will be done in the <dispatch_reply>
+
+ return result;
+}
+
+ACE_SYNCH_CONDITION *
+TAO_Muxed_Wait_On_Leader_Follower::leader_follower_condition_variable (void)
+{
+ return this->transport_->orb_core ()->leader_follower_condition_variable ();
+}
+
+// *********************************************************************
+
+// Constructor.
TAO_Wait_On_Read::TAO_Wait_On_Read (TAO_Transport *transport)
: TAO_Wait_Strategy (transport)
{
@@ -546,7 +846,8 @@ TAO_Wait_On_Read::~TAO_Wait_On_Read (void)
// Wait on the read operation.
int
-TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time)
+TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time,
+ int &)
{
int reply_complete = 0;
while (reply_complete != 1)