summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp')
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp334
1 files changed, 294 insertions, 40 deletions
diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp
index daa4efdc025..1368c1bbdac 100644
--- a/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/AsyncAccessManager.cpp
@@ -3,84 +3,318 @@
#include "AsyncAccessManager.h"
#include "ImR_Locator_i.h"
+#include "Locator_Repository.h"
+
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
-#if 0
-class AsyncAccessManager
+AsyncAccessManager::AsyncAccessManager (const Server_Info &info,
+ bool manual,
+ ImR_Locator_i &locator)
+ :info_(0),
+ manual_start_ (manual),
+ locator_(locator),
+ poa_(locator.root_poa()),
+ rh_list_(),
+ status_(AAM_INIT),
+ refcount_(1),
+ lock_()
{
- public:
- AsyncAccessManager (UpdateableServerInfo &info, ImR_Locator_i &locator);
- ~AsyncAccessManager (void);
+ ACE_DEBUG ((LM_DEBUG,"New AAM: this = %x, name = %s\n",
+ this, info.name.c_str()));
+ this->info_ = new Server_Info (info);
+}
- void add_interest (ImR_ReplyHandler *rh);
- AAM_Status status (void) const;
+AsyncAccessManager::~AsyncAccessManager (void)
+{
+ delete this->info_;
+}
- void activator_replied (void);
- void server_is_running (void);
- void ping_replied (bool is_alive);
- void
+bool
+AsyncAccessManager::has_server (const char *s)
+{
+ return ACE_OS::strcmp (this->info_->name.c_str(),s) == 0;
+}
- void add_ref (void);
- void remove_ref (void);
+void
+AsyncAccessManager::add_interest (ImR_ReplyHandler *rh)
+{
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->rh_list_.push_back (rh);
+ }
+ if (this->locator_.debug() > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) AsyncAccessManager::add_interest: ")
+ ACE_TEXT ("server = <%C>, status = %d count = %d\n"),
+ this->info_->name.c_str(), this->status_, this->rh_list_.size()));
+ }
- private:
- UpdateableServerInfo &info_;
- ImR_Locator_i &locator_;
+ if (this->status_ == AAM_SERVER_READY)
+ {
+ if (this->locator_.pinger().is_alive (this->info_->name.c_str()))
+ {
+ this->final_state();
+ return;
+ }
+ }
- ACE_Stack<ImR_ReplyHandler *> rh_list_;
+ if (this->status_ == AAM_INIT || this->status_ == AAM_SERVER_READY)
+ {
+ // This is not a leak. The listener registers with
+ // the pinger and will delete itself when done.
+ AsyncLiveListener *l = 0;
+ ACE_NEW (l, AsyncLiveListener (this->info_->name.c_str(),
+ *this,
+ this->locator_.pinger()));
+ if (!l->start())
+ {
+ if (!this->send_start_request())
+ {
+ this->final_state();
+ }
+ }
+ else
+ {
+ this->status (AAM_WAIT_FOR_PING);
+ }
+ }
+}
- AAM_Status status_;
+void
+AsyncAccessManager::final_state (void)
+{
+ if (this->locator_.debug() > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) AsyncAccessManager::final_state: ")
+ ACE_TEXT ("status = %d, pior = <%C>\n"),
+ this->status_, this->info_->partial_ior.c_str()));
+ }
- int refcount_;
- TAO_SYNCH_MUTEX lock_;
+ for (size_t i = 0; i < this->rh_list_.size(); i++)
+ {
+ ImR_ReplyHandler *rh = this->rh_list_[i];
+ if (rh != 0)
+ {
+ if (this->status_ == AAM_SERVER_READY)
+ {
+ rh->send_ior (this->info_->partial_ior.c_str());
+ }
+ else
+ {
+ rh->send_exception ();
+ }
+ }
+ }
+ this->rh_list_.clear ();
+ if (this->info_->activation_mode == ImplementationRepository::PER_CLIENT ||
+ this->status_ != AAM_SERVER_READY)
+ {
+ this->locator_.remove_aam (this);
+ }
+}
-};
-#endif
+AAM_Status
+AsyncAccessManager::status (void) const
+{
+ return this->status_;
+}
+
+void
+AsyncAccessManager::status (AAM_Status s)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->status_ = s;
+}
+
+void
+AsyncAccessManager::activator_replied (bool success)
+{
+ if (this->locator_.debug() > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) AsyncAccessManager::activator_replied: ")
+ ACE_TEXT ("success = %d, status = %d\n"),
+ success, this->status_));
+ }
+ if (success)
+ {
+ this->status (AAM_WAIT_FOR_RUNNING);
+ }
+ else
+ {
+ this->status (AAM_NO_ACTIVATOR);
+ this->final_state ();
+ }
+}
+
+void
+AsyncAccessManager::server_is_running (const char *partial_ior)
+{
+ if (this->locator_.debug() > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) AsyncAccessManager::server_is_running: ")
+ ACE_TEXT ("name = %C, status = %d\n"),
+ this->info_->name.c_str(), this->status_));
+ }
+ this->status (AAM_WAIT_FOR_ALIVE);
+ this->info_->partial_ior = partial_ior;
+
+ if (this->locator_.pinger().is_alive (this->info_->name.c_str()))
+ {
+ this->status (AAM_SERVER_READY);
+ this->final_state ();
+ }
+
+ // This is not a leak. The listener registers with
+ // the pinger and will delete itself when done.
+ AsyncLiveListener *l = 0;
+ ACE_NEW (l, AsyncLiveListener (this->info_->name.c_str(),
+ *this,
+ this->locator_.pinger()));
+ if (!l->start())
+ {
+ this->status (AAM_SERVER_DEAD);
+ this->final_state ();
+ }
+}
+
+void
+AsyncAccessManager::ping_replied (bool is_alive)
+{
+ if (is_alive)
+ {
+ this->status (AAM_SERVER_READY);
+ }
+ else if (this->status_ == AAM_WAIT_FOR_PING)
+ {
+ if (this->send_start_request ())
+ {
+ return;
+ }
+ }
+ else
+ {
+ this->status (AAM_SERVER_DEAD);
+ }
+ this->final_state();
+}
+
+bool
+AsyncAccessManager::send_start_request (void)
+{
+ if (this->info_->activation_mode == ImplementationRepository::MANUAL &&
+ !this->manual_start_)
+ {
+ this->status (AAM_NOT_MANUAL);
+ return false;
+ }
+
+ Activator_Info_Ptr ainfo =
+ this->locator_.get_activator (this->info_->activator);
+
+ if (ainfo.null () || CORBA::is_nil (ainfo->activator.in ()))
+ {
+ this->status (AAM_NO_ACTIVATOR);
+ return false;
+ }
+
+ PortableServer::ServantBase_var callback = new ActivatorReceiver (this,
+ this->poa_.in());
+ PortableServer::ObjectId_var oid = this->poa_->activate_object (callback.in());
+ CORBA::Object_var obj = this->poa_->id_to_reference (oid.in());
+ ImplementationRepository::AMI_ActivatorHandler_var cb =
+ ImplementationRepository::AMI_ActivatorHandler::_narrow (obj.in());
+
+ ainfo->activator->sendc_start_server (cb.in(),
+ this->info_->name.c_str (),
+ this->info_->cmdline.c_str (),
+ this->info_->dir.c_str (),
+ this->info_->env_vars);
+ this->status (AAM_ACTIVATION_SENT);
+ return true;
+}
+
+void
+AsyncAccessManager::add_ref (void)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ ++this->refcount_;
+}
+
+void
+AsyncAccessManager::remove_ref (void)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ if (--this->refcount_ == 0)
+ {
+ delete this;
+ }
+}
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
-ImR_Loc_ReplyHandler::ImR_Loc_ReplyHandler (AMH_ImplementationRepository::LocatorResponseHandler_ptr rh)
- :rh_ (AMH_ImplementationRepository::LocatorResponseHandler::_duplicate(rh))
+ActivatorReceiver::ActivatorReceiver (AsyncAccessManager *aam,
+ PortableServer::POA_ptr poa)
+ :aam_ (aam),
+ poa_ (PortableServer::POA::_duplicate (poa))
{
+ this->aam_->add_ref ();
}
-ImR_Loc_ReplyHandler::~ImR_Loc_ReplyHandler (void)
+
+ActivatorReceiver::~ActivatorReceiver (void)
{
+ this->aam_->remove_ref ();
}
void
-ImR_Loc_ReplyHandler::send_ior (const char *)
+ActivatorReceiver::start_server (void)
{
- rh_->activate_server (); // void return
- delete this;
+ this->aam_->activator_replied (true);
+ PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this);
+ poa_->deactivate_object (oid.in());
+}
+void
+ActivatorReceiver::start_server_excep (Messaging::ExceptionHolder *)
+{
+ this->aam_->activator_replied (false);
+ PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this);
+ poa_->deactivate_object (oid.in());
+}
+
+void
+ActivatorReceiver::shutdown (void)
+{
+ // no-op, just satisfy virtual function
}
void
-ImR_Loc_ReplyHandler::send_exception (void)
+ActivatorReceiver::shutdown_excep (Messaging::ExceptionHolder * )
{
- CORBA::TRANSIENT ex (CORBA::SystemException::_tao_minor_code
- ( TAO_IMPLREPO_MINOR_CODE, 0),
- CORBA::COMPLETED_NO);
- TAO_AMH_DSI_Exception_Holder h(&ex);
- resp_->invoke_excep(&h);
- delete this;
+ // no-op, just satisfy virtual function
}
+
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
-AsyncLiveListener::AsyncLiveListener (AsyncAccessManager &aam, LiveCheck &pinger)
- :aam_ (aam),
+AsyncLiveListener::AsyncLiveListener (const char *server,
+ AsyncAccessManager &aam,
+ LiveCheck &pinger)
+ :LiveListener (server),
+ aam_ (aam),
pinger_ (pinger),
- status_ (LS_UNKNOWN),
+ status_ (LS_UNKNOWN)
{
this->aam_.add_ref ();
- this->pinger_.add_listener (this);
}
AsyncLiveListener::~AsyncLiveListener (void)
@@ -88,14 +322,34 @@ AsyncLiveListener::~AsyncLiveListener (void)
this->aam_.remove_ref ();
}
+bool
+AsyncLiveListener::start (void)
+{
+ bool rtn = this->pinger_.add_listener (this);
+ if (!rtn)
+ delete this;
+ return rtn;
+}
+
void
AsyncLiveListener::status_changed (LiveStatus status, bool may_retry)
{
this->status_ = status;
if (status == LS_TRANSIENT && may_retry)
- this->pinger_.add_listener (this);
+ {
+ if (!this->pinger_.add_listener (this))
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "AsyncLiveListener::status_changed, deleting(1)\n"));
+ this->aam_.ping_replied (false);
+ delete this;
+ }
+ }
else
{
+ ACE_DEBUG ((LM_DEBUG,
+ "AsyncLiveListener::status_changed, status = %d, deleting(2)\n", status));
this->aam_.ping_replied (status != LS_DEAD);
+ delete this;
}
}