diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-08-04 17:03:52 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-08-04 17:03:52 +0000 |
commit | f9f0e9cbef284346dd4baf9484c65fba1c046fa1 (patch) | |
tree | cc581037c280c97ef1c233d0ce2d3bbb0b89a77f /TAO/tao/Wait_Strategy.cpp | |
parent | 6d27093d1ac6828621a28997610ac8d5a4fa9520 (diff) | |
download | ATCD-f9f0e9cbef284346dd4baf9484c65fba1c046fa1.tar.gz |
ChangeLogTag:Wed Aug 4 12:02:45 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
Diffstat (limited to 'TAO/tao/Wait_Strategy.cpp')
-rw-r--r-- | TAO/tao/Wait_Strategy.cpp | 361 |
1 files changed, 331 insertions, 30 deletions
diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp index 8fb141e5e94..215f60e951e 100644 --- a/TAO/tao/Wait_Strategy.cpp +++ b/TAO/tao/Wait_Strategy.cpp @@ -20,7 +20,13 @@ TAO_Wait_Strategy::~TAO_Wait_Strategy (void) int TAO_Wait_Strategy::sending_request (TAO_ORB_Core * /* orb_core */, - int /* two_way */) + int /* two_way */) +{ + return 0; +} + +ACE_SYNCH_CONDITION * +TAO_Wait_Strategy::leader_follower_condition_variable (void) { return 0; } @@ -29,8 +35,8 @@ TAO_Wait_Strategy::sending_request (TAO_ORB_Core * /* orb_core */, // Constructor. TAO_Wait_On_Reactor::TAO_Wait_On_Reactor (TAO_Transport *transport) - : TAO_Wait_Strategy (transport), - reply_received_ (0) + : TAO_Wait_Strategy (transport) + // reply_received_ (0) { } @@ -40,7 +46,8 @@ TAO_Wait_On_Reactor::~TAO_Wait_On_Reactor (void) } int -TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time) +TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time, + int &reply_received) { // Reactor does not change inside the loop. ACE_Reactor* reactor = @@ -49,8 +56,7 @@ TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time) // Do the event loop, till we fully receive a reply. int result = 1; // Optimize the first iteration [no access to errno] - this->reply_received_ = 0; - while (this->reply_received_ == 0 + while (reply_received == 0 && (result > 0 || (result == 0 && max_wait_time != 0 @@ -59,14 +65,14 @@ TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time) result = reactor->handle_events (max_wait_time); } - if (result == -1 || this->reply_received_ == -1) + if (result == -1 || reply_received == -1) return -1; - // Return an error if there was a problem receiving the reply... + // Return an error if there was a problem receiving the reply. if (max_wait_time != 0) { - if (this->reply_received_ != 1 - && *max_wait_time == ACE_Time_Value::zero) + if (reply_received != 1 && + *max_wait_time == ACE_Time_Value::zero) { result = -1; errno = ETIME; @@ -75,7 +81,7 @@ TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time) else { result = 0; - if (this->reply_received_ == -1) + if (reply_received == -1) result = -1; } @@ -89,14 +95,14 @@ TAO_Wait_On_Reactor::handle_input (void) if (result == 1) { - this->reply_received_ = 1; + // this->reply_received_ = 1; result = 0; } - if (result == -1) - this->reply_received_ = -1; + // if (result == -1) + // reply_received = -1; - return result; + return result; } // Register the handler with the Reactor. @@ -110,7 +116,27 @@ TAO_Wait_On_Reactor::register_handler (void) // Constructor. TAO_Wait_On_Leader_Follower::TAO_Wait_On_Leader_Follower (TAO_Transport *transport) - : TAO_Wait_Strategy (transport), + : TAO_Wait_Strategy (transport) +{ +} + +// Destructor. +TAO_Wait_On_Leader_Follower::~TAO_Wait_On_Leader_Follower (void) +{ +} + +// Register the handler. +int +TAO_Wait_On_Leader_Follower::register_handler (void) +{ + return this->transport_->register_handler (); +} + +// ********************************************************************* + +// Constructor. +TAO_Exclusive_Wait_On_Leader_Follower::TAO_Exclusive_Wait_On_Leader_Follower (TAO_Transport *transport) + : TAO_Wait_On_Leader_Follower (transport), calling_thread_ (ACE_OS::NULL_thread), cond_response_available_ (0), expecting_response_ (0), @@ -119,7 +145,7 @@ TAO_Wait_On_Leader_Follower::TAO_Wait_On_Leader_Follower (TAO_Transport *transpo } // Destructor. -TAO_Wait_On_Leader_Follower::~TAO_Wait_On_Leader_Follower (void) +TAO_Exclusive_Wait_On_Leader_Follower::~TAO_Exclusive_Wait_On_Leader_Follower (void) { delete this->cond_response_available_; this->cond_response_available_ = 0; @@ -130,7 +156,7 @@ TAO_Wait_On_Leader_Follower::~TAO_Wait_On_Leader_Follower (void) // with the <Transport> object and <two_way> flag wont make sense // at this level since this is common for AMI also. (Alex). int -TAO_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core, +TAO_Exclusive_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core, int two_way) { { @@ -175,7 +201,8 @@ TAO_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core, } int -TAO_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time) +TAO_Exclusive_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, + int &) { // Cache the ORB core, it won't change and is used multiple times // below: @@ -400,7 +427,7 @@ TAO_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time) // Handle the input. Return -1 on error, 0 on success. int -TAO_Wait_On_Leader_Follower::handle_input (void) +TAO_Exclusive_Wait_On_Leader_Follower::handle_input (void) { TAO_ORB_Core* orb_core = this->transport_->orb_core (); @@ -460,15 +487,8 @@ TAO_Wait_On_Leader_Follower::handle_input (void) return result; } -// Register the handler. -int -TAO_Wait_On_Leader_Follower::register_handler (void) -{ - return this->transport_->register_handler (); -} - ACE_SYNCH_CONDITION * -TAO_Wait_On_Leader_Follower::cond_response_available (void) +TAO_Exclusive_Wait_On_Leader_Follower::cond_response_available (void) { // @@ TODO This condition variable should per-ORB-per-thread, not // per-connection, it is a waste to have more than one of this in @@ -485,7 +505,7 @@ TAO_Wait_On_Leader_Follower::cond_response_available (void) } void -TAO_Wait_On_Leader_Follower::wake_up (void) +TAO_Exclusive_Wait_On_Leader_Follower::wake_up (void) { if (ACE_OS::thr_equal (this->calling_thread_, ACE_Thread::self ())) { @@ -534,6 +554,286 @@ TAO_Wait_On_Leader_Follower::wake_up (void) // ********************************************************************* // Constructor. +TAO_Muxed_Wait_On_Leader_Follower::TAO_Muxed_Wait_On_Leader_Follower (TAO_Transport *transport) + : TAO_Wait_On_Leader_Follower (transport) +{ +} + +// Destructor. +TAO_Muxed_Wait_On_Leader_Follower::~TAO_Muxed_Wait_On_Leader_Follower (void) +{ +} + +int +TAO_Muxed_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core, + int two_way) +{ + // Register the handler. + // @@ We could probably move this somewhere else, and remove this + // function totally. (Alex). + this->transport_->register_handler (); + + // Send the request. + return this->TAO_Wait_Strategy::sending_request (orb_core, + two_way); +} + +int +TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, + int &reply_received) +{ + // Cache the ORB core, it won't change and is used multiple times + // below: + TAO_ORB_Core* orb_core = + this->transport_->orb_core (); + + TAO_Leader_Follower& leader_follower = + orb_core->leader_follower (); + + // Obtain the lock. + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, + leader_follower.lock (), -1); + + leader_follower.set_client_thread (); + + ACE_Countdown_Time countdown (max_wait_time); + + // Check if there is a leader, but the leader is not us + if (leader_follower.leader_available () + && !leader_follower.is_leader_thread ()) + { + // = Wait as a follower. + + // Grab the condtion variable. + ACE_SYNCH_CONDITION* cond = + orb_core->leader_follower_condition_variable (); + + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - wait (follower) on Transport <%x>, cond <%x>\n", + this->transport_, + cond)); + + // Add ourselves to the list, do it only once because we can + // wake up multiple times from the CV loop. And only do it if + // the reply has not been received (it could have arrived while + // we were preparing to receive it). + + if (!reply_received + && leader_follower.leader_available ()) + { + if (leader_follower.add_follower (cond) == -1) + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) TAO_Muxed_Wait_On_Leader_Follower::wait - " + "add_follower failed for <%x>\n", + cond)); + } + + + while (!reply_received && + leader_follower.leader_available ()) + { + if (max_wait_time == 0) + { + if (cond == 0 || cond->wait () == -1) + { + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - wait (follower) on <%x> " + "cond == 0 || cond->wait () == -1 : cond = %d\n", + this->transport_, (cond == 0) ? 0 : cond)); + return -1; + } + } + else + { + countdown.update (); + ACE_Time_Value tv = ACE_OS::gettimeofday (); + tv += *max_wait_time; + if (cond == 0 || cond->wait (&tv) == -1) + { + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - wait (follower) on <%x> " + "cond == 0 || cond->wait (tv) == -1\n", + this->transport_)); + return -1; + } + } + } + + countdown.update (); + +#if 0 + // Cannot remove the follower here, we *must* remove it when we + // signal it so the same condition is not signalled for both + // wake up as a follower and as the next leader. + if (leader_follower.remove_follower (cond) == -1) + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) TAO_Muxed_Wait_On_Leader_Follower::wait - " + "remove_follower failed for <%x>\n", cond)); +#endif /* 0 */ + + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - done (follower) on <%x>, reply_received %d\n", + this->transport_, reply_received)); + + // Now somebody woke us up to become a leader or to handle + // our input. We are already removed from the follower queue. + + if (reply_received == 1) + return 0; + + // FALLTHROUGH + // We only get here if we woke up but the reply is not complete + // yet, time to assume the leader role.... + // i.e. ACE_ASSERT (this->reply_received_ == 0); + } + + // = Leader Code. + + // The only way to reach this point is if we must become the leader, + // because there is no leader or we have to update to a leader or we + // are doing nested upcalls in this case we do increase the refcount + // on the leader in TAO_ORB_Core. + + // This might increase the refcount of the leader. + leader_follower.set_leader_thread (); + + int result = 1; + + { + ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_SYNCH_MUTEX>, rev_mon, + leader_follower.reverse_lock (), -1); + + // @@ Do we need to do this? + // Become owner of the reactor. + orb_core->reactor ()->owner (ACE_Thread::self ()); + + // Run the reactor event loop. + + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - wait (leader):to enter reactor event loop on <%x>\n", + this->transport_)); + + while (result > 0 && reply_received == 0) + result = orb_core->reactor ()->handle_events (max_wait_time); + + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - wait : (leader) : done with reactor event loop on <%x>\n", + this->transport_)); + } + + // Wake up the next leader, we cannot do that in handle_input, + // because the woken up thread would try to get into + // handle_events, which is at the time in handle_input still + // occupied. But do it before checking the error in <result>, even + // if there is an error in our input we should continue running the + // loop in another thread. + + leader_follower.reset_leader_thread (); + leader_follower.reset_client_thread (); + + if (leader_follower.elect_new_leader () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t):TAO_Muxed_Wait_On_Leader_Follower::send_request: " + "Failed to unset the leader and wake up a new follower.\n"), + -1); + + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t):TAO_Muxed_Wait_On_Leader_Follower::wait: " + "handle_events failed.\n"), + -1); + + // Return an error if there was a problem receiving the reply... + if (max_wait_time != 0) + { + if (reply_received != 1 + && *max_wait_time == ACE_Time_Value::zero) + { + result = -1; + errno = ETIME; + } + } + else + { + result = 0; + if (reply_received == -1) + { + result = -1; + } + } + + return result; +} + +// Handle the input. Return -1 on error, 0 on success. +int +TAO_Muxed_Wait_On_Leader_Follower::handle_input (void) +{ + // Cache the ORB core, it won't change and is used multiple times + // below: + TAO_ORB_Core* orb_core = + this->transport_->orb_core (); + + // Obtain the lock. + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, + orb_core->leader_follower ().lock (), + -1); + + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - reading reply on <%x>\n", + this->transport_)); + + // Receive any data that is available, without blocking... + int result = this->transport_->handle_client_input (0); + + // Data was read, but there the reply has not been completely + // received... + if (result == 0) + return 0; + + if (result == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Wait_On_LF::handle_input, " + "handle_client_input == -1\n")); + // this->reply_received_ = -1; + } + + if (result == 1) + { + // Change the result value to something that the Reactor can + // understand + result = 0; + + // reply_received_ = 1; + // This would have been done by the dispatch already. + } + + // Wake up any threads waiting for this message, either because the + // message failed or because we really received it. + // this->wake_up (); + // <wake_up> will be done in the <dispatch_reply> + + return result; +} + +ACE_SYNCH_CONDITION * +TAO_Muxed_Wait_On_Leader_Follower::leader_follower_condition_variable (void) +{ + return this->transport_->orb_core ()->leader_follower_condition_variable (); +} + +// ********************************************************************* + +// Constructor. TAO_Wait_On_Read::TAO_Wait_On_Read (TAO_Transport *transport) : TAO_Wait_Strategy (transport) { @@ -546,7 +846,8 @@ TAO_Wait_On_Read::~TAO_Wait_On_Read (void) // Wait on the read operation. int -TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time) +TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time, + int &) { int reply_complete = 0; while (reply_complete != 1) |