diff options
Diffstat (limited to 'TAO/tao/Connect.cpp')
-rw-r--r-- | TAO/tao/Connect.cpp | 392 |
1 files changed, 195 insertions, 197 deletions
diff --git a/TAO/tao/Connect.cpp b/TAO/tao/Connect.cpp index fd9dc8b87d5..60010eb3937 100644 --- a/TAO/tao/Connect.cpp +++ b/TAO/tao/Connect.cpp @@ -579,128 +579,128 @@ TAO_Client_Connection_Handler::send_request (TAO_OutputCDR &stream, if (!is_twoway) { int success = (int) TAO_GIOP::send_request (this, stream); - + if (!success) return -1; } else // is_twoway - { - if (TAO_ORB_Core_instance ()->leader_follower_lock ().acquire() == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "Failed to get the lock.\n"), - -1); - - // Set the state so that we know we're looking for a response. - this->expecting_response_ = 1; - // remember in which thread the client connection handler was running - this->calling_thread_ = ACE_Thread::self (); - - - int success = (int) TAO_GIOP::send_request (this, stream); - - if (!success) { - TAO_ORB_Core_instance ()->leader_follower_lock ().release (); - return -1; - } - - // check if there is a leader, but the leader is not us - if (TAO_ORB_Core_instance ()->leader_available () - && !TAO_ORB_Core_instance ()->I_am_the_leader_thread ()) - { - // wait as long as no input is available and/or - // no leader is available - while (!this->input_available_ - && TAO_ORB_Core_instance ()->leader_available ()) - { - if (TAO_ORB_Core_instance ()->add_follower (this->cond_response_available_) == -1) - ACE_ERROR ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "Failed to add a follower thread\n")); - this->cond_response_available_->wait (); - } - // now somebody woke us up to become a leader or - // to handle our input. We are already removed from the - // follower queue - if (this->input_available_) - { - // there is input waiting for me - if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "Failed to release the lock.\n"), - -1); - - int ret = 0; - if (ret < 0) - { - ACE_DEBUG ((LM_DEBUG, - "Client_Connection_Handler::send_request: (%d) " - "failure faking handle_input\n", - ACE_Thread::self ())); - TAO_ORB_Core_instance ()->reactor ()->remove_handler (this, + if (TAO_ORB_Core_instance ()->leader_follower_lock ().acquire() == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "Failed to get the lock.\n"), + -1); + + // Set the state so that we know we're looking for a response. + this->expecting_response_ = 1; + // remember in which thread the client connection handler was running + this->calling_thread_ = ACE_Thread::self (); + + + int success = (int) TAO_GIOP::send_request (this, stream); + + if (!success) + { + TAO_ORB_Core_instance ()->leader_follower_lock ().release (); + return -1; + } + + // check if there is a leader, but the leader is not us + if (TAO_ORB_Core_instance ()->leader_available () + && !TAO_ORB_Core_instance ()->I_am_the_leader_thread ()) + { + // wait as long as no input is available and/or + // no leader is available + while (!this->input_available_ + && TAO_ORB_Core_instance ()->leader_available ()) + { + if (TAO_ORB_Core_instance ()->add_follower (this->cond_response_available_) == -1) + ACE_ERROR ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "Failed to add a follower thread\n")); + this->cond_response_available_->wait (); + } + // now somebody woke us up to become a leader or + // to handle our input. We are already removed from the + // follower queue + if (this->input_available_) + { + // there is input waiting for me + if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "Failed to release the lock.\n"), + -1); + + int ret = 0; + if (ret < 0) + { + ACE_DEBUG ((LM_DEBUG, + "Client_Connection_Handler::send_request: (%d) " + "failure faking handle_input\n", + ACE_Thread::self ())); + TAO_ORB_Core_instance ()->reactor ()->remove_handler (this, ACE_Event_Handler::ALL_EVENTS_MASK); - // failure handling - return -1; - } - /* else if (ret > 0) - // we have to reschedule, not implemented yet - */ - - // the following variables are safe, because we are not registered with - // the reactor any more. - this->input_available_ = 0; - this->expecting_response_ = 0; - this->calling_thread_ = 0; - return 0; - } + // failure handling + return -1; + } + /* else if (ret > 0) + // we have to reschedule, not implemented yet + */ + + // the following variables are safe, because we are not registered with + // the reactor any more. + this->input_available_ = 0; + this->expecting_response_ = 0; + this->calling_thread_ = 0; + return 0; + } + } + + // become a leader, because there is no leader or we have to update to a leader + // or we are doing nested upcalls in this case we do increase the refcount + // on the leader in TAO_ORB_Core. + + TAO_ORB_Core_instance ()->set_leader_thread (); + // this might increase the recount of the leader + + if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "Failed to release the lock.\n"), + -1); + + ACE_Reactor *r = TAO_ORB_Core_instance ()->reactor (); + r->owner (ACE_Thread::self ()); + + int ret = 0; + + while (ret != -1 && !this->input_available_) + ret = r->handle_events (); + + if (ret == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "handle_events failed.\n"), + -1); + + + // wake up the next leader, we cannot do that in handle_input, + // because the woken up thread would try to get into handle_events, + // which is at the time in handle_input still occupied. + + if (TAO_ORB_Core_instance ()->unset_leader_wake_up_follower () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "Failed to unset the leader and wake up a new follower.\n"), + -1); + + // Make use reusable + this->input_available_ = 0; + this->expecting_response_ = 0; + this->calling_thread_ = 0; } - - // become a leader, because there is no leader or we have to update to a leader - // or we are doing nested upcalls in this case we do increase the refcount - // on the leader in TAO_ORB_Core. - - TAO_ORB_Core_instance ()->set_leader_thread (); - // this might increase the recount of the leader - - if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "Failed to release the lock.\n"), - -1); - - ACE_Reactor *r = TAO_ORB_Core_instance ()->reactor (); - r->owner (ACE_Thread::self ()); - - int ret = 0; - - while (ret != -1 && !this->input_available_) - ret = r->handle_events (); - - if (ret == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "handle_events failed.\n"), - -1); - - - // wake up the next leader, we cannot do that in handle_input, - // because the woken up thread would try to get into handle_events, - // which is at the time in handle_input still occupied. - - if (TAO_ORB_Core_instance ()->unset_leader_wake_up_follower () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "Failed to unset the leader and wake up a new follower.\n"), - -1); - - // Make use reusable - this->input_available_ = 0; - this->expecting_response_ = 0; - this->calling_thread_ = 0; - } - + return 0; } @@ -708,31 +708,31 @@ int TAO_Client_Connection_Handler::handle_input (ACE_HANDLE) { if (TAO_ORB_Core_instance ()->leader_follower_lock ().acquire () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to get the lock.\n"), - -1); - + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::handle_input: " + "Failed to get the lock.\n"), + -1); + if (!this->expecting_response_) - { - // we got something, but did not want - // @@ wake up an other thread, we are lost - - if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to release the lock.\n"), - -1); - - // We're a client, so we're not expecting to see input. Still - // we better check what it is! - char ignored; - ssize_t ret; - + { + // we got something, but did not want + // @@ wake up an other thread, we are lost + + if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::handle_input: " + "Failed to release the lock.\n"), + -1); + + // We're a client, so we're not expecting to see input. Still + // we better check what it is! + char ignored; + ssize_t ret; + ret = this->peer().recv_n (&ignored, sizeof ignored); - + switch (ret) - { + { case -1: // Error...but we weren't expecting input, either...what // should we do? @@ -740,7 +740,7 @@ TAO_Client_Connection_Handler::handle_input (ACE_HANDLE) "Client_Connection_Handler::handle_input: closing connection on fd %d\n", this->peer().get_handle ())); break; - + case 1: // We weren't expecting input, so what should we do with it? // Log an error, and close the connection. @@ -749,76 +749,74 @@ TAO_Client_Connection_Handler::handle_input (ACE_HANDLE) "input while not expecting a response; closing connection on fd %d\n", this->peer().get_handle ())); break; - + case 0: // This is an EOF, so we will return -1 and let // handle_close() take over. As long as handle_close() // calls the Svc_Handler<>::handle_close(), the socket will // be shutdown properly. break; - } - + } + // We're not expecting input at this time, so we'll always // return -1 for now. return -1; - } - - if (this->calling_thread_ == ACE_Thread::self ()) - { - // we are now a leader getting its response - // or a follower faking the handle_input + } - this->input_available_ = 1; - - if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to release the lock.\n"), - -1); - - TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this); - // resume_handler is called in TAO_GIOP_Invocation::invoke - - return 0; - } + if (this->calling_thread_ == ACE_Thread::self ()) + { + // we are now a leader getting its response + // or a follower faking the handle_input + + this->input_available_ = 1; + + if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::handle_input: " + "Failed to release the lock.\n"), + -1); + + TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this); + // resume_handler is called in TAO_GIOP_Invocation::invoke + + return 0; + } else - { - // we are a leader, which got a response for one of the followers, - // which means we are now a thread running the wrong Client_Connection_Handler - - TAO_ORB_Core_instance ()->remove_follower (this->cond_response_available_); - - if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to release the lock.\n"), - -1); - - - TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this); - // @@ Michael: we should wake suspend the thread before we wake him up. - // resume_handler is called in TAO_GIOP_Invocation::invoke - - if (TAO_ORB_Core_instance ()->leader_follower_lock ().acquire () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to acquire the lock.\n"), - -1); - - // the thread was already selected to become a leader, - // so we will be called again. - this->input_available_ = 1; - this->cond_response_available_->signal (); - - if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to release the lock.\n"), - -1); - - - return 0; - } + { + // we are a leader, which got a response for one of the followers, + // which means we are now a thread running the wrong Client_Connection_Handler + + TAO_ORB_Core_instance ()->remove_follower (this->cond_response_available_); + + if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::handle_input: " + "Failed to release the lock.\n"), + -1); + + + TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this); + // @@ Michael: we should wake suspend the thread before we wake him up. + // resume_handler is called in TAO_GIOP_Invocation::invoke + + if (TAO_ORB_Core_instance ()->leader_follower_lock ().acquire () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::handle_input: " + "Failed to acquire the lock.\n"), + -1); + + // the thread was already selected to become a leader, + // so we will be called again. + this->input_available_ = 1; + this->cond_response_available_->signal (); + + if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::handle_input: " + "Failed to release the lock.\n"), + -1); + return 0; + } } int |