summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp')
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp116
1 files changed, 55 insertions, 61 deletions
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp
index 1eb1998732a..75a14cdb1a7 100644
--- a/TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ReplicationManager.cpp
@@ -150,12 +150,11 @@ register_application (const char *object_id,
const char *host_id,
const char *process_id,
CORBA::Short role,
- CORBA::Object_ptr server_reference
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((::CORBA::SystemException))
+ CORBA::Object_ptr server_reference)
{
ACE_Guard <ACE_Thread_Mutex> guard(update_mutex_);
ACE_Time_Value wait_time (5);
+
while (update_list_.size() >= UPDATE_LIST_MAX_SIZE)
{
if(update_list_full_.wait(update_mutex_, &wait_time) == -1) // timeout
@@ -165,6 +164,7 @@ register_application (const char *object_id,
return;
}
}
+
update_list_.insert_tail (
MonitorUpdate::create_app_info_update(object_id, load, host_id,
process_id, Role(role), server_reference));
@@ -365,6 +365,7 @@ elevate_backup_to_primary (ACE_CString const & tag)
{
RANKED_IOR_LIST ranked_ior_list;
ACE_CString failover_host;
+
if(this->objectid_rankedior_map_.find(tag,ranked_ior_list) == 0)
{
failover_host = ranked_ior_list.host_list.front();
@@ -376,6 +377,7 @@ elevate_backup_to_primary (ACE_CString const & tag)
}
APP_SET app_set;
ACE_Guard <ACE_Recursive_Thread_Mutex> guard (this->appset_lock_);
+
if (this->objectid_appset_map_.find (tag,app_set) == 0)
{
for(APP_SET::iterator as_iter = app_set.begin();
@@ -384,11 +386,13 @@ elevate_backup_to_primary (ACE_CString const & tag)
if ((*as_iter).host_name == failover_host.c_str())
{
STRING_LIST backup_tag_list;
+
if (this->processid_backup_map_.find((*as_iter).process_id, backup_tag_list) == 0)
{
backup_tag_list.remove(tag);
this->processid_backup_map_.rebind((*as_iter).process_id, backup_tag_list);
STRING_LIST primary_tag_list;
+
if(this->processid_primary_map_.find(
(*as_iter).process_id, primary_tag_list) == 0)
{
@@ -407,6 +411,7 @@ elevate_backup_to_primary (ACE_CString const & tag)
"Data structure invariant broken\n",
tag.c_str(),(*as_iter).process_id.c_str()));
}
+
(*as_iter).role = PRIMARY;
this->objectid_appset_map_.rebind(tag, app_set);
break;
@@ -439,6 +444,7 @@ replica_selection_algo ()
{
ACE_DLList <MonitorUpdate> update_list;
move_update_list(this->update_list_, update_list);
+
if (process_updates(update_list) == true)
{
objectid_rankedior_map_.close();
@@ -450,6 +456,7 @@ replica_selection_algo ()
STRING_TO_DOUBLE_MAP processor_level_host_util_map;;
copy_map (this->hostid_util_map_, processor_level_host_util_map);
STRING_LIST & process_list = (*hp_iter).item();
+
for (STRING_LIST::iterator pl_iter = process_list.begin ();
pl_iter != process_list.end (); ++pl_iter)
{
@@ -459,12 +466,14 @@ replica_selection_algo ()
(this->mode_ == PROCESS_LEVEL)? process_level_host_util_map :
processor_level_host_util_map;
STRING_LIST primary_object_list;
+
if (processid_primary_map_.find (*pl_iter, primary_object_list) == 0) // If present
{
for (STRING_LIST::iterator po_iter = primary_object_list.begin ();
po_iter != primary_object_list.end (); ++po_iter)
{
STRING_LIST host_list = non_primary_host_list(*po_iter);
+
if (host_list.size() >= 1) // if the object has backups.
{
std::priority_queue <UtilRank> util_ranked_queue =
@@ -478,6 +487,7 @@ replica_selection_algo ()
}
}
}
+
build_rank_list ();
}
send_rank_list();
@@ -676,7 +686,7 @@ send_rank_list()
for (AGENT_LIST::iterator al_iter = agent_list_.begin();
al_iter != agent_list_.end(); ++ al_iter)
{
- Agent_var agent = Agent::_narrow (*al_iter ACE_ENV_ARG_PARAMETER);
+ Agent_var agent = Agent::_narrow (*al_iter);
try
{
agent->update_rank_list(this->rank_list_);
@@ -733,12 +743,11 @@ void ReplicationManager_i::update_ior_map (
}
void ReplicationManager_i::
-proc_failure (const char *process_id
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((::CORBA::SystemException))
+proc_failure (const char *process_id)
{
ACE_Guard <ACE_Thread_Mutex> guard(update_mutex_);
ACE_Time_Value wait_time (5);
+
while (update_list_.size() >= UPDATE_LIST_MAX_SIZE)
{
if(update_list_full_.wait(update_mutex_, &wait_time) == -1) // timeout
@@ -754,13 +763,12 @@ proc_failure (const char *process_id
void ReplicationManager_i::
util_update (const char *host_id,
- double util
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((::CORBA::SystemException))
+ double util)
{
//ACE_DEBUG ((LM_DEBUG, "Update from %s with UTIL %d\n", host_id, (int)util));
ACE_Guard <ACE_Thread_Mutex> guard(update_mutex_);
ACE_Time_Value wait_time (5);
+
while (update_list_.size() >= UPDATE_LIST_MAX_SIZE)
{
if(update_list_full_.wait(update_mutex_, &wait_time) == -1) // timeout
@@ -796,14 +804,10 @@ ReplicationManager_i::pulse (void)
}
RankList * ReplicationManager_i::
-register_agent (CORBA::Object_ptr agent_reference
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((::CORBA::SystemException))
+register_agent (CORBA::Object_ptr agent_reference)
{
ACE_DEBUG ((LM_DEBUG, "register_agent called\n"));
- Agent_var agent = Agent::_narrow (agent_reference
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ Agent_var agent = Agent::_narrow (agent_reference);
ACE_Guard <ACE_Thread_Mutex> guard (rank_list_agent_list_combined_mutex_);
this->agent_list_.insert_tail (CORBA::Object::_duplicate (agent.in ()));
@@ -811,15 +815,14 @@ register_agent (CORBA::Object_ptr agent_reference
}
CORBA::Object_ptr ReplicationManager_i::
-get_next (const char * object_id
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((::CORBA::SystemException))
+get_next (const char * object_id)
{
/* if (proactive_)
ACE_DEBUG((LM_DEBUG,"Replication Manager is in proactive mode.\n"));
ACE_Guard <ACE_Recursive_Thread_Mutex> guard (this->appset_lock_);
APP_SET app_set;
+
if (this->objectid_appset_map_.find (object_id,app_set) == 0)
{
for (APP_SET::iterator as_iter = app_set.begin();
@@ -937,23 +940,17 @@ ReplicationManager_i::~ReplicationManager_i (void)
{
}
-CORBA::Long ReplicationManager_i::next_member (const char * object_id
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((::CORBA::SystemException))
+CORBA::Long ReplicationManager_i::next_member (const char * object_id)
{
ACE_DEBUG ((LM_DEBUG, "register_agent called\n"));
return 0;
}
void
-ReplicationManager_i::register_agent (CORBA::Object_ptr agent_reference
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((::CORBA::SystemException))
+ReplicationManager_i::register_agent (CORBA::Object_ptr agent_reference)
{
ACE_DEBUG ((LM_DEBUG, "register_agent called\n"));
- Agent_var agent = Agent::_narrow (agent_reference
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ Agent_var agent = Agent::_narrow (agent_reference);
this->agent_list_.insert_tail (CORBA::Object::_duplicate (agent.in ()));
agent->initialize_agent (this->replica_list_, this->replicas_list_);
@@ -1171,9 +1168,7 @@ ReplicationManager_i::register_application (const char *object_id,
const char *host_id,
const char *process_id,
CORBA::Short role,
- CORBA::Object_ptr server_reference
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((::CORBA::SystemException))
+ CORBA::Object_ptr server_reference)
{
ACE_DEBUG ((LM_DEBUG, "register application called process_id = %s, \
object_id = %s, role = %d\n", process_id, object_id, role));
@@ -1227,12 +1222,11 @@ ReplicationManager_i::update_host_list (const char *object_id,
}
void
-ReplicationManager_i::util_update (const char *host_id, double util
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((::CORBA::SystemException))
+ReplicationManager_i::util_update (const char *host_id, double util)
{
//ACE_DEBUG ((LM_DEBUG, "Update from %s with UTIL %l\n", host_id, util));
double current_util;
+
if (this->host_util_map_.find (host_id, current_util) == 0)
{
this->host_util_map_.rebind (host_id, util);
@@ -1244,9 +1238,7 @@ ReplicationManager_i::util_update (const char *host_id, double util
}
void
-ReplicationManager_i::proc_failure (const char *process_id
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((::CORBA::SystemException))
+ReplicationManager_i::proc_failure (const char *process_id)
{
ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id));
@@ -1257,36 +1249,38 @@ ReplicationManager_i::proc_failure (const char *process_id
ACE_CString process_name = CORBA::string_dup (process_id);
FAILOVER_LIST failover_list;
+
if (this->failover_map_.find (process_name, failover_list) == 0)
{
- ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id));
+ ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id));
- FailoverList fail_list;
- fail_list.length (0);
- CORBA::ULong fail_list_length;
+ FailoverList fail_list;
+ fail_list.length (0);
+ CORBA::ULong fail_list_length;
- for (FAILOVER_LIST::iterator iter = failover_list.begin ();
- iter != failover_list.end (); ++iter)
- {
- ACE_CString object_id = *iter;
- char *object_name = CORBA::string_dup (object_id.c_str ());
- fail_list_length = fail_list.length ();
- fail_list.length (fail_list_length + 1);
- fail_list[fail_list_length] = object_name;
- }
- ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id));
+ for (FAILOVER_LIST::iterator iter = failover_list.begin ();
+ iter != failover_list.end (); ++iter)
+ {
+ ACE_CString object_id = *iter;
+ char *object_name = CORBA::string_dup (object_id.c_str ());
+ fail_list_length = fail_list.length ();
+ fail_list.length (fail_list_length + 1);
+ fail_list[fail_list_length] = object_name;
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id));
- for (AGENT_LIST::iterator it = this->agent_list_.begin ();
- it != this->agent_list_.end (); ++it)
- {
- CORBA::Object_var agent_ref = *it;
- Agent_var agent = Agent::_narrow (agent_ref
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
- agent->update_failover_list (fail_list);
+ for (AGENT_LIST::iterator it = this->agent_list_.begin ();
+ it != this->agent_list_.end (); ++it)
+ {
+ CORBA::Object_var agent_ref = *it;
+ Agent_var agent = Agent::_narrow (agent_ref);
+ agent->update_failover_list (fail_list);
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id));
}
- ACE_DEBUG ((LM_DEBUG, "Update from Monitor that %s failed\n", process_id));
- }
+
ACE_DEBUG ((LM_DEBUG, "END Update from Monitor that %s failed\n", process_id));
}