summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwolff1 <wolff1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2009-03-01 05:53:34 +0000
committerwolff1 <wolff1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2009-03-01 05:53:34 +0000
commite4135e0a9a69939657fe4372b5567898e53cb65c (patch)
tree02c4bc77921f5e59934ad831a12888fcdf1706c6
parent07a620e0da6999fd9e96159035fb720eeb7bbffe (diff)
downloadATCD-e4135e0a9a69939657fe4372b5567898e53cb65c.tar.gz
ChangeLogTag: Sun Mar 1 05:53:24 UTC 2009 Friedhelm Wolf <fwolf@dre.vanderbilt.edu>
-rw-r--r--TAO/ChangeLog8
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp295
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h22
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl14
4 files changed, 250 insertions, 89 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index 80a705521cd..c67b097d341 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,11 @@
+Sun Mar 1 05:53:24 UTC 2009 Friedhelm Wolf <fwolf@dre.vanderbilt.edu>
+
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.h
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.idl
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp:
+
+ Exchanged replica_selection_algorithm.
+
Sat Feb 28 22:21:06 UTC 2009 Friedhelm Wolf <fwolf@dre.vanderbilt.edu>
* orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
index 420d2efd64b..ccca7908e88 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
@@ -160,7 +160,9 @@ ReplicationManager_i::ReplicationManager_i (CORBA::ORB_ptr orb,
static_mode_ (static_mode),
update_available_(update_mutex_),
update_list_full_(update_mutex_),
- subscription_counter_ (1)
+ ranklist_constraints_ (10),
+ subscription_counter_ (1),
+ notify_subscriptions_ (1)
{
algo_thread_ = new Algorithm (this);
algo_thread_->activate ();
@@ -825,98 +827,185 @@ ReplicationManager_i::replace_backup_tags (
}
}
-bool
-ReplicationManager_i::replica_selection_algo (void)
+void
+ReplicationManager_i::load_based_selection_algo (void)
{
- ACE_DLList <MonitorUpdate> update_list;
- move_update_list(this->update_list_, update_list);
-
- if (process_updates (update_list))
- {
- objectid_rankedior_map_.close ();
- objectid_rankedior_map_.open ();
-
- if (!static_mode_)
+ for (STRING_TO_STRING_LIST_MAP::iterator hp_iter =
+ hostid_process_map_.begin ();
+ hp_iter != hostid_process_map_.end ();
+ ++hp_iter)
+ {
+ STRING_TO_DOUBLE_MAP processor_level_host_util_map;
+
+ copy_map (this->hostid_util_map_,
+ processor_level_host_util_map);
+
+ STRING_LIST & process_list = (*hp_iter).item ();
+
+ for (STRING_LIST::iterator pl_iter =
+ process_list.begin ();
+ pl_iter != process_list.end ();
+ ++pl_iter)
{
- for (STRING_TO_STRING_LIST_MAP::iterator hp_iter =
- hostid_process_map_.begin ();
- hp_iter != hostid_process_map_.end ();
- ++hp_iter)
- {
- STRING_TO_DOUBLE_MAP processor_level_host_util_map;
-
- copy_map (this->hostid_util_map_,
- processor_level_host_util_map);
-
- STRING_LIST & process_list = (*hp_iter).item ();
-
- for (STRING_LIST::iterator pl_iter =
- process_list.begin ();
- pl_iter != process_list.end ();
- ++pl_iter)
- {
- STRING_TO_DOUBLE_MAP process_level_host_util_map;
-
- copy_map (this->hostid_util_map_,
- process_level_host_util_map);
+ STRING_TO_DOUBLE_MAP process_level_host_util_map;
+
+ copy_map (this->hostid_util_map_,
+ process_level_host_util_map);
- STRING_TO_DOUBLE_MAP & host_util_map =
- (this->mode_ == PROCESS_LEVEL)
- ? process_level_host_util_map
- : processor_level_host_util_map;
-
- STRING_LIST primary_object_list;
+ STRING_TO_DOUBLE_MAP & host_util_map =
+ (this->mode_ == PROCESS_LEVEL)
+ ? process_level_host_util_map
+ : processor_level_host_util_map;
+
+ STRING_LIST primary_object_list;
+
+ if (processid_primary_map_.find (*pl_iter,
+ primary_object_list) == 0) // If present
+ {
+ for (STRING_LIST::iterator po_iter =
+ primary_object_list.begin ();
+ po_iter != primary_object_list.end ();
+ ++po_iter)
+ {
+ STRING_LIST host_list =
+ non_primary_host_list (*po_iter);
- if (processid_primary_map_.find (*pl_iter,
- primary_object_list) == 0) // If present
+ // If the object has backups...
+ if (host_list.size () >= 1)
{
- for (STRING_LIST::iterator po_iter =
- primary_object_list.begin ();
- po_iter != primary_object_list.end ();
- ++po_iter)
+ std::priority_queue<UtilRank> util_ranked_queue =
+ util_sorted_host_list(*po_iter, host_list, host_util_map);
+
+ // this check is necessary to make sure
+ // there is utilization information for
+ // this host available. When a
+ // ReplicationManager starts up for
+ // example the host_monitor on that host
+ // is not yet running.
+ if (util_ranked_queue.size () > 0)
{
- STRING_LIST host_list =
- non_primary_host_list (*po_iter);
+ UtilRank ur (util_ranked_queue.top ());
- // If the object has backups...
- if (host_list.size () >= 1)
- {
- std::priority_queue<UtilRank> util_ranked_queue =
- util_sorted_host_list(*po_iter, host_list, host_util_map);
-
- // this check is necessary to make sure
- // there is utilization information for
- // this host available. When a
- // ReplicationManager starts up for
- // example the host_monitor on that host
- // is not yet running.
- if (util_ranked_queue.size () > 0)
- {
- UtilRank ur (util_ranked_queue.top ());
-
- host_util_map.rebind (ur.host_id.c_str (),
- ur.util);
-
- update_ior_map (*po_iter,
- util_ranked_queue);
- }
- }
+ host_util_map.rebind (ur.host_id.c_str (),
+ ur.util);
+
+ update_ior_map (*po_iter,
+ util_ranked_queue);
+ }
+ } //end if (host_list.size () >= 1)
+ } // end for primary_object_list
+ } // end if processid_primary_map_.find ()
+ } // end for process_list
+ } // end for hostid_process_map
+}
+
+void
+ReplicationManager_i::static_selection_algo (void)
+{
+ // enter entries from objectid_appset_map_ into
+ // objectid_rankedior_map_ according to the sorted list given by an
+ // external source.
+
+ ACE_DEBUG ((LM_INFO, "RM: static ranklist order:\n"));
+
+ APP_SET tmp_apps;
+ for (OBJECTID_APPSET_MAP::iterator appset_it = objectid_appset_map_.begin ();
+ appset_it != objectid_appset_map_.begin ();
+ ++appset_it)
+ {
+ ACE_DEBUG ((LM_INFO, "[\toid %s:", appset_it->key ().c_str ()));
+
+ RANKED_IOR_LIST iorlist;
+ iorlist.now = false;
+
+ tmp_apps = appset_it->item ();
+
+ RANKLIST_CONSTRAINT constr;
+ // add all entries mentioned in the constraints in sorted order
+ if (ranklist_constraints_.find (appset_it->key (),
+ constr) == 0)
+ {
+ for (RANKLIST_CONSTRAINT::iterator const_it = constr.begin ();
+ const_it != constr.end ();
+ ++const_it)
+ {
+ // this is inefficient, but we'll go through the set and find
+ // the appropriate entry
+ for (APP_SET::iterator as_it = tmp_apps.begin ();
+ as_it != tmp_apps.end ();
+ ++as_it)
+ {
+ // if we found the right guy
+ if (const_it->compare ((*as_it).host_name) == 0)
+ {
+ if ((*as_it).role != PRIMARY)
+ {
+ ACE_DEBUG ((LM_INFO, "[\n\t\t%s", (*as_it).host_name.c_str ()));
+
+ // add to ranklist
+ iorlist.host_list.push_back ((*as_it).host_name);
+ iorlist.ior_list.push_back (CORBA::Object::_duplicate ((*as_it).ior.in ()));
}
+
+ // remove from set
+ tmp_apps.remove (*as_it);
+
+ break;
}
- }
+ }
+ }
+ } // end if find in ranklist_constraints_
+
+ // add remaing application entries
+ for (APP_SET::iterator app_it = tmp_apps.begin ();
+ app_it != tmp_apps.end ();
+ ++app_it)
+ {
+ if ((*app_it).role != PRIMARY)
+ {
+ ACE_DEBUG ((LM_INFO, "[\n\t\t%s", (*app_it).host_name.c_str ()));
+
+ iorlist.host_list.push_back ((*app_it).host_name);
+ iorlist.ior_list.push_back (CORBA::Object::_duplicate ((*app_it).ior.in ()));
}
+ } // end for tmp_apps
+ objectid_rankedior_map_.bind (appset_it->key (),
+ iorlist);
+
+ ACE_DEBUG ((LM_INFO, "\n"));
+ } // end for objectid_appset_map_
+}
+
+bool
+ReplicationManager_i::replica_selection_algo (void)
+{
+ ACE_DLList <MonitorUpdate> update_list;
+ move_update_list(this->update_list_, update_list);
+
+ if (!static_mode_)
+ {
+ if (process_updates (update_list))
+ {
+ objectid_rankedior_map_.close ();
+ objectid_rankedior_map_.open ();
+
+#ifdef ORIGINAL_SELECTION_ALGORITHM
+ this->load_based_selection_algo ();
+#else
+ this->static_selection_algo ();
+#endif
this->build_rank_list ();
this->update_enhanced_ranklist ();
- }
- }
-
+ } // end if process_updates
+ } // end if static_mode
+
if (!standby_)
{
send_rank_list ();
send_state_synchronization_rank_list ();
}
-
+
return true;
}
@@ -1259,19 +1348,19 @@ ReplicationManager_i::update_enhanced_ranklist (void)
// find the primary
for (APP_SET::iterator s_it = as.begin ();
s_it != as.end ();
- ++s_it)
- {
- APP_INFO & ai = *s_it;
-
- // by looking at its role member
- if (ai.role == PRIMARY)
- {
- primaries.bind ((*it).key (), ai);
-
- // if the primary has been found we can abort this loop
- break;
- }
- }
+ ++s_it)
+ {
+ APP_INFO & ai = *s_it;
+
+ // by looking at its role member
+ if (ai.role == PRIMARY)
+ {
+ primaries.bind ((*it).key (), ai);
+
+ // if the primary has been found we can abort this loop
+ break;
+ }
+ }
}
// to determine the size of the enhanced rank_list we have to
@@ -1850,6 +1939,36 @@ ReplicationManager_i::send_failure_notice (const char * object_id,
}
}
+void
+ReplicationManager_i::set_ranklist_constraints (
+ const RankListConstraints & constraints)
+{
+ ACE_Write_Guard <ACE_RW_Thread_Mutex> guard (constraint_lock_);
+
+ ACE_DEBUG ((LM_TRACE, "RM: received ranklist constraints:\n"));
+
+ RANKLIST_CONSTRAINT constraint;
+ for (CORBA::ULong i = 0; i < constraints.length (); ++i)
+ {
+ ACE_DEBUG ((LM_TRACE, "|\toid %s:", constraints[i].object_id.in ()));
+
+ constraint.clear ();
+ for (CORBA::ULong j = 0; j < constraints[i].hosts.length (); ++j)
+ {
+ ACE_DEBUG ((LM_TRACE,
+ "\n|\t\t%s",
+ constraints[i].hosts[j].in ()));
+
+ constraint.push_back (constraints[i].hosts[j].in ());
+ }
+
+ ranklist_constraints_.bind (constraints[i].object_id.in (),
+ constraint);
+
+ ACE_DEBUG ((LM_TRACE, "\n"));
+ }
+}
+
MonitorUpdate *
MonitorUpdate::create_proc_fail_update (const char * pid)
{
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
index c88ad1a834d..d3450873b3d 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
@@ -5,6 +5,7 @@
#define REPLICATION_MANAGER_H
#include <list>
+#include <string>
#include <queue>
#include "ace/Hash_Map_Manager_T.h"
@@ -179,6 +180,15 @@ public:
virtual void unregister_fault_notification (
FLARE::NotificationId id);
+ virtual void set_ranklist_constraints (
+ const RankListConstraints & constraints);
+
+ void
+ load_based_selection_algo (void);
+
+ void
+ static_selection_algo (void);
+
bool
replica_selection_algo (void);
@@ -249,6 +259,15 @@ public:
ACE_Hash<FLARE::NotificationId>,
ACE_Equal_To<FLARE::NotificationId>,
ACE_Null_Mutex> NOTIFICATION_MAP;
+
+ typedef std::list<ACE_CString> RANKLIST_CONSTRAINT;
+
+ typedef ACE_Hash_Map_Manager_Ex <
+ ACE_CString,
+ RANKLIST_CONSTRAINT,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex> RANKLIST_CONSTRAINT_MAP;
enum
{
@@ -283,6 +302,9 @@ private:
STRING_TO_DOUBLE_MAP objectid_load_map_;
STRING_TO_STRING_MAP processid_host_map_;
+ RANKLIST_CONSTRAINT_MAP ranklist_constraints_;
+ ACE_RW_Thread_Mutex constraint_lock_;
+
STRING_TO_STRING_LIST_MAP processid_backup_map_;
STRING_TO_STRING_LIST_MAP processid_primary_map_;
STRING_TO_STRING_LIST_MAP hostid_process_map_;
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl
index 5efb44faac6..8476ce80009 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl
@@ -5,6 +5,16 @@
#include "StateSynchronizationAgent.idl"
#include "FaultNotification.idl"
+typedef sequence<string> HostList;
+
+struct RankListConstraint
+{
+ string object_id;
+ HostList hosts;
+};
+
+typedef sequence<RankListConstraint> RankListConstraints;
+
interface ReplicationManager : ReplicatedApplication, FLARE::FaultNotifier
{
RankList register_agent (in Object agent_reference);
@@ -15,7 +25,9 @@ interface ReplicationManager : ReplicatedApplication, FLARE::FaultNotifier
in string process_id,
in short role,
in Object server_reference);
-
+
+ void set_ranklist_constraints (in RankListConstraints constraints);
+
RankList register_state_synchronization_agent (
in string host_id,
in string process_id,