diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2010-06-17 19:25:02 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2010-06-17 19:25:02 +0000 |
commit | dcc39d49e76fd0a022f73599cfca4e3765135ee4 (patch) | |
tree | 8bfbdca3c9ca2a29c28d081f33b8470deba46c16 | |
parent | f2597ba51a90dbea94b70f0a4418f2915ae3a2a3 (diff) | |
download | ATCD-dcc39d49e76fd0a022f73599cfca4e3765135ee4.tar.gz |
Thu Jun 17 19:22:51 UTC 2010 William R. Otte <wotte@dre.vanderbilt.edu>
* DAnCE/LocalityManager/Daemon/Locality_Manager_Impl.h:
Layout change.
* DAnCE/LocalityManager/Handler/LocalityActivator_Impl.h:
* DAnCE/LocalityManager/Handler/LocalityActivator_Impl.cpp:
Bugfixes in the callback code to accomodate non-orb managed threadpools.
* DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.h:
* DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.cpp:
Changed how internal Activator is configured.
* DAnCE/LocalityManager/Scheduler/Deployment_Events.h:
* DAnCE/LocalityManager/Scheduler/Deployment_Events.cpp:
* DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.h:
* DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.cpp:
* DAnCE/LocalityManager/Scheduler/Plugin_Manager.cpp:
Linking fixes and minor changes in exception handling.
* DAnCE/NodeApplication/NodeApplication.mpc:
* DAnCE/NodeApplication/NodeApplication_Impl.h:
* DAnCE/NodeApplication/NodeApplication_Impl.cpp:
Integration of Deployment Scheduler to NodeApplication.
14 files changed, 465 insertions, 193 deletions
diff --git a/CIAO/ChangeLog b/CIAO/ChangeLog index 4f8eb1e1da6..b8bbbf17586 100644 --- a/CIAO/ChangeLog +++ b/CIAO/ChangeLog @@ -1,3 +1,33 @@ +Thu Jun 17 19:22:51 UTC 2010 William R. Otte <wotte@dre.vanderbilt.edu> + + * DAnCE/LocalityManager/Daemon/Locality_Manager_Impl.h: + + Layout change. + + * DAnCE/LocalityManager/Handler/LocalityActivator_Impl.h: + * DAnCE/LocalityManager/Handler/LocalityActivator_Impl.cpp: + + Bugfixes in the callback code to accomodate non-orb managed threadpools. + + * DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.h: + * DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.cpp: + + Changed how internal Activator is configured. + + * DAnCE/LocalityManager/Scheduler/Deployment_Events.h: + * DAnCE/LocalityManager/Scheduler/Deployment_Events.cpp: + * DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.h: + * DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.cpp: + * DAnCE/LocalityManager/Scheduler/Plugin_Manager.cpp: + + Linking fixes and minor changes in exception handling. + + * DAnCE/NodeApplication/NodeApplication.mpc: + * DAnCE/NodeApplication/NodeApplication_Impl.h: + * DAnCE/NodeApplication/NodeApplication_Impl.cpp: + + Integration of Deployment Scheduler to NodeApplication. + Thu Jun 17 19:18:13 UTC 2010 William R. Otte <wotte@dre.vanderbilt.edu> * DAnCE/MPC/config/dance_nodeapplication.mpb: diff --git a/CIAO/DAnCE/LocalityManager/Daemon/Locality_Manager_Impl.h b/CIAO/DAnCE/LocalityManager/Daemon/Locality_Manager_Impl.h index 9e00683c334..74128bec132 100644 --- a/CIAO/DAnCE/LocalityManager/Daemon/Locality_Manager_Impl.h +++ b/CIAO/DAnCE/LocalityManager/Daemon/Locality_Manager_Impl.h @@ -90,7 +90,7 @@ namespace DAnCE }; typedef std::map <std::string, - Handler> HANDLER_TABLE; + Handler> HANDLER_TABLE; HANDLER_TABLE instance_handlers_; diff --git a/CIAO/DAnCE/LocalityManager/Handler/LocalityActivator_Impl.cpp b/CIAO/DAnCE/LocalityManager/Handler/LocalityActivator_Impl.cpp index 5dd463fe5df..50707f0a3c6 100644 --- a/CIAO/DAnCE/LocalityManager/Handler/LocalityActivator_Impl.cpp +++ b/CIAO/DAnCE/LocalityManager/Handler/LocalityActivator_Impl.cpp @@ -46,18 +46,24 @@ namespace DAnCE Server_Info *info = 0; - for (SERVER_INFOS::iterator i (this->server_infos_.begin ()); - !i.done (); ++i) - { - DANCE_DEBUG (9, (LM_TRACE, DLINFO - ACE_TEXT ("DAnCE_LocalityActivator_i::locality_manager_callback - ") - ACE_TEXT ("Comparing %C with %C\n"), - (*i)->uuid_.c_str (), server_UUID)); - if ((*i)->uuid_ == server_UUID) - { - info = (*i).get (); - } - } + { + ACE_GUARD_THROW_EX ( TAO_SYNCH_MUTEX, + guard, + this->container_mutex_, + CORBA::NO_RESOURCES ()); + for (SERVER_INFOS::iterator i (this->server_infos_.begin ()); + !i.done (); ++i) + { + DANCE_DEBUG (9, (LM_TRACE, DLINFO + ACE_TEXT ("DAnCE_LocalityActivator_i::locality_manager_callback - ") + ACE_TEXT ("Comparing %C with %C\n"), + (*i)->uuid_.c_str (), server_UUID)); + if ((*i)->uuid_ == server_UUID) + { + info = (*i).get (); + } + } + } if (!info) { @@ -72,7 +78,8 @@ namespace DAnCE { DANCE_ERROR (1, (LM_ERROR, DLINFO ACE_TEXT ("DAnCE_LocalityActivator_i::locality_manager_callback - ") - ACE_TEXT ("Received callback from LocalityManager %C, which has already been configured.\n"), + ACE_TEXT ("Received callback from LocalityManager %C, ") + ACE_TEXT ("which has already been configured.\n"), server_UUID)); throw ::CORBA::BAD_INV_ORDER (); } @@ -81,7 +88,8 @@ namespace DAnCE { DANCE_ERROR (1, (LM_ERROR, DLINFO ACE_TEXT ("DAnCE_LocalityActivator_i::locality_manager_callback - ") - ACE_TEXT ("Received callback from LocalityManager %C, which has already called back.\n"), + ACE_TEXT ("Received callback from LocalityManager %C, ") + ACE_TEXT ("which has already called back.\n"), server_UUID)); throw ::CORBA::BAD_INV_ORDER (); } @@ -116,14 +124,20 @@ namespace DAnCE { Server_Info *info = 0; - for (SERVER_INFOS::ITERATOR j (this->server_infos_); - !j.done (); ++j) - { - if ((*j)->uuid_ == server_UUID) - { - info = (*j).get (); - } - } + { + ACE_GUARD_THROW_EX ( TAO_SYNCH_MUTEX, + guard, + this->container_mutex_, + CORBA::NO_RESOURCES ()); + for (SERVER_INFOS::ITERATOR j (this->server_infos_); + !j.done (); ++j) + { + if ((*j)->uuid_ == server_UUID) + { + info = (*j).get (); + } + } + } if (!info) { @@ -153,8 +167,13 @@ namespace DAnCE server_UUID)); throw ::CORBA::BAD_INV_ORDER (); } - + + ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, + guard, + info->mutex_, + CORBA::NO_RESOURCES ()); info->activated_ = true; + info->condition_.signal (); } catch (...) { @@ -187,7 +206,13 @@ namespace DAnCE ACE_TEXT ("LocalityManager arguments: %C\n"), cmd_options.c_str ())); - server_infos_.insert_tail (server); + { + ACE_GUARD_THROW_EX ( TAO_SYNCH_MUTEX, + guard, + this->container_mutex_, + CORBA::NO_RESOURCES ()); + server_infos_.insert_tail (server); + } DANCE_DEBUG (9, (LM_TRACE, DLINFO ACE_TEXT ("DAnCE_LocalityActivator_i::create_locality_manager - ") @@ -411,8 +436,25 @@ namespace DAnCE // for thread-pool concurrency model. while (true) { - this->orb_->perform_work (timeout); + if (!si.activated_) + { + ACE_GUARD_THROW_EX ( TAO_SYNCH_MUTEX, + guard, + this->mutex_, + CORBA::NO_RESOURCES ()); + // The next guy to acquire the mutex may have already + // been activated by the previous leader's perform_work, + // so let's check to make sure that only non-activated + // folks are hanging on perform_work. + if (!si.activated_) + this->orb_->perform_work (timeout); + } + if (si.activated_) + { + break; + } + if (timeout == ACE_Time_Value::zero) { DANCE_ERROR (1, (LM_ERROR, DLINFO @@ -422,17 +464,12 @@ namespace DAnCE throw ::Deployment::StartError ("locality_manager", "Timed out waiting for LocalityManager"); } - - if (si.activated_) - { - break; - } } } void DAnCE_LocalityActivator_i:: - multi_threaded_wait_for_callback (const Server_Info &si, + multi_threaded_wait_for_callback (Server_Info &si, ACE_Time_Value &timeout) { DANCE_TRACE ("DAnCE_LocalityActivator_i::multi_threaded_wait_for_callback"); @@ -440,11 +477,11 @@ namespace DAnCE // Wait for a conditional variable ACE_GUARD_THROW_EX ( TAO_SYNCH_MUTEX, guard, - this->mutex_, + si.mutex_, CORBA::NO_RESOURCES ()); while (! si.activated_ ) - if (this->condition_.wait (&timeout) == -1) + if (si.condition_.wait (/*&timeout*/) == -1) { DANCE_ERROR (1, (LM_ERROR, DLINFO ACE_TEXT ("DAnCE_LocalityActivator_i::multi_threaded_wait_for_callback - ") @@ -462,14 +499,20 @@ namespace DAnCE Server_Info *info = 0; - for (SERVER_INFOS::ITERATOR i (this->server_infos_); - !i.done (); ++i) - { - if ((*i)->ref_->_is_equivalent (server)) - { - info = (*i).get (); - } - } + { + ACE_GUARD_THROW_EX ( TAO_SYNCH_MUTEX, + guard, + this->container_mutex_, + CORBA::NO_RESOURCES ()); + for (SERVER_INFOS::ITERATOR i (this->server_infos_); + !i.done (); ++i) + { + if ((*i)->ref_->_is_equivalent (server)) + { + info = (*i).get (); + } + } + } if (!info) { diff --git a/CIAO/DAnCE/LocalityManager/Handler/LocalityActivator_Impl.h b/CIAO/DAnCE/LocalityManager/Handler/LocalityActivator_Impl.h index 82414023da4..dca4e27ef26 100644 --- a/CIAO/DAnCE/LocalityManager/Handler/LocalityActivator_Impl.h +++ b/CIAO/DAnCE/LocalityManager/Handler/LocalityActivator_Impl.h @@ -112,7 +112,7 @@ namespace DAnCE /// multiple threaded. Internally it waits on a conditional variable /// that could be modified by the callback servant which runs in /// another thread - void multi_threaded_wait_for_callback (const Server_Info &si, + void multi_threaded_wait_for_callback (Server_Info &si, ACE_Time_Value &timeout); void create_properties (const Server_Info &info, @@ -124,7 +124,10 @@ namespace DAnCE : cmap_ (new DAnCE::Utility::PROPERTY_MAP (cmap_size_hint)), ref_ (DAnCE::LocalityManager::_nil ()), pid_ (ACE_INVALID_PID), - activated_ (false) {} + activated_ (false), + mutex_ (), + condition_ (mutex_) + {} typedef ACE_Refcounted_Auto_Ptr <DAnCE::Utility::PROPERTY_MAP, ACE_Null_Mutex> PROPERTY_MAP_PTR; @@ -134,6 +137,8 @@ namespace DAnCE DAnCE::LocalityManager_var ref_; pid_t pid_; bool activated_; + TAO_SYNCH_MUTEX mutex_; + ACE_Condition<TAO_SYNCH_MUTEX> condition_; }; typedef ACE_Refcounted_Auto_Ptr<Server_Info, ACE_Null_Mutex> Safe_Server_Info; @@ -152,6 +157,8 @@ namespace DAnCE /// Default args to pass to all componentservers. ACE_CString default_args_; + TAO_SYNCH_MUTEX container_mutex_; + SERVER_INFOS server_infos_; ACE_Process_Manager process_manager_; diff --git a/CIAO/DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.cpp b/CIAO/DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.cpp index b571861e627..e7f21b893c4 100644 --- a/CIAO/DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.cpp +++ b/CIAO/DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.cpp @@ -4,11 +4,17 @@ // TAO_IDL - Generated from // be/be_codegen.cpp:1560 +#include "tao/ORB_Core.h" #include "Locality_Manager_Handler_Impl.h" #include "LocalityActivator_Impl.h" #include "DAnCE/DAnCE_PropertiesC.h" +#ifdef GEN_OSTREAM_OPS +#include <iostream> +#include <sstream> +#endif /* GEN_OSTREAM_OPS */ + namespace DAnCE { const char * @@ -17,50 +23,8 @@ namespace DAnCE // Implementation skeleton constructor Locality_Handler_i:: - Locality_Handler_i (const Utility::PROPERTY_MAP &prop, - CORBA::ORB_ptr orb, - PortableServer::POA_ptr poa) - : activator_ (0), - properties_ (prop) + Locality_Handler_i (void) { - CORBA::ULong spawn = 0; - const char *cs_path = 0; - const char *cs_args = 0; - CORBA::Boolean multithread = false; - - Utility::get_property_value (DAnCE::LOCALITY_EXECUTABLE, - this->properties_, cs_path); - DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT("Locality_Handler_i - ") - ACE_TEXT("Component server path: %C\n"), cs_path)); - Utility::get_property_value (DAnCE::LOCALITY_ARGUMENTS, - this->properties_, cs_args); - DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT("Locality_Handler_i - ") - ACE_TEXT("Component server arguments: %C\n"), cs_args)); - Utility::get_property_value (DAnCE::LOCALITY_TIMEOUT, - this->properties_, spawn); - DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT("Locality_Handler_i - ") - ACE_TEXT("Spawn delay: %u\n"), spawn)); - Utility::get_property_value (DAnCE::LOCALITY_MULTITHREAD, - this->properties_, multithread); - DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT("Locality_Handler_i - ") - ACE_TEXT("Threading: %C\n"), - multithread ? "Multi" : "Single")); - - DANCE_DEBUG (9, (LM_TRACE, DLINFO ACE_TEXT("Locality_Handler_i - ") - ACE_TEXT("Spawning Locality handler\n"))); - - ACE_NEW_THROW_EX (this->activator_, - DAnCE_LocalityActivator_i (spawn, - cs_path, - cs_args, - multithread, - orb, - poa), - CORBA::NO_MEMORY ()); - - PortableServer::ServantBase_var safe_servant (this->activator_); - - poa->activate_object (this->activator_); } // Implementation skeleton destructor @@ -79,6 +43,19 @@ namespace DAnCE ::CORBA::ULong instanceRef, ::CORBA::Any_out instance_reference) { +#ifdef GEN_OSTREAM_OPS + { + std::ostringstream plan_stream; + plan_stream << plan << std::endl; + + DANCE_DEBUG (10, (LM_TRACE, DLINFO + ACE_TEXT ("Locality_Handler_i::install_instance - ") + ACE_TEXT ("Deploying instance %u of plan %C\n"), + instanceRef, + plan_stream.str ().c_str ())); + } +#endif /* GEN_OSTREAM_OPS */ + if (plan.instance.length () <= instanceRef) { DANCE_ERROR (1, (LM_ERROR, DLINFO @@ -89,12 +66,29 @@ namespace DAnCE throw ::Deployment::PlanError (plan.UUID.in (), "Invalid instance reference"); } - + const ::Deployment::InstanceDeploymentDescription &idd = plan.instance[instanceRef]; + + if (plan.implementation.length () <= idd.implementationRef) + { + DANCE_ERROR (1, (LM_ERROR, DLINFO + ACE_TEXT ("Locality_Handler_i::install_instance - ") + ACE_TEXT ("Invalid implementation reference %u provided ") + ACE_TEXT ("to install_instance\n"), + idd.implementationRef)); + throw ::Deployment::PlanError (plan.UUID.in (), + "Invalid Implementation reference"); + } + const ::Deployment::MonolithicDeploymentDescription &mdd = plan.implementation[idd.implementationRef]; + DANCE_DEBUG (10, (LM_TRACE, DLINFO + ACE_TEXT ("Locality_Handler_i::install_instance - ") + ACE_TEXT ("Starting installation of instance <%C>\n"), + idd.name.in ())); + CORBA::ULong allprops_len = idd.configProperty.length () + mdd.execParameter.length () + 1; ::Deployment::Properties allprops (allprops_len); @@ -167,5 +161,66 @@ namespace DAnCE void Locality_Handler_i::configure (const ::Deployment::Properties &prop ) { + ::DAnCE::Utility::PROPERTY_MAP pmap (prop.length ()); + + ::DAnCE::Utility::build_property_map (pmap, + prop); + + CORBA::ULong spawn = 0; + const char *cs_path = 0; + const char *cs_args = 0; + CORBA::Boolean multithread = false; + PortableServer::POA_var poa; + + Utility::get_property_value (DAnCE::LOCALITY_EXECUTABLE, + pmap, cs_path); + DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT("Locality_Handler_i - ") + ACE_TEXT("Component server path: %C\n"), cs_path)); + Utility::get_property_value (DAnCE::LOCALITY_ARGUMENTS, + pmap, cs_args); + DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT("Locality_Handler_i - ") + ACE_TEXT("Component server arguments: %C\n"), cs_args)); + Utility::get_property_value (DAnCE::LOCALITY_TIMEOUT, + pmap, spawn); + DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT("Locality_Handler_i - ") + ACE_TEXT("Spawn delay: %u\n"), spawn)); + Utility::get_property_value (DAnCE::LOCALITY_MULTITHREAD, + pmap, multithread); + DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT("Locality_Handler_i - ") + ACE_TEXT("Threading: %C\n"), + multithread ? "Multi" : "Single")); + + Utility::get_property_value (DAnCE::ENTITY_POA, + pmap, poa); + + DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT("Locality_Handler_i - ") + ACE_TEXT("Threading: %C\n"), + multithread ? "Multi" : "Single")); + + CORBA::ORB_var orb = TAO_ORB_Core_instance ()->orb (); + + DANCE_DEBUG (9, (LM_TRACE, DLINFO ACE_TEXT("Locality_Handler_i - ") + ACE_TEXT("Spawning Locality handler\n"))); + + ACE_NEW_THROW_EX (this->activator_, + DAnCE_LocalityActivator_i (spawn, + cs_path, + cs_args, + false, + orb, + poa), + CORBA::NO_MEMORY ()); + + PortableServer::ServantBase_var safe_servant (this->activator_); + + poa->activate_object (this->activator_); + } +} + +extern "C" +{ + ::DAnCE::InstanceDeploymentHandler_ptr create_Locality_Handler (void) + { + return new DAnCE::Locality_Handler_i (); } } diff --git a/CIAO/DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.h b/CIAO/DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.h index 1eef296073d..7c0e2cf7bc9 100644 --- a/CIAO/DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.h +++ b/CIAO/DAnCE/LocalityManager/Handler/Locality_Manager_Handler_Impl.h @@ -13,8 +13,6 @@ #include "DAnCE/DAnCE_LocalityManagerS.h" #include "DAnCE/DAnCE_Utility.h" -#include "Split_Plan/Locality_Splitter.h" -#include "Split_Plan/Split_Plan.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) #pragma once @@ -33,9 +31,7 @@ namespace DAnCE { public: // Constructor - Locality_Handler_i (const Utility::PROPERTY_MAP &props, - CORBA::ORB_ptr orb, - PortableServer::POA_ptr poa); + Locality_Handler_i (void); // Destructor virtual ~Locality_Handler_i (void); @@ -82,11 +78,17 @@ namespace DAnCE virtual void configure(const Deployment::Properties&); + private: static const char *instance_type_; DAnCE_LocalityActivator_i *activator_; - const Utility::PROPERTY_MAP &properties_; - }; } + +extern "C" +{ + ::DAnCE::InstanceDeploymentHandler_ptr + DAnCE_Locality_Handler_Export create_Locality_Handler (void); +} + #endif diff --git a/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Events.cpp b/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Events.cpp index 61d0aff92c8..31c1ae7ab14 100644 --- a/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Events.cpp +++ b/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Events.cpp @@ -75,7 +75,7 @@ namespace DAnCE if (CORBA::is_nil (handler)) { throw ::Deployment::StartError (name, - "Unable to load appropriate instance handler"); + "Unable to load appropriate instance handler"); } ::CORBA::Any_var instance_ref; @@ -234,7 +234,7 @@ namespace DAnCE if (CORBA::is_nil (handler)) { throw ::Deployment::StartError (name, - "Unable to load appropriate instance handler"); + "Unable to load appropriate instance handler"); } DANCE_DEBUG (10, (LM_TRACE, DLINFO @@ -345,8 +345,8 @@ namespace DAnCE Remove_Instance::call (void) { DANCE_DEBUG (10, (LM_TRACE, DLINFO - ACE_TEXT ("Install_Instance::call - ") - ACE_TEXT ("Entering Install_Instance\n"))); + ACE_TEXT ("Remove_Instance::call - ") + ACE_TEXT ("Entering Remove_Instance\n"))); const char *name = this->plan_.instance[this->instanceRef_].name.in (); @@ -369,7 +369,7 @@ namespace DAnCE if (CORBA::is_nil (handler)) { - throw ::Deployment::StartError (name, + throw ::Deployment::StopError (name, "Unable to load appropriate instance handler"); } @@ -389,7 +389,7 @@ namespace DAnCE { DANCE_ERROR (3, (LM_ERROR, DLINFO ACE_TEXT ("Remove_Instance::call - ") - ACE_TEXT ("Caught C++ exception while removeing instance ") + ACE_TEXT ("Caught CORBA exception while removeing instance ") ACE_TEXT ("%u:<%C>\n"), this->instanceRef_, name)); diff --git a/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Events.h b/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Events.h index 327cdd4f6ba..14dba5b2028 100644 --- a/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Events.h +++ b/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Events.h @@ -15,10 +15,11 @@ #include "ace/Method_Request.h" #include "ace/Future.h" #include "Deployment/Deployment_DeploymentPlanC.h" +#include "LocalityManager/Scheduler/Deployment_Scheduler_export.h" namespace DAnCE { - struct Event_Result + struct Deployment_Scheduler_Export Event_Result { std::string id_; bool exception_; @@ -32,7 +33,7 @@ namespace DAnCE * @brief Future observer that invokes a parameterized functor on the future */ template <typename Functor> - class Event_Handler + class Deployment_Scheduler_Export Event_Handler : ACE_Future_Observer< Event_Result > { public: @@ -54,7 +55,7 @@ namespace DAnCE Functor &specific_handler_; }; - class Deployment_Event : + class Deployment_Scheduler_Export Deployment_Event : public virtual ACE_Method_Request { public: @@ -68,7 +69,7 @@ namespace DAnCE std::string instance_type_; }; - class Install_Instance : + class Deployment_Scheduler_Export Install_Instance : public virtual Deployment_Event { public: @@ -86,7 +87,7 @@ namespace DAnCE ::CORBA::ULong instanceRef_; }; - class Connect_Instance : + class Deployment_Scheduler_Export Connect_Instance : public virtual Deployment_Event { public: @@ -106,7 +107,7 @@ namespace DAnCE ::CORBA::Any provided_ref_; }; - class Remove_Instance : + class Deployment_Scheduler_Export Remove_Instance : public virtual Deployment_Event { public: diff --git a/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.cpp b/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.cpp index 105f0b39be4..3c35e71bd9c 100644 --- a/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.cpp +++ b/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.cpp @@ -20,6 +20,12 @@ namespace DAnCE return retval; } + void + Deployment_Scheduler::terminate_scheduler (void) + { + this->event_queue_.queue ()->close (); + } + int Deployment_Scheduler::svc (void) { @@ -36,10 +42,10 @@ namespace DAnCE } else { - DANCE_ERROR (1, (LM_ERROR, DLINFO - ACE_TEXT ("Deployment_Scheduler::svc - ") - ACE_TEXT ("Failed to retrieve deployment event from queue"))); + break; } } + + return 0; } } diff --git a/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.h b/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.h index 09d944a2bea..ed76600c310 100644 --- a/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.h +++ b/CIAO/DAnCE/LocalityManager/Scheduler/Deployment_Scheduler.h @@ -14,11 +14,14 @@ namespace DAnCE { - class Deployment_Scheduler : public ACE_Task_Base + class Deployment_Scheduler_Export Deployment_Scheduler : + public ACE_Task_Base { public: /// Schedule an event for execution int schedule_event (Deployment_Event *event); + + void terminate_scheduler (void); protected: /// Scheduler event loop diff --git a/CIAO/DAnCE/LocalityManager/Scheduler/Plugin_Manager.cpp b/CIAO/DAnCE/LocalityManager/Scheduler/Plugin_Manager.cpp index 587704ecc4d..dcddf2ee7d4 100644 --- a/CIAO/DAnCE/LocalityManager/Scheduler/Plugin_Manager.cpp +++ b/CIAO/DAnCE/LocalityManager/Scheduler/Plugin_Manager.cpp @@ -124,8 +124,8 @@ namespace DAnCE artifact, entrypoint, ex._info ().c_str ())); - throw ::Deployment::StartError (ACE_TEXT_ALWAYS_CHAR (artifact), - ex._info ().c_str ()); + throw ::Deployment::PlanError (ACE_TEXT_ALWAYS_CHAR (artifact), + ex._info ().c_str ()); } catch (...) { @@ -134,8 +134,8 @@ namespace DAnCE ACE_TEXT ("Unknown C++ exception while configuring plugin from <%s>:<%s>\n"), artifact, entrypoint)); - throw ::Deployment::StartError (ACE_TEXT_ALWAYS_CHAR (artifact), - "Unknown C++ exception during handler configuration\n"); + throw ::Deployment::PlanError (ACE_TEXT_ALWAYS_CHAR (artifact), + "Unknown C++ exception during handler configuration\n"); } } @@ -263,7 +263,7 @@ namespace DAnCE artifact, entrypoint, ex._info ().c_str ())); - throw ::Deployment::StartError (ACE_TEXT_ALWAYS_CHAR (artifact), + throw ::Deployment::PlanError (ACE_TEXT_ALWAYS_CHAR (artifact), ex._info ().c_str ()); } catch (...) @@ -273,7 +273,7 @@ namespace DAnCE ACE_TEXT ("Unknown C++ exception while configuring plugin from <%s>:<%s>\n"), artifact, entrypoint)); - throw ::Deployment::StartError (ACE_TEXT_ALWAYS_CHAR (artifact), + throw ::Deployment::PlanError (ACE_TEXT_ALWAYS_CHAR (artifact), "Unknown C++ exception during handler configuration\n"); } } diff --git a/CIAO/DAnCE/NodeApplication/NodeApplication.mpc b/CIAO/DAnCE/NodeApplication/NodeApplication.mpc index 5fd58d77f23..37f4a0bcaa3 100644 --- a/CIAO/DAnCE/NodeApplication/NodeApplication.mpc +++ b/CIAO/DAnCE/NodeApplication/NodeApplication.mpc @@ -2,7 +2,7 @@ // $Id$ project(DAnCE_NodeApplication): install, dance_lib, \ - dance_nodeapplication_skel, \ + dance_nodeapplication_skel, dance_deployment_scheduler, \ dance_logger, iortable, naming, dance_locality_handler, \ gen_ostream, avoids_ace_for_tao, dance_applicationmanager_stub { sharedname = DAnCE_NodeApplication diff --git a/CIAO/DAnCE/NodeApplication/NodeApplication_Impl.cpp b/CIAO/DAnCE/NodeApplication/NodeApplication_Impl.cpp index 7222c85673b..cc591f6bf3f 100644 --- a/CIAO/DAnCE/NodeApplication/NodeApplication_Impl.cpp +++ b/CIAO/DAnCE/NodeApplication/NodeApplication_Impl.cpp @@ -16,6 +16,7 @@ #include "DAnCE/DAnCE_Utility.h" #include "DAnCE/DAnCE_PropertiesC.h" #include "DAnCE/DAnCE_LocalityManagerC.h" +#include "DAnCE/LocalityManager/Scheduler/Plugin_Manager.h" #include "Split_Plan/Locality_Splitter.h" #include "Split_Plan/Split_Plan.h" @@ -33,30 +34,36 @@ NodeApplication_Impl::NodeApplication_Impl (CORBA::ORB_ptr orb, PortableServer::POA_ptr poa, DAnCE::ArtifactInstallation_ptr installer, const ACE_CString& node_name, - const PROPERTY_MAP &properties) + const DAnCE::Utility::PROPERTY_MAP &properties) : orb_ (CORBA::ORB::_duplicate (orb)), poa_ (PortableServer::POA::_duplicate (poa)), installer_ (DAnCE::ArtifactInstallation::_duplicate (installer)), node_name_ (node_name), - properties_ (), - handler_ (properties, - orb, - poa) + scheduler_ () { DANCE_TRACE ("NodeApplication_Impl::NodeApplication_Impl"); - PROPERTY_MAP::const_iterator i = properties.begin (); - while (!i.done ()) - { - DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT("NodeApplication_Impl::NodeApplication_Impl - ") - ACE_TEXT("Binding value for property '%C'\n"), i->key ().c_str ())); - this->properties_.bind (i->key (), i->item ()); - i.advance (); - } + + ::Deployment::Properties prop; + ::DAnCE::Utility::build_property_sequence (prop, properties); + PLUGIN_MANAGER::instance ()->set_configuration (prop); + PLUGIN_MANAGER::instance ()->register_installation_handler ("DAnCE_Locality_Handler", + "create_Locality_Handler"); + + DANCE_DEBUG (8, (LM_INFO, DLINFO + ACE_TEXT("NodeApplication_Impl::NodeApplication_Impl - ") + ACE_TEXT("Plugin loaded\n"))); + + // Spawn thread pool + // @Todo: We can probably move this up into the NodeManager and + // share the thread pool among several node applications. + this->scheduler_.activate (THR_DETACHED, + 10); } NodeApplication_Impl::~NodeApplication_Impl() { DANCE_TRACE( "NodeApplication_Impl::~NodeApplication_Impl()"); + this->scheduler_.terminate_scheduler (); } void @@ -65,6 +72,8 @@ NodeApplication_Impl::prepare_instances (const LocalitySplitter::TSubPlans& plan DANCE_TRACE ("NodeApplication_Impl::prepare_instances"); CORBA::ULong plan (0); + std::list < Event_Future > prepared_instances; + // for each sub plan LocalitySplitter::TSubPlanConstIterator plans_end (plans, 1); for (LocalitySplitter::TSubPlanConstIterator i (plans); @@ -80,7 +89,7 @@ NodeApplication_Impl::prepare_instances (const LocalitySplitter::TSubPlans& plan DANCE_DEBUG (9, (LM_TRACE, DLINFO ACE_TEXT ("NodeApplication_Impl::prepare_instances - ") ACE_TEXT ("Considering sub-plan %u:%C with %u instances\n"), - plan++, + plan, sub_plan.UUID.in (), sub_plan.instance.length () )); @@ -89,50 +98,86 @@ NodeApplication_Impl::prepare_instances (const LocalitySplitter::TSubPlans& plan // instance (creating default if necessary) and identifies it in the key CORBA::ULong loc_manager_instance = sub_plan_key.locality_manager_instance (); - const ::Deployment::InstanceDeploymentDescription &lm_idd = sub_plan.instance[loc_manager_instance]; - + const ::Deployment::InstanceDeploymentDescription &lm_idd + = sub_plan.instance[loc_manager_instance]; + + DANCE_DEBUG (4, (LM_DEBUG, DLINFO - ACE_TEXT ("NodeApplication_Impl::prepare_instances - ") - ACE_TEXT ("Found Locality Manager instance %u:%C, deploying\n"), - loc_manager_instance, - lm_idd.name.in () - )); + ACE_TEXT ("NodeApplication_Impl::prepare_instances - ") + ACE_TEXT ("Found Locality Manager instance %u:%C, deploying\n"), + loc_manager_instance, + lm_idd.name.in () + )); + + this->sub_plans_ [ lm_idd.name.in () ] = SUB_PLAN (loc_manager_instance, + sub_plan); CORBA::Any_var reference; - this->handler_.install_instance (sub_plan, - loc_manager_instance, - reference.out ()); + Install_Instance *event (0); + Event_Future result; + + ACE_NEW_THROW_EX (event, + Install_Instance (sub_plan, + loc_manager_instance, + DAnCE::DANCE_LOCALITYMANAGER, + result + ), + CORBA::NO_MEMORY ()); + + prepared_instances.push_back (result); + this->scheduler_.schedule_event (event); + ++plan; + } + + plan = 0; + for (std::list < Event_Future >::iterator i = prepared_instances.begin (); + i != prepared_instances.end (); + ++i) + { + Event_Result event; + if (i->get (event, + 0 /*need to wait based on spawn delay*/) != 0) + { + DANCE_ERROR (1, (LM_ERROR, DLINFO + ACE_TEXT("NodeApplication_Impl::prepare_instances - ") + ACE_TEXT("Failed to get future value for current instance\n"))); + continue; + } + ::DAnCE::LocalityManager_var lm_ref; - if (reference.in () >>= lm_ref) + if (event.contents_.in ().impl () && + (event.contents_.in () >>= lm_ref) && + !CORBA::is_nil (lm_ref.in ())) { - this->localities_[lm_idd.name.in ()] = lm_ref._retn (); + this->localities_[event.id_] = ::DAnCE::LocalityManager::_duplicate (lm_ref); DANCE_DEBUG (4, (LM_INFO, DLINFO - ACE_TEXT("NodeApplication_Impl::prepare_instances - ") - ACE_TEXT("Successfully started Locality %C\n"), - lm_idd.name.in ())); + ACE_TEXT("NodeApplication_Impl::prepare_instances - ") + ACE_TEXT("Successfully started Locality %C\n"), + event.id_.c_str ())); } else { DANCE_ERROR (1, (LM_ERROR, DLINFO - ACE_TEXT("NodeApplication_Impl::prepare_instances - ") - ACE_TEXT("Unable to resolve LocalityManager object reference\n"))); - throw ::Deployment::StartError (lm_idd.name.in (), + ACE_TEXT("NodeApplication_Impl::prepare_instances - ") + ACE_TEXT("Unable to resolve LocalityManager object reference\n"))); + throw ::Deployment::StartError (event.id_.c_str (), "Unable to resolve LocalityManager object ref\n"); } DANCE_DEBUG (4, (LM_DEBUG, DLINFO - ACE_TEXT ("NodeApplication_Impl::prepare_instances - ") - ACE_TEXT ("Invoking preparePlan on locality %C\n"), - lm_idd.name.in ())); + ACE_TEXT ("NodeApplication_Impl::prepare_instances - ") + ACE_TEXT ("Invoking preparePlan on locality %C\n"), + event.id_.c_str ())); + - this->prepare_instance (lm_idd.name.in (), - sub_plan); + this->prepare_instance (event.id_.c_str (), + (this->sub_plans_[event.id_].second)); DANCE_DEBUG (9, (LM_DEBUG, DLINFO - ACE_TEXT ("NodeApplication_Impl::prepare_instances - ") - ACE_TEXT ("Successfully executed preparePlan on locality %C\n"), - lm_idd.name.in ())); + ACE_TEXT ("NodeApplication_Impl::prepare_instances - ") + ACE_TEXT ("Successfully executed preparePlan on locality %C\n"), + event.id_.c_str ())); } } @@ -147,7 +192,7 @@ NodeApplication_Impl::prepare_instance (const char *name, try { app = this->localities_[name]->preparePlan (plan, - 0); + 0); DANCE_DEBUG (6, (LM_DEBUG, DLINFO ACE_TEXT ("NodeApplication_Impl::prepare_instance - ") ACE_TEXT ("Locality <%C> successfully prepared.\n"), @@ -180,13 +225,13 @@ NodeApplication_Impl::start_launch_instances (const Deployment::Properties &prop Deployment::Connections_out providedReference) { DANCE_TRACE ("NodeApplication_Impl::start_launch_instances"); -// Deployment::Connections *tmp (0); + // Deployment::Connections *tmp (0); -// ACE_NEW_THROW_EX (tmp, -// Deployment::Connections (this->plan_.connection.length ()), -// CORBA::NO_MEMORY ()); -// -// Deployment::Connections_var retval (tmp); + // ACE_NEW_THROW_EX (tmp, + // Deployment::Connections (this->plan_.connection.length ()), + // CORBA::NO_MEMORY ()); + // + // Deployment::Connections_var retval (tmp); for (LOCALITY_MAP::const_iterator i = this->localities_.begin (); i != this->localities_.end (); ++i) @@ -297,8 +342,9 @@ NodeApplication_Impl::remove_instances (void) ::Deployment::StopError final_exception; bool flag (false); + std::list < Event_Future > removed_instances; - for (LOCALITY_MAP::iterator i = this->localities_.begin (); + for (LOCALITY_MAP::iterator i = this->localities_.begin (); i != this->localities_.end (); ++i) { DANCE_DEBUG (4, (LM_INFO, DLINFO @@ -309,17 +355,93 @@ NodeApplication_Impl::remove_instances (void) try { CORBA::Any ref; - ref <<= DAnCE::LocalityManager::_duplicate (i->second); + ref <<= ::DAnCE::LocalityManager::_duplicate (i->second); i->second->destroyApplication (0); + + Remove_Instance *event (0); + Event_Future result; + + PLAN_MAP::iterator sub_plan; + + if ((sub_plan = this->sub_plans_.find (i->first)) != + this->sub_plans_.end ()) + { + ACE_NEW (event, + Remove_Instance (sub_plan->second.second, + sub_plan->second.first, + ref, + DANCE_LOCALITYMANAGER, + result)); + + removed_instances.push_back (result); + this->scheduler_.schedule_event (event); + } + else + { + DANCE_ERROR (1, (LM_ERROR, DLINFO + ACE_TEXT ("NodeApplication_Impl::remove_instances - ") + ACE_TEXT ("Unable to find sub plan for instance <%C>\n"), + i->first.c_str ())); + } + } + catch (::Deployment::StopError &ex) + { + DANCE_ERROR (1, (LM_ERROR, DLINFO + ACE_TEXT ("NodeApplication_Impl::remove_instances - ") + ACE_TEXT ("Caught StopError final_exception %C, %C\n"), + ex.name.in (), + ex.reason.in ())); + Utility::test_and_set_exception (flag, + final_exception, + ex.name.in (), + ex.reason.in ()); - ::Deployment::DeploymentPlan plan; // TODO: MCO - this really sucks; if there is a real need - // for the plan here we should probably get the - // correct sub plan passed somehow like in - // prepare_instances () - this->handler_.remove_instance (plan, /* not needed at this time */ - 0, /* not needed at this time */ - ref); + } + catch (CORBA::Exception &ex) + { + DANCE_ERROR (1, (LM_ERROR, DLINFO + ACE_TEXT ("NodeApplication_Impl::remove_instances - ") + ACE_TEXT ("Caught CORBA Final_Exception %C\n"), + ex._info ().c_str ())); + Utility::test_and_set_exception (flag, + final_exception, + "Unknown CORBA Final_Exception", + ex._info ().c_str ()); + } + } + + for (std::list < Event_Future >::iterator i = removed_instances.begin (); + i != removed_instances.end (); + ++i) + { + try + { + Event_Result event; + + if (i->get (event, + 0 /* need to wait based on spawn delay */) != 0) + { + DANCE_ERROR (1, (LM_ERROR, DLINFO + ACE_TEXT ("NodeApplication_Impl::remove_instances - ") + ACE_TEXT ("Failed to get future value for current instance\n"))); + continue; + } + + using DAnCE::Utility::extract_and_throw_exception; + + if (event.exception_ && + !(extract_and_throw_exception< ::Deployment::StopError > (event.contents_.in ()))) + { + DANCE_ERROR (1, (LM_ERROR, DLINFO + ACE_TEXT ("NodeApplication_Impl::remove_instances - ") + ACE_TEXT ("Unexpected exception thrown during removal of ") + ACE_TEXT ("instance <%C>\n"), + event.id_.c_str ())); + + throw ::Deployment::StopError (event.id_.c_str (), + "Unknown exception thrown from remove_instance\n"); + } } catch (::Deployment::StopError &ex) @@ -330,9 +452,9 @@ NodeApplication_Impl::remove_instances (void) ex.name.in (), ex.reason.in ())); Utility::test_and_set_exception (flag, - final_exception, - ex.name.in (), - ex.reason.in ()); + final_exception, + ex.name.in (), + ex.reason.in ()); } catch (CORBA::Exception &ex) @@ -342,14 +464,12 @@ NodeApplication_Impl::remove_instances (void) ACE_TEXT ("Caught CORBA Final_Exception %C\n"), ex._info ().c_str ())); Utility::test_and_set_exception (flag, - final_exception, - "Unknown CORBA Final_Exception", - ex._info ().c_str ()); + final_exception, + "Unknown CORBA Final_Exception", + ex._info ().c_str ()); } - - //this->localities_.erase (i); + } - - if (flag) - throw final_exception; + if (flag) + throw final_exception; } diff --git a/CIAO/DAnCE/NodeApplication/NodeApplication_Impl.h b/CIAO/DAnCE/NodeApplication/NodeApplication_Impl.h index 8c2de19ef10..8f77c9b2cfd 100644 --- a/CIAO/DAnCE/NodeApplication/NodeApplication_Impl.h +++ b/CIAO/DAnCE/NodeApplication/NodeApplication_Impl.h @@ -29,8 +29,9 @@ #include "Deployment/Deployment_DeploymentPlanC.h" #include "Deployment/DeploymentC.h" #include "DAnCE/DAnCE_Utility.h" +#include "DAnCE/DAnCE_LocalityManagerC.h" +#include "LocalityManager/Scheduler/Deployment_Scheduler.h" #include "DAnCE/DAnCE_ArtifactInstallationC.h" -#include "LocalityManager/Handler/Locality_Manager_Handler_Impl.h" #include "Split_Plan/Locality_Splitter.h" #include "Split_Plan/Split_Plan.h" @@ -45,7 +46,7 @@ namespace DAnCE typedef DAnCE::Split_Plan < DAnCE::Locality_Splitter > LocalitySplitter; class NodeManager_Impl; - + class NodeApplication_Export NodeApplication_Impl : public virtual POA_Deployment::NodeApplication { @@ -70,6 +71,13 @@ namespace DAnCE void remove_instances (void); + typedef std::map <std::string, ::DAnCE::LocalityManager_var> + LOCALITY_MAP; + + + typedef std::pair <CORBA::ULong, ::Deployment::DeploymentPlan> SUB_PLAN; + typedef std::map <std::string, SUB_PLAN> PLAN_MAP; + protected: void prepare_instance (const char *name, const ::Deployment::DeploymentPlan &plan); @@ -82,14 +90,11 @@ namespace DAnCE ACE_CString node_name_; - PROPERTY_MAP properties_; - - DAnCE::Locality_Handler_i handler_; - - typedef std::map <ACE_CString, ::DAnCE::LocalityManager_var> - LOCALITY_MAP; LOCALITY_MAP localities_; + + DAnCE::Deployment_Scheduler scheduler_; + PLAN_MAP sub_plans_; }; } #endif /*NODEAPPLICATION_IMPL_H_*/ |