From efda5de16fa6e2e99ce9994994edebbabcb5b229 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Sat, 21 Aug 2010 23:23:33 +0000 Subject: Sat Aug 21 23:18:27 UTC 2010 Sumant Tambe --- TAO/ChangeLog | 27 ++++ .../orbsvcs/LWFT/Client_Request_Interceptor.cpp | 56 ++++++- .../orbsvcs/LWFT/Client_Request_Interceptor.h | 4 + TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp | 5 + TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.h | 1 + TAO/orbsvcs/orbsvcs/LWFT/LWFT.mpc | 11 +- TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp | 167 ++++++++++++++++++++- TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h | 14 +- TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl | 3 +- TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp | 156 +++++++++++++++++++ TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h | 48 ++++++ .../orbsvcs/LWFT/StateSynchronizationAgent_i.cpp | 53 ++++++- .../orbsvcs/LWFT/StateSynchronizationAgent_i.h | 26 +++- 13 files changed, 544 insertions(+), 27 deletions(-) create mode 100644 TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp create mode 100644 TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 84791569278..fddedb712c4 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,30 @@ +Sat Aug 21 23:18:27 UTC 2010 Sumant Tambe + + * orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h: + * orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp: + * orbsvcs/orbsvcs/LWFT/ForwardingAgent.h: + * orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp: + * orbsvcs/orbsvcs/LWFT/LWFT.mpc: + * orbsvcs/orbsvcs/LWFT/ReplicationManager.h: + * orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp: + * orbsvcs/orbsvcs/LWFT/ReplicationManager.idl: + * orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h: + * orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp: + * orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h: + * orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp: + + Improved group failover support with lag-by-one approach. + +Fri Aug 13 06:00:50 UTC 2010 Sumant Tambe + + * orbsvcs/orbsvcs/LWFT/LWFT.mpc: + * orbsvcs/orbsvcs/LWFT/ReplicationManager.h: + * orbsvcs/orbsvcs/LWFT/ReplicationManager.cpp: + * orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h: + * orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp: + + Two-phase commit with CORBA AMI. + Thu Aug 12 22:14:46 UTC 2010 Sumant Tambe * orbsvcs/LWFT_Service/ReplicationManager_process.cpp: diff --git a/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp b/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp index 6ca04cf7265..a50f1ed9fa3 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp +++ b/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.cpp @@ -4,6 +4,7 @@ #include "Client_Request_Interceptor.h" #include "ace/Log_Msg.h" +#include "ace/High_Res_Timer.h" #include "ForwardingAgent.h" @@ -13,7 +14,9 @@ Client_Request_Interceptor::Client_Request_Interceptor ( : orb_id_ (CORBA::string_dup (orb_id)), orb_ (), request_count_ (0), - agent_ (agent) + agent_ (agent), + last_successful_(0), + lag_by_one_(0) { } @@ -47,8 +50,31 @@ Client_Request_Interceptor::send_poll ( void Client_Request_Interceptor::receive_reply ( - PortableInterceptor::ClientRequestInfo_ptr /* ri */) + PortableInterceptor::ClientRequestInfo_ptr ri ) { + /* + const CORBA::ULong tagID = 9654; + char *tag = 0; + + IOP::TaggedComponent_var mytag = ri->get_effective_component (tagID); + tag = reinterpret_cast (mytag->component_data.get_buffer ()); + + if((strcmp(tag,"Alpha") == 0) && (!lag_by_one_)) + { + ++last_successful_; + if(this->agent_->getRM()->finish_invocation( + CORBA::string_dup(tag), 2, 0)) + { + std::cerr << "Client_Request_Interceptor::receive_reply(): " + << "finish_invocation successful." << std::endl; + } + else + { + std::cerr << "Client_Request_Interceptor::receive_reply(): " + << "finish_invocation failed." << std::endl; + } + } + */ /* ACE_DEBUG ((LM_INFO, ACE_TEXT("(%P|%t) Client_Request_Interceptor::receive_reply (%s)\n"), @@ -75,13 +101,29 @@ Client_Request_Interceptor::receive_exception ( */ const CORBA::ULong tagID = 9654; char *tag = 0; - + try { - IOP::TaggedComponent_var mytag = ri->get_effective_component (tagID); - tag = reinterpret_cast ( mytag->component_data.get_buffer ()); - ACE_CString new_string = CORBA::string_dup (tag); - CORBA::Object_var forward = this->agent_->next_member (tag); + CORBA::Object_var forward; + IOP::TaggedComponent_var mytag = ri->get_effective_component(tagID); + tag = reinterpret_cast (mytag->component_data.get_buffer()); + + if(lag_by_one_) + { + ACE_High_Res_Timer timer; + timer.start(); + forward = + this->agent_->getRM()->prepare_reinvocation( + CORBA::string_dup(tag), + last_successful_, 5, 0); + timer.stop(); + ACE_Time_Value tv; + timer.elapsed_time(tv); + std::cerr << "prepare_reinvocation overhead = " << tv.msec() << std::endl; + } + else + forward = this->agent_->next_member (tag); + ACE_THROW (PortableInterceptor::ForwardRequest (forward.in ())); } catch (CORBA::BAD_PARAM&) diff --git a/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h b/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h index 343c7cb622f..746fbe66d94 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h +++ b/TAO/orbsvcs/orbsvcs/LWFT/Client_Request_Interceptor.h @@ -77,6 +77,10 @@ private: CORBA::ULong request_count_; ForwardingAgent_i *agent_; + + long last_successful_; + + bool lag_by_one_; }; #if defined(_MSC_VER) diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp b/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp index 396e28aef2e..cc3554d36dd 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp +++ b/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.cpp @@ -96,3 +96,8 @@ ForwardingAgent_i::initialize (CORBA::Object_ptr rm_ior) RankList *rank_list = this->RM_var_->register_agent (temp.in ()); update_rank_list (*rank_list); } + +ReplicationManager_ptr ForwardingAgent_i::getRM() +{ + return this->RM_var_.in(); +} diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.h b/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.h index 0f71384ead9..c63ff2c4e6c 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.h +++ b/TAO/orbsvcs/orbsvcs/LWFT/ForwardingAgent.h @@ -30,6 +30,7 @@ public: virtual void update_rank_list (const RankList & rank_list); void initialize (CORBA::Object_ptr); + ReplicationManager_ptr getRM(); void proactive (bool v); typedef ACE_Hash_Map_Manager_Ex #include #include -#include "ace/OS_NS_unistd.h" +#include #include #include "AppSideMonitor_Thread.h" #include "ForwardingAgentC.h" #include "AppInfoC.h" +#include "SSA_AMI_Handler.h" template void myswap (T & t1, T & t2) @@ -1797,9 +1798,157 @@ ReplicationManager_i::get_next (const char * /* object_id */) return CORBA::Object::_nil (); } -void ReplicationManager_i::finish_invocation(const char * object_id) +CORBA::Object_ptr ReplicationManager_i::prepare_reinvocation( + const char * object_id, + CORBA::Long last_successful, + CORBA::Long tsec, + CORBA::Long tusec) +{ + // Acquiring two lock like this could lead to a deadlock if + // some other thread is acquiring the same two locks in the + // reverse order. One solution is to put all these locks in an + // array and acquire them only in the increasing order of index. + ACE_Guard appset_guard(appset_lock_); + ACE_Guard ssa_guard(state_sync_agent_list_mutex_); + + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::prepare_reinvocation(): " + "object_id = %s.\n",object_id)); + + // This phase is needed only in the lag-by-one approach to prepare + // the lagging object by discarding potential orphan state, which + // has really become orphan because of a failure. + size_t phase = 2; + //ami_phase(phase, object_id, tsec, tusec); + + APP_SET app_set; + if(objectid_appset_map_.find(object_id, app_set) == 0) + { + for(APP_SET::iterator app_set_iter = app_set.begin(); + app_set_iter != app_set.end(); + ++app_set_iter) + { + if((*app_set_iter).role == phase) + { + return (*app_set_iter).ior.in(); + } + } + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::prepare_reinvocation(): " + "Can't find failover replica for objectid = %s\n", object_id)); + } + else + { + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::prepare_reinvocation(): " + "Can't find APP_SET for objectid = %s\n", object_id)); + } + return 0; +} + +CORBA::Boolean ReplicationManager_i::finish_invocation( + const char * object_id, + ::CORBA::Long tsec, + ::CORBA::Long tusec) +{ + return two_phase_commit(object_id); + //return ami_two_phase_commit(object_id, tsec, tusec); +} + +bool ReplicationManager_i::ami_two_phase_commit(const char * object_id, long + tsec, long tusec) +{ + // Acquiring two lock like this could lead to a deadlock if + // some other thread is acquiring the same two locks in the + // reverse order. One solution is to put all these locks in an + // array and acquire them only in the increasing order of index. + ACE_Guard appset_guard(appset_lock_); + ACE_Guard ssa_guard(state_sync_agent_list_mutex_); + + if(ami_phase(1, object_id, tsec, tusec)) + { + ami_phase(2, object_id, tsec, tusec); + return true; + } + else + { + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::ami_two_phase_commit(): " + "phase 1 failed. Skipping phase 2.\n")); + return false; + } +} + +bool ReplicationManager_i::ami_phase(int phase, + const char * /* object_id */, + long tsec, long tusec) +{ + SSA_AMI_Handler * ssa_ami_handler_ptr = new SSA_AMI_Handler; + AMI_StateSynchronizationAgentHandler_var ssa_ami_handler + = ssa_ami_handler_ptr->_this(); + + size_t ssa_invoked = 0; + for(OBJECTID_APPSET_MAP::iterator iter = objectid_appset_map_.begin(); + iter != objectid_appset_map_.end(); + ++iter) + { + if ((*iter).ext_id_ == "ReplicationManager") + continue; + + APP_SET app_set((*iter).int_id_); + + for(APP_SET::iterator app_set_iter = app_set.begin(); + app_set_iter != app_set.end(); + ++app_set_iter) + { + if((*app_set_iter).role == phase) + { + StateSynchronizationAgent_var ssa; + + // locks acquired in two_phase_commit function.. + if(state_synchronization_agent_map_.find( + (*app_set_iter).process_id, ssa) == 0) + { + /* + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::ami_phase(): " + "Found SSA for objectid = %s, phase = %d\n", + (*iter).ext_id_.c_str(), phase)); + */ + if(phase == 1) + { + ssa->sendc_precommit_state( + ssa_ami_handler, + CORBA::string_dup((*iter).ext_id_.c_str())); + ++ssa_invoked; + } + if(phase == 2) + { + ssa->sendc_commit_state( + ssa_ami_handler, + CORBA::string_dup((*iter).ext_id_.c_str())); + ++ssa_invoked; + } + break; // no need to search further in the app_set. + } + else + { + ACE_ERROR ((LM_ERROR, + "ReplicationManager_i::phase(): " + "Can't find SSA for objectid = %s, phase = %d\n", + (*iter).ext_id_.c_str(), phase)); + + break; // Don't search further in the app_set. + } + } + } + } + ACE_Time_Value tv(tsec, tusec); + return ssa_ami_handler_ptr->wait_for_results(ssa_invoked, tv); +} + +bool ReplicationManager_i::two_phase_commit(const char * object_id) { - // 2PC // Acquiring two lock like this could lead to a deadlock if // some other thread is acquiring the same two locks in the // reverse order. One solution is to put all these locks in an @@ -1808,12 +1957,16 @@ void ReplicationManager_i::finish_invocation(const char * object_id) ACE_Guard ssa_guard(state_sync_agent_list_mutex_); if(phase(1, object_id)) + { phase(2, object_id); + return true; + } else { ACE_ERROR ((LM_ERROR, - "ReplicationManager_i::finish_invocation(): " + "ReplicationManager_i::two_phase_commit(): " "phase 1 failed. Skipping phase 2.\n")); + return false; } } @@ -1826,7 +1979,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */) bool precommit_success = true; try { - // locks acquired in finish_invocation. + // locks acquired in two_phase_commit function. for(OBJECTID_APPSET_MAP::iterator iter = objectid_appset_map_.begin(); (iter != objectid_appset_map_.end()) && precommit_success; ++iter) @@ -1844,7 +1997,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */) { StateSynchronizationAgent_var ssa; - // locks acquired in finish_invocation. + // locks acquired in two_phase_commit function.. if(state_synchronization_agent_map_.find( (*app_set_iter).process_id, ssa) == 0) { @@ -1879,7 +2032,7 @@ bool ReplicationManager_i::phase(int phase, const char * /* object_id */) catch (::CORBA::Exception & ex) { ACE_ERROR ((LM_ERROR, - "ReplicationManager_i::finish_invocation(): " + "ReplicationManager_i::phase(): " "Exception raised while executing two-phase commit. phase = %d\n", phase)); precommit_success = false; diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h index e63790faa76..088ea0a3355 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h +++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.h @@ -182,7 +182,16 @@ public: virtual void set_ranklist_constraints ( const RankListConstraints & constraints); - void finish_invocation(const char *); + CORBA::Boolean finish_invocation(const char *, ::CORBA::Long tsec, ::CORBA::Long tusec); + CORBA::Object_ptr prepare_reinvocation( + const char *, + CORBA::Long last_successful, + CORBA::Long tsec, + CORBA::Long tusec); + + bool ami_two_phase_commit(const char *, long tsec, long tusec); + + bool two_phase_commit(const char *); void load_based_selection_algo (void); @@ -408,7 +417,8 @@ private: void send_failure_notice (const char * host, const ::FLARE::ApplicationList & object_ids); - bool phase(int, const char * object_id); + bool phase(int phase, const char * object_id); + bool ami_phase(int phase, const char * object_id, long tsec, long tusec); }; #endif /* REPLICATION_MANAGER_H */ diff --git a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl index 7cdcf081460..33bce6a5e47 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl +++ b/TAO/orbsvcs/orbsvcs/LWFT/ReplicationManager.idl @@ -39,5 +39,6 @@ interface ReplicationManager : ReplicatedApplication, FLARE::FaultNotifier Object get_next (in string object_id); - void finish_invocation(in string object_id); + boolean finish_invocation(in string object_id, in long tsec, in long tusec); + Object prepare_reinvocation(in string object_id, in long last_successful, in long tsec, in long tusec); }; diff --git a/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp b/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp new file mode 100644 index 00000000000..dccdc21fb90 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.cpp @@ -0,0 +1,156 @@ +#include "SSA_AMI_Handler.h" +#include + + +SSA_AMI_Handler::SSA_AMI_Handler() + : accumulated_results_(0), + success_(true) + //phase_end_condition_(phase_wait_mutex_) +{} + +bool SSA_AMI_Handler::wait_for_results(size_t expected_results, ACE_Time_Value abstime) +{ + if(expected_results == 0) + return true; + + ACE_Time_Value tv(0, 100), total(0,0); + while(true) + { + ami_result_lock_.acquire(); + if((accumulated_results_ >= expected_results) || (total > abstime)) + { + ami_result_lock_.release(); + break; + } + else + { + ami_result_lock_.release(); + ACE_OS::sleep(tv); + total += tv; + } + } + if(total > abstime) + return false; + else + return success_; +} + +void SSA_AMI_Handler::precommit_state(CORBA::Boolean b) +{ + ACE_Guard guard(ami_result_lock_); + success_ &= (b ? 1 : 0); + ++accumulated_results_; +} + +void SSA_AMI_Handler::precommit_state_excep(Messaging::ExceptionHolder*) +{ + ACE_Guard guard(ami_result_lock_); + success_ = false; + ++accumulated_results_; +} + +void SSA_AMI_Handler::commit_state() +{ + ACE_Guard guard(ami_result_lock_); + success_ &= true; + ++accumulated_results_; +} + +void SSA_AMI_Handler::commit_state_excep(Messaging::ExceptionHolder*) +{ + ACE_Guard guard(ami_result_lock_); + success_ = false; + ++accumulated_results_; +} + +void SSA_AMI_Handler::transfer_state() +{ + ACE_ERROR ((LM_EMERGENCY, + "(%P|%t) SSA_AMI_Handler::transfer_state () " + "for the application %s\n")); + ACE_Guard guard(ami_result_lock_); + success_ &= true; + ++accumulated_results_; +} + +void SSA_AMI_Handler::transfer_state_excep(Messaging::ExceptionHolder*) +{ + ACE_Guard guard(ami_result_lock_); + success_ = false; + ++accumulated_results_; +} + + +/* +SSA_AMI_Handler::SSA_AMI_Handler() + : ssa_invoked_(0), + success_(true), + phase_end_condition_(phase_wait_mutex_) +{} + +bool SSA_AMI_Handler::wait_for_results(const ACE_Time_Value *abstime) +{ + if(!ssa_invoked_) + return false; + else if(phase_end_condition_.wait(phase_wait_mutex_, abstime) == -1) + return false; + else + return success_; +} + +void SSA_AMI_Handler::ssa_invoked() +{ + ACE_Guard guard(ami_result_lock_); + ++ssa_invoked_; +} + +void SSA_AMI_Handler::release() +{ + --ssa_invoked_; + + if((ssa_invoked_ == 0) || !success_) + phase_end_condition_.broadcast(); +} + +void SSA_AMI_Handler::precommit_state(CORBA::Boolean b) +{ + ACE_Guard guard(ami_result_lock_); + success_ &= (b ? 1 : 0); + release(); +} + +void SSA_AMI_Handler::precommit_state_excep(Messaging::ExceptionHolder*) +{ + ACE_Guard guard(ami_result_lock_); + success_ = false; + release(); +} + +void SSA_AMI_Handler::commit_state() +{ + ACE_Guard guard(ami_result_lock_); + success_ &= true; + release(); +} + +void SSA_AMI_Handler::commit_state_excep(Messaging::ExceptionHolder*) +{ + ACE_Guard guard(ami_result_lock_); + success_ = false; + release(); +} + +void SSA_AMI_Handler::transfer_state() +{ + ACE_Guard guard(ami_result_lock_); + success_ &= true; + release(); +} + +void SSA_AMI_Handler::transfer_state_excep(Messaging::ExceptionHolder*) +{ + ACE_Guard guard(ami_result_lock_); + success_ = false; + release(); +} + */ diff --git a/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h b/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h new file mode 100644 index 00000000000..4899a3b3ab3 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/LWFT/SSA_AMI_Handler.h @@ -0,0 +1,48 @@ +#ifndef __SSA_AMI_HANDLER_H +#define __SSA_AMI_HANDLER_H + +#include "StateSynchronizationAgentS.h" +#include "ssa_export.h" +#include +#include +#include + +class SSA_Export SSA_AMI_Handler + : public ::POA_AMI_StateSynchronizationAgentHandler +{ + private: + size_t accumulated_results_; + bool success_; + ACE_Thread_Mutex ami_result_lock_; + //ACE_Thread_Mutex phase_wait_mutex_; + //ACE_Condition phase_end_condition_; + + //void release(); + + public: + SSA_AMI_Handler(); + bool wait_for_results(size_t expected_results, ACE_Time_Value abstime); + //void ssa_invoked(); + + virtual void state_changed() {} + virtual void state_changed_excep(Messaging::ExceptionHolder*) {} + + virtual void precommit_state(CORBA::Boolean); + virtual void precommit_state_excep(Messaging::ExceptionHolder*); + + virtual void commit_state(); + virtual void commit_state_excep(Messaging::ExceptionHolder*); + + virtual void transfer_state(); + virtual void transfer_state_excep(Messaging::ExceptionHolder*); + + virtual void update_rank_list() {} + virtual void update_rank_list_excep(Messaging::ExceptionHolder*) {} + + virtual void register_application() {} + virtual void register_application_excep(Messaging::ExceptionHolder*) {} +}; + +#endif // __SSA_AMI_HANDLER_H + + diff --git a/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp b/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp index 52b4345c7dc..63ecb647b3f 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp +++ b/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp @@ -14,6 +14,7 @@ #include "StateSynchronizationAgent_i.h" #include "CorbaStateUpdate.h" +#include "SSA_AMI_Handler.h" #ifdef FLARE_USES_DDS # include "DDSStateUpdate_T.h" @@ -31,7 +32,9 @@ StateSynchronizationAgent_i::StateSynchronizationAgent_i ( publisher_ (DDS::Publisher::_nil ()), subscriber_ (DDS::Subscriber::_nil ()), #endif /* FLARE_USES_DDS */ - use_corba_ (use_corba) + use_corba_ (use_corba), + ssa_in_replica_(StateSynchronizationAgent::_nil()), + ssa_invoked_(0) { #if defined (FLARE_USES_DDS) if (!use_corba_) @@ -160,8 +163,9 @@ StateSynchronizationAgent_i::state_changed (const char * object_id) ::CORBA::Boolean StateSynchronizationAgent_i::precommit_state (const char * object_id) { + ACE_ERROR ((LM_ERROR, - "SSA::precommit_state (%s) called.\n", + "(%P|%t) SSA::precommit_state (%s) called.\n", object_id)); // get application reference @@ -212,7 +216,14 @@ StateSynchronizationAgent_i::state_changed (const char * object_id) } ReplicatedApplication_var replica; - + + bool use_ami = true; // Enabling AMI here causes abnormal termination. + if(use_ami && (ssa_invoked_ != 0)) + ACE_ERROR ((LM_EMERGENCY, + "(%P|%t) SSA::transfer_state (): " + "Transfer state has not completed. ssa_invoked = %d\n", + ssa_invoked_)); + for (REPLICA_OBJECT_LIST::iterator it = replica_group.replicas.begin (); it != replica_group.replicas.end (); ++it) @@ -222,8 +233,18 @@ StateSynchronizationAgent_i::state_changed (const char * object_id) CorbaStateUpdate * csu = dynamic_cast ((*it).get()); ReplicatedApplication_var rep_app = ReplicatedApplication::_narrow(csu->application()); - StateSynchronizationAgent_var ssa_in_replica = rep_app->agent(); - ssa_in_replica->transfer_state(CORBA::string_dup(object_id), state.in ()); + if(ssa_in_replica_.in() == 0) + ssa_in_replica_ = rep_app->agent(); + if(use_ami) + { + ssa_in_replica_->sendc_transfer_state( + (AMI_StateSynchronizationAgentHandler *)this, + CORBA::string_dup(object_id), + state.in()); + ++ssa_invoked_; + } + else + ssa_in_replica_->transfer_state(CORBA::string_dup(object_id), state.in ()); } catch (const CORBA::Exception& ex) { @@ -235,22 +256,34 @@ StateSynchronizationAgent_i::state_changed (const char * object_id) return false; } } - return true; + if(use_ami) + { + // If 100 seconds pass in just waiting for state transfer something + // is wrong. + //ACE_Time_Value tv(100, 0); + //return ssa_ami_handler_ptr->wait_for_results(ssa_invoked, tv); + return true; + } + else + return true; } void StateSynchronizationAgent_i::transfer_state ( const char * object_id, const ::CORBA::Any & state_value) { + ACE_ERROR ((LM_EMERGENCY, "(%P|%t) SSA::transfer_state () " "for the application %s\n", object_id)); + this->precommit_state_ = state_value; } void StateSynchronizationAgent_i::commit_state (const char * object_id) { + ACE_ERROR ((LM_EMERGENCY, "(%P|%t) SSA::commit_state () " "for the application %s\n", @@ -516,3 +549,11 @@ StateSynchronizationAgent_i::get_unique_id ( return unique_id; } +void StateSynchronizationAgent_i::transfer_state() +{ + --ssa_invoked_; + ACE_ERROR ((LM_EMERGENCY, + "(%P|%t) SSA::transfer_state (): " + "Transafer state completed ssa_invoked = %d\n", + ssa_invoked_)); +} diff --git a/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h b/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h index c3bb37c381c..f80a48d593a 100644 --- a/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h +++ b/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.h @@ -22,6 +22,7 @@ #include "StateSynchronizationAgentS.h" #include "StatefulObject.h" #include "ssa_export.h" +#include "SSA_AMI_Handler.h" #if defined (FLARE_USES_DDS) # include @@ -29,7 +30,8 @@ #endif class SSA_Export StateSynchronizationAgent_i - : public POA_StateSynchronizationAgent + : public POA_StateSynchronizationAgent, + public AMI_StateSynchronizationAgentHandler { public: StateSynchronizationAgent_i (const std::string & host_id, @@ -57,6 +59,24 @@ class SSA_Export StateSynchronizationAgent_i /// Registers application for statesynchronization with CORBA. virtual void register_application (const char * object_id, ReplicatedApplication_ptr app); + + virtual void state_changed() {} + virtual void state_changed_excep(Messaging::ExceptionHolder*) {} + + virtual void precommit_state(CORBA::Boolean) {} + virtual void precommit_state_excep(Messaging::ExceptionHolder*) {} + + virtual void commit_state() {} + virtual void commit_state_excep(Messaging::ExceptionHolder*) {} + + virtual void transfer_state(); + virtual void transfer_state_excep(Messaging::ExceptionHolder*) {} + + virtual void update_rank_list() {} + virtual void update_rank_list_excep(Messaging::ExceptionHolder*) {} + + virtual void register_application() {} + virtual void register_application_excep(Messaging::ExceptionHolder*) {} #ifdef FLARE_USES_DDS /// Registers application for state synchronization with DDS @@ -139,6 +159,10 @@ private: /// decides whether replicas should be updated through corba or dds bool use_corba_; + + StateSynchronizationAgent_var ssa_in_replica_; + + size_t ssa_invoked_; }; #include "StateSynchronizationAgent_i_T.cpp" -- cgit v1.2.1