diff options
Diffstat (limited to 'TAO/tao/Synch_Invocation.cpp')
-rw-r--r-- | TAO/tao/Synch_Invocation.cpp | 148 |
1 files changed, 107 insertions, 41 deletions
diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp index a2c6b49bce4..0825460449c 100644 --- a/TAO/tao/Synch_Invocation.cpp +++ b/TAO/tao/Synch_Invocation.cpp @@ -25,16 +25,17 @@ namespace TAO } Invocation_Status - Synch_Twoway_Invocation::remote_twoway (Argument **args, - int args_number - ACE_ENV_ARG_DECL) + Synch_Twoway_Invocation::remote_twoway (ACE_Time_Value *max_wait_time + ACE_ENV_SINGLE_ARG_DECL) { + ACE_Countdown_Time countdown (max_wait_time); + TAO_Synch_Reply_Dispatcher rd (this->resolver_.stub ()->orb_core (), - this->detail_.reply_service_info ()); + this->details_.reply_service_info ()); // Register a reply dispatcher for this invocation. Use the // preallocated reply dispatcher. - TAO_Bind_Dispatcher_Guard dispatch_guard (this->detail_.request_id (), + TAO_Bind_Dispatcher_Guard dispatch_guard (this->details_.request_id (), &rd, this->resolver_.transport ()->tms ()); @@ -61,22 +62,25 @@ namespace TAO ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (TAO_INVOKE_FAILURE); - this->marshal_data (args, - args_number, - cdr + this->marshal_data (cdr ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (TAO_INVOKE_FAILURE); + countdown.update (); + Invocation_Status s = this->send_message (cdr, - TAO_Transport::TAO_TWOWAY_REQUEST + TAO_Transport::TAO_TWOWAY_REQUEST, + max_wait_time ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (s); if (s != TAO_INVOKE_SUCCESS) return s; + countdown.update (); + // @@ In all MT environments, there's a cancellation point lurking // here; need to investigate. Client threads would frequently be // canceled sometime during recv_request ... the correct action to @@ -94,22 +98,90 @@ namespace TAO // (explicitly coded) handlers called. We assume a POSIX.1c/C/C++ // environment. - this->resolver_.transport ()->wait_strategy ()->wait (0, - rd); + s = + this->wait_for_reply (max_wait_time, + rd, + dispatch_guard + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (TAO_INVOKE_FAILURE); + - return this->check_reply_status (rd, - args, - args_number + return this->check_reply_status (rd ACE_ENV_ARG_PARAMETER); } + Invocation_Status + Synch_Twoway_Invocation::wait_for_reply (ACE_Time_Value *max_wait_time, + TAO_Synch_Reply_Dispatcher &rd, + TAO_Bind_Dispatcher_Guard &bd + ACE_ENV_ARG_DECL) + { + int reply_error = + this->resolver_.transport ()->wait_strategy ()->wait (max_wait_time, + rd); + if (TAO_debug_level > 0 && max_wait_time != 0) + { + CORBA::ULong msecs = max_wait_time->msec (); + + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Synch_Twoway_Invocation::wait_for_reply, " + "timeout after recv is <%u> status <%d>\n", + msecs, + reply_error)); + } + + // Check the reply error. + if (reply_error == -1) + { + // Unbind the dispatcher, since its of no use at this point of + // time + if (TAO_debug_level > 3) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Synch_Twoway_Invocation::wait_for_reply , " + "recovering after an error \n")); + + if (errno == ETIME) + { + // If the unbind succeeds then thrown an exception to the + // application, else just collect the reply and dispatch that to the + // application. + // NOTE: A fragile synchronization is provided when using the Muxed + // Transport strategy. We could infact be a follower thread getting + // timedout in the LF whereas the dispatching thread could be + // on the reply_dispatcher that we created. This would lead bad + // crashes. To get around that, the call to unbind_dispatcher () + // will wait on the lock on the Muxed_Transport_Strategy if + // dispatching has started. This is fragile. + if (bd.unbind_dispatcher () == 0) + { + // Just a timeout, don't close the connection or + // anything... + ACE_THROW_RETURN (CORBA::TIMEOUT ( + CORBA::SystemException::_tao_minor_code ( + TAO_TIMEOUT_SEND_MINOR_CODE, + errno), + CORBA::COMPLETED_NO), + TAO_INVOKE_FAILURE); + } + } + + this->resolver_.transport ()->close_connection (); + this->resolver_.stub ()->reset_profiles (); + + ACE_THROW_RETURN (CORBA::COMM_FAILURE ( + CORBA::SystemException::_tao_minor_code ( + TAO_INVOCATION_RECV_REQUEST_MINOR_CODE, + errno), + CORBA::COMPLETED_MAYBE), + TAO_INVOKE_FAILURE); + } + return TAO_INVOKE_SUCCESS; + } Invocation_Status - Synch_Twoway_Invocation::check_reply_status (TAO_Synch_Reply_Dispatcher &rd, - Argument **args, - int args_number + Synch_Twoway_Invocation::check_reply_status (TAO_Synch_Reply_Dispatcher &rd ACE_ENV_ARG_DECL) { // Grab the reply CDR @@ -129,12 +201,9 @@ namespace TAO switch (rd.reply_status ()) { case TAO_PLUGGABLE_MESSAGE_NO_EXCEPTION: - for (i = 0; i != args_number; ++i) - { - if (!((*args[i]).demarshal (rd.reply_cdr ()))) - ACE_THROW_RETURN (CORBA::MARSHAL (), - TAO_INVOKE_FAILURE); - } + if (this->details_.demarshal_args (rd.reply_cdr ()) == false) + ACE_THROW_RETURN (CORBA::MARSHAL (), + TAO_INVOKE_FAILURE); break; case TAO_PLUGGABLE_MESSAGE_LOCATION_FORWARD: // Handle the forwarding and return so the stub restarts the @@ -204,8 +273,6 @@ namespace TAO return TAO_INVOKE_OK; } // NOTREACHED. - - case TAO_PLUGGABLE_MESSAGE_NEEDS_ADDRESSING_MODE: { // We have received an exception with a request to change the @@ -241,9 +308,7 @@ namespace TAO ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)) { - CORBA::Object_var forward_reference; - - if ((inp_stream >> forward_reference.out ()) == 0) + if ((inp_stream >> this->forwarded_to_.out ()) == 0) { ACE_THROW_RETURN (CORBA::MARSHAL (), TAO_INVOKE_FAILURE); @@ -251,7 +316,8 @@ namespace TAO // The object pointer has to be changed to a TAO_Stub pointer // in order to obtain the profiles. - TAO_Stub *stubobj = forward_reference->_stubobj (); + TAO_Stub *stubobj = + this->forwarded_to_->_stubobj (); if (stubobj == 0) ACE_THROW_RETURN (CORBA::INTERNAL (), @@ -290,7 +356,7 @@ namespace TAO } CORBA::Exception *exception = - this->detail_.corba_exception (buf.in () + this->details_.corba_exception (buf.in () ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (TAO_INVOKE_FAILURE); @@ -335,21 +401,20 @@ namespace TAO Invocation_Status - Synch_Oneway_Invocation::remote_oneway (Argument **args, - int args_number + Synch_Oneway_Invocation::remote_oneway (ACE_Time_Value *max_wait_time ACE_ENV_ARG_DECL) { + ACE_Countdown_Time countdown (max_wait_time); + const CORBA::Octet response_flags = - this->detail_.response_flags (); + this->details_.response_flags (); if (response_flags == CORBA::Octet (Messaging::SYNC_WITH_SERVER) || response_flags == CORBA::Octet (Messaging::SYNC_WITH_TARGET)) - return Synch_Twoway_Invocation::remote_twoway (args, - args_number + return Synch_Twoway_Invocation::remote_twoway (max_wait_time ACE_ENV_ARG_PARAMETER); - TAO_Target_Specification tspec; this->init_target_spec (tspec ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (TAO_INVOKE_FAILURE); @@ -362,22 +427,23 @@ namespace TAO ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (TAO_INVOKE_FAILURE); - this->marshal_data (args, - args_number, - cdr + this->marshal_data (cdr ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (TAO_INVOKE_FAILURE); + countdown.update (); if (response_flags == CORBA::Octet (Messaging::SYNC_WITH_TRANSPORT)) { return this->send_message (cdr, - TAO_Transport::TAO_TWOWAY_REQUEST + TAO_Transport::TAO_TWOWAY_REQUEST, + max_wait_time ACE_ENV_ARG_PARAMETER); } return this->send_message (cdr, - TAO_Transport::TAO_ONEWAY_REQUEST + TAO_Transport::TAO_ONEWAY_REQUEST, + max_wait_time ACE_ENV_ARG_PARAMETER); } |