diff options
Diffstat (limited to 'TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp')
-rw-r--r-- | TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp | 152 |
1 files changed, 116 insertions, 36 deletions
diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp index c9ca79c4368..f5087bfdffb 100644 --- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp +++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp @@ -57,7 +57,6 @@ LiveEntry::LiveEntry (LiveCheck *owner, next_check_ (ACE_OS::time()), retry_count_ (0), repings_ (0), - ping_away_ (false), listeners_ (), lock_ () { @@ -71,7 +70,8 @@ void LiveEntry::add_listener (LiveListener* ll) { ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); - this->listeners_.push_back (ll); + this->listeners_.insert (ll); + // ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::add_listener\n", this->server_.c_str())); } LiveStatus @@ -92,25 +92,46 @@ LiveEntry::status (void) const void LiveEntry::status (LiveStatus l) { - ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); - this->liveliness_ = l; - this->ping_away_ = false; - + { + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + this->liveliness_ = l; + } if (l == LS_ALIVE) { this->retry_count_ = 0; ACE_Time_Value now (ACE_OS::time()); this->next_check_ = now + owner_->ping_interval(); } - for (size_t i = 0; i < this->listeners_.size(); i++) + ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::status updating listeners, size = %d\n", + this->server_.c_str(), this->listeners_.size())); + + LiveStatus ls = this->liveliness_; + if (ls == LS_TRANSIENT && ! this->reping_available()) + ls = LS_LAST_TRANSIENT; + for (Listen_Set::ITERATOR i(this->listeners_); + !i.done(); + i.advance ()) { - LiveListener *ll = this->listeners_[i]; - if (ll != 0) + LiveListener **ll = 0; + i.next (ll); + if (*ll != 0) { - ll->status_changed (this->liveliness_, this->reping_available()); + if ((*ll)->status_changed (this->liveliness_)) + { + ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_); + this->listeners_.remove (*ll); + } } } - this->listeners_.clear(); + + if (this->listeners_.size() > 0) + { + this->owner_->schedule_ping (this); + } + + // ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::status updating listeners done, size = %d\n", + // this->server_.c_str(), this->listeners_.size())); + } const ACE_Time_Value & @@ -123,19 +144,21 @@ bool LiveEntry::do_ping (PortableServer::POA_ptr poa) { ACE_Time_Value now (ACE_OS::time()); - if (this->ping_away_) + if (this->liveliness_ == LS_PING_AWAY) { - return true; + ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, ping_away_ is true\n", + this->server_.c_str())); + return false; } if (this->next_check_ > now || this->liveliness_ == LS_DEAD) { - return false; + ACE_Time_Value diff = next_check_ - now; + ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, too soon = %d, come back in %d ms, status = %d\n", + this->server_.c_str(), (this->next_check_ > now), diff.sec() * 1000 + (diff.usec() / 1000), liveliness_)); + return true; } - if (this->owner_->ping_interval() == ACE_Time_Value::zero) - return false; - switch (this->liveliness_) { case LS_UNKNOWN: @@ -154,21 +177,30 @@ LiveEntry::do_ping (PortableServer::POA_ptr poa) this->next_check_ = now + next; } else - return false; + { + ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, transient, but no next\n", + this->server_.c_str())); + return false; + } } break; default:; } - this->ping_away_ = true; - this->retry_count_++; + { + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->lock_, false); + this->liveliness_ = LS_PING_AWAY; + this->retry_count_++; + } + ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, sending ping, retry count = %d\n", + this->server_.c_str(), retry_count_)); PortableServer::ServantBase_var callback = new PingReceiver (this, poa); PortableServer::ObjectId_var oid = poa->activate_object (callback.in()); CORBA::Object_var obj = poa->id_to_reference (oid.in()); ImplementationRepository::AMI_ServerObjectHandler_var cb = ImplementationRepository::AMI_ServerObjectHandler::_narrow (obj.in()); this->ref_->sendc_ping (cb.in()); - return true; + return false; } //--------------------------------------------------------------------------- @@ -187,6 +219,7 @@ PingReceiver::~PingReceiver (void) void PingReceiver::ping (void) { + ACE_DEBUG ((LM_DEBUG, "PingReceiver::ping\n")); this->entry_->status (LS_ALIVE); PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); poa_->deactivate_object (oid.in()); @@ -197,6 +230,7 @@ PingReceiver::ping_excep (Messaging::ExceptionHolder * excep_holder) { try { + ACE_DEBUG ((LM_DEBUG, "PingReceiver::ping_excep\n")); excep_holder->raise_exception (); } catch (CORBA::TRANSIENT &ex) @@ -270,26 +304,66 @@ int LiveCheck::handle_timeout (const ACE_Time_Value &, const void *) { + ACE_DEBUG ((LM_DEBUG, "LiveCheck::handle_timeout\n")); + bool want_reping = false; + ACE_Time_Value next; for (LiveEntryMap::iterator le (this->entry_map_); !le.done (); le.advance ()) { LiveEntryMap::value_type *pair = 0; le.next(pair); - pair->item()->do_ping (poa_.in()); + if (pair->item()->do_ping (poa_.in ())) + { + LiveStatus status = pair->item ()->status (); + if (status != LS_DEAD) + { + if (!want_reping || pair->item ()->next_check() < next) + { + want_reping = true; + next = pair->item ()->next_check(); + } + } + } } for (PerClientStack::ITERATOR pe (this->per_client_); !pe.done (); pe.advance ()) { - LiveEntry *entry = 0; - LiveEntry **n = &entry; - pe.next(n); - if (entry != 0 && !entry->do_ping (poa_.in())) + LiveEntry **entry = 0; + pe.next(entry); + if (*entry != 0) { - this->per_client_.remove (entry); + bool result = (*entry)->do_ping (poa_.in ()); + LiveStatus status = (*entry)->status (); + if (result) + { + if (status != LS_DEAD) + { + if (!want_reping || (*entry)->next_check() < next) + { + want_reping = true; + next = (*entry)->next_check(); + } + } + } + else + { + if (status != LS_PING_AWAY && status != LS_TRANSIENT) + { + this->per_client_.remove (*entry); + } + } } + } + if (want_reping) + { + ACE_Time_Value now (ACE_OS::time()); + ACE_Time_Value delay = next - now; + ACE_DEBUG ((LM_DEBUG, "LiveCheck::handle_timeout schdeuling next - in %d ms \n", + delay.sec() * 1000 + (delay.usec()/1000))); + this->reactor()->schedule_timer (this, 0, delay); } return 0; @@ -354,32 +428,38 @@ LiveCheck::add_listener (LiveListener *l) } entry->add_listener (l); + this->schedule_ping (entry); + return true; +} + +void +LiveCheck::schedule_ping (LiveEntry *entry) +{ ACE_Time_Value now (ACE_OS::time()); ACE_Time_Value next = entry->next_check (); if (next <= now) { - // ACE_DEBUG ((LM_DEBUG, - // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ") - // ACE_TEXT ("immediate callback for <%C>\n"), - // l, l->server())); + ACE_DEBUG ((LM_DEBUG, "LiveCheck::schedule_ping - immediate \n")); this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero); } else { ACE_Time_Value delay = next - now; - // ACE_DEBUG ((LM_DEBUG, - // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ") - // ACE_TEXT ("callback in %d ms for <%C>\n"), - // l, delay.sec() * 1000 + delay.usec() / 1000, l->server())); + ACE_DEBUG ((LM_DEBUG, "LiveCheck::schedule_ping - in %dms \n", + delay.sec() * 1000 + (delay.usec()/1000))); this->reactor()->schedule_timer (this, 0, delay); } - return true; } LiveStatus LiveCheck::is_alive (const char *server) { + if (this->ping_interval_ == ACE_Time_Value::zero) + { + return LS_ALIVE; + } + ACE_CString s(server); LiveEntry *entry = 0; int result = entry_map_.find (s, entry); |