diff options
author | stanleyk <stanleyk@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2013-02-05 21:11:03 +0000 |
---|---|---|
committer | stanleyk <stanleyk@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2013-02-05 21:11:03 +0000 |
commit | 5e030faf84086ab02059fcbcc3faed224bd57b95 (patch) | |
tree | 3a62df45ac6ccf599fb07cf6a03d672456ce2e3d /TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp | |
parent | 9d296f7fa51116ff7040ecb2ad18612cd94b5fd1 (diff) | |
download | ATCD-5e030faf84086ab02059fcbcc3faed224bd57b95.tar.gz |
Merge in OCI_Reliability_Enhancements branch.
Diffstat (limited to 'TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp')
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp | 1366 |
1 files changed, 1366 insertions, 0 deletions
diff --git a/TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp b/TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp new file mode 100644 index 00000000000..b4cbb0dafff --- /dev/null +++ b/TAO/orbsvcs/ImplRepo_Service/Shared_Backing_Store.cpp @@ -0,0 +1,1366 @@ +// $Id$ + +#include "Shared_Backing_Store.h" +#include "Server_Info.h" +#include "Activator_Info.h" +#include "utils.h" +#include "Locator_XMLHandler.h" +#include "ImR_LocatorC.h" +#include "ace/File_Lock.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_strings.h" +#include "ace/OS_NS_ctype.h" +#include "ace/OS_NS_unistd.h" +#include "ACEXML/parser/parser/Parser.h" +#include "ACEXML/common/FileCharStream.h" +#include "ACEXML/common/XML_Util.h" +#include "tao/IORManipulation/IORManip_Loader.h" + +namespace { + class Lockable_File + { + public: + Lockable_File() + : file_(0), + flags_(0), + locked_(false), + unlink_in_destructor_(false) + { + } + + Lockable_File(const ACE_TString& file, + const int flags, + bool unlink_in_destructor = false) + : file_(0), + flags_(0), + locked_(false), + unlink_in_destructor_(false) + { + init_fl(file, flags, unlink_in_destructor); + } + + ~Lockable_File() + { + release(); + } + + void release() + { + if (this->file_ == 0) + return; + + close_file(); + this->file_lock_.reset(); + this->locked_ = false; + } + + FILE* get_file() + { + lock(); + + return this->file_; + } + + FILE* get_file(const ACE_TString& file, + const int flags, + bool unlink_in_destructor = false) + { + init_fl(file, flags, unlink_in_destructor); + return get_file(); + } + + private: + void init_fl(const ACE_TString& file, + const int flags, + bool unlink_in_destructor = false) + { + release(); + + flags_ = flags | O_CREAT; + unlink_in_destructor_ = unlink_in_destructor; + + const ACE_TCHAR* const flags_str = + ((flags_ & O_RDWR) != 0) ? ACE_TEXT("r+") : + (((flags_ & O_WRONLY) != 0) ? ACE_TEXT("w") : ACE_TEXT("r")); +#ifdef ACE_WIN32 + this->filename_ = file; + this->file_ = ACE_OS::fopen(file.c_str(), flags_str); +#else + this->file_lock_.reset( + new ACE_File_Lock(ACE_TEXT_CHAR_TO_TCHAR(file.c_str ()), + flags_, + 0666, + unlink_in_destructor)); + + // Truncating output so this will not allow reading then writing + + ACE_OS::ftruncate(this->file_lock_->get_handle(), 0); + this->file_ = ACE_OS::fdopen(this->file_lock_->get_handle(), flags_str); +#endif + } + + void close_file() + { + if (this->file_ == 0) + return; + + ACE_OS::fflush(this->file_); + ACE_OS::fclose(this->file_); + this->file_ = 0; +#ifdef ACE_WIN32 + if (this->unlink_in_destructor_) + { + ACE_OS::unlink(this->filename_.c_str()); + this->unlink_in_destructor_ = false; + } +#endif + } + + void lock() + { +#ifndef ACE_WIN32 + if (this->locked_) + return; + + if ((this->flags_ & O_RDWR) != 0) + file_lock_->acquire(); + if ((this->flags_ & O_WRONLY) != 0) + file_lock_->acquire_write(); + else + file_lock_->acquire_read(); + + this->locked_ = true; +#endif + } + + auto_ptr<ACE_File_Lock> file_lock_; + FILE* file_; + int flags_; + bool locked_; + bool unlink_in_destructor_; + ACE_TString filename_; + }; +} + +Shared_Backing_Store::Shared_Backing_Store(const Options& opts, + CORBA::ORB_ptr orb) +: XML_Backing_Store(opts, orb, true), + listing_file_(opts.persist_file_name() + ACE_TEXT("imr_listing.xml")), + seq_num_(0), + replica_seq_num_(0), + imr_type_(opts.imr_type()), + sync_needed_(NO_SYNC), + repo_id_(1), + repo_values_(2) +{ + IMR_REPLICA[Options::PRIMARY_IMR] = ACE_TEXT ("ImR_ReplicaPrimary"); + IMR_REPLICA[Options::BACKUP_IMR] = ACE_TEXT ("ImR_ReplicaBackup"); + IMR_REPLICA[Options::STANDALONE_IMR] = ACE_TEXT ("ImR_NoReplica"); + + this->repo_values_[REPO_TYPE] = + std::make_pair(ACE_CString(ACE_TEXT ("repo_type")), + ACE_CString()); + this->repo_values_[REPO_ID] = + std::make_pair(ACE_CString(ACE_TEXT ("repo_id")), + ACE_CString()); +} + +Shared_Backing_Store::~Shared_Backing_Store() +{ +} + + +static void replicate( + Shared_Backing_Store::Replica_ptr replica, + const ImplementationRepository::ServerUpdate& update) +{ + // replicate the ServerUpdate to our replicated locator + replica->notify_updated_server(update); +} + +static void replicate( + Shared_Backing_Store::Replica_ptr replica, + const ImplementationRepository::ActivatorUpdate& update) +{ + // replicate the ActivatorUpdate to our replicated locator + replica->notify_updated_activator(update); +} + +static void set_key(ImplementationRepository::ServerUpdate& update, + const Locator_Repository::SIMap::KEY& key) +{ + update.name = key.c_str(); +} + +static void set_key(ImplementationRepository::ActivatorUpdate& update, + const Locator_Repository::AIMap::KEY& key) +{ + update.name = key.c_str(); +} + +template< typename Update, typename Map> +static void replicate( + Shared_Backing_Store::Replica_ptr replica, + const typename Map::ENTRY& entry, + const ImplementationRepository::UpdateType type, + const ImplementationRepository::SequenceNum seq_num) +{ + if (CORBA::is_nil (replica)) + { + return; + } + + try + { + Update update; + set_key(update, entry.ext_id_); + update.type = type; + update.seq_num = seq_num; + update.repo_id = entry.int_id_.repo_id; + update.repo_type = entry.int_id_.repo_type; + replicate(replica, update); + } + catch (const CORBA::COMM_FAILURE&) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("(%P|%t) Replicated ImR: COMM_FAILURE Exception\n"))); + } + catch (const CORBA::TRANSIENT&) + { + ACE_DEBUG (( + LM_DEBUG, + ACE_TEXT("(%P|%t) Replicated ImR: TRANSIENT Exception\n"))); + } + catch (const CORBA::OBJECT_NOT_EXIST&) + { + ACE_DEBUG (( + LM_DEBUG, + ACE_TEXT("(%P|%t) Replicated ImR: OBJECT_NOT_EXIST ") + ACE_TEXT("Exception, dropping replica\n"))); + replica = TAO::Objref_Traits + <ImplementationRepository::UpdatePushNotification>::nil (); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception (ACE_TEXT("Replicated ImR: notify update")); + replica = 0; + } +} + +template<typename Map> +static typename Map::ENTRY& unique_id( + const typename Map::KEY& key, + Map& unique_ids, + const Shared_Backing_Store::UniqueId& id) +{ + typename Map::ENTRY* entry = 0; + unique_ids.bind(key, id, entry); + + return *entry; +} + +static Shared_Backing_Store::UniqueId create_uid( + const Options::ImrType repo_type, + const unsigned int repo_id) +{ + Shared_Backing_Store::UniqueId id; + id.repo_id = repo_id; + id.repo_type = repo_type; + ACE_TCHAR id_str[50]; + ACE_OS::itoa((unsigned int)repo_type, id_str, 10); + + id.repo_type_str = id_str; + + size_t current = ACE_OS::strlen(id_str); + id_str[current++] = ACE_TEXT('_'); + ACE_OS::itoa(repo_id, &(id_str[current]), 10); + + id.repo_id_str = &(id_str[current]); + + current = ACE_OS::strlen(id_str); + id_str[current++] = ACE_TEXT('.'); + id_str[current++] = ACE_TEXT('x'); + id_str[current++] = ACE_TEXT('m'); + id_str[current++] = ACE_TEXT('l'); + id_str[current++] = ACE_TEXT('\0'); + id.unique_filename = id_str; + + return id; +} + +template<typename Map> +static const typename Map::ENTRY& unique_id( + const typename Map::KEY& key, + Map& unique_ids, + const Options::ImrType type, + unsigned int& next_repo_id) +{ + typename Map::ENTRY* entry; + if (unique_ids.find(key, entry) == 0) + return *entry; + const unsigned int repo_id = next_repo_id++; + Shared_Backing_Store::UniqueId id = create_uid(type, repo_id); + return unique_id(key, unique_ids, id); +} + +static ACE_CString identify_key(const ACE_CString& name) +{ + ACE_CString id("name="); + id += name; + return id; +} + +/* TODO: bdj - will need this when server container changed to name and id +static ACE_CString identify_key(const Shared_Backing_Store::ServerKey& name) +{ + ACE_CString id("name="); + id += info.name; + return id; +} +*/ +template<typename Map> +static const typename Map::ENTRY& unique_id( + const typename Map::KEY& key, + Map& unique_ids, + const Options::ImrType this_repo_type, + unsigned int& this_repo_id, + Options::ImrType& entry_repo_type, + unsigned int& entry_repo_id) +{ + typename Map::ENTRY* temp_entry; + const bool found = (unique_ids.find(key, temp_entry) == 0); + + Shared_Backing_Store::UniqueId temp_id = + create_uid(entry_repo_type, entry_repo_id); + typename Map::ENTRY& entry = unique_id(key, unique_ids, temp_id); + + if (entry_repo_id == 0) + { + // if no repo id provided, treat it like it came from this repo + entry_repo_id = this_repo_id++; + entry_repo_type = this_repo_type; + } + else if (found) + { + Shared_Backing_Store::UniqueId& id = entry.int_id_; + if (entry_repo_id != id.repo_id && + entry_repo_type != id.repo_type) + { + // if already existed, replace the contents + ACE_ERROR(( + LM_ERROR, + ACE_TEXT("(%P|%t) ERROR: replacing %C with existing repo_id=%d ") + ACE_TEXT("and imr_type=%d, with repo_id=%d and imr_type=%d\n"), + identify_key(key).c_str(), id.repo_id, id.repo_type, + entry_repo_id, entry_repo_type)); + id = temp_id; + } + } + + if (entry_repo_type == this_repo_type && entry_repo_id >= this_repo_id) + { + // persisting existing entries for this repo, so move the repo_id past + // the entries id + this_repo_id = entry_repo_id + 1; + } + + return entry; +} + +template< typename Update, typename Map> +static int remove( + const typename Map::KEY& key, + const Map& unique_ids, + const ACE_TString& path, + Lockable_File& listing_lf, + Shared_Backing_Store::Replica_ptr peer_replica, + ImplementationRepository::SequenceNum& seq_num) +{ + typename Map::ENTRY* entry; + const int err = unique_ids.find(key, entry); + if (err != 0) + { + ACE_ERROR(( + LM_ERROR, + ACE_TEXT("(%P|%t) Couldn't find unique repo id for %C\n"), + identify_key(key).c_str())); + return err; + } + + const ACE_TString fname = path + entry->int_id_.unique_filename; + + { + // take the lock, then remove the file + Lockable_File file(fname, O_WRONLY, true); + } + listing_lf.release(); + + replicate<Update, Map> + (peer_replica, *entry, ImplementationRepository::repo_remove, ++seq_num); + + return 0; +} + +int +Shared_Backing_Store::persistent_remove (const ACE_CString& name, + bool activator) +{ + Lockable_File listing_lf; + int err = persist_listings(listing_lf); + if (err != 0) + { + return err; + } + + if (activator) + { + err = remove<ImplementationRepository::ActivatorUpdate> + (name, this->activator_uids_, this->filename_, listing_lf, + this->peer_replica_, this->seq_num_); + } + else + { + err = remove<ImplementationRepository::ServerUpdate> + (name, this->server_uids_, this->filename_, listing_lf, + this->peer_replica_, this->seq_num_); + } + return err; +} + +int +Shared_Backing_Store::persistent_update(const Server_Info_Ptr& info, bool add) +{ + Lockable_File listing_lf; + if (add) + { + const int err = persist_listings(listing_lf); + if (err != 0) + { + return err; + } + } + + ACE_CString name = ACEXML_escape_string (info->name); + + const ServerUIMap::ENTRY& entry = + unique_id(info->name, this->server_uids_, this->imr_type_, this->repo_id_); + const ACE_TString fname = this->filename_ + entry.int_id_.unique_filename; + if (this->opts_.debug() > 9) + { + ACE_DEBUG((LM_INFO, ACE_TEXT ("Persisting to %s(%C)\n"), + fname.c_str(), info->name.c_str())); + } + Lockable_File server_file(fname, O_WRONLY); + const ACE_TString bfname = fname.c_str() + ACE_TString(".bak"); + FILE* fp = server_file.get_file(); + if (fp == 0) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Couldn't write to file %s\n"), + fname.c_str())); + return -1; + } + // successfully added file (if adding), so release the listing file lock + listing_lf.release(); + ACE_OS::fprintf (fp,"<?xml version=\"1.0\"?>\n"); + + this->repo_values_[REPO_TYPE].second = entry.int_id_.repo_type_str; + this->repo_values_[REPO_ID].second = entry.int_id_.repo_id_str; + + persist(fp, *info, "", this->repo_values_); + + // Copy the current file to a backup. + FILE* bfp = ACE_OS::fopen(bfname.c_str(),ACE_TEXT("w")); + ACE_OS::fprintf (bfp,"<?xml version=\"1.0\"?>\n"); + persist(bfp, *info, "", this->repo_values_); + ACE_OS::fflush(bfp); + ACE_OS::fclose(bfp); + server_file.release(); + + const ImplementationRepository::UpdateType type = add ? + ImplementationRepository::repo_add : + ImplementationRepository::repo_update; + replicate<ImplementationRepository::ServerUpdate, ServerUIMap> + (peer_replica_, entry, type, ++seq_num_); + return 0; +} + + +int +Shared_Backing_Store::persistent_update(const Activator_Info_Ptr& info, + bool add) +{ + Lockable_File listing_lf; + if (add) + { + const int err = persist_listings(listing_lf); + if (err != 0) + { + return err; + } + } + + ACE_CString name = lcase (info->name); + + const ActivatorUIMap::ENTRY& entry = + unique_id(name, this->activator_uids_, this->imr_type_, this->repo_id_); + const ACE_TString fname = this->filename_ + entry.int_id_.unique_filename; + if (this->opts_.debug() > 9) + { + ACE_DEBUG((LM_INFO, ACE_TEXT ("Persisting to %s(%C)\n"), + fname.c_str(), info->name.c_str())); + } + Lockable_File activator_file(fname, O_WRONLY); + const ACE_TString bfname = fname.c_str() + ACE_TString(".bak"); + FILE* fp = activator_file.get_file(); + if (fp == 0) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Couldn't write to file %s\n"), + fname.c_str())); + return -1; + } + // successfully added file (if adding), so release the listing file lock + listing_lf.release(); + ACE_OS::fprintf (fp,"<?xml version=\"1.0\"?>\n"); + + this->repo_values_[REPO_TYPE].second = entry.int_id_.repo_type_str; + this->repo_values_[REPO_ID].second = entry.int_id_.repo_id_str; + + persist(fp, *info, "", this->repo_values_); + + // Copy the current file to a backup. + FILE* bfp = ACE_OS::fopen(bfname.c_str(),ACE_TEXT("w+")); + ACE_OS::fprintf (bfp,"<?xml version=\"1.0\"?>\n"); + persist(bfp, *info, "", this->repo_values_); + ACE_OS::fflush(bfp); + ACE_OS::fclose(bfp); + activator_file.release(); + + const ImplementationRepository::UpdateType type = add ? + ImplementationRepository::repo_add : + ImplementationRepository::repo_update; + replicate<ImplementationRepository::ActivatorUpdate, ActivatorUIMap> + (peer_replica_, entry, type, ++seq_num_); + return 0; +} + +const ACE_TCHAR* +Shared_Backing_Store::repo_mode() const +{ + return this->listing_file_.c_str(); +} + +int +Shared_Backing_Store::connect_replicas (Replica_ptr this_replica) +{ + const ACE_TString& replica_ior = replica_ior_filename(true); + if (this->opts_.debug() > 1) + { + ACE_DEBUG((LM_INFO, + ACE_TEXT("Resolving ImR replica %s\n"), replica_ior.c_str())); + } + + CORBA::Object_var obj = + this->orb_->string_to_object (replica_ior.c_str()); + + if (!CORBA::is_nil (obj.in ())) + { + bool non_exist = true; + try + { + this->peer_replica_ = ImplementationRepository:: + UpdatePushNotification::_narrow (obj.in()); + non_exist = (this->peer_replica_->_non_existent() == 1); + } + catch (const CORBA::Exception& ) + { + // let error be handled below + } + + if (non_exist) + { + this->peer_replica_ = + ImplementationRepository::UpdatePushNotification::_nil(); + } + } + + if (CORBA::is_nil (this->peer_replica_.in())) + { + if (this->imr_type_ == Options::BACKUP_IMR) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("Error: No primary ImR replica file found <%s>\n"), + replica_ior.c_str()), -1); + } + + // no connection currently, just wait for backup + return 0; + } + + if (opts_.debug() > 1) + { + ACE_DEBUG((LM_INFO, + ACE_TEXT("Registering with previously running ImR replica\n"))); + } + + try + { + this->peer_replica_->register_replica(this_replica, + this->imr_ior_.inout(), + this->replica_seq_num_); + } + catch (const ImplementationRepository::InvalidPeer& ip) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("Error: obj key <%s> is an invalid ImR replica because %s\n"), + replica_ior.c_str(), ip.reason.in()), -1); + } + + if (opts_.debug() > 9) + { + ACE_DEBUG((LM_INFO, + ACE_TEXT("Initializing repository with ft ior=<%C> ") + ACE_TEXT("and replica seq number %d\n"), + this->imr_ior_.in(), replica_seq_num_)); + } + + return 0; +} + +int +Shared_Backing_Store::init_repo(PortableServer::POA_ptr imr_poa) +{ + this->non_ft_imr_ior_ = this->imr_ior_; + PortableServer::ObjectId_var id = + PortableServer::string_to_ObjectId ("ImR_Replica"); + imr_poa->activate_object_with_id (id.in (), this); + + if (this->imr_type_ != Options::STANDALONE_IMR) + { + CORBA::Object_var obj = imr_poa->id_to_reference (id.in ()); + + Replica_var this_replica = + ImplementationRepository::UpdatePushNotification::_narrow (obj.in()); + const int err = connect_replicas(this_replica.in()); + if (err != 0) + { + return err; + } + } + + // only start the repo clean if no replica is running + if (this->opts_.repository_erase() && + CORBA::is_nil (this->peer_replica_.in ())) + { + Lockable_File listing_lf; + const XMLHandler_Ptr listings = get_listings(listing_lf, false); + if (listings.null()) + { + if (this->opts_.debug() > 9) + { + ACE_DEBUG((LM_INFO, + ACE_TEXT ("Persisted Repository already empty\n"))); + } + } + else + { + const ACE_Vector<ACE_TString>& filenames = listings->filenames(); + CORBA::ULong sz = filenames.size (); + for (CORBA::ULong i = 0; i < sz; ++i) + { + if (this->opts_.debug() > 9) + { + ACE_DEBUG((LM_INFO, ACE_TEXT ("Removing %s\n"), + filenames[i].c_str())); + } + ACE_OS::unlink ( filenames[i].c_str () ); + } + + if (this->opts_.debug() > 9) + { + ACE_DEBUG((LM_INFO, ACE_TEXT ("Removing %s\n"), + this->listing_file_.c_str())); + } + ACE_OS::unlink ( this->listing_file_.c_str () ); + } + } + + // ignore persistent_load return since files don't have to exist + persistent_load(false); + + if (this->opts_.debug() > 9) + { + ACE_DEBUG((LM_INFO, + ACE_TEXT ("ImR Repository initialized\n"))); + } + + return 0; +} + +int +Shared_Backing_Store::persistent_load (bool only_changes) +{ + Lockable_File listing_lf; + const XMLHandler_Ptr listings = get_listings(listing_lf, only_changes); + if (listings.null()) + { + // failed to retrieve listings + return -1; + } + + if (only_changes) + { + listings->remove_unmatched(*this); + } + + const ACE_Vector<ACE_TString>& filenames = listings->filenames(); + CORBA::ULong sz = filenames.size (); + if (this->opts_.debug() > 9) + { + ACE_DEBUG((LM_INFO, ACE_TEXT ("persistent_load %d files\n"), sz)); + } + for (CORBA::ULong i = 0; i < sz; ++i) + { + const ACE_TString& fname = filenames[i]; + Lockable_File file(fname, O_RDONLY); + + if(load(fname, file.get_file()) != 0) + { + load(fname + ".bak"); + } + } + + return 0; +} + +Shared_Backing_Store::XMLHandler_Ptr +Shared_Backing_Store::get_listings(Lockable_File& listing_lf, + bool only_changes) const +{ + XMLHandler_Ptr listings_handler; + if (only_changes) + { + listings_handler.reset(new LocatorListings_XMLHandler( + this->filename_, servers(), activators())); + } + else + { + listings_handler.reset(new LocatorListings_XMLHandler(this->filename_)); + } + + if (load(this->listing_file_, + *listings_handler, + this->opts_.debug(), + listing_lf.get_file(this->listing_file_, O_RDONLY)) != 0) + { + + if (load(this->listing_file_ + ".bak", + *listings_handler, + this->opts_.debug()) != 0) + { + listings_handler.reset(); + } + } + + return listings_handler; +} + +int +Shared_Backing_Store::sync_load () +{ + int err = 0; + if (this->opts_.debug() > 5) + { + ACE_DEBUG(( + LM_INFO, + ACE_TEXT("(%P|%t) sync_load %d, %d\n"), + this->sync_needed_, this->sync_files_.size())); + } + if (this->sync_needed_ == FULL_SYNC) + { + err = persistent_load(false); + } + else if (this->sync_needed_ == INC_SYNC) + { + std::set<ACE_TString>::const_iterator fname = this->sync_files_.begin(); + for ( ; fname != this->sync_files_.end(); ++fname) + { + if (this->opts_.debug() > 6) + { + ACE_DEBUG((LM_INFO, + ACE_TEXT("(%P|%t) sync_load %s\n"), + fname->c_str())); + } + Lockable_File file(*fname, O_RDONLY); + int ind_err = load(*fname, file.get_file()); + if (ind_err != 0) + { + err = ind_err; + } + } + this->sync_files_.clear(); + } + + this->sync_needed_ = NO_SYNC; + return err; +} + +static void write_listing_item(FILE* list, const ACE_TString& fname, + const ACE_CString& name, const ACE_TCHAR* tag) +{ + ACE_OS::fprintf (list, "\t<%s", tag); + ACE_OS::fprintf (list, " fname=\"%s\"", fname.c_str ()); + ACE_OS::fprintf (list, " name=\"%s\" />\n", name.c_str ()); +} + +void +Shared_Backing_Store::write_listing(FILE* list) +{ + ACE_OS::fprintf (list,"<?xml version=\"1.0\"?>\n"); + ACE_OS::fprintf (list,"<ImRListing>\n"); + + // Save servers + Locator_Repository::SIMap::ENTRY* sientry = 0; + Locator_Repository::SIMap::CONST_ITERATOR siit (this->servers ()); + for (; siit.next (sientry); siit.advance() ) + { + const Server_Info_Ptr& info = sientry->int_id_; + + const Shared_Backing_Store::ServerUIMap::ENTRY& entry = + unique_id(sientry->ext_id_, this->server_uids_, + this->imr_type_, this->repo_id_); + ACE_CString listing_name = ACEXML_escape_string (info->name); + write_listing_item(list, entry.int_id_.unique_filename, listing_name, + Locator_XMLHandler::SERVER_INFO_TAG); + } + + // Save Activators + Locator_Repository::AIMap::ENTRY* aientry = 0; + Locator_Repository::AIMap::CONST_ITERATOR aiit (this->activators ()); + for (; aiit.next (aientry); aiit.advance ()) + { + const ACE_CString& aname = aientry->ext_id_; + const ActivatorUIMap::ENTRY& entry = + unique_id(aname, this->activator_uids_, + this->imr_type_, this->repo_id_); + write_listing_item(list, entry.int_id_.unique_filename, aname, + Locator_XMLHandler::ACTIVATOR_INFO_TAG); + } + + ACE_OS::fprintf (list,"</ImRListing>\n"); +} + +int +Shared_Backing_Store::persist_listings (Lockable_File& listing_lf) +{ + FILE* list = listing_lf.get_file(this->listing_file_, O_WRONLY); + if (list == 0) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Couldn't write to file %s\n"), + this->listing_file_.c_str())); + return -1; + } + + write_listing(list); + + const ACE_TString bfname = this->listing_file_.c_str() + ACE_TString(".bak"); + + // Write backup file + FILE* baklist = ACE_OS::fopen(bfname.c_str(),ACE_TEXT("w")); + if (baklist == 0) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Couldn't write to file %s\n"), + bfname.c_str())); + return -1; + } + + write_listing(baklist); + ACE_OS::fflush(baklist); + ACE_OS::fclose(baklist); + + return 0; +} + +int +Shared_Backing_Store::report_ior(PortableServer::POA_ptr imr_poa) +{ + if (this->imr_type_ == Options::STANDALONE_IMR) + { + return Locator_Repository::report_ior(imr_poa); + } + + CORBA::Object_var obj = this->orb_->resolve_initial_references ("IORTable"); + IORTable::Table_var ior_table = IORTable::Table::_narrow (obj.in ()); + ACE_ASSERT (! CORBA::is_nil (ior_table.in ())); + + const char* const replica_name(IMR_REPLICA[this->imr_type_]); + ACE_TString replica_filename = replica_ior_filename(false); + FILE* fp = ACE_OS::fopen (replica_filename.c_str (), "w"); + if (fp == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ImR: Could not open file: %s\n"), + replica_filename.c_str ()), -1); + } + obj = imr_poa->servant_to_reference (this); + const CORBA::String_var replica_ior = this->orb_->object_to_string (obj.in ()); + ior_table->bind(replica_name, replica_ior.in()); + ACE_OS::fprintf (fp, "%s", replica_ior.in ()); + ACE_OS::fclose (fp); + + int err = 0; + // only report the imr ior if the fault tolerant ImR is complete + if (!CORBA::is_nil (this->peer_replica_.in())) + { + err = Locator_Repository::report_ior(imr_poa); + } + + return err; +} + +char* +Shared_Backing_Store::locator_service_ior(const char* peer_ior) const +{ + const CORBA::Object_ptr this_obj = + this->orb_->string_to_object(this->non_ft_imr_ior_.in()); + const CORBA::Object_ptr peer_obj = + this->orb_->string_to_object(peer_ior); + const CORBA::Object_ptr& obj1 = + (this->imr_type_ == Options::PRIMARY_IMR) ? this_obj : peer_obj; + const CORBA::Object_ptr& obj2 = + (this->imr_type_ != Options::PRIMARY_IMR) ? this_obj : peer_obj; + + CORBA::Object_var IORM = + this->orb_->resolve_initial_references (TAO_OBJID_IORMANIPULATION, 0); + + TAO_IOP::TAO_IOR_Manipulation_var iorm = + TAO_IOP::TAO_IOR_Manipulation::_narrow (IORM.in()); + + CORBA::Object_var locator_service = iorm->add_profiles(obj1, obj2); + + char* const combined_ior = + this->orb_->object_to_string(locator_service.in()); + return combined_ior; + +} + +template<typename Map> +static const typename Map::ENTRY& unique_id( + const typename Map::KEY& key, + const XML_Backing_Store::NameValues& repo_values, + const XML_Backing_Store::NameValues& extra_params, + Map& unique_ids, + const Options::ImrType this_repo_type, + unsigned int& this_repo_id, + const unsigned int debug) +{ + const unsigned int size = extra_params.size(); + if ((size != 2) && (debug > 4)) + { + ACE_ERROR(( + LM_ERROR, + ACE_TEXT("(%P|%t) Persisted server id=%C name=%C doesn't have all ") + ACE_TEXT("unique id params. (%d of 2)\n"), + size)); + }; + + unsigned int repo_id = 0; + // default to this repo + Options::ImrType repo_type = this_repo_type; + for (unsigned int i = 0; i < size; ++i) + { + ACE_DEBUG((LM_INFO, + ACE_TEXT ("name values %C=%C (%C)\n"), + extra_params[i].first.c_str(), + extra_params[i].second.c_str(), + repo_values[i].first.c_str())); + } + if ((size > Shared_Backing_Store::REPO_TYPE) && + (extra_params[Shared_Backing_Store::REPO_TYPE].first == + repo_values[Shared_Backing_Store::REPO_TYPE].first)) + repo_type = (Options::ImrType)ACE_OS::atoi( + extra_params[Shared_Backing_Store::REPO_TYPE].second.c_str()); + + if ((size > Shared_Backing_Store::REPO_ID) && + (extra_params[Shared_Backing_Store::REPO_ID].first == + repo_values[Shared_Backing_Store::REPO_ID].first)) + repo_id = + ACE_OS::atoi(extra_params[Shared_Backing_Store::REPO_ID].second.c_str()); + else + { + ACE_ERROR((LM_ERROR, + ACE_TEXT("(%P|%t) Persisted %C did not supply a repo_id\n"), + identify_key(key).c_str())); + } + + return + unique_id(key, unique_ids, this_repo_type, + this_repo_id, repo_type, repo_id); +} + +void +Shared_Backing_Store::load_server ( + const ACE_CString& server_id, + const ACE_CString& server_name, + bool jacorb_server, + const ACE_CString& activator_name, + const ACE_CString& startup_cmd, + const ImplementationRepository::EnvironmentList& env_vars, + const ACE_CString& working_dir, + const ImplementationRepository::ActivationMode actmode, + int start_limit, + const ACE_CString& partial_ior, + const ACE_CString& ior, + bool server_started, + const NameValues& extra_params) +{ + // ensure there is an entry for this server + unique_id(server_name, this->repo_values_, extra_params, + this->server_uids_, this->imr_type_, this->repo_id_, + this->opts_.debug()); + Server_Info_Ptr si; + const bool new_server = (this->servers ().find (server_name, si) != 0); + + if (new_server) + { + // create new or replace the existing entry + XML_Backing_Store::load_server( + server_id, server_name, jacorb_server, activator_name, startup_cmd, + env_vars, working_dir, actmode, start_limit, partial_ior, ior, + server_started, extra_params); + return; + } + + // is the server object new + const bool new_ior = si->ior != ior; + if (new_ior) + si->ior = ior; + + si->server_id = server_id; + si->jacorb_server = jacorb_server; + si->activator = activator_name; + si->cmdline = startup_cmd; + si->env_vars = env_vars; + si->dir = working_dir; + si->activation_mode = actmode; + si->start_limit = start_limit; + si->partial_ior = partial_ior; + + if (!server_started) + si->server = ImplementationRepository::ServerObject::_nil(); + else + // will create a new server below if no previous server + // or the ior has changed + server_started = CORBA::is_nil(si->server) || new_ior; + + create_server(server_started, si); +} + +void +Shared_Backing_Store::load_activator (const ACE_CString& activator_name, + long token, + const ACE_CString& ior, + const NameValues& extra_params) +{ + // use this to make sure an unique id entry is created + unique_id(activator_name, this->repo_values_, extra_params, + this->activator_uids_, this->imr_type_, this->repo_id_, + this->opts_.debug()); + XML_Backing_Store::load_activator(activator_name, token, ior, extra_params); +} + +void +Shared_Backing_Store::notify_updated_server( + const ImplementationRepository::ServerUpdate& server) +{ + if (this->opts_.debug() > 5) + { + ACE_DEBUG(( + LM_INFO, + ACE_TEXT("(%P|%t) notify_updated_server=%C\n"), + server.name.in())); + } + if ((this->sync_needed_ == FULL_SYNC) || + (++this->replica_seq_num_ != server.seq_num)) + { + this->replica_seq_num_ = server.seq_num; + this->sync_needed_ = FULL_SYNC; + this->sync_files_.clear(); + return; + } + + const ACE_CString name = server.name.in(); + if (server.type == ImplementationRepository::repo_remove) + { + // sync_needed_ doesn't change, since we handle the change + // imme;diately + this->servers().unbind (name); + return; + } + + this->sync_needed_ = INC_SYNC; + Options::ImrType repo_type = (Options::ImrType)server.repo_type; + unsigned int repo_id = server.repo_id; + const ServerUIMap::ENTRY& entry = + unique_id(name, this->server_uids_, this->imr_type_, this->repo_id_, + repo_type, repo_id); + const ACE_TString fname = this->filename_ + entry.int_id_.unique_filename; + this->sync_files_.insert(fname); +} + +void +Shared_Backing_Store::notify_updated_activator( + const ImplementationRepository::ActivatorUpdate& activator) +{ + if (this->opts_.debug() > 5) + { + ACE_DEBUG(( + LM_INFO, + ACE_TEXT("(%P|%t) notify_updated_activator=%C\n"), + activator.name.in())); + } + if ((this->sync_needed_ == FULL_SYNC) || + (++this->replica_seq_num_ != activator.seq_num)) + { + this->replica_seq_num_ = activator.seq_num; + this->sync_needed_ = FULL_SYNC; + this->sync_files_.clear(); + return; + } + + const ACE_CString name = lcase(activator.name.in()); + if (activator.type == ImplementationRepository::repo_remove) + { + // sync_needed_ doesn't change, since we handle the change + // immediately + this->activators().unbind (name); + return; + } + + this->sync_needed_ = INC_SYNC; + Options::ImrType repo_type = (Options::ImrType)activator.repo_type; + unsigned int repo_id = activator.repo_id; + const ActivatorUIMap::ENTRY& entry = + unique_id(name, this->activator_uids_, this->imr_type_, this->repo_id_, + repo_type, repo_id); + const ACE_TString fname = this->filename_ + entry.int_id_.unique_filename; + this->sync_files_.insert(fname); +} + +void +Shared_Backing_Store::register_replica( + ImplementationRepository::UpdatePushNotification_ptr replica, + char*& ft_imr_ior, + ImplementationRepository::SequenceNum_out seq_num) +{ + ACE_ASSERT (! CORBA::is_nil (replica)); + this->peer_replica_ = + ImplementationRepository::UpdatePushNotification::_duplicate (replica); + + seq_num = this->seq_num_; + if (this->imr_type_ == Options::STANDALONE_IMR) + { + ACE_ERROR((LM_ERROR, + ACE_TEXT("Error: Non-replicated ImR receiving replica ") + ACE_TEXT("registration <%s>\n"), + ft_imr_ior)); + return; + } + + this->replica_seq_num_ = 0; + + // store off original char* to ensure memory cleanup + CORBA::String_var ior = ft_imr_ior; + + // if we already have the fault tolerant ImR ior + // then just copy it + if (registered()) + { + if (this->opts_.debug() > 2) + { + ACE_DEBUG(( + LM_INFO, + ACE_TEXT("(%P|%t) Already registered <%C>\n"), + this->imr_ior_.in())); + } + // make a copy + ior = this->imr_ior_.in(); + // handoff memory + ft_imr_ior = ior._retn(); + return; + } + + // otherwise we need to combine the primary and backup ior to make + // the fault tolerant ImR ior + char* combined_ior = 0; + CORBA::String_var reason; + try + { + combined_ior = locator_service_ior(ft_imr_ior); + } + catch (const TAO_IOP::Invalid_IOR& ) + { + reason = "invalid ior"; + } + catch (const TAO_IOP::EmptyProfileList& ) + { + reason = "no profiles"; + } + catch (const TAO_IOP::Duplicate& ) + { + reason = "duplicate profile"; + } + + if (combined_ior == 0) + { + // give back the original pointer and don't clean it up + ft_imr_ior = ior._retn(); + ACE_ERROR((LM_ERROR, + "ERROR: Failed to create Fault Tolerant ImR, reason=%s\n", + reason.in())); + throw ImplementationRepository::InvalidPeer(reason.in()); + } + + ft_imr_ior = combined_ior; + // pass as const char* to make sure string is copied + this->imr_ior_ = (const char*)ft_imr_ior; + + PortableServer::POA_var null_poa; + Locator_Repository::report_ior(null_poa); +} + +ACE_CString +Shared_Backing_Store::replica_ior_filename(bool peer_ior_file) const +{ + Options::ImrType desired_type = this->imr_type_; + if (peer_ior_file) + { + desired_type = (desired_type == Options::PRIMARY_IMR) ? + Options::BACKUP_IMR : + Options::PRIMARY_IMR; + } + ACE_CString ior = + this->filename_ + IMR_REPLICA[desired_type] + ACE_TEXT(".ior"); + if (peer_ior_file) + { + // the peer ior file needs the file prefix + ior = "file://" + ior; + } + + return ior; +} + +Shared_Backing_Store::LocatorListings_XMLHandler::LocatorListings_XMLHandler( + const ACE_TString& dir) +: dir_(dir), + only_changes_(false) +{ +} + +Shared_Backing_Store::LocatorListings_XMLHandler::LocatorListings_XMLHandler( + const ACE_TString& dir, + const Locator_Repository::SIMap& servers, + const Locator_Repository::AIMap& activators) +: dir_(dir), + only_changes_(true) +{ + Locator_Repository::SIMap::ENTRY* sientry = 0; + Locator_Repository::SIMap::CONST_ITERATOR siit (servers); + for (; siit.next (sientry); siit.advance() ) + { + unmatched_servers_.bind (sientry->ext_id_, sientry->int_id_); + } + + Locator_Repository::AIMap::ENTRY* aientry = 0; + Locator_Repository::AIMap::CONST_ITERATOR aiit (activators); + for (; aiit.next (aientry); aiit.advance() ) + { + unmatched_activators_.bind (aientry->ext_id_, aientry->int_id_); + } +} + +void +Shared_Backing_Store::LocatorListings_XMLHandler::startElement ( + const ACEXML_Char* , + const ACEXML_Char* , + const ACEXML_Char* qName, + ACEXML_Attributes* attrs) +{ + const bool server = + (ACE_OS::strcasecmp (qName, Locator_XMLHandler::SERVER_INFO_TAG) == 0); + if (!server && + (ACE_OS::strcasecmp (qName, Locator_XMLHandler::ACTIVATOR_INFO_TAG) != 0)) + { + return; + } + + if (attrs != 0 && attrs->getLength () == 2) + { + ACE_TString fname = attrs->getValue ((size_t)0); + bool store_fname = !only_changes_; + if (only_changes_) + { + ACE_CString name = ACE_TEXT_ALWAYS_CHAR(attrs->getValue ((size_t)1)); + // if the name is not present, then this is an add, so store it + store_fname = server ? + (unmatched_servers_.unbind (name) != 0) : + (unmatched_activators_.unbind (name) != 0); + } + + if (store_fname) + { + filenames_.push_back(dir_ + fname); + } + } + else + ACE_DEBUG(( + LM_INFO, + ACE_TEXT ("LocatorListings_XMLHandler::startElement ") + ACE_TEXT ("incorrect number of attrs (%d)\n"), + attrs->getLength ())); + +} + +void +Shared_Backing_Store::LocatorListings_XMLHandler::endElement ( + const ACEXML_Char* , + const ACEXML_Char* , + const ACEXML_Char* ) +{ +} + +void +Shared_Backing_Store::LocatorListings_XMLHandler::remove_unmatched( + Locator_Repository& repo) +{ + Locator_Repository::SIMap::ENTRY* sientry = 0; + Locator_Repository::SIMap::CONST_ITERATOR siit (this->unmatched_servers_); + for (; siit.next (sientry); siit.advance() ) + { + int ret = repo.servers().unbind (sientry->ext_id_); + if (ret != 0) + { + ACE_ERROR((LM_ERROR, + ACE_TEXT ("ERROR: could not remove server: %s\n"), + sientry->int_id_->name.c_str())); + } + } + + Locator_Repository::AIMap::ENTRY* aientry = 0; + Locator_Repository::AIMap::CONST_ITERATOR aiit (this->unmatched_activators_); + for (; aiit.next (aientry); aiit.advance ()) + { + int ret = repo.activators().unbind (aientry->ext_id_); + if (ret != 0) + { + ACE_ERROR((LM_ERROR, + ACE_TEXT ("ERROR: could not remove activator: %s\n"), + aientry->int_id_->name.c_str())); + } + } +} + +const ACE_Vector<ACE_TString>& +Shared_Backing_Store::LocatorListings_XMLHandler::filenames() const +{ + return this->filenames_; +} |