diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp | 82 |
1 files changed, 69 insertions, 13 deletions
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp index 2d14b6132ac..deaa02702e9 100644 --- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp @@ -3,19 +3,22 @@ #include "AMI_Primary_Replication_Strategy.h" #include "ace/Synch_T.h" #include "GroupInfoPublisher.h" -#include "../Utils/ScopeGuard.h" #include "Request_Context_Repository.h" -#include "../Utils/resolve_init.h" #include "create_persistent_poa.h" #include "Update_Manager.h" #include "tao/Utils/PolicyList_Destroyer.h" +#include "ObjectGroupManagerHandler.h" +#include "tao/Utils/Implicit_Deactivator.h" +#include "../Utils/resolve_init.h" +#include "../Utils/ScopeGuard.h" +#include "../Utils/Log.h" ACE_RCSID (EventChannel, AMI_Primary_Replication_Strategy, "$Id$") -AMI_Primary_Replication_Strategy::AMI_Primary_Replication_Strategy() -: handler_(this) +AMI_Primary_Replication_Strategy::AMI_Primary_Replication_Strategy(bool mt) +: handler_(this), mutex_(mt ? new ACE_RW_Thread_Mutex : 0) { } @@ -27,20 +30,24 @@ AMI_Primary_Replication_Strategy::~AMI_Primary_Replication_Strategy() int AMI_Primary_Replication_Strategy::acquire_read (void) { - return mutex_.acquire_read(); + return mutex_ ? mutex_->acquire_read() : 0; } int AMI_Primary_Replication_Strategy::acquire_write (void) { - return mutex_.acquire_write(); + return mutex_ ? mutex_->acquire_write() : 0; } int AMI_Primary_Replication_Strategy::release (void) { - return mutex_.release(); + return mutex_ ? mutex_->release() : 0; } +int AMI_Primary_Replication_Strategy::init() +{ + return this->activate(); +} int AMI_Primary_Replication_Strategy::svc() { @@ -51,12 +58,12 @@ int AMI_Primary_Replication_Strategy::svc() ACE_TRY_CHECK; PortableServer::POA_var - root_poa = resolve_init<PortableServer::POA>(orb_.in(), "RootPOA" + root_poa_ = resolve_init<PortableServer::POA>(orb_.in(), "RootPOA" ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; // create POAManager - mgr_ = root_poa->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER); + mgr_ = root_poa_->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER); ACE_TRY_CHECK; @@ -64,7 +71,7 @@ int AMI_Primary_Replication_Strategy::svc() ACE_TRY_CHECK; PortableServer::IdUniquenessPolicy_var id_uniqueness_policy = - root_poa->create_id_uniqueness_policy(PortableServer::MULTIPLE_ID + root_poa_->create_id_uniqueness_policy(PortableServer::MULTIPLE_ID ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK @@ -73,7 +80,7 @@ int AMI_Primary_Replication_Strategy::svc() policy_list[0] = PortableServer::IdUniquenessPolicy::_duplicate( id_uniqueness_policy.in() ); - poa_ = create_persistent_poa(root_poa, mgr_, "AMI_Update", policy_list + poa_ = create_persistent_poa(root_poa_, mgr_, "AMI_Update", policy_list ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; @@ -116,8 +123,10 @@ AMI_Primary_Replication_Strategy::replicate_request( size_t num_backups = backups.length(); - if ((size_t)transaction_depth > num_backups) + if ((size_t)transaction_depth > num_backups) { + TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n"); ACE_THROW(FTRT::TransactionDepthTooHigh()); + } ACE_NEW_THROW_EX(manager, Update_Manager(event, backups.length(), transaction_depth-1, success), @@ -162,8 +171,55 @@ AMI_Primary_Replication_Strategy::replicate_request( } ACE_ENDTRY; } - + TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n"); ACE_THROW(FTRT::TransactionDepthTooHigh()); } } + +void +AMI_Primary_Replication_Strategy::add_member(const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL) +{ + ACE_Auto_Event event; + const FtRtecEventChannelAdmin::EventChannelList& backups = + GroupInfoPublisher::instance()->backups(); + + size_t num_backups = backups.length(); + ObjectGroupManagerHandler add_member_handler(event, num_backups+1); + // The extra one is to prevent the event been signaled prematurely. + + PortableServer::ObjectId_var oid = + root_poa_->activate_object(&add_member_handler ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + TAO::Utils::Implicit_Deactivator deactivator(&add_member_handler); + + CORBA::Object_var obj = + root_poa_->id_to_reference(oid.in() ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + FTRT::AMI_ObjectGroupManagerHandler_var handler = + FTRT::AMI_ObjectGroupManagerHandler::_narrow(obj.in() ACE_ENV_ARG_PARAMETER); + + for (unsigned i = 0; i < num_backups; ++i) { + ACE_TRY { + backups[i].in()->sendc_add_member(handler.in(), + info, + object_group_ref_version + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHALL { + add_member_handler.add_member_excep(NULL); + } + ACE_ENDTRY; + } + // decrement the number of members so the event can be signaled once + // all replys have been received. + add_member_handler.add_member_excep(NULL); + + event.wait(); +} + |