summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhil Mesnier <mesnier_p@ociweb.com>2013-03-29 22:54:18 +0000
committerPhil Mesnier <mesnier_p@ociweb.com>2013-03-29 22:54:18 +0000
commite3af852bff87bfa0fd80671ad34aca3ec007e08a (patch)
tree791bfa86b8b61c1cdf719eb045fddc0fdec6b446
parent00ce9adcb8ec0eaedf6b68a9a3ca16d2a63fab9e (diff)
downloadATCD-e3af852bff87bfa0fd80671ad34aca3ec007e08a.tar.gz
Fri Mar 29 22:52:42 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
-rw-r--r--TAO/ChangeLog_Asynch_ImR16
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp61
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h4
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp4
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp21
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h6
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp152
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/LiveCheck.h13
-rw-r--r--TAO/utils/logWalker/Log.cpp2
9 files changed, 194 insertions, 85 deletions
diff --git a/TAO/ChangeLog_Asynch_ImR b/TAO/ChangeLog_Asynch_ImR
index 9f71fe57bea..e82acf490c3 100644
--- a/TAO/ChangeLog_Asynch_ImR
+++ b/TAO/ChangeLog_Asynch_ImR
@@ -1,3 +1,19 @@
+Fri Mar 29 22:52:42 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
+
+ * orbsvcs/ImplRepo_Service/AsyncAccessManager.h:
+ * orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp:
+ * orbsvcs/ImplRepo_Service/Forwarder.cpp:
+ * orbsvcs/ImplRepo_Service/ImR_Locator_i.h:
+ * orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp:
+ * orbsvcs/ImplRepo_Service/LiveCheck.h:
+ * orbsvcs/ImplRepo_Service/LiveCheck.cpp:
+
+ Improve the ping retry state machine, add more diagonstics.
+
+ * utils/logWalker/Log.cpp:
+
+ Fix a memory managment error.
+
Fri Mar 29 00:23:44 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
* orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc:
diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp
index c2042ae97e5..63713bd5204 100644
--- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp
@@ -82,6 +82,11 @@ AsyncAccessManager::add_interest (ImR_ResponseHandler *rh)
this->status (AAM_WAIT_FOR_PING);
}
}
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) AsyncAccessManager::add_interest: ")
+ ACE_TEXT ("server = <%C>, status = %d returning\n"),
+ this->info_->name.c_str(), this->status_));
}
void
@@ -91,8 +96,8 @@ AsyncAccessManager::final_state (void)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) AsyncAccessManager::final_state: ")
- ACE_TEXT ("status = %d, pior = <%C>\n"),
- this->status_, this->info_->partial_ior.c_str()));
+ ACE_TEXT ("status = %d, server = <%C> list size = %d\n"),
+ this->status_, this->info_->name.c_str(), rh_list_.size()));
}
for (size_t i = 0; i < this->rh_list_.size(); i++)
@@ -206,22 +211,32 @@ AsyncAccessManager::server_is_running (const char *partial_ior)
}
void
-AsyncAccessManager::ping_replied (bool is_alive)
+AsyncAccessManager::ping_replied (LiveStatus server)
{
- if (is_alive)
+ switch (server)
{
+ case LS_ALIVE:
+ case LS_LAST_TRANSIENT:
+ case LS_TIMEDOUT:
this->status (AAM_SERVER_READY);
- }
- else if (this->status_ == AAM_WAIT_FOR_PING)
- {
- if (this->send_start_request ())
- {
- return;
- }
- }
- else
- {
- this->status (AAM_SERVER_DEAD);
+ break;
+ case LS_DEAD:
+ {
+ if (this->status_ == AAM_WAIT_FOR_PING)
+ {
+ if (this->send_start_request ())
+ {
+ return;
+ }
+ }
+ else
+ {
+ this->status (AAM_SERVER_DEAD);
+ }
+ }
+ break;
+ default:
+ return;
}
this->final_state();
}
@@ -353,30 +368,36 @@ bool
AsyncLiveListener::start (void)
{
bool rtn = this->pinger_.add_listener (this);
+ ACE_DEBUG ((LM_DEBUG,
+ "AsyncLiveListener::start, add_listener returned %d\n", rtn));
if (!rtn)
delete this;
return rtn;
}
-void
-AsyncLiveListener::status_changed (LiveStatus status, bool may_retry)
+bool
+AsyncLiveListener::status_changed (LiveStatus status)
{
this->status_ = status;
- if (status == LS_TRANSIENT && may_retry)
+ if (status == LS_TRANSIENT)
{
if (!this->pinger_.add_listener (this))
{
ACE_DEBUG ((LM_DEBUG,
"AsyncLiveListener::status_changed, deleting(1)\n"));
- this->aam_.ping_replied (false);
+ this->aam_.ping_replied (status);
delete this;
+ return true;
}
+ return false;
}
else
{
ACE_DEBUG ((LM_DEBUG,
"AsyncLiveListener::status_changed, status = %d, deleting(2)\n", status));
- this->aam_.ping_replied (status != LS_DEAD);
+ this->aam_.ping_replied (status);
delete this;
+ return true;
}
+ return true;
}
diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h
index 4653d2159f1..20c1861af73 100644
--- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h
+++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h
@@ -78,7 +78,7 @@ class AsyncAccessManager
void activator_replied (bool success);
void server_is_running (const char *partial_ior);
- void ping_replied (bool is_alive);
+ void ping_replied (LiveStatus server);
void add_ref (void);
void remove_ref (void);
@@ -142,7 +142,7 @@ class AsyncLiveListener : public LiveListener
virtual ~AsyncLiveListener (void);
bool start (void);
- void status_changed (LiveStatus status, bool may_retry);
+ bool status_changed (LiveStatus status);
private:
AsyncAccessManager &aam_;
diff --git a/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp b/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp
index da0db01ddba..e65a268da82 100644
--- a/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/Forwarder.cpp
@@ -244,6 +244,10 @@ ImR_DSI_ResponseHandler::send_ior (const char *pior)
void
ImR_DSI_ResponseHandler::send_exception (CORBA::Exception *)
{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("ImR_DSI_ResponseHandler::send_exception() ")
+ ACE_TEXT ("on <%s>\n"),
+ this->server_name_.in()));
CORBA::TRANSIENT ex (CORBA::SystemException::_tao_minor_code
( TAO_IMPLREPO_MINOR_CODE, 0),
CORBA::COMPLETED_NO);
diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
index 5e863f16afc..3bad6c22b3b 100644
--- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
@@ -1280,14 +1280,6 @@ ImR_Locator_i::remove_aam (AsyncAccessManager *aam)
AsyncAccessManager *
ImR_Locator_i::find_aam (const char *name)
{
-
- if (debug_ > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) find_aam: name = %s, aam_set_.size = %d\n"),
- name, aam_set_.size()));
- }
-
for (AAM_Set::ITERATOR i(this->aam_set_);
!i.done();
i.advance ())
@@ -1299,12 +1291,6 @@ ImR_Locator_i::find_aam (const char *name)
return (*entry);
}
}
- if (debug_ > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) find_aam: did not find\n")));
- }
-
return 0;
}
@@ -1350,12 +1336,13 @@ SyncListener::is_alive (void)
return this->status_ != LS_DEAD;
}
-void
-SyncListener::status_changed (LiveStatus status, bool may_retry)
+bool
+SyncListener::status_changed (LiveStatus status)
{
this->callback_ = true;
this->status_ = status;
- this->got_it_ = (status != LS_TRANSIENT) || (! may_retry);
+ this->got_it_ = (status != LS_TRANSIENT);
+ return true;
}
//---------------------------------------------------------------------------
diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h
index 119684c5083..53ae2e8fb69 100644
--- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h
+++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h
@@ -150,10 +150,6 @@ private:
bool is_alive(UpdateableServerInfo& info);
-#if 0
- int is_alive_i(UpdateableServerInfo& info);
-#endif
-
void unregister_activator_i(const char* activator);
void connect_activator (Activator_Info& info);
@@ -215,7 +211,7 @@ class SyncListener : public LiveListener
bool is_alive (void);
- void status_changed (LiveStatus status, bool may_retry);
+ bool status_changed (LiveStatus status);
private:
CORBA::ORB_var orb_;
diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
index c9ca79c4368..f5087bfdffb 100644
--- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
@@ -57,7 +57,6 @@ LiveEntry::LiveEntry (LiveCheck *owner,
next_check_ (ACE_OS::time()),
retry_count_ (0),
repings_ (0),
- ping_away_ (false),
listeners_ (),
lock_ ()
{
@@ -71,7 +70,8 @@ void
LiveEntry::add_listener (LiveListener* ll)
{
ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
- this->listeners_.push_back (ll);
+ this->listeners_.insert (ll);
+ // ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::add_listener\n", this->server_.c_str()));
}
LiveStatus
@@ -92,25 +92,46 @@ LiveEntry::status (void) const
void
LiveEntry::status (LiveStatus l)
{
- ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
- this->liveliness_ = l;
- this->ping_away_ = false;
-
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->liveliness_ = l;
+ }
if (l == LS_ALIVE)
{
this->retry_count_ = 0;
ACE_Time_Value now (ACE_OS::time());
this->next_check_ = now + owner_->ping_interval();
}
- for (size_t i = 0; i < this->listeners_.size(); i++)
+ ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::status updating listeners, size = %d\n",
+ this->server_.c_str(), this->listeners_.size()));
+
+ LiveStatus ls = this->liveliness_;
+ if (ls == LS_TRANSIENT && ! this->reping_available())
+ ls = LS_LAST_TRANSIENT;
+ for (Listen_Set::ITERATOR i(this->listeners_);
+ !i.done();
+ i.advance ())
{
- LiveListener *ll = this->listeners_[i];
- if (ll != 0)
+ LiveListener **ll = 0;
+ i.next (ll);
+ if (*ll != 0)
{
- ll->status_changed (this->liveliness_, this->reping_available());
+ if ((*ll)->status_changed (this->liveliness_))
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->listeners_.remove (*ll);
+ }
}
}
- this->listeners_.clear();
+
+ if (this->listeners_.size() > 0)
+ {
+ this->owner_->schedule_ping (this);
+ }
+
+ // ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::status updating listeners done, size = %d\n",
+ // this->server_.c_str(), this->listeners_.size()));
+
}
const ACE_Time_Value &
@@ -123,19 +144,21 @@ bool
LiveEntry::do_ping (PortableServer::POA_ptr poa)
{
ACE_Time_Value now (ACE_OS::time());
- if (this->ping_away_)
+ if (this->liveliness_ == LS_PING_AWAY)
{
- return true;
+ ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, ping_away_ is true\n",
+ this->server_.c_str()));
+ return false;
}
if (this->next_check_ > now || this->liveliness_ == LS_DEAD)
{
- return false;
+ ACE_Time_Value diff = next_check_ - now;
+ ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, too soon = %d, come back in %d ms, status = %d\n",
+ this->server_.c_str(), (this->next_check_ > now), diff.sec() * 1000 + (diff.usec() / 1000), liveliness_));
+ return true;
}
- if (this->owner_->ping_interval() == ACE_Time_Value::zero)
- return false;
-
switch (this->liveliness_)
{
case LS_UNKNOWN:
@@ -154,21 +177,30 @@ LiveEntry::do_ping (PortableServer::POA_ptr poa)
this->next_check_ = now + next;
}
else
- return false;
+ {
+ ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, transient, but no next\n",
+ this->server_.c_str()));
+ return false;
+ }
}
break;
default:;
}
- this->ping_away_ = true;
- this->retry_count_++;
+ {
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->lock_, false);
+ this->liveliness_ = LS_PING_AWAY;
+ this->retry_count_++;
+ }
+ ACE_DEBUG ((LM_DEBUG, "LiveEntry(%s)::do_ping, sending ping, retry count = %d\n",
+ this->server_.c_str(), retry_count_));
PortableServer::ServantBase_var callback = new PingReceiver (this, poa);
PortableServer::ObjectId_var oid = poa->activate_object (callback.in());
CORBA::Object_var obj = poa->id_to_reference (oid.in());
ImplementationRepository::AMI_ServerObjectHandler_var cb =
ImplementationRepository::AMI_ServerObjectHandler::_narrow (obj.in());
this->ref_->sendc_ping (cb.in());
- return true;
+ return false;
}
//---------------------------------------------------------------------------
@@ -187,6 +219,7 @@ PingReceiver::~PingReceiver (void)
void
PingReceiver::ping (void)
{
+ ACE_DEBUG ((LM_DEBUG, "PingReceiver::ping\n"));
this->entry_->status (LS_ALIVE);
PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this);
poa_->deactivate_object (oid.in());
@@ -197,6 +230,7 @@ PingReceiver::ping_excep (Messaging::ExceptionHolder * excep_holder)
{
try
{
+ ACE_DEBUG ((LM_DEBUG, "PingReceiver::ping_excep\n"));
excep_holder->raise_exception ();
}
catch (CORBA::TRANSIENT &ex)
@@ -270,26 +304,66 @@ int
LiveCheck::handle_timeout (const ACE_Time_Value &,
const void *)
{
+ ACE_DEBUG ((LM_DEBUG, "LiveCheck::handle_timeout\n"));
+ bool want_reping = false;
+ ACE_Time_Value next;
for (LiveEntryMap::iterator le (this->entry_map_);
!le.done ();
le.advance ())
{
LiveEntryMap::value_type *pair = 0;
le.next(pair);
- pair->item()->do_ping (poa_.in());
+ if (pair->item()->do_ping (poa_.in ()))
+ {
+ LiveStatus status = pair->item ()->status ();
+ if (status != LS_DEAD)
+ {
+ if (!want_reping || pair->item ()->next_check() < next)
+ {
+ want_reping = true;
+ next = pair->item ()->next_check();
+ }
+ }
+ }
}
for (PerClientStack::ITERATOR pe (this->per_client_);
!pe.done ();
pe.advance ())
{
- LiveEntry *entry = 0;
- LiveEntry **n = &entry;
- pe.next(n);
- if (entry != 0 && !entry->do_ping (poa_.in()))
+ LiveEntry **entry = 0;
+ pe.next(entry);
+ if (*entry != 0)
{
- this->per_client_.remove (entry);
+ bool result = (*entry)->do_ping (poa_.in ());
+ LiveStatus status = (*entry)->status ();
+ if (result)
+ {
+ if (status != LS_DEAD)
+ {
+ if (!want_reping || (*entry)->next_check() < next)
+ {
+ want_reping = true;
+ next = (*entry)->next_check();
+ }
+ }
+ }
+ else
+ {
+ if (status != LS_PING_AWAY && status != LS_TRANSIENT)
+ {
+ this->per_client_.remove (*entry);
+ }
+ }
}
+ }
+ if (want_reping)
+ {
+ ACE_Time_Value now (ACE_OS::time());
+ ACE_Time_Value delay = next - now;
+ ACE_DEBUG ((LM_DEBUG, "LiveCheck::handle_timeout schdeuling next - in %d ms \n",
+ delay.sec() * 1000 + (delay.usec()/1000)));
+ this->reactor()->schedule_timer (this, 0, delay);
}
return 0;
@@ -354,32 +428,38 @@ LiveCheck::add_listener (LiveListener *l)
}
entry->add_listener (l);
+ this->schedule_ping (entry);
+ return true;
+}
+
+void
+LiveCheck::schedule_ping (LiveEntry *entry)
+{
ACE_Time_Value now (ACE_OS::time());
ACE_Time_Value next = entry->next_check ();
if (next <= now)
{
- // ACE_DEBUG ((LM_DEBUG,
- // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ")
- // ACE_TEXT ("immediate callback for <%C>\n"),
- // l, l->server()));
+ ACE_DEBUG ((LM_DEBUG, "LiveCheck::schedule_ping - immediate \n"));
this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero);
}
else
{
ACE_Time_Value delay = next - now;
- // ACE_DEBUG ((LM_DEBUG,
- // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ")
- // ACE_TEXT ("callback in %d ms for <%C>\n"),
- // l, delay.sec() * 1000 + delay.usec() / 1000, l->server()));
+ ACE_DEBUG ((LM_DEBUG, "LiveCheck::schedule_ping - in %dms \n",
+ delay.sec() * 1000 + (delay.usec()/1000)));
this->reactor()->schedule_timer (this, 0, delay);
}
- return true;
}
LiveStatus
LiveCheck::is_alive (const char *server)
{
+ if (this->ping_interval_ == ACE_Time_Value::zero)
+ {
+ return LS_ALIVE;
+ }
+
ACE_CString s(server);
LiveEntry *entry = 0;
int result = entry_map_.find (s, entry);
diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
index 4778cc5451c..64ccad0e7fa 100644
--- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
+++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
@@ -39,9 +39,11 @@ class LiveCheck;
*/
enum LiveStatus {
LS_UNKNOWN,
+ LS_PING_AWAY,
LS_DEAD,
LS_ALIVE,
LS_TRANSIENT,
+ LS_LAST_TRANSIENT,
LS_TIMEDOUT
};
@@ -66,8 +68,8 @@ class Locator_Export LiveListener
LiveListener (const char *server);
/// called by the asynch ping receiver when a reply or an exception
- /// is received.
- virtual void status_changed (LiveStatus status, bool may_retry) = 0;
+ /// is received. Returns true if finished listening
+ virtual bool status_changed (LiveStatus status) = 0;
/// accessor for the server name. Used by the LiveCheck to associate a listener
const char *server (void) const;
@@ -112,8 +114,9 @@ class Locator_Export LiveEntry
ACE_Time_Value next_check_;
short retry_count_;
int repings_;
- bool ping_away_;
- ACE_Vector<LiveListener *> listeners_;
+
+ typedef ACE_Unbounded_Set<LiveListener *> Listen_Set;
+ Listen_Set listeners_;
TAO_SYNCH_MUTEX lock_;
static const int reping_msec_ [];
static int reping_limit_;
@@ -178,6 +181,8 @@ class Locator_Export LiveCheck : public ACE_Event_Handler
bool add_per_client_listener (LiveListener *listener,
ImplementationRepository::ServerObject_ptr ref);
+ void schedule_ping (LiveEntry *entry);
+
LiveStatus is_alive (const char *server);
const ACE_Time_Value &ping_interval (void) const;
diff --git a/TAO/utils/logWalker/Log.cpp b/TAO/utils/logWalker/Log.cpp
index bc43e48bb98..f01a59df9ef 100644
--- a/TAO/utils/logWalker/Log.cpp
+++ b/TAO/utils/logWalker/Log.cpp
@@ -552,7 +552,7 @@ Log::parse_complete_connection_i (void)
{
c_iter.remove();
// ACE_DEBUG ((LM_DEBUG,"%d: complete_connection: purging waiter\n",this->offset_));
- delete waiter;
+ // delete waiter;
break;
}
}