summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp167
1 files changed, 160 insertions, 7 deletions
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
index 4357599c3d9..9cc65a98a52 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
@@ -7,11 +7,12 @@
#include <string>
#include <set>
#include <ace/Date_Time.h>
-#include "ace/OS_NS_unistd.h"
+#include <ace/OS_NS_unistd.h>
#include <ace/High_Res_Timer.h>
#include "AppSideMonitor_Thread.h"
#include "ForwardingAgentC.h"
#include "AppInfoC.h"
+#include "SSA_AMI_Handler.h"
template <class T>
void myswap (T & t1, T & t2)
@@ -1797,9 +1798,157 @@ ReplicationManager_i::get_next (const char * /* object_id */)
return CORBA::Object::_nil ();
}
-void ReplicationManager_i::finish_invocation(const char * object_id)
+CORBA::Object_ptr ReplicationManager_i::prepare_reinvocation(
+ const char * object_id,
+ CORBA::Long last_successful,
+ CORBA::Long tsec,
+ CORBA::Long tusec)
+{
+ // Acquiring two lock like this could lead to a deadlock if
+ // some other thread is acquiring the same two locks in the
+ // reverse order. One solution is to put all these locks in an
+ // array and acquire them only in the increasing order of index.
+ ACE_Guard<ACE_Recursive_Thread_Mutex> appset_guard(appset_lock_);
+ ACE_Guard<ACE_Thread_Mutex> ssa_guard(state_sync_agent_list_mutex_);
+
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::prepare_reinvocation(): "
+ "object_id = %s.\n",object_id));
+
+ // This phase is needed only in the lag-by-one approach to prepare
+ // the lagging object by discarding potential orphan state, which
+ // has really become orphan because of a failure.
+ size_t phase = 2;
+ //ami_phase(phase, object_id, tsec, tusec);
+
+ APP_SET app_set;
+ if(objectid_appset_map_.find(object_id, app_set) == 0)
+ {
+ for(APP_SET::iterator app_set_iter = app_set.begin();
+ app_set_iter != app_set.end();
+ ++app_set_iter)
+ {
+ if((*app_set_iter).role == phase)
+ {
+ return (*app_set_iter).ior.in();
+ }
+ }
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::prepare_reinvocation(): "
+ "Can't find failover replica for objectid = %s\n", object_id));
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::prepare_reinvocation(): "
+ "Can't find APP_SET for objectid = %s\n", object_id));
+ }
+ return 0;
+}
+
+CORBA::Boolean ReplicationManager_i::finish_invocation(
+ const char * object_id,
+ ::CORBA::Long tsec,
+ ::CORBA::Long tusec)
+{
+ return two_phase_commit(object_id);
+ //return ami_two_phase_commit(object_id, tsec, tusec);
+}
+
+bool ReplicationManager_i::ami_two_phase_commit(const char * object_id, long
+ tsec, long tusec)
+{
+ // Acquiring two lock like this could lead to a deadlock if
+ // some other thread is acquiring the same two locks in the
+ // reverse order. One solution is to put all these locks in an
+ // array and acquire them only in the increasing order of index.
+ ACE_Guard<ACE_Recursive_Thread_Mutex> appset_guard(appset_lock_);
+ ACE_Guard<ACE_Thread_Mutex> ssa_guard(state_sync_agent_list_mutex_);
+
+ if(ami_phase(1, object_id, tsec, tusec))
+ {
+ ami_phase(2, object_id, tsec, tusec);
+ return true;
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::ami_two_phase_commit(): "
+ "phase 1 failed. Skipping phase 2.\n"));
+ return false;
+ }
+}
+
+bool ReplicationManager_i::ami_phase(int phase,
+ const char * /* object_id */,
+ long tsec, long tusec)
+{
+ SSA_AMI_Handler * ssa_ami_handler_ptr = new SSA_AMI_Handler;
+ AMI_StateSynchronizationAgentHandler_var ssa_ami_handler
+ = ssa_ami_handler_ptr->_this();
+
+ size_t ssa_invoked = 0;
+ for(OBJECTID_APPSET_MAP::iterator iter = objectid_appset_map_.begin();
+ iter != objectid_appset_map_.end();
+ ++iter)
+ {
+ if ((*iter).ext_id_ == "ReplicationManager")
+ continue;
+
+ APP_SET app_set((*iter).int_id_);
+
+ for(APP_SET::iterator app_set_iter = app_set.begin();
+ app_set_iter != app_set.end();
+ ++app_set_iter)
+ {
+ if((*app_set_iter).role == phase)
+ {
+ StateSynchronizationAgent_var ssa;
+
+ // locks acquired in two_phase_commit function..
+ if(state_synchronization_agent_map_.find(
+ (*app_set_iter).process_id, ssa) == 0)
+ {
+ /*
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::ami_phase(): "
+ "Found SSA for objectid = %s, phase = %d\n",
+ (*iter).ext_id_.c_str(), phase));
+ */
+ if(phase == 1)
+ {
+ ssa->sendc_precommit_state(
+ ssa_ami_handler,
+ CORBA::string_dup((*iter).ext_id_.c_str()));
+ ++ssa_invoked;
+ }
+ if(phase == 2)
+ {
+ ssa->sendc_commit_state(
+ ssa_ami_handler,
+ CORBA::string_dup((*iter).ext_id_.c_str()));
+ ++ssa_invoked;
+ }
+ break; // no need to search further in the app_set.
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::phase(): "
+ "Can't find SSA for objectid = %s, phase = %d\n",
+ (*iter).ext_id_.c_str(), phase));
+
+ break; // Don't search further in the app_set.
+ }
+ }
+ }
+ }
+ ACE_Time_Value tv(tsec, tusec);
+ return ssa_ami_handler_ptr->wait_for_results(ssa_invoked, tv);
+}
+
+bool ReplicationManager_i::two_phase_commit(const char * object_id)
{
- // 2PC
// Acquiring two lock like this could lead to a deadlock if
// some other thread is acquiring the same two locks in the
// reverse order. One solution is to put all these locks in an
@@ -1808,12 +1957,16 @@ void ReplicationManager_i::finish_invocation(const char * object_id)
ACE_Guard<ACE_Thread_Mutex> ssa_guard(state_sync_agent_list_mutex_);
if(phase(1, object_id))
+ {
phase(2, object_id);
+ return true;
+ }
else
{
ACE_ERROR ((LM_ERROR,
- "ReplicationManager_i::finish_invocation(): "
+ "ReplicationManager_i::two_phase_commit(): "
"phase 1 failed. Skipping phase 2.\n"));
+ return false;
}
}
@@ -1826,7 +1979,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */)
bool precommit_success = true;
try {
- // locks acquired in finish_invocation.
+ // locks acquired in two_phase_commit function.
for(OBJECTID_APPSET_MAP::iterator iter = objectid_appset_map_.begin();
(iter != objectid_appset_map_.end()) && precommit_success;
++iter)
@@ -1844,7 +1997,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */)
{
StateSynchronizationAgent_var ssa;
- // locks acquired in finish_invocation.
+ // locks acquired in two_phase_commit function..
if(state_synchronization_agent_map_.find(
(*app_set_iter).process_id, ssa) == 0)
{
@@ -1879,7 +2032,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */)
catch (::CORBA::Exception & ex)
{
ACE_ERROR ((LM_ERROR,
- "ReplicationManager_i::finish_invocation(): "
+ "ReplicationManager_i::phase(): "
"Exception raised while executing two-phase commit. phase = %d\n",
phase));
precommit_success = false;