summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhil Mesnier <mesnier_p@ociweb.com>2013-03-19 13:41:09 +0000
committerPhil Mesnier <mesnier_p@ociweb.com>2013-03-19 13:41:09 +0000
commit5fd897d1479642d19e699a81a25156fcdb9aab8f (patch)
treec5ab94f78bb1efed0394f903bc8ec1027f6148e4
parente5d67bc4a7ed1c5b03c4d6cd1b2866d4589d13d5 (diff)
downloadATCD-5fd897d1479642d19e699a81a25156fcdb9aab8f.tar.gz
Tue Mar 19 13:31:43 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
-rw-r--r--TAO/ChangeLog_Asynch_ImR16
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp71
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h26
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc2
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp304
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/LiveCheck.h177
6 files changed, 585 insertions, 11 deletions
diff --git a/TAO/ChangeLog_Asynch_ImR b/TAO/ChangeLog_Asynch_ImR
index b1d50608e83..5d925036e3c 100644
--- a/TAO/ChangeLog_Asynch_ImR
+++ b/TAO/ChangeLog_Asynch_ImR
@@ -1,3 +1,19 @@
+Tue Mar 19 13:31:43 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
+
+ * orbsvcs/ImplRepo_Service/ImR_Locator_i.h:
+ * orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp:
+ * orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc:
+ * orbsvcs/ImplRepo_Service/LiveCheck.h:
+ * orbsvcs/ImplRepo_Service/LiveCheck.cpp:
+
+ Adding new mechanism for pinging servers asynchronously. When the IMR is
+ interested in the status of a server, it now registers a callback object and
+ a ping is scheduled. At the appropriate time, the ping request is sent using
+ AMI. The reply or exception is then handled asynchronously.
+
+ For testing in an otherwise synchronous locator, a blocking ping reply waiter
+ is used.
+
Thu Mar 14 21:36:30 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
* orbsvcs/ImplRepo_Service/AsyncStartupWaiter_i.cpp:
diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
index 8dfa7737424..dc43da3a662 100644
--- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
@@ -100,6 +100,7 @@ ImR_Locator_i::init_with_orb (CORBA::ORB_ptr orb, Options& opts)
{
this->dsi_forwarder_.init (orb);
this->adapter_.init (& this->dsi_forwarder_);
+ this->pinger_.init (orb,ping_interval_);
}
else
{
@@ -435,7 +436,7 @@ ImR_Locator_i::activate_server_by_name (const char* name, bool manual_start)
ACE_CString serverKey;
ACE_CString server_id;
bool jacorb_server = false;
- parse_id(name, server_id, serverKey, jacorb_server);
+ this->parse_id(name, server_id, serverKey, jacorb_server);
UpdateableServerInfo info(this->repository_.get(), serverKey);
if (info.null ())
{
@@ -525,7 +526,7 @@ ImR_Locator_i::activate_server_i (UpdateableServerInfo& info,
// Note: We already updated info with StartupInfo in server_is_running ()
ImplementationRepository::StartupInfo_var si =
- start_server (info, manual_start, info.edit()->waiting_clients);
+ this->start_server (info, manual_start, info.edit()->waiting_clients);
}
}
@@ -539,7 +540,7 @@ ImR_Locator_i::activate_perclient_server_i (UpdateableServerInfo& shared_info,
do
{
ImplementationRepository::StartupInfo* psi =
- start_server (info, manual_start, shared_info.edit()->waiting_clients);
+ this->start_server (info, manual_start, shared_info.edit()->waiting_clients);
// waiting_clients will be updated by each call to start_server
shared_info.update_repo ();
@@ -563,7 +564,8 @@ ImR_Locator_i::activate_perclient_server_i (UpdateableServerInfo& shared_info,
}
info.edit()->reset ();
}
- } while (info->start_count < info->start_limit);
+ }
+ while (info->start_count < info->start_limit);
if (this->debug_ > 0)
{
@@ -629,8 +631,7 @@ ImR_Locator_i::start_server (UpdateableServerInfo& info, bool manual_start,
ACE_TEXT ("ImR: Starting server <%C>. Attempt %d/%d.\n"),
info->name.c_str (), info->start_count, info->start_limit));
}
- ainfo->activator->start_server (
- info->name.c_str (),
+ ainfo->activator->start_server (info->name.c_str (),
info->cmdline.c_str (),
info->dir.c_str (),
info->env_vars);
@@ -806,7 +807,7 @@ ImR_Locator_i::add_or_update_server (
ACE_CString serverKey;
ACE_CString server_id;
bool jacorb_server = false;
- parse_id(server, server_id, serverKey, jacorb_server);
+ this->parse_id(server, server_id, serverKey, jacorb_server);
UpdateableServerInfo info(this->repository_.get(), serverKey);
if (info.null ())
{
@@ -914,7 +915,7 @@ ImR_Locator_i::remove_server (const char* name)
ACE_CString serverKey;
ACE_CString server_id;
bool jacorb_server = false;
- parse_id(name, server_id, serverKey, jacorb_server);
+ this->parse_id(name, server_id, serverKey, jacorb_server);
Server_Info_Ptr info = this->repository_->get_server (serverKey);
if (! info.null ())
{
@@ -971,7 +972,7 @@ ImR_Locator_i::shutdown_server (const char* server)
ACE_CString name;
ACE_CString server_id;
bool jacorb_server = false;
- parse_id(server, server_id, name, jacorb_server);
+ this->parse_id(server, server_id, name, jacorb_server);
UpdateableServerInfo info(this->repository_.get(), name);
if (info.null ())
@@ -1044,7 +1045,7 @@ ImR_Locator_i::server_is_running (const char* id,
ACE_CString server_id;
ACE_CString name;
bool jacorb_server = false;
- parse_id(id, server_id, name, jacorb_server);
+ this->parse_id(id, server_id, name, jacorb_server);
if (this->debug_ > 0)
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Server %C is running at %C.\n"),
@@ -1060,6 +1061,12 @@ ImR_Locator_i::server_is_running (const char* id,
if (this->unregister_if_address_reused_)
this->repository_->unregister_if_address_reused (server_id, name, partial_ior);
+ CORBA::Object_var obj = this->set_timeout_policy (server,ACE_Time_Value (1,0));
+ ImplementationRepository::ServerObject_var s =
+ ImplementationRepository::ServerObject::_narrow (obj.in());
+
+ this->pinger_.add_server (server_id.c_str(), s);
+
UpdateableServerInfo info(this->repository_.get(), name);
if (info.null ())
{
@@ -1156,7 +1163,7 @@ ImR_Locator_i::find (const char* server,
ACE_CString serverKey;
ACE_CString server_id;
bool jacorb_server = false;
- parse_id(server, server_id, serverKey, jacorb_server);
+ this->parse_id(server, server_id, serverKey, jacorb_server);
UpdateableServerInfo info(this->repository_.get(), serverKey);
if (! info.null ())
{
@@ -1648,3 +1655,45 @@ ImR_Locator_i::debug () const
{
return debug_;
}
+
+
+SyncListener::SyncListener (const char *server,
+ CORBA::ORB_ptr orb,
+ LiveCheck *pinger)
+ :LiveListener (server),
+ orb_ (CORBA::ORB::_duplicate (orb)),
+ pinger_ (pinger),
+ status_ (LS_UNKNOWN),
+ got_it_ (false),
+ retries_ (10)
+{
+}
+
+bool
+SyncListener::is_alive (void)
+{
+
+ this->status_ = this->pinger_->is_alive(this->server().c_str());
+
+ if (this->status_ == LS_ALIVE)
+ return true;
+
+ this->pinger_->add_listener (this);
+ while (!this->got_it_)
+ {
+ ACE_Time_Value delay (1,0);
+ this->orb_->perform_work (delay);
+ this->pinger_->add_listener (this);
+ }
+ this->got_it_ = false;
+ this->retries_ = 10;
+ return this->status_ != LS_DEAD;
+}
+
+void
+SyncListener::status_changed (LiveStatus status)
+{
+ this->status_ = status;
+ this->got_it_ = (status != LS_TRANSIENT) || (--this->retries_ == 0);
+}
+
diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h
index 0cf30dd85fa..f2dd9564848 100644
--- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h
+++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h
@@ -10,6 +10,7 @@
#include "Adapter_Activator.h"
#include "Activator_Info.h"
#include "Forwarder.h"
+#include "LiveCheck.h"
#include "Locator_Options.h"
#include "Server_Info.h"
#include "ace/Auto_Ptr.h"
@@ -18,6 +19,7 @@
#include "ImR_LocatorS.h"
#include "AsyncStartupWaiterS.h"
+#include "LiveCheck.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -142,6 +144,9 @@ private:
/// The locator interface for the IORTable
IORTable::Locator_var ins_locator_;
+ /// The asynch server ping adapter
+ LiveCheck pinger_;
+
CORBA::ORB_var orb_;
PortableServer::POA_var root_poa_;
PortableServer::POA_var imr_poa_;
@@ -159,5 +164,26 @@ private:
bool unregister_if_address_reused_;
};
+class Locator_Export SyncListener : public LiveListener
+{
+ public:
+ SyncListener (const char *server, CORBA::ORB_ptr orb, LiveCheck *pinger);
+
+ bool is_alive (void);
+
+ void status_changed (LiveStatus status);
+
+ private:
+ CORBA::ORB_var orb_;
+ LiveCheck *pinger_;
+ LiveStatus status_;
+ bool got_it_;
+ int retries_;
+
+};
+
+
+
+
#include /**/ "ace/post.h"
#endif /* IMR_LOCATOR_I_H */
diff --git a/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc b/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc
index 1c44cbf19df..95616644e5a 100644
--- a/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc
+++ b/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc
@@ -71,6 +71,7 @@ project(ImR_Locator) : orbsvcslib, orbsvcs_output, conv_lib, avoids_minimum_corb
after += ImR_Locator_IDL ImR_Activator_IDL
libs += TAO_ImR_Locator_IDL TAO_ImR_Activator_IDL
avoids += uses_wchar
+
Source_Files {
Activator_Info.cpp
Adapter_Activator.cpp
@@ -78,6 +79,7 @@ project(ImR_Locator) : orbsvcslib, orbsvcs_output, conv_lib, avoids_minimum_corb
ImR_Locator_i.cpp
AsyncStartupWaiter_i.cpp
INS_Locator.cpp
+ LiveCheck.cpp
Locator_XMLHandler.cpp
Locator_Loader.cpp
Locator_Options.cpp
diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
new file mode 100644
index 00000000000..09d67c35e66
--- /dev/null
+++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
@@ -0,0 +1,304 @@
+// -*- C++ -*-
+//
+
+#include "LiveCheck.h"
+#include "tao/ORB_Core.h"
+#include "ace/Reactor.h"
+
+LiveListener::LiveListener (const char *server)
+ : server_(server)
+{
+}
+
+const ACE_CString &
+LiveListener::server (void) const
+{
+ return this->server_;
+}
+
+//---------------------------------------------------------------------------
+//---------------------------------------------------------------------------
+
+LiveEntry::LiveEntry (LiveCheck *owner,
+ ImplementationRepository::ServerObject_ptr ref)
+ : owner_ (owner),
+ ref_ (ImplementationRepository::ServerObject::_duplicate (ref)),
+ liveliness_ (LS_UNKNOWN),
+ next_check_ (ACE_OS::time()),
+ retry_count_ (0),
+ ping_away_ (false),
+ listeners_ (),
+ lock_ ()
+{
+}
+
+LiveEntry::~LiveEntry (void)
+{
+}
+
+void
+LiveEntry::add_listener (LiveListener* ll)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->listeners_.push_back (ll);
+}
+
+LiveStatus
+LiveEntry::status (void) const
+{
+ if ( this->liveliness_ == LS_ALIVE &&
+ this->owner_->ping_interval() != ACE_Time_Value::zero )
+ {
+ ACE_Time_Value now (ACE_OS::time());
+ if (now >= this->next_check_)
+ {
+ return LS_UNKNOWN;
+ }
+ }
+ return this->liveliness_;
+}
+
+void
+LiveEntry::status (LiveStatus l)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->liveliness_ = l;
+ this->ping_away_ = false;
+
+ if (l == LS_ALIVE)
+ {
+ this->retry_count_ = 0;
+ ACE_Time_Value now (ACE_OS::time());
+ this->next_check_ = now + owner_->ping_interval();
+ }
+ for (ACE_Vector_Iterator<LiveListener *> i (this->listeners_);
+ !i.done();
+ i.advance())
+ {
+ LiveListener *item = 0;
+ LiveListener **ll = &item;
+ i.next(ll);
+ if (item != 0)
+ {
+ item->status_changed (this->liveliness_);
+ }
+ }
+ this->listeners_.clear();
+}
+
+const ACE_Time_Value &
+LiveEntry::next_check (void) const
+{
+ return this->next_check_;
+}
+
+bool
+LiveEntry::do_ping (PortableServer::POA_ptr poa)
+{
+ ACE_Time_Value now (ACE_OS::time());
+ if (this->next_check_ > now || this->liveliness_ == LS_DEAD || this->ping_away_)
+ {
+ return false;
+ }
+
+ if (this->owner_->ping_interval() == ACE_Time_Value::zero)
+ return false;
+
+ switch (this->liveliness_)
+ {
+ case LS_UNKNOWN:
+ this->next_check_ = now;
+ break;
+ case LS_ALIVE:
+ case LS_TIMEDOUT:
+ this->next_check_ = now + owner_->ping_interval();
+ break;
+ case LS_TRANSIENT:
+ this->next_check_ = now + ACE_Time_Value (0,5000); // retry delay
+ break;
+ default:;
+ }
+
+ this->ping_away_ = true;
+ this->retry_count_++;
+ PortableServer::ServantBase_var callback = new PingReceiver (this, poa);
+ PortableServer::ObjectId_var oid = poa->activate_object (callback.in());
+ CORBA::Object_var obj = poa->id_to_reference (oid.in());
+ ImplementationRepository::AMI_ServerObjectHandler_var cb =
+ ImplementationRepository::AMI_ServerObjectHandler::_narrow (obj.in());
+ this->ref_->sendc_ping (cb.in());
+ return true;
+}
+
+//---------------------------------------------------------------------------
+//---------------------------------------------------------------------------
+
+PingReceiver::PingReceiver (LiveEntry *entry, PortableServer::POA_ptr poa)
+ :poa_ (PortableServer::POA::_duplicate(poa)),
+ entry_ (entry)
+{
+}
+
+PingReceiver::~PingReceiver (void)
+{
+}
+
+void
+PingReceiver::ping (void)
+{
+ this->entry_->status (LS_ALIVE);
+ PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this);
+ poa_->deactivate_object (oid.in());
+}
+
+void
+PingReceiver::ping_excep (Messaging::ExceptionHolder * excep_holder)
+{
+ try
+ {
+ excep_holder->raise_exception ();
+ }
+ catch (CORBA::TRANSIENT &ex)
+ {
+ const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80;
+ switch (ex.minor () & BITS_5_THRU_12_MASK)
+ {
+ case TAO_POA_DISCARDING:
+ case TAO_POA_HOLDING:
+ {
+ this->entry_->status (LS_TRANSIENT);
+ break;
+ }
+ default: //case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
+ {
+ this->entry_->status (LS_DEAD);
+ }
+ }
+ }
+ catch (CORBA::TIMEOUT &)
+ {
+ this->entry_->status (LS_TIMEDOUT);
+ }
+ catch (CORBA::Exception &)
+ {
+ this->entry_->status (LS_DEAD);
+ }
+
+ PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this);
+ poa_->deactivate_object (oid.in());
+}
+
+//---------------------------------------------------------------------------
+//---------------------------------------------------------------------------
+
+LiveCheck::LiveCheck ()
+ :ping_interval_()
+{
+}
+
+LiveCheck::~LiveCheck (void)
+{
+ while (this->entry_map_.current_size() > 0)
+ {
+ LiveEntryMap::iterator i (this->entry_map_);
+ LiveEntryMap::value_type *pair = 0;
+ i.next (pair);
+ this->entry_map_.unbind(pair);
+ delete pair->item();
+ delete pair;
+ }
+}
+
+void
+LiveCheck::init (CORBA::ORB_ptr orb,
+ const ACE_Time_Value &pi )
+{
+ this->ping_interval_ = pi;
+ ACE_Reactor *r = orb->orb_core()->reactor();
+ this->reactor (r);
+ CORBA::Object_var obj = orb->resolve_initial_references ("RootPOA");
+ this->poa_ = PortableServer::POA::_narrow (obj.in());
+}
+
+const ACE_Time_Value &
+LiveCheck::ping_interval (void) const
+{
+ return this->ping_interval_;
+}
+
+int
+LiveCheck::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ for (LiveEntryMap::iterator i (this->entry_map_);
+ !i.done();
+ i.advance ())
+ {
+ LiveEntryMap::value_type *pair = 0;
+ i.next(pair);
+ pair->item()->do_ping(poa_.in());
+ }
+ return 0;
+}
+
+void
+LiveCheck::add_server (const char *server,
+ ImplementationRepository::ServerObject_ptr ref)
+{
+ ACE_CString s(server);
+ LiveEntry *entry = 0;
+ ACE_NEW (entry, LiveEntry(this,ref));
+ int result = entry_map_.bind (s, entry);
+ if (result != 0)
+ {
+ LiveEntry *old = 0;
+ result = entry_map_.rebind (s, entry, old);
+ delete old;
+ }
+}
+
+void
+LiveCheck::remove_server (const char *server)
+{
+ ACE_CString s(server);
+ LiveEntry *entry = 0;
+ int result = entry_map_.unbind (s, entry);
+ if (result == 0)
+ delete entry;
+}
+
+void
+LiveCheck::add_listener (LiveListener *l)
+{
+ LiveEntry *entry = 0;
+ int result = entry_map_.find (l->server(), entry);
+ if (result == 0 && entry != 0)
+ {
+ entry->add_listener (l);
+ ACE_Time_Value now (ACE_OS::time());
+ ACE_Time_Value next = entry->next_check ();
+ if (next <= now)
+ {
+ this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero);
+ }
+ else
+ {
+ ACE_Time_Value delay = next - now;
+ this->reactor()->schedule_timer (this, 0, delay);
+ }
+
+ }
+}
+
+LiveStatus
+LiveCheck::is_alive (const char *server)
+{
+ ACE_CString s(server);
+ LiveEntry *entry = 0;
+ int result = entry_map_.find (s, entry);
+ if (result == 0 && entry != 0)
+ {
+ return entry->status ();
+ }
+ return LS_DEAD;
+}
diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
new file mode 100644
index 00000000000..1170a056c03
--- /dev/null
+++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
@@ -0,0 +1,177 @@
+// -*- C++ -*-
+/*
+ * @file LiveCheck.h
+ *
+ * $Id$
+ *
+ * @author Phil Mesnier <mesnier_p@ociweb.com>
+ */
+
+#ifndef IMR_LIVECHECK_H_
+#define IMR_LIVECHECK_H_
+
+#include "locator_export.h"
+
+#include "tao/ImR_Client/ServerObjectS.h" // ServerObject_AMIS.h
+
+#include "ace/Vector_T.h"
+#include "ace/Hash_Map_Manager.h"
+#include "ace/SString.h"
+#include "ace/Event_Handler.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+#pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class LiveCheck;
+
+/*---------------------------------------------------------------------------
+ * @enum LiveStatus
+ *
+ * @brief indication of the known condition of a target server
+ *
+ * LS_UNKNOWN - The server hasn't yet been pinged
+ * LS_DEAD - The ping failed for reasons other than POA Activation
+ * LS_ALIVE - The server positively acknowledged a ping
+ * LS_TRANSIENT - The server connected, but acively raised a transient
+ * LS_TIMEDOUT - The server connected, but never returned any result.
+ */
+enum LiveStatus {
+ LS_UNKNOWN,
+ LS_DEAD,
+ LS_ALIVE,
+ LS_TRANSIENT,
+ LS_TIMEDOUT
+};
+
+/*---------------------------------------------------------------------------
+ * @class LiveListener
+ *
+ * @brief An interface for receiving asynch liveness status updates
+ *
+ * The code waiting on a confirmation of liveness status creates an instance
+ * of a LiveListener and registers it with the LiveCheck object.
+ * When the desired ping occurs, the status_changed method is called and the
+ * listener is unregistered. It is up to the owner of the listener to re-
+ * register if the ping result was inconclusive, such as a status of TRANSIENT
+ * or TIMEDOUT. Such a decision is based on configuration settings.
+ */
+class Locator_Export LiveListener
+{
+ public:
+ /// Construct a new listener. The server name suppled is used to
+ /// look up a listener entry in the LiveCheck map.
+ LiveListener (const char *server);
+
+ /// called by the asynch ping receiver when a reply or an exception
+ /// is received.
+ virtual void status_changed (LiveStatus status) = 0;
+
+ /// accessor for the server name. Used by the LiveCheck to associate a listener
+ const ACE_CString & server (void) const;
+
+ private:
+ ACE_CString server_;
+};
+
+/*---------------------------------------------------------------------------
+ * @class LiveEntry
+ *
+ * @brief Contains a list of interested listeners for a server
+ *
+ * Each server the Locator is interested in has a live entry instance.
+ * This holds the liveliness status and determines the next allowed time
+ * for a ping. Instances of the LiveEntry class are retained until the
+ * locator is no longer interested in the target server.
+ */
+class Locator_Export LiveEntry
+{
+ public:
+ LiveEntry (LiveCheck *owner, ImplementationRepository::ServerObject_ptr ref);
+ ~LiveEntry (void);
+
+ void add_listener (LiveListener * ll);
+ LiveStatus status (void) const;
+ void status (LiveStatus l);
+ bool do_ping (PortableServer::POA_ptr poa);
+ const ACE_Time_Value &next_check (void) const;
+
+ private:
+ LiveCheck *owner_;
+ ImplementationRepository::ServerObject_var ref_;
+ LiveStatus liveliness_;
+ ACE_Time_Value next_check_;
+ short retry_count_;
+ bool ping_away_;
+ ACE_Vector<LiveListener *> listeners_;
+ TAO_SYNCH_MUTEX lock_;
+};
+
+/*---------------------------------------------------------------------------
+ * @class PingReceiver
+ *
+ * @brief callback handler for asynch ping requests
+ *
+ * An instance of the ping receiver is used to handle the reply from a ping
+ * request. Instances are created for the ping, then destroyed.
+`*/
+class Locator_Export PingReceiver :
+ public virtual POA_ImplementationRepository::AMI_ServerObjectHandler
+{
+ public:
+ PingReceiver (LiveEntry * entry, PortableServer::POA_ptr poa);
+ virtual ~PingReceiver (void);
+
+ void ping (void);
+ void ping_excep (Messaging::ExceptionHolder * excep_holder);
+
+ private:
+ PortableServer::POA_var poa_;
+ LiveEntry * entry_;
+};
+
+/*---------------------------------------------------------------------------
+ * @class LiveCheck
+ *
+ * @brief The manager class used for pinging servers as needed.
+ *
+ * The LiveCheck class maintains a map of named LiveEntries. When the locator
+ * needs to determine the liveliness of a server, registers a LiveListener
+ * for the desired server. A ping to the server is then scheduled, based on the
+ * limits determined by the entry's state.
+ */
+class Locator_Export LiveCheck : public ACE_Event_Handler
+{
+ public:
+ LiveCheck ();
+ ~LiveCheck (void);
+
+ void init (CORBA::ORB_ptr orb, const ACE_Time_Value &interval);
+
+ int handle_timeout (const ACE_Time_Value &current_time,
+ const void *act = 0);
+
+ void add_server (const char *server,
+ ImplementationRepository::ServerObject_ptr ref);
+ void remove_server (const char *server);
+
+ void add_listener (LiveListener *waiter);
+
+ LiveStatus is_alive (const char *server);
+
+ const ACE_Time_Value &ping_interval (void) const;
+
+ private:
+ typedef ACE_Hash_Map_Manager_Ex<ACE_CString,
+ LiveEntry *,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ TAO_SYNCH_MUTEX> LiveEntryMap;
+
+ LiveEntryMap entry_map_;
+ PortableServer::POA_var poa_;
+ ACE_Time_Value ping_interval_;
+};
+
+#endif /* IMR_LIVECHECK_H_ */
+