summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp82
1 files changed, 69 insertions, 13 deletions
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp
index 2d14b6132ac..deaa02702e9 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp
@@ -3,19 +3,22 @@
#include "AMI_Primary_Replication_Strategy.h"
#include "ace/Synch_T.h"
#include "GroupInfoPublisher.h"
-#include "../Utils/ScopeGuard.h"
#include "Request_Context_Repository.h"
-#include "../Utils/resolve_init.h"
#include "create_persistent_poa.h"
#include "Update_Manager.h"
#include "tao/Utils/PolicyList_Destroyer.h"
+#include "ObjectGroupManagerHandler.h"
+#include "tao/Utils/Implicit_Deactivator.h"
+#include "../Utils/resolve_init.h"
+#include "../Utils/ScopeGuard.h"
+#include "../Utils/Log.h"
ACE_RCSID (EventChannel,
AMI_Primary_Replication_Strategy,
"$Id$")
-AMI_Primary_Replication_Strategy::AMI_Primary_Replication_Strategy()
-: handler_(this)
+AMI_Primary_Replication_Strategy::AMI_Primary_Replication_Strategy(bool mt)
+: handler_(this), mutex_(mt ? new ACE_RW_Thread_Mutex : 0)
{
}
@@ -27,20 +30,24 @@ AMI_Primary_Replication_Strategy::~AMI_Primary_Replication_Strategy()
int AMI_Primary_Replication_Strategy::acquire_read (void)
{
- return mutex_.acquire_read();
+ return mutex_ ? mutex_->acquire_read() : 0;
}
int AMI_Primary_Replication_Strategy::acquire_write (void)
{
- return mutex_.acquire_write();
+ return mutex_ ? mutex_->acquire_write() : 0;
}
int AMI_Primary_Replication_Strategy::release (void)
{
- return mutex_.release();
+ return mutex_ ? mutex_->release() : 0;
}
+int AMI_Primary_Replication_Strategy::init()
+{
+ return this->activate();
+}
int AMI_Primary_Replication_Strategy::svc()
{
@@ -51,12 +58,12 @@ int AMI_Primary_Replication_Strategy::svc()
ACE_TRY_CHECK;
PortableServer::POA_var
- root_poa = resolve_init<PortableServer::POA>(orb_.in(), "RootPOA"
+ root_poa_ = resolve_init<PortableServer::POA>(orb_.in(), "RootPOA"
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// create POAManager
- mgr_ = root_poa->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER);
+ mgr_ = root_poa_->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
@@ -64,7 +71,7 @@ int AMI_Primary_Replication_Strategy::svc()
ACE_TRY_CHECK;
PortableServer::IdUniquenessPolicy_var id_uniqueness_policy =
- root_poa->create_id_uniqueness_policy(PortableServer::MULTIPLE_ID
+ root_poa_->create_id_uniqueness_policy(PortableServer::MULTIPLE_ID
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK
@@ -73,7 +80,7 @@ int AMI_Primary_Replication_Strategy::svc()
policy_list[0] = PortableServer::IdUniquenessPolicy::_duplicate(
id_uniqueness_policy.in()
);
- poa_ = create_persistent_poa(root_poa, mgr_, "AMI_Update", policy_list
+ poa_ = create_persistent_poa(root_poa_, mgr_, "AMI_Update", policy_list
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
@@ -116,8 +123,10 @@ AMI_Primary_Replication_Strategy::replicate_request(
size_t num_backups = backups.length();
- if ((size_t)transaction_depth > num_backups)
+ if ((size_t)transaction_depth > num_backups) {
+ TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n");
ACE_THROW(FTRT::TransactionDepthTooHigh());
+ }
ACE_NEW_THROW_EX(manager,
Update_Manager(event, backups.length(), transaction_depth-1, success),
@@ -162,8 +171,55 @@ AMI_Primary_Replication_Strategy::replicate_request(
}
ACE_ENDTRY;
}
-
+ TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n");
ACE_THROW(FTRT::TransactionDepthTooHigh());
}
}
+
+void
+AMI_Primary_Replication_Strategy::add_member(const FTRT::ManagerInfo & info,
+ CORBA::ULong object_group_ref_version
+ ACE_ENV_ARG_DECL)
+{
+ ACE_Auto_Event event;
+ const FtRtecEventChannelAdmin::EventChannelList& backups =
+ GroupInfoPublisher::instance()->backups();
+
+ size_t num_backups = backups.length();
+ ObjectGroupManagerHandler add_member_handler(event, num_backups+1);
+ // The extra one is to prevent the event been signaled prematurely.
+
+ PortableServer::ObjectId_var oid =
+ root_poa_->activate_object(&add_member_handler ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ TAO::Utils::Implicit_Deactivator deactivator(&add_member_handler);
+
+ CORBA::Object_var obj =
+ root_poa_->id_to_reference(oid.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ FTRT::AMI_ObjectGroupManagerHandler_var handler =
+ FTRT::AMI_ObjectGroupManagerHandler::_narrow(obj.in() ACE_ENV_ARG_PARAMETER);
+
+ for (unsigned i = 0; i < num_backups; ++i) {
+ ACE_TRY {
+ backups[i].in()->sendc_add_member(handler.in(),
+ info,
+ object_group_ref_version
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHALL {
+ add_member_handler.add_member_excep(NULL);
+ }
+ ACE_ENDTRY;
+ }
+ // decrement the number of members so the event can be signaled once
+ // all replys have been received.
+ add_member_handler.add_member_excep(NULL);
+
+ event.wait();
+}
+