summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h')
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h399
1 files changed, 399 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
new file mode 100644
index 00000000000..5fc2bada0c8
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
@@ -0,0 +1,399 @@
+// -*- C++ -*-
+// $Id$
+
+#ifndef REPLICATION_MANAGER_H
+#define REPLICATION_MANAGER_H
+
+#include <list>
+#include <queue>
+
+#include "ace/Hash_Map_Manager_T.h"
+#include "ace/Unbounded_Queue.h"
+#include "ace/Unbounded_Set.h"
+#include "ace/Condition_T.h"
+#include "ace/Containers_T.h"
+#include "ace/Recursive_Thread_Mutex.h"
+#include "AppSideReg.h"
+#include "Timer.h"
+
+#include "ReplicationManagerS.h"
+
+#include "rm_impl_export.h"
+
+enum Role
+{
+ PRIMARY = 1,
+ BACKUP = 2
+};
+
+enum AlgoMode
+{
+ PROCESS_LEVEL = 1,
+ PROCESSOR_LEVEL = 2
+};
+
+class Algorithm;
+
+struct APP_INFO
+{
+ ACE_CString object_id;
+ double load;
+ ACE_CString host_name;
+ ACE_CString process_id;
+ Role role;
+ CORBA::Object_var ior;
+
+ APP_INFO (void);
+
+ APP_INFO (APP_INFO const & app_info);
+
+ APP_INFO (const char *oid,
+ double l,
+ const char *hname,
+ const char *pid,
+ Role r,
+ CORBA::Object_ptr ref);
+
+ APP_INFO (const char *oid,
+ const char *hname,
+ const char *pid,
+ Role r);
+
+ void swap (APP_INFO & app_info);
+ APP_INFO & operator = (APP_INFO const & app_info);
+};
+
+bool operator == (APP_INFO const & lhs, APP_INFO const & rhs);
+
+struct RANKED_IOR_LIST
+{
+ bool now;
+ std::list<CORBA::Object_var> ior_list;
+ std::list<ACE_CString> host_list;
+
+ RANKED_IOR_LIST (void);
+};
+
+struct UtilRank
+{
+ double util;
+ std::string host_id;
+
+ UtilRank (void);
+ UtilRank (UtilRank const & ur);
+ UtilRank (double u, const char * hid);
+};
+
+bool operator < (UtilRank const & u1, UtilRank const & u2);
+
+struct MonitorUpdate
+{
+ enum UpdateType
+ {
+ PROC_FAIL_UPDATE,
+ HOST_UTIL_UPDATE,
+ RUN_NOW,
+ APP_REG
+ };
+
+ UpdateType type;
+
+ ACE_CString process_id;
+ ACE_CString host_id;
+ double value;
+ APP_INFO app_info;
+
+ static MonitorUpdate *
+ create_proc_fail_update (const char * pid);
+
+ static MonitorUpdate *
+ create_host_util_update (const char *hid, double value);
+
+ static MonitorUpdate *
+ create_run_now_update (void);
+
+ static MonitorUpdate *
+ create_app_info_update (const char *oid,
+ double l,
+ const char *hname,
+ const char *pid,
+ Role r,
+ CORBA::Object_ptr ref);
+};
+
+class RM_Impl_Export ReplicationManager_i
+ : public virtual POA_ReplicationManager,
+ protected Timer
+{
+public:
+ ReplicationManager_i (CORBA::ORB_ptr orb,
+ double hertz,
+ bool proactive = true,
+ bool static_mode = false,
+ AlgoMode mode = PROCESS_LEVEL);
+
+ ~ReplicationManager_i (void);
+
+ virtual void
+ register_application (const char *object_id,
+ double load,
+ const char *host_name,
+ const char *process_id,
+ CORBA::Short role,
+ CORBA::Object_ptr server_reference);
+
+ void
+ util_update (const char *host_id,
+ double util);
+
+ virtual void
+ proc_failure (const char *process_id);
+
+ virtual RankList *
+ register_agent (CORBA::Object_ptr agent_reference);
+
+ virtual RankList *
+ register_state_synchronization_agent (
+ const char * host_id,
+ const char * process_id,
+ StateSynchronizationAgent_ptr agent);
+
+ virtual CORBA::Object_ptr
+ get_next (const char * object_id);
+
+ virtual void set_state (const ::CORBA::Any & state_value);
+
+ virtual ::CORBA::Any * get_state (void);
+
+ virtual ::StateSynchronizationAgent_ptr agent (void);
+
+ virtual void agent (StateSynchronizationAgent_ptr agent);
+
+ virtual char * object_id (void);
+
+ virtual void object_id (const char * object_id);
+
+ virtual FLARE::NotificationId register_fault_notification (
+ FLARE::FaultNotification_ptr receiver);
+
+ virtual void unregister_fault_notification (
+ FLARE::NotificationId id);
+
+ virtual void set_ranklist_constraints (
+ const RankListConstraints & constraints);
+
+ void
+ load_based_selection_algo (void);
+
+ void
+ static_selection_algo (void);
+
+ bool
+ replica_selection_algo (void);
+
+ typedef ACE_Unbounded_Set<ACE_CString> STRING_LIST;
+ typedef ACE_Unbounded_Set<APP_INFO> APP_SET;
+
+ typedef ACE_Hash_Map_Manager_Ex<
+ ACE_CString,
+ STRING_LIST,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex> STRING_TO_STRING_LIST_MAP;
+
+ typedef ACE_Hash_Map_Manager_Ex<
+ ACE_CString,
+ ACE_CString,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex> STRING_TO_STRING_MAP;
+
+ typedef ACE_Hash_Map_Manager_Ex<
+ ACE_CString,
+ double,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex> STRING_TO_DOUBLE_MAP;
+
+ typedef ACE_Hash_Map_Manager_Ex<
+ ACE_CString,
+ APP_SET,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex> OBJECTID_APPSET_MAP;
+
+ typedef ACE_Hash_Map_Manager_Ex<
+ ACE_CString,
+ APP_INFO,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex> OBJECTID_APPINFO_MAP;
+
+ typedef ACE_Hash_Map_Manager_Ex<
+ ACE_CString,
+ RANKED_IOR_LIST,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex> OBJECTID_RANKED_IOR_MAP;
+
+ typedef ACE_Hash_Map_Manager_Ex<ACE_CString,
+ CORBA::Object_var,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex>
+ STRING_OBJECT_MAP;
+
+ typedef ACE_Unbounded_Set<CORBA::Object_var> AGENT_LIST;
+
+ typedef ACE_Hash_Map_Manager_Ex <
+ ACE_CString,
+ StateSynchronizationAgent_var,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex> STATE_SYNC_AGENT_MAP;
+
+ typedef ACE_Hash_Map_Manager_Ex<
+ FLARE::NotificationId,
+ FLARE::FaultNotification_var,
+ ACE_Hash<FLARE::NotificationId>,
+ ACE_Equal_To<FLARE::NotificationId>,
+ ACE_Null_Mutex> NOTIFICATION_MAP;
+
+ typedef std::list<ACE_CString> RANKLIST_CONSTRAINT;
+
+ typedef ACE_Hash_Map_Manager_Ex <
+ ACE_CString,
+ RANKLIST_CONSTRAINT,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex> RANKLIST_CONSTRAINT_MAP;
+
+ enum
+ {
+ UPDATE_LIST_MAX_SIZE = 100
+ };
+
+private:
+ CORBA::ORB_var orb_;
+ AppSideReg proc_reg_;
+
+ Algorithm * algo_thread_;
+ bool standby_;
+ bool proactive_;
+ AlgoMode mode_;
+
+ // local StateSynchonizationAgent
+ StateSynchronizationAgent_var agent_;
+
+ /// This flag disables the usage of host load calculations and
+ /// therefore enables single host scenarios.
+ bool static_mode_;
+
+ ACE_DLList <MonitorUpdate> update_list_;
+ ACE_Thread_Mutex update_mutex_;
+ ACE_Condition <ACE_Thread_Mutex> update_available_;
+ ACE_Condition <ACE_Thread_Mutex> update_list_full_;
+
+ OBJECTID_APPSET_MAP objectid_appset_map_;
+ ACE_Recursive_Thread_Mutex appset_lock_;
+ OBJECTID_RANKED_IOR_MAP objectid_rankedior_map_;
+ STRING_TO_DOUBLE_MAP hostid_util_map_;
+ STRING_TO_DOUBLE_MAP objectid_load_map_;
+ STRING_TO_STRING_MAP processid_host_map_;
+
+ RANKLIST_CONSTRAINT_MAP ranklist_constraints_;
+ ACE_RW_Thread_Mutex constraint_lock_;
+
+ STRING_TO_STRING_LIST_MAP processid_backup_map_;
+ STRING_TO_STRING_LIST_MAP processid_primary_map_;
+ STRING_TO_STRING_LIST_MAP hostid_process_map_;
+
+ RankList rank_list_;
+ RankList enhanced_rank_list_;
+ AGENT_LIST agent_list_;
+ ACE_RW_Thread_Mutex rank_list_mutex_;
+ ACE_Thread_Mutex enhanced_rank_list_agent_list_combined_mutex_;
+ STATE_SYNC_AGENT_MAP state_synchronization_agent_map_;
+ ACE_Thread_Mutex state_sync_agent_list_mutex_;
+
+ ACE_Thread_Mutex notify_mutex_;
+ FLARE::NotificationId subscription_counter_;
+ NOTIFICATION_MAP notify_subscriptions_;
+
+ void update_enhanced_ranklist (void);
+
+ void update_map (const char * key_str,
+ const char * value_str,
+ STRING_TO_STRING_LIST_MAP & map);
+
+ void update_util_map (const char * key_str,
+ double value,
+ STRING_TO_DOUBLE_MAP & map);
+
+ void update_appset_map (const char * key_str,
+ APP_INFO const & app_info,
+ OBJECTID_APPSET_MAP &);
+
+ void update_ior_map (ACE_CString const & oid,
+ std::priority_queue<UtilRank> const & rank_list);
+
+ void update_proc_host_map (const char *pid,
+ const char * hid,
+ STRING_TO_STRING_MAP & map);
+
+ std::priority_queue <UtilRank>
+ util_sorted_host_list (ACE_CString const & oid,
+ STRING_LIST const & host_list,
+ STRING_TO_DOUBLE_MAP const & hu_map);
+
+ virtual int pulse (void);
+
+ void move_update_list (ACE_DLList<MonitorUpdate> & source,
+ ACE_DLList<MonitorUpdate> & dest);
+
+ bool process_updates(ACE_DLList<MonitorUpdate> & update_list);
+
+ void remove_process(ACE_CString const & pid,
+ STRING_TO_STRING_LIST_MAP & map,
+ ACE_CString const & host,
+ Role role);
+
+ void send_rank_list (void);
+
+ void send_state_synchronization_rank_list (void);
+
+ void build_rank_list (void);
+
+ void app_reg(APP_INFO & app_info);
+
+ void static_ranklist_update (const char * object_id,
+ CORBA::Object_ptr ior,
+ Role role);
+
+ void process_proc_failure (ACE_CString const & process_id);
+
+ STRING_LIST non_primary_host_list (ACE_CString const & primary_object_id);
+
+ void replace_primary_tags (ACE_CString const & pid,
+ ACE_CString const & host);
+
+ void remove_from_appset (ACE_CString const & host,
+ ACE_CString const & pid,
+ ACE_CString const & tag,
+ Role role);
+
+ void elevate_backup_to_primary (ACE_CString const & tag);
+
+ void replace_backup_tags (ACE_CString const & pid,
+ ACE_CString const & host);
+
+ void copy_map (STRING_TO_DOUBLE_MAP const & source,
+ STRING_TO_DOUBLE_MAP & dest);
+
+ void print_queue (std::priority_queue <UtilRank> queue);
+
+ void send_failure_notice (const char * host,
+ const ::FLARE::ApplicationList & object_ids);
+};
+
+#endif /* REPLICATION_MANAGER_H */