diff options
author | huangming <huangminghuang@users.noreply.github.com> | 2004-02-15 20:17:45 +0000 |
---|---|---|
committer | huangming <huangminghuang@users.noreply.github.com> | 2004-02-15 20:17:45 +0000 |
commit | e5773a04ac924cb8113a1e26024e70806ef22102 (patch) | |
tree | 56d4123842f31b406920a88dab70d1d9adbae30b | |
parent | 40f0b3c06f66969453c810fdcabe4f1471098014 (diff) | |
download | ATCD-e5773a04ac924cb8113a1e26024e70806ef22102.tar.gz |
ChangeLogEntry: Sun Feb 15 14:06:33 2004 Huang-Ming Huang <hh1@cse.wustl.edu>
17 files changed, 551 insertions, 128 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 6651921a0fd..3fe9d65acc4 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,25 @@ +Sun Feb 15 14:06:33 2004 Huang-Ming Huang <hh1@cse.wustl.edu> + + * orbsvcs/orbsvcs/FtRtEvent.mpc + * orbsvcs/orbsvcs/EventChannel/ObjectGroupManagerHandler.cpp: + * orbsvcs/orbsvcs/EventChannel/ObjectGroupManagerHandler.h: + * orbsvcs/orbsvcs/EventChannel/AMI_Primary_Replication_Strategy.cpp: + * orbsvcs/orbsvcs/EventChannel/AMI_Primary_Replication_Strategy.h: + * orbsvcs/orbsvcs/EventChannel/AMI_Replication_Strategy.cpp: + * orbsvcs/orbsvcs/EventChannel/AMI_Replication_Strategy.h: + * orbsvcs/orbsvcs/EventChannel/Basic_Replication_Strategy.cpp: + * orbsvcs/orbsvcs/EventChannel/Basic_Replication_Strategy.h: + * orbsvcs/orbsvcs/EventChannel/FTEC_Group_Manager.cpp: + * orbsvcs/orbsvcs/EventChannel/FTEC_Group_Manager.h: + * orbsvcs/orbsvcs/EventChannel/GroupInfoPublisher.cpp: + * orbsvcs/orbsvcs/EventChannel/GroupInfoPublisher.h: + * orbsvcs/orbsvcs/EventChannel/Replication_Service.cpp: + * orbsvcs/orbsvcs/EventChannel/Replication_Service.h: + * orbsvcs/orbsvcs/EventChannel/Replication_Strategy.h: + Changed for using AMI calls when adding new replicas to a + object group under AMI replication strategy. + + Sun Feb 15 08:09:17 2004 Balachandran Natarajan <bala@dre.vanderbilt.edu> * orbsvcs/examples/CosEC/Factory/CosEventChannelFactory.idl: diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent.mpc b/TAO/orbsvcs/orbsvcs/FtRtEvent.mpc index ae09f35cd4e..57ba4a816e7 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent.mpc +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent.mpc @@ -11,11 +11,11 @@ project (FtRtEvent) : orbsvcslib, core, ftorbutils, rtevent, naming { idlflags += -GC FTRT.idl FtRtecEventComm.idl + FTRT_GroupManager.idl } IDL_Files { FtRtecEventChannelAdmin.idl - FTRT_GroupManager.idl } Source_Files (ORBSVCS_COMPONENTS) { diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp index 2d14b6132ac..deaa02702e9 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp @@ -3,19 +3,22 @@ #include "AMI_Primary_Replication_Strategy.h" #include "ace/Synch_T.h" #include "GroupInfoPublisher.h" -#include "../Utils/ScopeGuard.h" #include "Request_Context_Repository.h" -#include "../Utils/resolve_init.h" #include "create_persistent_poa.h" #include "Update_Manager.h" #include "tao/Utils/PolicyList_Destroyer.h" +#include "ObjectGroupManagerHandler.h" +#include "tao/Utils/Implicit_Deactivator.h" +#include "../Utils/resolve_init.h" +#include "../Utils/ScopeGuard.h" +#include "../Utils/Log.h" ACE_RCSID (EventChannel, AMI_Primary_Replication_Strategy, "$Id$") -AMI_Primary_Replication_Strategy::AMI_Primary_Replication_Strategy() -: handler_(this) +AMI_Primary_Replication_Strategy::AMI_Primary_Replication_Strategy(bool mt) +: handler_(this), mutex_(mt ? new ACE_RW_Thread_Mutex : 0) { } @@ -27,20 +30,24 @@ AMI_Primary_Replication_Strategy::~AMI_Primary_Replication_Strategy() int AMI_Primary_Replication_Strategy::acquire_read (void) { - return mutex_.acquire_read(); + return mutex_ ? mutex_->acquire_read() : 0; } int AMI_Primary_Replication_Strategy::acquire_write (void) { - return mutex_.acquire_write(); + return mutex_ ? mutex_->acquire_write() : 0; } int AMI_Primary_Replication_Strategy::release (void) { - return mutex_.release(); + return mutex_ ? mutex_->release() : 0; } +int AMI_Primary_Replication_Strategy::init() +{ + return this->activate(); +} int AMI_Primary_Replication_Strategy::svc() { @@ -51,12 +58,12 @@ int AMI_Primary_Replication_Strategy::svc() ACE_TRY_CHECK; PortableServer::POA_var - root_poa = resolve_init<PortableServer::POA>(orb_.in(), "RootPOA" + root_poa_ = resolve_init<PortableServer::POA>(orb_.in(), "RootPOA" ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; // create POAManager - mgr_ = root_poa->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER); + mgr_ = root_poa_->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER); ACE_TRY_CHECK; @@ -64,7 +71,7 @@ int AMI_Primary_Replication_Strategy::svc() ACE_TRY_CHECK; PortableServer::IdUniquenessPolicy_var id_uniqueness_policy = - root_poa->create_id_uniqueness_policy(PortableServer::MULTIPLE_ID + root_poa_->create_id_uniqueness_policy(PortableServer::MULTIPLE_ID ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK @@ -73,7 +80,7 @@ int AMI_Primary_Replication_Strategy::svc() policy_list[0] = PortableServer::IdUniquenessPolicy::_duplicate( id_uniqueness_policy.in() ); - poa_ = create_persistent_poa(root_poa, mgr_, "AMI_Update", policy_list + poa_ = create_persistent_poa(root_poa_, mgr_, "AMI_Update", policy_list ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; @@ -116,8 +123,10 @@ AMI_Primary_Replication_Strategy::replicate_request( size_t num_backups = backups.length(); - if ((size_t)transaction_depth > num_backups) + if ((size_t)transaction_depth > num_backups) { + TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n"); ACE_THROW(FTRT::TransactionDepthTooHigh()); + } ACE_NEW_THROW_EX(manager, Update_Manager(event, backups.length(), transaction_depth-1, success), @@ -162,8 +171,55 @@ AMI_Primary_Replication_Strategy::replicate_request( } ACE_ENDTRY; } - + TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n"); ACE_THROW(FTRT::TransactionDepthTooHigh()); } } + +void +AMI_Primary_Replication_Strategy::add_member(const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL) +{ + ACE_Auto_Event event; + const FtRtecEventChannelAdmin::EventChannelList& backups = + GroupInfoPublisher::instance()->backups(); + + size_t num_backups = backups.length(); + ObjectGroupManagerHandler add_member_handler(event, num_backups+1); + // The extra one is to prevent the event been signaled prematurely. + + PortableServer::ObjectId_var oid = + root_poa_->activate_object(&add_member_handler ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + TAO::Utils::Implicit_Deactivator deactivator(&add_member_handler); + + CORBA::Object_var obj = + root_poa_->id_to_reference(oid.in() ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + FTRT::AMI_ObjectGroupManagerHandler_var handler = + FTRT::AMI_ObjectGroupManagerHandler::_narrow(obj.in() ACE_ENV_ARG_PARAMETER); + + for (unsigned i = 0; i < num_backups; ++i) { + ACE_TRY { + backups[i].in()->sendc_add_member(handler.in(), + info, + object_group_ref_version + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHALL { + add_member_handler.add_member_excep(NULL); + } + ACE_ENDTRY; + } + // decrement the number of members so the event can be signaled once + // all replys have been received. + add_member_handler.add_member_excep(NULL); + + event.wait(); +} + diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.h index 3a019954100..aafee54d748 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.h +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.h @@ -22,16 +22,29 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +/** + * @class AMI_Primary_Replication_Strategy. + * + * @brief Used by primary replicas when AMI calls are used for replicating requests. + */ + class AMI_Primary_Replication_Strategy : public Replication_Strategy , public ACE_Task_Base { public: - AMI_Primary_Replication_Strategy(); + /** + * @param mt Specifies whether multithreaded ORB is used. + */ + AMI_Primary_Replication_Strategy(bool mt); virtual ~AMI_Primary_Replication_Strategy(); + virtual int init(); virtual void replicate_request(const FTRT::State& state, RollbackOperation rollback, const FtRtecEventChannelAdmin::ObjectId& oid ACE_ENV_ARG_DECL); + virtual void add_member(const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL); virtual int acquire_read (void); virtual int acquire_write (void); virtual int release (void); @@ -40,11 +53,12 @@ public: private: virtual int svc (void); CORBA::ORB_var orb_; + PortableServer::POA_var root_poa_; PortableServer::POA_var poa_; PortableServer::POAManager_var mgr_; bool running_; - ACE_RW_Thread_Mutex mutex_; UpdateableHandler handler_; + ACE_RW_Thread_Mutex* mutex_; }; #endif // AMI_PRIMARY_REPLICATION_STRATEGY_H diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.cpp index be41268c19e..b343c0e9af7 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.cpp +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.cpp @@ -7,7 +7,12 @@ ACE_RCSID (EventChannel, AMI_Replication_Strategy, "$Id$") -AMI_Replication_Strategy::AMI_Replication_Strategy() +AMI_Replication_Strategy::AMI_Replication_Strategy(bool mt) +: mt_(mt) +{ +} + +AMI_Replication_Strategy::~AMI_Replication_Strategy() { } @@ -23,12 +28,20 @@ AMI_Replication_Strategy::replicate_request( ACE_UNUSED_ARG(oid); } +void +AMI_Replication_Strategy::add_member(const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL_NOT_USED) +{ + ACE_UNUSED_ARG(info); + ACE_UNUSED_ARG(object_group_ref_version); +} Replication_Strategy* AMI_Replication_Strategy::make_primary_strategy() { AMI_Primary_Replication_Strategy* result; - ACE_NEW_RETURN(result, AMI_Primary_Replication_Strategy, 0); + ACE_NEW_RETURN(result, AMI_Primary_Replication_Strategy(mt_), 0); auto_ptr<AMI_Primary_Replication_Strategy> holder(result); if (result->activate() == 0) return holder.release(); diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.h index d7ef25c58eb..055b3ea22cc 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.h +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.h @@ -22,21 +22,34 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +/** + * @class AMI_Replication_Strategy. + * + * @brief Used by backup replicas when AMI calls are used for replicating requests. + */ + class AMI_Replication_Strategy: public Replication_Strategy { public: - AMI_Replication_Strategy(); - virtual void replicate_request( - const FTRT::State& state, + /** + * @param mt Specifies whether multithreaded ORB is used. + */ + AMI_Replication_Strategy(bool mt); + ~AMI_Replication_Strategy(); + virtual void replicate_request(const FTRT::State& state, RollbackOperation rollback, const FtRtecEventChannelAdmin::ObjectId& oid ACE_ENV_ARG_DECL); + virtual void add_member(const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL); virtual Replication_Strategy* make_primary_strategy(); virtual int acquire_read (void); virtual int acquire_write (void); virtual int release (void); - +private: + bool mt_; }; diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.cpp index 826d60f2b7d..8e2d300a0ab 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.cpp +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.cpp @@ -4,23 +4,33 @@ #include "GroupInfoPublisher.h" #include "FTEC_Event_Channel.h" #include "Request_Context_Repository.h" +#include "../Utils/Log.h" ACE_RCSID (EventChannel, Basic_Replication_Strategy, "$Id$") -Basic_Replication_Strategy::Basic_Replication_Strategy() +/// The mutex has to be recursive; otherwise, if the second replicate_request() is +/// called while the first replicate_request() is waiting for reply, we will get +/// a deadlock. +Basic_Replication_Strategy::Basic_Replication_Strategy(bool mt) : sequence_num_(0) +, mutex_(mt ? new ACE_Recursive_Thread_Mutex : 0) { } +Basic_Replication_Strategy::~Basic_Replication_Strategy() +{ + delete mutex_; +} + void Basic_Replication_Strategy::check_validity(ACE_ENV_SINGLE_ARG_DECL) { FTRT::SequenceNumber seq_no = Request_Context_Repository().get_sequence_number(ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK; - ACE_DEBUG((LM_DEBUG, "check_validity : sequence no = %d\n", sequence_num_)); + TAO_FTRTEC::Log(1 , "check_validity : sequence no = %d\n", sequence_num_); if (this->sequence_num_ == 0) { // this is the first set_update received from the primary @@ -33,6 +43,7 @@ Basic_Replication_Strategy::check_validity(ACE_ENV_SINGLE_ARG_DECL) // client_interceptor_->sequence_num_--; FTRT::OutOfSequence exception; exception.current = this->sequence_num_; + TAO_FTRTEC::Log(3, "Throwing FTRT::OutOfSequence (old sequence_num_ = %d)\n", this->sequence_num_); ACE_THROW(FTRT::OutOfSequence(exception)); } else @@ -60,7 +71,7 @@ Basic_Replication_Strategy::replicate_request( if (info_publisher->is_primary()) this->sequence_num_++; - ACE_DEBUG((LM_DEBUG, "replicate_request : sequence no = %d\n", sequence_num_)); + TAO_FTRTEC::Log(1, "replicate_request : sequence no = %d\n", sequence_num_); Request_Context_Repository().set_sequence_number(sequence_num_ ACE_ENV_ARG_PARAMETER); ACE_CHECK; @@ -69,37 +80,69 @@ Basic_Replication_Strategy::replicate_request( ACE_CHECK; if (transaction_depth > 1) { - successor->set_update(state - ACE_ENV_ARG_PARAMETER); + bool finished = true; + do { + ACE_TRY_EX(SET_UPDATE) { + successor->set_update(state ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK_EX(SET_UPDATE); + } + ACE_CATCH(CORBA::COMM_FAILURE, ex) { + if (ex.minor() == 6) finished = false; + else ACE_RE_THROW; + } + ACE_ENDTRY; + ACE_CHECK; + } while(!finished); } else { - ACE_TRY { - successor->oneway_set_update(state - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_TRY_EX(ONEWAY_SET_UPDATE) { + successor->oneway_set_update(state ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK_EX(ONEWAY_SET_UPDATE); } ACE_CATCHANY { } ACE_ENDTRY; } } - else if (transaction_depth > 1) + else if (transaction_depth > 1) { + TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n"); ACE_THROW(FTRT::TransactionDepthTooHigh()); + } } +void +Basic_Replication_Strategy::add_member(const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL_NOT_USED) +{ + FtRtecEventChannelAdmin::EventChannel_var successor = GroupInfoPublisher::instance()->successor(); + bool finished = true; + do { + ACE_TRY { + successor->add_member(info, object_group_ref_version ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCH(CORBA::COMM_FAILURE, ex) { + if (ex.minor() == 6) finished = false; + else ACE_RE_THROW; + } + ACE_ENDTRY; + ACE_CHECK; + } while (!finished); +} int Basic_Replication_Strategy::acquire_read (void) { - return mutex_.acquire_read(); + return mutex_ ? mutex_->acquire_read() : 0; } int Basic_Replication_Strategy::acquire_write (void) { - return mutex_.acquire_write(); + return mutex_ ? mutex_->acquire_write() : 0; } int Basic_Replication_Strategy::release (void) { - return mutex_.release(); + return mutex_ ? mutex_->release() : 0; } diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.h index 282f4b97d56..f3966e0ade1 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.h +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.h @@ -18,18 +18,30 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +/** + * @class Basic_Replication_Strategy + * + * @brief Use two-way CORBA call to replicate the state to backup replicas. + */ + class Basic_Replication_Strategy : public Replication_Strategy { public: - Basic_Replication_Strategy(); + /** + * @param mt Specifies whether multithreaded ORB is used. + */ + Basic_Replication_Strategy(bool mt); + ~Basic_Replication_Strategy(); virtual void check_validity(ACE_ENV_SINGLE_ARG_DECL); - virtual void replicate_request( - const FTRT::State& state, + virtual void replicate_request(const FTRT::State& state, RollbackOperation rollback, const FtRtecEventChannelAdmin::ObjectId& oid ACE_ENV_ARG_DECL); + virtual void add_member(const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL); virtual int acquire_read (void); virtual int acquire_write (void); @@ -37,7 +49,7 @@ public: private: FTRT::SequenceNumber sequence_num_; - ACE_Thread_Mutex mutex_; + ACE_Recursive_Thread_Mutex* mutex_; }; #endif diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.cpp index ae92bfbb833..7b26102fde7 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.cpp +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.cpp @@ -6,6 +6,8 @@ #include "Fault_Detector.h" #include "IOGR_Maker.h" #include "GroupInfoPublisher.h" +#include "Replication_Service.h" +#include "../Utils/Log.h" ACE_RCSID (EventChannel, TAO_FTEC_Group_Manager, @@ -64,7 +66,6 @@ struct TAO_FTEC_Group_Manager_Impl { FTRT::ManagerInfoList info_list; unsigned my_position; - FTRT::FaultListener_var listener; }; TAO_FTEC_Group_Manager::TAO_FTEC_Group_Manager() @@ -87,7 +88,7 @@ CORBA::Boolean TAO_FTEC_Group_Manager::start ( CORBA::SystemException )) { - impl_->listener = listener; + listener_ = listener; ACE_NEW_RETURN(cur , FTRT::Location(Fault_Detector::instance()->my_location()), false); return true; } @@ -98,27 +99,21 @@ void TAO_FTEC_Group_Manager::create_group ( CORBA::ULong object_group_ref_version ACE_ENV_ARG_DECL) { - ACE_DEBUG((LM_DEBUG, "create_group\n")); - IOGR_Maker::instance()->set_ref_version( object_group_ref_version ); + TAO_FTRTEC::Log(1, "create_group\n"); impl_->info_list = info_list; impl_->my_position = find_by_location(info_list, Fault_Detector::instance()->my_location()); GroupInfoPublisherBase* publisher = GroupInfoPublisher::instance(); - - publisher->update(impl_->info_list, impl_->my_position + GroupInfoPublisherBase::Info_ptr info = + publisher->setup_info(impl_->info_list, impl_->my_position ACE_ENV_ARG_PARAMETER); ACE_CHECK; - FtRtecEventChannelAdmin::EventChannel_var successor - = publisher->successor(); + publisher->update_info(info); - if (!CORBA::is_nil(successor.in())) { - successor->create_group(info_list, object_group_ref_version - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - } + IOGR_Maker::instance()->set_ref_version( object_group_ref_version ); if (impl_->my_position > 0) { Fault_Detector* detector = Fault_Detector::instance(); @@ -127,13 +122,22 @@ void TAO_FTEC_Group_Manager::create_group ( ACE_THROW(FTRT::PredecessorUnreachable()); } } + + FtRtecEventChannelAdmin::EventChannel_var successor + = publisher->successor(); + + if (!CORBA::is_nil(successor.in())) { + successor->create_group(info_list, object_group_ref_version + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } } void TAO_FTEC_Group_Manager::join_group ( const FTRT::ManagerInfo & info ACE_ENV_ARG_DECL) { - ACE_DEBUG((LM_DEBUG, "join group\n")); + TAO_FTRTEC::Log(1, "join group\n"); if (impl_->my_position == 0) { FTRTEC::Replication_Service* svc = FTRTEC::Replication_Service::instance(); ACE_Write_Guard<FTRTEC::Replication_Service> lock(*svc); @@ -147,44 +151,61 @@ void TAO_FTEC_Group_Manager::add_member ( CORBA::ULong object_group_ref_version ACE_ENV_ARG_DECL) { - ACE_DEBUG((LM_DEBUG, "add_member location = <%s>\n", - (const char*)info.the_location[0].id)); + TAO_FTRTEC::Log(1, "add_member location = <%s>\n", + (const char*)info.the_location[0].id); + auto_ptr<TAO_FTEC_Group_Manager_Impl> new_impl(new TAO_FTEC_Group_Manager_Impl); + + new_impl->my_position = impl_->my_position; size_t pos = impl_->info_list.length(); - impl_->info_list.length(pos+1); - impl_->info_list[pos] = info; + new_impl->info_list.length(pos+1); + for (size_t i = 0; i < pos; ++i) { + new_impl->info_list[i] = impl_->info_list[i]; + } + new_impl->info_list[pos] = info; - IOGR_Maker::instance()->set_ref_version( object_group_ref_version ); GroupInfoPublisherBase* publisher = GroupInfoPublisher::instance(); - publisher->update(impl_->info_list, impl_->my_position + GroupInfoPublisherBase::Info_ptr group_info = + publisher->setup_info(new_impl->info_list, new_impl->my_position ACE_ENV_ARG_PARAMETER); ACE_CHECK; - if (impl_->my_position < impl_->info_list.length()-2) + int last_one = (impl_->my_position == impl_->info_list.length()-1); + + if (!last_one) { // I am not the last of replica, tell my successor that // a new member has joined in. ACE_TRY_EX(block) { - publisher->successor()->add_member(info, object_group_ref_version + FTRTEC::Replication_Service::instance()->add_member(info, object_group_ref_version ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK_EX(block); - return; } ACE_CATCHANY { // Unable to send request to all the successors. // Now this node become the last replica of the object group. - } - ACE_ENDTRY; + // update the info list again + new_impl->info_list.length(new_impl->my_position+2); + new_impl->info_list[new_impl->my_position+1] = info; - // update the info list again - impl_->info_list.length(impl_->my_position+2); - impl_->info_list[impl_->my_position+1] = info; + /// group_info = publisher->set_info(..) should be enough. + /// However, GCC 2.96 is not happy with that. - publisher->update(impl_->info_list, impl_->my_position + GroupInfoPublisherBase::Info_ptr group_info1 = + publisher->setup_info(new_impl->info_list, + new_impl->my_position ACE_ENV_ARG_PARAMETER); - } + ACE_CHECK; + group_info.reset(group_info1.release()); + last_one = true; + } + ACE_ENDTRY; + ACE_CHECK; + } + if (last_one) + { // this is the last replica in the list // synchornize the state with the newly joined replica. FtRtecEventChannelAdmin::EventChannelState state; @@ -204,10 +225,21 @@ void TAO_FTEC_Group_Manager::add_member ( else s.replace(cdr.begin()->length(), cdr.begin()); - ACE_DEBUG((LM_DEBUG, "Setting state\n")); + TAO_FTRTEC::Log(2, "Setting state\n"); info.ior->set_state(s ACE_ENV_ARG_PARAMETER); - info.ior->create_group(impl_->info_list, object_group_ref_version); - ACE_DEBUG((LM_DEBUG, "After create_group\n")); + ACE_CHECK; + info.ior->create_group(new_impl->info_list, + object_group_ref_version + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + TAO_FTRTEC::Log(2, "After create_group\n"); + } + + // commit the changes + IOGR_Maker::instance()->set_ref_version( object_group_ref_version ); + publisher->update_info(group_info); + delete impl_; + impl_ = new_impl.release(); } template <class SEQ> @@ -224,7 +256,7 @@ void TAO_FTEC_Group_Manager::replica_crashed ( const FTRT::Location & location ACE_ENV_ARG_DECL) { - ACE_DEBUG((LM_DEBUG, "TAO_FTEC_Group_Manager::replica_crashed\n")); + TAO_FTRTEC::Log(1, "TAO_FTEC_Group_Manager::replica_crashed\n"); FTRTEC::Replication_Service* svc = FTRTEC::Replication_Service::instance(); ACE_Write_Guard<FTRTEC::Replication_Service> lock(*svc); remove_member(location, IOGR_Maker::instance()->increment_ref_version() @@ -250,30 +282,36 @@ void TAO_FTEC_Group_Manager::remove_member ( GroupInfoPublisherBase* publisher = GroupInfoPublisher::instance(); - publisher->update(impl_->info_list, impl_->my_position + GroupInfoPublisherBase::Info_ptr info = + publisher->setup_info(impl_->info_list, impl_->my_position ACE_ENV_ARG_PARAMETER); ACE_CHECK; + publisher->update_info(info); FtRtecEventChannelAdmin::EventChannel_var successor = publisher->successor(); IOGR_Maker::instance()->set_ref_version(object_group_ref_version); if (!CORBA::is_nil(successor.in())) { - + ACE_TRY { successor->remove_member(crashed_location, object_group_ref_version ACE_ENV_ARG_PARAMETER); - ACE_CHECK; + ACE_TRY_CHECK; + } + ACE_CATCHALL { + } + ACE_ENDTRY; } - ACE_DEBUG((LM_DEBUG, "2. my_position = %d, crashed_pos = %d\n", impl_->my_position, crashed_pos)); + TAO_FTRTEC::Log(3, "my_position = %d, crashed_pos = %d\n", impl_->my_position, crashed_pos); if (impl_->my_position == crashed_pos && impl_->my_position > 0) Fault_Detector::instance()->connect(impl_->info_list[impl_->my_position-1].the_location); } void TAO_FTEC_Group_Manager::connection_closed() { - ACE_DEBUG((LM_DEBUG, "TAO_FTEC_Group_Manager::connection_closed\n")); + TAO_FTRTEC::Log(1, "TAO_FTEC_Group_Manager::connection_closed\n"); ACE_ASSERT(impl_->my_position > 0); // do not use referere here, because the the value pointed by the pointer to @@ -283,6 +321,7 @@ void TAO_FTEC_Group_Manager::connection_closed() ACE_DECLARE_NEW_CORBA_ENV; if (impl_->my_position > 1) { + // if I am not the new primary, tell the new primary ACE_TRY_EX(block1) { TAO_IOP::TAO_IOR_Manipulation::IORList iors; iors.length(impl_->my_position-1); diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.h index a8557ae4cdb..e41fc061fe2 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.h +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.h @@ -33,11 +33,8 @@ public: virtual CORBA::Boolean start ( FTRT::FaultListener_ptr listener, FTRT::Location_out cur - ACE_ENV_ARG_DECL - ) - ACE_THROW_SPEC (( - CORBA::SystemException - )); + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); void create_group ( const FTRT::ManagerInfoList & info_list, @@ -71,6 +68,7 @@ private: virtual void connection_closed(); protected: + FTRT::FaultListener_var listener_; TAO_FTEC_Group_Manager_Impl* impl_; }; diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.cpp index f5340424076..1766ace1006 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.cpp +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.cpp @@ -5,6 +5,7 @@ #include "IOGR_Maker.h" #include "Identification_Service.h" #include "FTEC_Become_Primary_Listener.h" +#include "../Utils/Log.h" ACE_RCSID (EventChannel, GroupInfoPublisher, @@ -13,8 +14,9 @@ ACE_RCSID (EventChannel, GroupInfoPublisherBase::GroupInfoPublisherBase() -: primary_(false) +: info_(new Info) { + info_->primary = false; } @@ -33,19 +35,19 @@ void GroupInfoPublisherBase::set_naming_context(CosNaming::NamingContext_var nam bool GroupInfoPublisherBase::is_primary() const { - return primary_; + return info_->primary; } CORBA::Object_var GroupInfoPublisherBase::group_reference() const { - return iogr_; + return info_->iogr; } FtRtecEventChannelAdmin::EventChannel_var GroupInfoPublisherBase::successor() const { - return successor_; + return info_->successor; } @@ -53,16 +55,17 @@ GroupInfoPublisherBase::successor() const const GroupInfoPublisherBase::BackupList& GroupInfoPublisherBase::backups() const { - return backups_; + return info_->backups; } -void -GroupInfoPublisherBase::update(const FTRT::ManagerInfoList & info_list, +GroupInfoPublisherBase::Info_ptr +GroupInfoPublisherBase::setup_info(const FTRT::ManagerInfoList & info_list, int my_position ACE_ENV_ARG_DECL) { - bool become_primary = (my_position == 0 && !primary_); - primary_ = (my_position == 0); + Info_ptr result(new Info); + + result->primary = (my_position == 0); /// create the object group size_t len = info_list.length(); @@ -79,52 +82,78 @@ GroupInfoPublisherBase::update(const FTRT::ManagerInfoList & info_list, IOGR_Maker::instance()->make_iogr(iors ACE_ENV_ARG_PARAMETER); ACE_CHECK; - iogr_ = + result->iogr = ::FtRtecEventChannelAdmin::EventChannel::_narrow(obj.in() ACE_ENV_ARG_PARAMETER); ACE_CHECK; - if (primary_ && !CORBA::is_nil(naming_context_.in())) { - ACE_DEBUG((LM_DEBUG, "Registering to the Name Service\n")); - naming_context_->rebind(FTRTEC::Identification_Service::instance()->name(), - iogr_.in() ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - } - /// check if sucessor changed size_t successors_length = info_list.length() - my_position -1; - if (successors_length != backups_.length()) { + if (successors_length != info_->backups.length()) { // successor changed, update successor iors.length(successors_length); for (i = 0; i < successors_length; ++i) { iors[i] = CORBA::Object::_duplicate(info_list[i+ my_position+1].ior.in()); - } obj = IOGR_Maker::instance()->merge_iors(iors ACE_ENV_ARG_PARAMETER); ACE_CHECK; - successor_ = FtRtecEventChannelAdmin::EventChannel::_narrow(obj.in() + result->successor = + FtRtecEventChannelAdmin::EventChannel::_narrow(obj.in() ACE_ENV_ARG_PARAMETER); - FtRtecEventChannelAdmin::EventChannel_var t = successor_; ACE_CHECK; + } + else { + result->successor = info_->successor; + } + + if (!CORBA::is_nil(result->successor.in())) + { + CORBA::PolicyList_var pols; + result->successor->_validate_connection (pols.out ()); + } // update backups - backups_.length(successors_length); + result->backups.length(successors_length); for (i = 0; i < successors_length; ++i) { - backups_[i] = + result->backups[i] = FtRtecEventChannelAdmin::EventChannel::_narrow( info_list[i+ my_position+1].ior.in() ACE_ENV_ARG_PARAMETER); + CORBA::PolicyList_var pols; + result->backups[i]->_validate_connection (pols.out ()); ACE_CHECK; } + + return result; } - if (become_primary) { - // notify the subscribers - for (i = 0; i < subscribers_.size(); ++i) - subscribers_[i]->become_primary(); +void +GroupInfoPublisherBase::update_info(GroupInfoPublisherBase::Info_ptr info) +{ + if (info->primary) { + if (!info_->primary) { + // now we become the primary, notify the subscribers + for (size_t i = 0; i < subscribers_.size(); ++i) + subscribers_[i]->become_primary(); + } + + if (!CORBA::is_nil(naming_context_.in())) { + TAO_FTRTEC::Log(1, "Registering to the Name Service\n"); + ACE_TRY_NEW_ENV { + naming_context_->rebind(FTRTEC::Identification_Service::instance()->name(), + info->iogr.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHALL { + /// there's nothing we can do if the naming service is down + } + ACE_ENDTRY; + } } + info_ = info; } + diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.h index e4fd4275ce7..baeb1d8b8d4 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.h +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.h @@ -28,6 +28,16 @@ class TAO_FTEC_Become_Primary_Listener; class GroupInfoPublisherBase { public: + typedef FtRtecEventChannelAdmin::EventChannelList BackupList; + + struct Info { + bool primary; + CORBA::Object_var iogr; + FtRtecEventChannelAdmin::EventChannel_var successor; + BackupList backups; + }; + + typedef auto_ptr<Info> Info_ptr; friend class ACE_Singleton<GroupInfoPublisherBase, ACE_Thread_Mutex>; void subscribe(TAO_FTEC_Become_Primary_Listener* listener); @@ -37,13 +47,14 @@ public: FtRtecEventChannelAdmin::EventChannel_var successor() const; - typedef FtRtecEventChannelAdmin::EventChannelList BackupList; const BackupList& backups() const; - void update(const FTRT::ManagerInfoList & info_list, + Info_ptr setup_info(const FTRT::ManagerInfoList & info_list, int my_position ACE_ENV_ARG_DECL); + void update_info(Info_ptr info); + const PortableServer::ObjectId& object_id() const; const CosNaming::Name& name() const; @@ -53,15 +64,12 @@ public: private: GroupInfoPublisherBase(); - bool primary_; CosNaming::NamingContext_var naming_context_; - CORBA::Object_var iogr_; - FtRtecEventChannelAdmin::EventChannel_var successor_; - BackupList backups_; typedef ACE_Vector<TAO_FTEC_Become_Primary_Listener*, 2> Subscribers; Subscribers subscribers_; PortableServer::ObjectId object_id_; CosNaming::Name name_; + Info_ptr info_; }; typedef ACE_Singleton<GroupInfoPublisherBase, ACE_Thread_Mutex> GroupInfoPublisher; diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.cpp new file mode 100644 index 00000000000..167822f3890 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.cpp @@ -0,0 +1,52 @@ +// $Id$ +#include "ObjectGroupManagerHandler.h" + +ObjectGroupManagerHandler::ObjectGroupManagerHandler(ACE_Auto_Event& evt, int num_backups) +: evt_(evt), num_backups_(num_backups) +{ +} + +void ObjectGroupManagerHandler::start (CORBA::Boolean , + const FTRT::Location & ) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +void ObjectGroupManagerHandler::start_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * ) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +void ObjectGroupManagerHandler::create_group () + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + + +void ObjectGroupManagerHandler::create_group_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * ) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +void ObjectGroupManagerHandler::add_member () + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (--num_backups_ ==0) + evt_.signal(); +} + +void ObjectGroupManagerHandler::add_member_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * ) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + this->add_member(); +} + +void ObjectGroupManagerHandler::set_state () + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +void ObjectGroupManagerHandler::set_state_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * ) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.h new file mode 100644 index 00000000000..69c4944a11c --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.h @@ -0,0 +1,51 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file ObjectGroupManagerHandler.h + * + * $Id$ + * + * @author Huang-Ming Huang <hh1@cse.wustl.edu> + */ +//============================================================================= +#ifndef OBJECTGROUPMANAGERHANDLER_H +#define OBJECTGROUPMANAGERHANDLER_H + +#include "orbsvcs/orbsvcs/FTRT_GroupManagerS.h" +#include "ace/Auto_Event.h" + +class ObjectGroupManagerHandler : public POA_FTRT::AMI_ObjectGroupManagerHandler +{ +public: + ObjectGroupManagerHandler(ACE_Auto_Event& evt, int num_backups); + virtual void start (CORBA::Boolean ami_return_val, + const FTRT::Location & the_location) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void start_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * excep_holder) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void create_group () + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void create_group_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * excep_holder) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void add_member () + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void add_member_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * excep_holder) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void set_state () + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void set_state_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * excep_holder) + ACE_THROW_SPEC ((CORBA::SystemException)); +private: + ACE_Auto_Event& evt_; + ACE_Atomic_Op< ACE_Thread_Mutex, int > num_backups_; +}; + +#endif diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.cpp index 807c24d5ac4..fb3a89a15e0 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.cpp +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.cpp @@ -4,6 +4,7 @@ #include "AMI_Replication_Strategy.h" #include "Basic_Replication_Strategy.h" #include "FTEC_ORBInitializer.h" +#include "../Utils/Log.h" #include "tao/ORBInitializer_Registry.h" @@ -17,6 +18,7 @@ namespace FTRTEC { namespace { auto_ptr<Replication_Strategy> replication_strategy; + int threads = 1; Replication_Service* service; } @@ -43,14 +45,30 @@ namespace FTRTEC return 0; initialized = 1; - - Replication_Strategy* strategy; + bool ami = false; // Parse any service configurator parameters. - if (argc > 0 && ACE_OS::strcasecmp (argv[0], ACE_LIB_TEXT("AMI")) == 0) - ACE_NEW_RETURN (strategy, AMI_Replication_Strategy, -1); - else - ACE_NEW_RETURN (strategy, Basic_Replication_Strategy, -1); + while (argc > 0) { + if (ACE_OS::strcasecmp (argv[0], ACE_LIB_TEXT("AMI")) ==0 ) + ami = true; + if (ACE_OS::strcasecmp (argv[0], ACE_LIB_TEXT("-threads")) ==0 && argc > 1) { + FTRTEC::threads = ACE_OS::atoi(argv[1]); + if (FTRTEC::threads ==0 ) + FTRTEC::threads = 1; + ++argv; --argc; + } + ++argv; --argc; + } + + Replication_Strategy* strategy; + if (ami) { + ACE_NEW_RETURN (strategy, AMI_Replication_Strategy(threads() > 1), -1); + TAO_FTRTEC::Log(3, "AMI replication strategy\n"); + } + else { + ACE_NEW_RETURN (strategy, Basic_Replication_Strategy(threads() > 1), -1); + TAO_FTRTEC::Log(3, "Basic replication strategy\n"); + } ACE_AUTO_PTR_RESET (replication_strategy, strategy, Replication_Strategy); @@ -120,27 +138,38 @@ namespace FTRTEC ACE_ENV_ARG_PARAMETER); } + void Replication_Service::add_member(const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL) + { + replication_strategy->add_member(info, object_group_ref_version ACE_ENV_ARG_PARAMETER); + } + int Replication_Service::acquire_read (void) { int r = replication_strategy->acquire_read(); - ACE_DEBUG((LM_DEBUG, "Read Lock acquired %d\n", r)); + TAO_FTRTEC::Log(3, "Read Lock acquired %d\n", r); return r; } int Replication_Service::acquire_write (void) { int r= replication_strategy->acquire_write(); - ACE_DEBUG((LM_DEBUG, "Write Lock acqured %d\n", r)); + TAO_FTRTEC::Log(3, "Write Lock acqured %d\n", r); return r; } int Replication_Service::release (void) { int r= replication_strategy->release(); - ACE_DEBUG((LM_DEBUG, "Lock Released %d\n", r)); + TAO_FTRTEC::Log(3, "Lock Released %d\n", r); return r; } + int Replication_Service::threads() const { + return FTRTEC::threads; + } + ACE_FACTORY_DEFINE (TAO_FTRTEC, Replication_Service) ACE_STATIC_SVC_DEFINE (Replication_Service, diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.h index ace8f3bbed0..cc204aca9a3 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.h +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.h @@ -25,8 +25,9 @@ namespace FTRTEC { - class Replication_Service : public TAO_FTEC_Become_Primary_Listener, - public ACE_Service_Object + class TAO_FTRTEC_Export Replication_Service + : public TAO_FTEC_Become_Primary_Listener + , public ACE_Service_Object { public: static Replication_Service* instance(); @@ -39,18 +40,37 @@ namespace FTRTEC virtual void become_primary(); + /** + * Used for checking if the incoming replication message is out of sequence. + */ void check_validity(ACE_ENV_SINGLE_ARG_DECL); typedef void (FtRtecEventChannelAdmin::EventChannelFacade::*RollbackOperation) (const FtRtecEventChannelAdmin::ObjectId& ACE_ENV_ARG_DECL_WITH_DEFAULTS); + /** + * Replicate a request. + * + * @param state The request to be replicated. + * @param rollback The rollback operation when the replication failed. + */ void replicate_request(const FtRtecEventChannelAdmin::Operation& update, RollbackOperation rollback ACE_ENV_ARG_DECL); + /** + * Inform the backup replicas that a new replica has joined the object + * group. + */ + void add_member(const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL); + int acquire_read (void); int acquire_write (void); int release (void); + + int threads() const; }; diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Strategy.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Strategy.h index e3bbb090024..eef6209efc9 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Strategy.h +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Strategy.h @@ -19,6 +19,11 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ + +namespace FTEC { + struct ManagerInfo; +}; + class TAO_FTEC_Event_Channel_Impl; class Replication_Strategy { @@ -26,17 +31,36 @@ public: Replication_Strategy(); virtual ~Replication_Strategy(); + /** + * Check if the incoming set_update() request is out of sequence. This is only + * used for basic replication strategy. It throws FTRT::OutOfSequence when the + * incoming request is not valid. + */ virtual void check_validity(ACE_ENV_SINGLE_ARG_DECL); typedef void (FtRtecEventChannelAdmin::EventChannelFacade::*RollbackOperation) (const FtRtecEventChannelAdmin::ObjectId& ACE_ENV_ARG_DECL_WITH_DEFAULTS); - + /** + * Replicate a request. + * + * @param state The request to be replicated. + * @param rollback The rollback operation when the replication failed. + * @param oid The object id used for rollback operation. + */ virtual void replicate_request(const FTRT::State& state, RollbackOperation rollback, const FtRtecEventChannelAdmin::ObjectId& oid ACE_ENV_ARG_DECL)=0; + /** + * Inform the backup replicas that a new replica has joined the object + * group. + */ + virtual void add_member(const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL)=0; + virtual Replication_Strategy* make_primary_strategy(); virtual int acquire_read (void)=0; |