From 74e7c9754bcffca5dcdd73608c17b3112f2ca7bf Mon Sep 17 00:00:00 2001 From: alex Date: Tue, 25 May 1999 23:43:50 +0000 Subject: Took over another strategy (MT) from the connection handlers on to the Wait Strategies. Got tht MT-Client test to work with the new architecture. --- TAO/tao/Wait_Strategy.cpp | 348 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 324 insertions(+), 24 deletions(-) diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp index 453c4101a2b..5f926765a3d 100644 --- a/TAO/tao/Wait_Strategy.cpp +++ b/TAO/tao/Wait_Strategy.cpp @@ -54,21 +54,25 @@ TAO_Wait_On_Reactor::handle_input (void) // another sub event loop is in effect still waiting for its // response, it doesn't spin tightly gobbling up CPU. result = this->transport_->suspend_handler (); + if (result != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:TAO_Wait_On_Reactor::handle_input:%p\n", + "suspend_handler failed"), + -1); // Ask the Transport object to read the input without blocking. result = this->transport_->handle_client_input (0); - - // If message is not read fully, resume the handler. - if (result == 0) - result = this->transport_->resume_handler (); - - if (result != 0) + if (result == -1) + return -1; + + // Resume the handler. + if (this->transport_->resume_handler () == -1) ACE_ERROR_RETURN ((LM_ERROR, - "TAO: %N:%l:TAO_Wait_On_Reactor::handle_input:%p\n", + "TAO:%N:%l:TAO_Wait_On_Reactor::handle_input:%p\n", "resume_handler failed"), -1); - return 0; + return result; } // Register the handler with the Reactor. @@ -88,23 +92,169 @@ TAO_Wait_On_Reactor::resume_handler (ACE_Reactor *reactor) // Constructor. TAO_Wait_On_Leader_Follower::TAO_Wait_On_Leader_Follower (TAO_Transport *transport) - : TAO_Wait_Strategy (transport) + : TAO_Wait_Strategy (transport), + calling_thread_ (ACE_OS::NULL_thread), + cond_response_available_ (0), + expecting_response_ (0), + input_available_ (0) { } // Destructor. TAO_Wait_On_Leader_Follower::~TAO_Wait_On_Leader_Follower (void) { + delete this->cond_response_available_; + this->cond_response_available_ = 0; } int TAO_Wait_On_Leader_Follower::wait (void) { + // @@ Do we need this code? (Alex). + + int result = 0; + +#if 0 + ACE_Reactor *r = this->transport_->orb_core ()->reactor (); + + if (this->handler_reactor () != r) + { + ACE_Reactor_Mask mask = + ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL; + this->reactor ()->remove_handler (this, mask); + + r->register_handler (this, + ACE_Event_Handler::READ_MASK); + } +#endif /* 0 */ + + // Obtain the lock. + if (this->transport_->orb_core ()->leader_follower_lock ().acquire() == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t):TAO_Wait_On_Leader_Follower::wait: " + "Failed to get the lock.\n"), + -1); + + // Set the state so that we know we're looking for a response. + this->expecting_response_ = 1; + + // Remember in which thread the client connection handler was running + this->calling_thread_ = ACE_Thread::self (); + + // Check if there is a leader, but the leader is not us + if (this->transport_->orb_core ()->leader_available () && + !this->transport_->orb_core ()->I_am_the_leader_thread ()) + { + // = Wait as a follower. + + // wait as long as no input is available and/or + // no leader is available + while (!this->input_available_ && + this->transport_->orb_core ()->leader_available ()) + { + ACE_SYNCH_CONDITION* cond = + this->cond_response_available (); + if (this->transport_->orb_core ()->add_follower (cond) == -1) + ACE_ERROR ((LM_ERROR, + "TAO:%N:%l:(%P|%t):TAO_Wait_On_Leader_Follower::wait: " + "Failed to add a follower thread\n")); + cond->wait (); + } + + // Now somebody woke us up to become a leader or to handle + // our input. We are already removed from the follower queue. + if (this->input_available_) + { + // There is input waiting for me. + + /// Release the lock. + if (this->transport_->orb_core ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::wait: " + "Failed to release the lock.\n"), + -1); + + // The following variables are safe, because we are not + // registered with the reactor any more. + this->input_available_ = 0; + this->expecting_response_ = 0; + this->calling_thread_ = ACE_OS::NULL_thread; + + // Ready to receive the input message. + // @@ Is it ok to read it blockingly. (Alex). + result = this->transport_->handle_client_input (1); + + // Resume the handler. + if (this->transport_->resume_handler () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::wait: " + " failed.\n"), + -1); + + // We should have read the whole message. + if (result != 1) + ACE_ERROR_RETURN ((LM_WARNING, + "TAO:%N:%l:(%P|%t):TAO_Wait_On_Leader_Follower::wait: " + "Input message was not read fully.\n"), + -1); + return 0; + } + } + + // Become a leader, because there is no leader or we have to + // update to a leader or we are doing nested upcalls in this + // case we do increase the refcount on the leader in + // TAO_ORB_Core. + + // This might increase the refcount of the leader. + this->transport_->orb_core ()->set_leader_thread (); + + // Release the lock. + if (this->transport_->orb_core ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::wait: " + "Failed to release the lock.\n"), + -1); + + // Become owner of the reactor. + this->transport_->orb_core ()->reactor ()->owner (ACE_Thread::self ()); + + // Run the reactor event loop. + + result = 0; + + while (result != -1 && !this->input_available_) + result = this->transport_->orb_core ()->reactor ()->handle_events (); + + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t):TAO_Wait_On_Leader_Follower::wait: " + "handle_events failed.\n"), + -1); + + // Wake up the next leader, we cannot do that in handle_input, + // because the woken up thread would try to get into + // handle_events, which is at the time in handle_input still + // occupied. + + if (this->transport_->orb_core ()->unset_leader_wake_up_follower () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t):TAO_Client_Connection_Handler::send_request: " + "Failed to unset the leader and wake up a new follower.\n"), + -1); + + // Make use reusable + this->input_available_ = 0; + this->expecting_response_ = 0; + this->calling_thread_ = ACE_OS::NULL_thread; + + return 0; + #if 0 // Grab leader follower lock. ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, - this->orb_core_->leader_follower_lock (), + this->transport_->orb_core ()->leader_follower_lock (), -1)); // Bind the reply handler. @@ -114,17 +264,18 @@ TAO_Wait_On_Leader_Follower::wait (void) rrs->bind (this->request_id, &rh); // Check if we need to become the leader. - if (!this->orb_core_->leader_available ()) + if (!this->transport_->orb_core ()->leader_available ()) { // This might increase the refcount of the leader. - this->orb_core_->set_leader_thread (); + this->transport_->orb_core ()->set_leader_thread (); // Do the reactor event loop. - this->orb_core_->reactor ()->owner (ACE_Thread::self ()); + this->transport_->orb_core ()->reactor ()->owner (ACE_Thread::self ()); + + result = 0; - int result = 0; while (result != -1) - result = this->orb_core_->reactor ()->handle_events (); + result = this->transport_->orb_core ()->reactor ()->handle_events (); if (result == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -136,23 +287,141 @@ TAO_Wait_On_Leader_Follower::wait (void) { // Block on condition variable. ACE_SYNCH_CONDITION* cond = - this->cond_response_available (orb_core); - if (this->orb_core_->add_follower (cond) == -1) + this->cond_response_available (); + if (this->transport_->orb_core ()->add_follower (cond) == -1) ACE_ERROR ((LM_ERROR, "%N:%l:(%P|%t) TAO_Wait_On_Leader_Follower::wait: " "Failed to add a follower thread\n")); cond->wait (); } #endif /* 0 */ - return 0; } -// Handle the input. Delegate this job to Transport object. Before -// that, suspend the handler in the Reactor. +// Handle the input. +// I am returning -1 on error, 1 on fully read message, 0 on +// paritially complete read. int TAO_Wait_On_Leader_Follower::handle_input (void) { - return -1; + int result = 0; + + // Obtain the lock. + if (this->transport_->orb_core ()->leader_follower_lock ().acquire () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t):TAO_Wait_On_Leader_Follower::handle_input: " + "Failed to get the lock.\n"), + -1); + +#if 0 + // @@ Later (Alex). + + if (!this->expecting_response_) + { + // we got something, but did not want. + // @@ wake up an other thread, we are lost + + if (this->orb_core_->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::handle_input: " + "Failed to release the lock.\n"), + -1); + return this->check_unexpected_data (); + } +#endif /* 0 */ + + if (ACE_OS::thr_equal (this->calling_thread_, + ACE_Thread::self ())) + { + // We are now a leader getting its response. + + // Set the flag on. + this->input_available_ = 1; + + // Release the lock. + if (this->transport_->orb_core ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::handle_input: " + "Failed to release the lock.\n"), + -1); + + // Suspend the handler. is called in + // TAO_GIOP_Invocation::invoke + if (this->transport_->suspend_handler () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::handle_input: " + " failed\n"), + -1); + + // Ask the transport object to read the message. But we should + // not block on receiving the whole reply. + + // @@ Are the states such as message_size and message_offset + // thread safe, in the Transport object? (Alex). + result = this->transport_->handle_client_input (0); + + // Resume the handler. + if (this->transport_->resume_handler () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::handle_input: " + " failed\n"), + -1); + + // Return the result. + return result; + } + else + { + // We are a leader, which got a response for one of the + // followers, which means we are now a thread running the wrong + // Client_Connection_Handler + + // At this point we might fail to remove the follower, because + // it has been already chosen to become the leader, so it is + // awake and will get this too. + ACE_SYNCH_CONDITION* cond = + this->cond_response_available (); + + this->transport_->orb_core ()->remove_follower (cond); + + // Release the lock. + if (this->transport_->orb_core ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::handle_input: " + "Failed to release the lock.\n"), + -1); + + // We should wake suspend the thread before we wake him up. + // resume_handler is called in TAO_GIOP_Invocation::invoke. + if (this->transport_->suspend_handler () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::handle_input: " + " failed\n"), + -1); + + // @@ TODO (Michael): We might be able to optimize this in + // doing the suspend_handler as last thing, but I am not sure + // if a race condition would occur. + + // Obtain the lock. + if (this->transport_->orb_core ()->leader_follower_lock ().acquire () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::handle_input: " + "Failed to acquire the lock.\n"), + -1); + + // The thread was already selected to become a leader, so we + // will be called again. + this->input_available_ = 1; + cond->signal (); + + // Release the lock. + if (this->transport_->orb_core ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::handle_input: " + "Failed to release the lock.\n"), + -1); + return 0; + } } // Register the handler. @@ -166,7 +435,38 @@ TAO_Wait_On_Leader_Follower::register_handler (TAO_IIOP_Handler_Base *) int TAO_Wait_On_Leader_Follower::resume_handler (ACE_Reactor *reactor) { - return -1; + return this->transport_->resume_handler (); +} + +// Send the request in . +// @@ Why do I need here, when I have . +int +TAO_Wait_On_Leader_Follower::send_request (TAO_ORB_Core* /* orb_core */, + TAO_OutputCDR &stream) +{ + // Send the request. + int success = (int) TAO_GIOP::send_request (this->transport_, + stream, + this->transport_->orb_core ()); + if (!success) + return -1; + else + return 0; +} + +ACE_SYNCH_CONDITION * +TAO_Wait_On_Leader_Follower::cond_response_available (void) +{ + // @@ TODO This condition variable should per-ORB-per-thread, not + // per-connection, it is a waste to have more than one of this in + // the same thread. + if (this->cond_response_available_ == 0) + { + ACE_NEW_RETURN (this->cond_response_available_, + ACE_SYNCH_CONDITION (this->transport_->orb_core ()->leader_follower_lock ()), + 0); + } + return this->cond_response_available_; } // ********************************************************************* @@ -200,11 +500,11 @@ TAO_Wait_On_Read::wait (void) } // Handle the input. Delegate this job to Transport object. -// Resume the connection handler. No-op. Returns 0. int TAO_Wait_On_Read::handle_input (void) { - return -1; + // Block to get the whole message. + return this->transport_->handle_client_input (1); } // No-op. -- cgit v1.2.1