summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhuangming <huangminghuang@users.noreply.github.com>2004-02-15 20:17:45 +0000
committerhuangming <huangminghuang@users.noreply.github.com>2004-02-15 20:17:45 +0000
commite5773a04ac924cb8113a1e26024e70806ef22102 (patch)
tree56d4123842f31b406920a88dab70d1d9adbae30b
parent40f0b3c06f66969453c810fdcabe4f1471098014 (diff)
downloadATCD-e5773a04ac924cb8113a1e26024e70806ef22102.tar.gz
ChangeLogEntry: Sun Feb 15 14:06:33 2004 Huang-Ming Huang <hh1@cse.wustl.edu>
-rw-r--r--TAO/ChangeLog22
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent.mpc2
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp82
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.h18
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.cpp17
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.h21
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.cpp69
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.h20
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.cpp117
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.h8
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.cpp83
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.h20
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.cpp52
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.h51
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.cpp47
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.h24
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Strategy.h26
17 files changed, 551 insertions, 128 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index 6651921a0fd..3fe9d65acc4 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,25 @@
+Sun Feb 15 14:06:33 2004 Huang-Ming Huang <hh1@cse.wustl.edu>
+
+ * orbsvcs/orbsvcs/FtRtEvent.mpc
+ * orbsvcs/orbsvcs/EventChannel/ObjectGroupManagerHandler.cpp:
+ * orbsvcs/orbsvcs/EventChannel/ObjectGroupManagerHandler.h:
+ * orbsvcs/orbsvcs/EventChannel/AMI_Primary_Replication_Strategy.cpp:
+ * orbsvcs/orbsvcs/EventChannel/AMI_Primary_Replication_Strategy.h:
+ * orbsvcs/orbsvcs/EventChannel/AMI_Replication_Strategy.cpp:
+ * orbsvcs/orbsvcs/EventChannel/AMI_Replication_Strategy.h:
+ * orbsvcs/orbsvcs/EventChannel/Basic_Replication_Strategy.cpp:
+ * orbsvcs/orbsvcs/EventChannel/Basic_Replication_Strategy.h:
+ * orbsvcs/orbsvcs/EventChannel/FTEC_Group_Manager.cpp:
+ * orbsvcs/orbsvcs/EventChannel/FTEC_Group_Manager.h:
+ * orbsvcs/orbsvcs/EventChannel/GroupInfoPublisher.cpp:
+ * orbsvcs/orbsvcs/EventChannel/GroupInfoPublisher.h:
+ * orbsvcs/orbsvcs/EventChannel/Replication_Service.cpp:
+ * orbsvcs/orbsvcs/EventChannel/Replication_Service.h:
+ * orbsvcs/orbsvcs/EventChannel/Replication_Strategy.h:
+ Changed for using AMI calls when adding new replicas to a
+ object group under AMI replication strategy.
+
+
Sun Feb 15 08:09:17 2004 Balachandran Natarajan <bala@dre.vanderbilt.edu>
* orbsvcs/examples/CosEC/Factory/CosEventChannelFactory.idl:
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent.mpc b/TAO/orbsvcs/orbsvcs/FtRtEvent.mpc
index ae09f35cd4e..57ba4a816e7 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent.mpc
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent.mpc
@@ -11,11 +11,11 @@ project (FtRtEvent) : orbsvcslib, core, ftorbutils, rtevent, naming {
idlflags += -GC
FTRT.idl
FtRtecEventComm.idl
+ FTRT_GroupManager.idl
}
IDL_Files {
FtRtecEventChannelAdmin.idl
- FTRT_GroupManager.idl
}
Source_Files (ORBSVCS_COMPONENTS) {
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp
index 2d14b6132ac..deaa02702e9 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.cpp
@@ -3,19 +3,22 @@
#include "AMI_Primary_Replication_Strategy.h"
#include "ace/Synch_T.h"
#include "GroupInfoPublisher.h"
-#include "../Utils/ScopeGuard.h"
#include "Request_Context_Repository.h"
-#include "../Utils/resolve_init.h"
#include "create_persistent_poa.h"
#include "Update_Manager.h"
#include "tao/Utils/PolicyList_Destroyer.h"
+#include "ObjectGroupManagerHandler.h"
+#include "tao/Utils/Implicit_Deactivator.h"
+#include "../Utils/resolve_init.h"
+#include "../Utils/ScopeGuard.h"
+#include "../Utils/Log.h"
ACE_RCSID (EventChannel,
AMI_Primary_Replication_Strategy,
"$Id$")
-AMI_Primary_Replication_Strategy::AMI_Primary_Replication_Strategy()
-: handler_(this)
+AMI_Primary_Replication_Strategy::AMI_Primary_Replication_Strategy(bool mt)
+: handler_(this), mutex_(mt ? new ACE_RW_Thread_Mutex : 0)
{
}
@@ -27,20 +30,24 @@ AMI_Primary_Replication_Strategy::~AMI_Primary_Replication_Strategy()
int AMI_Primary_Replication_Strategy::acquire_read (void)
{
- return mutex_.acquire_read();
+ return mutex_ ? mutex_->acquire_read() : 0;
}
int AMI_Primary_Replication_Strategy::acquire_write (void)
{
- return mutex_.acquire_write();
+ return mutex_ ? mutex_->acquire_write() : 0;
}
int AMI_Primary_Replication_Strategy::release (void)
{
- return mutex_.release();
+ return mutex_ ? mutex_->release() : 0;
}
+int AMI_Primary_Replication_Strategy::init()
+{
+ return this->activate();
+}
int AMI_Primary_Replication_Strategy::svc()
{
@@ -51,12 +58,12 @@ int AMI_Primary_Replication_Strategy::svc()
ACE_TRY_CHECK;
PortableServer::POA_var
- root_poa = resolve_init<PortableServer::POA>(orb_.in(), "RootPOA"
+ root_poa_ = resolve_init<PortableServer::POA>(orb_.in(), "RootPOA"
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// create POAManager
- mgr_ = root_poa->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER);
+ mgr_ = root_poa_->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
@@ -64,7 +71,7 @@ int AMI_Primary_Replication_Strategy::svc()
ACE_TRY_CHECK;
PortableServer::IdUniquenessPolicy_var id_uniqueness_policy =
- root_poa->create_id_uniqueness_policy(PortableServer::MULTIPLE_ID
+ root_poa_->create_id_uniqueness_policy(PortableServer::MULTIPLE_ID
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK
@@ -73,7 +80,7 @@ int AMI_Primary_Replication_Strategy::svc()
policy_list[0] = PortableServer::IdUniquenessPolicy::_duplicate(
id_uniqueness_policy.in()
);
- poa_ = create_persistent_poa(root_poa, mgr_, "AMI_Update", policy_list
+ poa_ = create_persistent_poa(root_poa_, mgr_, "AMI_Update", policy_list
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
@@ -116,8 +123,10 @@ AMI_Primary_Replication_Strategy::replicate_request(
size_t num_backups = backups.length();
- if ((size_t)transaction_depth > num_backups)
+ if ((size_t)transaction_depth > num_backups) {
+ TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n");
ACE_THROW(FTRT::TransactionDepthTooHigh());
+ }
ACE_NEW_THROW_EX(manager,
Update_Manager(event, backups.length(), transaction_depth-1, success),
@@ -162,8 +171,55 @@ AMI_Primary_Replication_Strategy::replicate_request(
}
ACE_ENDTRY;
}
-
+ TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n");
ACE_THROW(FTRT::TransactionDepthTooHigh());
}
}
+
+void
+AMI_Primary_Replication_Strategy::add_member(const FTRT::ManagerInfo & info,
+ CORBA::ULong object_group_ref_version
+ ACE_ENV_ARG_DECL)
+{
+ ACE_Auto_Event event;
+ const FtRtecEventChannelAdmin::EventChannelList& backups =
+ GroupInfoPublisher::instance()->backups();
+
+ size_t num_backups = backups.length();
+ ObjectGroupManagerHandler add_member_handler(event, num_backups+1);
+ // The extra one is to prevent the event been signaled prematurely.
+
+ PortableServer::ObjectId_var oid =
+ root_poa_->activate_object(&add_member_handler ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ TAO::Utils::Implicit_Deactivator deactivator(&add_member_handler);
+
+ CORBA::Object_var obj =
+ root_poa_->id_to_reference(oid.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ FTRT::AMI_ObjectGroupManagerHandler_var handler =
+ FTRT::AMI_ObjectGroupManagerHandler::_narrow(obj.in() ACE_ENV_ARG_PARAMETER);
+
+ for (unsigned i = 0; i < num_backups; ++i) {
+ ACE_TRY {
+ backups[i].in()->sendc_add_member(handler.in(),
+ info,
+ object_group_ref_version
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHALL {
+ add_member_handler.add_member_excep(NULL);
+ }
+ ACE_ENDTRY;
+ }
+ // decrement the number of members so the event can be signaled once
+ // all replys have been received.
+ add_member_handler.add_member_excep(NULL);
+
+ event.wait();
+}
+
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.h
index 3a019954100..aafee54d748 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.h
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Primary_Replication_Strategy.h
@@ -22,16 +22,29 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+/**
+ * @class AMI_Primary_Replication_Strategy.
+ *
+ * @brief Used by primary replicas when AMI calls are used for replicating requests.
+ */
+
class AMI_Primary_Replication_Strategy : public Replication_Strategy
, public ACE_Task_Base
{
public:
- AMI_Primary_Replication_Strategy();
+ /**
+ * @param mt Specifies whether multithreaded ORB is used.
+ */
+ AMI_Primary_Replication_Strategy(bool mt);
virtual ~AMI_Primary_Replication_Strategy();
+ virtual int init();
virtual void replicate_request(const FTRT::State& state,
RollbackOperation rollback,
const FtRtecEventChannelAdmin::ObjectId& oid
ACE_ENV_ARG_DECL);
+ virtual void add_member(const FTRT::ManagerInfo & info,
+ CORBA::ULong object_group_ref_version
+ ACE_ENV_ARG_DECL);
virtual int acquire_read (void);
virtual int acquire_write (void);
virtual int release (void);
@@ -40,11 +53,12 @@ public:
private:
virtual int svc (void);
CORBA::ORB_var orb_;
+ PortableServer::POA_var root_poa_;
PortableServer::POA_var poa_;
PortableServer::POAManager_var mgr_;
bool running_;
- ACE_RW_Thread_Mutex mutex_;
UpdateableHandler handler_;
+ ACE_RW_Thread_Mutex* mutex_;
};
#endif // AMI_PRIMARY_REPLICATION_STRATEGY_H
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.cpp
index be41268c19e..b343c0e9af7 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.cpp
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.cpp
@@ -7,7 +7,12 @@ ACE_RCSID (EventChannel,
AMI_Replication_Strategy,
"$Id$")
-AMI_Replication_Strategy::AMI_Replication_Strategy()
+AMI_Replication_Strategy::AMI_Replication_Strategy(bool mt)
+: mt_(mt)
+{
+}
+
+AMI_Replication_Strategy::~AMI_Replication_Strategy()
{
}
@@ -23,12 +28,20 @@ AMI_Replication_Strategy::replicate_request(
ACE_UNUSED_ARG(oid);
}
+void
+AMI_Replication_Strategy::add_member(const FTRT::ManagerInfo & info,
+ CORBA::ULong object_group_ref_version
+ ACE_ENV_ARG_DECL_NOT_USED)
+{
+ ACE_UNUSED_ARG(info);
+ ACE_UNUSED_ARG(object_group_ref_version);
+}
Replication_Strategy*
AMI_Replication_Strategy::make_primary_strategy()
{
AMI_Primary_Replication_Strategy* result;
- ACE_NEW_RETURN(result, AMI_Primary_Replication_Strategy, 0);
+ ACE_NEW_RETURN(result, AMI_Primary_Replication_Strategy(mt_), 0);
auto_ptr<AMI_Primary_Replication_Strategy> holder(result);
if (result->activate() == 0)
return holder.release();
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.h
index d7ef25c58eb..055b3ea22cc 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.h
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/AMI_Replication_Strategy.h
@@ -22,21 +22,34 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+/**
+ * @class AMI_Replication_Strategy.
+ *
+ * @brief Used by backup replicas when AMI calls are used for replicating requests.
+ */
+
class AMI_Replication_Strategy: public Replication_Strategy
{
public:
- AMI_Replication_Strategy();
- virtual void replicate_request(
- const FTRT::State& state,
+ /**
+ * @param mt Specifies whether multithreaded ORB is used.
+ */
+ AMI_Replication_Strategy(bool mt);
+ ~AMI_Replication_Strategy();
+ virtual void replicate_request(const FTRT::State& state,
RollbackOperation rollback,
const FtRtecEventChannelAdmin::ObjectId& oid
ACE_ENV_ARG_DECL);
+ virtual void add_member(const FTRT::ManagerInfo & info,
+ CORBA::ULong object_group_ref_version
+ ACE_ENV_ARG_DECL);
virtual Replication_Strategy* make_primary_strategy();
virtual int acquire_read (void);
virtual int acquire_write (void);
virtual int release (void);
-
+private:
+ bool mt_;
};
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.cpp
index 826d60f2b7d..8e2d300a0ab 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.cpp
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.cpp
@@ -4,23 +4,33 @@
#include "GroupInfoPublisher.h"
#include "FTEC_Event_Channel.h"
#include "Request_Context_Repository.h"
+#include "../Utils/Log.h"
ACE_RCSID (EventChannel,
Basic_Replication_Strategy,
"$Id$")
-Basic_Replication_Strategy::Basic_Replication_Strategy()
+/// The mutex has to be recursive; otherwise, if the second replicate_request() is
+/// called while the first replicate_request() is waiting for reply, we will get
+/// a deadlock.
+Basic_Replication_Strategy::Basic_Replication_Strategy(bool mt)
: sequence_num_(0)
+, mutex_(mt ? new ACE_Recursive_Thread_Mutex : 0)
{
}
+Basic_Replication_Strategy::~Basic_Replication_Strategy()
+{
+ delete mutex_;
+}
+
void
Basic_Replication_Strategy::check_validity(ACE_ENV_SINGLE_ARG_DECL)
{
FTRT::SequenceNumber seq_no = Request_Context_Repository().get_sequence_number(ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
- ACE_DEBUG((LM_DEBUG, "check_validity : sequence no = %d\n", sequence_num_));
+ TAO_FTRTEC::Log(1 , "check_validity : sequence no = %d\n", sequence_num_);
if (this->sequence_num_ == 0) {
// this is the first set_update received from the primary
@@ -33,6 +43,7 @@ Basic_Replication_Strategy::check_validity(ACE_ENV_SINGLE_ARG_DECL)
// client_interceptor_->sequence_num_--;
FTRT::OutOfSequence exception;
exception.current = this->sequence_num_;
+ TAO_FTRTEC::Log(3, "Throwing FTRT::OutOfSequence (old sequence_num_ = %d)\n", this->sequence_num_);
ACE_THROW(FTRT::OutOfSequence(exception));
}
else
@@ -60,7 +71,7 @@ Basic_Replication_Strategy::replicate_request(
if (info_publisher->is_primary())
this->sequence_num_++;
- ACE_DEBUG((LM_DEBUG, "replicate_request : sequence no = %d\n", sequence_num_));
+ TAO_FTRTEC::Log(1, "replicate_request : sequence no = %d\n", sequence_num_);
Request_Context_Repository().set_sequence_number(sequence_num_
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
@@ -69,37 +80,69 @@ Basic_Replication_Strategy::replicate_request(
ACE_CHECK;
if (transaction_depth > 1) {
- successor->set_update(state
- ACE_ENV_ARG_PARAMETER);
+ bool finished = true;
+ do {
+ ACE_TRY_EX(SET_UPDATE) {
+ successor->set_update(state ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK_EX(SET_UPDATE);
+ }
+ ACE_CATCH(CORBA::COMM_FAILURE, ex) {
+ if (ex.minor() == 6) finished = false;
+ else ACE_RE_THROW;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
+ } while(!finished);
}
else {
- ACE_TRY {
- successor->oneway_set_update(state
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_TRY_EX(ONEWAY_SET_UPDATE) {
+ successor->oneway_set_update(state ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK_EX(ONEWAY_SET_UPDATE);
}
ACE_CATCHANY {
}
ACE_ENDTRY;
}
}
- else if (transaction_depth > 1)
+ else if (transaction_depth > 1) {
+ TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n");
ACE_THROW(FTRT::TransactionDepthTooHigh());
+ }
}
+void
+Basic_Replication_Strategy::add_member(const FTRT::ManagerInfo & info,
+ CORBA::ULong object_group_ref_version
+ ACE_ENV_ARG_DECL_NOT_USED)
+{
+ FtRtecEventChannelAdmin::EventChannel_var successor = GroupInfoPublisher::instance()->successor();
+ bool finished = true;
+ do {
+ ACE_TRY {
+ successor->add_member(info, object_group_ref_version ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCH(CORBA::COMM_FAILURE, ex) {
+ if (ex.minor() == 6) finished = false;
+ else ACE_RE_THROW;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
+ } while (!finished);
+}
int Basic_Replication_Strategy::acquire_read (void)
{
- return mutex_.acquire_read();
+ return mutex_ ? mutex_->acquire_read() : 0;
}
int Basic_Replication_Strategy::acquire_write (void)
{
- return mutex_.acquire_write();
+ return mutex_ ? mutex_->acquire_write() : 0;
}
int Basic_Replication_Strategy::release (void)
{
- return mutex_.release();
+ return mutex_ ? mutex_->release() : 0;
}
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.h
index 282f4b97d56..f3966e0ade1 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.h
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.h
@@ -18,18 +18,30 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+/**
+ * @class Basic_Replication_Strategy
+ *
+ * @brief Use two-way CORBA call to replicate the state to backup replicas.
+ */
+
class Basic_Replication_Strategy : public Replication_Strategy
{
public:
- Basic_Replication_Strategy();
+ /**
+ * @param mt Specifies whether multithreaded ORB is used.
+ */
+ Basic_Replication_Strategy(bool mt);
+ ~Basic_Replication_Strategy();
virtual void check_validity(ACE_ENV_SINGLE_ARG_DECL);
- virtual void replicate_request(
- const FTRT::State& state,
+ virtual void replicate_request(const FTRT::State& state,
RollbackOperation rollback,
const FtRtecEventChannelAdmin::ObjectId& oid
ACE_ENV_ARG_DECL);
+ virtual void add_member(const FTRT::ManagerInfo & info,
+ CORBA::ULong object_group_ref_version
+ ACE_ENV_ARG_DECL);
virtual int acquire_read (void);
virtual int acquire_write (void);
@@ -37,7 +49,7 @@ public:
private:
FTRT::SequenceNumber sequence_num_;
- ACE_Thread_Mutex mutex_;
+ ACE_Recursive_Thread_Mutex* mutex_;
};
#endif
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.cpp
index ae92bfbb833..7b26102fde7 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.cpp
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.cpp
@@ -6,6 +6,8 @@
#include "Fault_Detector.h"
#include "IOGR_Maker.h"
#include "GroupInfoPublisher.h"
+#include "Replication_Service.h"
+#include "../Utils/Log.h"
ACE_RCSID (EventChannel,
TAO_FTEC_Group_Manager,
@@ -64,7 +66,6 @@ struct TAO_FTEC_Group_Manager_Impl
{
FTRT::ManagerInfoList info_list;
unsigned my_position;
- FTRT::FaultListener_var listener;
};
TAO_FTEC_Group_Manager::TAO_FTEC_Group_Manager()
@@ -87,7 +88,7 @@ CORBA::Boolean TAO_FTEC_Group_Manager::start (
CORBA::SystemException
))
{
- impl_->listener = listener;
+ listener_ = listener;
ACE_NEW_RETURN(cur , FTRT::Location(Fault_Detector::instance()->my_location()), false);
return true;
}
@@ -98,27 +99,21 @@ void TAO_FTEC_Group_Manager::create_group (
CORBA::ULong object_group_ref_version
ACE_ENV_ARG_DECL)
{
- ACE_DEBUG((LM_DEBUG, "create_group\n"));
- IOGR_Maker::instance()->set_ref_version( object_group_ref_version );
+ TAO_FTRTEC::Log(1, "create_group\n");
impl_->info_list = info_list;
impl_->my_position = find_by_location(info_list,
Fault_Detector::instance()->my_location());
GroupInfoPublisherBase* publisher = GroupInfoPublisher::instance();
-
- publisher->update(impl_->info_list, impl_->my_position
+ GroupInfoPublisherBase::Info_ptr info =
+ publisher->setup_info(impl_->info_list, impl_->my_position
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- FtRtecEventChannelAdmin::EventChannel_var successor
- = publisher->successor();
+ publisher->update_info(info);
- if (!CORBA::is_nil(successor.in())) {
- successor->create_group(info_list, object_group_ref_version
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- }
+ IOGR_Maker::instance()->set_ref_version( object_group_ref_version );
if (impl_->my_position > 0) {
Fault_Detector* detector = Fault_Detector::instance();
@@ -127,13 +122,22 @@ void TAO_FTEC_Group_Manager::create_group (
ACE_THROW(FTRT::PredecessorUnreachable());
}
}
+
+ FtRtecEventChannelAdmin::EventChannel_var successor
+ = publisher->successor();
+
+ if (!CORBA::is_nil(successor.in())) {
+ successor->create_group(info_list, object_group_ref_version
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
}
void TAO_FTEC_Group_Manager::join_group (
const FTRT::ManagerInfo & info
ACE_ENV_ARG_DECL)
{
- ACE_DEBUG((LM_DEBUG, "join group\n"));
+ TAO_FTRTEC::Log(1, "join group\n");
if (impl_->my_position == 0) {
FTRTEC::Replication_Service* svc = FTRTEC::Replication_Service::instance();
ACE_Write_Guard<FTRTEC::Replication_Service> lock(*svc);
@@ -147,44 +151,61 @@ void TAO_FTEC_Group_Manager::add_member (
CORBA::ULong object_group_ref_version
ACE_ENV_ARG_DECL)
{
- ACE_DEBUG((LM_DEBUG, "add_member location = <%s>\n",
- (const char*)info.the_location[0].id));
+ TAO_FTRTEC::Log(1, "add_member location = <%s>\n",
+ (const char*)info.the_location[0].id);
+ auto_ptr<TAO_FTEC_Group_Manager_Impl> new_impl(new TAO_FTEC_Group_Manager_Impl);
+
+ new_impl->my_position = impl_->my_position;
size_t pos = impl_->info_list.length();
- impl_->info_list.length(pos+1);
- impl_->info_list[pos] = info;
+ new_impl->info_list.length(pos+1);
+ for (size_t i = 0; i < pos; ++i) {
+ new_impl->info_list[i] = impl_->info_list[i];
+ }
+ new_impl->info_list[pos] = info;
- IOGR_Maker::instance()->set_ref_version( object_group_ref_version );
GroupInfoPublisherBase* publisher = GroupInfoPublisher::instance();
- publisher->update(impl_->info_list, impl_->my_position
+ GroupInfoPublisherBase::Info_ptr group_info =
+ publisher->setup_info(new_impl->info_list, new_impl->my_position
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- if (impl_->my_position < impl_->info_list.length()-2)
+ int last_one = (impl_->my_position == impl_->info_list.length()-1);
+
+ if (!last_one)
{
// I am not the last of replica, tell my successor that
// a new member has joined in.
ACE_TRY_EX(block) {
- publisher->successor()->add_member(info, object_group_ref_version
+ FTRTEC::Replication_Service::instance()->add_member(info, object_group_ref_version
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK_EX(block);
- return;
}
ACE_CATCHANY {
// Unable to send request to all the successors.
// Now this node become the last replica of the object group.
- }
- ACE_ENDTRY;
+ // update the info list again
+ new_impl->info_list.length(new_impl->my_position+2);
+ new_impl->info_list[new_impl->my_position+1] = info;
- // update the info list again
- impl_->info_list.length(impl_->my_position+2);
- impl_->info_list[impl_->my_position+1] = info;
+ /// group_info = publisher->set_info(..) should be enough.
+ /// However, GCC 2.96 is not happy with that.
- publisher->update(impl_->info_list, impl_->my_position
+ GroupInfoPublisherBase::Info_ptr group_info1 =
+ publisher->setup_info(new_impl->info_list,
+ new_impl->my_position
ACE_ENV_ARG_PARAMETER);
- }
+ ACE_CHECK;
+ group_info.reset(group_info1.release());
+ last_one = true;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
+ }
+ if (last_one)
+ {
// this is the last replica in the list
// synchornize the state with the newly joined replica.
FtRtecEventChannelAdmin::EventChannelState state;
@@ -204,10 +225,21 @@ void TAO_FTEC_Group_Manager::add_member (
else
s.replace(cdr.begin()->length(), cdr.begin());
- ACE_DEBUG((LM_DEBUG, "Setting state\n"));
+ TAO_FTRTEC::Log(2, "Setting state\n");
info.ior->set_state(s ACE_ENV_ARG_PARAMETER);
- info.ior->create_group(impl_->info_list, object_group_ref_version);
- ACE_DEBUG((LM_DEBUG, "After create_group\n"));
+ ACE_CHECK;
+ info.ior->create_group(new_impl->info_list,
+ object_group_ref_version
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ TAO_FTRTEC::Log(2, "After create_group\n");
+ }
+
+ // commit the changes
+ IOGR_Maker::instance()->set_ref_version( object_group_ref_version );
+ publisher->update_info(group_info);
+ delete impl_;
+ impl_ = new_impl.release();
}
template <class SEQ>
@@ -224,7 +256,7 @@ void TAO_FTEC_Group_Manager::replica_crashed (
const FTRT::Location & location
ACE_ENV_ARG_DECL)
{
- ACE_DEBUG((LM_DEBUG, "TAO_FTEC_Group_Manager::replica_crashed\n"));
+ TAO_FTRTEC::Log(1, "TAO_FTEC_Group_Manager::replica_crashed\n");
FTRTEC::Replication_Service* svc = FTRTEC::Replication_Service::instance();
ACE_Write_Guard<FTRTEC::Replication_Service> lock(*svc);
remove_member(location, IOGR_Maker::instance()->increment_ref_version()
@@ -250,30 +282,36 @@ void TAO_FTEC_Group_Manager::remove_member (
GroupInfoPublisherBase* publisher = GroupInfoPublisher::instance();
- publisher->update(impl_->info_list, impl_->my_position
+ GroupInfoPublisherBase::Info_ptr info =
+ publisher->setup_info(impl_->info_list, impl_->my_position
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
+ publisher->update_info(info);
FtRtecEventChannelAdmin::EventChannel_var successor =
publisher->successor();
IOGR_Maker::instance()->set_ref_version(object_group_ref_version);
if (!CORBA::is_nil(successor.in())) {
-
+ ACE_TRY {
successor->remove_member(crashed_location,
object_group_ref_version
ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHALL {
+ }
+ ACE_ENDTRY;
}
- ACE_DEBUG((LM_DEBUG, "2. my_position = %d, crashed_pos = %d\n", impl_->my_position, crashed_pos));
+ TAO_FTRTEC::Log(3, "my_position = %d, crashed_pos = %d\n", impl_->my_position, crashed_pos);
if (impl_->my_position == crashed_pos && impl_->my_position > 0)
Fault_Detector::instance()->connect(impl_->info_list[impl_->my_position-1].the_location);
}
void TAO_FTEC_Group_Manager::connection_closed()
{
- ACE_DEBUG((LM_DEBUG, "TAO_FTEC_Group_Manager::connection_closed\n"));
+ TAO_FTRTEC::Log(1, "TAO_FTEC_Group_Manager::connection_closed\n");
ACE_ASSERT(impl_->my_position > 0);
// do not use referere here, because the the value pointed by the pointer to
@@ -283,6 +321,7 @@ void TAO_FTEC_Group_Manager::connection_closed()
ACE_DECLARE_NEW_CORBA_ENV;
if (impl_->my_position > 1) {
+ // if I am not the new primary, tell the new primary
ACE_TRY_EX(block1) {
TAO_IOP::TAO_IOR_Manipulation::IORList iors;
iors.length(impl_->my_position-1);
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.h
index a8557ae4cdb..e41fc061fe2 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.h
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Group_Manager.h
@@ -33,11 +33,8 @@ public:
virtual CORBA::Boolean start (
FTRT::FaultListener_ptr listener,
FTRT::Location_out cur
- ACE_ENV_ARG_DECL
- )
- ACE_THROW_SPEC ((
- CORBA::SystemException
- ));
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
void create_group (
const FTRT::ManagerInfoList & info_list,
@@ -71,6 +68,7 @@ private:
virtual void connection_closed();
protected:
+ FTRT::FaultListener_var listener_;
TAO_FTEC_Group_Manager_Impl* impl_;
};
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.cpp
index f5340424076..1766ace1006 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.cpp
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.cpp
@@ -5,6 +5,7 @@
#include "IOGR_Maker.h"
#include "Identification_Service.h"
#include "FTEC_Become_Primary_Listener.h"
+#include "../Utils/Log.h"
ACE_RCSID (EventChannel,
GroupInfoPublisher,
@@ -13,8 +14,9 @@ ACE_RCSID (EventChannel,
GroupInfoPublisherBase::GroupInfoPublisherBase()
-: primary_(false)
+: info_(new Info)
{
+ info_->primary = false;
}
@@ -33,19 +35,19 @@ void GroupInfoPublisherBase::set_naming_context(CosNaming::NamingContext_var nam
bool
GroupInfoPublisherBase::is_primary() const
{
- return primary_;
+ return info_->primary;
}
CORBA::Object_var
GroupInfoPublisherBase::group_reference() const
{
- return iogr_;
+ return info_->iogr;
}
FtRtecEventChannelAdmin::EventChannel_var
GroupInfoPublisherBase::successor() const
{
- return successor_;
+ return info_->successor;
}
@@ -53,16 +55,17 @@ GroupInfoPublisherBase::successor() const
const GroupInfoPublisherBase::BackupList&
GroupInfoPublisherBase::backups() const
{
- return backups_;
+ return info_->backups;
}
-void
-GroupInfoPublisherBase::update(const FTRT::ManagerInfoList & info_list,
+GroupInfoPublisherBase::Info_ptr
+GroupInfoPublisherBase::setup_info(const FTRT::ManagerInfoList & info_list,
int my_position
ACE_ENV_ARG_DECL)
{
- bool become_primary = (my_position == 0 && !primary_);
- primary_ = (my_position == 0);
+ Info_ptr result(new Info);
+
+ result->primary = (my_position == 0);
/// create the object group
size_t len = info_list.length();
@@ -79,52 +82,78 @@ GroupInfoPublisherBase::update(const FTRT::ManagerInfoList & info_list,
IOGR_Maker::instance()->make_iogr(iors ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- iogr_ =
+ result->iogr =
::FtRtecEventChannelAdmin::EventChannel::_narrow(obj.in()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- if (primary_ && !CORBA::is_nil(naming_context_.in())) {
- ACE_DEBUG((LM_DEBUG, "Registering to the Name Service\n"));
- naming_context_->rebind(FTRTEC::Identification_Service::instance()->name(),
- iogr_.in() ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- }
-
/// check if sucessor changed
size_t successors_length = info_list.length() - my_position -1;
- if (successors_length != backups_.length()) {
+ if (successors_length != info_->backups.length()) {
// successor changed, update successor
iors.length(successors_length);
for (i = 0; i < successors_length; ++i) {
iors[i] = CORBA::Object::_duplicate(info_list[i+ my_position+1].ior.in());
-
}
obj = IOGR_Maker::instance()->merge_iors(iors
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- successor_ = FtRtecEventChannelAdmin::EventChannel::_narrow(obj.in()
+ result->successor =
+ FtRtecEventChannelAdmin::EventChannel::_narrow(obj.in()
ACE_ENV_ARG_PARAMETER);
- FtRtecEventChannelAdmin::EventChannel_var t = successor_;
ACE_CHECK;
+ }
+ else {
+ result->successor = info_->successor;
+ }
+
+ if (!CORBA::is_nil(result->successor.in()))
+ {
+ CORBA::PolicyList_var pols;
+ result->successor->_validate_connection (pols.out ());
+ }
// update backups
- backups_.length(successors_length);
+ result->backups.length(successors_length);
for (i = 0; i < successors_length; ++i) {
- backups_[i] =
+ result->backups[i] =
FtRtecEventChannelAdmin::EventChannel::_narrow(
info_list[i+ my_position+1].ior.in()
ACE_ENV_ARG_PARAMETER);
+ CORBA::PolicyList_var pols;
+ result->backups[i]->_validate_connection (pols.out ());
ACE_CHECK;
}
+
+ return result;
}
- if (become_primary) {
- // notify the subscribers
- for (i = 0; i < subscribers_.size(); ++i)
- subscribers_[i]->become_primary();
+void
+GroupInfoPublisherBase::update_info(GroupInfoPublisherBase::Info_ptr info)
+{
+ if (info->primary) {
+ if (!info_->primary) {
+ // now we become the primary, notify the subscribers
+ for (size_t i = 0; i < subscribers_.size(); ++i)
+ subscribers_[i]->become_primary();
+ }
+
+ if (!CORBA::is_nil(naming_context_.in())) {
+ TAO_FTRTEC::Log(1, "Registering to the Name Service\n");
+ ACE_TRY_NEW_ENV {
+ naming_context_->rebind(FTRTEC::Identification_Service::instance()->name(),
+ info->iogr.in() ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHALL {
+ /// there's nothing we can do if the naming service is down
+ }
+ ACE_ENDTRY;
+ }
}
+ info_ = info;
}
+
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.h
index e4fd4275ce7..baeb1d8b8d4 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.h
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.h
@@ -28,6 +28,16 @@ class TAO_FTEC_Become_Primary_Listener;
class GroupInfoPublisherBase
{
public:
+ typedef FtRtecEventChannelAdmin::EventChannelList BackupList;
+
+ struct Info {
+ bool primary;
+ CORBA::Object_var iogr;
+ FtRtecEventChannelAdmin::EventChannel_var successor;
+ BackupList backups;
+ };
+
+ typedef auto_ptr<Info> Info_ptr;
friend class ACE_Singleton<GroupInfoPublisherBase, ACE_Thread_Mutex>;
void subscribe(TAO_FTEC_Become_Primary_Listener* listener);
@@ -37,13 +47,14 @@ public:
FtRtecEventChannelAdmin::EventChannel_var successor() const;
- typedef FtRtecEventChannelAdmin::EventChannelList BackupList;
const BackupList& backups() const;
- void update(const FTRT::ManagerInfoList & info_list,
+ Info_ptr setup_info(const FTRT::ManagerInfoList & info_list,
int my_position
ACE_ENV_ARG_DECL);
+ void update_info(Info_ptr info);
+
const PortableServer::ObjectId& object_id() const;
const CosNaming::Name& name() const;
@@ -53,15 +64,12 @@ public:
private:
GroupInfoPublisherBase();
- bool primary_;
CosNaming::NamingContext_var naming_context_;
- CORBA::Object_var iogr_;
- FtRtecEventChannelAdmin::EventChannel_var successor_;
- BackupList backups_;
typedef ACE_Vector<TAO_FTEC_Become_Primary_Listener*, 2> Subscribers;
Subscribers subscribers_;
PortableServer::ObjectId object_id_;
CosNaming::Name name_;
+ Info_ptr info_;
};
typedef ACE_Singleton<GroupInfoPublisherBase, ACE_Thread_Mutex> GroupInfoPublisher;
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.cpp
new file mode 100644
index 00000000000..167822f3890
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.cpp
@@ -0,0 +1,52 @@
+// $Id$
+#include "ObjectGroupManagerHandler.h"
+
+ObjectGroupManagerHandler::ObjectGroupManagerHandler(ACE_Auto_Event& evt, int num_backups)
+: evt_(evt), num_backups_(num_backups)
+{
+}
+
+void ObjectGroupManagerHandler::start (CORBA::Boolean ,
+ const FTRT::Location & )
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+void ObjectGroupManagerHandler::start_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * )
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+void ObjectGroupManagerHandler::create_group ()
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+
+void ObjectGroupManagerHandler::create_group_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * )
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+void ObjectGroupManagerHandler::add_member ()
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ if (--num_backups_ ==0)
+ evt_.signal();
+}
+
+void ObjectGroupManagerHandler::add_member_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * )
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ this->add_member();
+}
+
+void ObjectGroupManagerHandler::set_state ()
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+void ObjectGroupManagerHandler::set_state_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * )
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.h
new file mode 100644
index 00000000000..69c4944a11c
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/ObjectGroupManagerHandler.h
@@ -0,0 +1,51 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file ObjectGroupManagerHandler.h
+ *
+ * $Id$
+ *
+ * @author Huang-Ming Huang <hh1@cse.wustl.edu>
+ */
+//=============================================================================
+#ifndef OBJECTGROUPMANAGERHANDLER_H
+#define OBJECTGROUPMANAGERHANDLER_H
+
+#include "orbsvcs/orbsvcs/FTRT_GroupManagerS.h"
+#include "ace/Auto_Event.h"
+
+class ObjectGroupManagerHandler : public POA_FTRT::AMI_ObjectGroupManagerHandler
+{
+public:
+ ObjectGroupManagerHandler(ACE_Auto_Event& evt, int num_backups);
+ virtual void start (CORBA::Boolean ami_return_val,
+ const FTRT::Location & the_location)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void start_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * excep_holder)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void create_group ()
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void create_group_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * excep_holder)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void add_member ()
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void add_member_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * excep_holder)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void set_state ()
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void set_state_excep (FTRT::AMI_ObjectGroupManagerExceptionHolder * excep_holder)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+private:
+ ACE_Auto_Event& evt_;
+ ACE_Atomic_Op< ACE_Thread_Mutex, int > num_backups_;
+};
+
+#endif
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.cpp
index 807c24d5ac4..fb3a89a15e0 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.cpp
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.cpp
@@ -4,6 +4,7 @@
#include "AMI_Replication_Strategy.h"
#include "Basic_Replication_Strategy.h"
#include "FTEC_ORBInitializer.h"
+#include "../Utils/Log.h"
#include "tao/ORBInitializer_Registry.h"
@@ -17,6 +18,7 @@ namespace FTRTEC
{
namespace {
auto_ptr<Replication_Strategy> replication_strategy;
+ int threads = 1;
Replication_Service* service;
}
@@ -43,14 +45,30 @@ namespace FTRTEC
return 0;
initialized = 1;
-
- Replication_Strategy* strategy;
+ bool ami = false;
// Parse any service configurator parameters.
- if (argc > 0 && ACE_OS::strcasecmp (argv[0], ACE_LIB_TEXT("AMI")) == 0)
- ACE_NEW_RETURN (strategy, AMI_Replication_Strategy, -1);
- else
- ACE_NEW_RETURN (strategy, Basic_Replication_Strategy, -1);
+ while (argc > 0) {
+ if (ACE_OS::strcasecmp (argv[0], ACE_LIB_TEXT("AMI")) ==0 )
+ ami = true;
+ if (ACE_OS::strcasecmp (argv[0], ACE_LIB_TEXT("-threads")) ==0 && argc > 1) {
+ FTRTEC::threads = ACE_OS::atoi(argv[1]);
+ if (FTRTEC::threads ==0 )
+ FTRTEC::threads = 1;
+ ++argv; --argc;
+ }
+ ++argv; --argc;
+ }
+
+ Replication_Strategy* strategy;
+ if (ami) {
+ ACE_NEW_RETURN (strategy, AMI_Replication_Strategy(threads() > 1), -1);
+ TAO_FTRTEC::Log(3, "AMI replication strategy\n");
+ }
+ else {
+ ACE_NEW_RETURN (strategy, Basic_Replication_Strategy(threads() > 1), -1);
+ TAO_FTRTEC::Log(3, "Basic replication strategy\n");
+ }
ACE_AUTO_PTR_RESET (replication_strategy, strategy, Replication_Strategy);
@@ -120,27 +138,38 @@ namespace FTRTEC
ACE_ENV_ARG_PARAMETER);
}
+ void Replication_Service::add_member(const FTRT::ManagerInfo & info,
+ CORBA::ULong object_group_ref_version
+ ACE_ENV_ARG_DECL)
+ {
+ replication_strategy->add_member(info, object_group_ref_version ACE_ENV_ARG_PARAMETER);
+ }
+
int Replication_Service::acquire_read (void)
{
int r = replication_strategy->acquire_read();
- ACE_DEBUG((LM_DEBUG, "Read Lock acquired %d\n", r));
+ TAO_FTRTEC::Log(3, "Read Lock acquired %d\n", r);
return r;
}
int Replication_Service::acquire_write (void)
{
int r= replication_strategy->acquire_write();
- ACE_DEBUG((LM_DEBUG, "Write Lock acqured %d\n", r));
+ TAO_FTRTEC::Log(3, "Write Lock acqured %d\n", r);
return r;
}
int Replication_Service::release (void)
{
int r= replication_strategy->release();
- ACE_DEBUG((LM_DEBUG, "Lock Released %d\n", r));
+ TAO_FTRTEC::Log(3, "Lock Released %d\n", r);
return r;
}
+ int Replication_Service::threads() const {
+ return FTRTEC::threads;
+ }
+
ACE_FACTORY_DEFINE (TAO_FTRTEC, Replication_Service)
ACE_STATIC_SVC_DEFINE (Replication_Service,
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.h
index ace8f3bbed0..cc204aca9a3 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.h
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Service.h
@@ -25,8 +25,9 @@
namespace FTRTEC
{
- class Replication_Service : public TAO_FTEC_Become_Primary_Listener,
- public ACE_Service_Object
+ class TAO_FTRTEC_Export Replication_Service
+ : public TAO_FTEC_Become_Primary_Listener
+ , public ACE_Service_Object
{
public:
static Replication_Service* instance();
@@ -39,18 +40,37 @@ namespace FTRTEC
virtual void become_primary();
+ /**
+ * Used for checking if the incoming replication message is out of sequence.
+ */
void check_validity(ACE_ENV_SINGLE_ARG_DECL);
typedef void (FtRtecEventChannelAdmin::EventChannelFacade::*RollbackOperation)
(const FtRtecEventChannelAdmin::ObjectId& ACE_ENV_ARG_DECL_WITH_DEFAULTS);
+ /**
+ * Replicate a request.
+ *
+ * @param state The request to be replicated.
+ * @param rollback The rollback operation when the replication failed.
+ */
void replicate_request(const FtRtecEventChannelAdmin::Operation& update,
RollbackOperation rollback
ACE_ENV_ARG_DECL);
+ /**
+ * Inform the backup replicas that a new replica has joined the object
+ * group.
+ */
+ void add_member(const FTRT::ManagerInfo & info,
+ CORBA::ULong object_group_ref_version
+ ACE_ENV_ARG_DECL);
+
int acquire_read (void);
int acquire_write (void);
int release (void);
+
+ int threads() const;
};
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Strategy.h b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Strategy.h
index e3bbb090024..eef6209efc9 100644
--- a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Strategy.h
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Replication_Strategy.h
@@ -19,6 +19,11 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+namespace FTEC {
+ struct ManagerInfo;
+};
+
class TAO_FTEC_Event_Channel_Impl;
class Replication_Strategy
{
@@ -26,17 +31,36 @@ public:
Replication_Strategy();
virtual ~Replication_Strategy();
+ /**
+ * Check if the incoming set_update() request is out of sequence. This is only
+ * used for basic replication strategy. It throws FTRT::OutOfSequence when the
+ * incoming request is not valid.
+ */
virtual void check_validity(ACE_ENV_SINGLE_ARG_DECL);
typedef void (FtRtecEventChannelAdmin::EventChannelFacade::*RollbackOperation)
(const FtRtecEventChannelAdmin::ObjectId& ACE_ENV_ARG_DECL_WITH_DEFAULTS);
-
+ /**
+ * Replicate a request.
+ *
+ * @param state The request to be replicated.
+ * @param rollback The rollback operation when the replication failed.
+ * @param oid The object id used for rollback operation.
+ */
virtual void replicate_request(const FTRT::State& state,
RollbackOperation rollback,
const FtRtecEventChannelAdmin::ObjectId& oid
ACE_ENV_ARG_DECL)=0;
+ /**
+ * Inform the backup replicas that a new replica has joined the object
+ * group.
+ */
+ virtual void add_member(const FTRT::ManagerInfo & info,
+ CORBA::ULong object_group_ref_version
+ ACE_ENV_ARG_DECL)=0;
+
virtual Replication_Strategy* make_primary_strategy();
virtual int acquire_read (void)=0;