summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp')
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp152
1 files changed, 116 insertions, 36 deletions
diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
index c9ca79c4368..f5087bfdffb 100644
--- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
@@ -57,7 +57,6 @@ LiveEntry::LiveEntry (LiveCheck *owner,
next_check_ (ACE_OS::time()),
retry_count_ (0),
repings_ (0),
- ping_away_ (false),
listeners_ (),
lock_ ()
{
@@ -71,7 +70,8 @@ void
LiveEntry::add_listener (LiveListener* ll)
{
ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
- this->listeners_.push_back (ll);
+ this->listeners_.insert (ll);
+ // ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::add_listener\n", this->server_.c_str()));
}
LiveStatus
@@ -92,25 +92,46 @@ LiveEntry::status (void) const
void
LiveEntry::status (LiveStatus l)
{
- ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
- this->liveliness_ = l;
- this->ping_away_ = false;
-
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->liveliness_ = l;
+ }
if (l == LS_ALIVE)
{
this->retry_count_ = 0;
ACE_Time_Value now (ACE_OS::time());
this->next_check_ = now + owner_->ping_interval();
}
- for (size_t i = 0; i < this->listeners_.size(); i++)
+ ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::status updating listeners, size = %d\n",
+ this->server_.c_str(), this->listeners_.size()));
+
+ LiveStatus ls = this->liveliness_;
+ if (ls == LS_TRANSIENT && ! this->reping_available())
+ ls = LS_LAST_TRANSIENT;
+ for (Listen_Set::ITERATOR i(this->listeners_);
+ !i.done();
+ i.advance ())
{
- LiveListener *ll = this->listeners_[i];
- if (ll != 0)
+ LiveListener **ll = 0;
+ i.next (ll);
+ if (*ll != 0)
{
- ll->status_changed (this->liveliness_, this->reping_available());
+ if ((*ll)->status_changed (this->liveliness_))
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->listeners_.remove (*ll);
+ }
}
}
- this->listeners_.clear();
+
+ if (this->listeners_.size() > 0)
+ {
+ this->owner_->schedule_ping (this);
+ }
+
+ // ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::status updating listeners done, size = %d\n",
+ // this->server_.c_str(), this->listeners_.size()));
+
}
const ACE_Time_Value &
@@ -123,19 +144,21 @@ bool
LiveEntry::do_ping (PortableServer::POA_ptr poa)
{
ACE_Time_Value now (ACE_OS::time());
- if (this->ping_away_)
+ if (this->liveliness_ == LS_PING_AWAY)
{
- return true;
+ ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, ping_away_ is true\n",
+ this->server_.c_str()));
+ return false;
}
if (this->next_check_ > now || this->liveliness_ == LS_DEAD)
{
- return false;
+ ACE_Time_Value diff = next_check_ - now;
+ ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, too soon = %d, come back in %d ms, status = %d\n",
+ this->server_.c_str(), (this->next_check_ > now), diff.sec() * 1000 + (diff.usec() / 1000), liveliness_));
+ return true;
}
- if (this->owner_->ping_interval() == ACE_Time_Value::zero)
- return false;
-
switch (this->liveliness_)
{
case LS_UNKNOWN:
@@ -154,21 +177,30 @@ LiveEntry::do_ping (PortableServer::POA_ptr poa)
this->next_check_ = now + next;
}
else
- return false;
+ {
+ ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, transient, but no next\n",
+ this->server_.c_str()));
+ return false;
+ }
}
break;
default:;
}
- this->ping_away_ = true;
- this->retry_count_++;
+ {
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->lock_, false);
+ this->liveliness_ = LS_PING_AWAY;
+ this->retry_count_++;
+ }
+ ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, sending ping, retry count = %d\n",
+ this->server_.c_str(), retry_count_));
PortableServer::ServantBase_var callback = new PingReceiver (this, poa);
PortableServer::ObjectId_var oid = poa->activate_object (callback.in());
CORBA::Object_var obj = poa->id_to_reference (oid.in());
ImplementationRepository::AMI_ServerObjectHandler_var cb =
ImplementationRepository::AMI_ServerObjectHandler::_narrow (obj.in());
this->ref_->sendc_ping (cb.in());
- return true;
+ return false;
}
//---------------------------------------------------------------------------
@@ -187,6 +219,7 @@ PingReceiver::~PingReceiver (void)
void
PingReceiver::ping (void)
{
+ ACE_DEBUG ((LM_DEBUG, "PingReceiver::ping\n"));
this->entry_->status (LS_ALIVE);
PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this);
poa_->deactivate_object (oid.in());
@@ -197,6 +230,7 @@ PingReceiver::ping_excep (Messaging::ExceptionHolder * excep_holder)
{
try
{
+ ACE_DEBUG ((LM_DEBUG, "PingReceiver::ping_excep\n"));
excep_holder->raise_exception ();
}
catch (CORBA::TRANSIENT &ex)
@@ -270,26 +304,66 @@ int
LiveCheck::handle_timeout (const ACE_Time_Value &,
const void *)
{
+ ACE_DEBUG ((LM_DEBUG, "LiveCheck::handle_timeout\n"));
+ bool want_reping = false;
+ ACE_Time_Value next;
for (LiveEntryMap::iterator le (this->entry_map_);
!le.done ();
le.advance ())
{
LiveEntryMap::value_type *pair = 0;
le.next(pair);
- pair->item()->do_ping (poa_.in());
+ if (pair->item()->do_ping (poa_.in ()))
+ {
+ LiveStatus status = pair->item ()->status ();
+ if (status != LS_DEAD)
+ {
+ if (!want_reping || pair->item ()->next_check() < next)
+ {
+ want_reping = true;
+ next = pair->item ()->next_check();
+ }
+ }
+ }
}
for (PerClientStack::ITERATOR pe (this->per_client_);
!pe.done ();
pe.advance ())
{
- LiveEntry *entry = 0;
- LiveEntry **n = &entry;
- pe.next(n);
- if (entry != 0 && !entry->do_ping (poa_.in()))
+ LiveEntry **entry = 0;
+ pe.next(entry);
+ if (*entry != 0)
{
- this->per_client_.remove (entry);
+ bool result = (*entry)->do_ping (poa_.in ());
+ LiveStatus status = (*entry)->status ();
+ if (result)
+ {
+ if (status != LS_DEAD)
+ {
+ if (!want_reping || (*entry)->next_check() < next)
+ {
+ want_reping = true;
+ next = (*entry)->next_check();
+ }
+ }
+ }
+ else
+ {
+ if (status != LS_PING_AWAY && status != LS_TRANSIENT)
+ {
+ this->per_client_.remove (*entry);
+ }
+ }
}
+ }
+ if (want_reping)
+ {
+ ACE_Time_Value now (ACE_OS::time());
+ ACE_Time_Value delay = next - now;
+ ACE_DEBUG ((LM_DEBUG, "LiveCheck::handle_timeout schdeuling next - in %d ms \n",
+ delay.sec() * 1000 + (delay.usec()/1000)));
+ this->reactor()->schedule_timer (this, 0, delay);
}
return 0;
@@ -354,32 +428,38 @@ LiveCheck::add_listener (LiveListener *l)
}
entry->add_listener (l);
+ this->schedule_ping (entry);
+ return true;
+}
+
+void
+LiveCheck::schedule_ping (LiveEntry *entry)
+{
ACE_Time_Value now (ACE_OS::time());
ACE_Time_Value next = entry->next_check ();
if (next <= now)
{
- // ACE_DEBUG ((LM_DEBUG,
- // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ")
- // ACE_TEXT ("immediate callback for <%C>\n"),
- // l, l->server()));
+ ACE_DEBUG ((LM_DEBUG, "LiveCheck::schedule_ping - immediate \n"));
this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero);
}
else
{
ACE_Time_Value delay = next - now;
- // ACE_DEBUG ((LM_DEBUG,
- // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ")
- // ACE_TEXT ("callback in %d ms for <%C>\n"),
- // l, delay.sec() * 1000 + delay.usec() / 1000, l->server()));
+ ACE_DEBUG ((LM_DEBUG, "LiveCheck::schedule_ping - in %dms \n",
+ delay.sec() * 1000 + (delay.usec()/1000)));
this->reactor()->schedule_timer (this, 0, delay);
}
- return true;
}
LiveStatus
LiveCheck::is_alive (const char *server)
{
+ if (this->ping_interval_ == ACE_Time_Value::zero)
+ {
+ return LS_ALIVE;
+ }
+
ACE_CString s(server);
LiveEntry *entry = 0;
int result = entry_map_.find (s, entry);