diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/LoadBalancing/LB_LoadManager.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/LoadBalancing/LB_LoadManager.cpp | 621 |
1 files changed, 621 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/LB_LoadManager.cpp b/TAO/orbsvcs/orbsvcs/LoadBalancing/LB_LoadManager.cpp new file mode 100644 index 00000000000..b47a3d92b60 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/LB_LoadManager.cpp @@ -0,0 +1,621 @@ +#include "LB_LoadManager.h" +#include "LB_MemberLocator.h" +#include "LB_conf.h" + +#include "orbsvcs/PortableGroup/PG_Property_Utils.h" +#include "orbsvcs/PortableGroup/PG_conf.h" + +#include "tao/ORB_Core.h" + +//#include "ace/Auto_Ptr.h" + + +ACE_RCSID (LoadBalancing, + LB_LoadManager, + "$Id$") + + +TAO_LB_LoadManager::TAO_LB_LoadManager (void) + : reactor_ (0), + poa_ (), + lock_ (), + monitor_map_ (TAO_PG_MAX_LOCATIONS), + object_group_manager_ (), + property_manager_ (object_group_manager_), + generic_factory_ (object_group_manager_, property_manager_), + pull_handler_ (), + timer_id_ (-1), + lm_ref_ () +{ + this->pull_handler_.initialize (&this->monitor_map_, this); +} + +TAO_LB_LoadManager::~TAO_LB_LoadManager (void) +{ +} + +void +TAO_LB_LoadManager::push_loads ( + const PortableGroup::Location & /* the_location */, + const CosLoadBalancing::LoadList & /* loads */ + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + CosLoadBalancing::StrategyNotAdaptive)) +{ + ACE_THROW (CORBA::NO_IMPLEMENT ()); +} + +void +TAO_LB_LoadManager::register_load_monitor ( + CosLoadBalancing::LoadMonitor_ptr load_monitor, + const PortableGroup::Location & the_location + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + CosLoadBalancing::MonitorAlreadyPresent)) +{ + if (CORBA::is_nil (load_monitor)) + ACE_THROW (CORBA::BAD_PARAM ()); + + const CosLoadBalancing::LoadMonitor_var the_monitor = + CosLoadBalancing::LoadMonitor::_duplicate (load_monitor); + + ACE_GUARD (TAO_SYNCH_MUTEX, + guard, + this->lock_); + + int result = this->monitor_map_.bind (the_location, the_monitor); + + if (result == 0 + && this->monitor_map_.current_size () == 1) + { + // Register the "pull monitoring" event handler only after the + // first load monitor is registered. This is an optimization to + // prevent unnecessary invocation of the "pull monitoring" event + // handler. + ACE_Time_Value interval (TAO_LB_PULL_HANDLER_INTERVAL, 0); + ACE_Time_Value restart (TAO_LB_PULL_HANDLER_RESTART, 0); + this->timer_id_ = this->reactor_->schedule_timer (&this->pull_handler_, + 0, + interval, + restart); + + if (this->timer_id_ == -1) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + "TAO_LB_LoadManager::register_load_monitor: " + "Unable to schedule timer.\n")); + + ACE_THROW (CORBA::INTERNAL ()); + } + } + else if (result == 1) + ACE_THROW (CosLoadBalancing::MonitorAlreadyPresent ()); + else + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + "TAO_LB_LoadManager::register_load_monitor: " + "Unable to register load monitor.\n")); + + ACE_THROW (CORBA::INTERNAL ()); + } +} + +CosLoadBalancing::LoadMonitor_ptr +TAO_LB_LoadManager::get_load_monitor ( + const PortableGroup::Location & the_location + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + CosLoadBalancing::LocationNotFound)) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, + guard, + this->lock_, + CosLoadBalancing::LoadMonitor::_nil ()); + + TAO_LB_MonitorMap::ENTRY * entry; + if (this->monitor_map_.find (the_location, entry) == 0) + { + return + CosLoadBalancing::LoadMonitor::_duplicate (entry->int_id_.in ()); + } + + ACE_THROW_RETURN (CosLoadBalancing::LocationNotFound (), + CosLoadBalancing::LoadMonitor::_nil ()); +} + +void +TAO_LB_LoadManager::remove_load_monitor ( + const PortableGroup::Location & the_location + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + CosLoadBalancing::LocationNotFound)) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, + guard, + this->lock_); + + if (this->monitor_map_.unbind (the_location) != 0) + ACE_THROW (CosLoadBalancing::LocationNotFound ()); + + // If no load monitors are registered with the load balancer than + // shutdown the "pull monitoring." + if (this->timer_id_ != -1 + && this->monitor_map_.current_size () == 0) + { + if (this->reactor_->cancel_timer (this->timer_id_) == 0) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + "TAO_LB_LoadManager::remove_load_monitor: " + "Unable to cancel timer.\n")); + + ACE_THROW (CORBA::INTERNAL ()); + } + + this->timer_id_ = -1; + } +} + +void +TAO_LB_LoadManager::set_default_properties ( + const PortableGroup::Properties & props + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::InvalidProperty, + PortableGroup::UnsupportedProperty)) +{ + this->property_manager_.set_default_properties (props + ACE_ENV_ARG_PARAMETER); +} + +PortableGroup::Properties * +TAO_LB_LoadManager::get_default_properties ( + ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + return + this->property_manager_.get_default_properties ( + ACE_ENV_SINGLE_ARG_PARAMETER); +} + +void +TAO_LB_LoadManager::remove_default_properties ( + const PortableGroup::Properties &props + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::InvalidProperty, + PortableGroup::UnsupportedProperty)) +{ + this->property_manager_.remove_default_properties (props + ACE_ENV_ARG_PARAMETER); +} + +void +TAO_LB_LoadManager::set_type_properties ( + const char *type_id, + const PortableGroup::Properties &overrides + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::InvalidProperty, + PortableGroup::UnsupportedProperty)) +{ + this->property_manager_.set_type_properties (type_id, + overrides + ACE_ENV_ARG_PARAMETER); +} + +PortableGroup::Properties * +TAO_LB_LoadManager::get_type_properties ( + const char *type_id + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + return + this->property_manager_.get_type_properties (type_id + ACE_ENV_ARG_PARAMETER); +} + +void +TAO_LB_LoadManager::remove_type_properties ( + const char *type_id, + const PortableGroup::Properties &props + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::InvalidProperty, + PortableGroup::UnsupportedProperty)) +{ + this->property_manager_.remove_type_properties (type_id, + props + ACE_ENV_ARG_PARAMETER); +} + +void +TAO_LB_LoadManager::set_properties_dynamically ( + PortableGroup::ObjectGroup_ptr object_group, + const PortableGroup::Properties &overrides + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::ObjectGroupNotFound, + PortableGroup::InvalidProperty, + PortableGroup::UnsupportedProperty)) +{ + this->property_manager_.set_properties_dynamically (object_group, + overrides + ACE_ENV_ARG_PARAMETER); +} + +PortableGroup::Properties * +TAO_LB_LoadManager::get_properties ( + PortableGroup::ObjectGroup_ptr object_group + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::ObjectGroupNotFound)) +{ + return + this->property_manager_.get_properties (object_group + ACE_ENV_ARG_PARAMETER); +} + +PortableGroup::ObjectGroup_ptr +TAO_LB_LoadManager::create_member ( + PortableGroup::ObjectGroup_ptr object_group, + const PortableGroup::Location &the_location, + const char *type_id, + const PortableGroup::Criteria &the_criteria + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::ObjectGroupNotFound, + PortableGroup::MemberAlreadyPresent, + PortableGroup::NoFactory, + PortableGroup::ObjectNotCreated, + PortableGroup::InvalidCriteria, + PortableGroup::CannotMeetCriteria)) +{ + return + this->object_group_manager_.create_member (object_group, + the_location, + type_id, + the_criteria + ACE_ENV_ARG_PARAMETER); +} + +PortableGroup::ObjectGroup_ptr +TAO_LB_LoadManager::add_member ( + PortableGroup::ObjectGroup_ptr object_group, + const PortableGroup::Location &the_location, + CORBA::Object_ptr member + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::ObjectGroupNotFound, + PortableGroup::MemberAlreadyPresent, + PortableGroup::ObjectNotAdded)) +{ + return + this->object_group_manager_.add_member (object_group, + the_location, + member + ACE_ENV_ARG_PARAMETER); +} + +PortableGroup::ObjectGroup_ptr +TAO_LB_LoadManager::remove_member ( + PortableGroup::ObjectGroup_ptr object_group, + const PortableGroup::Location &the_location + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::ObjectGroupNotFound, + PortableGroup::MemberNotFound)) +{ + return + this->object_group_manager_.remove_member (object_group, + the_location + ACE_ENV_ARG_PARAMETER); +} + +PortableGroup::Locations * +TAO_LB_LoadManager::locations_of_members ( + PortableGroup::ObjectGroup_ptr object_group + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::ObjectGroupNotFound)) +{ + return + this->object_group_manager_.locations_of_members (object_group + ACE_ENV_ARG_PARAMETER); +} + +PortableGroup::ObjectGroupId +TAO_LB_LoadManager::get_object_group_id ( + PortableGroup::ObjectGroup_ptr object_group + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::ObjectGroupNotFound)) +{ + return + this->object_group_manager_.get_object_group_id (object_group + ACE_ENV_ARG_PARAMETER); +} + +PortableGroup::ObjectGroup_ptr +TAO_LB_LoadManager::get_object_group_ref ( + PortableGroup::ObjectGroup_ptr object_group + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::ObjectGroupNotFound)) +{ + return + this->object_group_manager_.get_object_group_ref (object_group + ACE_ENV_ARG_PARAMETER); +} + +CORBA::Object_ptr +TAO_LB_LoadManager::get_member_ref ( + PortableGroup::ObjectGroup_ptr object_group, + const PortableGroup::Location &the_location + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::ObjectGroupNotFound, + PortableGroup::MemberNotFound)) +{ + return + this->object_group_manager_.get_member_ref (object_group, + the_location + ACE_ENV_ARG_PARAMETER); +} + +CORBA::Object_ptr +TAO_LB_LoadManager::create_object ( + const char * type_id, + const PortableGroup::Criteria & the_criteria, + PortableGroup::GenericFactory::FactoryCreationId_out + factory_creation_id + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::NoFactory, + PortableGroup::ObjectNotCreated, + PortableGroup::InvalidCriteria, + PortableGroup::InvalidProperty, + PortableGroup::CannotMeetCriteria)) +{ +// this->init (ACE_ENV_SINGLE_ARG_PARAMETER); +// ACE_CHECK_RETURN (CORBA::Object::_nil ()); + + CORBA::Object_ptr obj = + this->generic_factory_.create_object (type_id, + the_criteria, + factory_creation_id + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (CORBA::Object::_nil ()); + + + return obj; +} + +#if 0 +void +TAO_LB_LoadManager::process_criteria ( + const PortableGroup::Criteria & the_criteria + ACE_ENV_ARG_DECL) +{ + // List of invalid criteria. If this list has a length greater than + // zero, then the PortableGroup::InvalidCriteria exception will + // be thrown. + PortableGroup::Criteria invalid_criteria; + + int found_factory = 0; // If factory was found in the_criteria, then + // set to 1. + + // Parse the criteria. + CORBA::ULong criteria_count = the_criteria.length (); + for (CORBA::ULong i = 0; i < criteria_size; ++i) + { + CORBA::UShort initial_number_replicas = 0; + PortableGroup::FactoryInfos factory_infos; + + // Obtain the InitialNumberMembers from the_criteria. + if (this->get_initial_number_replicas (type_id, + the_criteria[i], + initial_number_replicas) != 0) + { + CORBA::ULong len = invalid_criteria.length (); + invalid_criteria.length (len + 1); + invalid_criteria[len] = the_criteria[i]; + } + + // Obtain the FactoryInfos from the_criteria. This method also + // ensures that GenericFactories at different locations are used. + else if (this->get_factory_infos (type_id, + the_criteria[i], + factory_infos) == 0) + found_factory = 1; + + // Unknown property + else + ACE_THROW (PortableGroup::InvalidProperty (the_criteria[i].nam, + the_criteria[i].val)); + } + + if (invalid_criteria.length () != 0) + ACE_THROW (PortableGroup::InvalidCriteria (invalid_criteria)); + + if (found_factory == 0) + ACE_THROW (PortableGroup::NoFactory ()); +} +#endif /* 0 */ + +void +TAO_LB_LoadManager::delete_object ( + const PortableGroup::GenericFactory::FactoryCreationId & + factory_creation_id + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableGroup::ObjectNotFound)) +{ + this->generic_factory_.delete_object (factory_creation_id + ACE_ENV_ARG_PARAMETER); +} + +CORBA::Object_ptr +TAO_LB_LoadManager::member (const PortableServer::ObjectId & oid + ACE_ENV_ARG_DECL) +{ + // Pass the cached loads to the balancing strategy so that the + // strategy may choose which member to forward requests to next. + + PortableGroup::ObjectGroup_var object_group = + this->object_group_manager_.object_group (oid); + + if (CORBA::is_nil (object_group.in ())) + ACE_THROW_RETURN (CORBA::OBJECT_NOT_EXIST (), + CORBA::Object::_nil ()); + + PortableGroup::Properties_var properties = + this->get_properties (object_group.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (CORBA::Object::_nil ()); + + // @todo Cache this property name. No need to create it each time + // this method is called. + PortableGroup::Name balancing_strategy_name (1); + balancing_strategy_name.length (1); + balancing_strategy_name[0].id = + CORBA::string_dup ("org.omg.CosLoadBalancing.Strategy"); + + PortableGroup::Value value; + CosLoadBalancing::Strategy_var balancing_strategy; + if (TAO_PG::get_property_value (balancing_strategy_name, + properties.in (), + value) + && (value >>= balancing_strategy.inout ()) + && !CORBA::is_nil (balancing_strategy.in ())) + { + // @todo Cache this ObjectGroupManager reference. + PortableGroup::ObjectGroupManager_var group_manager = + this->object_group_manager_._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (CORBA::Object::_nil ()); + + return balancing_strategy->next_member (object_group.in (), + this->lm_ref_.in () + ACE_ENV_ARG_PARAMETER); + } + else + { + ACE_THROW_RETURN (CORBA::OBJECT_NOT_EXIST (), + CORBA::Object::_nil ()); + } +} + +void +TAO_LB_LoadManager::init ( + CORBA::ORB_ptr orb, + PortableServer::POA_ptr root_poa + ACE_ENV_ARG_DECL) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, + guard, + this->lock_); + + if (CORBA::is_nil (this->poa_.in ())) + { + // Create a new transient servant manager object in the child + // POA. + PortableServer::ServantManager_ptr tmp; + ACE_NEW_THROW_EX (tmp, + TAO_LB_MemberLocator (this), + CORBA::NO_MEMORY ( + CORBA::SystemException::_tao_minor_code ( + TAO_DEFAULT_MINOR_CODE, + ENOMEM), + CORBA::COMPLETED_NO)); + ACE_CHECK; + + PortableServer::ServantManager_var member_locator = tmp; + + // Create the appropriate RequestProcessingPolicy + // (USE_SERVANT_MANAGER) and ServantRetentionPolicy (NON_RETAIN) + // for a ServantLocator. + PortableServer::RequestProcessingPolicy_var request = + root_poa->create_request_processing_policy ( + PortableServer::USE_SERVANT_MANAGER + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + PortableServer::ServantRetentionPolicy_var retention = + root_poa->create_servant_retention_policy ( + PortableServer::NON_RETAIN + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Create the PolicyList containing the policies necessary for + // the POA to support ServantLocators. + CORBA::PolicyList policy_list; + policy_list.length (2); + policy_list[0] = + PortableServer::RequestProcessingPolicy::_duplicate ( + request.in ()); + policy_list[1] = + PortableServer::ServantRetentionPolicy::_duplicate ( + retention.in ()); + + // Create the child POA with the above ServantManager policies. + // The ServantManager will be the MemberLocator. + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + + // The child POA's name will consist of a string that includes + // the current time in milliseconds in hexidecimal format (only + // four bytes will be used). This is an attempt to prevent + // different load manager servants within the same ORB from + // using the same POA. + const ACE_Time_Value tv = ACE_OS::gettimeofday (); + const CORBA::Long time = + ACE_static_cast (CORBA::Long, + tv.msec ()); // Time in milliseconds. + + char poa_name[] = "TAO_LB_LoadManager_POA - 0xZZZZZZZZ"; + char * astr = + poa_name + + sizeof (poa_name) + - 9 /* 8 + 1 */; + + // Overwrite the last 8 characters in the POA name. + ACE_OS::sprintf (astr, "%d", time); + + this->poa_ = root_poa->create_POA (poa_name, + poa_manager.in (), + policy_list + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + request->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + retention->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Now set the MemberLocator as the child POA's Servant + // Manager. + this->poa_->set_servant_manager (member_locator.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->object_group_manager_.poa (this->poa_.in ()); + this->generic_factory_.poa (this->poa_.in ()); + + // Activate the child POA. + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->reactor_ = orb->orb_core ()->reactor (); + } + + if (CORBA::is_nil (this->lm_ref_.in ())) + { + this->lm_ref_ = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + } +} |