From e3af852bff87bfa0fd80671ad34aca3ec007e08a Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Fri, 29 Mar 2013 22:54:18 +0000 Subject: Fri Mar 29 22:52:42 UTC 2013 Phil Mesnier --- TAO/ChangeLog_Asynch_ImR | 16 +++ .../ImplRepo_Service/AsyncAccessManager.cpp | 61 ++++++--- TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h | 4 +- TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp | 4 + TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp | 21 +-- TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h | 6 +- TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp | 152 ++++++++++++++++----- TAO/orbsvcs/ImplRepo_Service/LiveCheck.h | 13 +- TAO/utils/logWalker/Log.cpp | 2 +- 9 files changed, 194 insertions(+), 85 deletions(-) diff --git a/TAO/ChangeLog_Asynch_ImR b/TAO/ChangeLog_Asynch_ImR index 9f71fe57bea..e82acf490c3 100644 --- a/TAO/ChangeLog_Asynch_ImR +++ b/TAO/ChangeLog_Asynch_ImR @@ -1,3 +1,19 @@ +Fri Mar 29 22:52:42 UTC 2013 Phil Mesnier + + * orbsvcs/ImplRepo_Service/AsyncAccessManager.h: + * orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp: + * orbsvcs/ImplRepo_Service/Forwarder.cpp: + * orbsvcs/ImplRepo_Service/ImR_Locator_i.h: + * orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp: + * orbsvcs/ImplRepo_Service/LiveCheck.h: + * orbsvcs/ImplRepo_Service/LiveCheck.cpp: + + Improve the ping retry state machine, add more diagnostics. + + * utils/logWalker/Log.cpp: + + Fix a memory management error. 
+ Fri Mar 29 00:23:44 UTC 2013 Phil Mesnier * orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc: diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp index c2042ae97e5..63713bd5204 100644 --- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp @@ -82,6 +82,11 @@ AsyncAccessManager::add_interest (ImR_ResponseHandler *rh) this->status (AAM_WAIT_FOR_PING); } } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) AsyncAccessManager::add_interest: ") + ACE_TEXT ("server = <%C>, status = %d returning\n"), + this->info_->name.c_str(), this->status_)); } void @@ -91,8 +96,8 @@ AsyncAccessManager::final_state (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) AsyncAccessManager::final_state: ") - ACE_TEXT ("status = %d, pior = <%C>\n"), - this->status_, this->info_->partial_ior.c_str())); + ACE_TEXT ("status = %d, server = <%C> list size = %d\n"), + this->status_, this->info_->name.c_str(), rh_list_.size())); } for (size_t i = 0; i < this->rh_list_.size(); i++) @@ -206,22 +211,32 @@ AsyncAccessManager::server_is_running (const char *partial_ior) } void -AsyncAccessManager::ping_replied (bool is_alive) +AsyncAccessManager::ping_replied (LiveStatus server) { - if (is_alive) + switch (server) { + case LS_ALIVE: + case LS_LAST_TRANSIENT: + case LS_TIMEDOUT: this->status (AAM_SERVER_READY); - } - else if (this->status_ == AAM_WAIT_FOR_PING) - { - if (this->send_start_request ()) - { - return; - } - } - else - { - this->status (AAM_SERVER_DEAD); + break; + case LS_DEAD: + { + if (this->status_ == AAM_WAIT_FOR_PING) + { + if (this->send_start_request ()) + { + return; + } + } + else + { + this->status (AAM_SERVER_DEAD); + } + } + break; + default: + return; } this->final_state(); } @@ -353,30 +368,36 @@ bool AsyncLiveListener::start (void) { bool rtn = this->pinger_.add_listener (this); + ACE_DEBUG ((LM_DEBUG, + "AsyncLiveListener::start, add_listener returned %d\n", 
rtn)); if (!rtn) delete this; return rtn; } -void -AsyncLiveListener::status_changed (LiveStatus status, bool may_retry) +bool +AsyncLiveListener::status_changed (LiveStatus status) { this->status_ = status; - if (status == LS_TRANSIENT && may_retry) + if (status == LS_TRANSIENT) { if (!this->pinger_.add_listener (this)) { ACE_DEBUG ((LM_DEBUG, "AsyncLiveListener::status_changed, deleting(1)\n")); - this->aam_.ping_replied (false); + this->aam_.ping_replied (status); delete this; + return true; } + return false; } else { ACE_DEBUG ((LM_DEBUG, "AsyncLiveListener::status_changed, status = %d, deleting(2)\n", status)); - this->aam_.ping_replied (status != LS_DEAD); + this->aam_.ping_replied (status); delete this; + return true; } + return true; } diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h index 4653d2159f1..20c1861af73 100644 --- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h +++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h @@ -78,7 +78,7 @@ class AsyncAccessManager void activator_replied (bool success); void server_is_running (const char *partial_ior); - void ping_replied (bool is_alive); + void ping_replied (LiveStatus server); void add_ref (void); void remove_ref (void); @@ -142,7 +142,7 @@ class AsyncLiveListener : public LiveListener virtual ~AsyncLiveListener (void); bool start (void); - void status_changed (LiveStatus status, bool may_retry); + bool status_changed (LiveStatus status); private: AsyncAccessManager &aam_; diff --git a/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp b/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp index da0db01ddba..e65a268da82 100644 --- a/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp @@ -244,6 +244,10 @@ ImR_DSI_ResponseHandler::send_ior (const char *pior) void ImR_DSI_ResponseHandler::send_exception (CORBA::Exception *) { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("ImR_DSI_ResponseHandler::send_exception() ") + 
ACE_TEXT ("on <%s>\n"), + this->server_name_.in())); CORBA::TRANSIENT ex (CORBA::SystemException::_tao_minor_code ( TAO_IMPLREPO_MINOR_CODE, 0), CORBA::COMPLETED_NO); diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp index 5e863f16afc..3bad6c22b3b 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp @@ -1280,14 +1280,6 @@ ImR_Locator_i::remove_aam (AsyncAccessManager *aam) AsyncAccessManager * ImR_Locator_i::find_aam (const char *name) { - - if (debug_ > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) find_aam: name = %s, aam_set_.size = %d\n"), - name, aam_set_.size())); - } - for (AAM_Set::ITERATOR i(this->aam_set_); !i.done(); i.advance ()) @@ -1299,12 +1291,6 @@ ImR_Locator_i::find_aam (const char *name) return (*entry); } } - if (debug_ > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) find_aam: did not find\n"))); - } - return 0; } @@ -1350,12 +1336,13 @@ SyncListener::is_alive (void) return this->status_ != LS_DEAD; } -void -SyncListener::status_changed (LiveStatus status, bool may_retry) +bool +SyncListener::status_changed (LiveStatus status) { this->callback_ = true; this->status_ = status; - this->got_it_ = (status != LS_TRANSIENT) || (! 
may_retry); + this->got_it_ = (status != LS_TRANSIENT); + return true; } //--------------------------------------------------------------------------- diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h index 119684c5083..53ae2e8fb69 100644 --- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h +++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h @@ -150,10 +150,6 @@ private: bool is_alive(UpdateableServerInfo& info); -#if 0 - int is_alive_i(UpdateableServerInfo& info); -#endif - void unregister_activator_i(const char* activator); void connect_activator (Activator_Info& info); @@ -215,7 +211,7 @@ class SyncListener : public LiveListener bool is_alive (void); - void status_changed (LiveStatus status, bool may_retry); + bool status_changed (LiveStatus status); private: CORBA::ORB_var orb_; diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp index c9ca79c4368..f5087bfdffb 100644 --- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp @@ -57,7 +57,6 @@ LiveEntry::LiveEntry (LiveCheck *owner, next_check_ (ACE_OS::time()), retry_count_ (0), repings_ (0), - ping_away_ (false), listeners_ (), lock_ () { @@ -71,7 +70,8 @@ void LiveEntry::add_listener (LiveListener* ll) { ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); - this->listeners_.push_back (ll); + this->listeners_.insert (ll); + // ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::add_listener\n", this->server_.c_str())); } LiveStatus @@ -92,25 +92,46 @@ LiveEntry::status (void) const void LiveEntry::status (LiveStatus l) { - ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); - this->liveliness_ = l; - this->ping_away_ = false; - + { + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + this->liveliness_ = l; + } if (l == LS_ALIVE) { this->retry_count_ = 0; ACE_Time_Value now (ACE_OS::time()); this->next_check_ = now + owner_->ping_interval(); } - for (size_t i = 0; i < this->listeners_.size(); i++) + 
ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::status updating listeners, size = %d\n", + this->server_.c_str(), this->listeners_.size())); + + LiveStatus ls = this->liveliness_; + if (ls == LS_TRANSIENT && ! this->reping_available()) + ls = LS_LAST_TRANSIENT; + for (Listen_Set::ITERATOR i(this->listeners_); + !i.done(); + i.advance ()) { - LiveListener *ll = this->listeners_[i]; - if (ll != 0) + LiveListener **ll = 0; + i.next (ll); + if (*ll != 0) { - ll->status_changed (this->liveliness_, this->reping_available()); + if ((*ll)->status_changed (this->liveliness_)) + { + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + this->listeners_.remove (*ll); + } } } - this->listeners_.clear(); + + if (this->listeners_.size() > 0) + { + this->owner_->schedule_ping (this); + } + + // ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::status updating listeners done, size = %d\n", + // this->server_.c_str(), this->listeners_.size())); + } const ACE_Time_Value & @@ -123,19 +144,21 @@ bool LiveEntry::do_ping (PortableServer::POA_ptr poa) { ACE_Time_Value now (ACE_OS::time()); - if (this->ping_away_) + if (this->liveliness_ == LS_PING_AWAY) { - return true; + ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, ping_away_ is true\n", + this->server_.c_str())); + return false; } if (this->next_check_ > now || this->liveliness_ == LS_DEAD) { - return false; + ACE_Time_Value diff = next_check_ - now; + ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, too soon = %d, come back in %d ms, status = %d\n", + this->server_.c_str(), (this->next_check_ > now), diff.sec() * 1000 + (diff.usec() / 1000), liveliness_)); + return true; } - if (this->owner_->ping_interval() == ACE_Time_Value::zero) - return false; - switch (this->liveliness_) { case LS_UNKNOWN: @@ -154,21 +177,30 @@ LiveEntry::do_ping (PortableServer::POA_ptr poa) this->next_check_ = now + next; } else - return false; + { + ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, transient, but no next\n", + this->server_.c_str())); + return false; + } } break; 
default:; } - this->ping_away_ = true; - this->retry_count_++; + { + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->lock_, false); + this->liveliness_ = LS_PING_AWAY; + this->retry_count_++; + } + ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, sending ping, retry count = %d\n", + this->server_.c_str(), retry_count_)); PortableServer::ServantBase_var callback = new PingReceiver (this, poa); PortableServer::ObjectId_var oid = poa->activate_object (callback.in()); CORBA::Object_var obj = poa->id_to_reference (oid.in()); ImplementationRepository::AMI_ServerObjectHandler_var cb = ImplementationRepository::AMI_ServerObjectHandler::_narrow (obj.in()); this->ref_->sendc_ping (cb.in()); - return true; + return false; } //--------------------------------------------------------------------------- @@ -187,6 +219,7 @@ PingReceiver::~PingReceiver (void) void PingReceiver::ping (void) { + ACE_DEBUG ((LM_DEBUG, "PingReceiver::ping\n")); this->entry_->status (LS_ALIVE); PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); poa_->deactivate_object (oid.in()); @@ -197,6 +230,7 @@ PingReceiver::ping_excep (Messaging::ExceptionHolder * excep_holder) { try { + ACE_DEBUG ((LM_DEBUG, "PingReceiver::ping_excep\n")); excep_holder->raise_exception (); } catch (CORBA::TRANSIENT &ex) @@ -270,26 +304,66 @@ int LiveCheck::handle_timeout (const ACE_Time_Value &, const void *) { + ACE_DEBUG ((LM_DEBUG, "LiveCheck::handle_timeout\n")); + bool want_reping = false; + ACE_Time_Value next; for (LiveEntryMap::iterator le (this->entry_map_); !le.done (); le.advance ()) { LiveEntryMap::value_type *pair = 0; le.next(pair); - pair->item()->do_ping (poa_.in()); + if (pair->item()->do_ping (poa_.in ())) + { + LiveStatus status = pair->item ()->status (); + if (status != LS_DEAD) + { + if (!want_reping || pair->item ()->next_check() < next) + { + want_reping = true; + next = pair->item ()->next_check(); + } + } + } } for (PerClientStack::ITERATOR pe (this->per_client_); !pe.done (); pe.advance 
()) { - LiveEntry *entry = 0; - LiveEntry **n = &entry; - pe.next(n); - if (entry != 0 && !entry->do_ping (poa_.in())) + LiveEntry **entry = 0; + pe.next(entry); + if (*entry != 0) { - this->per_client_.remove (entry); + bool result = (*entry)->do_ping (poa_.in ()); + LiveStatus status = (*entry)->status (); + if (result) + { + if (status != LS_DEAD) + { + if (!want_reping || (*entry)->next_check() < next) + { + want_reping = true; + next = (*entry)->next_check(); + } + } + } + else + { + if (status != LS_PING_AWAY && status != LS_TRANSIENT) + { + this->per_client_.remove (*entry); + } + } } + } + if (want_reping) + { + ACE_Time_Value now (ACE_OS::time()); + ACE_Time_Value delay = next - now; + ACE_DEBUG ((LM_DEBUG, "LiveCheck::handle_timeout schdeuling next - in %d ms \n", + delay.sec() * 1000 + (delay.usec()/1000))); + this->reactor()->schedule_timer (this, 0, delay); } return 0; @@ -354,32 +428,38 @@ LiveCheck::add_listener (LiveListener *l) } entry->add_listener (l); + this->schedule_ping (entry); + return true; +} + +void +LiveCheck::schedule_ping (LiveEntry *entry) +{ ACE_Time_Value now (ACE_OS::time()); ACE_Time_Value next = entry->next_check (); if (next <= now) { - // ACE_DEBUG ((LM_DEBUG, - // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ") - // ACE_TEXT ("immediate callback for <%C>\n"), - // l, l->server())); + ACE_DEBUG ((LM_DEBUG, "LiveCheck::schedule_ping - immediate \n")); this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero); } else { ACE_Time_Value delay = next - now; - // ACE_DEBUG ((LM_DEBUG, - // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ") - // ACE_TEXT ("callback in %d ms for <%C>\n"), - // l, delay.sec() * 1000 + delay.usec() / 1000, l->server())); + ACE_DEBUG ((LM_DEBUG, "LiveCheck::schedule_ping - in %dms \n", + delay.sec() * 1000 + (delay.usec()/1000))); this->reactor()->schedule_timer (this, 0, delay); } - return true; } LiveStatus LiveCheck::is_alive (const char *server) { + if (this->ping_interval_ == 
ACE_Time_Value::zero) + { + return LS_ALIVE; + } + ACE_CString s(server); LiveEntry *entry = 0; int result = entry_map_.find (s, entry); diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h index 4778cc5451c..64ccad0e7fa 100644 --- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h @@ -39,9 +39,11 @@ class LiveCheck; */ enum LiveStatus { LS_UNKNOWN, + LS_PING_AWAY, LS_DEAD, LS_ALIVE, LS_TRANSIENT, + LS_LAST_TRANSIENT, LS_TIMEDOUT }; @@ -66,8 +68,8 @@ class Locator_Export LiveListener LiveListener (const char *server); /// called by the asynch ping receiver when a reply or an exception - /// is received. - virtual void status_changed (LiveStatus status, bool may_retry) = 0; + /// is received. Returns true if finished listening + virtual bool status_changed (LiveStatus status) = 0; /// accessor for the server name. Used by the LiveCheck to associate a listener const char *server (void) const; @@ -112,8 +114,9 @@ class Locator_Export LiveEntry ACE_Time_Value next_check_; short retry_count_; int repings_; - bool ping_away_; - ACE_Vector listeners_; + + typedef ACE_Unbounded_Set Listen_Set; + Listen_Set listeners_; TAO_SYNCH_MUTEX lock_; static const int reping_msec_ []; static int reping_limit_; @@ -178,6 +181,8 @@ class Locator_Export LiveCheck : public ACE_Event_Handler bool add_per_client_listener (LiveListener *listener, ImplementationRepository::ServerObject_ptr ref); + void schedule_ping (LiveEntry *entry); + LiveStatus is_alive (const char *server); const ACE_Time_Value &ping_interval (void) const; diff --git a/TAO/utils/logWalker/Log.cpp b/TAO/utils/logWalker/Log.cpp index bc43e48bb98..f01a59df9ef 100644 --- a/TAO/utils/logWalker/Log.cpp +++ b/TAO/utils/logWalker/Log.cpp @@ -552,7 +552,7 @@ Log::parse_complete_connection_i (void) { c_iter.remove(); // ACE_DEBUG ((LM_DEBUG,"%d: complete_connection: purging waiter\n",this->offset_)); - delete waiter; + // delete waiter; break; } } 
-- cgit v1.2.1