From 2f975d3f5493837eb0429e07d31495638f93625f Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Thu, 28 Mar 2013 17:08:01 +0000 Subject: Thu Mar 28 17:05:52 UTC 2013 Phil Mesnier --- TAO/ChangeLog_Asynch_ImR | 14 + .../ImplRepo_Service/AsyncAccessManager.cpp | 334 ++++++++++++++++++--- TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h | 68 +++-- TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp | 230 +++++++++++--- TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h | 40 ++- TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc | 2 +- TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp | 118 +++++--- TAO/orbsvcs/ImplRepo_Service/LiveCheck.h | 10 +- TAO/orbsvcs/ImplRepo_Service/Locator_Repository.h | 12 +- 9 files changed, 662 insertions(+), 166 deletions(-) diff --git a/TAO/ChangeLog_Asynch_ImR b/TAO/ChangeLog_Asynch_ImR index f5a9875a436..f2fd175d59a 100644 --- a/TAO/ChangeLog_Asynch_ImR +++ b/TAO/ChangeLog_Asynch_ImR @@ -1,3 +1,17 @@ +Thu Mar 28 17:05:52 UTC 2013 Phil Mesnier + + * orbsvcs/ImplRepo_Service/AsyncAccessManager.h: + * orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp: + * orbsvcs/ImplRepo_Service/ImR_Locator_i.h: + * orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp: + * orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc: + * orbsvcs/ImplRepo_Service/LiveCheck.h: + * orbsvcs/ImplRepo_Service/LiveCheck.cpp: + * orbsvcs/ImplRepo_Service/Locator_Repository.h: + + Further improvments to the implementation. At the point where the basic ImplRepo + test passes. + Tue Mar 26 23:58:50 UTC 2013 Phil Mesnier * orbsvcs/ImplRepo_Service/Adapter_Activator.h: diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp index daa4efdc025..1368c1bbdac 100644 --- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp @@ -3,84 +3,318 @@ #include "AsyncAccessManager.h" #include "ImR_Locator_i.h" +#include "Locator_Repository.h" + //--------------------------------------------------------------------------- //--------------------------------------------------------------------------- -#if 0 -class AsyncAccessManager +AsyncAccessManager::AsyncAccessManager (const Server_Info &info, + bool manual, + ImR_Locator_i &locator) + :info_(0), + manual_start_ (manual), + locator_(locator), + poa_(locator.root_poa()), + rh_list_(), + status_(AAM_INIT), + refcount_(1), + lock_() { - public: - AsyncAccessManager (UpdateableServerInfo &info, ImR_Locator_i &locator); - ~AsyncAccessManager (void); + ACE_DEBUG ((LM_DEBUG,"New AAM: this = %x, name = %s\n", + this, info.name.c_str())); + this->info_ = new Server_Info (info); +} - void add_interest (ImR_ReplyHandler *rh); - AAM_Status status (void) const; +AsyncAccessManager::~AsyncAccessManager (void) +{ + delete this->info_; +} - void activator_replied (void); - void server_is_running (void); - void ping_replied (bool is_alive); - void +bool +AsyncAccessManager::has_server (const char *s) +{ + return ACE_OS::strcmp (this->info_->name.c_str(),s) == 0; +} - void add_ref (void); - void remove_ref (void); +void +AsyncAccessManager::add_interest (ImR_ReplyHandler *rh) +{ + { + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + this->rh_list_.push_back (rh); + } + if (this->locator_.debug() > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) AsyncAccessManager::add_interest: ") + ACE_TEXT ("server = <%C>, status = %d count = %d\n"), + this->info_->name.c_str(), this->status_, this->rh_list_.size())); + } - private: - UpdateableServerInfo &info_; - ImR_Locator_i &locator_; + if (this->status_ == AAM_SERVER_READY) + { + if (this->locator_.pinger().is_alive (this->info_->name.c_str())) + { + this->final_state(); + return; + } + } - ACE_Stack rh_list_; + if (this->status_ == AAM_INIT || this->status_ == AAM_SERVER_READY) + { + // This is not a leak. The listener registers with + // the pinger and will delete itself when done. + AsyncLiveListener *l = 0; + ACE_NEW (l, AsyncLiveListener (this->info_->name.c_str(), + *this, + this->locator_.pinger())); + if (!l->start()) + { + if (!this->send_start_request()) + { + this->final_state(); + } + } + else + { + this->status (AAM_WAIT_FOR_PING); + } + } +} - AAM_Status status_; +void +AsyncAccessManager::final_state (void) +{ + if (this->locator_.debug() > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) AsyncAccessManager::final_state: ") + ACE_TEXT ("status = %d, pior = <%C>\n"), + this->status_, this->info_->partial_ior.c_str())); + } - int refcount_; - TAO_SYNCH_MUTEX lock_; + for (size_t i = 0; i < this->rh_list_.size(); i++) + { + ImR_ReplyHandler *rh = this->rh_list_[i]; + if (rh != 0) + { + if (this->status_ == AAM_SERVER_READY) + { + rh->send_ior (this->info_->partial_ior.c_str()); + } + else + { + rh->send_exception (); + } + } + } + this->rh_list_.clear (); + if (this->info_->activation_mode == ImplementationRepository::PER_CLIENT || + this->status_ != AAM_SERVER_READY) + { + this->locator_.remove_aam (this); + } +} -}; -#endif +AAM_Status +AsyncAccessManager::status (void) const +{ + return this->status_; +} + +void +AsyncAccessManager::status (AAM_Status s) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + this->status_ = s; +} + +void +AsyncAccessManager::activator_replied (bool success) +{ + if (this->locator_.debug() > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) AsyncAccessManager::activator_replied: ") + ACE_TEXT ("success = %d, status = %d\n"), + success, this->status_)); + } + if (success) + { + this->status (AAM_WAIT_FOR_RUNNING); + } + else + { + this->status (AAM_NO_ACTIVATOR); + this->final_state (); + } +} + +void +AsyncAccessManager::server_is_running (const char *partial_ior) +{ + if (this->locator_.debug() > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) AsyncAccessManager::server_is_running: ") + ACE_TEXT ("name = %C, status = %d\n"), + this->info_->name.c_str(), this->status_)); + } + this->status (AAM_WAIT_FOR_ALIVE); + this->info_->partial_ior = partial_ior; + + if (this->locator_.pinger().is_alive (this->info_->name.c_str())) + { + this->status (AAM_SERVER_READY); + this->final_state (); + } + + // This is not a leak. The listener registers with + // the pinger and will delete itself when done. + AsyncLiveListener *l = 0; + ACE_NEW (l, AsyncLiveListener (this->info_->name.c_str(), + *this, + this->locator_.pinger())); + if (!l->start()) + { + this->status (AAM_SERVER_DEAD); + this->final_state (); + } +} + +void +AsyncAccessManager::ping_replied (bool is_alive) +{ + if (is_alive) + { + this->status (AAM_SERVER_READY); + } + else if (this->status_ == AAM_WAIT_FOR_PING) + { + if (this->send_start_request ()) + { + return; + } + } + else + { + this->status (AAM_SERVER_DEAD); + } + this->final_state(); +} + +bool +AsyncAccessManager::send_start_request (void) +{ + if (this->info_->activation_mode == ImplementationRepository::MANUAL && + !this->manual_start_) + { + this->status (AAM_NOT_MANUAL); + return false; + } + + Activator_Info_Ptr ainfo = + this->locator_.get_activator (this->info_->activator); + + if (ainfo.null () || CORBA::is_nil (ainfo->activator.in ())) + { + this->status (AAM_NO_ACTIVATOR); + return false; + } + + PortableServer::ServantBase_var callback = new ActivatorReceiver (this, + this->poa_.in()); + PortableServer::ObjectId_var oid = this->poa_->activate_object (callback.in()); + CORBA::Object_var obj = this->poa_->id_to_reference (oid.in()); + ImplementationRepository::AMI_ActivatorHandler_var cb = + ImplementationRepository::AMI_ActivatorHandler::_narrow (obj.in()); + + ainfo->activator->sendc_start_server (cb.in(), + this->info_->name.c_str (), + this->info_->cmdline.c_str (), + this->info_->dir.c_str (), + this->info_->env_vars); + this->status (AAM_ACTIVATION_SENT); + return true; +} + +void +AsyncAccessManager::add_ref (void) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + ++this->refcount_; +} + +void +AsyncAccessManager::remove_ref (void) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + if (--this->refcount_ == 0) + { + delete this; + } +} //--------------------------------------------------------------------------- //--------------------------------------------------------------------------- -ImR_Loc_ReplyHandler::ImR_Loc_ReplyHandler (AMH_ImplementationRepository::LocatorResponseHandler_ptr rh) - :rh_ (AMH_ImplementationRepository::LocatorResponseHandler::_duplicate(rh)) +ActivatorReceiver::ActivatorReceiver (AsyncAccessManager *aam, + PortableServer::POA_ptr poa) + :aam_ (aam), + poa_ (PortableServer::POA::_duplicate (poa)) { + this->aam_->add_ref (); } -ImR_Loc_ReplyHandler::~ImR_Loc_ReplyHandler (void) + +ActivatorReceiver::~ActivatorReceiver (void) { + this->aam_->remove_ref (); } void -ImR_Loc_ReplyHandler::send_ior (const char *) +ActivatorReceiver::start_server (void) { - rh_->activate_server (); // void return - delete this; + this->aam_->activator_replied (true); + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + poa_->deactivate_object (oid.in()); +} +void +ActivatorReceiver::start_server_excep (Messaging::ExceptionHolder *) +{ + this->aam_->activator_replied (false); + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + poa_->deactivate_object (oid.in()); +} + +void +ActivatorReceiver::shutdown (void) +{ + // no-op, just satisfy virtual function } void -ImR_Loc_ReplyHandler::send_exception (void) +ActivatorReceiver::shutdown_excep (Messaging::ExceptionHolder * ) { - CORBA::TRANSIENT ex (CORBA::SystemException::_tao_minor_code - ( TAO_IMPLREPO_MINOR_CODE, 0), - CORBA::COMPLETED_NO); - TAO_AMH_DSI_Exception_Holder h(&ex); - resp_->invoke_excep(&h); - delete this; + // no-op, just satisfy virtual function } + //--------------------------------------------------------------------------- //--------------------------------------------------------------------------- -AsyncLiveListener::AsyncLiveListener (AsyncAccessManager &aam, LiveCheck &pinger) - :aam_ (aam), +AsyncLiveListener::AsyncLiveListener (const char *server, + AsyncAccessManager &aam, + LiveCheck &pinger) + :LiveListener (server), + aam_ (aam), pinger_ (pinger), - status_ (LS_UNKNOWN), + status_ (LS_UNKNOWN) { this->aam_.add_ref (); - this->pinger_.add_listener (this); } AsyncLiveListener::~AsyncLiveListener (void) @@ -88,14 +322,34 @@ AsyncLiveListener::~AsyncLiveListener (void) this->aam_.remove_ref (); } +bool +AsyncLiveListener::start (void) +{ + bool rtn = this->pinger_.add_listener (this); + if (!rtn) + delete this; + return rtn; +} + void AsyncLiveListener::status_changed (LiveStatus status, bool may_retry) { this->status_ = status; if (status == LS_TRANSIENT && may_retry) - this->pinger_.add_listener (this); + { + if (!this->pinger_.add_listener (this)) + { + ACE_DEBUG ((LM_DEBUG, + "AsyncLiveListener::status_changed, deleting(1)\n")); + this->aam_.ping_replied (false); + delete this; + } + } else { + ACE_DEBUG ((LM_DEBUG, + "AsyncLiveListener::status_changed, status = %d, deleting(2)\n", status)); this->aam_.ping_replied (status != LS_DEAD); + delete this; } } diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h index f915419d896..e450440cb2d 100644 --- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h +++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h @@ -12,11 +12,12 @@ #include "locator_export.h" -#include "tao/ImR_Client/ServerObjectS.h" // ServerObject_AMIS.h - +#include "ImR_ActivatorS.h" // ImR_Activator_AMIS.h #include "ace/Vector_T.h" #include "ace/SString.h" +#include "Forwarder.h" + #if !defined (ACE_LACKS_PRAGMA_ONCE) #pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ @@ -25,7 +26,7 @@ class ImR_Locator_i; class ImR_ReplyHandler; -class UpdateableServerInfo; +struct Server_Info; //---------------------------------------------------------------------------- /* @@ -53,59 +54,77 @@ enum AAM_Status AAM_ACTIVATION_SENT, AAM_WAIT_FOR_RUNNING, AAM_WAIT_FOR_PING, + AAM_WAIT_FOR_ALIVE, AAM_SERVER_READY, - AAM_SERVER_DEAD + AAM_SERVER_DEAD, + AAM_NOT_MANUAL, + AAM_NO_ACTIVATOR }; class AsyncAccessManager { public: - AsyncAccessManager (UpdateableServerInfo &info, ImR_Locator_i &locator); + AsyncAccessManager (const Server_Info &info, + bool manual, + ImR_Locator_i &locator); + ~AsyncAccessManager (void); + bool has_server (const char *name); + void add_interest (ImR_ReplyHandler *rh); AAM_Status status (void) const; - void activator_replied (void); - void server_is_running (void); + void activator_replied (bool success); + void server_is_running (const char *partial_ior); void ping_replied (bool is_alive); void add_ref (void); void remove_ref (void); private: - UpdateableServerInfo &info_; - ImR_Locator_i &locator_; + void final_state (void); + void status (AAM_Status s); + bool send_start_request (void); + Server_Info *info_; + bool manual_start_; + ImR_Locator_i &locator_; + PortableServer::POA_var poa_; ACE_Vector rh_list_; AAM_Status status_; int refcount_; TAO_SYNCH_MUTEX lock_; - }; + + //---------------------------------------------------------------------------- /* - * @class ImR_Loc_ReplyHandler + * @class ActivatorReceiver + * + * @brief callback for handling asynch server startup requests * - * @brief specialized reply handler for Locator interface calls which have a - * void return. */ -class ImR_Loc_ReplyHandler : public ImR_ReplyHandler +class ActivatorReceiver : + public virtual POA_ImplementationRepository::AMI_ActivatorHandler { public: - ImR_Loc_ReplyHandler (ImplementationRepository::AMH_LocatorResponseHandler_ptr rh); - virtual ~ImR_Loc_ReplyHandler (void); + ActivatorReceiver (AsyncAccessManager *aam, PortableServer::POA_ptr poa); + virtual ~ActivatorReceiver (void); - virtual void send_ior (const char *pior); - virtual void send_exception (void); + void start_server (void); + void start_server_excep (Messaging::ExceptionHolder * excep_holder); -private: - ImplementationRepository::AMH_LocatorResponseHandler_var rh_; + void shutdown (void); + void shutdown_excep (Messaging::ExceptionHolder * excep_holder); +private: + AsyncAccessManager *aam_; + PortableServer::POA_var poa_; }; @@ -116,14 +135,17 @@ private: class AsyncLiveListener : public LiveListener { public: - AsyncLiveListener (AsyncAccessManager &aam, LiveCheck *pinger); - ~AsyncLiveListener (void); + AsyncLiveListener (const char * server, + AsyncAccessManager &aam, + LiveCheck &pinger); + virtual ~AsyncLiveListener (void); + bool start (void); void status_changed (LiveStatus status, bool may_retry); private: AsyncAccessManager &aam_; - LiveCheck *pinger_; + LiveCheck &pinger_; LiveStatus status_; }; diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp index 621a85f3efe..f1ad8da100f 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp @@ -55,6 +55,7 @@ createPersistentPOA (PortableServer::POA_ptr root_poa, const char* poa_name) { ImR_Locator_i::ImR_Locator_i (void) : dsi_forwarder_ (*this) , ins_locator_ (0) + , aam_set_ () , debug_ (0) , read_only_ (false) , unregister_if_address_reused_ (false) @@ -497,6 +498,31 @@ ImR_Locator_i::activate_server_by_object (const char* object_name) } +void +ImR_Locator_i::activate_server_i (UpdateableServerInfo& info, + bool manual_start, + ImR_ReplyHandler *rh) +{ + AsyncAccessManager *aam = 0; + if (info->activation_mode == ImplementationRepository::PER_CLIENT) + { + ACE_NEW (aam, AsyncAccessManager (*info, manual_start, *this)); + this->aam_set_.insert (aam); + } + else + { + aam = this->find_aam (info->name.c_str()); + if (aam == 0) + { + ACE_NEW (aam, AsyncAccessManager (*info, manual_start, *this)); + this->aam_set_.insert (aam); + } + } + aam->add_interest (rh); +} + + +#if 0 void ImR_Locator_i::activate_server_i (UpdateableServerInfo& info, bool manual_start, @@ -518,6 +544,7 @@ ImR_Locator_i::activate_server_i (UpdateableServerInfo& info, rh->send_exception (); } } +#endif char* ImR_Locator_i::activate_server_i (UpdateableServerInfo& info, @@ -656,7 +683,7 @@ ImR_Locator_i::start_server (UpdateableServerInfo& info, bool manual_start, "No command line registered for server."); } - Activator_Info_Ptr ainfo = get_activator (info->activator); + Activator_Info_Ptr ainfo = this->get_activator (info->activator); if (ainfo.null () || CORBA::is_nil (ainfo->activator.in ())) { @@ -668,7 +695,6 @@ ImR_Locator_i::start_server (UpdateableServerInfo& info, bool manual_start, "No activator registered for server."); } - ImplementationRepository::StartupInfo_var si; try { ++waiting_clients; @@ -1126,28 +1152,30 @@ ImR_Locator_i::server_is_running (const char* id, if (info.null ()) { if (this->debug_ > 0) - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: Auto adding NORMAL server <%C>.\n"), - name.c_str ())); + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("ImR: Auto adding NORMAL server <%C>.\n"), + name.c_str ())); + } ImplementationRepository::EnvironmentList env (0); this->repository_->add_server (server_id, - name, - jacorb_server, - "", // no activator - "", // no cmdline - ImplementationRepository::EnvironmentList (), - "", // no working dir - ImplementationRepository::NORMAL, - DEFAULT_START_LIMIT, - partial_ior, - ior.in (), - ImplementationRepository::ServerObject::_nil () // Will connect at first access - ); + name, + jacorb_server, + "", // no activator + "", // no cmdline + ImplementationRepository::EnvironmentList (), + "", // no working dir + ImplementationRepository::NORMAL, + DEFAULT_START_LIMIT, + partial_ior, + ior.in (), + ImplementationRepository::ServerObject::_nil () + ); } else { +#if 0 if (info->server_id != server_id) { if (! info->server_id.empty()) @@ -1158,29 +1186,58 @@ ImR_Locator_i::server_is_running (const char* id, info.edit ()->server_id = server_id; } - if (info->activation_mode != ImplementationRepository::PER_CLIENT) { - info.edit ()->ior = ior.in (); - info.edit ()->partial_ior = partial_ior; - // Will connect at first access - info.edit ()->server = ImplementationRepository::ServerObject::_nil (); + if (info->activation_mode != ImplementationRepository::PER_CLIENT) + { + info.edit ()->ior = ior.in (); + info.edit ()->partial_ior = partial_ior; + // Will connect at first access + info.edit ()->server = ImplementationRepository::ServerObject::_nil (); - info.update_repo(); + info.update_repo(); - waiter_svt_.unblock_one (name.c_str (), partial_ior, ior.in (), false); - } else { - // Note : There's no need to unblock all the waiting request until - // we know the final status of the server. - if (info->waiting_clients > 0) - { - waiter_svt_.unblock_one (name.c_str (), partial_ior, ior.in (), true); + waiter_svt_.unblock_one (name.c_str (), partial_ior, ior.in (), false); } - else if (this->debug_ > 1) + else { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("ImR - Ignoring server_is_running due to no ") - ACE_TEXT ("waiting PER_CLIENT clients.\n"))); + // Note : There's no need to unblock all the waiting request until + // we know the final status of the server. + if (info->waiting_clients > 0) + { + waiter_svt_.unblock_one (name.c_str (), partial_ior, ior.in (), true); + } + else if (this->debug_ > 1) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("ImR - Ignoring server_is_running due to no ") + ACE_TEXT ("waiting PER_CLIENT clients.\n"))); + } } +#else + + if (info->server_id != server_id) + { + if (! info->server_id.empty()) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("ImR - WARNING: server \"%C\" changed server id from ") + ACE_TEXT ("\"%C\" to \"%C\" waiting PER_CLIENT clients.\n"), + name.c_str (), info->server_id.c_str (), server_id.c_str ())); + info.edit ()->server_id = server_id; } + + if (info->activation_mode != ImplementationRepository::PER_CLIENT) + { + info.edit ()->ior = ior.in (); + info.edit ()->partial_ior = partial_ior; + info.edit ()->server = s; + + info.update_repo(); + + } + + AsyncAccessManager *aam = this->find_aam (name.c_str()); + if (aam != 0) + aam->server_is_running (partial_ior); +#endif } } @@ -1206,6 +1263,13 @@ ImR_Locator_i::server_is_shutting_down (const char* server) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Server <%C> is shutting down.\n"), server)); + this->pinger_.remove_server (server); + AsyncAccessManager *aam = this->find_aam (server); + if (aam != 0) + { + this->remove_aam (aam); + } + info.edit ()->reset (); } @@ -1489,7 +1553,57 @@ ImR_Locator_i::is_alive (UpdateableServerInfo& info) int ImR_Locator_i::debug () const { - return debug_; + return this->debug_; +} + +LiveCheck& +ImR_Locator_i::pinger (void) +{ + return this->pinger_; +} + +PortableServer::POA_ptr +ImR_Locator_i::root_poa (void) +{ + return PortableServer::POA::_duplicate (this->root_poa_.in()); +} + +void +ImR_Locator_i::remove_aam (AsyncAccessManager *aam) +{ + this->aam_set_.remove (aam); + aam->remove_ref(); +} + +AsyncAccessManager * +ImR_Locator_i::find_aam (const char *name) +{ + + if (debug_ > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) find_aam: name = %s, aam_set_.size = %d\n"), + name, aam_set_.size())); + } + + for (AAM_Set::ITERATOR i(this->aam_set_); + !i.done(); + i.advance ()) + { + AsyncAccessManager **entry = 0; + i.next(entry); + if (*entry != 0 && (*entry)->has_server (name)) + { + return (*entry); + } + } + if (debug_ > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) find_aam: did not find\n"))); + } + + return 0; } //------------------------------------------------------------------------- @@ -1511,10 +1625,6 @@ SyncListener::is_alive (void) { this->status_ = this->pinger_.is_alive(this->server()); - // ACE_DEBUG ((LM_DEBUG, - // "ImR: SyncListener::is_alive() this = %x, server = <%C>, status = %d\n", - // this, this->server(), status_)); - if (this->status_ == LS_ALIVE) return true; else if (this->status_ == LS_DEAD) @@ -1525,7 +1635,10 @@ SyncListener::is_alive (void) { if (this->callback_) { - this->pinger_.add_listener (this); + if (!this->pinger_.add_listener (this)) + { + return false; + } } this->callback_ = false; ACE_Time_Value delay (10,0); @@ -1541,8 +1654,37 @@ SyncListener::status_changed (LiveStatus status, bool may_retry) this->callback_ = true; this->status_ = status; this->got_it_ = (status != LS_TRANSIENT) || (! may_retry); - // ACE_DEBUG ((LM_DEBUG, - // "SynchLisener(%x)::status_changed, got it = %d, status = %d\n", - // this, got_it_, status)); } +//--------------------------------------------------------------------------- +//--------------------------------------------------------------------------- +#if 0 +ImR_Loc_ReplyHandler::ImR_Loc_ReplyHandler (AMH_ImplementationRepository::LocatorResponseHandler_ptr rh) + :rh_ (AMH_ImplementationRepository::LocatorResponseHandler::_duplicate(rh)) +{ +} + +ImR_Loc_ReplyHandler::~ImR_Loc_ReplyHandler (void) +{ +} + +void +ImR_Loc_ReplyHandler::send_ior (const char *) +{ + rh_->activate_server (); // void return + delete this; + +} + +void +ImR_Loc_ReplyHandler::send_exception (void) +{ + CORBA::TRANSIENT ex (CORBA::SystemException::_tao_minor_code + ( TAO_IMPLREPO_MINOR_CODE, 0), + CORBA::COMPLETED_NO); + TAO_AMH_DSI_Exception_Holder h(&ex); + resp_->invoke_excep(&h); + delete this; +} + +#endif diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h index 742dfac7013..132a9459abc 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h +++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h @@ -15,12 +15,12 @@ #include "Server_Info.h" #include "ace/Auto_Ptr.h" #include "AsyncStartupWaiter_i.h" +#include "AsyncAccessManager.h" #include "tao/IORTable/IORTable.h" #include "ImR_LocatorS.h" #include "AsyncStartupWaiterS.h" #include "LiveCheck.h" -//#include "AsyncAccessManager.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -100,6 +100,13 @@ public: bool manual_start, ImR_ReplyHandler *rh); + LiveCheck &pinger (void); + PortableServer::POA_ptr root_poa (void); + Activator_Info_Ptr get_activator (const ACE_CString& name); + + void remove_aam (AsyncAccessManager *aam); + AsyncAccessManager *find_aam (const char *name); + private: char* activate_server_i (UpdateableServerInfo& info, @@ -126,7 +133,6 @@ private: void unregister_activator_i(const char* activator); - Activator_Info_Ptr get_activator (const ACE_CString& name); void connect_activator (Activator_Info& info); void auto_start_servers(void); @@ -138,7 +144,11 @@ private: PortableServer::POA_ptr findPOA(const char* name); - void parse_id(const char* id, ACE_CString& server_id, ACE_CString& name, bool& jacorb_server); + void parse_id(const char* id, + ACE_CString& server_id, + ACE_CString& name, + bool& jacorb_server); + private: // The class that handles the forwarding. @@ -154,6 +164,10 @@ private: /// The asynch server ping adapter LiveCheck pinger_; + /// A collection of asynch activator instances + typedef ACE_Unbounded_Set AAM_Set; + AAM_Set aam_set_; + CORBA::ORB_var orb_; PortableServer::POA_var root_poa_; PortableServer::POA_var imr_poa_; @@ -192,6 +206,26 @@ class SyncListener : public LiveListener bool callback_; }; +//---------------------------------------------------------------------------- +/* + * @class ImR_Loc_ReplyHandler + * + * @brief specialized reply handler for Locator interface calls which have a + * void return. + */ +class ImR_Loc_ReplyHandler : public ImR_ReplyHandler +{ +public: + ImR_Loc_ReplyHandler (ImplementationRepository::AMH_LocatorResponseHandler_ptr rh); + virtual ~ImR_Loc_ReplyHandler (void); + + virtual void send_ior (const char *pior); + virtual void send_exception (void); + +private: + ImplementationRepository::AMH_LocatorResponseHandler_var rh_; + +}; #include /**/ "ace/post.h" #endif /* IMR_LOCATOR_I_H */ diff --git a/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc b/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc index d553232971f..2c12b0b3d85 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc +++ b/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc @@ -76,7 +76,7 @@ project(ImR_Locator) : orbsvcslib, orbsvcs_output, conv_lib, avoids_minimum_corb Activator_Info.cpp Adapter_Activator.cpp AsyncStartupWaiter_i.cpp -// AsyncAccessManager.cpp + AsyncAccessManager.cpp Forwarder.cpp ImR_Locator_i.cpp INS_Locator.cpp diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp index 3539b4548b0..c9ca79c4368 100644 --- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp @@ -102,32 +102,14 @@ LiveEntry::status (LiveStatus l) ACE_Time_Value now (ACE_OS::time()); this->next_check_ = now + owner_->ping_interval(); } -#if 0 - for (ACE_Vector_Iterator i (this->listeners_); - !i.done(); - i.advance()) - { - LiveListener **ll = 0; - i.next(ll); - if (*ll != 0) - { - (*ll)->status_changed (this->liveliness_, this->reping_available()); - } - } -#else - // ACE_DEBUG ((LM_DEBUG, - // ACE_TEXT ("(%P|%t) LiveEntry::status(%d), server = %s,") - // ACE_TEXT (" listeners.size = %d\n"), - // l, this->server_.c_str(), listeners_.size())); for (size_t i = 0; i < this->listeners_.size(); i++) { LiveListener *ll = this->listeners_[i]; if (ll != 0) { - (ll)->status_changed (this->liveliness_, this->reping_available()); + ll->status_changed (this->liveliness_, this->reping_available()); } } -#endif this->listeners_.clear(); } @@ -141,7 +123,12 @@ bool LiveEntry::do_ping (PortableServer::POA_ptr poa) { ACE_Time_Value now (ACE_OS::time()); - if (this->next_check_ > now || this->liveliness_ == LS_DEAD || this->ping_away_) + if (this->ping_away_) + { + return true; + } + + if (this->next_check_ > now || this->liveliness_ == LS_DEAD) { return false; } @@ -200,7 +187,6 @@ PingReceiver::~PingReceiver (void) void PingReceiver::ping (void) { - // ACE_DEBUG ((LM_DEBUG,"ping received\n")); this->entry_->status (LS_ALIVE); PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); poa_->deactivate_object (oid.in()); @@ -284,14 +270,28 @@ int LiveCheck::handle_timeout (const ACE_Time_Value &, const void *) { - for (LiveEntryMap::iterator i (this->entry_map_); - !i.done(); - i.advance ()) + for (LiveEntryMap::iterator le (this->entry_map_); + !le.done (); + le.advance ()) { LiveEntryMap::value_type *pair = 0; - i.next(pair); - pair->item()->do_ping(poa_.in()); + le.next(pair); + pair->item()->do_ping (poa_.in()); } + for (PerClientStack::ITERATOR pe (this->per_client_); + !pe.done (); + pe.advance ()) + { + LiveEntry *entry = 0; + LiveEntry **n = &entry; + pe.next(n); + if (entry != 0 && !entry->do_ping (poa_.in())) + { + this->per_client_.remove (entry); + } + + } + return 0; } @@ -322,37 +322,59 @@ LiveCheck::remove_server (const char *server) } void +LiveCheck::remove_per_client_entry (LiveEntry *e) +{ + this->per_client_.remove (e); +} + +bool +LiveCheck::add_per_client_listener (LiveListener *l, + ImplementationRepository::ServerObject_ptr ref) +{ + LiveEntry *entry = 0; + ACE_NEW_RETURN (entry, LiveEntry (this, 0, ref), false); + if (this->per_client_.push(entry) == 0) + { + entry->add_listener (l); + this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero); + return true; + } + return false; +} + +bool LiveCheck::add_listener (LiveListener *l) { LiveEntry *entry = 0; ACE_CString key (l->server()); int result = entry_map_.find (key, entry); - - if (result == 0 && entry != 0) + if (result == -1 || entry == 0) { - entry->add_listener (l); - ACE_Time_Value now (ACE_OS::time()); - ACE_Time_Value next = entry->next_check (); + return false; + } - if (next <= now) - { - // ACE_DEBUG ((LM_DEBUG, - // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ") - // ACE_TEXT ("immediate callback for <%C>\n"), - // l, l->server())); - this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero); - } - else - { - ACE_Time_Value delay = next - now; - // ACE_DEBUG ((LM_DEBUG, - // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ") - // ACE_TEXT ("callback in %d ms for <%C>\n"), - // l, delay.sec() * 1000 + delay.usec() / 1000, l->server())); - this->reactor()->schedule_timer (this, 0, delay); - } + entry->add_listener (l); + ACE_Time_Value now (ACE_OS::time()); + ACE_Time_Value next = entry->next_check (); + if (next <= now) + { + // ACE_DEBUG ((LM_DEBUG, + // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ") + // ACE_TEXT ("immediate callback for <%C>\n"), + // l, l->server())); + this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero); } + else + { + ACE_Time_Value delay = next - now; + // ACE_DEBUG ((LM_DEBUG, + // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ") + // ACE_TEXT ("callback in %d ms for <%C>\n"), + // l, delay.sec() * 1000 + delay.usec() / 1000, l->server())); + this->reactor()->schedule_timer (this, 0, delay); + } + return true; } LiveStatus diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h index f914662b3e8..4778cc5451c 100644 --- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h @@ -168,9 +168,15 @@ class Locator_Export LiveCheck : public ACE_Event_Handler void add_server (const char *server, ImplementationRepository::ServerObject_ptr ref); + void remove_server (const char *server); - void add_listener (LiveListener *waiter); + void remove_per_client_entry (LiveEntry *entry); + + bool add_listener (LiveListener *listener); + + bool add_per_client_listener (LiveListener *listener, + ImplementationRepository::ServerObject_ptr ref); LiveStatus is_alive (const char *server); @@ -182,8 +188,10 @@ class Locator_Export LiveCheck : public ACE_Event_Handler ACE_Hash, ACE_Equal_To, TAO_SYNCH_MUTEX> LiveEntryMap; + typedef ACE_Unbounded_Stack PerClientStack; LiveEntryMap entry_map_; + PerClientStack per_client_; PortableServer::POA_var poa_; ACE_Time_Value ping_interval_; }; diff --git a/TAO/orbsvcs/ImplRepo_Service/Locator_Repository.h b/TAO/orbsvcs/ImplRepo_Service/Locator_Repository.h index 187debe8335..f9ab1888e63 100644 --- a/TAO/orbsvcs/ImplRepo_Service/Locator_Repository.h +++ b/TAO/orbsvcs/ImplRepo_Service/Locator_Repository.h @@ -196,7 +196,6 @@ private: virtual int persistent_remove(const ACE_CString& name, bool activator); }; - /** * @class UpdateableServerInfo * @@ -222,10 +221,10 @@ public: UpdateableServerInfo(const Server_Info& si); /// destructor (updates repo if needed) - ~UpdateableServerInfo(); + ~UpdateableServerInfo(void); /// explicitly update repo if needed - void update_repo(); + void update_repo(void); /// const Server_Info access const Server_Info* operator->() const; @@ -235,13 +234,14 @@ public: /// retrieve smart pointer to non-const Server_Info /// and indicate repo update required - const Server_Info_Ptr& edit(); + const Server_Info_Ptr& edit(void); /// force indication of update needed - void needs_update(); + void needs_update(void); /// indicate it Server_Info_Ptr is null - bool null() const; + bool null(void) const; + private: UpdateableServerInfo(const UpdateableServerInfo& ); const UpdateableServerInfo& operator=(const UpdateableServerInfo& ); -- cgit v1.2.1