summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhil Mesnier <mesnier_p@ociweb.com>2013-03-28 17:08:01 +0000
committerPhil Mesnier <mesnier_p@ociweb.com>2013-03-28 17:08:01 +0000
commit2f975d3f5493837eb0429e07d31495638f93625f (patch)
treea2a2e3ec69aff1cd5223a8f5c5d6d195a4833877
parent35ef2426915e032ab1f294c95f6199a0cf24cf81 (diff)
downloadATCD-2f975d3f5493837eb0429e07d31495638f93625f.tar.gz
Thu Mar 28 17:05:52 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
-rw-r--r--TAO/ChangeLog_Asynch_ImR14
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp334
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h68
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp230
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h40
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc2
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp118
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/LiveCheck.h10
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/Locator_Repository.h12
9 files changed, 662 insertions, 166 deletions
diff --git a/TAO/ChangeLog_Asynch_ImR b/TAO/ChangeLog_Asynch_ImR
index f5a9875a436..f2fd175d59a 100644
--- a/TAO/ChangeLog_Asynch_ImR
+++ b/TAO/ChangeLog_Asynch_ImR
@@ -1,3 +1,17 @@
+Thu Mar 28 17:05:52 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
+
+ * orbsvcs/ImplRepo_Service/AsyncAccessManager.h:
+ * orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp:
+ * orbsvcs/ImplRepo_Service/ImR_Locator_i.h:
+ * orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp:
+ * orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc:
+ * orbsvcs/ImplRepo_Service/LiveCheck.h:
+ * orbsvcs/ImplRepo_Service/LiveCheck.cpp:
+ * orbsvcs/ImplRepo_Service/Locator_Repository.h:
+
+ Further improvments to the implementation. At the point where the basic ImplRepo
+ test passes.
+
Tue Mar 26 23:58:50 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
* orbsvcs/ImplRepo_Service/Adapter_Activator.h:
diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp
index daa4efdc025..1368c1bbdac 100644
--- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp
@@ -3,84 +3,318 @@
#include "AsyncAccessManager.h"
#include "ImR_Locator_i.h"
+#include "Locator_Repository.h"
+
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
-#if 0
-class AsyncAccessManager
+AsyncAccessManager::AsyncAccessManager (const Server_Info &info,
+ bool manual,
+ ImR_Locator_i &locator)
+ :info_(0),
+ manual_start_ (manual),
+ locator_(locator),
+ poa_(locator.root_poa()),
+ rh_list_(),
+ status_(AAM_INIT),
+ refcount_(1),
+ lock_()
{
- public:
- AsyncAccessManager (UpdateableServerInfo &info, ImR_Locator_i &locator);
- ~AsyncAccessManager (void);
+ ACE_DEBUG ((LM_DEBUG,"New AAM: this = %x, name = %s\n",
+ this, info.name.c_str()));
+ this->info_ = new Server_Info (info);
+}
- void add_interest (ImR_ReplyHandler *rh);
- AAM_Status status (void) const;
+AsyncAccessManager::~AsyncAccessManager (void)
+{
+ delete this->info_;
+}
- void activator_replied (void);
- void server_is_running (void);
- void ping_replied (bool is_alive);
- void
+bool
+AsyncAccessManager::has_server (const char *s)
+{
+ return ACE_OS::strcmp (this->info_->name.c_str(),s) == 0;
+}
- void add_ref (void);
- void remove_ref (void);
+void
+AsyncAccessManager::add_interest (ImR_ReplyHandler *rh)
+{
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->rh_list_.push_back (rh);
+ }
+ if (this->locator_.debug() > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) AsyncAccessManager::add_interest: ")
+ ACE_TEXT ("server = <%C>, status = %d count = %d\n"),
+ this->info_->name.c_str(), this->status_, this->rh_list_.size()));
+ }
- private:
- UpdateableServerInfo &info_;
- ImR_Locator_i &locator_;
+ if (this->status_ == AAM_SERVER_READY)
+ {
+ if (this->locator_.pinger().is_alive (this->info_->name.c_str()))
+ {
+ this->final_state();
+ return;
+ }
+ }
- ACE_Stack<ImR_ReplyHandler *> rh_list_;
+ if (this->status_ == AAM_INIT || this->status_ == AAM_SERVER_READY)
+ {
+ // This is not a leak. The listener registers with
+ // the pinger and will delete itself when done.
+ AsyncLiveListener *l = 0;
+ ACE_NEW (l, AsyncLiveListener (this->info_->name.c_str(),
+ *this,
+ this->locator_.pinger()));
+ if (!l->start())
+ {
+ if (!this->send_start_request())
+ {
+ this->final_state();
+ }
+ }
+ else
+ {
+ this->status (AAM_WAIT_FOR_PING);
+ }
+ }
+}
- AAM_Status status_;
+void
+AsyncAccessManager::final_state (void)
+{
+ if (this->locator_.debug() > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) AsyncAccessManager::final_state: ")
+ ACE_TEXT ("status = %d, pior = <%C>\n"),
+ this->status_, this->info_->partial_ior.c_str()));
+ }
- int refcount_;
- TAO_SYNCH_MUTEX lock_;
+ for (size_t i = 0; i < this->rh_list_.size(); i++)
+ {
+ ImR_ReplyHandler *rh = this->rh_list_[i];
+ if (rh != 0)
+ {
+ if (this->status_ == AAM_SERVER_READY)
+ {
+ rh->send_ior (this->info_->partial_ior.c_str());
+ }
+ else
+ {
+ rh->send_exception ();
+ }
+ }
+ }
+ this->rh_list_.clear ();
+ if (this->info_->activation_mode == ImplementationRepository::PER_CLIENT ||
+ this->status_ != AAM_SERVER_READY)
+ {
+ this->locator_.remove_aam (this);
+ }
+}
-};
-#endif
+AAM_Status
+AsyncAccessManager::status (void) const
+{
+ return this->status_;
+}
+
+void
+AsyncAccessManager::status (AAM_Status s)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->status_ = s;
+}
+
+void
+AsyncAccessManager::activator_replied (bool success)
+{
+ if (this->locator_.debug() > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) AsyncAccessManager::activator_replied: ")
+ ACE_TEXT ("success = %d, status = %d\n"),
+ success, this->status_));
+ }
+ if (success)
+ {
+ this->status (AAM_WAIT_FOR_RUNNING);
+ }
+ else
+ {
+ this->status (AAM_NO_ACTIVATOR);
+ this->final_state ();
+ }
+}
+
+void
+AsyncAccessManager::server_is_running (const char *partial_ior)
+{
+ if (this->locator_.debug() > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) AsyncAccessManager::server_is_running: ")
+ ACE_TEXT ("name = %C, status = %d\n"),
+ this->info_->name.c_str(), this->status_));
+ }
+ this->status (AAM_WAIT_FOR_ALIVE);
+ this->info_->partial_ior = partial_ior;
+
+ if (this->locator_.pinger().is_alive (this->info_->name.c_str()))
+ {
+ this->status (AAM_SERVER_READY);
+ this->final_state ();
+ }
+
+ // This is not a leak. The listener registers with
+ // the pinger and will delete itself when done.
+ AsyncLiveListener *l = 0;
+ ACE_NEW (l, AsyncLiveListener (this->info_->name.c_str(),
+ *this,
+ this->locator_.pinger()));
+ if (!l->start())
+ {
+ this->status (AAM_SERVER_DEAD);
+ this->final_state ();
+ }
+}
+
+void
+AsyncAccessManager::ping_replied (bool is_alive)
+{
+ if (is_alive)
+ {
+ this->status (AAM_SERVER_READY);
+ }
+ else if (this->status_ == AAM_WAIT_FOR_PING)
+ {
+ if (this->send_start_request ())
+ {
+ return;
+ }
+ }
+ else
+ {
+ this->status (AAM_SERVER_DEAD);
+ }
+ this->final_state();
+}
+
+bool
+AsyncAccessManager::send_start_request (void)
+{
+ if (this->info_->activation_mode == ImplementationRepository::MANUAL &&
+ !this->manual_start_)
+ {
+ this->status (AAM_NOT_MANUAL);
+ return false;
+ }
+
+ Activator_Info_Ptr ainfo =
+ this->locator_.get_activator (this->info_->activator);
+
+ if (ainfo.null () || CORBA::is_nil (ainfo->activator.in ()))
+ {
+ this->status (AAM_NO_ACTIVATOR);
+ return false;
+ }
+
+ PortableServer::ServantBase_var callback = new ActivatorReceiver (this,
+ this->poa_.in());
+ PortableServer::ObjectId_var oid = this->poa_->activate_object (callback.in());
+ CORBA::Object_var obj = this->poa_->id_to_reference (oid.in());
+ ImplementationRepository::AMI_ActivatorHandler_var cb =
+ ImplementationRepository::AMI_ActivatorHandler::_narrow (obj.in());
+
+ ainfo->activator->sendc_start_server (cb.in(),
+ this->info_->name.c_str (),
+ this->info_->cmdline.c_str (),
+ this->info_->dir.c_str (),
+ this->info_->env_vars);
+ this->status (AAM_ACTIVATION_SENT);
+ return true;
+}
+
+void
+AsyncAccessManager::add_ref (void)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ ++this->refcount_;
+}
+
+void
+AsyncAccessManager::remove_ref (void)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ if (--this->refcount_ == 0)
+ {
+ delete this;
+ }
+}
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
-ImR_Loc_ReplyHandler::ImR_Loc_ReplyHandler (AMH_ImplementationRepository::LocatorResponseHandler_ptr rh)
- :rh_ (AMH_ImplementationRepository::LocatorResponseHandler::_duplicate(rh))
+ActivatorReceiver::ActivatorReceiver (AsyncAccessManager *aam,
+ PortableServer::POA_ptr poa)
+ :aam_ (aam),
+ poa_ (PortableServer::POA::_duplicate (poa))
{
+ this->aam_->add_ref ();
}
-ImR_Loc_ReplyHandler::~ImR_Loc_ReplyHandler (void)
+
+ActivatorReceiver::~ActivatorReceiver (void)
{
+ this->aam_->remove_ref ();
}
void
-ImR_Loc_ReplyHandler::send_ior (const char *)
+ActivatorReceiver::start_server (void)
{
- rh_->activate_server (); // void return
- delete this;
+ this->aam_->activator_replied (true);
+ PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this);
+ poa_->deactivate_object (oid.in());
+}
+void
+ActivatorReceiver::start_server_excep (Messaging::ExceptionHolder *)
+{
+ this->aam_->activator_replied (false);
+ PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this);
+ poa_->deactivate_object (oid.in());
+}
+
+void
+ActivatorReceiver::shutdown (void)
+{
+ // no-op, just satisfy virtual function
}
void
-ImR_Loc_ReplyHandler::send_exception (void)
+ActivatorReceiver::shutdown_excep (Messaging::ExceptionHolder * )
{
- CORBA::TRANSIENT ex (CORBA::SystemException::_tao_minor_code
- ( TAO_IMPLREPO_MINOR_CODE, 0),
- CORBA::COMPLETED_NO);
- TAO_AMH_DSI_Exception_Holder h(&ex);
- resp_->invoke_excep(&h);
- delete this;
+ // no-op, just satisfy virtual function
}
+
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
-AsyncLiveListener::AsyncLiveListener (AsyncAccessManager &aam, LiveCheck &pinger)
- :aam_ (aam),
+AsyncLiveListener::AsyncLiveListener (const char *server,
+ AsyncAccessManager &aam,
+ LiveCheck &pinger)
+ :LiveListener (server),
+ aam_ (aam),
pinger_ (pinger),
- status_ (LS_UNKNOWN),
+ status_ (LS_UNKNOWN)
{
this->aam_.add_ref ();
- this->pinger_.add_listener (this);
}
AsyncLiveListener::~AsyncLiveListener (void)
@@ -88,14 +322,34 @@ AsyncLiveListener::~AsyncLiveListener (void)
this->aam_.remove_ref ();
}
+bool
+AsyncLiveListener::start (void)
+{
+ bool rtn = this->pinger_.add_listener (this);
+ if (!rtn)
+ delete this;
+ return rtn;
+}
+
void
AsyncLiveListener::status_changed (LiveStatus status, bool may_retry)
{
this->status_ = status;
if (status == LS_TRANSIENT && may_retry)
- this->pinger_.add_listener (this);
+ {
+ if (!this->pinger_.add_listener (this))
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "AsyncLiveListener::status_changed, deleting(1)\n"));
+ this->aam_.ping_replied (false);
+ delete this;
+ }
+ }
else
{
+ ACE_DEBUG ((LM_DEBUG,
+ "AsyncLiveListener::status_changed, status = %d, deleting(2)\n", status));
this->aam_.ping_replied (status != LS_DEAD);
+ delete this;
}
}
diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h
index f915419d896..e450440cb2d 100644
--- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h
+++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.h
@@ -12,11 +12,12 @@
#include "locator_export.h"
-#include "tao/ImR_Client/ServerObjectS.h" // ServerObject_AMIS.h
-
+#include "ImR_ActivatorS.h" // ImR_Activator_AMIS.h
#include "ace/Vector_T.h"
#include "ace/SString.h"
+#include "Forwarder.h"
+
#if !defined (ACE_LACKS_PRAGMA_ONCE)
#pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
@@ -25,7 +26,7 @@
class ImR_Locator_i;
class ImR_ReplyHandler;
-class UpdateableServerInfo;
+struct Server_Info;
//----------------------------------------------------------------------------
/*
@@ -53,59 +54,77 @@ enum AAM_Status
AAM_ACTIVATION_SENT,
AAM_WAIT_FOR_RUNNING,
AAM_WAIT_FOR_PING,
+ AAM_WAIT_FOR_ALIVE,
AAM_SERVER_READY,
- AAM_SERVER_DEAD
+ AAM_SERVER_DEAD,
+ AAM_NOT_MANUAL,
+ AAM_NO_ACTIVATOR
};
class AsyncAccessManager
{
public:
- AsyncAccessManager (UpdateableServerInfo &info, ImR_Locator_i &locator);
+ AsyncAccessManager (const Server_Info &info,
+ bool manual,
+ ImR_Locator_i &locator);
+
~AsyncAccessManager (void);
+ bool has_server (const char *name);
+
void add_interest (ImR_ReplyHandler *rh);
AAM_Status status (void) const;
- void activator_replied (void);
- void server_is_running (void);
+ void activator_replied (bool success);
+ void server_is_running (const char *partial_ior);
void ping_replied (bool is_alive);
void add_ref (void);
void remove_ref (void);
private:
- UpdateableServerInfo &info_;
- ImR_Locator_i &locator_;
+ void final_state (void);
+ void status (AAM_Status s);
+ bool send_start_request (void);
+ Server_Info *info_;
+ bool manual_start_;
+ ImR_Locator_i &locator_;
+ PortableServer::POA_var poa_;
ACE_Vector<ImR_ReplyHandler *> rh_list_;
AAM_Status status_;
int refcount_;
TAO_SYNCH_MUTEX lock_;
-
};
+
+
//----------------------------------------------------------------------------
/*
- * @class ImR_Loc_ReplyHandler
+ * @class ActivatorReceiver
+ *
+ * @brief callback for handling asynch server startup requests
*
- * @brief specialized reply handler for Locator interface calls which have a
- * void return.
*/
-class ImR_Loc_ReplyHandler : public ImR_ReplyHandler
+class ActivatorReceiver :
+ public virtual POA_ImplementationRepository::AMI_ActivatorHandler
{
public:
- ImR_Loc_ReplyHandler (ImplementationRepository::AMH_LocatorResponseHandler_ptr rh);
- virtual ~ImR_Loc_ReplyHandler (void);
+ ActivatorReceiver (AsyncAccessManager *aam, PortableServer::POA_ptr poa);
+ virtual ~ActivatorReceiver (void);
- virtual void send_ior (const char *pior);
- virtual void send_exception (void);
+ void start_server (void);
+ void start_server_excep (Messaging::ExceptionHolder * excep_holder);
-private:
- ImplementationRepository::AMH_LocatorResponseHandler_var rh_;
+ void shutdown (void);
+ void shutdown_excep (Messaging::ExceptionHolder * excep_holder);
+private:
+ AsyncAccessManager *aam_;
+ PortableServer::POA_var poa_;
};
@@ -116,14 +135,17 @@ private:
class AsyncLiveListener : public LiveListener
{
public:
- AsyncLiveListener (AsyncAccessManager &aam, LiveCheck *pinger);
- ~AsyncLiveListener (void);
+ AsyncLiveListener (const char * server,
+ AsyncAccessManager &aam,
+ LiveCheck &pinger);
+ virtual ~AsyncLiveListener (void);
+ bool start (void);
void status_changed (LiveStatus status, bool may_retry);
private:
AsyncAccessManager &aam_;
- LiveCheck *pinger_;
+ LiveCheck &pinger_;
LiveStatus status_;
};
diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
index 621a85f3efe..f1ad8da100f 100644
--- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
@@ -55,6 +55,7 @@ createPersistentPOA (PortableServer::POA_ptr root_poa, const char* poa_name) {
ImR_Locator_i::ImR_Locator_i (void)
: dsi_forwarder_ (*this)
, ins_locator_ (0)
+ , aam_set_ ()
, debug_ (0)
, read_only_ (false)
, unregister_if_address_reused_ (false)
@@ -502,6 +503,31 @@ ImR_Locator_i::activate_server_i (UpdateableServerInfo& info,
bool manual_start,
ImR_ReplyHandler *rh)
{
+ AsyncAccessManager *aam = 0;
+ if (info->activation_mode == ImplementationRepository::PER_CLIENT)
+ {
+ ACE_NEW (aam, AsyncAccessManager (*info, manual_start, *this));
+ this->aam_set_.insert (aam);
+ }
+ else
+ {
+ aam = this->find_aam (info->name.c_str());
+ if (aam == 0)
+ {
+ ACE_NEW (aam, AsyncAccessManager (*info, manual_start, *this));
+ this->aam_set_.insert (aam);
+ }
+ }
+ aam->add_interest (rh);
+}
+
+
+#if 0
+void
+ImR_Locator_i::activate_server_i (UpdateableServerInfo& info,
+ bool manual_start,
+ ImR_ReplyHandler *rh)
+{
if (info->activation_mode == ImplementationRepository::PER_CLIENT)
{
activate_perclient_server_i (info, manual_start,rh);
@@ -518,6 +544,7 @@ ImR_Locator_i::activate_server_i (UpdateableServerInfo& info,
rh->send_exception ();
}
}
+#endif
char*
ImR_Locator_i::activate_server_i (UpdateableServerInfo& info,
@@ -656,7 +683,7 @@ ImR_Locator_i::start_server (UpdateableServerInfo& info, bool manual_start,
"No command line registered for server.");
}
- Activator_Info_Ptr ainfo = get_activator (info->activator);
+ Activator_Info_Ptr ainfo = this->get_activator (info->activator);
if (ainfo.null () || CORBA::is_nil (ainfo->activator.in ()))
{
@@ -668,7 +695,6 @@ ImR_Locator_i::start_server (UpdateableServerInfo& info, bool manual_start,
"No activator registered for server.");
}
- ImplementationRepository::StartupInfo_var si;
try
{
++waiting_clients;
@@ -1126,28 +1152,30 @@ ImR_Locator_i::server_is_running (const char* id,
if (info.null ())
{
if (this->debug_ > 0)
- ACE_DEBUG ((
- LM_DEBUG,
- ACE_TEXT ("ImR: Auto adding NORMAL server <%C>.\n"),
- name.c_str ()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("ImR: Auto adding NORMAL server <%C>.\n"),
+ name.c_str ()));
+ }
ImplementationRepository::EnvironmentList env (0);
this->repository_->add_server (server_id,
- name,
- jacorb_server,
- "", // no activator
- "", // no cmdline
- ImplementationRepository::EnvironmentList (),
- "", // no working dir
- ImplementationRepository::NORMAL,
- DEFAULT_START_LIMIT,
- partial_ior,
- ior.in (),
- ImplementationRepository::ServerObject::_nil () // Will connect at first access
- );
+ name,
+ jacorb_server,
+ "", // no activator
+ "", // no cmdline
+ ImplementationRepository::EnvironmentList (),
+ "", // no working dir
+ ImplementationRepository::NORMAL,
+ DEFAULT_START_LIMIT,
+ partial_ior,
+ ior.in (),
+ ImplementationRepository::ServerObject::_nil ()
+ );
}
else
{
+#if 0
if (info->server_id != server_id)
{
if (! info->server_id.empty())
@@ -1158,29 +1186,58 @@ ImR_Locator_i::server_is_running (const char* id,
info.edit ()->server_id = server_id;
}
- if (info->activation_mode != ImplementationRepository::PER_CLIENT) {
- info.edit ()->ior = ior.in ();
- info.edit ()->partial_ior = partial_ior;
- // Will connect at first access
- info.edit ()->server = ImplementationRepository::ServerObject::_nil ();
+ if (info->activation_mode != ImplementationRepository::PER_CLIENT)
+ {
+ info.edit ()->ior = ior.in ();
+ info.edit ()->partial_ior = partial_ior;
+ // Will connect at first access
+ info.edit ()->server = ImplementationRepository::ServerObject::_nil ();
- info.update_repo();
+ info.update_repo();
- waiter_svt_.unblock_one (name.c_str (), partial_ior, ior.in (), false);
- } else {
- // Note : There's no need to unblock all the waiting request until
- // we know the final status of the server.
- if (info->waiting_clients > 0)
- {
- waiter_svt_.unblock_one (name.c_str (), partial_ior, ior.in (), true);
+ waiter_svt_.unblock_one (name.c_str (), partial_ior, ior.in (), false);
}
- else if (this->debug_ > 1)
+ else
{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("ImR - Ignoring server_is_running due to no ")
- ACE_TEXT ("waiting PER_CLIENT clients.\n")));
+ // Note : There's no need to unblock all the waiting request until
+ // we know the final status of the server.
+ if (info->waiting_clients > 0)
+ {
+ waiter_svt_.unblock_one (name.c_str (), partial_ior, ior.in (), true);
+ }
+ else if (this->debug_ > 1)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("ImR - Ignoring server_is_running due to no ")
+ ACE_TEXT ("waiting PER_CLIENT clients.\n")));
+ }
}
+#else
+
+ if (info->server_id != server_id)
+ {
+ if (! info->server_id.empty())
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("ImR - WARNING: server \"%C\" changed server id from ")
+ ACE_TEXT ("\"%C\" to \"%C\" waiting PER_CLIENT clients.\n"),
+ name.c_str (), info->server_id.c_str (), server_id.c_str ()));
+ info.edit ()->server_id = server_id;
}
+
+ if (info->activation_mode != ImplementationRepository::PER_CLIENT)
+ {
+ info.edit ()->ior = ior.in ();
+ info.edit ()->partial_ior = partial_ior;
+ info.edit ()->server = s;
+
+ info.update_repo();
+
+ }
+
+ AsyncAccessManager *aam = this->find_aam (name.c_str());
+ if (aam != 0)
+ aam->server_is_running (partial_ior);
+#endif
}
}
@@ -1206,6 +1263,13 @@ ImR_Locator_i::server_is_shutting_down (const char* server)
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Server <%C> is shutting down.\n"),
server));
+ this->pinger_.remove_server (server);
+ AsyncAccessManager *aam = this->find_aam (server);
+ if (aam != 0)
+ {
+ this->remove_aam (aam);
+ }
+
info.edit ()->reset ();
}
@@ -1489,7 +1553,57 @@ ImR_Locator_i::is_alive (UpdateableServerInfo& info)
int
ImR_Locator_i::debug () const
{
- return debug_;
+ return this->debug_;
+}
+
+LiveCheck&
+ImR_Locator_i::pinger (void)
+{
+ return this->pinger_;
+}
+
+PortableServer::POA_ptr
+ImR_Locator_i::root_poa (void)
+{
+ return PortableServer::POA::_duplicate (this->root_poa_.in());
+}
+
+void
+ImR_Locator_i::remove_aam (AsyncAccessManager *aam)
+{
+ this->aam_set_.remove (aam);
+ aam->remove_ref();
+}
+
+AsyncAccessManager *
+ImR_Locator_i::find_aam (const char *name)
+{
+
+ if (debug_ > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) find_aam: name = %s, aam_set_.size = %d\n"),
+ name, aam_set_.size()));
+ }
+
+ for (AAM_Set::ITERATOR i(this->aam_set_);
+ !i.done();
+ i.advance ())
+ {
+ AsyncAccessManager **entry = 0;
+ i.next(entry);
+ if (*entry != 0 && (*entry)->has_server (name))
+ {
+ return (*entry);
+ }
+ }
+ if (debug_ > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) find_aam: did not find\n")));
+ }
+
+ return 0;
}
//-------------------------------------------------------------------------
@@ -1511,10 +1625,6 @@ SyncListener::is_alive (void)
{
this->status_ = this->pinger_.is_alive(this->server());
- // ACE_DEBUG ((LM_DEBUG,
- // "ImR: SyncListener::is_alive() this = %x, server = <%C>, status = %d\n",
- // this, this->server(), status_));
-
if (this->status_ == LS_ALIVE)
return true;
else if (this->status_ == LS_DEAD)
@@ -1525,7 +1635,10 @@ SyncListener::is_alive (void)
{
if (this->callback_)
{
- this->pinger_.add_listener (this);
+ if (!this->pinger_.add_listener (this))
+ {
+ return false;
+ }
}
this->callback_ = false;
ACE_Time_Value delay (10,0);
@@ -1541,8 +1654,37 @@ SyncListener::status_changed (LiveStatus status, bool may_retry)
this->callback_ = true;
this->status_ = status;
this->got_it_ = (status != LS_TRANSIENT) || (! may_retry);
- // ACE_DEBUG ((LM_DEBUG,
- // "SynchLisener(%x)::status_changed, got it = %d, status = %d\n",
- // this, got_it_, status));
}
+//---------------------------------------------------------------------------
+//---------------------------------------------------------------------------
+#if 0
+ImR_Loc_ReplyHandler::ImR_Loc_ReplyHandler (AMH_ImplementationRepository::LocatorResponseHandler_ptr rh)
+ :rh_ (AMH_ImplementationRepository::LocatorResponseHandler::_duplicate(rh))
+{
+}
+
+ImR_Loc_ReplyHandler::~ImR_Loc_ReplyHandler (void)
+{
+}
+
+void
+ImR_Loc_ReplyHandler::send_ior (const char *)
+{
+ rh_->activate_server (); // void return
+ delete this;
+
+}
+
+void
+ImR_Loc_ReplyHandler::send_exception (void)
+{
+ CORBA::TRANSIENT ex (CORBA::SystemException::_tao_minor_code
+ ( TAO_IMPLREPO_MINOR_CODE, 0),
+ CORBA::COMPLETED_NO);
+ TAO_AMH_DSI_Exception_Holder h(&ex);
+ resp_->invoke_excep(&h);
+ delete this;
+}
+
+#endif
diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h
index 742dfac7013..132a9459abc 100644
--- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h
+++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.h
@@ -15,12 +15,12 @@
#include "Server_Info.h"
#include "ace/Auto_Ptr.h"
#include "AsyncStartupWaiter_i.h"
+#include "AsyncAccessManager.h"
#include "tao/IORTable/IORTable.h"
#include "ImR_LocatorS.h"
#include "AsyncStartupWaiterS.h"
#include "LiveCheck.h"
-//#include "AsyncAccessManager.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -100,6 +100,13 @@ public:
bool manual_start,
ImR_ReplyHandler *rh);
+ LiveCheck &pinger (void);
+ PortableServer::POA_ptr root_poa (void);
+ Activator_Info_Ptr get_activator (const ACE_CString& name);
+
+ void remove_aam (AsyncAccessManager *aam);
+ AsyncAccessManager *find_aam (const char *name);
+
private:
char* activate_server_i (UpdateableServerInfo& info,
@@ -126,7 +133,6 @@ private:
void unregister_activator_i(const char* activator);
- Activator_Info_Ptr get_activator (const ACE_CString& name);
void connect_activator (Activator_Info& info);
void auto_start_servers(void);
@@ -138,7 +144,11 @@ private:
PortableServer::POA_ptr findPOA(const char* name);
- void parse_id(const char* id, ACE_CString& server_id, ACE_CString& name, bool& jacorb_server);
+ void parse_id(const char* id,
+ ACE_CString& server_id,
+ ACE_CString& name,
+ bool& jacorb_server);
+
private:
// The class that handles the forwarding.
@@ -154,6 +164,10 @@ private:
/// The asynch server ping adapter
LiveCheck pinger_;
+ /// A collection of asynch activator instances
+ typedef ACE_Unbounded_Set<AsyncAccessManager *> AAM_Set;
+ AAM_Set aam_set_;
+
CORBA::ORB_var orb_;
PortableServer::POA_var root_poa_;
PortableServer::POA_var imr_poa_;
@@ -192,6 +206,26 @@ class SyncListener : public LiveListener
bool callback_;
};
+//----------------------------------------------------------------------------
+/*
+ * @class ImR_Loc_ReplyHandler
+ *
+ * @brief specialized reply handler for Locator interface calls which have a
+ * void return.
+ */
+class ImR_Loc_ReplyHandler : public ImR_ReplyHandler
+{
+public:
+ ImR_Loc_ReplyHandler (ImplementationRepository::AMH_LocatorResponseHandler_ptr rh);
+ virtual ~ImR_Loc_ReplyHandler (void);
+
+ virtual void send_ior (const char *pior);
+ virtual void send_exception (void);
+
+private:
+ ImplementationRepository::AMH_LocatorResponseHandler_var rh_;
+
+};
#include /**/ "ace/post.h"
#endif /* IMR_LOCATOR_I_H */
diff --git a/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc b/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc
index d553232971f..2c12b0b3d85 100644
--- a/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc
+++ b/TAO/orbsvcs/ImplRepo_Service/ImplRepo_Service.mpc
@@ -76,7 +76,7 @@ project(ImR_Locator) : orbsvcslib, orbsvcs_output, conv_lib, avoids_minimum_corb
Activator_Info.cpp
Adapter_Activator.cpp
AsyncStartupWaiter_i.cpp
-// AsyncAccessManager.cpp
+ AsyncAccessManager.cpp
Forwarder.cpp
ImR_Locator_i.cpp
INS_Locator.cpp
diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
index 3539b4548b0..c9ca79c4368 100644
--- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
@@ -102,32 +102,14 @@ LiveEntry::status (LiveStatus l)
ACE_Time_Value now (ACE_OS::time());
this->next_check_ = now + owner_->ping_interval();
}
-#if 0
- for (ACE_Vector_Iterator<LiveListener *> i (this->listeners_);
- !i.done();
- i.advance())
- {
- LiveListener **ll = 0;
- i.next(ll);
- if (*ll != 0)
- {
- (*ll)->status_changed (this->liveliness_, this->reping_available());
- }
- }
-#else
- // ACE_DEBUG ((LM_DEBUG,
- // ACE_TEXT ("(%P|%t) LiveEntry::status(%d), server = %s,")
- // ACE_TEXT (" listeners.size = %d\n"),
- // l, this->server_.c_str(), listeners_.size()));
for (size_t i = 0; i < this->listeners_.size(); i++)
{
LiveListener *ll = this->listeners_[i];
if (ll != 0)
{
- (ll)->status_changed (this->liveliness_, this->reping_available());
+ ll->status_changed (this->liveliness_, this->reping_available());
}
}
-#endif
this->listeners_.clear();
}
@@ -141,7 +123,12 @@ bool
LiveEntry::do_ping (PortableServer::POA_ptr poa)
{
ACE_Time_Value now (ACE_OS::time());
- if (this->next_check_ > now || this->liveliness_ == LS_DEAD || this->ping_away_)
+ if (this->ping_away_)
+ {
+ return true;
+ }
+
+ if (this->next_check_ > now || this->liveliness_ == LS_DEAD)
{
return false;
}
@@ -200,7 +187,6 @@ PingReceiver::~PingReceiver (void)
void
PingReceiver::ping (void)
{
- // ACE_DEBUG ((LM_DEBUG,"ping received\n"));
this->entry_->status (LS_ALIVE);
PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this);
poa_->deactivate_object (oid.in());
@@ -284,14 +270,28 @@ int
LiveCheck::handle_timeout (const ACE_Time_Value &,
const void *)
{
- for (LiveEntryMap::iterator i (this->entry_map_);
- !i.done();
- i.advance ())
+ for (LiveEntryMap::iterator le (this->entry_map_);
+ !le.done ();
+ le.advance ())
{
LiveEntryMap::value_type *pair = 0;
- i.next(pair);
- pair->item()->do_ping(poa_.in());
+ le.next(pair);
+ pair->item()->do_ping (poa_.in());
}
+ for (PerClientStack::ITERATOR pe (this->per_client_);
+ !pe.done ();
+ pe.advance ())
+ {
+ LiveEntry *entry = 0;
+ LiveEntry **n = &entry;
+ pe.next(n);
+ if (entry != 0 && !entry->do_ping (poa_.in()))
+ {
+ this->per_client_.remove (entry);
+ }
+
+ }
+
return 0;
}
@@ -322,37 +322,59 @@ LiveCheck::remove_server (const char *server)
}
void
+LiveCheck::remove_per_client_entry (LiveEntry *e)
+{
+ this->per_client_.remove (e);
+}
+
+bool
+LiveCheck::add_per_client_listener (LiveListener *l,
+ ImplementationRepository::ServerObject_ptr ref)
+{
+ LiveEntry *entry = 0;
+ ACE_NEW_RETURN (entry, LiveEntry (this, 0, ref), false);
+ if (this->per_client_.push(entry) == 0)
+ {
+ entry->add_listener (l);
+ this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero);
+ return true;
+ }
+ return false;
+}
+
+bool
LiveCheck::add_listener (LiveListener *l)
{
LiveEntry *entry = 0;
ACE_CString key (l->server());
int result = entry_map_.find (key, entry);
-
- if (result == 0 && entry != 0)
+ if (result == -1 || entry == 0)
{
- entry->add_listener (l);
- ACE_Time_Value now (ACE_OS::time());
- ACE_Time_Value next = entry->next_check ();
+ return false;
+ }
- if (next <= now)
- {
- // ACE_DEBUG ((LM_DEBUG,
- // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ")
- // ACE_TEXT ("immediate callback for <%C>\n"),
- // l, l->server()));
- this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero);
- }
- else
- {
- ACE_Time_Value delay = next - now;
- // ACE_DEBUG ((LM_DEBUG,
- // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ")
- // ACE_TEXT ("callback in %d ms for <%C>\n"),
- // l, delay.sec() * 1000 + delay.usec() / 1000, l->server()));
- this->reactor()->schedule_timer (this, 0, delay);
- }
+ entry->add_listener (l);
+ ACE_Time_Value now (ACE_OS::time());
+ ACE_Time_Value next = entry->next_check ();
+ if (next <= now)
+ {
+ // ACE_DEBUG ((LM_DEBUG,
+ // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ")
+ // ACE_TEXT ("immediate callback for <%C>\n"),
+ // l, l->server()));
+ this->reactor()->schedule_timer (this,0,ACE_Time_Value::zero);
}
+ else
+ {
+ ACE_Time_Value delay = next - now;
+ // ACE_DEBUG ((LM_DEBUG,
+ // ACE_TEXT ("(%P|%t) LiveCheck::add_listener %x, ")
+ // ACE_TEXT ("callback in %d ms for <%C>\n"),
+ // l, delay.sec() * 1000 + delay.usec() / 1000, l->server()));
+ this->reactor()->schedule_timer (this, 0, delay);
+ }
+ return true;
}
LiveStatus
diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
index f914662b3e8..4778cc5451c 100644
--- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
+++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
@@ -168,9 +168,15 @@ class Locator_Export LiveCheck : public ACE_Event_Handler
void add_server (const char *server,
ImplementationRepository::ServerObject_ptr ref);
+
void remove_server (const char *server);
- void add_listener (LiveListener *waiter);
+ void remove_per_client_entry (LiveEntry *entry);
+
+ bool add_listener (LiveListener *listener);
+
+ bool add_per_client_listener (LiveListener *listener,
+ ImplementationRepository::ServerObject_ptr ref);
LiveStatus is_alive (const char *server);
@@ -182,8 +188,10 @@ class Locator_Export LiveCheck : public ACE_Event_Handler
ACE_Hash<ACE_CString>,
ACE_Equal_To<ACE_CString>,
TAO_SYNCH_MUTEX> LiveEntryMap;
+ typedef ACE_Unbounded_Stack <LiveEntry *> PerClientStack;
LiveEntryMap entry_map_;
+ PerClientStack per_client_;
PortableServer::POA_var poa_;
ACE_Time_Value ping_interval_;
};
diff --git a/TAO/orbsvcs/ImplRepo_Service/Locator_Repository.h b/TAO/orbsvcs/ImplRepo_Service/Locator_Repository.h
index 187debe8335..f9ab1888e63 100644
--- a/TAO/orbsvcs/ImplRepo_Service/Locator_Repository.h
+++ b/TAO/orbsvcs/ImplRepo_Service/Locator_Repository.h
@@ -196,7 +196,6 @@ private:
virtual int persistent_remove(const ACE_CString& name, bool activator);
};
-
/**
* @class UpdateableServerInfo
*
@@ -222,10 +221,10 @@ public:
UpdateableServerInfo(const Server_Info& si);
/// destructor (updates repo if needed)
- ~UpdateableServerInfo();
+ ~UpdateableServerInfo(void);
/// explicitly update repo if needed
- void update_repo();
+ void update_repo(void);
/// const Server_Info access
const Server_Info* operator->() const;
@@ -235,13 +234,14 @@ public:
/// retrieve smart pointer to non-const Server_Info
/// and indicate repo update required
- const Server_Info_Ptr& edit();
+ const Server_Info_Ptr& edit(void);
/// force indication of update needed
- void needs_update();
+ void needs_update(void);
/// indicate it Server_Info_Ptr is null
- bool null() const;
+ bool null(void) const;
+
private:
UpdateableServerInfo(const UpdateableServerInfo& );
const UpdateableServerInfo& operator=(const UpdateableServerInfo& );