summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwolff1 <wolff1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2009-02-28 00:35:24 +0000
committerwolff1 <wolff1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2009-02-28 00:35:24 +0000
commit868f706bbb632a1bd789e9bda9c9431218bf35de (patch)
treead8794c24ee4e7bde8ce92f58e2985c00ab02a56
parent80ce4efec3ef05e3be6a4a0d41b86b6fef31f21c (diff)
downloadATCD-868f706bbb632a1bd789e9bda9c9431218bf35de.tar.gz
ChangeLogTag: Sat Feb 28 00:31:20 UTC 2009 Friedhelm Wolf <fwolf@dre.vanderbilt.edu>
-rw-r--r--TAO/ChangeLog13
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/FaultNotification.idl13
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp74
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h24
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl3
6 files changed, 124 insertions, 5 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index a9333d5683d..233b945280c 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,16 @@
+Sat Feb 28 00:31:20 UTC 2009 Friedhelm Wolf <fwolf@dre.vanderbilt.edu>
+
+ * orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp:
+
+ Changed debugging level in error case.
+
+ * orbsvcs/orbsvcs/LWFT/FaultNotification.idl:
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.idl:
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.h:
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp:
+
+ Added FaultNotifier interface to ReplicationManager.
+
Thu Feb 26 23:27:15 UTC 2009 Friedhelm Wolf <fwolf@dre.vanderbilt.edu>
* orbsvcs/orbsvcs/LWFT/LWFT.mpc
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp b/TAO/orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp
index 475799f22d8..4b6ba6a7912 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp
+++ b/TAO/orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.cpp
@@ -50,7 +50,7 @@ AppSideMonitor_Thread::svc (void)
if (acceptor_.open (serv_addr_) == -1)
{
- ACE_DEBUG ((LM_DEBUG,
+ ACE_DEBUG ((LM_ERROR,
"AppSideMonitor_Thread::svc: can't open the socket.\n"));
// this->synchronizer_->wait ();
return EXIT_FAILURE;
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/FaultNotification.idl b/TAO/orbsvcs/orbsvcs/LWFT/FaultNotification.idl
index b592c2a5247..28fe737b14e 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/FaultNotification.idl
+++ b/TAO/orbsvcs/orbsvcs/LWFT/FaultNotification.idl
@@ -10,6 +10,19 @@ module FLARE {
in string component_instance);
};
+ exception NotifyRegistrationError {};
+
+ typedef unsigned long NotificationId;
+
+  // Subscription interface for parties that want to receive
+  // FaultNotification callbacks.
+  interface FaultNotifier
+  {
+    // Adds <receiver> as a subscriber.  The returned id identifies
+    // the subscription and is the value to hand to
+    // unregister_fault_notification ().  Raises
+    // NotifyRegistrationError when the subscription cannot be made.
+    NotificationId register_fault_notification (
+        in FaultNotification receiver)
+      raises (NotifyRegistrationError);
+
+    // Removes the subscription identified by <id>.
+    void unregister_fault_notification (in NotificationId id);
+  };
+
};
#endif /* FAULTNOTIFICATION_IDL */
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
index 3e79e7a2946..97f7615308c 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
@@ -159,7 +159,8 @@ ReplicationManager_i::ReplicationManager_i (CORBA::ORB_ptr orb,
mode_(mode),
static_mode_ (static_mode),
update_available_(update_mutex_),
- update_list_full_(update_mutex_)
+ update_list_full_(update_mutex_),
+ subscription_counter_ (1)
{
algo_thread_ = new Algorithm (this);
algo_thread_->activate ();
@@ -393,6 +394,8 @@ void
ReplicationManager_i::process_proc_failure (
ACE_CString const & process_id)
{
+ ACE_DEBUG ((LM_TRACE, "RM: process_proc_failure (%s)\n", process_id.c_str ()));
+
if (static_mode_)
{
//ACE_DEBUG ((LM_TRACE, "RM: process_proc_failure ()\n"));
@@ -609,6 +612,19 @@ ReplicationManager_i::process_proc_failure (
// If present...
if (this->processid_host_map_.find (process_id, host) == 0)
{
+ ACE_DEBUG ((LM_TRACE,
+ "RM: process %s on host %s failed",
+ process_id.c_str (),
+ host.c_str ()));
+
+ STRING_LIST primaries;
+ if (processid_primary_map_.find (process_id, primaries) == 0)
+ {
+ // for now just take the first entry and send it
+ send_failure_notice ((*(primaries.begin ())).c_str (),
+ host.c_str ());
+ }
+
replace_primary_tags (process_id, host);
replace_backup_tags (process_id, host);
this->processid_host_map_.unbind (process_id);
@@ -979,6 +995,9 @@ ReplicationManager_i::app_reg (APP_INFO & app)
const char * process_id = app.process_id.c_str ();
Role role = app.role;
+ // add the received information to a map that knows which object ids
+ // are in a process
+
if (!static_mode_)
{
update_map (host_name, process_id,
@@ -1768,6 +1787,59 @@ ReplicationManager_i::object_id (const char * /* object_id */)
// no-op
}
+// Stores a duplicated reference to <receiver> in
+// notify_subscriptions_ under the current value of
+// subscription_counter_ and returns that value as the subscription
+// id.  The counter is advanced only when the bind succeeded; a
+// failed bind raises FLARE::NotifyRegistrationError instead.
+FLARE::NotificationId
+ReplicationManager_i::register_fault_notification (
+  FLARE::FaultNotification_ptr receiver)
+{
+  // Serializes access to the subscription map and the id counter.
+  ACE_Guard <ACE_Thread_Mutex> lock (this->notify_mutex_);
+
+  FLARE::NotificationId const id = this->subscription_counter_;
+
+  int const result =
+    this->notify_subscriptions_.bind (
+      id,
+      FLARE::FaultNotification::_duplicate (receiver));
+
+  if (result != 0)
+    {
+      throw FLARE::NotifyRegistrationError ();
+    }
+
+  // Next registration gets a fresh id.
+  ++this->subscription_counter_;
+
+  return id;
+}
+
+// Drops the subscription stored under <id> from
+// notify_subscriptions_.  The result of the unbind is not checked.
+void
+ReplicationManager_i::unregister_fault_notification (
+  FLARE::NotificationId id)
+{
+  // Serializes access to the subscription map.
+  ACE_Guard <ACE_Thread_Mutex> lock (this->notify_mutex_);
+
+  this->notify_subscriptions_.unbind (id);
+}
+
+// Tells every subscriber in notify_subscriptions_ that <object_id>
+// on <host_id> has failed by calling proc_failure (host_id,
+// object_id) on it.
+//
+// Each subscriber is invoked inside its own try block: a CORBA
+// exception raised by one receiver is logged and the remaining
+// receivers are still notified, instead of the first failure
+// aborting the whole loop.
+//
+// NOTE(review): notify_mutex_ stays locked while the proc_failure ()
+// calls are made; a receiver that calls back into
+// (un)register_fault_notification () during its notification would
+// presumably block on that mutex - confirm whether this can occur.
+void
+ReplicationManager_i::send_failure_notice (const char * object_id,
+                                           const char * host_id)
+{
+  ACE_DEBUG ((LM_TRACE,
+              "RM: '%s' on '%s' failed.\n",
+              object_id,
+              host_id));
+
+  // Keeps the subscription map stable while it is iterated.
+  ACE_Guard <ACE_Thread_Mutex> guard (notify_mutex_);
+
+  for (NOTIFICATION_MAP::iterator it = notify_subscriptions_.begin ();
+       it != notify_subscriptions_.end ();
+       ++it)
+    {
+      try
+        {
+          it->item ()->proc_failure (host_id, object_id);
+        }
+      catch (const CORBA::Exception & ex)
+        {
+          // Log and carry on with the next subscriber.
+          ACE_DEBUG ((LM_ERROR,
+                      "RM: send_failure_notice - caught %s\n",
+                      ex._info ().c_str ()));
+        }
+    }
+}
+
MonitorUpdate *
MonitorUpdate::create_proc_fail_update (const char * pid)
{
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
index 90ef719612f..d777e265a20 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
@@ -130,7 +130,7 @@ public:
double hertz,
bool proactive = true,
bool static_mode = false,
- AlgoMode mode = PROCESS_LEVEL);
+ AlgoMode mode = PROCESS_LEVEL);
~ReplicationManager_i (void);
@@ -173,6 +173,12 @@ public:
virtual void object_id (const char * object_id);
+  /// Subscribes @a receiver and returns the id of the new
+  /// subscription.
+  virtual FLARE::NotificationId
+  register_fault_notification (FLARE::FaultNotification_ptr receiver);
+
+  /// Removes the subscription identified by @a id.
+  virtual void
+  unregister_fault_notification (FLARE::NotificationId id);
+
bool
replica_selection_algo (void);
@@ -232,6 +238,13 @@ public:
typedef ACE_Unbounded_Set<StateSynchronizationAgent_var>
STATE_SYNC_AGENT_LIST;
+
+ typedef ACE_Hash_Map_Manager_Ex<
+ FLARE::NotificationId,
+ FLARE::FaultNotification_var,
+ ACE_Hash<FLARE::NotificationId>,
+ ACE_Equal_To<FLARE::NotificationId>,
+ ACE_Null_Mutex> NOTIFICATION_MAP;
enum
{
@@ -265,7 +278,7 @@ private:
STRING_TO_DOUBLE_MAP hostid_util_map_;
STRING_TO_DOUBLE_MAP objectid_load_map_;
STRING_TO_STRING_MAP processid_host_map_;
-
+
STRING_TO_STRING_LIST_MAP processid_backup_map_;
STRING_TO_STRING_LIST_MAP processid_primary_map_;
STRING_TO_STRING_LIST_MAP hostid_process_map_;
@@ -278,6 +291,10 @@ private:
STATE_SYNC_AGENT_LIST state_synchronization_agent_list_;
ACE_Thread_Mutex state_sync_agent_list_mutex_;
+ ACE_Thread_Mutex notify_mutex_;
+ FLARE::NotificationId subscription_counter_;
+ NOTIFICATION_MAP notify_subscriptions_;
+
void update_enhanced_ranklist (void);
void update_map (const char * key_str,
@@ -349,6 +366,9 @@ private:
STRING_TO_DOUBLE_MAP & dest);
void print_queue (std::priority_queue <UtilRank> queue);
+
+  /// Calls proc_failure () on the registered fault notification
+  /// receivers for @a object_id on @a host_id.
+  void send_failure_notice (const char * object_id, const char * host_id);
};
#endif /* REPLICATION_MANAGER_H */
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl
index 1732484445e..5efb44faac6 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl
@@ -3,8 +3,9 @@
#include "LWFTDataStructures.idl"
#include "StateSynchronizationAgent.idl"
+#include "FaultNotification.idl"
-interface ReplicationManager : ReplicatedApplication
+interface ReplicationManager : ReplicatedApplication, FLARE::FaultNotifier
{
RankList register_agent (in Object agent_reference);