diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp | 167 |
1 files changed, 160 insertions, 7 deletions
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp index 4357599c3d9..9cc65a98a52 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp +++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp @@ -7,11 +7,12 @@ #include <string> #include <set> #include <ace/Date_Time.h> -#include "ace/OS_NS_unistd.h" +#include <ace/OS_NS_unistd.h> #include <ace/High_Res_Timer.h> #include "AppSideMonitor_Thread.h" #include "ForwardingAgentC.h" #include "AppInfoC.h" +#include "SSA_AMI_Handler.h" template <class T> void myswap (T & t1, T & t2) @@ -1797,9 +1798,157 @@ ReplicationManager_i::get_next (const char * /* object_id */) return CORBA::Object::_nil (); } -void ReplicationManager_i::finish_invocation(const char * object_id) +CORBA::Object_ptr ReplicationManager_i::prepare_reinvocation( + const char * object_id, + CORBA::Long last_successful, + CORBA::Long tsec, + CORBA::Long tusec) +{ + // Acquiring two lock like this could lead to a deadlock if + // some other thread is acquiring the same two locks in the + // reverse order. One solution is to put all these locks in an + // array and acquire them only in the increasing order of index. + ACE_Guard<ACE_Recursive_Thread_Mutex> appset_guard(appset_lock_); + ACE_Guard<ACE_Thread_Mutex> ssa_guard(state_sync_agent_list_mutex_); + + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::prepare_reinvocation(): " + "object_id = %s.\n",object_id)); + + // This phase is needed only in the lag-by-one approach to prepare + // the lagging object by discarding potential orphan state, which + // has really become orphan because of a failure. + size_t phase = 2; + //ami_phase(phase, object_id, tsec, tusec); + + APP_SET app_set; + if(objectid_appset_map_.find(object_id, app_set) == 0) + { + for(APP_SET::iterator app_set_iter = app_set.begin(); + app_set_iter != app_set.end(); + ++app_set_iter) + { + if((*app_set_iter).role == phase) + { + return (*app_set_iter).ior.in(); + } + } + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::prepare_reinvocation(): " + "Can't find failover replica for objectid = %s\n", object_id)); + } + else + { + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::prepare_reinvocation(): " + "Can't find APP_SET for objectid = %s\n", object_id)); + } + return 0; +} + +CORBA::Boolean ReplicationManager_i::finish_invocation( + const char * object_id, + ::CORBA::Long tsec, + ::CORBA::Long tusec) +{ + return two_phase_commit(object_id); + //return ami_two_phase_commit(object_id, tsec, tusec); +} + +bool ReplicationManager_i::ami_two_phase_commit(const char * object_id, long + tsec, long tusec) +{ + // Acquiring two lock like this could lead to a deadlock if + // some other thread is acquiring the same two locks in the + // reverse order. One solution is to put all these locks in an + // array and acquire them only in the increasing order of index. + ACE_Guard<ACE_Recursive_Thread_Mutex> appset_guard(appset_lock_); + ACE_Guard<ACE_Thread_Mutex> ssa_guard(state_sync_agent_list_mutex_); + + if(ami_phase(1, object_id, tsec, tusec)) + { + ami_phase(2, object_id, tsec, tusec); + return true; + } + else + { + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::ami_two_phase_commit(): " + "phase 1 failed. Skipping phase 2.\n")); + return false; + } +} + +bool ReplicationManager_i::ami_phase(int phase, + const char * /* object_id */, + long tsec, long tusec) +{ + SSA_AMI_Handler * ssa_ami_handler_ptr = new SSA_AMI_Handler; + AMI_StateSynchronizationAgentHandler_var ssa_ami_handler + = ssa_ami_handler_ptr->_this(); + + size_t ssa_invoked = 0; + for(OBJECTID_APPSET_MAP::iterator iter = objectid_appset_map_.begin(); + iter != objectid_appset_map_.end(); + ++iter) + { + if ((*iter).ext_id_ == "ReplicationManager") + continue; + + APP_SET app_set((*iter).int_id_); + + for(APP_SET::iterator app_set_iter = app_set.begin(); + app_set_iter != app_set.end(); + ++app_set_iter) + { + if((*app_set_iter).role == phase) + { + StateSynchronizationAgent_var ssa; + + // locks acquired in two_phase_commit function.. + if(state_synchronization_agent_map_.find( + (*app_set_iter).process_id, ssa) == 0) + { + /* + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::ami_phase(): " + "Found SSA for objectid = %s, phase = %d\n", + (*iter).ext_id_.c_str(), phase)); + */ + if(phase == 1) + { + ssa->sendc_precommit_state( + ssa_ami_handler, + CORBA::string_dup((*iter).ext_id_.c_str())); + ++ssa_invoked; + } + if(phase == 2) + { + ssa->sendc_commit_state( + ssa_ami_handler, + CORBA::string_dup((*iter).ext_id_.c_str())); + ++ssa_invoked; + } + break; // no need to search further in the app_set. + } + else + { + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::phase(): " + "Can't find SSA for objectid = %s, phase = %d\n", + (*iter).ext_id_.c_str(), phase)); + + break; // Don't search further in the app_set. + } + } + } + } + ACE_Time_Value tv(tsec, tusec); + return ssa_ami_handler_ptr->wait_for_results(ssa_invoked, tv); +} + +bool ReplicationManager_i::two_phase_commit(const char * object_id) { - // 2PC // Acquiring two lock like this could lead to a deadlock if // some other thread is acquiring the same two locks in the // reverse order. One solution is to put all these locks in an @@ -1808,12 +1957,16 @@ void ReplicationManager_i::finish_invocation(const char * object_id) ACE_Guard<ACE_Thread_Mutex> ssa_guard(state_sync_agent_list_mutex_); if(phase(1, object_id)) + { phase(2, object_id); + return true; + } else { ACE_ERROR ((LM_ERROR, - "ReplicationManager_i::finish_invocation(): " + "ReplicationManager_i::two_phase_commit(): " "phase 1 failed. Skipping phase 2.\n")); + return false; } } @@ -1826,7 +1979,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */) bool precommit_success = true; try { - // locks acquired in finish_invocation. + // locks acquired in two_phase_commit function. for(OBJECTID_APPSET_MAP::iterator iter = objectid_appset_map_.begin(); (iter != objectid_appset_map_.end()) && precommit_success; ++iter) @@ -1844,7 +1997,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */) { StateSynchronizationAgent_var ssa; - // locks acquired in finish_invocation. + // locks acquired in two_phase_commit function.. if(state_synchronization_agent_map_.find( (*app_set_iter).process_id, ssa) == 0) { @@ -1879,7 +2032,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */) catch (::CORBA::Exception & ex) { ACE_ERROR ((LM_ERROR, - "ReplicationManager_i::finish_invocation(): " + "ReplicationManager_i::phase(): " "Exception raised while executing two-phase commit. phase = %d\n", phase)); precommit_success = false; |