diff options
Diffstat (limited to 'TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp')
-rw-r--r-- | TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp | 116 |
1 files changed, 55 insertions, 61 deletions
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp index 1eb1998732a..75a14cdb1a7 100644 --- a/TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp @@ -150,12 +150,11 @@ register_application (const char *object_id, const char *host_id, const char *process_id, CORBA::Short role, - CORBA::Object_ptr server_reference - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((::CORBA::SystemException)) + CORBA::Object_ptr server_reference) { ACE_Guard <ACE_Thread_Mutex> guard(update_mutex_); ACE_Time_Value wait_time (5); + while (update_list_.size() >= UPDATE_LIST_MAX_SIZE) { if(update_list_full_.wait(update_mutex_, &wait_time) == -1) // timeout @@ -165,6 +164,7 @@ register_application (const char *object_id, return; } } + update_list_.insert_tail ( MonitorUpdate::create_app_info_update(object_id, load, host_id, process_id, Role(role), server_reference)); @@ -365,6 +365,7 @@ elevate_backup_to_primary (ACE_CString const & tag) { RANKED_IOR_LIST ranked_ior_list; ACE_CString failover_host; + if(this->objectid_rankedior_map_.find(tag,ranked_ior_list) == 0) { failover_host = ranked_ior_list.host_list.front(); @@ -376,6 +377,7 @@ elevate_backup_to_primary (ACE_CString const & tag) } APP_SET app_set; ACE_Guard <ACE_Recursive_Thread_Mutex> guard (this->appset_lock_); + if (this->objectid_appset_map_.find (tag,app_set) == 0) { for(APP_SET::iterator as_iter = app_set.begin(); @@ -384,11 +386,13 @@ elevate_backup_to_primary (ACE_CString const & tag) if ((*as_iter).host_name == failover_host.c_str()) { STRING_LIST backup_tag_list; + if (this->processid_backup_map_.find((*as_iter).process_id, backup_tag_list) == 0) { backup_tag_list.remove(tag); this->processid_backup_map_.rebind((*as_iter).process_id, backup_tag_list); STRING_LIST primary_tag_list; + if(this->processid_primary_map_.find( (*as_iter).process_id, primary_tag_list) == 0) { @@ -407,6 +411,7 @@ elevate_backup_to_primary (ACE_CString const & tag) "Data structure invariant broken\n", tag.c_str(),(*as_iter).process_id.c_str())); } + (*as_iter).role = PRIMARY; this->objectid_appset_map_.rebind(tag, app_set); break; @@ -439,6 +444,7 @@ replica_selection_algo () { ACE_DLList <MonitorUpdate> update_list; move_update_list(this->update_list_, update_list); + if (process_updates(update_list) == true) { objectid_rankedior_map_.close(); @@ -450,6 +456,7 @@ replica_selection_algo () STRING_TO_DOUBLE_MAP processor_level_host_util_map;; copy_map (this->hostid_util_map_, processor_level_host_util_map); STRING_LIST & process_list = (*hp_iter).item(); + for (STRING_LIST::iterator pl_iter = process_list.begin (); pl_iter != process_list.end (); ++pl_iter) { @@ -459,12 +466,14 @@ replica_selection_algo () (this->mode_ == PROCESS_LEVEL)? process_level_host_util_map : processor_level_host_util_map; STRING_LIST primary_object_list; + if (processid_primary_map_.find (*pl_iter, primary_object_list) == 0) // If present { for (STRING_LIST::iterator po_iter = primary_object_list.begin (); po_iter != primary_object_list.end (); ++po_iter) { STRING_LIST host_list = non_primary_host_list(*po_iter); + if (host_list.size() >= 1) // if the object has backups. { std::priority_queue <UtilRank> util_ranked_queue = @@ -478,6 +487,7 @@ replica_selection_algo () } } } + build_rank_list (); } send_rank_list(); @@ -676,7 +686,7 @@ send_rank_list() for (AGENT_LIST::iterator al_iter = agent_list_.begin(); al_iter != agent_list_.end(); ++ al_iter) { - Agent_var agent = Agent::_narrow (*al_iter ACE_ENV_ARG_PARAMETER); + Agent_var agent = Agent::_narrow (*al_iter); try { agent->update_rank_list(this->rank_list_); @@ -733,12 +743,11 @@ void ReplicationManager_i::update_ior_map ( } void ReplicationManager_i:: -proc_failure (const char *process_id - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((::CORBA::SystemException)) +proc_failure (const char *process_id) { ACE_Guard <ACE_Thread_Mutex> guard(update_mutex_); ACE_Time_Value wait_time (5); + while (update_list_.size() >= UPDATE_LIST_MAX_SIZE) { if(update_list_full_.wait(update_mutex_, &wait_time) == -1) // timeout @@ -754,13 +763,12 @@ proc_failure (const char *process_id void ReplicationManager_i:: util_update (const char *host_id, - double util - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((::CORBA::SystemException)) + double util) { //ACE_DEBUG ((LM_DEBUG, "Update from %s with UTIL %d\n", host_id, (int)util)); ACE_Guard <ACE_Thread_Mutex> guard(update_mutex_); ACE_Time_Value wait_time (5); + while (update_list_.size() >= UPDATE_LIST_MAX_SIZE) { if(update_list_full_.wait(update_mutex_, &wait_time) == -1) // timeout @@ -796,14 +804,10 @@ ReplicationManager_i::pulse (void) } RankList * ReplicationManager_i:: -register_agent (CORBA::Object_ptr agent_reference - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((::CORBA::SystemException)) +register_agent (CORBA::Object_ptr agent_reference) { ACE_DEBUG ((LM_DEBUG, "register_agent called\n")); - Agent_var agent = Agent::_narrow (agent_reference - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + Agent_var agent = Agent::_narrow (agent_reference); ACE_Guard <ACE_Thread_Mutex> guard (rank_list_agent_list_combined_mutex_); this->agent_list_.insert_tail (CORBA::Object::_duplicate (agent.in ())); @@ -811,15 +815,14 @@ register_agent (CORBA::Object_ptr agent_reference } CORBA::Object_ptr ReplicationManager_i:: -get_next (const char * object_id - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((::CORBA::SystemException)) +get_next (const char * object_id) { /* if (proactive_) ACE_DEBUG((LM_DEBUG,"Replication Manager is in proactive mode.\n")); ACE_Guard <ACE_Recursive_Thread_Mutex> guard (this->appset_lock_); APP_SET app_set; + if (this->objectid_appset_map_.find (object_id,app_set) == 0) { for (APP_SET::iterator as_iter = app_set.begin(); @@ -937,23 +940,17 @@ ReplicationManager_i::~ReplicationManager_i (void) { } -CORBA::Long ReplicationManager_i::next_member (const char * object_id - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((::CORBA::SystemException)) +CORBA::Long ReplicationManager_i::next_member (const char * object_id) { ACE_DEBUG ((LM_DEBUG, "register_agent called\n")); return 0; } void -ReplicationManager_i::register_agent (CORBA::Object_ptr agent_reference - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((::CORBA::SystemException)) +ReplicationManager_i::register_agent (CORBA::Object_ptr agent_reference) { ACE_DEBUG ((LM_DEBUG, "register_agent called\n")); - Agent_var agent = Agent::_narrow (agent_reference - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + Agent_var agent = Agent::_narrow (agent_reference); this->agent_list_.insert_tail (CORBA::Object::_duplicate (agent.in ())); agent->initialize_agent (this->replica_list_, this->replicas_list_); @@ -1171,9 +1168,7 @@ ReplicationManager_i::register_application (const char *object_id, const char *host_id, const char *process_id, CORBA::Short role, - CORBA::Object_ptr server_reference - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((::CORBA::SystemException)) + CORBA::Object_ptr server_reference) { ACE_DEBUG ((LM_DEBUG, "register application called process_id = %s, \ object_id = %s, role = %d\n", process_id, object_id, role)); @@ -1227,12 +1222,11 @@ ReplicationManager_i::update_host_list (const char *object_id, } void -ReplicationManager_i::util_update (const char *host_id, double util - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((::CORBA::SystemException)) +ReplicationManager_i::util_update (const char *host_id, double util) { //ACE_DEBUG ((LM_DEBUG, "Update from %s with UTIL %l\n", host_id, util)); double current_util; + if (this->host_util_map_.find (host_id, current_util) == 0) { this->host_util_map_.rebind (host_id, util); @@ -1244,9 +1238,7 @@ ReplicationManager_i::util_update (const char *host_id, double util } void -ReplicationManager_i::proc_failure (const char *process_id - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((::CORBA::SystemException)) +ReplicationManager_i::proc_failure (const char *process_id) { ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id)); @@ -1257,36 +1249,38 @@ ReplicationManager_i::proc_failure (const char *process_id ACE_CString process_name = CORBA::string_dup (process_id); FAILOVER_LIST failover_list; + if (this->failover_map_.find (process_name, failover_list) == 0) { - ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id)); + ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id)); - FailoverList fail_list; - fail_list.length (0); - CORBA::ULong fail_list_length; + FailoverList fail_list; + fail_list.length (0); + CORBA::ULong fail_list_length; - for (FAILOVER_LIST::iterator iter = failover_list.begin (); - iter != failover_list.end (); ++iter) - { - ACE_CString object_id = *iter; - char *object_name = CORBA::string_dup (object_id.c_str ()); - fail_list_length = fail_list.length (); - fail_list.length (fail_list_length + 1); - fail_list[fail_list_length] = object_name; - } - ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id)); + for (FAILOVER_LIST::iterator iter = failover_list.begin (); + iter != failover_list.end (); ++iter) + { + ACE_CString object_id = *iter; + char *object_name = CORBA::string_dup (object_id.c_str ()); + fail_list_length = fail_list.length (); + fail_list.length (fail_list_length + 1); + fail_list[fail_list_length] = object_name; + } + + ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id)); - for (AGENT_LIST::iterator it = this->agent_list_.begin (); - it != this->agent_list_.end (); ++it) - { - CORBA::Object_var agent_ref = *it; - Agent_var agent = Agent::_narrow (agent_ref - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - agent->update_failover_list (fail_list); + for (AGENT_LIST::iterator it = this->agent_list_.begin (); + it != this->agent_list_.end (); ++it) + { + CORBA::Object_var agent_ref = *it; + Agent_var agent = Agent::_narrow (agent_ref); + agent->update_failover_list (fail_list); + } + + ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id)); } - ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id)); - } + ACE_DEBUG ((LM_DEBUG, "END Update from Monitor that %s failed\n", process_id)); } |