diff options
author | stanleyk <stanleyk@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2013-02-05 21:11:03 +0000 |
---|---|---|
committer | stanleyk <stanleyk@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2013-02-05 21:11:03 +0000 |
commit | 5e030faf84086ab02059fcbcc3faed224bd57b95 (patch) | |
tree | 3a62df45ac6ccf599fb07cf6a03d672456ce2e3d /TAO/tao/Synch_Invocation.cpp | |
parent | 9d296f7fa51116ff7040ecb2ad18612cd94b5fd1 (diff) | |
download | ATCD-5e030faf84086ab02059fcbcc3faed224bd57b95.tar.gz |
Merge in OCI_Reliability_Enhancements branch.
Diffstat (limited to 'TAO/tao/Synch_Invocation.cpp')
-rw-r--r-- | TAO/tao/Synch_Invocation.cpp | 340 |
1 files changed, 220 insertions, 120 deletions
diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp index 40064362ca6..71ef977e300 100644 --- a/TAO/tao/Synch_Invocation.cpp +++ b/TAO/tao/Synch_Invocation.cpp @@ -1,6 +1,7 @@ // $Id$ #include "tao/Synch_Invocation.h" +#include "tao/Invocation_Retry_State.h" #include "tao/Profile_Transport_Resolver.h" #include "tao/Profile.h" #include "tao/Synch_Reply_Dispatcher.h" @@ -42,9 +43,16 @@ namespace TAO resolver, detail, response_expected) + , retry_state_ (0) { } + void + Synch_Twoway_Invocation::set_retry_state (Invocation_Retry_State *retry_state) + { + this->retry_state_ = retry_state; + } + Invocation_Status Synch_Twoway_Invocation::remote_twoway (ACE_Time_Value *max_wait_time) { @@ -79,10 +87,25 @@ namespace TAO if (!transport) { - // Way back, we failed to find a profile we could connect to. - // We've come this far only so we reach the interception points - // in case they can fix things. Time to bail.... - throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO); + if (this->retry_state_ && + this->retry_state_->forward_on_exception_increment(FOE_TRANSIENT)) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::") + ACE_TEXT ("remote_twoway retrying on TRANSIENT ") + ACE_TEXT ("exception\n"))); + this->retry_state_->next_profile_retry (*this->stub ()); + return TAO_INVOKE_RESTART; + } + else + { + // Way back, we failed to find a profile we could connect to. + // We've come this far only so we reach the interception points + // in case they can fix things. Time to bail.... + throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO); + } + } ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, @@ -259,8 +282,8 @@ namespace TAO if (TAO_debug_level > 0 && max_wait_time) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Synch_Twoway_Invocation::wait_for_reply, " - "timeout after recv is <%u> status <%d>\n", + ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::wait_for_reply, ") + ACE_TEXT ("timeout after recv is <%u> status <%d>\n"), max_wait_time->msec (), reply_error)); } @@ -273,8 +296,9 @@ namespace TAO if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Synch_Twoway_Invocation::wait_for_reply, " - "recovering after an error\n")); + ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::") + ACE_TEXT ("wait_for_reply, ") + ACE_TEXT ("recovering after an error\n"))); } // You the smarty, don't try to moving the unbind_dispatcher @@ -312,6 +336,19 @@ namespace TAO (void) bd.unbind_dispatcher (); this->resolver_.transport ()->close_connection (); + if (this->retry_state_ && + this->resolver_.transport ()->connection_closed_on_read() && + this->retry_state_->forward_on_reply_closed_increment ()) + { + if (TAO_debug_level > 4) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::") + ACE_TEXT ("wait_for_reply, forward profile on ") + ACE_TEXT ("connection closed\n"))); + this->retry_state_->next_profile_retry (*this->stub ()); + return TAO_INVOKE_RESTART; + } + try { return @@ -322,7 +359,11 @@ namespace TAO } catch (const ::CORBA::Exception&) { - this->resolver_.stub ()->reset_profiles (); + if (this->retry_state_ == 0 || + !this->retry_state_->forward_on_exception_limit_used ()) + { + this->resolver_.stub ()->reset_profiles (); + } throw; } } @@ -386,8 +427,9 @@ namespace TAO // permanent condition not given if (TAO_debug_level > 3) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Synch_Twoway_Invocation::" - "check_reply_status: unexpected LOCATION_FORWARD_PERM reply\n")); + ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::") + ACE_TEXT ("check_reply_status: unexpected ") + ACE_TEXT ("LOCATION_FORWARD_PERM reply\n"))); throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO); } @@ -439,8 +481,8 @@ namespace TAO if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::location_forward ") - ACE_TEXT ("being handled\n"))); + ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::") + ACE_TEXT ("location_forward being handled\n"))); } CORBA::Object_var fwd; @@ -469,8 +511,8 @@ namespace TAO if (TAO_debug_level > 3) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Synch_Twoway_Invocation::" - "handle_user_exception\n")); + ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::") + ACE_TEXT ("handle_user_exception\n"))); // Pull the exception from the stream. CORBA::String_var buf; @@ -513,8 +555,8 @@ namespace TAO if (TAO_debug_level > 3) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Synch_Twoway_Invocation::" - "handle_system_exception\n")); + ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::") + ACE_TEXT ("handle_system_exception\n"))); CORBA::String_var type_id; @@ -533,40 +575,80 @@ namespace TAO throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_MAYBE); } + bool retry_on_exception = false; bool do_forward = false; - int foe_kind = this->stub ()->orb_core ()->orb_params ()->forward_once_exception(); - - if ((CORBA::CompletionStatus) completion != CORBA::COMPLETED_YES - && (((foe_kind & TAO::FOE_TRANSIENT) == 0 - && ACE_OS_String::strcmp (type_id.in (), - "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0) || - ACE_OS_String::strcmp (type_id.in (), - "IDL:omg.org/CORBA/OBJ_ADAPTER:1.0") == 0 || - ACE_OS_String::strcmp (type_id.in (), - "IDL:omg.org/CORBA/NO_RESPONSE:1.0") == 0 || - ((foe_kind & TAO::FOE_COMM_FAILURE) == 0 - && ACE_OS_String::strcmp (type_id.in (), - "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0) || - (this->stub ()->orb_core ()->orb_params ()->forward_invocation_on_object_not_exist () - && ACE_OS_String::strcmp (type_id.in (), - "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0) || - (do_forward = ! this->stub ()->forwarded_on_exception () - && ((((foe_kind & TAO::FOE_OBJECT_NOT_EXIST) == TAO::FOE_OBJECT_NOT_EXIST) - && (ACE_OS_String::strcmp (type_id.in (), - "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0)) || - (((foe_kind & TAO::FOE_COMM_FAILURE) == TAO::FOE_COMM_FAILURE) - && (ACE_OS_String::strcmp (type_id.in (), - "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0)) || - (((foe_kind & TAO::FOE_TRANSIENT) == TAO::FOE_TRANSIENT) - && (ACE_OS_String::strcmp (type_id.in (), - "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0)) || - (((foe_kind & TAO::FOE_INV_OBJREF) == TAO::FOE_INV_OBJREF) - && (ACE_OS_String::strcmp (type_id.in (), - "IDL:omg.org/CORBA/INV_OBJREF:1.0") == 0)))))) + + const TAO_ORB_Parameters *orb_params = this->stub ()->orb_core ()->orb_params (); + + if (this->retry_state_ && + this->retry_state_->forward_on_exception_limit_used () && + (CORBA::CompletionStatus) completion == CORBA::COMPLETED_NO) + { + if ((ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0 && + this->retry_state_->forward_on_exception_increment (TAO::FOE_TRANSIENT)) || + (ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0 && + this->retry_state_->forward_on_exception_increment (TAO::FOE_COMM_FAILURE)) || + (ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0 && + this->retry_state_->forward_on_exception_increment (TAO::FOE_OBJECT_NOT_EXIST)) || + (ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/INV_OBJREF:1.0") == 0 && + this->retry_state_->forward_on_exception_increment (TAO::FOE_INV_OBJREF)) + ) + { + retry_on_exception = true; + this->retry_state_->sleep_at_starting_profile (*this->stub ()); + } + } + else + { + int foe_kind = orb_params->forward_once_exception(); + + retry_on_exception = + (CORBA::CompletionStatus) completion != CORBA::COMPLETED_YES + && (((foe_kind & TAO::FOE_TRANSIENT) == 0 + && ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0) || + ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/OBJ_ADAPTER:1.0") == 0 || + ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/NO_RESPONSE:1.0") == 0 || + ((foe_kind & TAO::FOE_COMM_FAILURE) == 0 + && ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0) || + (orb_params->forward_invocation_on_object_not_exist () + && ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0) || + (do_forward = ! this->stub ()->forwarded_on_exception () + && ((((foe_kind & TAO::FOE_OBJECT_NOT_EXIST) == TAO::FOE_OBJECT_NOT_EXIST) + && (ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0)) || + (((foe_kind & TAO::FOE_COMM_FAILURE) == TAO::FOE_COMM_FAILURE) + && (ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0)) || + (((foe_kind & TAO::FOE_TRANSIENT) == TAO::FOE_TRANSIENT) + && (ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0)) || + (((foe_kind & TAO::FOE_INV_OBJREF) == TAO::FOE_INV_OBJREF) + && (ACE_OS_String::strcmp (type_id.in (), + "IDL:omg.org/CORBA/INV_OBJREF:1.0") == 0))))); + } + + if (retry_on_exception) { // If we are here then possibly we'll need a restart. mon.set_status (TAO_INVOKE_RESTART); + if (TAO_debug_level > 4) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::") + ACE_TEXT ("handle_system_exception, profile forwarding ") + ACE_TEXT ("on exception "), + type_id.in (), + ACE_TEXT ("\n"))); + if (do_forward) this->stub ()->forwarded_on_exception (true); @@ -625,8 +707,8 @@ namespace TAO if (TAO_debug_level > 4) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Synch_Twoway_Invocation::" - "handle_system_exception, about to raise\n")); + ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::") + ACE_TEXT ("handle_system_exception, about to raise\n"))); mon.set_status (TAO_INVOKE_SYSTEM_EXCEPTION); @@ -676,80 +758,98 @@ namespace TAO if (!transport) { - // Way back, we failed to find a profile we could connect to. - // We've come this far only so we reach the interception points - // in case they can fix things. Time to bail.... - throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO); - } + if (this->retry_state_ && + this->retry_state_->forward_on_exception_limit_used ()) + { + if (this->retry_state_->forward_on_exception_increment(FOE_TRANSIENT)) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("TAO (%P|%t) - Synch_Oneway_Invocation::") + ACE_TEXT ("remote_oneway retrying on TRANSIENT ") + ACE_TEXT ("exception\n"))); + this->retry_state_->next_profile_retry (*this->stub ()); + return TAO_INVOKE_RESTART; + } + } + else + { + // Way back, we failed to find a profile we could connect to. + // We've come this far only so we reach the interception points + // in case they can fix things. Time to bail.... + throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO); + } - { - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, transport->output_cdr_lock (), - TAO_INVOKE_FAILURE); + } - TAO_OutputCDR &cdr = transport->out_stream (); + { + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, transport->output_cdr_lock (), + TAO_INVOKE_FAILURE); - cdr.message_attributes (this->details_.request_id (), - this->resolver_.stub (), - TAO_Message_Semantics (TAO_Message_Semantics::TAO_ONEWAY_REQUEST), - max_wait_time); + TAO_OutputCDR &cdr = transport->out_stream (); - this->write_header (cdr); + cdr.message_attributes (this->details_.request_id (), + this->resolver_.stub (), + TAO_Message_Semantics (TAO_Message_Semantics::TAO_ONEWAY_REQUEST), + max_wait_time); - this->marshal_data (cdr); + this->write_header (cdr); - countdown.update (); + this->marshal_data (cdr); - if (transport->is_connected ()) - { - // We have a connected transport so we can send the message - s = this->send_message (cdr, - TAO_Message_Semantics (TAO_Message_Semantics::TAO_ONEWAY_REQUEST), - max_wait_time); + countdown.update (); - if (transport->wait_strategy ()->non_blocking () == 0 && - transport->orb_core ()->client_factory ()->use_cleanup_options ()) - { - if (!transport->wait_strategy ()->is_registered()) - { - ACE_Event_Handler * const eh = - transport->event_handler_i (); - - ACE_Reactor * const r = - transport->orb_core ()->reactor (); - - if (r->register_handler (eh, ACE_Event_Handler::READ_MASK) == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Synch_Oneway_Invocation::" - "remote_oneway transport[%d] registration with" - "reactor returned an error\n", - transport->id ())); - } - else - { - // Only set this flag when registration succeeds - transport->wait_strategy ()->is_registered(true); - } - } - } + if (transport->is_connected ()) + { + // We have a connected transport so we can send the message + s = this->send_message (cdr, + TAO_Message_Semantics (TAO_Message_Semantics::TAO_ONEWAY_REQUEST), + max_wait_time); - } - else - { - if (TAO_debug_level > 4) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Synch_Oneway_Invocation::" - "remote_oneway, queueing message\n")); + if (transport->wait_strategy ()->non_blocking () == 0 && + transport->orb_core ()->client_factory ()->use_cleanup_options ()) + { + if (!transport->wait_strategy ()->is_registered()) + { + ACE_Event_Handler * const eh = + transport->event_handler_i (); + + ACE_Reactor * const r = + transport->orb_core ()->reactor (); + + if (r->register_handler (eh, ACE_Event_Handler::READ_MASK) == -1) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - Synch_Oneway_Invocation::") + ACE_TEXT ("remote_oneway transport[%d] registration with") + ACE_TEXT ("reactor returned an error\n"), + transport->id ())); + } + else + { + // Only set this flag when registration succeeds + transport->wait_strategy ()->is_registered (true); + } + } + } - if (transport->format_queue_message (cdr, - max_wait_time, - this->resolver_.stub()) != 0) - { - s = TAO_INVOKE_FAILURE; - } - } - } + } + else + { + if (TAO_debug_level > 4) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Synch_Oneway_Invocation::") + ACE_TEXT ("remote_oneway, queueing message\n"))); + + if (transport->format_queue_message (cdr, + max_wait_time, + this->resolver_.stub()) != 0) + { + s = TAO_INVOKE_FAILURE; + } + } + } #if TAO_HAS_INTERCEPTORS == 1 s = this->receive_other_interception (); @@ -763,7 +863,7 @@ namespace TAO status == PortableInterceptor::TRANSPORT_RETRY) s = TAO_INVOKE_RESTART; else if (status == PortableInterceptor::SYSTEM_EXCEPTION - || status == PortableInterceptor::USER_EXCEPTION) + || status == PortableInterceptor::USER_EXCEPTION) throw; } catch (...) @@ -771,14 +871,14 @@ namespace TAO // Notify interceptors of non-CORBA exception, and propagate // that exception to the caller. - PortableInterceptor::ReplyStatus const st = - this->handle_all_exception (); + PortableInterceptor::ReplyStatus const st = + this->handle_all_exception (); - if (st == PortableInterceptor::LOCATION_FORWARD || - st == PortableInterceptor::TRANSPORT_RETRY) - s = TAO_INVOKE_RESTART; - else - throw; + if (st == PortableInterceptor::LOCATION_FORWARD || + st == PortableInterceptor::TRANSPORT_RETRY) + s = TAO_INVOKE_RESTART; + else + throw; } #endif /* TAO_HAS_INTERCEPTORS */ |