summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSumant Tambe <sutambe@users.noreply.github.com>2010-08-21 23:23:33 +0000
committerSumant Tambe <sutambe@users.noreply.github.com>2010-08-21 23:23:33 +0000
commitefda5de16fa6e2e99ce9994994edebbabcb5b229 (patch)
tree2d8653b0326dd3247e9dec633a169a30b89d9c71
parent5840b75f1b2d701c6d26fef8d31e36db17ecda80 (diff)
downloadATCD-FTCIAO.tar.gz
Sat Aug 21 23:18:27 UTC 2010 Sumant Tambe <sutambe@nospam>FTCIAO
-rw-r--r--TAO/ChangeLog27
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp56
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h4
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp5
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.h1
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/LWFT.mpc11
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp167
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h14
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl3
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp156
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h48
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp53
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h26
13 files changed, 544 insertions, 27 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index 84791569278..fddedb712c4 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,30 @@
+Sat Aug 21 23:18:27 UTC 2010 Sumant Tambe <sutambe@nospam>
+
+ * orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h:
+ * orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp:
+ * orbsvcs/orbsvcs/LWFT/ForwardingAgent.h:
+ * orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp:
+ * orbsvcs/orbsvcs/LWFT/LWFT.mpc:
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.h:
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp:
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.idl:
+ * orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h:
+ * orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp:
+ * orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h:
+ * orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp:
+
+ Improved group failover support with lag-by-one approach.
+
+Fri Aug 13 06:00:50 UTC 2010 Sumant Tambe <sutambe@nospam>
+
+ * orbsvcs/orbsvcs/LWFT/LWFT.mpc:
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.h:
+ * orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp:
+ * orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h:
+ * orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp:
+
+ Two-phase commit with CORBA AMI.
+
Thu Aug 12 22:14:46 UTC 2010 Sumant Tambe <sutambe@nospam>
* orbsvcs/LWFT_Service/ReplicationManager_process.cpp:
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp b/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp
index 6ca04cf7265..a50f1ed9fa3 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp
+++ b/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp
@@ -4,6 +4,7 @@
#include "Client_Request_Interceptor.h"
#include "ace/Log_Msg.h"
+#include "ace/High_Res_Timer.h"
#include "ForwardingAgent.h"
@@ -13,7 +14,9 @@ Client_Request_Interceptor::Client_Request_Interceptor (
: orb_id_ (CORBA::string_dup (orb_id)),
orb_ (),
request_count_ (0),
- agent_ (agent)
+ agent_ (agent),
+ last_successful_(0),
+ lag_by_one_(0)
{
}
@@ -47,9 +50,32 @@ Client_Request_Interceptor::send_poll (
void
Client_Request_Interceptor::receive_reply (
- PortableInterceptor::ClientRequestInfo_ptr /* ri */)
+ PortableInterceptor::ClientRequestInfo_ptr ri )
{
/*
+ const CORBA::ULong tagID = 9654;
+ char *tag = 0;
+
+ IOP::TaggedComponent_var mytag = ri->get_effective_component (tagID);
+ tag = reinterpret_cast <char *>(mytag->component_data.get_buffer ());
+
+ if((strcmp(tag,"Alpha") == 0) && (!lag_by_one_))
+ {
+ ++last_successful_;
+ if(this->agent_->getRM()->finish_invocation(
+ CORBA::string_dup(tag), 2, 0))
+ {
+ std::cerr << "Client_Request_Interceptor::receive_reply(): "
+ << "finish_invocation successful." << std::endl;
+ }
+ else
+ {
+ std::cerr << "Client_Request_Interceptor::receive_reply(): "
+ << "finish_invocation failed." << std::endl;
+ }
+ }
+ */
+ /*
ACE_DEBUG ((LM_INFO,
ACE_TEXT("(%P|%t) Client_Request_Interceptor::receive_reply (%s)\n"),
ri->operation ()));
@@ -75,13 +101,29 @@ Client_Request_Interceptor::receive_exception (
*/
const CORBA::ULong tagID = 9654;
char *tag = 0;
-
+
try
{
- IOP::TaggedComponent_var mytag = ri->get_effective_component (tagID);
- tag = reinterpret_cast <char *> ( mytag->component_data.get_buffer ());
- ACE_CString new_string = CORBA::string_dup (tag);
- CORBA::Object_var forward = this->agent_->next_member (tag);
+ CORBA::Object_var forward;
+ IOP::TaggedComponent_var mytag = ri->get_effective_component(tagID);
+ tag = reinterpret_cast <char *>(mytag->component_data.get_buffer());
+
+ if(lag_by_one_)
+ {
+ ACE_High_Res_Timer timer;
+ timer.start();
+ forward =
+ this->agent_->getRM()->prepare_reinvocation(
+ CORBA::string_dup(tag),
+ last_successful_, 5, 0);
+ timer.stop();
+ ACE_Time_Value tv;
+ timer.elapsed_time(tv);
+ std::cerr << "prepare_reinvocation overhead = " << tv.msec() << std::endl;
+ }
+ else
+ forward = this->agent_->next_member (tag);
+
ACE_THROW (PortableInterceptor::ForwardRequest (forward.in ()));
}
catch (CORBA::BAD_PARAM&)
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h b/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h
index 343c7cb622f..746fbe66d94 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h
+++ b/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h
@@ -77,6 +77,10 @@ private:
CORBA::ULong request_count_;
ForwardingAgent_i *agent_;
+
+ long last_successful_;
+
+ bool lag_by_one_;
};
#if defined(_MSC_VER)
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp b/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp
index 396e28aef2e..cc3554d36dd 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp
@@ -96,3 +96,8 @@ ForwardingAgent_i::initialize (CORBA::Object_ptr rm_ior)
RankList *rank_list = this->RM_var_->register_agent (temp.in ());
update_rank_list (*rank_list);
}
+
+ReplicationManager_ptr ForwardingAgent_i::getRM()
+{
+ return this->RM_var_.in();
+}
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.h b/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.h
index 0f71384ead9..c63ff2c4e6c 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.h
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.h
@@ -30,6 +30,7 @@ public:
virtual void update_rank_list (const RankList & rank_list);
void initialize (CORBA::Object_ptr);
+ ReplicationManager_ptr getRM();
void proactive (bool v);
typedef ACE_Hash_Map_Manager_Ex<ACE_CString,
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/LWFT.mpc b/TAO/orbsvcs/orbsvcs/LWFT/LWFT.mpc
index 2d61595ed9f..0a301045e7c 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/LWFT.mpc
+++ b/TAO/orbsvcs/orbsvcs/LWFT/LWFT.mpc
@@ -112,11 +112,12 @@ project(*Server) : iorinterceptor, orbsvcs_output {
}
}
-project(*StateSyncAgent) : portableserver, orbsvcs_output { // , lwft_dds , splicelib {
+project(*StateSyncAgent) : portableserver, orbsvcs_output, ami { // , lwft_dds , splicelib {
after += *Common
idlflags += -Wb,export_macro=SSA_Export \
- -Wb,export_include=ssa_export.h
- idlflags -= -St -Sa
+ -Wb,export_include=ssa_export.h
+
+ idlflags -= -St -Sa
dynamicflags = SSA_BUILD_DLL
libs += LWFT_Common
sharedname = LWFT_StateSyncAgent
@@ -136,12 +137,14 @@ project(*StateSyncAgent) : portableserver, orbsvcs_output { // , lwft_dds , spli
StateSynchronizationAgentC.cpp
StateSynchronizationAgentS.cpp
StateSynchronizationAgent_i.cpp
+ SSA_AMI_Handler.cpp
}
Header_Files {
DDSStateReaderListener_T.h
DDSStateUpdate_T.h
ssa_export.h
+ SSA_AMI_Handler.h
}
Inline_Files {
@@ -207,11 +210,13 @@ project(*ReplicationManagerImpl) : portableserver, orbsvcs_output, lwft_server {
Source_Files {
ReplicationManager.cpp
AppInfoC.cpp
+ SSA_AMI_Handler.cpp
}
Header_Files {
ReplicationManager.h
rm_impl_export.h
+ SSA_AMI_Handler.h
}
Inline_Files {
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
index 4357599c3d9..9cc65a98a52 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp
@@ -7,11 +7,12 @@
#include <string>
#include <set>
#include <ace/Date_Time.h>
-#include "ace/OS_NS_unistd.h"
+#include <ace/OS_NS_unistd.h>
#include <ace/High_Res_Timer.h>
#include "AppSideMonitor_Thread.h"
#include "ForwardingAgentC.h"
#include "AppInfoC.h"
+#include "SSA_AMI_Handler.h"
template <class T>
void myswap (T & t1, T & t2)
@@ -1797,9 +1798,157 @@ ReplicationManager_i::get_next (const char * /* object_id */)
return CORBA::Object::_nil ();
}
-void ReplicationManager_i::finish_invocation(const char * object_id)
+CORBA::Object_ptr ReplicationManager_i::prepare_reinvocation(
+ const char * object_id,
+ CORBA::Long last_successful,
+ CORBA::Long tsec,
+ CORBA::Long tusec)
+{
+ // Acquiring two lock like this could lead to a deadlock if
+ // some other thread is acquiring the same two locks in the
+ // reverse order. One solution is to put all these locks in an
+ // array and acquire them only in the increasing order of index.
+ ACE_Guard<ACE_Recursive_Thread_Mutex> appset_guard(appset_lock_);
+ ACE_Guard<ACE_Thread_Mutex> ssa_guard(state_sync_agent_list_mutex_);
+
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::prepare_reinvocation(): "
+ "object_id = %s.\n",object_id));
+
+ // This phase is needed only in the lag-by-one approach to prepare
+ // the lagging object by discarding potential orphan state, which
+ // has really become orphan because of a failure.
+ size_t phase = 2;
+ //ami_phase(phase, object_id, tsec, tusec);
+
+ APP_SET app_set;
+ if(objectid_appset_map_.find(object_id, app_set) == 0)
+ {
+ for(APP_SET::iterator app_set_iter = app_set.begin();
+ app_set_iter != app_set.end();
+ ++app_set_iter)
+ {
+ if((*app_set_iter).role == phase)
+ {
+ return (*app_set_iter).ior.in();
+ }
+ }
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::prepare_reinvocation(): "
+ "Can't find failover replica for objectid = %s\n", object_id));
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::prepare_reinvocation(): "
+ "Can't find APP_SET for objectid = %s\n", object_id));
+ }
+ return 0;
+}
+
+CORBA::Boolean ReplicationManager_i::finish_invocation(
+ const char * object_id,
+ ::CORBA::Long tsec,
+ ::CORBA::Long tusec)
+{
+ return two_phase_commit(object_id);
+ //return ami_two_phase_commit(object_id, tsec, tusec);
+}
+
+bool ReplicationManager_i::ami_two_phase_commit(const char * object_id, long
+ tsec, long tusec)
+{
+ // Acquiring two lock like this could lead to a deadlock if
+ // some other thread is acquiring the same two locks in the
+ // reverse order. One solution is to put all these locks in an
+ // array and acquire them only in the increasing order of index.
+ ACE_Guard<ACE_Recursive_Thread_Mutex> appset_guard(appset_lock_);
+ ACE_Guard<ACE_Thread_Mutex> ssa_guard(state_sync_agent_list_mutex_);
+
+ if(ami_phase(1, object_id, tsec, tusec))
+ {
+ ami_phase(2, object_id, tsec, tusec);
+ return true;
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::ami_two_phase_commit(): "
+ "phase 1 failed. Skipping phase 2.\n"));
+ return false;
+ }
+}
+
+bool ReplicationManager_i::ami_phase(int phase,
+ const char * /* object_id */,
+ long tsec, long tusec)
+{
+ SSA_AMI_Handler * ssa_ami_handler_ptr = new SSA_AMI_Handler;
+ AMI_StateSynchronizationAgentHandler_var ssa_ami_handler
+ = ssa_ami_handler_ptr->_this();
+
+ size_t ssa_invoked = 0;
+ for(OBJECTID_APPSET_MAP::iterator iter = objectid_appset_map_.begin();
+ iter != objectid_appset_map_.end();
+ ++iter)
+ {
+ if ((*iter).ext_id_ == "ReplicationManager")
+ continue;
+
+ APP_SET app_set((*iter).int_id_);
+
+ for(APP_SET::iterator app_set_iter = app_set.begin();
+ app_set_iter != app_set.end();
+ ++app_set_iter)
+ {
+ if((*app_set_iter).role == phase)
+ {
+ StateSynchronizationAgent_var ssa;
+
+ // locks acquired in two_phase_commit function..
+ if(state_synchronization_agent_map_.find(
+ (*app_set_iter).process_id, ssa) == 0)
+ {
+ /*
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::ami_phase(): "
+ "Found SSA for objectid = %s, phase = %d\n",
+ (*iter).ext_id_.c_str(), phase));
+ */
+ if(phase == 1)
+ {
+ ssa->sendc_precommit_state(
+ ssa_ami_handler,
+ CORBA::string_dup((*iter).ext_id_.c_str()));
+ ++ssa_invoked;
+ }
+ if(phase == 2)
+ {
+ ssa->sendc_commit_state(
+ ssa_ami_handler,
+ CORBA::string_dup((*iter).ext_id_.c_str()));
+ ++ssa_invoked;
+ }
+ break; // no need to search further in the app_set.
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ReplicationManager_i::phase(): "
+ "Can't find SSA for objectid = %s, phase = %d\n",
+ (*iter).ext_id_.c_str(), phase));
+
+ break; // Don't search further in the app_set.
+ }
+ }
+ }
+ }
+ ACE_Time_Value tv(tsec, tusec);
+ return ssa_ami_handler_ptr->wait_for_results(ssa_invoked, tv);
+}
+
+bool ReplicationManager_i::two_phase_commit(const char * object_id)
{
- // 2PC
// Acquiring two lock like this could lead to a deadlock if
// some other thread is acquiring the same two locks in the
// reverse order. One solution is to put all these locks in an
@@ -1808,12 +1957,16 @@ void ReplicationManager_i::finish_invocation(const char * object_id)
ACE_Guard<ACE_Thread_Mutex> ssa_guard(state_sync_agent_list_mutex_);
if(phase(1, object_id))
+ {
phase(2, object_id);
+ return true;
+ }
else
{
ACE_ERROR ((LM_ERROR,
- "ReplicationManager_i::finish_invocation(): "
+ "ReplicationManager_i::two_phase_commit(): "
"phase 1 failed. Skipping phase 2.\n"));
+ return false;
}
}
@@ -1826,7 +1979,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */)
bool precommit_success = true;
try {
- // locks acquired in finish_invocation.
+ // locks acquired in two_phase_commit function.
for(OBJECTID_APPSET_MAP::iterator iter = objectid_appset_map_.begin();
(iter != objectid_appset_map_.end()) && precommit_success;
++iter)
@@ -1844,7 +1997,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */)
{
StateSynchronizationAgent_var ssa;
- // locks acquired in finish_invocation.
+ // locks acquired in two_phase_commit function..
if(state_synchronization_agent_map_.find(
(*app_set_iter).process_id, ssa) == 0)
{
@@ -1879,7 +2032,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */)
catch (::CORBA::Exception & ex)
{
ACE_ERROR ((LM_ERROR,
- "ReplicationManager_i::finish_invocation(): "
+ "ReplicationManager_i::phase(): "
"Exception raised while executing two-phase commit. phase = %d\n",
phase));
precommit_success = false;
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
index e63790faa76..088ea0a3355 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h
@@ -182,7 +182,16 @@ public:
virtual void set_ranklist_constraints (
const RankListConstraints & constraints);
- void finish_invocation(const char *);
+ CORBA::Boolean finish_invocation(const char *, ::CORBA::Long tsec, ::CORBA::Long tusec);
+ CORBA::Object_ptr prepare_reinvocation(
+ const char *,
+ CORBA::Long last_successful,
+ CORBA::Long tsec,
+ CORBA::Long tusec);
+
+ bool ami_two_phase_commit(const char *, long tsec, long tusec);
+
+ bool two_phase_commit(const char *);
void
load_based_selection_algo (void);
@@ -408,7 +417,8 @@ private:
void send_failure_notice (const char * host,
const ::FLARE::ApplicationList & object_ids);
- bool phase(int, const char * object_id);
+ bool phase(int phase, const char * object_id);
+ bool ami_phase(int phase, const char * object_id, long tsec, long tusec);
};
#endif /* REPLICATION_MANAGER_H */
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl
index 7cdcf081460..33bce6a5e47 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl
+++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl
@@ -39,5 +39,6 @@ interface ReplicationManager : ReplicatedApplication, FLARE::FaultNotifier
Object get_next (in string object_id);
- void finish_invocation(in string object_id);
+ boolean finish_invocation(in string object_id, in long tsec, in long tusec);
+ Object prepare_reinvocation(in string object_id, in long last_successful, in long tsec, in long tusec);
};
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp b/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp
new file mode 100644
index 00000000000..dccdc21fb90
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp
@@ -0,0 +1,156 @@
+#include "SSA_AMI_Handler.h"
+#include <ace/Guard_T.h>
+
+
+SSA_AMI_Handler::SSA_AMI_Handler()
+ : accumulated_results_(0),
+ success_(true)
+ //phase_end_condition_(phase_wait_mutex_)
+{}
+
+bool SSA_AMI_Handler::wait_for_results(size_t expected_results, ACE_Time_Value abstime)
+{
+ if(expected_results == 0)
+ return true;
+
+ ACE_Time_Value tv(0, 100), total(0,0);
+ while(true)
+ {
+ ami_result_lock_.acquire();
+ if((accumulated_results_ >= expected_results) || (total > abstime))
+ {
+ ami_result_lock_.release();
+ break;
+ }
+ else
+ {
+ ami_result_lock_.release();
+ ACE_OS::sleep(tv);
+ total += tv;
+ }
+ }
+ if(total > abstime)
+ return false;
+ else
+ return success_;
+}
+
+void SSA_AMI_Handler::precommit_state(CORBA::Boolean b)
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ &= (b ? 1 : 0);
+ ++accumulated_results_;
+}
+
+void SSA_AMI_Handler::precommit_state_excep(Messaging::ExceptionHolder*)
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ = false;
+ ++accumulated_results_;
+}
+
+void SSA_AMI_Handler::commit_state()
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ &= true;
+ ++accumulated_results_;
+}
+
+void SSA_AMI_Handler::commit_state_excep(Messaging::ExceptionHolder*)
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ = false;
+ ++accumulated_results_;
+}
+
+void SSA_AMI_Handler::transfer_state()
+{
+ ACE_ERROR ((LM_EMERGENCY,
+ "(%P|%t) SSA_AMI_Handler::transfer_state () "
+ "for the application %s\n"));
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ &= true;
+ ++accumulated_results_;
+}
+
+void SSA_AMI_Handler::transfer_state_excep(Messaging::ExceptionHolder*)
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ = false;
+ ++accumulated_results_;
+}
+
+
+/*
+SSA_AMI_Handler::SSA_AMI_Handler()
+ : ssa_invoked_(0),
+ success_(true),
+ phase_end_condition_(phase_wait_mutex_)
+{}
+
+bool SSA_AMI_Handler::wait_for_results(const ACE_Time_Value *abstime)
+{
+ if(!ssa_invoked_)
+ return false;
+ else if(phase_end_condition_.wait(phase_wait_mutex_, abstime) == -1)
+ return false;
+ else
+ return success_;
+}
+
+void SSA_AMI_Handler::ssa_invoked()
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ ++ssa_invoked_;
+}
+
+void SSA_AMI_Handler::release()
+{
+ --ssa_invoked_;
+
+ if((ssa_invoked_ == 0) || !success_)
+ phase_end_condition_.broadcast();
+}
+
+void SSA_AMI_Handler::precommit_state(CORBA::Boolean b)
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ &= (b ? 1 : 0);
+ release();
+}
+
+void SSA_AMI_Handler::precommit_state_excep(Messaging::ExceptionHolder*)
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ = false;
+ release();
+}
+
+void SSA_AMI_Handler::commit_state()
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ &= true;
+ release();
+}
+
+void SSA_AMI_Handler::commit_state_excep(Messaging::ExceptionHolder*)
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ = false;
+ release();
+}
+
+void SSA_AMI_Handler::transfer_state()
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ &= true;
+ release();
+}
+
+void SSA_AMI_Handler::transfer_state_excep(Messaging::ExceptionHolder*)
+{
+ ACE_Guard<ACE_Thread_Mutex> guard(ami_result_lock_);
+ success_ = false;
+ release();
+}
+ */
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h b/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h
new file mode 100644
index 00000000000..4899a3b3ab3
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h
@@ -0,0 +1,48 @@
+#ifndef __SSA_AMI_HANDLER_H
+#define __SSA_AMI_HANDLER_H
+
+#include "StateSynchronizationAgentS.h"
+#include "ssa_export.h"
+#include <ace/Condition_T.h>
+#include <ace/Thread_Mutex.h>
+#include <ace/Time_Value.h>
+
+class SSA_Export SSA_AMI_Handler
+ : public ::POA_AMI_StateSynchronizationAgentHandler
+{
+ private:
+ size_t accumulated_results_;
+ bool success_;
+ ACE_Thread_Mutex ami_result_lock_;
+ //ACE_Thread_Mutex phase_wait_mutex_;
+ //ACE_Condition<ACE_Thread_Mutex> phase_end_condition_;
+
+ //void release();
+
+ public:
+ SSA_AMI_Handler();
+ bool wait_for_results(size_t expected_results, ACE_Time_Value abstime);
+ //void ssa_invoked();
+
+ virtual void state_changed() {}
+ virtual void state_changed_excep(Messaging::ExceptionHolder*) {}
+
+ virtual void precommit_state(CORBA::Boolean);
+ virtual void precommit_state_excep(Messaging::ExceptionHolder*);
+
+ virtual void commit_state();
+ virtual void commit_state_excep(Messaging::ExceptionHolder*);
+
+ virtual void transfer_state();
+ virtual void transfer_state_excep(Messaging::ExceptionHolder*);
+
+ virtual void update_rank_list() {}
+ virtual void update_rank_list_excep(Messaging::ExceptionHolder*) {}
+
+ virtual void register_application() {}
+ virtual void register_application_excep(Messaging::ExceptionHolder*) {}
+};
+
+#endif // __SSA_AMI_HANDLER_H
+
+
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp b/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp
index 52b4345c7dc..63ecb647b3f 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp
+++ b/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp
@@ -14,6 +14,7 @@
#include "StateSynchronizationAgent_i.h"
#include "CorbaStateUpdate.h"
+#include "SSA_AMI_Handler.h"
#ifdef FLARE_USES_DDS
# include "DDSStateUpdate_T.h"
@@ -31,7 +32,9 @@ StateSynchronizationAgent_i::StateSynchronizationAgent_i (
publisher_ (DDS::Publisher::_nil ()),
subscriber_ (DDS::Subscriber::_nil ()),
#endif /* FLARE_USES_DDS */
- use_corba_ (use_corba)
+ use_corba_ (use_corba),
+ ssa_in_replica_(StateSynchronizationAgent::_nil()),
+ ssa_invoked_(0)
{
#if defined (FLARE_USES_DDS)
if (!use_corba_)
@@ -160,8 +163,9 @@ StateSynchronizationAgent_i::state_changed (const char * object_id)
::CORBA::Boolean StateSynchronizationAgent_i::precommit_state (const char * object_id)
{
+
ACE_ERROR ((LM_ERROR,
- "SSA::precommit_state (%s) called.\n",
+ "(%P|%t) SSA::precommit_state (%s) called.\n",
object_id));
// get application reference
@@ -212,7 +216,14 @@ StateSynchronizationAgent_i::state_changed (const char * object_id)
}
ReplicatedApplication_var replica;
-
+
+ bool use_ami = true; // Enabling AMI here causes abnormal termination.
+ if(use_ami && (ssa_invoked_ != 0))
+ ACE_ERROR ((LM_EMERGENCY,
+ "(%P|%t) SSA::transfer_state (): "
+ "Transfer state has not completed. ssa_invoked = %d\n",
+ ssa_invoked_));
+
for (REPLICA_OBJECT_LIST::iterator it = replica_group.replicas.begin ();
it != replica_group.replicas.end ();
++it)
@@ -222,8 +233,18 @@ StateSynchronizationAgent_i::state_changed (const char * object_id)
CorbaStateUpdate * csu = dynamic_cast<CorbaStateUpdate *> ((*it).get());
ReplicatedApplication_var rep_app =
ReplicatedApplication::_narrow(csu->application());
- StateSynchronizationAgent_var ssa_in_replica = rep_app->agent();
- ssa_in_replica->transfer_state(CORBA::string_dup(object_id), state.in ());
+ if(ssa_in_replica_.in() == 0)
+ ssa_in_replica_ = rep_app->agent();
+ if(use_ami)
+ {
+ ssa_in_replica_->sendc_transfer_state(
+ (AMI_StateSynchronizationAgentHandler *)this,
+ CORBA::string_dup(object_id),
+ state.in());
+ ++ssa_invoked_;
+ }
+ else
+ ssa_in_replica_->transfer_state(CORBA::string_dup(object_id), state.in ());
}
catch (const CORBA::Exception& ex)
{
@@ -235,22 +256,34 @@ StateSynchronizationAgent_i::state_changed (const char * object_id)
return false;
}
}
- return true;
+ if(use_ami)
+ {
+ // If 100 seconds pass in just waiting for state transfer something
+ // is wrong.
+ //ACE_Time_Value tv(100, 0);
+ //return ssa_ami_handler_ptr->wait_for_results(ssa_invoked, tv);
+ return true;
+ }
+ else
+ return true;
}
void StateSynchronizationAgent_i::transfer_state (
const char * object_id,
const ::CORBA::Any & state_value)
{
+
ACE_ERROR ((LM_EMERGENCY,
"(%P|%t) SSA::transfer_state () "
"for the application %s\n",
object_id));
+
this->precommit_state_ = state_value;
}
void StateSynchronizationAgent_i::commit_state (const char * object_id)
{
+
ACE_ERROR ((LM_EMERGENCY,
"(%P|%t) SSA::commit_state () "
"for the application %s\n",
@@ -516,3 +549,11 @@ StateSynchronizationAgent_i::get_unique_id (
return unique_id;
}
+void StateSynchronizationAgent_i::transfer_state()
+{
+ --ssa_invoked_;
+ ACE_ERROR ((LM_EMERGENCY,
+ "(%P|%t) SSA::transfer_state (): "
+ "Transafer state completed ssa_invoked = %d\n",
+ ssa_invoked_));
+}
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h b/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h
index c3bb37c381c..f80a48d593a 100644
--- a/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h
+++ b/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h
@@ -22,6 +22,7 @@
#include "StateSynchronizationAgentS.h"
#include "StatefulObject.h"
#include "ssa_export.h"
+#include "SSA_AMI_Handler.h"
#if defined (FLARE_USES_DDS)
# include <ccpp_dds_dcps.h>
@@ -29,7 +30,8 @@
#endif
class SSA_Export StateSynchronizationAgent_i
- : public POA_StateSynchronizationAgent
+ : public POA_StateSynchronizationAgent,
+ public AMI_StateSynchronizationAgentHandler
{
public:
StateSynchronizationAgent_i (const std::string & host_id,
@@ -57,6 +59,24 @@ class SSA_Export StateSynchronizationAgent_i
/// Registers application for statesynchronization with CORBA.
virtual void register_application (const char * object_id,
ReplicatedApplication_ptr app);
+
+ virtual void state_changed() {}
+ virtual void state_changed_excep(Messaging::ExceptionHolder*) {}
+
+ virtual void precommit_state(CORBA::Boolean) {}
+ virtual void precommit_state_excep(Messaging::ExceptionHolder*) {}
+
+ virtual void commit_state() {}
+ virtual void commit_state_excep(Messaging::ExceptionHolder*) {}
+
+ virtual void transfer_state();
+ virtual void transfer_state_excep(Messaging::ExceptionHolder*) {}
+
+ virtual void update_rank_list() {}
+ virtual void update_rank_list_excep(Messaging::ExceptionHolder*) {}
+
+ virtual void register_application() {}
+ virtual void register_application_excep(Messaging::ExceptionHolder*) {}
#ifdef FLARE_USES_DDS
/// Registers application for state synchronization with DDS
@@ -139,6 +159,10 @@ private:
/// decides whether replicas should be updated through corba or dds
bool use_corba_;
+
+ StateSynchronizationAgent_var ssa_in_replica_;
+
+ size_t ssa_invoked_;
};
#include "StateSynchronizationAgent_i_T.cpp"