summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Naming/FaultTolerant/FT_Naming_Replication_Manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Naming/FaultTolerant/FT_Naming_Replication_Manager.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Naming/FaultTolerant/FT_Naming_Replication_Manager.cpp556
1 files changed, 429 insertions, 127 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Naming/FaultTolerant/FT_Naming_Replication_Manager.cpp b/TAO/orbsvcs/orbsvcs/Naming/FaultTolerant/FT_Naming_Replication_Manager.cpp
index 5e18dcd5421..ce69843d5a6 100644
--- a/TAO/orbsvcs/orbsvcs/Naming/FaultTolerant/FT_Naming_Replication_Manager.cpp
+++ b/TAO/orbsvcs/orbsvcs/Naming/FaultTolerant/FT_Naming_Replication_Manager.cpp
@@ -5,172 +5,474 @@
* @file FT_Naming_Replication_Manager.cpp
*
* @author Kevin Stanley <stanleyk@ociweb.com>
+ * @author Phil Mesnier
*/
//=============================================================================
#include "orbsvcs/Log_Macros.h"
#include "orbsvcs/Naming/FaultTolerant/FT_Naming_Replication_Manager.h"
#include "orbsvcs/Naming/FaultTolerant/FT_Naming_Server.h"
-#include "tao/corba.h"
-#include "ace/SStringfwd.h"
+#include "tao/ORB_Core.h"
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_strings.h"
+#include "ace/OS_NS_ctype.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/Log_Msg.h"
-::FT_Naming::ReplicationManager_var
-TAO_FT_Naming_Replication_Manager::peer_replica_ (0);
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
-TAO_FT_Naming_Replication_Manager::TAO_FT_Naming_Replication_Manager (
- TAO_FT_Naming_Server *naming_svr,
- const char* repl_mgr_name)
- : naming_svr_ (naming_svr),
- repl_mgr_name_ (repl_mgr_name)
+FT_Update_Replicant_i::FT_Update_Replicant_i (TAO_FT_Naming_Replication_Manager &owner)
+ :owner_ (owner)
{
}
-
-TAO_FT_Naming_Replication_Manager::~TAO_FT_Naming_Replication_Manager(void)
+void
+FT_Update_Replicant_i::notify_update (CORBA::ULongLong seq_num,
+ FT_Naming::UpdateInfoSeq& info)
{
- this->reference_ = ::FT_Naming::ReplicationManager::_nil ();
+ CORBA::ULongLong expected = ++this->owner_.replica_seq_num_;
+ if (expected < seq_num)
+ {
+ if (TAO_debug_level > 0)
+ {
+ ORBSVCS_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) FT_Update_Replicant_i::notify_updated_entity ")
+ ACE_TEXT ("expected %Lu got %Lu\n"),
+ expected, seq_num ));
+ }
+ this->owner_.replica_seq_num_ = seq_num;
+ }
+ else if (expected > seq_num)
+ {
+ if (TAO_debug_level > 0)
+ {
+ ORBSVCS_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) FT_Update_Replicant_i::notify_updated_entity ")
+ ACE_TEXT ("expected %Lu got %Lu\n"),
+ expected, seq_num ));
+ }
+ --this->owner_.replica_seq_num_;
+ }
+ this->owner_.server_.update_info (info);
}
void
-TAO_FT_Naming_Replication_Manager::initialize (CORBA::ORB_ptr orb,
- PortableServer::POA_ptr repl_mgr_poa)
+FT_Update_Replicant_i::register_peer (FT_Naming::UpdatePushNotifier_ptr peer,
+ const FT_Naming::ReplicaInfo& iors,
+ CORBA::ULongLong seq_num)
{
- ACE_UNUSED_ARG (orb);
- repl_mgr_poa_ = PortableServer::POA::_duplicate (repl_mgr_poa);
- PortableServer::ObjectId_var id =
- PortableServer::string_to_ObjectId (this->repl_mgr_name_.c_str ());
- CORBA::Object_var obj = repl_mgr_poa_->id_to_reference (id.in ());
- this->reference_ = ::FT_Naming::ReplicationManager::_narrow (obj.in ());
+ this->owner_.peer_ = FT_Naming::UpdatePushNotifier::_duplicate (peer);
+ this->owner_.replica_seq_num_ = seq_num;
+ this->owner_.server_.update_iors (iors);
}
-::FT_Naming::ReplicaInfo*
-TAO_FT_Naming_Replication_Manager::register_replica (
- ::FT_Naming::ReplicationManager_ptr replica,
- const ::FT_Naming::ReplicaInfo & replica_info)
+//---------------------------------------------------------------------------
+
+TAO_FT_Naming_Replication_Manager::TAO_FT_Naming_Replication_Manager (TAO_FT_Naming_Server &owner)
+ : me_ (),
+ peer_ (),
+ seq_num_ (0),
+ replica_seq_num_ (0),
+ server_ (owner),
+ orb_ (),
+ reactor_ (0),
+ lock_ (),
+ notified_ (false),
+ to_send_ (10),
+ endpoint_ (owner.ft_endpoint ()),
+ update_delay_ (owner.ft_update_delay ()),
+ replica_ior_ ()
{
- ACE_TRACE ( ACE_TEXT("TAO_FT_Naming_Replication_Manager::register_replica"));
+}
- ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX,
- ace_mon,
- this->lock_,
- CORBA::INTERNAL ());
+TAO_FT_Naming_Replication_Manager::~TAO_FT_Naming_Replication_Manager(void)
+{
+}
- // Store a copy of the provided reference and other ReplicaInfo
- peer_replica_ = ::FT_Naming::ReplicationManager::_duplicate (replica);
+void
+TAO_FT_Naming_Replication_Manager::init_orb (void)
+{
+ int argc = 8;
+ ACE_TCHAR *argv[11];
+ argv[0] = ACE_OS::strdup (ACE_TEXT (""));
+ argv[1] = ACE_OS::strdup (ACE_TEXT ("-ORBIgnoreDefaultSvcConfFile"));
+ argv[2] = ACE_OS::strdup (ACE_TEXT ("-ORBGestalt"));
+ argv[3] = ACE_OS::strdup (ACE_TEXT ("Local"));
+ argv[4] = ACE_OS::strdup (ACE_TEXT ("-ORBSvcConfDirective"));
+ argv[5] = ACE_OS::strdup (ACE_TEXT ("static Client_Strategy_Factory \"-ORBConnectStrategy Blocked -ORBDefaultSyncScope server\""));
+ argv[6] = ACE_OS::strdup (ACE_TEXT ("-ORBDottedDecimalAddresses"));
+ argv[7] = ACE_OS::strdup (ACE_TEXT ("1"));
+ if (endpoint_.length ())
+ {
+ argv[8] = ACE_OS::strdup (ACE_TEXT ("-ORBListenEndpoints"));
+ argv[9] = ACE_OS::strdup (ACE_TEXT_CHAR_TO_TCHAR (endpoint_.c_str ()));
+ argc += 2;
+ }
+ else
+ {
+ argv[8] = 0;
+ argv[9] = 0;
+ }
+ argv[10] = 0;
+
+ this->orb_ = CORBA::ORB_init (argc, argv, "replicator_orb");
+ this->reactor_ = this->orb_->orb_core ()->reactor ();
+ CORBA::Object_var obj = this->orb_->resolve_initial_references ("RootPOA");
+ PortableServer::POA_var poa = PortableServer::POA::_narrow (obj.in ());
+ PortableServer::ServantBase_var servant;
+ ACE_NEW (servant, FT_Update_Replicant_i (*this));
+ PortableServer::ObjectId_var oid = poa->activate_object (servant.in ());
+ obj = poa->id_to_reference (oid.in ());
+ this->me_ =
+ FT_Naming::UpdatePushNotifier::_narrow (obj.in ());
+ PortableServer::POAManager_var mgr = poa->the_POAManager ();
+ mgr->activate ();
+}
+
+char *
+TAO_FT_Naming_Replication_Manager::ior (void)
+{
+ return this->orb_->object_to_string (this->me_.in ());
+}
- // Store the provided peer references
- this->naming_svr_->peer_root_context (replica_info.root_context);
- this->naming_svr_->peer_naming_manager (replica_info.naming_manager);
+bool
+TAO_FT_Naming_Replication_Manager::peer_available (void)
+{
+ return !CORBA::is_nil (this->peer_.in ());
+}
- // Return my references to the peer
- ::FT_Naming::ReplicaInfo* my_info = new ::FT_Naming::ReplicaInfo;
+void
+TAO_FT_Naming_Replication_Manager::stop (void)
+{
+ if (this->reactor_ == 0)
+ {
+ return;
+ }
+ this->orb_->shutdown (true);
+ this->wait();
+}
- my_info->root_context = this->naming_svr_->my_root_context ();
+int
+TAO_FT_Naming_Replication_Manager::svc (void)
+{
+ if (this->reactor_ == 0)
+ {
+ return 0;
+ }
+ try
+ {
+
+ this->orb_->run ();
+ }
+ catch (const CORBA::SystemException &ex)
+ {
+ if (TAO_debug_level > 0)
+ {
+ ORBSVCS_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) TAO_FT_Naming_Replication_Manager::svc caught %C\n"),
+ ex._name ()));
+ }
+ }
+ return 0;
+}
- my_info->naming_manager = this->naming_svr_->my_naming_manager ();
- return my_info;
+int
+TAO_FT_Naming_Replication_Manager::handle_exception (ACE_HANDLE )
+{
+ ACE_OS::sleep (this->update_delay_);
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->lock_, -1);
+ if (!this->peer_available())
+ {
+ mon.release ();
+ if (!this->peer_init_i ())
+ {
+ this->server_.no_replica();
+ }
+ mon.acquire();
+ this->notified_ = false;
+ return 0;
+ }
+ if (this->refresh_peer_)
+ {
+ this->refresh_peer_ = false;
+ mon.release ();
+ if (!this->update_peer_registration())
+ {
+ mon.acquire ();
+ this->notified_ = false;
+ return 0;
+ }
+ mon.acquire ();
+ }
+
+ if (this->to_send_.length () == 0)
+ {
+ this->notified_ = false;
+ return 0;
+ }
+ if (CORBA::is_nil (this->peer_))
+ {
+ this->to_send_.length (0);
+ this->notified_ = false;
+ return 0;
+ }
+ try
+ {
+ CORBA::Long len = this->to_send_.length ();
+ FT_Naming::UpdateInfoSeq payload (len);
+ payload.length (len);
+ CORBA::Long p = 0;
+ for (CORBA::Long l = 0; l < len; l++)
+ {
+ payload[p++] = this->to_send_[l];
+ }
+ payload.length (p);
+ this->to_send_.length (0);
+ CORBA::ULongLong seq = ++this->seq_num_;
+ mon.release ();
+
+ this->peer_->notify_update (seq, payload);
+ }
+ catch (const CORBA::Exception &)
+ {
+ this->peer_ = FT_Naming::UpdatePushNotifier::_nil ();
+ }
+ this->notified_ = false;
+ return 0;
}
void
-TAO_FT_Naming_Replication_Manager::notify_updated_object_group (
- const ::FT_Naming::ObjectGroupUpdate & group_info)
+TAO_FT_Naming_Replication_Manager::send_objgrp_update (PortableGroup::ObjectGroupId id,
+ FT_Naming::ChangeType change)
{
- ACE_TRACE ( ACE_TEXT ("TAO_FT_Naming_Replication_Manager::")
- ACE_TEXT ("notify_updated_object_group"));
-
- // Make sure that we have a valid naming server
- ACE_ASSERT (naming_svr_);
- int result = this->naming_svr_->update_object_group (group_info);
+ if (this->reactor_ == 0)
+ {
+ return;
+ }
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ CORBA::Long len = this->to_send_.length ();
+ bool found = false;
+ for (CORBA::Long i = len -1; !found && i >= 0; i--)
+ {
+ if (this->to_send_[i]._d() == FT_Naming::obj_grp &&
+ this->to_send_[i].grp().id == id)
+ {
+ found = true;
+ FT_Naming::ChangeType pending = this->to_send_[i].grp().change_type;
+ if (pending == FT_Naming::DELETED)
+ {
+ if (change == FT_Naming::UPDATED)
+ {
+ // shouldn't have an update after a deleted
+ change = FT_Naming::DELETED;
+ }
+ else if (change == FT_Naming::NEW)
+ {// put both in cueue
+ found = false;
+ break;
+ }
+ }
+ this->to_send_[i].grp().change_type = change;
+ }
+ }
+ if (!found)
+ {
+ this->to_send_.length (len+1);
+ FT_Naming::ObjectGroupUpdate ogu;
+ ogu.id = id;
+ ogu.change_type = change;
+ this->to_send_[len].grp(ogu);
+ }
+ if (!this->notified_)
+ {
+ this->notified_ = true;
+ this->reactor_->notify (this);
+ }
+}
- if (result != 0)
- ORBSVCS_ERROR ((LM_ERROR,
- ACE_TEXT ("Unable to update object group.\n")));
+void
+TAO_FT_Naming_Replication_Manager::send_context_update (const ACE_CString & name,
+ FT_Naming::ChangeType change)
+{
+ if (this->reactor_ == 0)
+ {
+ return;
+ }
+
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ CORBA::Long len = this->to_send_.length ();
+ bool found = false;
+
+ for (CORBA::Long i = len -1; !found && i >= 0; i--)
+ {
+ if (this->to_send_[i]._d() == FT_Naming::ctx_name &&
+ ACE_OS::strcmp (this->to_send_[i].ctx().context_name,
+ name.c_str()) == 0)
+ {
+ found = true;
+ FT_Naming::ChangeType pending = this->to_send_[i].ctx().change_type;
+ if (pending == FT_Naming::DELETED)
+ {
+ if (change == FT_Naming::UPDATED)
+ {
+ // shouldn't have an update after a deleted
+ change = FT_Naming::DELETED;
+ }
+ else if (change == FT_Naming::NEW)
+ {// put both in cueue
+ found = false;
+ break;
+ }
+ }
+ this->to_send_[i].ctx().change_type = change;
+ }
+ }
+
+ if (!found)
+ {
+ this->to_send_.length (len+1);
+ FT_Naming::NamingContextUpdate ncu;
+ ncu.context_name = CORBA::string_dup (name.c_str());
+ ncu.change_type = change;
+ this->to_send_[len].ctx(ncu);
+ }
+ if (!this->notified_)
+ {
+ this->notified_ = true;
+ this->reactor_->notify (this);
+ }
}
void
-TAO_FT_Naming_Replication_Manager::notify_updated_context (
- const ::FT_Naming::NamingContextUpdate & context_info)
+TAO_FT_Naming_Replication_Manager::send_registration (bool use_combo_refs)
{
- ACE_TRACE (ACE_TEXT ("TAO_FT_Naming_Replication_Manager::")
- ACE_TEXT ("notify_updated_context"));
- ACE_ASSERT (naming_svr_);
- int result = this->naming_svr_->update_naming_context (context_info);
- if (result != 0)
- {
- ORBSVCS_ERROR ((LM_ERROR,
- ACE_TEXT ("Error while updating naming context.\n")));
- }
+ if (TAO_debug_level > 1)
+ {
+ ORBSVCS_DEBUG((LM_INFO,
+ ACE_TEXT("(%P|%t) Registering with peer naming server \n")));
+ }
+ if (this->reactor_ == 0)
+ {
+ return;
+ }
+
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->send_combos_ = use_combo_refs;
+ if (!this->notified_)
+ {
+ this->refresh_peer_ = true;
+ this->notified_ = true;
+ this->reactor_->notify (this);
+ }
}
-::FT_Naming::ReplicationManager_ptr
-TAO_FT_Naming_Replication_Manager::peer_replica (void)
+bool
+TAO_FT_Naming_Replication_Manager::peer_init_i (void)
{
- ACE_TRACE (ACE_TEXT ("TAO_FT_Naming_Replication_Manager::peer_replica"));
- // Return a copy of the stored peer to the requester
- return ::FT_Naming::ReplicationManager::_duplicate (peer_replica_.in ());
+ CORBA::Object_var obj = this->orb_->string_to_object (replica_ior_.c_str());
+ bool non_exist = true;
+ bool was_nil = CORBA::is_nil (this->peer_.in());
+ if (!CORBA::is_nil (obj.in ()))
+ {
+ try
+ {
+ this->peer_ = FT_Naming::UpdatePushNotifier::_narrow (obj.in());
+ non_exist = (this->peer_->_non_existent() == 1);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ if (TAO_debug_level > 0)
+ {
+ ORBSVCS_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) FT_Repl_mgr::peer_init_i ")
+ ACE_TEXT ("caught %C\n"), ex._name()));
+ }
+ }
+ }
+ if (non_exist)
+ {
+ this->peer_ = FT_Naming::UpdatePushNotifier::_nil();
+ return was_nil;
+ }
+
+ if (this->peer_available ())
+ {
+ if (!this->update_peer_registration ())
+ {
+ this->peer_ = FT_Naming::UpdatePushNotifier::_nil ();
+ return was_nil;
+ }
+ }
+ else
+ {
+ return was_nil;
+ }
+
+ if (this->to_send_.length () > 0)
+ {
+ this->notified_ = true;
+ this->reactor_->notify (this);
+ }
+ return true;
}
-int
-TAO_FT_Naming_Replication_Manager::register_with_peer_replica (
- ::FT_Naming::ReplicationManager_ptr replica,
- CosNaming::NamingContext_ptr nc,
- ::FT_Naming::NamingManager_ptr nm)
-{
- ACE_TRACE (ACE_TEXT ("TAO_FT_Naming_Replication_Manager::")
- ACE_TEXT ("register_with_peer_replica"));
-
- int result = 0;
- ::FT_Naming::ReplicaInfo my_info;
- { // Guard the access to the Replication Manager state
- ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX,
- ace_mon,
- this->lock_,
- CORBA::INTERNAL ());
-
- // Store a copy of the peer reference for future access
- this->peer_replica_ =
- ::FT_Naming::ReplicationManager::_duplicate (replica);
-
- my_info.root_context = CosNaming::NamingContext::_duplicate (nc);
- my_info.naming_manager = ::FT_Naming::NamingManager::_duplicate (nm);
- }
-
- try {
- ::FT_Naming::ReplicationManager_var my_ref =
- this->reference ();
-
- // Register with the peer replica
- ::FT_Naming::ReplicaInfo_var peer_info =
- this->peer_replica_->register_replica (my_ref.in (),
- my_info);
-
- ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX,
- ace_mon,
- this->lock_,
- CORBA::INTERNAL ());
-
- // Store the returned references locally
- this->naming_svr_->peer_root_context (peer_info->root_context. in ());
- this->naming_svr_->peer_naming_manager (peer_info->naming_manager.in ());
- }
- catch (const CORBA::Exception& ex) {
- // Unable to contact the peer replica.
- if (TAO_debug_level > 1)
- ex._tao_print_exception (
- ACE_TEXT ("TAO_FT_Naming_Replication_Manager::")
- ACE_TEXT ("register_with_peer_replica\n"));
- result = -1;
- }
-
- return result;
-
-}
-
-::FT_Naming::ReplicationManager_ptr
-TAO_FT_Naming_Replication_Manager::reference (void)
-{
- ACE_TRACE (ACE_TEXT ("TAO_FT_Naming_Replication_Manager::reference"));
- return ::FT_Naming::ReplicationManager::_duplicate (reference_.in ());
+bool
+TAO_FT_Naming_Replication_Manager::update_peer_registration (void)
+{
+ FT_Naming::ReplicaInfo info;
+ info.combined = this->send_combos_;
+ if (this->send_combos_)
+ {
+ info.root_context = this->server_.ft_root_context ();
+ info.naming_manager = this->server_.ft_naming_manager ();
+ this->send_combos_ = false;
+ }
+ else
+ {
+ info.root_context = this->server_.my_root_context ();
+ info.naming_manager = this->server_.my_naming_manager ();
+ }
+ bool success = false;
+ try
+ {
+ this->peer_->register_peer (this->me_, info, this->seq_num_);
+ success = true;
+ }
+ catch (const CORBA::Exception &)
+ {
+ }
+ return success;
+}
+
+
+bool
+TAO_FT_Naming_Replication_Manager::init_peer (void)
+{
+ ACE_TString replica_ior_file = this->server_.replica_ior_filename (true);
+
+ if (TAO_debug_level > 1)
+ {
+ ORBSVCS_DEBUG ((LM_INFO,
+ ACE_TEXT("Resolving Naming replica from file %C\n"),
+ replica_ior_file.c_str()));
+ }
+
+ if (ACE_OS::access (replica_ior_file.c_str (), F_OK) != 0)
+ {
+ this->peer_ =
+ FT_Naming::UpdatePushNotifier::_nil();
+ return false;
+ }
+
+ this->replica_ior_ = ACE_CString ("file://") +
+#if defined (ACE_USES_WCHAR)
+ ACE_CString (ACE_TEXT_ALWAYS_CHAR (replica_ior_file.c_str()));
+#else
+ replica_ior_file;
+#endif /* ACE_USES_WCHAR */
+ return true;
}
+
+TAO_END_VERSIONED_NAMESPACE_DECL