diff options
author | wolff1 <wolff1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2009-02-28 00:35:24 +0000 |
---|---|---|
committer | wolff1 <wolff1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2009-02-28 00:35:24 +0000 |
commit | 868f706bbb632a1bd789e9bda9c9431218bf35de (patch) | |
tree | ad8794c24ee4e7bde8ce92f58e2985c00ab02a56 | |
parent | 80ce4efec3ef05e3be6a4a0d41b86b6fef31f21c (diff) | |
download | ATCD-868f706bbb632a1bd789e9bda9c9431218bf35de.tar.gz |
ChangeLogTag: Sat Feb 28 00:31:20 UTC 2009 Friedhelm Wolf <fwolf@dre.vanderbilt.edu>
-rw-r--r-- | TAO/ChangeLog | 13 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/LWFT/FaultNotification.idl | 13 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp | 74 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h | 24 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl | 3 |
6 files changed, 124 insertions, 5 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index a9333d5683d..233b945280c 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,16 @@ +Sat Feb 28 00:31:20 UTC 2009 Friedhelm Wolf <fwolf@dre.vanderbilt.edu> + + * orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp: + + Changed debugging level in error case. + + * orbsvcs/orbsvcs/LWFT/FaultNotification.idl: + * orbsvcs/orbsvcs/LWFT/ReplicationManager.idl + * orbsvcs/orbsvcs/LWFT/ReplicationManager.h + * orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp + + Added FaultNotifier interface to ReplicationManager. + Thu Feb 26 23:27:15 UTC 2009 Friedhelm Wolf <fwolf@dre.vanderbilt.edu> * orbsvcs/orbsvcs/LWFT/LWFT.mpc diff --git a/TAO/orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp b/TAO/orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp index 475799f22d8..4b6ba6a7912 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp +++ b/TAO/orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp @@ -50,7 +50,7 @@ AppSideMonitor_Thread::svc (void) if (acceptor_.open (serv_addr_) == -1) { - ACE_DEBUG ((LM_DEBUG, + ACE_DEBUG ((LM_ERROR, "AppSideMonitor_Thread::svc: can't open the socket.\n")); // this->synchronizer_->wait (); return EXIT_FAILURE; diff --git a/TAO/orbsvcs/orbsvcs/LWFT/FaultNotification.idl b/TAO/orbsvcs/orbsvcs/LWFT/FaultNotification.idl index b592c2a5247..28fe737b14e 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/FaultNotification.idl +++ b/TAO/orbsvcs/orbsvcs/LWFT/FaultNotification.idl @@ -10,6 +10,19 @@ module FLARE { in string component_instance); }; + exception NotifyRegistrationError {}; + + typedef unsigned long NotificationId; + + interface FaultNotifier { + + NotificationId register_fault_notification (in FaultNotification receiver) + raises (NotifyRegistrationError); + + void unregister_fault_notification (in NotificationId id); + + }; + }; #endif /* FAULTNOTIFICATION_IDL */ diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp index 3e79e7a2946..97f7615308c 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp +++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp @@ -159,7 +159,8 @@ ReplicationManager_i::ReplicationManager_i (CORBA::ORB_ptr orb, mode_(mode), static_mode_ (static_mode), update_available_(update_mutex_), - update_list_full_(update_mutex_) + update_list_full_(update_mutex_), + subscription_counter_ (1) { algo_thread_ = new Algorithm (this); algo_thread_->activate (); @@ -393,6 +394,8 @@ void ReplicationManager_i::process_proc_failure ( ACE_CString const & process_id) { + ACE_DEBUG ((LM_TRACE, "RM: process_proc_failure (%s)\n", process_id.c_str ())); + if (static_mode_) { //ACE_DEBUG ((LM_TRACE, "RM: process_proc_failure ()\n")); @@ -609,6 +612,19 @@ ReplicationManager_i::process_proc_failure ( // If present... if (this->processid_host_map_.find (process_id, host) == 0) { + ACE_DEBUG ((LM_TRACE, + "RM: process %s on host %s failed", + process_id.c_str (), + host.c_str ())); + + STRING_LIST primaries; + if (processid_primary_map_.find (process_id, primaries) == 0) + { + // for now just take the first entry and send it + send_failure_notice ((*(primaries.begin ())).c_str (), + host.c_str ()); + } + replace_primary_tags (process_id, host); replace_backup_tags (process_id, host); this->processid_host_map_.unbind (process_id); @@ -979,6 +995,9 @@ ReplicationManager_i::app_reg (APP_INFO & app) const char * process_id = app.process_id.c_str (); Role role = app.role; + // add the received information to a map that knows which object ids + // are in a process + if (!static_mode_) { update_map (host_name, process_id, @@ -1768,6 +1787,59 @@ ReplicationManager_i::object_id (const char * /* object_id */) // no-op } +FLARE::NotificationId +ReplicationManager_i::register_fault_notification ( + FLARE::FaultNotification_ptr receiver) +{ + ACE_Guard <ACE_Thread_Mutex> guard (notify_mutex_); + + if (notify_subscriptions_.bind ( + subscription_counter_, + FLARE::FaultNotification::_duplicate (receiver)) != 0) + { + throw FLARE::NotifyRegistrationError (); + } + + return subscription_counter_++; +} + +void +ReplicationManager_i::unregister_fault_notification ( + FLARE::NotificationId id) +{ + ACE_Guard <ACE_Thread_Mutex> guard (notify_mutex_); + + notify_subscriptions_.unbind (id); +} + +void +ReplicationManager_i::send_failure_notice (const char * object_id, + const char * host_id) +{ + ACE_DEBUG ((LM_TRACE, + "RM: '%s' on '%s' failed.\n", + object_id, + host_id)); + + try + { + ACE_Guard <ACE_Thread_Mutex> guard (notify_mutex_); + + for (NOTIFICATION_MAP::iterator it = notify_subscriptions_.begin (); + it != notify_subscriptions_.end (); + ++it) + { + it->item ()->proc_failure (host_id, object_id); + } + } + catch (const CORBA::Exception & ex) + { + ACE_DEBUG ((LM_ERROR, + "RM: send_failure_notice - caught %s\n", + ex._info ().c_str ())); + } +} + MonitorUpdate * MonitorUpdate::create_proc_fail_update (const char * pid) { diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h index 90ef719612f..d777e265a20 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h +++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h @@ -130,7 +130,7 @@ public: double hertz, bool proactive = true, bool static_mode = false, - AlgoMode mode = PROCESS_LEVEL); + AlgoMode mode = PROCESS_LEVEL); ~ReplicationManager_i (void); @@ -173,6 +173,12 @@ public: virtual void object_id (const char * object_id); + virtual FLARE::NotificationId register_fault_notification ( + FLARE::FaultNotification_ptr receiver); + + virtual void unregister_fault_notification ( + FLARE::NotificationId id); + bool replica_selection_algo (void); @@ -232,6 +238,13 @@ public: typedef ACE_Unbounded_Set<StateSynchronizationAgent_var> STATE_SYNC_AGENT_LIST; + + typedef ACE_Hash_Map_Manager_Ex< + FLARE::NotificationId, + FLARE::FaultNotification_var, + ACE_Hash<FLARE::NotificationId>, + ACE_Equal_To<FLARE::NotificationId>, + ACE_Null_Mutex> NOTIFICATION_MAP; enum { @@ -265,7 +278,7 @@ private: STRING_TO_DOUBLE_MAP hostid_util_map_; STRING_TO_DOUBLE_MAP objectid_load_map_; STRING_TO_STRING_MAP processid_host_map_; - + STRING_TO_STRING_LIST_MAP processid_backup_map_; STRING_TO_STRING_LIST_MAP processid_primary_map_; STRING_TO_STRING_LIST_MAP hostid_process_map_; @@ -278,6 +291,10 @@ private: STATE_SYNC_AGENT_LIST state_synchronization_agent_list_; ACE_Thread_Mutex state_sync_agent_list_mutex_; + ACE_Thread_Mutex notify_mutex_; + FLARE::NotificationId subscription_counter_; + NOTIFICATION_MAP notify_subscriptions_; + void update_enhanced_ranklist (void); void update_map (const char * key_str, @@ -349,6 +366,9 @@ private: STRING_TO_DOUBLE_MAP & dest); void print_queue (std::priority_queue <UtilRank> queue); + + void send_failure_notice (const char * object_id, + const char * host_id); }; #endif /* REPLICATION_MANAGER_H */ diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl index 1732484445e..5efb44faac6 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl +++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl @@ -3,8 +3,9 @@ #include "LWFTDataStructures.idl" #include "StateSynchronizationAgent.idl" +#include "FaultNotification.idl" -interface ReplicationManager : ReplicatedApplication +interface ReplicationManager : ReplicatedApplication, FLARE::FaultNotifier { RankList register_agent (in Object agent_reference); |