From e4135e0a9a69939657fe4372b5567898e53cb65c Mon Sep 17 00:00:00 2001 From: wolff1 Date: Sun, 1 Mar 2009 05:53:34 +0000 Subject: ChangeLogTag: Sun Mar 1 05:53:24 UTC 2009 Friedhelm Wolf --- TAO/ChangeLog | 8 + TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp | 295 +++++++++++++++++------- TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h | 22 ++ TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl | 14 +- 4 files changed, 250 insertions(+), 89 deletions(-) diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 80a705521cd..c67b097d341 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,11 @@ +Sun Mar 1 05:53:24 UTC 2009 Friedhelm Wolf + + * orbsvcs/orbsvcs/LWFT/ReplicationManager.h + * orbsvcs/orbsvcs/LWFT/ReplicationManager.idl + * orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp: + + Exchanged replica_selection_algorithm. + Sat Feb 28 22:21:06 UTC 2009 Friedhelm Wolf * orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp index 420d2efd64b..ccca7908e88 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp +++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp @@ -160,7 +160,9 @@ ReplicationManager_i::ReplicationManager_i (CORBA::ORB_ptr orb, static_mode_ (static_mode), update_available_(update_mutex_), update_list_full_(update_mutex_), - subscription_counter_ (1) + ranklist_constraints_ (10), + subscription_counter_ (1), + notify_subscriptions_ (1) { algo_thread_ = new Algorithm (this); algo_thread_->activate (); @@ -825,98 +827,185 @@ ReplicationManager_i::replace_backup_tags ( } } -bool -ReplicationManager_i::replica_selection_algo (void) +void +ReplicationManager_i::load_based_selection_algo (void) { - ACE_DLList update_list; - move_update_list(this->update_list_, update_list); - - if (process_updates (update_list)) - { - objectid_rankedior_map_.close (); - objectid_rankedior_map_.open (); - - if (!static_mode_) + for (STRING_TO_STRING_LIST_MAP::iterator hp_iter = + hostid_process_map_.begin (); + hp_iter != hostid_process_map_.end (); + ++hp_iter) + { + STRING_TO_DOUBLE_MAP processor_level_host_util_map; + + copy_map (this->hostid_util_map_, + processor_level_host_util_map); + + STRING_LIST & process_list = (*hp_iter).item (); + + for (STRING_LIST::iterator pl_iter = + process_list.begin (); + pl_iter != process_list.end (); + ++pl_iter) { - for (STRING_TO_STRING_LIST_MAP::iterator hp_iter = - hostid_process_map_.begin (); - hp_iter != hostid_process_map_.end (); - ++hp_iter) - { - STRING_TO_DOUBLE_MAP processor_level_host_util_map; - - copy_map (this->hostid_util_map_, - processor_level_host_util_map); - - STRING_LIST & process_list = (*hp_iter).item (); - - for (STRING_LIST::iterator pl_iter = - process_list.begin (); - pl_iter != process_list.end (); - ++pl_iter) - { - STRING_TO_DOUBLE_MAP process_level_host_util_map; - - copy_map (this->hostid_util_map_, - process_level_host_util_map); + STRING_TO_DOUBLE_MAP process_level_host_util_map; + + copy_map (this->hostid_util_map_, + process_level_host_util_map); - STRING_TO_DOUBLE_MAP & host_util_map = - (this->mode_ == PROCESS_LEVEL) - ? process_level_host_util_map - : processor_level_host_util_map; - - STRING_LIST primary_object_list; + STRING_TO_DOUBLE_MAP & host_util_map = + (this->mode_ == PROCESS_LEVEL) + ? process_level_host_util_map + : processor_level_host_util_map; + + STRING_LIST primary_object_list; + + if (processid_primary_map_.find (*pl_iter, + primary_object_list) == 0) // If present + { + for (STRING_LIST::iterator po_iter = + primary_object_list.begin (); + po_iter != primary_object_list.end (); + ++po_iter) + { + STRING_LIST host_list = + non_primary_host_list (*po_iter); - if (processid_primary_map_.find (*pl_iter, - primary_object_list) == 0) // If present + // If the object has backups... + if (host_list.size () >= 1) { - for (STRING_LIST::iterator po_iter = - primary_object_list.begin (); - po_iter != primary_object_list.end (); - ++po_iter) + std::priority_queue util_ranked_queue = + util_sorted_host_list(*po_iter, host_list, host_util_map); + + // this check is necessary to make sure + // there is utilization information for + // this host available. When a + // ReplicationManager starts up for + // example the host_monitor on that host + // is not yet running. + if (util_ranked_queue.size () > 0) { - STRING_LIST host_list = - non_primary_host_list (*po_iter); + UtilRank ur (util_ranked_queue.top ()); - // If the object has backups... - if (host_list.size () >= 1) - { - std::priority_queue util_ranked_queue = - util_sorted_host_list(*po_iter, host_list, host_util_map); - - // this check is necessary to make sure - // there is utilization information for - // this host available. When a - // ReplicationManager starts up for - // example the host_monitor on that host - // is not yet running. - if (util_ranked_queue.size () > 0) - { - UtilRank ur (util_ranked_queue.top ()); - - host_util_map.rebind (ur.host_id.c_str (), - ur.util); - - update_ior_map (*po_iter, - util_ranked_queue); - } - } + host_util_map.rebind (ur.host_id.c_str (), + ur.util); + + update_ior_map (*po_iter, + util_ranked_queue); + } + } //end if (host_list.size () >= 1) + } // end for primary_object_list + } // end if processid_primary_map_.find () + } // end for process_list + } // end for hostid_process_map +} + +void +ReplicationManager_i::static_selection_algo (void) +{ + // enter entries from objectid_appset_map_ into + // objectid_rankedior_map_ according to the sorted list given by an + // external source. + + ACE_DEBUG ((LM_INFO, "RM: static ranklist order:\n")); + + APP_SET tmp_apps; + for (OBJECTID_APPSET_MAP::iterator appset_it = objectid_appset_map_.begin (); + appset_it != objectid_appset_map_.begin (); + ++appset_it) + { + ACE_DEBUG ((LM_INFO, "[\toid %s:", appset_it->key ().c_str ())); + + RANKED_IOR_LIST iorlist; + iorlist.now = false; + + tmp_apps = appset_it->item (); + + RANKLIST_CONSTRAINT constr; + // add all entries mentioned in the constraints in sorted order + if (ranklist_constraints_.find (appset_it->key (), + constr) == 0) + { + for (RANKLIST_CONSTRAINT::iterator const_it = constr.begin (); + const_it != constr.end (); + ++const_it) + { + // this is inefficient, but we'll go through the set and find + // the appropriate entry + for (APP_SET::iterator as_it = tmp_apps.begin (); + as_it != tmp_apps.end (); + ++as_it) + { + // if we found the right guy + if (const_it->compare ((*as_it).host_name) == 0) + { + if ((*as_it).role != PRIMARY) + { + ACE_DEBUG ((LM_INFO, "[\n\t\t%s", (*as_it).host_name.c_str ())); + + // add to ranklist + iorlist.host_list.push_back ((*as_it).host_name); + iorlist.ior_list.push_back (CORBA::Object::_duplicate ((*as_it).ior.in ())); } + + // remove from set + tmp_apps.remove (*as_it); + + break; } - } + } + } + } // end if find in ranklist_constraints_ + + // add remaing application entries + for (APP_SET::iterator app_it = tmp_apps.begin (); + app_it != tmp_apps.end (); + ++app_it) + { + if ((*app_it).role != PRIMARY) + { + ACE_DEBUG ((LM_INFO, "[\n\t\t%s", (*app_it).host_name.c_str ())); + + iorlist.host_list.push_back ((*app_it).host_name); + iorlist.ior_list.push_back (CORBA::Object::_duplicate ((*app_it).ior.in ())); } + } // end for tmp_apps + objectid_rankedior_map_.bind (appset_it->key (), + iorlist); + + ACE_DEBUG ((LM_INFO, "\n")); + } // end for objectid_appset_map_ +} + +bool +ReplicationManager_i::replica_selection_algo (void) +{ + ACE_DLList update_list; + move_update_list(this->update_list_, update_list); + + if (!static_mode_) + { + if (process_updates (update_list)) + { + objectid_rankedior_map_.close (); + objectid_rankedior_map_.open (); + +#ifdef ORIGINAL_SELECTION_ALGORITHM + this->load_based_selection_algo (); +#else + this->static_selection_algo (); +#endif this->build_rank_list (); this->update_enhanced_ranklist (); - } - } - + } // end if process_updates + } // end if static_mode + if (!standby_) { send_rank_list (); send_state_synchronization_rank_list (); } - + return true; } @@ -1259,19 +1348,19 @@ ReplicationManager_i::update_enhanced_ranklist (void) // find the primary for (APP_SET::iterator s_it = as.begin (); s_it != as.end (); - ++s_it) - { - APP_INFO & ai = *s_it; - - // by looking at its role member - if (ai.role == PRIMARY) - { - primaries.bind ((*it).key (), ai); - - // if the primary has been found we can abort this loop - break; - } - } + ++s_it) + { + APP_INFO & ai = *s_it; + + // by looking at its role member + if (ai.role == PRIMARY) + { + primaries.bind ((*it).key (), ai); + + // if the primary has been found we can abort this loop + break; + } + } } // to determine the size of the enhanced rank_list we have to @@ -1850,6 +1939,36 @@ ReplicationManager_i::send_failure_notice (const char * object_id, } } +void +ReplicationManager_i::set_ranklist_constraints ( + const RankListConstraints & constraints) +{ + ACE_Write_Guard guard (constraint_lock_); + + ACE_DEBUG ((LM_TRACE, "RM: received ranklist constraints:\n")); + + RANKLIST_CONSTRAINT constraint; + for (CORBA::ULong i = 0; i < constraints.length (); ++i) + { + ACE_DEBUG ((LM_TRACE, "|\toid %s:", constraints[i].object_id.in ())); + + constraint.clear (); + for (CORBA::ULong j = 0; j < constraints[i].hosts.length (); ++j) + { + ACE_DEBUG ((LM_TRACE, + "\n|\t\t%s", + constraints[i].hosts[j].in ())); + + constraint.push_back (constraints[i].hosts[j].in ()); + } + + ranklist_constraints_.bind (constraints[i].object_id.in (), + constraint); + + ACE_DEBUG ((LM_TRACE, "\n")); + } +} + MonitorUpdate * MonitorUpdate::create_proc_fail_update (const char * pid) { diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h index c88ad1a834d..d3450873b3d 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h +++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h @@ -5,6 +5,7 @@ #define REPLICATION_MANAGER_H #include +#include #include #include "ace/Hash_Map_Manager_T.h" @@ -179,6 +180,15 @@ public: virtual void unregister_fault_notification ( FLARE::NotificationId id); + virtual void set_ranklist_constraints ( + const RankListConstraints & constraints); + + void + load_based_selection_algo (void); + + void + static_selection_algo (void); + bool replica_selection_algo (void); @@ -249,6 +259,15 @@ public: ACE_Hash, ACE_Equal_To, ACE_Null_Mutex> NOTIFICATION_MAP; + + typedef std::list RANKLIST_CONSTRAINT; + + typedef ACE_Hash_Map_Manager_Ex < + ACE_CString, + RANKLIST_CONSTRAINT, + ACE_Hash, + ACE_Equal_To, + ACE_Null_Mutex> RANKLIST_CONSTRAINT_MAP; enum { @@ -283,6 +302,9 @@ private: STRING_TO_DOUBLE_MAP objectid_load_map_; STRING_TO_STRING_MAP processid_host_map_; + RANKLIST_CONSTRAINT_MAP ranklist_constraints_; + ACE_RW_Thread_Mutex constraint_lock_; + STRING_TO_STRING_LIST_MAP processid_backup_map_; STRING_TO_STRING_LIST_MAP processid_primary_map_; STRING_TO_STRING_LIST_MAP hostid_process_map_; diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl index 5efb44faac6..8476ce80009 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl +++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl @@ -5,6 +5,16 @@ #include "StateSynchronizationAgent.idl" #include "FaultNotification.idl" +typedef sequence HostList; + +struct RankListConstraint +{ + string object_id; + HostList hosts; +}; + +typedef sequence RankListConstraints; + interface ReplicationManager : ReplicatedApplication, FLARE::FaultNotifier { RankList register_agent (in Object agent_reference); @@ -15,7 +25,9 @@ interface ReplicationManager : ReplicatedApplication, FLARE::FaultNotifier in string process_id, in short role, in Object server_reference); - + + void set_ranklist_constraints (in RankListConstraints constraints); + RankList register_state_synchronization_agent ( in string host_id, in string process_id, -- cgit v1.2.1