From 6ea1bdd1685be6ab58ce8a32c7a715050684663a Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Wed, 29 May 2013 21:42:08 +0000 Subject: Wed May 29 21:32:51 UTC 2013 Phil Mesnier --- TAO/ChangeLog | 25 ++++ TAO/orbsvcs/ImplRepo_Service/AsyncListManager.cpp | 63 +++++--- TAO/orbsvcs/ImplRepo_Service/AsyncListManager.h | 4 +- TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp | 2 +- TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp | 171 ++++++++++++++++------ TAO/orbsvcs/ImplRepo_Service/LiveCheck.h | 42 ++++++ 6 files changed, 245 insertions(+), 62 deletions(-) diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 216e6d3994f..cfe5e92c764 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,28 @@ +Wed May 29 21:32:51 UTC 2013 Phil Mesnier + + * orbsvcs/ImplRepo_Service/AsyncListManager.h: + * orbsvcs/ImplRepo_Service/AsyncListManager.cpp: + + Add in the "jacorb" prefix for JacORB based server identifiers. + Also add the testing of initial status to avoid waiting for non- + pingable servers. Add some higher verbosity level debug output. + + * orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp: + + Minor code style fix. + + * orbsvcs/ImplRepo_Service/LiveCheck.h: + * orbsvcs/ImplRepo_Service/LiveCheck.cpp: + + Support the initial setting of of state when a listener is added. + + Prevent redundant timeout registrations by deferring a timer schedule + if one is currently being processed. This could happen if a ping + required a non-blocking connect, allowing a new request to be received. + Subsequent pings may still be requested, but only the next closest + time will be schheduled after a current handle timeout is completed. + + Wed May 29 19:27:35 UTC 2013 Johnny Willemsen * orbsvcs/tests/Trading/Offer_Exporter.cpp: diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.cpp b/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.cpp index ad4bf4347eb..3682be1f289 100644 --- a/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.cpp @@ -161,8 +161,15 @@ AsyncListManager::list_i (CORBA::ULong start, CORBA::ULong how_many) it.advance (); Server_Info_Ptr info = entry->int_id_; - - this->server_list_[i].server = info->name.c_str (); + if (info->jacorb_server) + { + ACE_CString jacorb_name (ACE_TEXT ("JACORB:") + info->name); + this->server_list_[i].server = jacorb_name.c_str (); + } + else + { + this->server_list_[i].server = info->name.c_str (); + } this->server_list_[i].startup.command_line = info->cmdline.c_str (); this->server_list_[i].startup.environment = info->env_vars; this->server_list_[i].startup.working_directory = info->dir.c_str (); @@ -187,7 +194,10 @@ AsyncListManager::list_i (CORBA::ULong start, CORBA::ULong how_many) } else { - this->waiters_++; + if (!evaluate_status (i,l->status())) + { + this->waiters_++; + } } } } @@ -197,7 +207,7 @@ AsyncListManager::list_i (CORBA::ULong start, CORBA::ULong how_many) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) AsyncListManager::list_i, %d waiters") ACE_TEXT (" out of %d regsitered servers\n"), - this->waiters_, (this->pinger_ != 0))); + this->waiters_, len)); } if (this->waiters_ == 0) @@ -206,10 +216,10 @@ AsyncListManager::list_i (CORBA::ULong start, CORBA::ULong how_many) } } - -void -AsyncListManager::ping_replied (CORBA::ULong index, LiveStatus status) +bool +AsyncListManager::evaluate_status (CORBA::ULong index, LiveStatus status) { + bool is_final = true; switch (status) { case LS_ALIVE: @@ -226,16 +236,26 @@ AsyncListManager::ping_replied (CORBA::ULong index, LiveStatus status) ImplementationRepository::ACTIVE_NO; break; default: - if (ImR_Locator_i::debug() > 4) - { - ORBSVCS_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) AsyncListManager::ping_replied, index = %d ") - ACE_TEXT ("status = %d\n"))); - } + is_final = false; + } + return is_final; +} + +void +AsyncListManager::ping_replied (CORBA::ULong index, LiveStatus status) +{ + if (evaluate_status (index, status)) + { + this->waiters_--; + this->final_state(); return; } - this->waiters_--; - this->final_state(); + if (ImR_Locator_i::debug() > 4) + { + ORBSVCS_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) AsyncListManager::ping_replied, index = %d ") + ACE_TEXT ("status = %d\n"))); + } } AsyncListManager * @@ -272,7 +292,8 @@ ListLiveListener::ListLiveListener (const char *server, owner_ (owner->_add_ref ()), pinger_ (pinger), status_ (LS_UNKNOWN), - index_ (index) + index_ (index), + started_ (false) { } @@ -284,9 +305,16 @@ bool ListLiveListener::start (void) { bool rtn = this->pinger_.add_poll_listener (this); + this->started_ = true; return rtn; } +LiveStatus +ListLiveListener::status (void) +{ + return this->status_; +} + bool ListLiveListener::status_changed (LiveStatus status) { @@ -297,7 +325,8 @@ ListLiveListener::status_changed (LiveStatus status) } else { - this->owner_->ping_replied (this->index_, status); + if (this->started_) + this->owner_->ping_replied (this->index_, status); } return true; } diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.h b/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.h index 9aa5799b86e..b31ebe984ea 100644 --- a/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.h +++ b/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.h @@ -53,6 +53,7 @@ class AsyncListManager CORBA::ULong list (ImplementationRepository::AMH_ServerInformationIteratorResponseHandler_ptr _tao_rh, CORBA::ULong start, CORBA::ULong count); + bool evaluate_status (CORBA::ULong index, LiveStatus status); void ping_replied (CORBA::ULong index, LiveStatus status); AsyncListManager *_add_ref (void); @@ -90,7 +91,7 @@ class ListLiveListener : public LiveListener virtual ~ListLiveListener (void); bool start (void); - + LiveStatus status (void); bool status_changed (LiveStatus status); private: @@ -98,6 +99,7 @@ class ListLiveListener : public LiveListener LiveCheck &pinger_; LiveStatus status_; CORBA::ULong index_; + bool started_; }; diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp index bf24cc46296..0ce84d357d8 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp @@ -554,7 +554,7 @@ ImR_Locator_i::activate_server_i (UpdateableServerInfo& info, ACE_NEW (aam_raw, AsyncAccessManager (*info, manual_start, *this)); aam = aam_raw; this->aam_set_.insert_tail (aam); - } + } else { aam = this->find_aam (info->name.c_str()); diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp index e93571dcb0d..cd226dd123c 100644 --- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp @@ -149,13 +149,25 @@ LiveEntry::reset_status (void) this->repings_ = 0; this->next_check_ = ACE_High_Res_Timer::gettimeofday_hr(); } + if (ImR_Locator_i::debug () > 2) + { + ORBSVCS_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) LiveEntry::reset_status this = %x, ") + ACE_TEXT ("server = %C status = %s\n"), + this, this->server_.c_str(), + status_name (this->liveliness_))); + } + } LiveStatus LiveEntry::status (void) const { if (!this->may_ping_) - return LS_ALIVE; + { + return LS_ALIVE; + } + if ( this->liveliness_ == LS_ALIVE && this->owner_->ping_interval() != ACE_Time_Value::zero ) @@ -243,7 +255,7 @@ LiveEntry::validate_ping (bool &want_reping, ACE_Time_Value& next) this->liveliness_ == LS_DEAD || this->listeners_.size () == 0) { - if (ImR_Locator_i::debug () > 5) + if (ImR_Locator_i::debug () > 4) { ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) LiveEntry::validate_ping, status ") @@ -300,6 +312,14 @@ LiveEntry::validate_ping (bool &want_reping, ACE_Time_Value& next) } ACE_Time_Value next (ms / 1000, (ms % 1000) * 1000); this->next_check_ = now + next; + if (ImR_Locator_i::debug () > 4) + { + ORBSVCS_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) LiveEntry::validate_ping, ") + ACE_TEXT ("transient, reping in %d ms, ") + ACE_TEXT ("server %C\n"), + ms, this->server_.c_str())); + } } else { @@ -337,6 +357,10 @@ LiveEntry::do_ping (PortableServer::POA_ptr poa) CORBA::Object_var obj = poa->id_to_reference (oid.in()); ImplementationRepository::AMI_ServerObjectHandler_var cb = ImplementationRepository::AMI_ServerObjectHandler::_narrow (obj.in()); + { + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + this->liveliness_ = LS_PING_AWAY; + } try { this->ref_->sendc_ping (cb.in()); @@ -346,8 +370,6 @@ LiveEntry::do_ping (PortableServer::POA_ptr poa) ACE_TEXT ("(%P|%t) LiveEntry::do_ping, ") ACE_TEXT ("sendc_ping returned OK\n"))); } - ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); - this->liveliness_ = LS_PING_AWAY; } catch (CORBA::Exception &ex) { @@ -422,10 +444,74 @@ PingReceiver::ping_excep (Messaging::ExceptionHolder * excep_holder) //--------------------------------------------------------------------------- //--------------------------------------------------------------------------- +LC_TimeoutGuard::LC_TimeoutGuard (LiveCheck *owner, int token) + :owner_ (owner), + token_ (token), + blocked_ (owner->handle_timeout_busy_ == 0) +{ + if (!blocked_) + { + --owner_->handle_timeout_busy_; + } +} + +LC_TimeoutGuard::~LC_TimeoutGuard (void) +{ + if (blocked_) + { + return; + } + + ++owner_->handle_timeout_busy_; + if (owner_->want_timeout_) + { + ACE_Time_Value delay = ACE_Time_Value::zero; + if (owner_->deferred_timeout_ != ACE_Time_Value::zero) + { + ACE_Time_Value now (ACE_High_Res_Timer::gettimeofday_hr()); + if (owner_->deferred_timeout_ > now) + delay = owner_->deferred_timeout_ - now; + } + ++owner_->token_; + if (ImR_Locator_i::debug () > 2) + { + ORBSVCS_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) LC_TimeoutGuard(%d)::dtor,") + ACE_TEXT ("scheduling new timeout(%d), delay = %d,%d\n"), + this->token_, owner_->token_, delay.sec(), delay.usec())); + } + owner_->reactor()->schedule_timer (owner_, + reinterpret_cast(owner_->token_), + delay); + owner_->want_timeout_ = false; + } + else + { + if (ImR_Locator_i::debug () > 3) + { + ORBSVCS_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) LC_TimeoutGuard(%d)::dtor,") + ACE_TEXT ("no pending timeouts requested\n"), + this->token_)); + } + } +} + +bool LC_TimeoutGuard::blocked (void) +{ + return this->blocked_; +} + +//--------------------------------------------------------------------------- +//--------------------------------------------------------------------------- + LiveCheck::LiveCheck () :ping_interval_(), running_ (false), - token_ (100) + token_ (100), + handle_timeout_busy_ (1), + want_timeout_ (false), + deferred_timeout_ (0,0) { } @@ -485,15 +571,17 @@ LiveCheck::handle_timeout (const ACE_Time_Value &, if (!this->running_) return -1; - bool want_reping = false; - ACE_Time_Value next; + LC_TimeoutGuard tg (this, static_cast(token)); + if (tg.blocked ()) + return 0; + LiveEntryMap::iterator le_end = this->entry_map_.end(); for (LiveEntryMap::iterator le = this->entry_map_.begin(); le != le_end; ++le) { LiveEntry *entry = le->item (); - if (entry->validate_ping (want_reping, next)) + if (entry->validate_ping (this->want_timeout_, this->deferred_timeout_)) { entry->do_ping (poa_.in ()); if (ImR_Locator_i::debug () > 2) @@ -524,7 +612,7 @@ LiveCheck::handle_timeout (const ACE_Time_Value &, LiveEntry *entry = *pe; if (entry != 0) { - if (entry->validate_ping (want_reping, next)) + if (entry->validate_ping (this->want_timeout_, this->deferred_timeout_)) { entry->do_ping (poa_.in ()); } @@ -536,22 +624,6 @@ LiveCheck::handle_timeout (const ACE_Time_Value &, } } - - if (want_reping) - { - ACE_Time_Value now (ACE_High_Res_Timer::gettimeofday_hr()); - ACE_Time_Value delay = next - now; - ++this->token_; - if (ImR_Locator_i::debug () > 2) - { - ORBSVCS_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) LiveCheck::handle_timeout(%d),") - ACE_TEXT (" want reping(%d), delay = %d,%d\n"), - token, this->token_, delay.sec(), delay.usec())); - } - this->reactor()->schedule_timer (this, reinterpret_cast(this->token_), delay); - } - return 0; } @@ -614,10 +686,18 @@ LiveCheck::add_per_client_listener (LiveListener *l, { entry->add_listener (l); - ++this->token_; - this->reactor()->schedule_timer (this, - reinterpret_cast(this->token_), - ACE_Time_Value::zero); + if (this->handle_timeout_busy_ > 0) + { + ++this->token_; + this->reactor()->schedule_timer (this, + reinterpret_cast(this->token_), + ACE_Time_Value::zero); + } + else + { + this->want_timeout_ = true; + this->deferred_timeout_ = ACE_Time_Value::zero; + } return true; } return false; @@ -639,6 +719,7 @@ LiveCheck::add_poll_listener (LiveListener *l) entry->add_listener (l); entry->reset_status (); + l->status_changed (entry->status()); return this->schedule_ping (entry); } @@ -674,34 +755,38 @@ LiveCheck::schedule_ping (LiveEntry *entry) ACE_Time_Value now (ACE_High_Res_Timer::gettimeofday_hr()); ACE_Time_Value next = entry->next_check (); - ++this->token_; - if (next <= now) + + if (this->handle_timeout_busy_ > 0) { + ACE_Time_Value delay = ACE_Time_Value::zero; + if (next > now) + { + delay = next - now; + } + ++this->token_; if (ImR_Locator_i::debug () > 2) { ORBSVCS_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) LiveCheck::Schdedule_ping(%d),") - ACE_TEXT (" immediate\n"), - this->token_)); + ACE_TEXT ("(%P|%t) LiveCheck::schedule_ping (%d),") + ACE_TEXT (" delay = %d,%d\n"), + this->token_, delay.sec(), delay.usec())); } - this->reactor()->schedule_timer (this, reinterpret_cast(this->token_), - ACE_Time_Value::zero); + delay); } else { - ACE_Time_Value delay = next - now; if (ImR_Locator_i::debug () > 2) { ORBSVCS_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) LiveCheck::Schdedule_ping(%d),") - ACE_TEXT (" delay = %d,%d\n"), - this->token_, delay.sec(), delay.usec())); + ACE_TEXT ("(%P|%t) LiveCheck::schedule_ping deferred"))); + } + if (!this->want_timeout_ || next < this->deferred_timeout_) + { + this->want_timeout_ = true; + this->deferred_timeout_ = next; } - this->reactor()->schedule_timer (this, - reinterpret_cast(this->token_), - delay); } return true; } diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h index 005ae3a3005..5d30e55cd5f 100644 --- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h @@ -172,7 +172,10 @@ class Locator_Export PingReceiver : PingReceiver (LiveEntry * entry, PortableServer::POA_ptr poa); virtual ~PingReceiver (void); + /// Called when an anticipated ping reply is received void ping (void); + + /// Called when an anticipated ping raises an exception void ping_excep (Messaging::ExceptionHolder * excep_holder); private: @@ -180,6 +183,40 @@ class Locator_Export PingReceiver : LiveEntry * entry_; }; + +//--------------------------------------------------------------------------- +/* + * @class LC_TimeoutGuard + * + * @brief A helper object to avoid reentrancy in the handle_timout method + * + * The LiveCheck::handle_timeout may be called reentrantly on a single thread + * if the sending of a ping uses non-blocking connection establishment. If a + * connection must be established before the ping can be sent, that may involve + * waiting in the reactor, possibly handing other requests, and possibly even + * subsequent timeouts. + * */ + +class Locator_Export LC_TimeoutGuard +{ + public: + /// construct a new stack-based guard. This sets a flag in the owner that will + /// be cleared on destruction. + LC_TimeoutGuard (LiveCheck *owner, int token); + + /// releases the flag. If the LiveCheck received any requests for an immediate + /// or defered ping during this time, schedule it now. + ~LC_TimeoutGuard (void); + + /// Returns true if the busy flag in the owner was already set. + bool blocked (void); + + private: + LiveCheck *owner_; + int token_; + bool blocked_; +}; + //--------------------------------------------------------------------------- /* * @class LiveCheck @@ -194,6 +231,8 @@ class Locator_Export PingReceiver : class Locator_Export LiveCheck : public ACE_Event_Handler { public: + friend class LC_TimeoutGuard; + LiveCheck (); ~LiveCheck (void); @@ -241,6 +280,9 @@ class Locator_Export LiveCheck : public ACE_Event_Handler ACE_Time_Value ping_interval_; bool running_; int token_; + int handle_timeout_busy_; + bool want_timeout_; + ACE_Time_Value deferred_timeout_; }; #endif /* IMR_LIVECHECK_H_ */ -- cgit v1.2.1