diff options
author | Phil Mesnier <mesnier_p@ociweb.com> | 2013-03-27 00:01:12 +0000 |
---|---|---|
committer | Phil Mesnier <mesnier_p@ociweb.com> | 2013-03-27 00:01:12 +0000 |
commit | 35ef2426915e032ab1f294c95f6199a0cf24cf81 (patch) | |
tree | 59f8f24dd2ef48d54d4fc2c5944871d7a1c3b6d4 | |
parent | 0cb590dc04eabfdfa492d4511871a9d3b5a0cf3d (diff) | |
download | ATCD-35ef2426915e032ab1f294c95f6199a0cf24cf81.tar.gz |
Tue Mar 26 23:58:50 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
-rw-r--r-- | TAO/ChangeLog_Asynch_ImR | 19 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/Adapter_Activator.cpp | 43 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/Adapter_Activator.h | 10 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp | 9 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h | 3 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp | 139 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/Forwarder.h | 47 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp | 296 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h | 7 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp | 74 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/LiveCheck.h | 18 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/Locator_Options.cpp | 24 | ||||
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/Locator_Options.h | 5 |
13 files changed, 147 insertions, 547 deletions
diff --git a/TAO/ChangeLog_Asynch_ImR b/TAO/ChangeLog_Asynch_ImR index 8edf3c4d262..f5a9875a436 100644 --- a/TAO/ChangeLog_Asynch_ImR +++ b/TAO/ChangeLog_Asynch_ImR @@ -1,3 +1,22 @@ +Tue Mar 26 23:58:50 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com> + + * orbsvcs/ImplRepo_Service/Adapter_Activator.h: + * orbsvcs/ImplRepo_Service/Adapter_Activator.cpp: + * orbsvcs/ImplRepo_Service/AsyncAccessManager.h: + * orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp: + * orbsvcs/ImplRepo_Service/Forwarder.h: + * orbsvcs/ImplRepo_Service/Forwarder.cpp: + * orbsvcs/ImplRepo_Service/ImR_Locator_i.h: + * orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp: + * orbsvcs/ImplRepo_Service/LiveCheck.h: + * orbsvcs/ImplRepo_Service/LiveCheck.cpp: + * orbsvcs/ImplRepo_Service/Locator_Options.h: + * orbsvcs/ImplRepo_Service/Locator_Options.cpp: + + completed asynch ping tracking and event delivery. Removed sync/asynch + behavior option, it didn't work and it was temporary anyway. Stripped the + old code that was optional. + Tue Mar 26 16:35:26 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com> * orbsvcs/ImplRepo_Service/Forwarder.cpp: diff --git a/TAO/orbsvcs/ImplRepo_Service/Adapter_Activator.cpp b/TAO/orbsvcs/ImplRepo_Service/Adapter_Activator.cpp index c1929827e24..7901744ad84 100644 --- a/TAO/orbsvcs/ImplRepo_Service/Adapter_Activator.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/Adapter_Activator.cpp @@ -15,20 +15,14 @@ #include "tao/PortableServer/Servant_Base.h" ImR_Adapter::ImR_Adapter (void) - : servant_locator_ (0), - default_servant_ (0) + : default_servant_ (0) { } void -ImR_Adapter::init (PortableServer::ServantLocator_ptr locator) -{ - servant_locator_ = locator; -} - -void ImR_Adapter::init (TAO_ServantBase * servant) { + ACE_DEBUG ((LM_DEBUG, "ImR_Adapter::init with default servant\n")); default_servant_ = servant; } @@ -41,8 +35,7 @@ ImR_Adapter::unknown_adapter (PortableServer::POA_ptr parent, CORBA::PolicyList policies (3); const char *exception_message = "Null Message"; - bool use_loc = this->servant_locator_ != 0; - policies.length (use_loc ? 2 : 3); + policies.length (3); try { // Servant Retention Policy @@ -53,19 +46,10 @@ ImR_Adapter::unknown_adapter (PortableServer::POA_ptr parent, // Request Processing Policy exception_message = "While PortableServer::POA::create_request_processing_policy"; - if (use_loc) - { - policies[1] = - parent->create_request_processing_policy (PortableServer::USE_SERVANT_MANAGER); - } - else - { - policies[1] = - parent->create_request_processing_policy (PortableServer::USE_DEFAULT_SERVANT); - policies[2] = - parent->create_id_uniqueness_policy (PortableServer::MULTIPLE_ID); - } - + policies[1] = + parent->create_request_processing_policy (PortableServer::USE_DEFAULT_SERVANT); + policies[2] = + parent->create_id_uniqueness_policy (PortableServer::MULTIPLE_ID); PortableServer::POAManager_var poa_manager = parent->the_POAManager (); @@ -86,17 +70,8 @@ ImR_Adapter::unknown_adapter (PortableServer::POA_ptr parent, exception_message = "While child->the_activator"; child->the_activator (this); - if (use_loc) - { - exception_message = "While set_servant_manager"; - child->set_servant_manager (this->servant_locator_); - } - else - { - exception_message = "While set_servant"; - child->set_servant (this->default_servant_); - } - + exception_message = "While set_servant"; + child->set_servant (this->default_servant_); } catch (const CORBA::Exception& ex) { diff --git a/TAO/orbsvcs/ImplRepo_Service/Adapter_Activator.h b/TAO/orbsvcs/ImplRepo_Service/Adapter_Activator.h index 3ff4da1e789..8a56836c0f1 100644 --- a/TAO/orbsvcs/ImplRepo_Service/Adapter_Activator.h +++ b/TAO/orbsvcs/ImplRepo_Service/Adapter_Activator.h @@ -16,7 +16,6 @@ #include "tao/PortableServer/PortableServer.h" #include "tao/PortableServer/AdapterActivatorC.h" -#include "tao/PortableServer/ServantLocatorC.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -32,10 +31,10 @@ class TAO_ServantBase; * * @brief Implementation Repository Adapter Activator * - * Part of the ServantLocator/AdapterActivator combination that is used to + * Part of the DefaultServant/AdapterActivator combination that is used to * receive forwardable requests from clients. The Adapter Activator creates * the POA structure that the request expects. For each POA created, the - * same ServantLocator will be registered in each one. + * same DefaultServant will be registered in each one. */ class ImR_Adapter : public PortableServer::AdapterActivator, @@ -50,12 +49,9 @@ public: const char *name ); - void init(PortableServer::ServantLocator_ptr locator); void init(TAO_ServantBase * servant); private: - /// The ServantLocator registered in each new POA. - PortableServer::ServantLocator_ptr servant_locator_; - /// Alternatively, the default servant used. + /// the default servant used. TAO_ServantBase *default_servant_; }; diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp index cc09a4d0687..daa4efdc025 100644 --- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp @@ -78,7 +78,6 @@ AsyncLiveListener::AsyncLiveListener (AsyncAccessManager &aam, LiveCheck &pinger :aam_ (aam), pinger_ (pinger), status_ (LS_UNKNOWN), - retries_ (10) { this->aam_.add_ref (); this->pinger_.add_listener (this); @@ -90,11 +89,13 @@ AsyncLiveListener::~AsyncLiveListener (void) } void -AsyncLiveListener::status_changed (LiveStatus status) +AsyncLiveListener::status_changed (LiveStatus status, bool may_retry) { this->status_ = status; - if (status == LS_TRANSIENT && --this->retries_ > 0) + if (status == LS_TRANSIENT && may_retry) this->pinger_.add_listener (this); else - this->aam_.ping_replied (status != LS_DEAD); + { + this->aam_.ping_replied (status != LS_DEAD); + } } diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h index 2fa92ad6c0e..f915419d896 100644 --- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h +++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h @@ -119,13 +119,12 @@ class AsyncLiveListener : public LiveListener AsyncLiveListener (AsyncAccessManager &aam, LiveCheck *pinger); ~AsyncLiveListener (void); - void status_changed (LiveStatus status); + void status_changed (LiveStatus status, bool may_retry); private: AsyncAccessManager &aam_; LiveCheck *pinger_; LiveStatus status_; - int retries_; }; diff --git a/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp b/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp index 55c92f9e6da..6eed13839b0 100644 --- a/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp @@ -28,145 +28,6 @@ #include <tao/DynamicInterface/AMH_DSI_Response_Handler.h> #include <tao/Messaging/AMH_Response_Handler.h> -/** -* This constructor takes in orb and ImR_Locator_i pointers to store for later -* use. It also grabs a reference to the POACurrent object for use in -* preinvoke. -*/ -ImR_Forwarder::ImR_Forwarder (ImR_Locator_i& imr_impl) - : locator_ (imr_impl) - , orb_ (0) -{ -} - -void -ImR_Forwarder::init (CORBA::ORB_ptr orb) -{ - ACE_ASSERT (! CORBA::is_nil(orb)); - this->orb_ = orb; - try - { - CORBA::Object_var tmp = - orb->resolve_initial_references ("POACurrent"); - - this->poa_current_var_ = - PortableServer::Current::_narrow (tmp.in ()); - } - catch (const CORBA::Exception&) - { - ACE_DEBUG ((LM_DEBUG, "ImR_Forwarder::init() Exception ignored.\n")); - } - ACE_ASSERT (!CORBA::is_nil (this->poa_current_var_.in ())); -} - -/** -* We figure out the intended recipient from the POA name. After activating -* the server, we throw a forwarding exception to the correct server. -* -* The big complicated thing here is that we have to create the forwarding -* ior based on what we already have. So we combine the endpoint received -* from activate_server_i and append the objectid from the request to it. -*/ -PortableServer::Servant -ImR_Forwarder::preinvoke (const PortableServer::ObjectId &, - PortableServer::POA_ptr poa, - const char *, - PortableServer::ServantLocator::Cookie &) -{ - ACE_ASSERT (! CORBA::is_nil(poa)); - CORBA::Object_var forward_obj; - - try - { - CORBA::String_var server_name = poa->the_name(); - - if (locator_.debug() > 1) - ACE_DEBUG ((LM_DEBUG, "ImR: Activating server <%s>.\n", server_name.in())); - - // The activator stores a partial ior with each server. We can - // just tack on the current ObjectKey to get a valid ior for - // the desired server. - CORBA::String_var pior = locator_.activate_server_by_name (server_name.in (), false); - - ACE_CString ior = pior.in (); - - // Check that the returned ior is the expected partial ior with - // missing ObjectKey. - if (ior.find ("corbaloc:") != 0 || ior[ior.length () - 1] != '/') - { - ACE_ERROR ((LM_ERROR, "ImR_Forwarder::preinvoke () Invalid corbaloc ior.\n\t<%s>\n", ior.c_str())); - throw CORBA::OBJECT_NOT_EXIST ( - CORBA::SystemException::_tao_minor_code ( - TAO_IMPLREPO_MINOR_CODE, - 0), - CORBA::COMPLETED_NO); - } - - CORBA::String_var key_str; - // Unlike POA Current, this implementation cannot be cached. - TAO::Portable_Server::POA_Current* tao_current = - dynamic_cast <TAO::Portable_Server::POA_Current*> (this->poa_current_var_.in ()); - - ACE_ASSERT(tao_current != 0); - TAO::Portable_Server::POA_Current_Impl* impl = tao_current->implementation (); - TAO::ObjectKey::encode_sequence_to_string (key_str.out (), impl->object_key ()); - - ior += key_str.in(); - - if (locator_.debug() > 0) - ACE_DEBUG ((LM_DEBUG, "ImR: Forwarding invocation on <%s> to <%s>\n", server_name.in(), ior.c_str())); - - forward_obj = - this->orb_->string_to_object (ior.c_str ()); - } - catch (const ImplementationRepository::CannotActivate&) - { - throw CORBA::TRANSIENT ( - CORBA::SystemException::_tao_minor_code ( - TAO_IMPLREPO_MINOR_CODE, - 0), - CORBA::COMPLETED_NO); - } - catch (const ImplementationRepository::NotFound&) - { - throw CORBA::TRANSIENT ( - CORBA::SystemException::_tao_minor_code ( - TAO_IMPLREPO_MINOR_CODE, - 0), - CORBA::COMPLETED_NO); - } - catch (const CORBA::Exception& ex) - { - ex._tao_print_exception ("Forwarder"); - throw CORBA::TRANSIENT ( - CORBA::SystemException::_tao_minor_code ( - TAO_IMPLREPO_MINOR_CODE, - 0), - CORBA::COMPLETED_NO); - } - - if (!CORBA::is_nil (forward_obj.in ())) - throw PortableServer::ForwardRequest (forward_obj.in ()); - - ACE_ERROR ((LM_ERROR, "Error: Forward_to reference is nil.\n")); - throw CORBA::OBJECT_NOT_EXIST ( - CORBA::SystemException::_tao_minor_code ( - TAO_IMPLREPO_MINOR_CODE, - 0), - CORBA::COMPLETED_NO); -} - -void -ImR_Forwarder::postinvoke (const PortableServer::ObjectId &, - PortableServer::POA_ptr, - const char *, - PortableServer::ServantLocator::Cookie, - PortableServer::Servant) -{ -} - -//------------------------------------------------------------------------------------- - ImR_DSI_Forwarder::ImR_DSI_Forwarder (ImR_Locator_i& imr_impl) : locator_ (imr_impl) , orb_ (0) diff --git a/TAO/orbsvcs/ImplRepo_Service/Forwarder.h b/TAO/orbsvcs/ImplRepo_Service/Forwarder.h index 23932226962..e83271b31c4 100644 --- a/TAO/orbsvcs/ImplRepo_Service/Forwarder.h +++ b/TAO/orbsvcs/ImplRepo_Service/Forwarder.h @@ -30,52 +30,7 @@ class ImR_Locator_i; -//--------------------------------------------------------------------------- -/** - * @class ImR_Forwarder: - * - * @brief Implementation Repository Forwarder - * - * This class provides a ServantLocator implementation that - * is used to handle arbitrary calls and forward them to the - * correct place. - */ -class ImR_Forwarder - : public PortableServer::ServantLocator, - public CORBA::LocalObject -{ -public: - ImR_Forwarder (ImR_Locator_i& imr_impl); - - /// Called before the invocation begins. - virtual PortableServer::Servant preinvoke ( - const PortableServer::ObjectId &oid, - PortableServer::POA_ptr poa, - const char * operation, - PortableServer::ServantLocator::Cookie &cookie); - - virtual void postinvoke ( - const PortableServer::ObjectId & oid, - PortableServer::POA_ptr adapter, - const char * operation, - PortableServer::ServantLocator::Cookie the_cookie, - PortableServer::Servant the_servant); - - void init(CORBA::ORB_ptr orb); - -private: - /// Where we find out where to forward to. - ImR_Locator_i& locator_; - - /// POA reference. - PortableServer::Current_var poa_current_var_; - - /// Variable to save the ORB reference passed to the constr. - CORBA::ORB_ptr orb_; -}; - - -//--------------------------------------------------------------------------- +//---------------------------------------------------------------------------- /** * @class ImR_ReplyHandler * diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp index 7419e598256..621a85f3efe 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp @@ -20,9 +20,6 @@ static const int DEFAULT_START_LIMIT = 1; -static const int PING_RETRY_SCHEDULE[] = {0, 10, 100, 500, 1000, 1000, 1000, - 1000, 5000, 5000}; - static const ACE_Time_Value DEFAULT_SERVER_TIMEOUT (0, 10 * 1000); // 10ms /// We want to give shutdown a little more time to work, so that we @@ -56,13 +53,11 @@ createPersistentPOA (PortableServer::POA_ptr root_poa, const char* poa_name) { } ImR_Locator_i::ImR_Locator_i (void) - : forwarder_ (*this) - , dsi_forwarder_ (*this) + : dsi_forwarder_ (*this) , ins_locator_ (0) , debug_ (0) , read_only_ (false) , unregister_if_address_reused_ (false) - , use_asynch_ (true) { // Visual C++ 6.0 is not smart enough to do a direct assignment // while allocating the INS_Locator. So, we have to do it in @@ -97,19 +92,9 @@ ImR_Locator_i::init_with_orb (CORBA::ORB_ptr orb, Options& opts) this->root_poa_ = PortableServer::POA::_narrow (obj.in ()); ACE_ASSERT (! CORBA::is_nil (this->root_poa_.in ())); - if (opts.use_asynch()) - { - this->use_asynch_ = true; - this->dsi_forwarder_.init (orb); - this->adapter_.init (& this->dsi_forwarder_); - this->pinger_.init (orb, ping_interval_); - } - else - { - this->use_asynch_ = true; - this->forwarder_.init (orb); - this->adapter_.init (& this->forwarder_); - } + this->dsi_forwarder_.init (orb); + this->adapter_.init (& this->dsi_forwarder_); + this->pinger_.init (orb, ping_interval_); // Register the Adapter_Activator reference to be the RootPOA's // Adapter Activator. @@ -439,8 +424,8 @@ ImR_Locator_i::activate_server_by_name (const char* name, bool manual_start) ACE_CString serverKey; ACE_CString server_id; bool jacorb_server = false; - this->parse_id(name, server_id, serverKey, jacorb_server); - UpdateableServerInfo info(this->repository_.get(), serverKey); + this->parse_id (name, server_id, serverKey, jacorb_server); + UpdateableServerInfo info (this->repository_.get(), serverKey); if (info.null ()) { ACE_ERROR (( @@ -467,8 +452,8 @@ ImR_Locator_i::activate_server_by_name (const char* name, bool manual_start, ACE_CString serverKey; ACE_CString server_id; bool jacorb_server = false; - this->parse_id(name, server_id, serverKey, jacorb_server); - UpdateableServerInfo info(this->repository_.get(), serverKey); + this->parse_id (name, server_id, serverKey, jacorb_server); + UpdateableServerInfo info (this->repository_.get(), serverKey); if (info.null ()) { ACE_ERROR (( @@ -1494,239 +1479,11 @@ ImR_Locator_i::connect_server (UpdateableServerInfo& info) bool ImR_Locator_i::is_alive (UpdateableServerInfo& info) { - if (this->use_asynch_) - { - this->connect_server (info); - SyncListener listener (info->name.c_str(), - this->orb_.in(), - this->pinger_); - return listener.is_alive(); - } - - const size_t table_size = sizeof (PING_RETRY_SCHEDULE) / - sizeof (*PING_RETRY_SCHEDULE); - - for (size_t i = 0; i < table_size; ++i) - { - int status = this->is_alive_i (info); - if (status == 0) - return false; - if (status == 1) - return true; - - // This is evil, but there's not much else we can do for now. We - // should never reach this code once the ImR Servers are fixed - // so that they don't lie about server_is_running. Currently, - // they send this notification during poa creation. We have to - // run the orb, because the very thing that may be slowing the - // aliveness of the servers is the fact that they're trying to - // register more objects with us. In practical testing, we - // never retried the ping more than once, because the second - // ping always timed out, even if the servers poa manager had - // not been activated. The only way we saw multiple retries was - // if we ran the orb on the server before the poa manager was - // activated. For this reason, the first retry is immediate, - // and the orb->run () call is not required. The call will - // likely timeout, and is_alive will return true. - if (PING_RETRY_SCHEDULE[i] > 0) - { - ACE_Time_Value tv (0, PING_RETRY_SCHEDULE[i] * 1000); - this->orb_->run (tv); - } - } - if (debug_ > 0) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> Ping retry count exceeded. alive=maybe.\n"), - info->name.c_str ())); - } - // We return true here, because the server *might* be alive, it's just - // not starting in a timely manner. We can't return false, because then - // we'll just try to start another instance, and the same thing will - // likely happen. - info.edit ()->last_ping = ACE_OS::gettimeofday (); - return true; -} - -int -ImR_Locator_i::is_alive_i (UpdateableServerInfo& info) -{ - // This is used by the ACE_TRY below when exceptions are turned off. - - if (info->ior.length () == 0 || info->partial_ior.length () == 0) - { - if (debug_ > 1) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("ImR: <%C> not running. alive=false.\n"), - info->name.c_str ())); - } - info.edit ()->last_ping = ACE_Time_Value::zero; - return 0; - } - - if (ping_interval_ == ACE_Time_Value::zero) - { - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> Ping verification disabled. alive=true.\n"), - info->name.c_str ())); - } - return 1; - } - - if ((ACE_OS::gettimeofday () - info->last_ping) < ping_interval_) - { - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> within ping interval. alive=true.\n"), - info->name.c_str ())); - } - return 1; - } - - // If we don't have enough information to start the server if it isn't already - // then we might as well assume it is running. That way the client can get the - // status directly from the server. - if (info->cmdline.length () == 0 || ! repository_->has_activator (info->activator)) - { - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: Ping verification skipped. <%C> not startable.\n"), - info->name.c_str ())); - } - return 1; - } - this->connect_server (info); - - if (CORBA::is_nil (info->server.in ())) - { - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> Could not connect. alive=false.\n"), - info->name.c_str ())); - } - return 0; - } - - try - { - // Make a copy, in case the info is updated during the ping. - ImplementationRepository::ServerObject_var server = info->server; - - // This will timeout if it takes too long - server->ping (); - - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> Ping successful. alive=true\n"), - info->name.c_str ())); - } - info.edit ()->last_ping = ACE_OS::gettimeofday (); - } - catch (const CORBA::TRANSIENT& ex) - { - const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80; - switch (ex.minor () & BITS_5_THRU_12_MASK) - { - case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE: - { - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> Local TRANSIENT. alive=false.\n"), - info->name.c_str ())); - } - } - info.edit ()->last_ping = ACE_Time_Value::zero; - return 0; - case TAO_POA_DISCARDING: - case TAO_POA_HOLDING: - { - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> Remote TRANSIENT. alive=maybe.\n"), - info->name.c_str ())); - } - } - return -1; // We keep trying to ping, because returning 1 now, would just lead - // to clients getting the same exception. If we can't ping after several - // attempts, then we'll give up and return 1, letting the client worry about it. - default: - { - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> TRANSIENT exception. alive=false.\n"), - info->name.c_str ())); - } - info.edit ()->last_ping = ACE_Time_Value::zero; - } - return 0; - } - } - catch (const CORBA::TIMEOUT& ex) - { - if (ex.completed() == CORBA::COMPLETED_NO) - { - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> Ping timed out during connection. ") - ACE_TEXT ("alive=false.\n"), - info->name.c_str ())); - } - info.edit ()->last_ping = ACE_Time_Value::zero; - // still potentially ambiguous, the server could be so busy - // it couldn't even accept a connection. However the more - // likely assumption is the server is on windows, and is dead, - // but the host ignored the request rather than rejecting it. - return 0; - } - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> Ping timed out, maybe completed. ") - ACE_TEXT ("alive=true.\n"), - info->name.c_str ())); - } - return 1; // This is "alive" as far as we're concerned. Presumably the client - // will have a less stringent timeout policy, or will want to know - // about the timeout. In any case, we're only guaranteeing that the - // server is alive, not that it's responsive. - } - catch (const CORBA::Exception& ex) - { - if (debug_ > 1) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("ImR: <%C> Unexpected Ping exception. alive=false\n"), - info->name.c_str ())); - ex._tao_print_exception ("\n"); - } - info.edit ()->last_ping = ACE_Time_Value::zero; - return 0; - } - return 1; + SyncListener listener (info->name.c_str(), + this->orb_.in(), + this->pinger_); + return listener.is_alive(); } int @@ -1745,40 +1502,47 @@ SyncListener::SyncListener (const char *server, pinger_ (pinger), status_ (LS_UNKNOWN), got_it_ (false), - retries_ (10) + callback_ (false) { } bool SyncListener::is_alive (void) { + this->status_ = this->pinger_.is_alive(this->server()); - this->status_ = this->pinger_.is_alive(this->server().c_str()); + // ACE_DEBUG ((LM_DEBUG, + // "ImR: SyncListener::is_alive() this = %x, server = <%C>, status = %d\n", + // this, this->server(), status_)); if (this->status_ == LS_ALIVE) return true; else if (this->status_ == LS_DEAD) return false; - int count = this->retries_; - this->pinger_.add_listener (this); + this->callback_ = true; while (!this->got_it_) { - ACE_Time_Value delay (1,0); + if (this->callback_) + { + this->pinger_.add_listener (this); + } + this->callback_ = false; + ACE_Time_Value delay (10,0); this->orb_->perform_work (delay); - if (count != this->retries_) - this->pinger_.add_listener (this); } this->got_it_ = false; - this->retries_ = 10; return this->status_ != LS_DEAD; } void -SyncListener::status_changed (LiveStatus status) +SyncListener::status_changed (LiveStatus status, bool may_retry) { + this->callback_ = true; this->status_ = status; - this->got_it_ = (status != LS_TRANSIENT) || (--this->retries_ == 0); - ACE_DEBUG ((LM_DEBUG, "SynchLisener::status_changed, got it = %d, status = %d\n", got_it_, status)); + this->got_it_ = (status != LS_TRANSIENT) || (! may_retry); + // ACE_DEBUG ((LM_DEBUG, + // "SynchLisener(%x)::status_changed, got it = %d, status = %d\n", + // this, got_it_, status)); } diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h index a1644d98d54..742dfac7013 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h +++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h @@ -142,9 +142,7 @@ private: private: // The class that handles the forwarding. - ImR_Forwarder forwarder_; - // The alternative forwarder uses DSI/AMH for asynch forwarding. ImR_DSI_Forwarder dsi_forwarder_; // Used for the forwarding of any type of POA. @@ -171,7 +169,6 @@ private: ACE_Time_Value startup_timeout_; ACE_Time_Value ping_interval_; bool unregister_if_address_reused_; - bool use_asynch_; }; //---------------------------------------------------------------------------- @@ -185,14 +182,14 @@ class SyncListener : public LiveListener bool is_alive (void); - void status_changed (LiveStatus status); + void status_changed (LiveStatus status, bool may_retry); private: CORBA::ORB_var orb_; LiveCheck &pinger_; LiveStatus status_; bool got_it_; - int retries_; + bool callback_; }; diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp index 89747d2795d..3539b4548b0 100644 --- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp @@ -7,11 +7,11 @@ #include "ace/OS_NS_time.h" LiveListener::LiveListener (const char *server) - : server_(server) + : server_(server) { } -const ACE_CString & +const char * LiveListener::server (void) const { return this->server_; @@ -20,13 +20,43 @@ LiveListener::server (void) const //--------------------------------------------------------------------------- //--------------------------------------------------------------------------- +const int LiveEntry::reping_msec_[] = {0, 10, 100, 500, 1000, 1000, + 1000, 1000, 5000, 5000}; +int LiveEntry::reping_limit_ = sizeof (LiveEntry::reping_msec_) / sizeof (int); + +void +LiveEntry::set_reping_limit (int max) +{ + LiveEntry::reping_limit_ = max; +} + +bool +LiveEntry::reping_available (void) +{ + return this->repings_ < LiveEntry::reping_limit_; +} + +int +LiveEntry::next_reping (void) +{ + if ( this->repings_ < LiveEntry::reping_limit_) + { + return LiveEntry::reping_msec_[this->repings_++]; + } + else + return -1; +} + LiveEntry::LiveEntry (LiveCheck *owner, + const char *server, ImplementationRepository::ServerObject_ptr ref) : owner_ (owner), + server_ (server), ref_ (ImplementationRepository::ServerObject::_duplicate (ref)), liveliness_ (LS_UNKNOWN), next_check_ (ACE_OS::time()), retry_count_ (0), + repings_ (0), ping_away_ (false), listeners_ (), lock_ () @@ -81,18 +111,20 @@ LiveEntry::status (LiveStatus l) i.next(ll); if (*ll != 0) { - (*ll)->status_changed (this->liveliness_); + (*ll)->status_changed (this->liveliness_, this->reping_available()); } } #else - ACE_DEBUG ((LM_DEBUG,"LiveEntry::status, listeners.size = %d\n", - listeners_.size())); + // ACE_DEBUG ((LM_DEBUG, + // ACE_TEXT ("(%P|%t) LiveEntry::status(%d), server = %s,") + // ACE_TEXT (" listeners.size = %d\n"), + // l, this->server_.c_str(), listeners_.size())); for (size_t i = 0; i < this->listeners_.size(); i++) { LiveListener *ll = this->listeners_[i]; if (ll != 0) { - (ll)->status_changed (this->liveliness_); + (ll)->status_changed (this->liveliness_, this->reping_available()); } } #endif @@ -127,7 +159,16 @@ LiveEntry::do_ping (PortableServer::POA_ptr poa) this->next_check_ = now + owner_->ping_interval(); break; case LS_TRANSIENT: - this->next_check_ = now + ACE_Time_Value (0,5000); // retry delay + { + int ms = this->next_reping (); + if (ms != -1) + { + ACE_Time_Value next (ms / 1000, (ms % 1000) * 1000); + this->next_check_ = now + next; + } + else + return false; + } break; default:; } @@ -159,7 +200,7 @@ PingReceiver::~PingReceiver (void) void PingReceiver::ping (void) { - ACE_DEBUG ((LM_DEBUG,"ping received\n")); + // ACE_DEBUG ((LM_DEBUG,"ping received\n")); this->entry_->status (LS_ALIVE); PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); poa_->deactivate_object (oid.in()); @@ -258,9 +299,9 @@ void LiveCheck::add_server (const char *server, ImplementationRepository::ServerObject_ptr ref) { - ACE_CString s(server); + ACE_CString s (server); LiveEntry *entry = 0; - ACE_NEW (entry, LiveEntry(this,ref)); + ACE_NEW (entry, LiveEntry (this, server, ref)); int result = entry_map_.bind (s, entry); if (result != 0) { @@ -284,19 +325,30 @@ void LiveCheck::add_listener (LiveListener *l) { LiveEntry *entry = 0; - int result = entry_map_.find (l->server(), entry); + ACE_CString key (l->server()); + int result = entry_map_.find (key, entry); + if (result == 0 && entry != 0) { entry->add_listener (l); ACE_Time_Value now (ACE_OS::time()); ACE_Time_Value next = entry->next_check (); + if (next <= now) { + // ACE_DEBUG ((LM_DEBUG, + // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ") + // ACE_TEXT ("immediate callback for <%C>\n"), + // l, l->server())); this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero); } else { ACE_Time_Value delay = next - now; + // ACE_DEBUG ((LM_DEBUG, + // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ") + // ACE_TEXT ("callback in %d ms for <%C>\n"), + // l, delay.sec() * 1000 + delay.usec() / 1000, l->server())); this->reactor()->schedule_timer (this, 0, delay); } diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h index 028dd61e430..f914662b3e8 100644 --- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h @@ -67,13 +67,13 @@ class Locator_Export LiveListener /// called by the asynch ping receiver when a reply or an exception /// is received. - virtual void status_changed (LiveStatus status) = 0; + virtual void status_changed (LiveStatus status, bool may_retry) = 0; /// accessor for the server name. Used by the LiveCheck to associate a listener - const ACE_CString & server (void) const; + const char *server (void) const; private: - ACE_CString server_; + const char *server_; }; //--------------------------------------------------------------------------- @@ -90,7 +90,9 @@ class Locator_Export LiveListener class Locator_Export LiveEntry { public: - LiveEntry (LiveCheck *owner, ImplementationRepository::ServerObject_ptr ref); + LiveEntry (LiveCheck *owner, + const char *server, + ImplementationRepository::ServerObject_ptr ref); ~LiveEntry (void); void add_listener (LiveListener * ll); @@ -98,16 +100,24 @@ class Locator_Export LiveEntry void status (LiveStatus l); bool do_ping (PortableServer::POA_ptr poa); const ACE_Time_Value &next_check (void) const; + static void set_reping_limit (int max); + bool reping_available (void); + int next_reping (void); private: LiveCheck *owner_; + ACE_CString server_; ImplementationRepository::ServerObject_var ref_; LiveStatus liveliness_; ACE_Time_Value next_check_; short retry_count_; + int repings_; bool ping_away_; ACE_Vector<LiveListener *> listeners_; TAO_SYNCH_MUTEX lock_; + static const int reping_msec_ []; + static int reping_limit_; + }; //--------------------------------------------------------------------------- diff --git a/TAO/orbsvcs/ImplRepo_Service/Locator_Options.cpp b/TAO/orbsvcs/ImplRepo_Service/Locator_Options.cpp index 5cd9a7ab9e5..794b0240959 100644 --- a/TAO/orbsvcs/ImplRepo_Service/Locator_Options.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/Locator_Options.cpp @@ -35,7 +35,6 @@ Options::Options () , service_command_ (SC_NONE) , unregister_if_address_reused_ (false) , imr_type_ (STANDALONE_IMR) -, use_asynch_ (true) { } @@ -236,11 +235,6 @@ Options::parse_args (int &argc, ACE_TCHAR *argv[]) this->ping_interval_ = ACE_Time_Value (0, 1000 * ACE_OS::atoi (shifter.get_current ())); } - else if (ACE_OS::strcasecmp (shifter.get_current (), - ACE_TEXT ("--old")) == 0) - { - this->use_asynch_ = false; - } else { shifter.ignore_arg (); @@ -373,10 +367,6 @@ Options::save_registry_options () (LPBYTE) &this->imr_type_ , sizeof (this->imr_type_)); ACE_ASSERT (err == ERROR_SUCCESS); - err = ACE_TEXT_RegSetValueEx (key, ACE_TEXT ("Asynch"), 0, REG_DWORD, - (LPBYTE) &this->use_asynch_ , sizeof (this->use_asynch_)); - ACE_ASSERT (err == ERROR_SUCCESS); - err = ::RegCloseKey (key); ACE_ASSERT (err == ERROR_SUCCESS); #endif @@ -496,14 +486,6 @@ Options::load_registry_options () ACE_ASSERT (type == REG_DWORD); } - sz = sizeof(use_asynch_); - err = ACE_TEXT_RegQueryValueEx (key, ACE_TEXT ("Asynch"), 0, &type, - (LPBYTE) &this->use_asynch_ , &sz); - if (err == ERROR_SUCCESS) - { - ACE_ASSERT (type == REG_DWORD); - } - err = ::RegCloseKey (key); ACE_ASSERT (err == ERROR_SUCCESS); #endif @@ -591,9 +573,3 @@ Options::imr_type (void) const { return this->imr_type_; } - -bool -Options::use_asynch (void) const -{ - return this->use_asynch_; -} diff --git a/TAO/orbsvcs/ImplRepo_Service/Locator_Options.h b/TAO/orbsvcs/ImplRepo_Service/Locator_Options.h index f7bac160d61..77d583751bf 100644 --- a/TAO/orbsvcs/ImplRepo_Service/Locator_Options.h +++ b/TAO/orbsvcs/ImplRepo_Service/Locator_Options.h @@ -101,8 +101,6 @@ public: enum ImrType { BACKUP_IMR, PRIMARY_IMR, STANDALONE_IMR }; ImrType imr_type(void) const; - bool use_asynch (void) const; - private: /// Parses and pulls out arguments for the ImR int parse_args (int &argc, ACE_TCHAR *argv[]); @@ -159,9 +157,6 @@ private: /// The type of ImR Locator this is. ImrType imr_type_; - /// Is the implementation using synchronous or asynchronous handling? - bool use_asynch_; - }; #endif |