summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp
diff options
context:
space:
mode:
authorstanleyk <stanleyk@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2013-02-05 21:11:03 +0000
committerstanleyk <stanleyk@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2013-02-05 21:11:03 +0000
commit5e030faf84086ab02059fcbcc3faed224bd57b95 (patch)
tree3a62df45ac6ccf599fb07cf6a03d672456ce2e3d /TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp
parent9d296f7fa51116ff7040ecb2ad18612cd94b5fd1 (diff)
downloadATCD-5e030faf84086ab02059fcbcc3faed224bd57b95.tar.gz
Merge in OCI_Reliability_Enhancements branch.
Diffstat (limited to 'TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp')
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp1366
1 files changed, 1366 insertions, 0 deletions
diff --git a/TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp b/TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp
new file mode 100644
index 00000000000..b4cbb0dafff
--- /dev/null
+++ b/TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp
@@ -0,0 +1,1366 @@
+// $Id$
+
+#include "Shared_Backing_Store.h"
+#include "Server_Info.h"
+#include "Activator_Info.h"
+#include "utils.h"
+#include "Locator_XMLHandler.h"
+#include "ImR_LocatorC.h"
+#include "ace/File_Lock.h"
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_strings.h"
+#include "ace/OS_NS_ctype.h"
+#include "ace/OS_NS_unistd.h"
+#include "ACEXML/parser/parser/Parser.h"
+#include "ACEXML/common/FileCharStream.h"
+#include "ACEXML/common/XML_Util.h"
+#include "tao/IORManipulation/IORManip_Loader.h"
+
+namespace {
+ class Lockable_File
+ {
+ public:
+ Lockable_File()
+ : file_(0),
+ flags_(0),
+ locked_(false),
+ unlink_in_destructor_(false)
+ {
+ }
+
+ Lockable_File(const ACE_TString& file,
+ const int flags,
+ bool unlink_in_destructor = false)
+ : file_(0),
+ flags_(0),
+ locked_(false),
+ unlink_in_destructor_(false)
+ {
+ init_fl(file, flags, unlink_in_destructor);
+ }
+
+ ~Lockable_File()
+ {
+ release();
+ }
+
+ void release()
+ {
+ if (this->file_ == 0)
+ return;
+
+ close_file();
+ this->file_lock_.reset();
+ this->locked_ = false;
+ }
+
+ FILE* get_file()
+ {
+ lock();
+
+ return this->file_;
+ }
+
+ FILE* get_file(const ACE_TString& file,
+ const int flags,
+ bool unlink_in_destructor = false)
+ {
+ init_fl(file, flags, unlink_in_destructor);
+ return get_file();
+ }
+
+ private:
+ void init_fl(const ACE_TString& file,
+ const int flags,
+ bool unlink_in_destructor = false)
+ {
+ release();
+
+ flags_ = flags | O_CREAT;
+ unlink_in_destructor_ = unlink_in_destructor;
+
+ const ACE_TCHAR* const flags_str =
+ ((flags_ & O_RDWR) != 0) ? ACE_TEXT("r+") :
+ (((flags_ & O_WRONLY) != 0) ? ACE_TEXT("w") : ACE_TEXT("r"));
+#ifdef ACE_WIN32
+ this->filename_ = file;
+ this->file_ = ACE_OS::fopen(file.c_str(), flags_str);
+#else
+ this->file_lock_.reset(
+ new ACE_File_Lock(ACE_TEXT_CHAR_TO_TCHAR(file.c_str ()),
+ flags_,
+ 0666,
+ unlink_in_destructor));
+
+ // Truncating output so this will not allow reading then writing
+
+ ACE_OS::ftruncate(this->file_lock_->get_handle(), 0);
+ this->file_ = ACE_OS::fdopen(this->file_lock_->get_handle(), flags_str);
+#endif
+ }
+
+ void close_file()
+ {
+ if (this->file_ == 0)
+ return;
+
+ ACE_OS::fflush(this->file_);
+ ACE_OS::fclose(this->file_);
+ this->file_ = 0;
+#ifdef ACE_WIN32
+ if (this->unlink_in_destructor_)
+ {
+ ACE_OS::unlink(this->filename_.c_str());
+ this->unlink_in_destructor_ = false;
+ }
+#endif
+ }
+
+ void lock()
+ {
+#ifndef ACE_WIN32
+ if (this->locked_)
+ return;
+
+ if ((this->flags_ & O_RDWR) != 0)
+ file_lock_->acquire();
+ if ((this->flags_ & O_WRONLY) != 0)
+ file_lock_->acquire_write();
+ else
+ file_lock_->acquire_read();
+
+ this->locked_ = true;
+#endif
+ }
+
+ auto_ptr<ACE_File_Lock> file_lock_;
+ FILE* file_;
+ int flags_;
+ bool locked_;
+ bool unlink_in_destructor_;
+ ACE_TString filename_;
+ };
+}
+
+Shared_Backing_Store::Shared_Backing_Store(const Options& opts,
+ CORBA::ORB_ptr orb)
+: XML_Backing_Store(opts, orb, true),
+ listing_file_(opts.persist_file_name() + ACE_TEXT("imr_listing.xml")),
+ seq_num_(0),
+ replica_seq_num_(0),
+ imr_type_(opts.imr_type()),
+ sync_needed_(NO_SYNC),
+ repo_id_(1),
+ repo_values_(2)
+{
+ IMR_REPLICA[Options::PRIMARY_IMR] = ACE_TEXT ("ImR_ReplicaPrimary");
+ IMR_REPLICA[Options::BACKUP_IMR] = ACE_TEXT ("ImR_ReplicaBackup");
+ IMR_REPLICA[Options::STANDALONE_IMR] = ACE_TEXT ("ImR_NoReplica");
+
+ this->repo_values_[REPO_TYPE] =
+ std::make_pair(ACE_CString(ACE_TEXT ("repo_type")),
+ ACE_CString());
+ this->repo_values_[REPO_ID] =
+ std::make_pair(ACE_CString(ACE_TEXT ("repo_id")),
+ ACE_CString());
+}
+
+Shared_Backing_Store::~Shared_Backing_Store()
+{
+}
+
+
+static void replicate(
+ Shared_Backing_Store::Replica_ptr replica,
+ const ImplementationRepository::ServerUpdate& update)
+{
+ // replicate the ServerUpdate to our replicated locator
+ replica->notify_updated_server(update);
+}
+
+static void replicate(
+ Shared_Backing_Store::Replica_ptr replica,
+ const ImplementationRepository::ActivatorUpdate& update)
+{
+ // replicate the ActivatorUpdate to our replicated locator
+ replica->notify_updated_activator(update);
+}
+
+static void set_key(ImplementationRepository::ServerUpdate& update,
+ const Locator_Repository::SIMap::KEY& key)
+{
+ update.name = key.c_str();
+}
+
+static void set_key(ImplementationRepository::ActivatorUpdate& update,
+ const Locator_Repository::AIMap::KEY& key)
+{
+ update.name = key.c_str();
+}
+
+template< typename Update, typename Map>
+static void replicate(
+ Shared_Backing_Store::Replica_ptr replica,
+ const typename Map::ENTRY& entry,
+ const ImplementationRepository::UpdateType type,
+ const ImplementationRepository::SequenceNum seq_num)
+{
+ if (CORBA::is_nil (replica))
+ {
+ return;
+ }
+
+ try
+ {
+ Update update;
+ set_key(update, entry.ext_id_);
+ update.type = type;
+ update.seq_num = seq_num;
+ update.repo_id = entry.int_id_.repo_id;
+ update.repo_type = entry.int_id_.repo_type;
+ replicate(replica, update);
+ }
+ catch (const CORBA::COMM_FAILURE&)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Replicated ImR: COMM_FAILURE Exception\n")));
+ }
+ catch (const CORBA::TRANSIENT&)
+ {
+ ACE_DEBUG ((
+ LM_DEBUG,
+ ACE_TEXT("(%P|%t) Replicated ImR: TRANSIENT Exception\n")));
+ }
+ catch (const CORBA::OBJECT_NOT_EXIST&)
+ {
+ ACE_DEBUG ((
+ LM_DEBUG,
+ ACE_TEXT("(%P|%t) Replicated ImR: OBJECT_NOT_EXIST ")
+ ACE_TEXT("Exception, dropping replica\n")));
+ replica = TAO::Objref_Traits
+ <ImplementationRepository::UpdatePushNotification>::nil ();
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception (ACE_TEXT("Replicated ImR: notify update"));
+ replica = 0;
+ }
+}
+
+template<typename Map>
+static typename Map::ENTRY& unique_id(
+ const typename Map::KEY& key,
+ Map& unique_ids,
+ const Shared_Backing_Store::UniqueId& id)
+{
+ typename Map::ENTRY* entry = 0;
+ unique_ids.bind(key, id, entry);
+
+ return *entry;
+}
+
+static Shared_Backing_Store::UniqueId create_uid(
+ const Options::ImrType repo_type,
+ const unsigned int repo_id)
+{
+ Shared_Backing_Store::UniqueId id;
+ id.repo_id = repo_id;
+ id.repo_type = repo_type;
+ ACE_TCHAR id_str[50];
+ ACE_OS::itoa((unsigned int)repo_type, id_str, 10);
+
+ id.repo_type_str = id_str;
+
+ size_t current = ACE_OS::strlen(id_str);
+ id_str[current++] = ACE_TEXT('_');
+ ACE_OS::itoa(repo_id, &(id_str[current]), 10);
+
+ id.repo_id_str = &(id_str[current]);
+
+ current = ACE_OS::strlen(id_str);
+ id_str[current++] = ACE_TEXT('.');
+ id_str[current++] = ACE_TEXT('x');
+ id_str[current++] = ACE_TEXT('m');
+ id_str[current++] = ACE_TEXT('l');
+ id_str[current++] = ACE_TEXT('\0');
+ id.unique_filename = id_str;
+
+ return id;
+}
+
+template<typename Map>
+static const typename Map::ENTRY& unique_id(
+ const typename Map::KEY& key,
+ Map& unique_ids,
+ const Options::ImrType type,
+ unsigned int& next_repo_id)
+{
+ typename Map::ENTRY* entry;
+ if (unique_ids.find(key, entry) == 0)
+ return *entry;
+ const unsigned int repo_id = next_repo_id++;
+ Shared_Backing_Store::UniqueId id = create_uid(type, repo_id);
+ return unique_id(key, unique_ids, id);
+}
+
+static ACE_CString identify_key(const ACE_CString& name)
+{
+ ACE_CString id("name=");
+ id += name;
+ return id;
+}
+
+/* TODO: bdj - will need this when server container changed to name and id
+static ACE_CString identify_key(const Shared_Backing_Store::ServerKey& name)
+{
+ ACE_CString id("name=");
+ id += info.name;
+ return id;
+}
+*/
+template<typename Map>
+static const typename Map::ENTRY& unique_id(
+ const typename Map::KEY& key,
+ Map& unique_ids,
+ const Options::ImrType this_repo_type,
+ unsigned int& this_repo_id,
+ Options::ImrType& entry_repo_type,
+ unsigned int& entry_repo_id)
+{
+ typename Map::ENTRY* temp_entry;
+ const bool found = (unique_ids.find(key, temp_entry) == 0);
+
+ Shared_Backing_Store::UniqueId temp_id =
+ create_uid(entry_repo_type, entry_repo_id);
+ typename Map::ENTRY& entry = unique_id(key, unique_ids, temp_id);
+
+ if (entry_repo_id == 0)
+ {
+ // if no repo id provided, treat it like it came from this repo
+ entry_repo_id = this_repo_id++;
+ entry_repo_type = this_repo_type;
+ }
+ else if (found)
+ {
+ Shared_Backing_Store::UniqueId& id = entry.int_id_;
+ if (entry_repo_id != id.repo_id &&
+ entry_repo_type != id.repo_type)
+ {
+ // if already existed, replace the contents
+ ACE_ERROR((
+ LM_ERROR,
+ ACE_TEXT("(%P|%t) ERROR: replacing %C with existing repo_id=%d ")
+ ACE_TEXT("and imr_type=%d, with repo_id=%d and imr_type=%d\n"),
+ identify_key(key).c_str(), id.repo_id, id.repo_type,
+ entry_repo_id, entry_repo_type));
+ id = temp_id;
+ }
+ }
+
+ if (entry_repo_type == this_repo_type && entry_repo_id >= this_repo_id)
+ {
+ // persisting existing entries for this repo, so move the repo_id past
+ // the entries id
+ this_repo_id = entry_repo_id + 1;
+ }
+
+ return entry;
+}
+
+template< typename Update, typename Map>
+static int remove(
+ const typename Map::KEY& key,
+ const Map& unique_ids,
+ const ACE_TString& path,
+ Lockable_File& listing_lf,
+ Shared_Backing_Store::Replica_ptr peer_replica,
+ ImplementationRepository::SequenceNum& seq_num)
+{
+ typename Map::ENTRY* entry;
+ const int err = unique_ids.find(key, entry);
+ if (err != 0)
+ {
+ ACE_ERROR((
+ LM_ERROR,
+ ACE_TEXT("(%P|%t) Couldn't find unique repo id for %C\n"),
+ identify_key(key).c_str()));
+ return err;
+ }
+
+ const ACE_TString fname = path + entry->int_id_.unique_filename;
+
+ {
+ // take the lock, then remove the file
+ Lockable_File file(fname, O_WRONLY, true);
+ }
+ listing_lf.release();
+
+ replicate<Update, Map>
+ (peer_replica, *entry, ImplementationRepository::repo_remove, ++seq_num);
+
+ return 0;
+}
+
+int
+Shared_Backing_Store::persistent_remove (const ACE_CString& name,
+ bool activator)
+{
+ Lockable_File listing_lf;
+ int err = persist_listings(listing_lf);
+ if (err != 0)
+ {
+ return err;
+ }
+
+ if (activator)
+ {
+ err = remove<ImplementationRepository::ActivatorUpdate>
+ (name, this->activator_uids_, this->filename_, listing_lf,
+ this->peer_replica_, this->seq_num_);
+ }
+ else
+ {
+ err = remove<ImplementationRepository::ServerUpdate>
+ (name, this->server_uids_, this->filename_, listing_lf,
+ this->peer_replica_, this->seq_num_);
+ }
+ return err;
+}
+
+int
+Shared_Backing_Store::persistent_update(const Server_Info_Ptr& info, bool add)
+{
+ Lockable_File listing_lf;
+ if (add)
+ {
+ const int err = persist_listings(listing_lf);
+ if (err != 0)
+ {
+ return err;
+ }
+ }
+
+ ACE_CString name = ACEXML_escape_string (info->name);
+
+ const ServerUIMap::ENTRY& entry =
+ unique_id(info->name, this->server_uids_, this->imr_type_, this->repo_id_);
+ const ACE_TString fname = this->filename_ + entry.int_id_.unique_filename;
+ if (this->opts_.debug() > 9)
+ {
+ ACE_DEBUG((LM_INFO, ACE_TEXT ("Persisting to %s(%C)\n"),
+ fname.c_str(), info->name.c_str()));
+ }
+ Lockable_File server_file(fname, O_WRONLY);
+ const ACE_TString bfname = fname.c_str() + ACE_TString(".bak");
+ FILE* fp = server_file.get_file();
+ if (fp == 0)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Couldn't write to file %s\n"),
+ fname.c_str()));
+ return -1;
+ }
+ // successfully added file (if adding), so release the listing file lock
+ listing_lf.release();
+ ACE_OS::fprintf (fp,"<?xml version=\"1.0\"?>\n");
+
+ this->repo_values_[REPO_TYPE].second = entry.int_id_.repo_type_str;
+ this->repo_values_[REPO_ID].second = entry.int_id_.repo_id_str;
+
+ persist(fp, *info, "", this->repo_values_);
+
+ // Copy the current file to a backup.
+ FILE* bfp = ACE_OS::fopen(bfname.c_str(),ACE_TEXT("w"));
+ ACE_OS::fprintf (bfp,"<?xml version=\"1.0\"?>\n");
+ persist(bfp, *info, "", this->repo_values_);
+ ACE_OS::fflush(bfp);
+ ACE_OS::fclose(bfp);
+ server_file.release();
+
+ const ImplementationRepository::UpdateType type = add ?
+ ImplementationRepository::repo_add :
+ ImplementationRepository::repo_update;
+ replicate<ImplementationRepository::ServerUpdate, ServerUIMap>
+ (peer_replica_, entry, type, ++seq_num_);
+ return 0;
+}
+
+
+int
+Shared_Backing_Store::persistent_update(const Activator_Info_Ptr& info,
+ bool add)
+{
+ Lockable_File listing_lf;
+ if (add)
+ {
+ const int err = persist_listings(listing_lf);
+ if (err != 0)
+ {
+ return err;
+ }
+ }
+
+ ACE_CString name = lcase (info->name);
+
+ const ActivatorUIMap::ENTRY& entry =
+ unique_id(name, this->activator_uids_, this->imr_type_, this->repo_id_);
+ const ACE_TString fname = this->filename_ + entry.int_id_.unique_filename;
+ if (this->opts_.debug() > 9)
+ {
+ ACE_DEBUG((LM_INFO, ACE_TEXT ("Persisting to %s(%C)\n"),
+ fname.c_str(), info->name.c_str()));
+ }
+ Lockable_File activator_file(fname, O_WRONLY);
+ const ACE_TString bfname = fname.c_str() + ACE_TString(".bak");
+ FILE* fp = activator_file.get_file();
+ if (fp == 0)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Couldn't write to file %s\n"),
+ fname.c_str()));
+ return -1;
+ }
+ // successfully added file (if adding), so release the listing file lock
+ listing_lf.release();
+ ACE_OS::fprintf (fp,"<?xml version=\"1.0\"?>\n");
+
+ this->repo_values_[REPO_TYPE].second = entry.int_id_.repo_type_str;
+ this->repo_values_[REPO_ID].second = entry.int_id_.repo_id_str;
+
+ persist(fp, *info, "", this->repo_values_);
+
+ // Copy the current file to a backup.
+ FILE* bfp = ACE_OS::fopen(bfname.c_str(),ACE_TEXT("w+"));
+ ACE_OS::fprintf (bfp,"<?xml version=\"1.0\"?>\n");
+ persist(bfp, *info, "", this->repo_values_);
+ ACE_OS::fflush(bfp);
+ ACE_OS::fclose(bfp);
+ activator_file.release();
+
+ const ImplementationRepository::UpdateType type = add ?
+ ImplementationRepository::repo_add :
+ ImplementationRepository::repo_update;
+ replicate<ImplementationRepository::ActivatorUpdate, ActivatorUIMap>
+ (peer_replica_, entry, type, ++seq_num_);
+ return 0;
+}
+
+const ACE_TCHAR*
+Shared_Backing_Store::repo_mode() const
+{
+ return this->listing_file_.c_str();
+}
+
+int
+Shared_Backing_Store::connect_replicas (Replica_ptr this_replica)
+{
+ const ACE_TString& replica_ior = replica_ior_filename(true);
+ if (this->opts_.debug() > 1)
+ {
+ ACE_DEBUG((LM_INFO,
+ ACE_TEXT("Resolving ImR replica %s\n"), replica_ior.c_str()));
+ }
+
+ CORBA::Object_var obj =
+ this->orb_->string_to_object (replica_ior.c_str());
+
+ if (!CORBA::is_nil (obj.in ()))
+ {
+ bool non_exist = true;
+ try
+ {
+ this->peer_replica_ = ImplementationRepository::
+ UpdatePushNotification::_narrow (obj.in());
+ non_exist = (this->peer_replica_->_non_existent() == 1);
+ }
+ catch (const CORBA::Exception& )
+ {
+ // let error be handled below
+ }
+
+ if (non_exist)
+ {
+ this->peer_replica_ =
+ ImplementationRepository::UpdatePushNotification::_nil();
+ }
+ }
+
+ if (CORBA::is_nil (this->peer_replica_.in()))
+ {
+ if (this->imr_type_ == Options::BACKUP_IMR)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT("Error: No primary ImR replica file found <%s>\n"),
+ replica_ior.c_str()), -1);
+ }
+
+ // no connection currently, just wait for backup
+ return 0;
+ }
+
+ if (opts_.debug() > 1)
+ {
+ ACE_DEBUG((LM_INFO,
+ ACE_TEXT("Registering with previously running ImR replica\n")));
+ }
+
+ try
+ {
+ this->peer_replica_->register_replica(this_replica,
+ this->imr_ior_.inout(),
+ this->replica_seq_num_);
+ }
+ catch (const ImplementationRepository::InvalidPeer& ip)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT("Error: obj key <%s> is an invalid ImR replica because %s\n"),
+ replica_ior.c_str(), ip.reason.in()), -1);
+ }
+
+ if (opts_.debug() > 9)
+ {
+ ACE_DEBUG((LM_INFO,
+ ACE_TEXT("Initializing repository with ft ior=<%C> ")
+ ACE_TEXT("and replica seq number %d\n"),
+ this->imr_ior_.in(), replica_seq_num_));
+ }
+
+ return 0;
+}
+
+int
+Shared_Backing_Store::init_repo(PortableServer::POA_ptr imr_poa)
+{
+ this->non_ft_imr_ior_ = this->imr_ior_;
+ PortableServer::ObjectId_var id =
+ PortableServer::string_to_ObjectId ("ImR_Replica");
+ imr_poa->activate_object_with_id (id.in (), this);
+
+ if (this->imr_type_ != Options::STANDALONE_IMR)
+ {
+ CORBA::Object_var obj = imr_poa->id_to_reference (id.in ());
+
+ Replica_var this_replica =
+ ImplementationRepository::UpdatePushNotification::_narrow (obj.in());
+ const int err = connect_replicas(this_replica.in());
+ if (err != 0)
+ {
+ return err;
+ }
+ }
+
+ // only start the repo clean if no replica is running
+ if (this->opts_.repository_erase() &&
+ CORBA::is_nil (this->peer_replica_.in ()))
+ {
+ Lockable_File listing_lf;
+ const XMLHandler_Ptr listings = get_listings(listing_lf, false);
+ if (listings.null())
+ {
+ if (this->opts_.debug() > 9)
+ {
+ ACE_DEBUG((LM_INFO,
+ ACE_TEXT ("Persisted Repository already empty\n")));
+ }
+ }
+ else
+ {
+ const ACE_Vector<ACE_TString>& filenames = listings->filenames();
+ CORBA::ULong sz = filenames.size ();
+ for (CORBA::ULong i = 0; i < sz; ++i)
+ {
+ if (this->opts_.debug() > 9)
+ {
+ ACE_DEBUG((LM_INFO, ACE_TEXT ("Removing %s\n"),
+ filenames[i].c_str()));
+ }
+ ACE_OS::unlink ( filenames[i].c_str () );
+ }
+
+ if (this->opts_.debug() > 9)
+ {
+ ACE_DEBUG((LM_INFO, ACE_TEXT ("Removing %s\n"),
+ this->listing_file_.c_str()));
+ }
+ ACE_OS::unlink ( this->listing_file_.c_str () );
+ }
+ }
+
+ // ignore persistent_load return since files don't have to exist
+ persistent_load(false);
+
+ if (this->opts_.debug() > 9)
+ {
+ ACE_DEBUG((LM_INFO,
+ ACE_TEXT ("ImR Repository initialized\n")));
+ }
+
+ return 0;
+}
+
+int
+Shared_Backing_Store::persistent_load (bool only_changes)
+{
+ Lockable_File listing_lf;
+ const XMLHandler_Ptr listings = get_listings(listing_lf, only_changes);
+ if (listings.null())
+ {
+ // failed to retrieve listings
+ return -1;
+ }
+
+ if (only_changes)
+ {
+ listings->remove_unmatched(*this);
+ }
+
+ const ACE_Vector<ACE_TString>& filenames = listings->filenames();
+ CORBA::ULong sz = filenames.size ();
+ if (this->opts_.debug() > 9)
+ {
+ ACE_DEBUG((LM_INFO, ACE_TEXT ("persistent_load %d files\n"), sz));
+ }
+ for (CORBA::ULong i = 0; i < sz; ++i)
+ {
+ const ACE_TString& fname = filenames[i];
+ Lockable_File file(fname, O_RDONLY);
+
+ if(load(fname, file.get_file()) != 0)
+ {
+ load(fname + ".bak");
+ }
+ }
+
+ return 0;
+}
+
+Shared_Backing_Store::XMLHandler_Ptr
+Shared_Backing_Store::get_listings(Lockable_File& listing_lf,
+ bool only_changes) const
+{
+ XMLHandler_Ptr listings_handler;
+ if (only_changes)
+ {
+ listings_handler.reset(new LocatorListings_XMLHandler(
+ this->filename_, servers(), activators()));
+ }
+ else
+ {
+ listings_handler.reset(new LocatorListings_XMLHandler(this->filename_));
+ }
+
+ if (load(this->listing_file_,
+ *listings_handler,
+ this->opts_.debug(),
+ listing_lf.get_file(this->listing_file_, O_RDONLY)) != 0)
+ {
+
+ if (load(this->listing_file_ + ".bak",
+ *listings_handler,
+ this->opts_.debug()) != 0)
+ {
+ listings_handler.reset();
+ }
+ }
+
+ return listings_handler;
+}
+
+int
+Shared_Backing_Store::sync_load ()
+{
+ int err = 0;
+ if (this->opts_.debug() > 5)
+ {
+ ACE_DEBUG((
+ LM_INFO,
+ ACE_TEXT("(%P|%t) sync_load %d, %d\n"),
+ this->sync_needed_, this->sync_files_.size()));
+ }
+ if (this->sync_needed_ == FULL_SYNC)
+ {
+ err = persistent_load(false);
+ }
+ else if (this->sync_needed_ == INC_SYNC)
+ {
+ std::set<ACE_TString>::const_iterator fname = this->sync_files_.begin();
+ for ( ; fname != this->sync_files_.end(); ++fname)
+ {
+ if (this->opts_.debug() > 6)
+ {
+ ACE_DEBUG((LM_INFO,
+ ACE_TEXT("(%P|%t) sync_load %s\n"),
+ fname->c_str()));
+ }
+ Lockable_File file(*fname, O_RDONLY);
+ int ind_err = load(*fname, file.get_file());
+ if (ind_err != 0)
+ {
+ err = ind_err;
+ }
+ }
+ this->sync_files_.clear();
+ }
+
+ this->sync_needed_ = NO_SYNC;
+ return err;
+}
+
+static void write_listing_item(FILE* list, const ACE_TString& fname,
+ const ACE_CString& name, const ACE_TCHAR* tag)
+{
+ ACE_OS::fprintf (list, "\t<%s", tag);
+ ACE_OS::fprintf (list, " fname=\"%s\"", fname.c_str ());
+ ACE_OS::fprintf (list, " name=\"%s\" />\n", name.c_str ());
+}
+
+void
+Shared_Backing_Store::write_listing(FILE* list)
+{
+ ACE_OS::fprintf (list,"<?xml version=\"1.0\"?>\n");
+ ACE_OS::fprintf (list,"<ImRListing>\n");
+
+ // Save servers
+ Locator_Repository::SIMap::ENTRY* sientry = 0;
+ Locator_Repository::SIMap::CONST_ITERATOR siit (this->servers ());
+ for (; siit.next (sientry); siit.advance() )
+ {
+ const Server_Info_Ptr& info = sientry->int_id_;
+
+ const Shared_Backing_Store::ServerUIMap::ENTRY& entry =
+ unique_id(sientry->ext_id_, this->server_uids_,
+ this->imr_type_, this->repo_id_);
+ ACE_CString listing_name = ACEXML_escape_string (info->name);
+ write_listing_item(list, entry.int_id_.unique_filename, listing_name,
+ Locator_XMLHandler::SERVER_INFO_TAG);
+ }
+
+ // Save Activators
+ Locator_Repository::AIMap::ENTRY* aientry = 0;
+ Locator_Repository::AIMap::CONST_ITERATOR aiit (this->activators ());
+ for (; aiit.next (aientry); aiit.advance ())
+ {
+ const ACE_CString& aname = aientry->ext_id_;
+ const ActivatorUIMap::ENTRY& entry =
+ unique_id(aname, this->activator_uids_,
+ this->imr_type_, this->repo_id_);
+ write_listing_item(list, entry.int_id_.unique_filename, aname,
+ Locator_XMLHandler::ACTIVATOR_INFO_TAG);
+ }
+
+ ACE_OS::fprintf (list,"</ImRListing>\n");
+}
+
+int
+Shared_Backing_Store::persist_listings (Lockable_File& listing_lf)
+{
+ FILE* list = listing_lf.get_file(this->listing_file_, O_WRONLY);
+ if (list == 0)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Couldn't write to file %s\n"),
+ this->listing_file_.c_str()));
+ return -1;
+ }
+
+ write_listing(list);
+
+ const ACE_TString bfname = this->listing_file_.c_str() + ACE_TString(".bak");
+
+ // Write backup file
+ FILE* baklist = ACE_OS::fopen(bfname.c_str(),ACE_TEXT("w"));
+ if (baklist == 0)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Couldn't write to file %s\n"),
+ bfname.c_str()));
+ return -1;
+ }
+
+ write_listing(baklist);
+ ACE_OS::fflush(baklist);
+ ACE_OS::fclose(baklist);
+
+ return 0;
+}
+
+int
+Shared_Backing_Store::report_ior(PortableServer::POA_ptr imr_poa)
+{
+ if (this->imr_type_ == Options::STANDALONE_IMR)
+ {
+ return Locator_Repository::report_ior(imr_poa);
+ }
+
+ CORBA::Object_var obj = this->orb_->resolve_initial_references ("IORTable");
+ IORTable::Table_var ior_table = IORTable::Table::_narrow (obj.in ());
+ ACE_ASSERT (! CORBA::is_nil (ior_table.in ()));
+
+ const char* const replica_name(IMR_REPLICA[this->imr_type_]);
+ ACE_TString replica_filename = replica_ior_filename(false);
+ FILE* fp = ACE_OS::fopen (replica_filename.c_str (), "w");
+ if (fp == 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT("ImR: Could not open file: %s\n"),
+ replica_filename.c_str ()), -1);
+ }
+ obj = imr_poa->servant_to_reference (this);
+ const CORBA::String_var replica_ior = this->orb_->object_to_string (obj.in ());
+ ior_table->bind(replica_name, replica_ior.in());
+ ACE_OS::fprintf (fp, "%s", replica_ior.in ());
+ ACE_OS::fclose (fp);
+
+ int err = 0;
+ // only report the imr ior if the fault tolerant ImR is complete
+ if (!CORBA::is_nil (this->peer_replica_.in()))
+ {
+ err = Locator_Repository::report_ior(imr_poa);
+ }
+
+ return err;
+}
+
+char*
+Shared_Backing_Store::locator_service_ior(const char* peer_ior) const
+{
+ const CORBA::Object_ptr this_obj =
+ this->orb_->string_to_object(this->non_ft_imr_ior_.in());
+ const CORBA::Object_ptr peer_obj =
+ this->orb_->string_to_object(peer_ior);
+ const CORBA::Object_ptr& obj1 =
+ (this->imr_type_ == Options::PRIMARY_IMR) ? this_obj : peer_obj;
+ const CORBA::Object_ptr& obj2 =
+ (this->imr_type_ != Options::PRIMARY_IMR) ? this_obj : peer_obj;
+
+ CORBA::Object_var IORM =
+ this->orb_->resolve_initial_references (TAO_OBJID_IORMANIPULATION, 0);
+
+ TAO_IOP::TAO_IOR_Manipulation_var iorm =
+ TAO_IOP::TAO_IOR_Manipulation::_narrow (IORM.in());
+
+ CORBA::Object_var locator_service = iorm->add_profiles(obj1, obj2);
+
+ char* const combined_ior =
+ this->orb_->object_to_string(locator_service.in());
+ return combined_ior;
+
+}
+
+template<typename Map>
+static const typename Map::ENTRY& unique_id(
+ const typename Map::KEY& key,
+ const XML_Backing_Store::NameValues& repo_values,
+ const XML_Backing_Store::NameValues& extra_params,
+ Map& unique_ids,
+ const Options::ImrType this_repo_type,
+ unsigned int& this_repo_id,
+ const unsigned int debug)
+{
+ const unsigned int size = extra_params.size();
+ if ((size != 2) && (debug > 4))
+ {
+ ACE_ERROR((
+ LM_ERROR,
+ ACE_TEXT("(%P|%t) Persisted server id=%C name=%C doesn't have all ")
+ ACE_TEXT("unique id params. (%d of 2)\n"),
+ size));
+ };
+
+ unsigned int repo_id = 0;
+ // default to this repo
+ Options::ImrType repo_type = this_repo_type;
+ for (unsigned int i = 0; i < size; ++i)
+ {
+ ACE_DEBUG((LM_INFO,
+ ACE_TEXT ("name values %C=%C (%C)\n"),
+ extra_params[i].first.c_str(),
+ extra_params[i].second.c_str(),
+ repo_values[i].first.c_str()));
+ }
+ if ((size > Shared_Backing_Store::REPO_TYPE) &&
+ (extra_params[Shared_Backing_Store::REPO_TYPE].first ==
+ repo_values[Shared_Backing_Store::REPO_TYPE].first))
+ repo_type = (Options::ImrType)ACE_OS::atoi(
+ extra_params[Shared_Backing_Store::REPO_TYPE].second.c_str());
+
+ if ((size > Shared_Backing_Store::REPO_ID) &&
+ (extra_params[Shared_Backing_Store::REPO_ID].first ==
+ repo_values[Shared_Backing_Store::REPO_ID].first))
+ repo_id =
+ ACE_OS::atoi(extra_params[Shared_Backing_Store::REPO_ID].second.c_str());
+ else
+ {
+ ACE_ERROR((LM_ERROR,
+ ACE_TEXT("(%P|%t) Persisted %C did not supply a repo_id\n"),
+ identify_key(key).c_str()));
+ }
+
+ return
+ unique_id(key, unique_ids, this_repo_type,
+ this_repo_id, repo_type, repo_id);
+}
+
+void
+Shared_Backing_Store::load_server (
+ const ACE_CString& server_id,
+ const ACE_CString& server_name,
+ bool jacorb_server,
+ const ACE_CString& activator_name,
+ const ACE_CString& startup_cmd,
+ const ImplementationRepository::EnvironmentList& env_vars,
+ const ACE_CString& working_dir,
+ const ImplementationRepository::ActivationMode actmode,
+ int start_limit,
+ const ACE_CString& partial_ior,
+ const ACE_CString& ior,
+ bool server_started,
+ const NameValues& extra_params)
+{
+ // ensure there is an entry for this server
+ unique_id(server_name, this->repo_values_, extra_params,
+ this->server_uids_, this->imr_type_, this->repo_id_,
+ this->opts_.debug());
+ Server_Info_Ptr si;
+ const bool new_server = (this->servers ().find (server_name, si) != 0);
+
+ if (new_server)
+ {
+ // create new or replace the existing entry
+ XML_Backing_Store::load_server(
+ server_id, server_name, jacorb_server, activator_name, startup_cmd,
+ env_vars, working_dir, actmode, start_limit, partial_ior, ior,
+ server_started, extra_params);
+ return;
+ }
+
+ // is the server object new
+ const bool new_ior = si->ior != ior;
+ if (new_ior)
+ si->ior = ior;
+
+ si->server_id = server_id;
+ si->jacorb_server = jacorb_server;
+ si->activator = activator_name;
+ si->cmdline = startup_cmd;
+ si->env_vars = env_vars;
+ si->dir = working_dir;
+ si->activation_mode = actmode;
+ si->start_limit = start_limit;
+ si->partial_ior = partial_ior;
+
+ if (!server_started)
+ si->server = ImplementationRepository::ServerObject::_nil();
+ else
+ // will create a new server below if no previous server
+ // or the ior has changed
+ server_started = CORBA::is_nil(si->server) || new_ior;
+
+ create_server(server_started, si);
+}
+
+void
+Shared_Backing_Store::load_activator (const ACE_CString& activator_name,
+ long token,
+ const ACE_CString& ior,
+ const NameValues& extra_params)
+{
+ // use this to make sure an unique id entry is created
+ unique_id(activator_name, this->repo_values_, extra_params,
+ this->activator_uids_, this->imr_type_, this->repo_id_,
+ this->opts_.debug());
+ XML_Backing_Store::load_activator(activator_name, token, ior, extra_params);
+}
+
+void
+Shared_Backing_Store::notify_updated_server(
+ const ImplementationRepository::ServerUpdate& server)
+{
+ if (this->opts_.debug() > 5)
+ {
+ ACE_DEBUG((
+ LM_INFO,
+ ACE_TEXT("(%P|%t) notify_updated_server=%C\n"),
+ server.name.in()));
+ }
+ if ((this->sync_needed_ == FULL_SYNC) ||
+ (++this->replica_seq_num_ != server.seq_num))
+ {
+ this->replica_seq_num_ = server.seq_num;
+ this->sync_needed_ = FULL_SYNC;
+ this->sync_files_.clear();
+ return;
+ }
+
+ const ACE_CString name = server.name.in();
+ if (server.type == ImplementationRepository::repo_remove)
+ {
+ // sync_needed_ doesn't change, since we handle the change
+ // imme;diately
+ this->servers().unbind (name);
+ return;
+ }
+
+ this->sync_needed_ = INC_SYNC;
+ Options::ImrType repo_type = (Options::ImrType)server.repo_type;
+ unsigned int repo_id = server.repo_id;
+ const ServerUIMap::ENTRY& entry =
+ unique_id(name, this->server_uids_, this->imr_type_, this->repo_id_,
+ repo_type, repo_id);
+ const ACE_TString fname = this->filename_ + entry.int_id_.unique_filename;
+ this->sync_files_.insert(fname);
+}
+
+void
+Shared_Backing_Store::notify_updated_activator(
+ const ImplementationRepository::ActivatorUpdate& activator)
+{
+ if (this->opts_.debug() > 5)
+ {
+ ACE_DEBUG((
+ LM_INFO,
+ ACE_TEXT("(%P|%t) notify_updated_activator=%C\n"),
+ activator.name.in()));
+ }
+ if ((this->sync_needed_ == FULL_SYNC) ||
+ (++this->replica_seq_num_ != activator.seq_num))
+ {
+ this->replica_seq_num_ = activator.seq_num;
+ this->sync_needed_ = FULL_SYNC;
+ this->sync_files_.clear();
+ return;
+ }
+
+ const ACE_CString name = lcase(activator.name.in());
+ if (activator.type == ImplementationRepository::repo_remove)
+ {
+ // sync_needed_ doesn't change, since we handle the change
+ // immediately
+ this->activators().unbind (name);
+ return;
+ }
+
+ this->sync_needed_ = INC_SYNC;
+ Options::ImrType repo_type = (Options::ImrType)activator.repo_type;
+ unsigned int repo_id = activator.repo_id;
+ const ActivatorUIMap::ENTRY& entry =
+ unique_id(name, this->activator_uids_, this->imr_type_, this->repo_id_,
+ repo_type, repo_id);
+ const ACE_TString fname = this->filename_ + entry.int_id_.unique_filename;
+ this->sync_files_.insert(fname);
+}
+
+void
+Shared_Backing_Store::register_replica(
+ ImplementationRepository::UpdatePushNotification_ptr replica,
+ char*& ft_imr_ior,
+ ImplementationRepository::SequenceNum_out seq_num)
+{
+ ACE_ASSERT (! CORBA::is_nil (replica));
+ this->peer_replica_ =
+ ImplementationRepository::UpdatePushNotification::_duplicate (replica);
+
+ seq_num = this->seq_num_;
+ if (this->imr_type_ == Options::STANDALONE_IMR)
+ {
+ ACE_ERROR((LM_ERROR,
+ ACE_TEXT("Error: Non-replicated ImR receiving replica ")
+ ACE_TEXT("registration <%s>\n"),
+ ft_imr_ior));
+ return;
+ }
+
+ this->replica_seq_num_ = 0;
+
+ // store off original char* to ensure memory cleanup
+ CORBA::String_var ior = ft_imr_ior;
+
+ // if we already have the fault tolerant ImR ior
+ // then just copy it
+ if (registered())
+ {
+ if (this->opts_.debug() > 2)
+ {
+ ACE_DEBUG((
+ LM_INFO,
+ ACE_TEXT("(%P|%t) Already registered <%C>\n"),
+ this->imr_ior_.in()));
+ }
+ // make a copy
+ ior = this->imr_ior_.in();
+ // handoff memory
+ ft_imr_ior = ior._retn();
+ return;
+ }
+
+ // otherwise we need to combine the primary and backup ior to make
+ // the fault tolerant ImR ior
+ char* combined_ior = 0;
+ CORBA::String_var reason;
+ try
+ {
+ combined_ior = locator_service_ior(ft_imr_ior);
+ }
+ catch (const TAO_IOP::Invalid_IOR& )
+ {
+ reason = "invalid ior";
+ }
+ catch (const TAO_IOP::EmptyProfileList& )
+ {
+ reason = "no profiles";
+ }
+ catch (const TAO_IOP::Duplicate& )
+ {
+ reason = "duplicate profile";
+ }
+
+ if (combined_ior == 0)
+ {
+ // give back the original pointer and don't clean it up
+ ft_imr_ior = ior._retn();
+ ACE_ERROR((LM_ERROR,
+ "ERROR: Failed to create Fault Tolerant ImR, reason=%s\n",
+ reason.in()));
+ throw ImplementationRepository::InvalidPeer(reason.in());
+ }
+
+ ft_imr_ior = combined_ior;
+ // pass as const char* to make sure string is copied
+ this->imr_ior_ = (const char*)ft_imr_ior;
+
+ PortableServer::POA_var null_poa;
+ Locator_Repository::report_ior(null_poa);
+}
+
+ACE_CString
+Shared_Backing_Store::replica_ior_filename(bool peer_ior_file) const
+{
+ Options::ImrType desired_type = this->imr_type_;
+ if (peer_ior_file)
+ {
+ desired_type = (desired_type == Options::PRIMARY_IMR) ?
+ Options::BACKUP_IMR :
+ Options::PRIMARY_IMR;
+ }
+ ACE_CString ior =
+ this->filename_ + IMR_REPLICA[desired_type] + ACE_TEXT(".ior");
+ if (peer_ior_file)
+ {
+ // the peer ior file needs the file prefix
+ ior = "file://" + ior;
+ }
+
+ return ior;
+}
+
+Shared_Backing_Store::LocatorListings_XMLHandler::LocatorListings_XMLHandler(
+ const ACE_TString& dir)
+: dir_(dir),
+ only_changes_(false)
+{
+}
+
+Shared_Backing_Store::LocatorListings_XMLHandler::LocatorListings_XMLHandler(
+ const ACE_TString& dir,
+ const Locator_Repository::SIMap& servers,
+ const Locator_Repository::AIMap& activators)
+: dir_(dir),
+ only_changes_(true)
+{
+ Locator_Repository::SIMap::ENTRY* sientry = 0;
+ Locator_Repository::SIMap::CONST_ITERATOR siit (servers);
+ for (; siit.next (sientry); siit.advance() )
+ {
+ unmatched_servers_.bind (sientry->ext_id_, sientry->int_id_);
+ }
+
+ Locator_Repository::AIMap::ENTRY* aientry = 0;
+ Locator_Repository::AIMap::CONST_ITERATOR aiit (activators);
+ for (; aiit.next (aientry); aiit.advance() )
+ {
+ unmatched_activators_.bind (aientry->ext_id_, aientry->int_id_);
+ }
+}
+
+void
+Shared_Backing_Store::LocatorListings_XMLHandler::startElement (
+ const ACEXML_Char* ,
+ const ACEXML_Char* ,
+ const ACEXML_Char* qName,
+ ACEXML_Attributes* attrs)
+{
+ const bool server =
+ (ACE_OS::strcasecmp (qName, Locator_XMLHandler::SERVER_INFO_TAG) == 0);
+ if (!server &&
+ (ACE_OS::strcasecmp (qName, Locator_XMLHandler::ACTIVATOR_INFO_TAG) != 0))
+ {
+ return;
+ }
+
+ if (attrs != 0 && attrs->getLength () == 2)
+ {
+ ACE_TString fname = attrs->getValue ((size_t)0);
+ bool store_fname = !only_changes_;
+ if (only_changes_)
+ {
+ ACE_CString name = ACE_TEXT_ALWAYS_CHAR(attrs->getValue ((size_t)1));
+ // if the name is not present, then this is an add, so store it
+ store_fname = server ?
+ (unmatched_servers_.unbind (name) != 0) :
+ (unmatched_activators_.unbind (name) != 0);
+ }
+
+ if (store_fname)
+ {
+ filenames_.push_back(dir_ + fname);
+ }
+ }
+ else
+ ACE_DEBUG((
+ LM_INFO,
+ ACE_TEXT ("LocatorListings_XMLHandler::startElement ")
+ ACE_TEXT ("incorrect number of attrs (%d)\n"),
+ attrs->getLength ()));
+
+}
+
+void
+Shared_Backing_Store::LocatorListings_XMLHandler::endElement (
+ const ACEXML_Char* ,
+ const ACEXML_Char* ,
+ const ACEXML_Char* )
+{
+}
+
+void
+Shared_Backing_Store::LocatorListings_XMLHandler::remove_unmatched(
+ Locator_Repository& repo)
+{
+ Locator_Repository::SIMap::ENTRY* sientry = 0;
+ Locator_Repository::SIMap::CONST_ITERATOR siit (this->unmatched_servers_);
+ for (; siit.next (sientry); siit.advance() )
+ {
+ int ret = repo.servers().unbind (sientry->ext_id_);
+ if (ret != 0)
+ {
+ ACE_ERROR((LM_ERROR,
+ ACE_TEXT ("ERROR: could not remove server: %s\n"),
+ sientry->int_id_->name.c_str()));
+ }
+ }
+
+ Locator_Repository::AIMap::ENTRY* aientry = 0;
+ Locator_Repository::AIMap::CONST_ITERATOR aiit (this->unmatched_activators_);
+ for (; aiit.next (aientry); aiit.advance ())
+ {
+ int ret = repo.activators().unbind (aientry->ext_id_);
+ if (ret != 0)
+ {
+ ACE_ERROR((LM_ERROR,
+ ACE_TEXT ("ERROR: could not remove activator: %s\n"),
+ aientry->int_id_->name.c_str()));
+ }
+ }
+}
+
+const ACE_Vector<ACE_TString>&
+Shared_Backing_Store::LocatorListings_XMLHandler::filenames() const
+{
+ return this->filenames_;
+}