summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp295
1 files changed, 207 insertions, 88 deletions
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
index 420d2efd64b..ccca7908e88 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
@@ -160,7 +160,9 @@ ReplicationManager_i::ReplicationManager_i (CORBA::ORB_ptr orb,
static_mode_ (static_mode),
update_available_(update_mutex_),
update_list_full_(update_mutex_),
- subscription_counter_ (1)
+ ranklist_constraints_ (10),
+ subscription_counter_ (1),
+ notify_subscriptions_ (1)
{
algo_thread_ = new Algorithm (this);
algo_thread_->activate ();
@@ -825,98 +827,185 @@ ReplicationManager_i::replace_backup_tags (
}
}
-bool
-ReplicationManager_i::replica_selection_algo (void)
+void
+ReplicationManager_i::load_based_selection_algo (void)
{
- ACE_DLList <MonitorUpdate> update_list;
- move_update_list(this->update_list_, update_list);
-
- if (process_updates (update_list))
- {
- objectid_rankedior_map_.close ();
- objectid_rankedior_map_.open ();
-
- if (!static_mode_)
+ for (STRING_TO_STRING_LIST_MAP::iterator hp_iter =
+ hostid_process_map_.begin ();
+ hp_iter != hostid_process_map_.end ();
+ ++hp_iter)
+ {
+ STRING_TO_DOUBLE_MAP processor_level_host_util_map;
+
+ copy_map (this->hostid_util_map_,
+ processor_level_host_util_map);
+
+ STRING_LIST & process_list = (*hp_iter).item ();
+
+ for (STRING_LIST::iterator pl_iter =
+ process_list.begin ();
+ pl_iter != process_list.end ();
+ ++pl_iter)
{
- for (STRING_TO_STRING_LIST_MAP::iterator hp_iter =
- hostid_process_map_.begin ();
- hp_iter != hostid_process_map_.end ();
- ++hp_iter)
- {
- STRING_TO_DOUBLE_MAP processor_level_host_util_map;
-
- copy_map (this->hostid_util_map_,
- processor_level_host_util_map);
-
- STRING_LIST & process_list = (*hp_iter).item ();
-
- for (STRING_LIST::iterator pl_iter =
- process_list.begin ();
- pl_iter != process_list.end ();
- ++pl_iter)
- {
- STRING_TO_DOUBLE_MAP process_level_host_util_map;
-
- copy_map (this->hostid_util_map_,
- process_level_host_util_map);
+ STRING_TO_DOUBLE_MAP process_level_host_util_map;
+
+ copy_map (this->hostid_util_map_,
+ process_level_host_util_map);
- STRING_TO_DOUBLE_MAP & host_util_map =
- (this->mode_ == PROCESS_LEVEL)
- ? process_level_host_util_map
- : processor_level_host_util_map;
-
- STRING_LIST primary_object_list;
+ STRING_TO_DOUBLE_MAP & host_util_map =
+ (this->mode_ == PROCESS_LEVEL)
+ ? process_level_host_util_map
+ : processor_level_host_util_map;
+
+ STRING_LIST primary_object_list;
+
+ if (processid_primary_map_.find (*pl_iter,
+ primary_object_list) == 0) // If present
+ {
+ for (STRING_LIST::iterator po_iter =
+ primary_object_list.begin ();
+ po_iter != primary_object_list.end ();
+ ++po_iter)
+ {
+ STRING_LIST host_list =
+ non_primary_host_list (*po_iter);
- if (processid_primary_map_.find (*pl_iter,
- primary_object_list) == 0) // If present
+ // If the object has backups...
+ if (host_list.size () >= 1)
{
- for (STRING_LIST::iterator po_iter =
- primary_object_list.begin ();
- po_iter != primary_object_list.end ();
- ++po_iter)
+ std::priority_queue<UtilRank> util_ranked_queue =
+ util_sorted_host_list(*po_iter, host_list, host_util_map);
+
+ // this check is necessary to make sure
+ // there is utilization information for
+ // this host available. When a
+ // ReplicationManager starts up for
+ // example the host_monitor on that host
+ // is not yet running.
+ if (util_ranked_queue.size () > 0)
{
- STRING_LIST host_list =
- non_primary_host_list (*po_iter);
+ UtilRank ur (util_ranked_queue.top ());
- // If the object has backups...
- if (host_list.size () >= 1)
- {
- std::priority_queue<UtilRank> util_ranked_queue =
- util_sorted_host_list(*po_iter, host_list, host_util_map);
-
- // this check is necessary to make sure
- // there is utilization information for
- // this host available. When a
- // ReplicationManager starts up for
- // example the host_monitor on that host
- // is not yet running.
- if (util_ranked_queue.size () > 0)
- {
- UtilRank ur (util_ranked_queue.top ());
-
- host_util_map.rebind (ur.host_id.c_str (),
- ur.util);
-
- update_ior_map (*po_iter,
- util_ranked_queue);
- }
- }
+ host_util_map.rebind (ur.host_id.c_str (),
+ ur.util);
+
+ update_ior_map (*po_iter,
+ util_ranked_queue);
+ }
+ } //end if (host_list.size () >= 1)
+ } // end for primary_object_list
+ } // end if processid_primary_map_.find ()
+ } // end for process_list
+ } // end for hostid_process_map
+}
+
+void
+ReplicationManager_i::static_selection_algo (void)
+{
+ // enter entries from objectid_appset_map_ into
+ // objectid_rankedior_map_ according to the sorted list given by an
+ // external source.
+
+ ACE_DEBUG ((LM_INFO, "RM: static ranklist order:\n"));
+
+ APP_SET tmp_apps;
+ for (OBJECTID_APPSET_MAP::iterator appset_it = objectid_appset_map_.begin ();
+ appset_it != objectid_appset_map_.begin ();
+ ++appset_it)
+ {
+ ACE_DEBUG ((LM_INFO, "[\toid %s:", appset_it->key ().c_str ()));
+
+ RANKED_IOR_LIST iorlist;
+ iorlist.now = false;
+
+ tmp_apps = appset_it->item ();
+
+ RANKLIST_CONSTRAINT constr;
+ // add all entries mentioned in the constraints in sorted order
+ if (ranklist_constraints_.find (appset_it->key (),
+ constr) == 0)
+ {
+ for (RANKLIST_CONSTRAINT::iterator const_it = constr.begin ();
+ const_it != constr.end ();
+ ++const_it)
+ {
+ // this is inefficient, but we'll go through the set and find
+ // the appropriate entry
+ for (APP_SET::iterator as_it = tmp_apps.begin ();
+ as_it != tmp_apps.end ();
+ ++as_it)
+ {
+ // if we found the right guy
+ if (const_it->compare ((*as_it).host_name) == 0)
+ {
+ if ((*as_it).role != PRIMARY)
+ {
+ ACE_DEBUG ((LM_INFO, "[\n\t\t%s", (*as_it).host_name.c_str ()));
+
+ // add to ranklist
+ iorlist.host_list.push_back ((*as_it).host_name);
+ iorlist.ior_list.push_back (CORBA::Object::_duplicate ((*as_it).ior.in ()));
}
+
+ // remove from set
+ tmp_apps.remove (*as_it);
+
+ break;
}
- }
+ }
+ }
+ } // end if find in ranklist_constraints_
+
+ // add remaing application entries
+ for (APP_SET::iterator app_it = tmp_apps.begin ();
+ app_it != tmp_apps.end ();
+ ++app_it)
+ {
+ if ((*app_it).role != PRIMARY)
+ {
+ ACE_DEBUG ((LM_INFO, "[\n\t\t%s", (*app_it).host_name.c_str ()));
+
+ iorlist.host_list.push_back ((*app_it).host_name);
+ iorlist.ior_list.push_back (CORBA::Object::_duplicate ((*app_it).ior.in ()));
}
+ } // end for tmp_apps
+ objectid_rankedior_map_.bind (appset_it->key (),
+ iorlist);
+
+ ACE_DEBUG ((LM_INFO, "\n"));
+ } // end for objectid_appset_map_
+}
+
+bool
+ReplicationManager_i::replica_selection_algo (void)
+{
+ ACE_DLList <MonitorUpdate> update_list;
+ move_update_list(this->update_list_, update_list);
+
+ if (!static_mode_)
+ {
+ if (process_updates (update_list))
+ {
+ objectid_rankedior_map_.close ();
+ objectid_rankedior_map_.open ();
+
+#ifdef ORIGINAL_SELECTION_ALGORITHM
+ this->load_based_selection_algo ();
+#else
+ this->static_selection_algo ();
+#endif
this->build_rank_list ();
this->update_enhanced_ranklist ();
- }
- }
-
+ } // end if process_updates
+ } // end if static_mode
+
if (!standby_)
{
send_rank_list ();
send_state_synchronization_rank_list ();
}
-
+
return true;
}
@@ -1259,19 +1348,19 @@ ReplicationManager_i::update_enhanced_ranklist (void)
// find the primary
for (APP_SET::iterator s_it = as.begin ();
s_it != as.end ();
- ++s_it)
- {
- APP_INFO & ai = *s_it;
-
- // by looking at its role member
- if (ai.role == PRIMARY)
- {
- primaries.bind ((*it).key (), ai);
-
- // if the primary has been found we can abort this loop
- break;
- }
- }
+ ++s_it)
+ {
+ APP_INFO & ai = *s_it;
+
+ // by looking at its role member
+ if (ai.role == PRIMARY)
+ {
+ primaries.bind ((*it).key (), ai);
+
+ // if the primary has been found we can abort this loop
+ break;
+ }
+ }
}
// to determine the size of the enhanced rank_list we have to
@@ -1850,6 +1939,36 @@ ReplicationManager_i::send_failure_notice (const char * object_id,
}
}
+void
+ReplicationManager_i::set_ranklist_constraints (
+ const RankListConstraints & constraints)
+{
+ ACE_Write_Guard <ACE_RW_Thread_Mutex> guard (constraint_lock_);
+
+ ACE_DEBUG ((LM_TRACE, "RM: received ranklist constraints:\n"));
+
+ RANKLIST_CONSTRAINT constraint;
+ for (CORBA::ULong i = 0; i < constraints.length (); ++i)
+ {
+ ACE_DEBUG ((LM_TRACE, "|\toid %s:", constraints[i].object_id.in ()));
+
+ constraint.clear ();
+ for (CORBA::ULong j = 0; j < constraints[i].hosts.length (); ++j)
+ {
+ ACE_DEBUG ((LM_TRACE,
+ "\n|\t\t%s",
+ constraints[i].hosts[j].in ()));
+
+ constraint.push_back (constraints[i].hosts[j].in ());
+ }
+
+ ranklist_constraints_.bind (constraints[i].object_id.in (),
+ constraint);
+
+ ACE_DEBUG ((LM_TRACE, "\n"));
+ }
+}
+
MonitorUpdate *
MonitorUpdate::create_proc_fail_update (const char * pid)
{