From 5fd897d1479642d19e699a81a25156fcdb9aab8f Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Tue, 19 Mar 2013 13:41:09 +0000 Subject: Tue Mar 19 13:31:43 UTC 2013 Phil Mesnier --- TAO/ChangeLog_Asynch_ImR | 16 ++ TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp | 71 ++++- TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h | 26 ++ TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc | 2 + TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp | 304 ++++++++++++++++++++++ TAO/orbsvcs/ImplRepo_Service/LiveCheck.h | 177 +++++++++++++ 6 files changed, 585 insertions(+), 11 deletions(-) create mode 100644 TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp create mode 100644 TAO/orbsvcs/ImplRepo_Service/LiveCheck.h diff --git a/TAO/ChangeLog_Asynch_ImR b/TAO/ChangeLog_Asynch_ImR index b1d50608e83..5d925036e3c 100644 --- a/TAO/ChangeLog_Asynch_ImR +++ b/TAO/ChangeLog_Asynch_ImR @@ -1,3 +1,19 @@ +Tue Mar 19 13:31:43 UTC 2013 Phil Mesnier + + * orbsvcs/ImplRepo_Service/ImR_Locator_i.h: + * orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp: + * orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc: + * orbsvcs/ImplRepo_Service/LiveCheck.h: + * orbsvcs/ImplRepo_Service/LiveCheck.cpp: + + Adding new mechanism for pinging servers asynchronously. When the IMR is + interested in the status of a server, it now registers a callback object and + a ping is scheduled. At the appropriate time, the ping request is sent using + AMI. The reply or exception is then handled asynchronously. + + For testing in an otherwise synchronous locator, a blocking ping reply waiter + is used. + Thu Mar 14 21:36:30 UTC 2013 Phil Mesnier * orbsvcs/ImplRepo_Service/AsyncStartupWaiter_i.cpp: diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp index 8dfa7737424..dc43da3a662 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp @@ -100,6 +100,7 @@ ImR_Locator_i::init_with_orb (CORBA::ORB_ptr orb, Options& opts) { this->dsi_forwarder_.init (orb); this->adapter_.init (& this->dsi_forwarder_); + this->pinger_.init (orb,ping_interval_); } else { @@ -435,7 +436,7 @@ ImR_Locator_i::activate_server_by_name (const char* name, bool manual_start) ACE_CString serverKey; ACE_CString server_id; bool jacorb_server = false; - parse_id(name, server_id, serverKey, jacorb_server); + this->parse_id(name, server_id, serverKey, jacorb_server); UpdateableServerInfo info(this->repository_.get(), serverKey); if (info.null ()) { @@ -525,7 +526,7 @@ ImR_Locator_i::activate_server_i (UpdateableServerInfo& info, // Note: We already updated info with StartupInfo in server_is_running () ImplementationRepository::StartupInfo_var si = - start_server (info, manual_start, info.edit()->waiting_clients); + this->start_server (info, manual_start, info.edit()->waiting_clients); } } @@ -539,7 +540,7 @@ ImR_Locator_i::activate_perclient_server_i (UpdateableServerInfo& shared_info, do { ImplementationRepository::StartupInfo* psi = - start_server (info, manual_start, shared_info.edit()->waiting_clients); + this->start_server (info, manual_start, shared_info.edit()->waiting_clients); // waiting_clients will be updated by each call to start_server shared_info.update_repo (); @@ -563,7 +564,8 @@ ImR_Locator_i::activate_perclient_server_i (UpdateableServerInfo& shared_info, } info.edit()->reset (); } - } while (info->start_count < info->start_limit); + } + while (info->start_count < info->start_limit); if (this->debug_ > 0) { @@ -629,8 +631,7 @@ ImR_Locator_i::start_server (UpdateableServerInfo& info, bool manual_start, ACE_TEXT ("ImR: Starting server <%C>. Attempt %d/%d.\n"), info->name.c_str (), info->start_count, info->start_limit)); } - ainfo->activator->start_server ( - info->name.c_str (), + ainfo->activator->start_server (info->name.c_str (), info->cmdline.c_str (), info->dir.c_str (), info->env_vars); @@ -806,7 +807,7 @@ ImR_Locator_i::add_or_update_server ( ACE_CString serverKey; ACE_CString server_id; bool jacorb_server = false; - parse_id(server, server_id, serverKey, jacorb_server); + this->parse_id(server, server_id, serverKey, jacorb_server); UpdateableServerInfo info(this->repository_.get(), serverKey); if (info.null ()) { @@ -914,7 +915,7 @@ ImR_Locator_i::remove_server (const char* name) ACE_CString serverKey; ACE_CString server_id; bool jacorb_server = false; - parse_id(name, server_id, serverKey, jacorb_server); + this->parse_id(name, server_id, serverKey, jacorb_server); Server_Info_Ptr info = this->repository_->get_server (serverKey); if (! info.null ()) { @@ -971,7 +972,7 @@ ImR_Locator_i::shutdown_server (const char* server) ACE_CString name; ACE_CString server_id; bool jacorb_server = false; - parse_id(server, server_id, name, jacorb_server); + this->parse_id(server, server_id, name, jacorb_server); UpdateableServerInfo info(this->repository_.get(), name); if (info.null ()) @@ -1044,7 +1045,7 @@ ImR_Locator_i::server_is_running (const char* id, ACE_CString server_id; ACE_CString name; bool jacorb_server = false; - parse_id(id, server_id, name, jacorb_server); + this->parse_id(id, server_id, name, jacorb_server); if (this->debug_ > 0) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Server %C is running at %C.\n"), @@ -1060,6 +1061,12 @@ ImR_Locator_i::server_is_running (const char* id, if (this->unregister_if_address_reused_) this->repository_->unregister_if_address_reused (server_id, name, partial_ior); + CORBA::Object_var obj = this->set_timeout_policy (server,ACE_Time_Value (1,0)); + ImplementationRepository::ServerObject_var s = + ImplementationRepository::ServerObject::_narrow (obj.in()); + + this->pinger_.add_server (server_id.c_str(), s); + UpdateableServerInfo info(this->repository_.get(), name); if (info.null ()) { @@ -1156,7 +1163,7 @@ ImR_Locator_i::find (const char* server, ACE_CString serverKey; ACE_CString server_id; bool jacorb_server = false; - parse_id(server, server_id, serverKey, jacorb_server); + this->parse_id(server, server_id, serverKey, jacorb_server); UpdateableServerInfo info(this->repository_.get(), serverKey); if (! info.null ()) { @@ -1648,3 +1655,45 @@ ImR_Locator_i::debug () const { return debug_; } + + +SyncListener::SyncListener (const char *server, + CORBA::ORB_ptr orb, + LiveCheck *pinger) + :LiveListener (server), + orb_ (CORBA::ORB::_duplicate (orb)), + pinger_ (pinger), + status_ (LS_UNKNOWN), + got_it_ (false), + retries_ (10) +{ +} + +bool +SyncListener::is_alive (void) +{ + + this->status_ = this->pinger_->is_alive(this->server().c_str()); + + if (this->status_ == LS_ALIVE) + return true; + + this->pinger_->add_listener (this); + while (!this->got_it_) + { + ACE_Time_Value delay (1,0); + this->orb_->perform_work (delay); + this->pinger_->add_listener (this); + } + this->got_it_ = false; + this->retries_ = 10; + return this->status_ != LS_DEAD; +} + +void +SyncListener::status_changed (LiveStatus status) +{ + this->status_ = status; + this->got_it_ = (status != LS_TRANSIENT) || (--this->retries_ == 0); +} + diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h index 0cf30dd85fa..f2dd9564848 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h +++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h @@ -10,6 +10,7 @@ #include "Adapter_Activator.h" #include "Activator_Info.h" #include "Forwarder.h" +#include "LiveCheck.h" #include "Locator_Options.h" #include "Server_Info.h" #include "ace/Auto_Ptr.h" @@ -18,6 +19,7 @@ #include "ImR_LocatorS.h" #include "AsyncStartupWaiterS.h" +#include "LiveCheck.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -142,6 +144,9 @@ private: /// The locator interface for the IORTable IORTable::Locator_var ins_locator_; + /// The asynch server ping adapter + LiveCheck pinger_; + CORBA::ORB_var orb_; PortableServer::POA_var root_poa_; PortableServer::POA_var imr_poa_; @@ -159,5 +164,26 @@ private: bool unregister_if_address_reused_; }; +class Locator_Export SyncListener : public LiveListener +{ + public: + SyncListener (const char *server, CORBA::ORB_ptr orb, LiveCheck *pinger); + + bool is_alive (void); + + void status_changed (LiveStatus status); + + private: + CORBA::ORB_var orb_; + LiveCheck *pinger_; + LiveStatus status_; + bool got_it_; + int retries_; + +}; + + + + #include /**/ "ace/post.h" #endif /* IMR_LOCATOR_I_H */ diff --git a/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc b/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc index 1c44cbf19df..95616644e5a 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc +++ b/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc @@ -71,6 +71,7 @@ project(ImR_Locator) : orbsvcslib, orbsvcs_output, conv_lib, avoids_minimum_corb after += ImR_Locator_IDL ImR_Activator_IDL libs += TAO_ImR_Locator_IDL TAO_ImR_Activator_IDL avoids += uses_wchar + Source_Files { Activator_Info.cpp Adapter_Activator.cpp @@ -78,6 +79,7 @@ project(ImR_Locator) : orbsvcslib, orbsvcs_output, conv_lib, avoids_minimum_corb ImR_Locator_i.cpp AsyncStartupWaiter_i.cpp INS_Locator.cpp + LiveCheck.cpp Locator_XMLHandler.cpp Locator_Loader.cpp Locator_Options.cpp diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp new file mode 100644 index 00000000000..09d67c35e66 --- /dev/null +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp @@ -0,0 +1,304 @@ +// -*- C++ -*- +// + +#include "LiveCheck.h" +#include "tao/ORB_Core.h" +#include "ace/Reactor.h" + +LiveListener::LiveListener (const char *server) + : server_(server) +{ +} + +const ACE_CString & +LiveListener::server (void) const +{ + return this->server_; +} + +//--------------------------------------------------------------------------- +//--------------------------------------------------------------------------- + +LiveEntry::LiveEntry (LiveCheck *owner, + ImplementationRepository::ServerObject_ptr ref) + : owner_ (owner), + ref_ (ImplementationRepository::ServerObject::_duplicate (ref)), + liveliness_ (LS_UNKNOWN), + next_check_ (ACE_OS::time()), + retry_count_ (0), + ping_away_ (false), + listeners_ (), + lock_ () +{ +} + +LiveEntry::~LiveEntry (void) +{ +} + +void +LiveEntry::add_listener (LiveListener* ll) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + this->listeners_.push_back (ll); +} + +LiveStatus +LiveEntry::status (void) const +{ + if ( this->liveliness_ == LS_ALIVE && + this->owner_->ping_interval() != ACE_Time_Value::zero ) + { + ACE_Time_Value now (ACE_OS::time()); + if (now >= this->next_check_) + { + return LS_UNKNOWN; + } + } + return this->liveliness_; +} + +void +LiveEntry::status (LiveStatus l) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + this->liveliness_ = l; + this->ping_away_ = false; + + if (l == LS_ALIVE) + { + this->retry_count_ = 0; + ACE_Time_Value now (ACE_OS::time()); + this->next_check_ = now + owner_->ping_interval(); + } + for (ACE_Vector_Iterator i (this->listeners_); + !i.done(); + i.advance()) + { + LiveListener *item = 0; + LiveListener **ll = &item; + i.next(ll); + if (item != 0) + { + item->status_changed (this->liveliness_); + } + } + this->listeners_.clear(); +} + +const ACE_Time_Value & +LiveEntry::next_check (void) const +{ + return this->next_check_; +} + +bool +LiveEntry::do_ping (PortableServer::POA_ptr poa) +{ + ACE_Time_Value now (ACE_OS::time()); + if (this->next_check_ > now || this->liveliness_ == LS_DEAD || this->ping_away_) + { + return false; + } + + if (this->owner_->ping_interval() == ACE_Time_Value::zero) + return false; + + switch (this->liveliness_) + { + case LS_UNKNOWN: + this->next_check_ = now; + break; + case LS_ALIVE: + case LS_TIMEDOUT: + this->next_check_ = now + owner_->ping_interval(); + break; + case LS_TRANSIENT: + this->next_check_ = now + ACE_Time_Value (0,5000); // retry delay + break; + default:; + } + + this->ping_away_ = true; + this->retry_count_++; + PortableServer::ServantBase_var callback = new PingReceiver (this, poa); + PortableServer::ObjectId_var oid = poa->activate_object (callback.in()); + CORBA::Object_var obj = poa->id_to_reference (oid.in()); + ImplementationRepository::AMI_ServerObjectHandler_var cb = + ImplementationRepository::AMI_ServerObjectHandler::_narrow (obj.in()); + this->ref_->sendc_ping (cb.in()); + return true; +} + +//--------------------------------------------------------------------------- +//--------------------------------------------------------------------------- + +PingReceiver::PingReceiver (LiveEntry *entry, PortableServer::POA_ptr poa) + :poa_ (PortableServer::POA::_duplicate(poa)), + entry_ (entry) +{ +} + +PingReceiver::~PingReceiver (void) +{ +} + +void +PingReceiver::ping (void) +{ + this->entry_->status (LS_ALIVE); + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + poa_->deactivate_object (oid.in()); +} + +void +PingReceiver::ping_excep (Messaging::ExceptionHolder * excep_holder) +{ + try + { + excep_holder->raise_exception (); + } + catch (CORBA::TRANSIENT &ex) + { + const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80; + switch (ex.minor () & BITS_5_THRU_12_MASK) + { + case TAO_POA_DISCARDING: + case TAO_POA_HOLDING: + { + this->entry_->status (LS_TRANSIENT); + break; + } + default: //case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE: + { + this->entry_->status (LS_DEAD); + } + } + } + catch (CORBA::TIMEOUT &) + { + this->entry_->status (LS_TIMEDOUT); + } + catch (CORBA::Exception &) + { + this->entry_->status (LS_DEAD); + } + + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + poa_->deactivate_object (oid.in()); +} + +//--------------------------------------------------------------------------- +//--------------------------------------------------------------------------- + +LiveCheck::LiveCheck () + :ping_interval_() +{ +} + +LiveCheck::~LiveCheck (void) +{ + while (this->entry_map_.current_size() > 0) + { + LiveEntryMap::iterator i (this->entry_map_); + LiveEntryMap::value_type *pair = 0; + i.next (pair); + this->entry_map_.unbind(pair); + delete pair->item(); + delete pair; + } +} + +void +LiveCheck::init (CORBA::ORB_ptr orb, + const ACE_Time_Value &pi ) +{ + this->ping_interval_ = pi; + ACE_Reactor *r = orb->orb_core()->reactor(); + this->reactor (r); + CORBA::Object_var obj = orb->resolve_initial_references ("RootPOA"); + this->poa_ = PortableServer::POA::_narrow (obj.in()); +} + +const ACE_Time_Value & +LiveCheck::ping_interval (void) const +{ + return this->ping_interval_; +} + +int +LiveCheck::handle_timeout (const ACE_Time_Value &, + const void *) +{ + for (LiveEntryMap::iterator i (this->entry_map_); + !i.done(); + i.advance ()) + { + LiveEntryMap::value_type *pair = 0; + i.next(pair); + pair->item()->do_ping(poa_.in()); + } + return 0; +} + +void +LiveCheck::add_server (const char *server, + ImplementationRepository::ServerObject_ptr ref) +{ + ACE_CString s(server); + LiveEntry *entry = 0; + ACE_NEW (entry, LiveEntry(this,ref)); + int result = entry_map_.bind (s, entry); + if (result != 0) + { + LiveEntry *old = 0; + result = entry_map_.rebind (s, entry, old); + delete old; + } +} + +void +LiveCheck::remove_server (const char *server) +{ + ACE_CString s(server); + LiveEntry *entry = 0; + int result = entry_map_.unbind (s, entry); + if (result == 0) + delete entry; +} + +void +LiveCheck::add_listener (LiveListener *l) +{ + LiveEntry *entry = 0; + int result = entry_map_.find (l->server(), entry); + if (result == 0 && entry != 0) + { + entry->add_listener (l); + ACE_Time_Value now (ACE_OS::time()); + ACE_Time_Value next = entry->next_check (); + if (next <= now) + { + this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero); + } + else + { + ACE_Time_Value delay = next - now; + this->reactor()->schedule_timer (this, 0, delay); + } + + } +} + +LiveStatus +LiveCheck::is_alive (const char *server) +{ + ACE_CString s(server); + LiveEntry *entry = 0; + int result = entry_map_.find (s, entry); + if (result == 0 && entry != 0) + { + return entry->status (); + } + return LS_DEAD; +} diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h new file mode 100644 index 00000000000..1170a056c03 --- /dev/null +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h @@ -0,0 +1,177 @@ +// -*- C++ -*- +/* + * @file LiveCheck.h + * + * $Id$ + * + * @author Phil Mesnier + */ + +#ifndef IMR_LIVECHECK_H_ +#define IMR_LIVECHECK_H_ + +#include "locator_export.h" + +#include "tao/ImR_Client/ServerObjectS.h" // ServerObject_AMIS.h + +#include "ace/Vector_T.h" +#include "ace/Hash_Map_Manager.h" +#include "ace/SString.h" +#include "ace/Event_Handler.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class LiveCheck; + +/*--------------------------------------------------------------------------- + * @enum LiveStatus + * + * @brief indication of the known condition of a target server + * + * LS_UNKNOWN - The server hasn't yet been pinged + * LS_DEAD - The ping failed for reasons other than POA Activation + * LS_ALIVE - The server positively acknowledged a ping + * LS_TRANSIENT - The server connected, but acively raised a transient + * LS_TIMEDOUT - The server connected, but never returned any result. + */ +enum LiveStatus { + LS_UNKNOWN, + LS_DEAD, + LS_ALIVE, + LS_TRANSIENT, + LS_TIMEDOUT +}; + +/*--------------------------------------------------------------------------- + * @class LiveListener + * + * @brief An interface for receiving asynch liveness status updates + * + * The code waiting on a confirmation of liveness status creates an instance + * of a LiveListener and registers it with the LiveCheck object. + * When the desired ping occurs, the status_changed method is called and the + * listener is unregistered. It is up to the owner of the listener to re- + * register if the ping result was inconclusive, such as a status of TRANSIENT + * or TIMEDOUT. Such a decision is based on configuration settings. + */ +class Locator_Export LiveListener +{ + public: + /// Construct a new listener. The server name suppled is used to + /// look up a listener entry in the LiveCheck map. + LiveListener (const char *server); + + /// called by the asynch ping receiver when a reply or an exception + /// is received. + virtual void status_changed (LiveStatus status) = 0; + + /// accessor for the server name. Used by the LiveCheck to associate a listener + const ACE_CString & server (void) const; + + private: + ACE_CString server_; +}; + +/*--------------------------------------------------------------------------- + * @class LiveEntry + * + * @brief Contains a list of interested listeners for a server + * + * Each server the Locator is interested in has a live entry instance. + * This holds the liveliness status and determines the next allowed time + * for a ping. Instances of the LiveEntry class are retained until the + * locator is no longer interested in the target server. + */ +class Locator_Export LiveEntry +{ + public: + LiveEntry (LiveCheck *owner, ImplementationRepository::ServerObject_ptr ref); + ~LiveEntry (void); + + void add_listener (LiveListener * ll); + LiveStatus status (void) const; + void status (LiveStatus l); + bool do_ping (PortableServer::POA_ptr poa); + const ACE_Time_Value &next_check (void) const; + + private: + LiveCheck *owner_; + ImplementationRepository::ServerObject_var ref_; + LiveStatus liveliness_; + ACE_Time_Value next_check_; + short retry_count_; + bool ping_away_; + ACE_Vector listeners_; + TAO_SYNCH_MUTEX lock_; +}; + +/*--------------------------------------------------------------------------- + * @class PingReceiver + * + * @brief callback handler for asynch ping requests + * + * An instance of the ping receiver is used to handle the reply from a ping + * request. Instances are created for the ping, then destroyed. +`*/ +class Locator_Export PingReceiver : + public virtual POA_ImplementationRepository::AMI_ServerObjectHandler +{ + public: + PingReceiver (LiveEntry * entry, PortableServer::POA_ptr poa); + virtual ~PingReceiver (void); + + void ping (void); + void ping_excep (Messaging::ExceptionHolder * excep_holder); + + private: + PortableServer::POA_var poa_; + LiveEntry * entry_; +}; + +/*--------------------------------------------------------------------------- + * @class LiveCheck + * + * @brief The manager class used for pinging servers as needed. + * + * The LiveCheck class maintains a map of named LiveEntries. When the locator + * needs to determine the liveliness of a server, registers a LiveListener + * for the desired server. A ping to the server is then scheduled, based on the + * limits determined by the entry's state. + */ +class Locator_Export LiveCheck : public ACE_Event_Handler +{ + public: + LiveCheck (); + ~LiveCheck (void); + + void init (CORBA::ORB_ptr orb, const ACE_Time_Value &interval); + + int handle_timeout (const ACE_Time_Value ¤t_time, + const void *act = 0); + + void add_server (const char *server, + ImplementationRepository::ServerObject_ptr ref); + void remove_server (const char *server); + + void add_listener (LiveListener *waiter); + + LiveStatus is_alive (const char *server); + + const ACE_Time_Value &ping_interval (void) const; + + private: + typedef ACE_Hash_Map_Manager_Ex, + ACE_Equal_To, + TAO_SYNCH_MUTEX> LiveEntryMap; + + LiveEntryMap entry_map_; + PortableServer::POA_var poa_; + ACE_Time_Value ping_interval_; +}; + +#endif /* IMR_LIVECHECK_H_ */ + -- cgit v1.2.1