diff options
author | Ossama Othman <ossama-othman@users.noreply.github.com> | 2000-06-16 02:41:19 +0000 |
---|---|---|
committer | Ossama Othman <ossama-othman@users.noreply.github.com> | 2000-06-16 02:41:19 +0000 |
commit | 2a060da95faddedde53be3d4c7b87037b1452901 (patch) | |
tree | 91b48b335e7b3c92d985ab374961e4f114c597fd | |
parent | f6bf610aad4a7f9ee3f84c8aefea3895a769b561 (diff) | |
download | ATCD-2a060da95faddedde53be3d4c7b87037b1452901.tar.gz |
ChangeLogTag:Thu Jun 15 15:27:34 2000 Ossama Othman <ossama@uci.edu>
27 files changed, 827 insertions, 533 deletions
diff --git a/TAO/orbsvcs/orbsvcs/DSI_ForwardingProxy.cpp b/TAO/orbsvcs/orbsvcs/DSI_ForwardingProxy.cpp deleted file mode 100644 index 851de3a88d0..00000000000 --- a/TAO/orbsvcs/orbsvcs/DSI_ForwardingProxy.cpp +++ /dev/null @@ -1,51 +0,0 @@ -// -*- C++ -*- - -// $Id$ - -#include "DSI_ForwardingProxy.h" -#include "LoadBalancer_i.h" - -ACE_RCSID(orbsvcs, DSI_ForwardingProxy, "$Id$") - -TAO_LB_DSI_ForwardingProxy::TAO_LB_DSI_ForwardingProxy ( - TAO_LB_LoadBalancer *lb, - const char *id) - : load_balancer_ (lb), // Hopefully these pointers won't be zero! - interface_id_ (id) -{ - // @@ Ossama: why is this comment useful? - // @@ Carlos: You are correct. It servers no purpose other than to - // take up space. It is simply there to make it easier for me to - // spot the function body. It is just my coding style. - - // Nothing else -} - -void -TAO_LB_DSI_ForwardingProxy::invoke (CORBA::ServerRequest_ptr /* request */, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException, - PortableServer::ForwardRequest)) -{ - CORBA::Object_ptr replica = - this->load_balancer_->replica (ACE_TRY_ENV); - ACE_CHECK; - - // Throw a forward exception to force the client to redirect its - // requests to the Replica chosen by the LoadBalancer. - - // @@ We should NOT be throwing this exception inside user code. - // Instead we should be using servant locators. - ACE_THROW (PortableServer::ForwardRequest ( - CORBA::Object::_duplicate (replica))); -} - -CORBA::RepositoryId -TAO_LB_DSI_ForwardingProxy::_primary_interface ( - const PortableServer::ObjectId &, - PortableServer::POA_ptr, - CORBA::Environment &) - ACE_THROW_SPEC (()) -{ - return CORBA::string_dup (this->interface_id_.in ()); -} diff --git a/TAO/orbsvcs/orbsvcs/DSI_ForwardingProxy.h b/TAO/orbsvcs/orbsvcs/DSI_ForwardingProxy.h deleted file mode 100644 index ec3fcdeb7d8..00000000000 --- a/TAO/orbsvcs/orbsvcs/DSI_ForwardingProxy.h +++ /dev/null @@ -1,78 +0,0 @@ -// -*- C++ -*- - -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// TAO_LoadBalancing -// -// = FILENAME -// DSI_ForwardingProxy.h -// -// = AUTHOR -// Ossama Othman <ossama@uci.edu> -// -// ============================================================================ - -#ifndef DSI_FORWARDING_PROXY_H -#define DSI_FORWARDING_PROXY_H - -#include "ace/pre.h" - -#include "orbsvcs/LoadBalancingS.h" -#include "LoadBalancing_export.h" - -# if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -# endif /* ACE_LACKS_PRAGMA_ONCE */ - -// Forward declaration. -class TAO_LB_LoadBalancer; - -// @@ Ossama: we have to change the implementation to use a -// ServantLocator, that can actually raise the ForwardingRequest -// exception. - -class TAO_LoadBalancing_Export TAO_LB_DSI_ForwardingProxy : public PortableServer::DynamicImplementation -{ - // = TITLE - // Class that provides request forwarding. - - // = DESCRIPTION - // Using the Dynamic Skeleton Interface, this class intercepts - // requests from a client, and throws a location forwarding - // exception that contains the location of the server to which the - // client should redirect its requests. - -public: - TAO_LB_DSI_ForwardingProxy (TAO_LB_LoadBalancer *load_balancer, - const char *interface_id); - // Constructor that sets the interface repository ID this DSI object - // is associated with. - - // = The DynamicImplementation methods - - virtual void invoke (CORBA::ServerRequest_ptr request, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException, - PortableServer::ForwardRequest)); - - virtual CORBA::RepositoryId _primary_interface ( - const PortableServer::ObjectId &oid, - PortableServer::POA_ptr poa, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC (()); - -private: - TAO_LB_LoadBalancer *load_balancer_; - // The load balancer implementation. - - CORBA::String_var interface_id_; - // The interface repository ID of the target object. - -}; - -#include "ace/post.h" - -#endif /* DSI_FORWARDING_PROXY_H */ diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancer_i.cpp b/TAO/orbsvcs/orbsvcs/LoadBalancer_i.cpp index 7662af085ff..58a597fe088 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancer_i.cpp +++ b/TAO/orbsvcs/orbsvcs/LoadBalancer_i.cpp @@ -12,14 +12,14 @@ ACE_RCSID(orbsvcs, LoadBalancer, "$Id$") #endif /* __ACE_INLINE__ */ TAO_LB_LoadBalancer::TAO_LB_LoadBalancer ( - const char *interface_id, - TAO_LB_LoadBalancing_Strategy *strategy, - PortableServer::POA_ptr poa) - : redirector_ (this, interface_id), + const char * interface_id, + TAO_LB_LoadBalancing_Strategy *strategy, + PortableServer::POA_ptr root_poa) + : locator_ (this), strategy_ (strategy), - poa_ (PortableServer::POA::_duplicate (poa)) + poa_ () { - (void) this->init (); + (void) this->init (interface_id, root_poa); } TAO_LB_LoadBalancer::~TAO_LB_LoadBalancer (void) @@ -74,23 +74,86 @@ TAO_LB_LoadBalancer::load_changed (TAO_LB_ReplicaProxy *proxy, } int -TAO_LB_LoadBalancer::init (void) +TAO_LB_LoadBalancer::init (const char * repository_id, + PortableServer::POA_ptr root_poa) { ACE_TRY_NEW_ENV { - PortableServer::ObjectId_var oid = - this->poa_->activate_object (&this->redirector_, - ACE_TRY_ENV); + // Create a new transient servant manager object in the Root + // POA. + PortableServer::ServantManager_var servant_manager = + this->locator_._this (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Create the appropriate RequestProcessingPolicy + // (USE_SERVANT_MANAGER) and ServantRetentionPolicy (NON_RETAIN) + // for a ServantLocator. + PortableServer::RequestProcessingPolicy_var request = + root_poa->create_request_processing_policy ( + PortableServer::USE_SERVANT_MANAGER, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + PortableServer::ServantRetentionPolicy_var retention = + root_poa->create_servant_retention_policy ( + PortableServer::NON_RETAIN, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Create the PolicyList. + CORBA::PolicyList policy_list; + policy_list.length (2); + policy_list[0] = + PortableServer::RequestProcessingPolicy::_duplicate ( + request.in ()); + policy_list[1] = + PortableServer::ServantRetentionPolicy::_duplicate ( + retention. in ()); + + // Create the child POA with the ServantManager (ReplicaLocator) + // above policies. + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_TRY_ENV); + ACE_TRY_CHECK; + this->poa_ = root_poa->create_POA ("TAO_LB_ReplicaLocator_POA", + poa_manager.in (), + policy_list, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Activate the child POA. + poa_manager->activate (ACE_TRY_ENV); ACE_TRY_CHECK; + request->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + + retention->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Now set the ReplicaLocator as the child POA's Servant + // Manager. + this->poa_->set_servant_manager (servant_manager.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // @@ What ObjectId should be used? + PortableServer::ObjectId_var oid = + PortableServer::string_to_ObjectId ("TAO_LB_ObjectGroup"); + this->group_identity_ = - this->poa_->id_to_reference (oid.in (), - ACE_TRY_ENV); + this->poa_->create_reference_with_id (oid.in (), + repository_id, + ACE_TRY_ENV); ACE_TRY_CHECK; } ACE_CATCHANY { // @@ Should we do anything here? + + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "(%P|%t) Load Balancer initialization:"); + return -1; } ACE_ENDTRY; diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancer_i.h b/TAO/orbsvcs/orbsvcs/LoadBalancer_i.h index 3a174658c15..1b711b531e6 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancer_i.h +++ b/TAO/orbsvcs/orbsvcs/LoadBalancer_i.h @@ -22,7 +22,7 @@ #include "orbsvcs/LoadBalancingS.h" #include "ReplicaProxy.h" -#include "DSI_ForwardingProxy.h" +#include "ReplicaLocator.h" #include "LoadBalancing_Strategy.h" #include "LoadBalancing_export.h" @@ -43,7 +43,7 @@ class TAO_LoadBalancing_Export TAO_LB_LoadBalancer : public virtual POA_LoadBala public: TAO_LB_LoadBalancer (const char *interface_id, TAO_LB_LoadBalancing_Strategy *strategy, - PortableServer::POA_ptr poa); + PortableServer::POA_ptr root_poa); // Constructor that initializes this Load Balancer for use with a // Replica that has the specified interface repository ID. @@ -86,11 +86,12 @@ public: ACE_THROW_SPEC ((CORBA::SystemException)); private: - int init (void); - // Initialize the <redirector_> DSI forwarding proxy. + int init (const char * repository_id, + PortableServer::POA_ptr root_poa); + // Initialize the <locator_> ReplicaLocator. private: - TAO_LB_DSI_ForwardingProxy redirector_; + TAO_LB_ReplicaLocator locator_; // The object that tells the invoking client to forward its requests // from the LoadBalancer to an actual replica. @@ -98,7 +99,7 @@ private: // The underlying load balancing strategy. PortableServer::POA_var poa_; - // The POA where the forwarding proxy is activated. + // The POA that dispatches requests to the ReplicaLocator. CORBA::Object_var group_identity_; // The group identity diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancer_i.i b/TAO/orbsvcs/orbsvcs/LoadBalancer_i.i index d7938c563d8..884b8ab95ab 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancer_i.i +++ b/TAO/orbsvcs/orbsvcs/LoadBalancer_i.i @@ -12,6 +12,8 @@ TAO_LB_LoadBalancer::disconnect (TAO_LB_ReplicaProxy *proxy, (void) this->strategy_->remove (proxy); + // @@ Deactivate the proxy servant. + #if 0 if (this->strategy_->remove (proxy) != 0) ACE_THROW (LoadBalancing::LoadBalancer::InvalidReplicaProxy ()); diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/DSI_ForwardingProxy.cpp b/TAO/orbsvcs/orbsvcs/LoadBalancing/DSI_ForwardingProxy.cpp deleted file mode 100644 index 851de3a88d0..00000000000 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/DSI_ForwardingProxy.cpp +++ /dev/null @@ -1,51 +0,0 @@ -// -*- C++ -*- - -// $Id$ - -#include "DSI_ForwardingProxy.h" -#include "LoadBalancer_i.h" - -ACE_RCSID(orbsvcs, DSI_ForwardingProxy, "$Id$") - -TAO_LB_DSI_ForwardingProxy::TAO_LB_DSI_ForwardingProxy ( - TAO_LB_LoadBalancer *lb, - const char *id) - : load_balancer_ (lb), // Hopefully these pointers won't be zero! - interface_id_ (id) -{ - // @@ Ossama: why is this comment useful? - // @@ Carlos: You are correct. It servers no purpose other than to - // take up space. It is simply there to make it easier for me to - // spot the function body. It is just my coding style. - - // Nothing else -} - -void -TAO_LB_DSI_ForwardingProxy::invoke (CORBA::ServerRequest_ptr /* request */, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException, - PortableServer::ForwardRequest)) -{ - CORBA::Object_ptr replica = - this->load_balancer_->replica (ACE_TRY_ENV); - ACE_CHECK; - - // Throw a forward exception to force the client to redirect its - // requests to the Replica chosen by the LoadBalancer. - - // @@ We should NOT be throwing this exception inside user code. - // Instead we should be using servant locators. - ACE_THROW (PortableServer::ForwardRequest ( - CORBA::Object::_duplicate (replica))); -} - -CORBA::RepositoryId -TAO_LB_DSI_ForwardingProxy::_primary_interface ( - const PortableServer::ObjectId &, - PortableServer::POA_ptr, - CORBA::Environment &) - ACE_THROW_SPEC (()) -{ - return CORBA::string_dup (this->interface_id_.in ()); -} diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/DSI_ForwardingProxy.h b/TAO/orbsvcs/orbsvcs/LoadBalancing/DSI_ForwardingProxy.h deleted file mode 100644 index ec3fcdeb7d8..00000000000 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/DSI_ForwardingProxy.h +++ /dev/null @@ -1,78 +0,0 @@ -// -*- C++ -*- - -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// TAO_LoadBalancing -// -// = FILENAME -// DSI_ForwardingProxy.h -// -// = AUTHOR -// Ossama Othman <ossama@uci.edu> -// -// ============================================================================ - -#ifndef DSI_FORWARDING_PROXY_H -#define DSI_FORWARDING_PROXY_H - -#include "ace/pre.h" - -#include "orbsvcs/LoadBalancingS.h" -#include "LoadBalancing_export.h" - -# if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -# endif /* ACE_LACKS_PRAGMA_ONCE */ - -// Forward declaration. -class TAO_LB_LoadBalancer; - -// @@ Ossama: we have to change the implementation to use a -// ServantLocator, that can actually raise the ForwardingRequest -// exception. - -class TAO_LoadBalancing_Export TAO_LB_DSI_ForwardingProxy : public PortableServer::DynamicImplementation -{ - // = TITLE - // Class that provides request forwarding. - - // = DESCRIPTION - // Using the Dynamic Skeleton Interface, this class intercepts - // requests from a client, and throws a location forwarding - // exception that contains the location of the server to which the - // client should redirect its requests. - -public: - TAO_LB_DSI_ForwardingProxy (TAO_LB_LoadBalancer *load_balancer, - const char *interface_id); - // Constructor that sets the interface repository ID this DSI object - // is associated with. - - // = The DynamicImplementation methods - - virtual void invoke (CORBA::ServerRequest_ptr request, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException, - PortableServer::ForwardRequest)); - - virtual CORBA::RepositoryId _primary_interface ( - const PortableServer::ObjectId &oid, - PortableServer::POA_ptr poa, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC (()); - -private: - TAO_LB_LoadBalancer *load_balancer_; - // The load balancer implementation. - - CORBA::String_var interface_id_; - // The interface repository ID of the target object. - -}; - -#include "ace/post.h" - -#endif /* DSI_FORWARDING_PROXY_H */ diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.cpp b/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.cpp index 7662af085ff..58a597fe088 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.cpp +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.cpp @@ -12,14 +12,14 @@ ACE_RCSID(orbsvcs, LoadBalancer, "$Id$") #endif /* __ACE_INLINE__ */ TAO_LB_LoadBalancer::TAO_LB_LoadBalancer ( - const char *interface_id, - TAO_LB_LoadBalancing_Strategy *strategy, - PortableServer::POA_ptr poa) - : redirector_ (this, interface_id), + const char * interface_id, + TAO_LB_LoadBalancing_Strategy *strategy, + PortableServer::POA_ptr root_poa) + : locator_ (this), strategy_ (strategy), - poa_ (PortableServer::POA::_duplicate (poa)) + poa_ () { - (void) this->init (); + (void) this->init (interface_id, root_poa); } TAO_LB_LoadBalancer::~TAO_LB_LoadBalancer (void) @@ -74,23 +74,86 @@ TAO_LB_LoadBalancer::load_changed (TAO_LB_ReplicaProxy *proxy, } int -TAO_LB_LoadBalancer::init (void) +TAO_LB_LoadBalancer::init (const char * repository_id, + PortableServer::POA_ptr root_poa) { ACE_TRY_NEW_ENV { - PortableServer::ObjectId_var oid = - this->poa_->activate_object (&this->redirector_, - ACE_TRY_ENV); + // Create a new transient servant manager object in the Root + // POA. + PortableServer::ServantManager_var servant_manager = + this->locator_._this (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Create the appropriate RequestProcessingPolicy + // (USE_SERVANT_MANAGER) and ServantRetentionPolicy (NON_RETAIN) + // for a ServantLocator. + PortableServer::RequestProcessingPolicy_var request = + root_poa->create_request_processing_policy ( + PortableServer::USE_SERVANT_MANAGER, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + PortableServer::ServantRetentionPolicy_var retention = + root_poa->create_servant_retention_policy ( + PortableServer::NON_RETAIN, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Create the PolicyList. + CORBA::PolicyList policy_list; + policy_list.length (2); + policy_list[0] = + PortableServer::RequestProcessingPolicy::_duplicate ( + request.in ()); + policy_list[1] = + PortableServer::ServantRetentionPolicy::_duplicate ( + retention. in ()); + + // Create the child POA with the ServantManager (ReplicaLocator) + // above policies. + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_TRY_ENV); + ACE_TRY_CHECK; + this->poa_ = root_poa->create_POA ("TAO_LB_ReplicaLocator_POA", + poa_manager.in (), + policy_list, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Activate the child POA. + poa_manager->activate (ACE_TRY_ENV); ACE_TRY_CHECK; + request->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + + retention->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Now set the ReplicaLocator as the child POA's Servant + // Manager. + this->poa_->set_servant_manager (servant_manager.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // @@ What ObjectId should be used? + PortableServer::ObjectId_var oid = + PortableServer::string_to_ObjectId ("TAO_LB_ObjectGroup"); + this->group_identity_ = - this->poa_->id_to_reference (oid.in (), - ACE_TRY_ENV); + this->poa_->create_reference_with_id (oid.in (), + repository_id, + ACE_TRY_ENV); ACE_TRY_CHECK; } ACE_CATCHANY { // @@ Should we do anything here? + + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "(%P|%t) Load Balancer initialization:"); + return -1; } ACE_ENDTRY; diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.h b/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.h index 3a174658c15..1b711b531e6 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.h +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.h @@ -22,7 +22,7 @@ #include "orbsvcs/LoadBalancingS.h" #include "ReplicaProxy.h" -#include "DSI_ForwardingProxy.h" +#include "ReplicaLocator.h" #include "LoadBalancing_Strategy.h" #include "LoadBalancing_export.h" @@ -43,7 +43,7 @@ class TAO_LoadBalancing_Export TAO_LB_LoadBalancer : public virtual POA_LoadBala public: TAO_LB_LoadBalancer (const char *interface_id, TAO_LB_LoadBalancing_Strategy *strategy, - PortableServer::POA_ptr poa); + PortableServer::POA_ptr root_poa); // Constructor that initializes this Load Balancer for use with a // Replica that has the specified interface repository ID. @@ -86,11 +86,12 @@ public: ACE_THROW_SPEC ((CORBA::SystemException)); private: - int init (void); - // Initialize the <redirector_> DSI forwarding proxy. + int init (const char * repository_id, + PortableServer::POA_ptr root_poa); + // Initialize the <locator_> ReplicaLocator. private: - TAO_LB_DSI_ForwardingProxy redirector_; + TAO_LB_ReplicaLocator locator_; // The object that tells the invoking client to forward its requests // from the LoadBalancer to an actual replica. @@ -98,7 +99,7 @@ private: // The underlying load balancing strategy. PortableServer::POA_var poa_; - // The POA where the forwarding proxy is activated. + // The POA that dispatches requests to the ReplicaLocator. CORBA::Object_var group_identity_; // The group identity diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.i b/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.i index d7938c563d8..884b8ab95ab 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.i +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/LoadBalancer_i.i @@ -12,6 +12,8 @@ TAO_LB_LoadBalancer::disconnect (TAO_LB_ReplicaProxy *proxy, (void) this->strategy_->remove (proxy); + // @@ Deactivate the proxy servant. + #if 0 if (this->strategy_->remove (proxy) != 0) ACE_THROW (LoadBalancing::LoadBalancer::InvalidReplicaProxy ()); diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/Minimum_Dispersion.cpp b/TAO/orbsvcs/orbsvcs/LoadBalancing/Minimum_Dispersion.cpp index 13a24bbe92b..37acf0d0490 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/Minimum_Dispersion.cpp +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/Minimum_Dispersion.cpp @@ -10,16 +10,22 @@ ACE_RCSID(orbsvcs, Minimum_Dispersion, "$Id$") TAO_LB_Minimum_Dispersion_Strategy::TAO_LB_Minimum_Dispersion_Strategy (void) - : proxies_ () + : proxies_ (), + lock_ () { } TAO_LB_Minimum_Dispersion_Strategy::~TAO_LB_Minimum_Dispersion_Strategy (void) { - // @@ Ossama: more code that is not thread safe + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, + guard, + this->lock_)); + TAO_LB_ReplicaProxySetIterator begin = this->proxies_.begin (); TAO_LB_ReplicaProxySetIterator end = this->proxies_.end (); + // @@ Bad deactivation! + for (TAO_LB_ReplicaProxySetIterator i = begin; i != end; ++i) @@ -33,9 +39,18 @@ CORBA::Object_ptr TAO_LB_Minimum_Dispersion_Strategy::replica (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - // @@ Ossama: more code that is not thread safe - while (!this->proxies_.is_empty ()) + for ( ; ; ) { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + CORBA::Object::_nil ()); + + if (this->proxies_.is_empty ()) + // @@ What do we do if the set is empty? + ACE_THROW_RETURN (CORBA::OBJECT_NOT_EXIST (), + CORBA::Object::_nil ()); + TAO_LB_ReplicaProxySetIterator begin = this->proxies_.begin (); TAO_LB_ReplicaProxySetIterator end = this->proxies_.end (); @@ -52,25 +67,36 @@ TAO_LB_Minimum_Dispersion_Strategy::replica (CORBA::Environment &ACE_TRY_ENV) } } - // @@ Ossama: we should setup a timeout policy here... - ACE_TRY - { - // Before returning an object reference to the client - // validate it first. - CORBA::Object_var object = - proxy->replica (); - CORBA::Boolean non_existent = - object->_non_existent (ACE_TRY_ENV); - ACE_TRY_CHECK; - if (!non_existent) - { - return object._retn (); - } - } - ACE_CATCHANY - { - } - ACE_ENDTRY; + // Before returning an object reference to the client + // validate it first. + CORBA::Object_var object = + proxy->replica (); + + { + ACE_Reverse_Lock<ACE_SYNCH_MUTEX> reverse_lock (this->lock_); + + ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_SYNCH_MUTEX>, + reverse_guard, + reverse_lock, + CORBA::Object::_nil ()); + + // @@ Ossama: we should setup a timeout policy here... + ACE_TRY + { + CORBA::Boolean non_existent = + object->_non_existent (ACE_TRY_ENV); + ACE_TRY_CHECK; + if (!non_existent) + { + return object._retn (); + } + } + ACE_CATCHANY + { + } + ACE_ENDTRY; + } + // @@ Ossama: a bit melodramatic, we remove the object if *any* // exception is thrown. If the object really does not exist (we // get non_existent==1) then this is exactly what we want to do, @@ -78,22 +104,27 @@ TAO_LB_Minimum_Dispersion_Strategy::replica (CORBA::Environment &ACE_TRY_ENV) // something less drastic, or at least strategize it ;-) this->proxies_.remove (proxy); } - // @@ What do we do if the set is empty? - ACE_THROW_RETURN (CORBA::OBJECT_NOT_EXIST (), - CORBA::Object::_nil ()); } int TAO_LB_Minimum_Dispersion_Strategy::insert (TAO_LB_ReplicaProxy *proxy) { - // @@ Ossama: more code that is not thread safe + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + -1)); + return this->proxies_.insert (proxy); } int TAO_LB_Minimum_Dispersion_Strategy::remove (TAO_LB_ReplicaProxy *proxy) { - // @@ Ossama: more code that is not thread safe + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + -1)); + return this->proxies_.remove (proxy); } @@ -101,42 +132,63 @@ void TAO_LB_Minimum_Dispersion_Strategy::load_changed (TAO_LB_ReplicaProxy *proxy, CORBA::Environment &ACE_TRY_ENV) { - // @@ Ossama: more code that is not thread safe - if (this->proxies_.is_empty ()) - return; + int send_load_advisory = 0; - TAO_LB_ReplicaProxySetIterator begin = this->proxies_.begin (); - TAO_LB_ReplicaProxySetIterator end = this->proxies_.end (); + { + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, + guard, + this->lock_)); - float s = 0; - CORBA::ULong n = 0; - TAO_LB_ReplicaProxySetIterator i = begin; - for (;i != end; ++i) - { - s += (*i)->current_load (); - n++; - } + if (this->proxies_.is_empty ()) + return; + + TAO_LB_ReplicaProxySetIterator begin = this->proxies_.begin (); + TAO_LB_ReplicaProxySetIterator end = this->proxies_.end (); - float avg = (n == 0 ? s : s / n); - float cl = proxy->current_load (); + float s = 0; + CORBA::ULong n = 0; + TAO_LB_ReplicaProxySetIterator i = begin; + for (;i != end; ++i) + { + s += (*i)->current_load (); + n++; + } - if (avg == 0) - return; + float avg = (n == 0 ? s : s / n); + float cl = proxy->current_load (); - float relative_load = cl / avg; + if (avg == 0) + return; + + float relative_load = cl / avg; + + // @@ Ossama: Make the 1.5 factor adjustable, it is how much + // dispersion we tolerate before starting to send advisories. + if (relative_load > 1 + 1.5F / n) + { + proxy->has_high_load_ = 1; + send_load_advisory = 2; // 2 == Send high load advisory + } + + if (send_load_advisory == 1 && relative_load < 1 + 0.9F / n) + { + proxy->has_high_load_ = 0; + send_load_advisory = 1; // 1 == Send nominal load advisory + } + } // @@ Ossama: no debug messages in production code, my fault.... - ACE_DEBUG ((LM_DEBUG, "Load[%x] %f %f %f\n", - proxy, cl, avg, relative_load)); + // ACE_DEBUG ((LM_DEBUG, "Load[%x] %f %f %f\n", + // proxy, cl, avg, relative_load)); // @@ Ossama: Make the 1.5 factor adjustable, it is how much // dispersion we tolerate before starting to send advisories. - if (relative_load > 1 + 1.5F / n) + if (send_load_advisory == 2) { - proxy->has_high_load_ = 1; proxy->control_->high_load_advisory (ACE_TRY_ENV); ACE_CHECK; - return; + + return; // We may not throw an exception, so explicitly return. } // @@ Ossama: notice that we wait until the load is signifcantly @@ -145,9 +197,8 @@ TAO_LB_Minimum_Dispersion_Strategy::load_changed (TAO_LB_ReplicaProxy *proxy, // rejecting one client.... // @@ Ossama: make the 0.9 factor adjustable, at least at // construction time... - if (proxy->has_high_load_ && relative_load < 1 + 0.9F / n) + if (send_load_advisory == 1) { - proxy->has_high_load_ = 0; proxy->control_->nominal_load_advisory (ACE_TRY_ENV); ACE_CHECK; } diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/Minimum_Dispersion.h b/TAO/orbsvcs/orbsvcs/LoadBalancing/Minimum_Dispersion.h index 278c96ef1fd..bd78e0acc1f 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/Minimum_Dispersion.h +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/Minimum_Dispersion.h @@ -23,6 +23,7 @@ #include "LoadBalancing_Strategy.h" #include "orbsvcs/LoadBalancingS.h" #include "ace/Containers.h" +#include "ace/Synch.h" # if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -65,6 +66,10 @@ public: private: TAO_LB_ReplicaProxySet proxies_; // Set containing the ReplicaProxy servants. + + ACE_SYNCH_MUTEX lock_; + // Lock used to ensure atomic access to state retained by this + // class. }; #include "ace/post.h" diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaLocator.cpp b/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaLocator.cpp new file mode 100644 index 00000000000..a3d067290cc --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaLocator.cpp @@ -0,0 +1,50 @@ +// -*- C++ -*- + +// $Id$ + +#include "ReplicaLocator.h" +#include "LoadBalancer_i.h" + +ACE_RCSID(orbsvcs, ReplicaLocator, "$Id$") + +TAO_LB_ReplicaLocator::TAO_LB_ReplicaLocator (TAO_LB_LoadBalancer *lb) + : load_balancer_ (lb) // This pointer shouldn't be zero! +{ + // Nothing else +} + +PortableServer::Servant +TAO_LB_ReplicaLocator::preinvoke ( + const PortableServer::ObjectId & /* oid */, + PortableServer::POA_ptr /* adapter */, + const char * /* operation */, + PortableServer::ServantLocator::Cookie & /* the_cookie */, + CORBA_Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableServer::ForwardRequest)) +{ + if (this->load_balancer_ == 0) + ACE_THROW_RETURN (CORBA::INTERNAL (), 0); + + CORBA::Object_ptr replica = + this->load_balancer_->replica (ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + + // Throw a forward exception to force the client to redirect its + // requests to the Replica chosen by the LoadBalancer. + ACE_THROW_RETURN (PortableServer::ForwardRequest ( + CORBA::Object::_duplicate (replica)), + 0); +} + +void +TAO_LB_ReplicaLocator::postinvoke ( + const PortableServer::ObjectId & /* oid */, + PortableServer::POA_ptr /* adapter */, + const char * /* operation */, + PortableServer::ServantLocator::Cookie /* the_cookie */, + PortableServer::Servant /* the_servant */, + CORBA_Environment & /* ACE_TRY_ENV */) +{ + // No-op +} diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaLocator.h b/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaLocator.h new file mode 100644 index 00000000000..4d79e393b98 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaLocator.h @@ -0,0 +1,72 @@ +// -*- C++ -*- + +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// TAO_LoadBalancing +// +// = FILENAME +// ReplicaLocator.h +// +// = AUTHOR +// Ossama Othman <ossama@uci.edu> +// +// ============================================================================ + +#ifndef TAO_REPLICA_LOCATOR_H +#define TAO_REPLICA_LOCATOR_H + +#include "ace/pre.h" + +#include "orbsvcs/LoadBalancingS.h" +#include "LoadBalancing_export.h" + +# if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +# endif /* ACE_LACKS_PRAGMA_ONCE */ + +// Forward declaration. +class TAO_LB_LoadBalancer; + +class TAO_LoadBalancing_Export TAO_LB_ReplicaLocator + : public virtual POA_PortableServer::ServantLocator +{ + // = TITLE + // Class that provides request forwarding. + + // = DESCRIPTION + // This is a Servant Locator implementation that forwards + // requests to a replica returned by the Load Balancer. + +public: + TAO_LB_ReplicaLocator (TAO_LB_LoadBalancer *load_balancer); + // Constructor + + // = The ServantLocator methods + virtual PortableServer::Servant preinvoke ( + const PortableServer::ObjectId &oid, + PortableServer::POA_ptr adapter, + const char *operation, + PortableServer::ServantLocator::Cookie & the_cookie, + CORBA_Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableServer::ForwardRequest)); + + virtual void postinvoke ( + const PortableServer::ObjectId &oid, + PortableServer::POA_ptr adapter, + const char *operation, + PortableServer::ServantLocator::Cookie the_cookie, + PortableServer::Servant the_servant, + CORBA_Environment &ACE_TRY_ENV); + +private: + TAO_LB_LoadBalancer *load_balancer_; + // The load balancer implementation. +}; + +#include "ace/post.h" + +#endif /* TAO_REPLICA_LOCATOR_H */ diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaProxy.cpp b/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaProxy.cpp index 5f3ba7f9c39..9e7bbff3759 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaProxy.cpp +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaProxy.cpp @@ -13,6 +13,7 @@ ACE_RCSID(orbsvcs, ReplicaProxy, "$Id$") TAO_LB_ReplicaProxy::TAO_LB_ReplicaProxy (void) : has_high_load_ (0), + lock_ (), balancer_ (0), current_load_ (0), connected_ (0) @@ -28,6 +29,9 @@ TAO_LB_ReplicaProxy::current_load (CORBA::Float load, // @@ Ossama: this is the point were the load dampening should // happen. Probably strategized.... + // Do not lock here. Locking is done in the load_changed() method, + // below. + this->current_load_ = load; // ACE_DEBUG ((LM_DEBUG, "Load[%x] = %f\n", long(this), load)); @@ -39,7 +43,9 @@ TAO_LB_ReplicaProxy::disconnect (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((LoadBalancing::ReplicaProxy::NotConnected, CORBA::SystemException)) { - // @@ Ossama: this code is not thread safe... + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, + guard, + this->lock_)); if (this->connected_) { @@ -54,18 +60,18 @@ TAO_LB_ReplicaProxy::disconnect (CORBA::Environment &ACE_TRY_ENV) } } -void TAO_LB_ReplicaProxy::connect (TAO_LB_LoadBalancer *balancer, - LoadBalancing::ReplicaControl_ptr control, - CORBA::Object_ptr replica, - CORBA::Environment &ACE_TRY_ENV) +void +TAO_LB_ReplicaProxy::connect (TAO_LB_LoadBalancer *balancer, + LoadBalancing::ReplicaControl_ptr control, + CORBA::Object_ptr replica, + CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((LoadBalancing::ReplicaProxy::NilControl, LoadBalancing::ReplicaProxy::NilReplica, CORBA::SystemException)) { - // @@ Ossama: this is a perfect example of code that is not thread - // safe: what if we get a 'current_load' message in another thread? - // Or a disconnect() message from a misbehaving replica? Or two - // calls to connect? + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, + guard, + this->lock_)); if (balancer == 0) ACE_THROW (CORBA::BAD_PARAM ( diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaProxy.h b/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaProxy.h index 34c11aa8e54..68e8245a2b1 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaProxy.h +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/ReplicaProxy.h @@ -20,6 +20,7 @@ #include "ace/pre.h" +#include "ace/Synch.h" #include "orbsvcs/LoadBalancingS.h" #include "LoadBalancing_export.h" @@ -100,6 +101,10 @@ private: // with the ReplicaProxy. private: + + ACE_SYNCH_MUTEX lock_; + // Mutex used to ensure access ReplicaProxy state is atomic. + CORBA::Object_var replica_; // Reference to the Object being load balanced. diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/Round_Robin_Strategy.cpp b/TAO/orbsvcs/orbsvcs/LoadBalancing/Round_Robin_Strategy.cpp index 55cb5fef9d6..b537a3d572c 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/Round_Robin_Strategy.cpp +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/Round_Robin_Strategy.cpp @@ -11,12 +11,17 @@ ACE_RCSID(orbsvcs, Round_Robin_Strategy, "$Id$") TAO_LB_Round_Robin_Strategy::TAO_LB_Round_Robin_Strategy (void) : proxies_ (), - next_replica_ (this->proxies_.begin ()) + next_replica_ (this->proxies_.begin ()), + lock_ () { } TAO_LB_Round_Robin_Strategy::~TAO_LB_Round_Robin_Strategy (void) { + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, + guard, + this->lock_)); + // @@ Are the objects deactivated from the POA? And shouldn't this // be done by the LoadBalancing strategy *before* the destructor is // invoked? @@ -37,7 +42,11 @@ CORBA::Object_ptr TAO_LB_Round_Robin_Strategy::replica (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - // @@ Ossama: more code that is not thread safe + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + CORBA::Object::_nil ()); + if (this->proxies_.is_empty ()) { // @@ What do we do if the set is empty? @@ -77,7 +86,11 @@ TAO_LB_Round_Robin_Strategy::replica (CORBA::Environment &ACE_TRY_ENV) int TAO_LB_Round_Robin_Strategy::insert (TAO_LB_ReplicaProxy *proxy) { - // @@ Ossama: more code that is not thread safe + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + -1)); + int r = this->proxies_.insert (proxy); this->next_replica_ = this->proxies_.begin (); return r; @@ -86,7 +99,11 @@ TAO_LB_Round_Robin_Strategy::insert (TAO_LB_ReplicaProxy *proxy) int TAO_LB_Round_Robin_Strategy::remove (TAO_LB_ReplicaProxy *proxy) { - // @@ Ossama: more code that is not thread safe + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + -1)); + int r = this->proxies_.remove (proxy); this->next_replica_ = this->proxies_.begin (); return r; diff --git a/TAO/orbsvcs/orbsvcs/LoadBalancing/Round_Robin_Strategy.h b/TAO/orbsvcs/orbsvcs/LoadBalancing/Round_Robin_Strategy.h index e8534da4d0d..8abedbc2edb 100644 --- a/TAO/orbsvcs/orbsvcs/LoadBalancing/Round_Robin_Strategy.h +++ b/TAO/orbsvcs/orbsvcs/LoadBalancing/Round_Robin_Strategy.h @@ -23,6 +23,7 @@ #include "LoadBalancing_Strategy.h" #include "orbsvcs/LoadBalancingS.h" #include "ace/Containers.h" +#include "ace/Synch.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -69,6 +70,9 @@ private: // Set containing the ReplicaProxy servants. ACE_Unbounded_Set_Iterator<TAO_LB_ReplicaProxy *> next_replica_; + + ACE_SYNCH_MUTEX lock_; + // Lock used to ensure access to state within this class is atomic. }; #include "ace/post.h" diff --git a/TAO/orbsvcs/orbsvcs/Makefile.LoadBalancing b/TAO/orbsvcs/orbsvcs/Makefile.LoadBalancing index 1563be78df0..fa866e5e809 100644 --- a/TAO/orbsvcs/orbsvcs/Makefile.LoadBalancing +++ b/TAO/orbsvcs/orbsvcs/Makefile.LoadBalancing @@ -36,9 +36,9 @@ override TAO_IDLFLAGS += \ IDL_SRCS += \ LoadBalancing CPP_SRCS += \ - LoadBalancing/DSI_ForwardingProxy \ LoadBalancing/LoadBalancer_i \ LoadBalancing/LoadBalancing_Strategy \ + LoadBalancing/ReplicaLocator \ LoadBalancing/ReplicaProxy \ LoadBalancing/Round_Robin_Strategy \ LoadBalancing/Minimum_Dispersion @@ -100,10 +100,6 @@ realclean: clean $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - $(ACE_ROOT)/ace/Log_Msg.h \ - $(ACE_ROOT)/ace/Log_Record.h \ - $(ACE_ROOT)/ace/Log_Priority.h \ - $(ACE_ROOT)/ace/Log_Record.i \ $(TAO_ROOT)/tao/corbafwd.h \ $(ACE_ROOT)/ace/CDR_Stream.h \ $(ACE_ROOT)/ace/Message_Block.h \ @@ -111,6 +107,10 @@ realclean: clean $(ACE_ROOT)/ace/ACE.i \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ $(ACE_ROOT)/ace/Based_Pointer_T.h \ $(ACE_ROOT)/ace/Based_Pointer_T.i \ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ @@ -405,10 +405,6 @@ realclean: clean $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - $(ACE_ROOT)/ace/Log_Msg.h \ - $(ACE_ROOT)/ace/Log_Record.h \ - $(ACE_ROOT)/ace/Log_Priority.h \ - $(ACE_ROOT)/ace/Log_Record.i \ $(TAO_ROOT)/tao/corbafwd.h \ $(ACE_ROOT)/ace/CDR_Stream.h \ $(ACE_ROOT)/ace/Message_Block.h \ @@ -416,6 +412,10 @@ realclean: clean $(ACE_ROOT)/ace/ACE.i \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ $(ACE_ROOT)/ace/Based_Pointer_T.h \ $(ACE_ROOT)/ace/Based_Pointer_T.i \ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ @@ -685,11 +685,11 @@ realclean: clean $(TAO_ROOT)/tao/MessagingS.i \ LoadBalancing/LoadBalancing_export.h LoadBalancingC.i -.obj/DSI_ForwardingProxy.o .obj/DSI_ForwardingProxy.so .shobj/DSI_ForwardingProxy.o .shobj/DSI_ForwardingProxy.so: LoadBalancing/DSI_ForwardingProxy.cpp \ - LoadBalancing/DSI_ForwardingProxy.h \ +.obj/LoadBalancer_i.o .obj/LoadBalancer_i.so .shobj/LoadBalancer_i.o .shobj/LoadBalancer_i.so: LoadBalancing/LoadBalancer_i.cpp \ + LoadBalancing/LoadBalancer_i.h \ $(ACE_ROOT)/ace/pre.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.h \ + LoadBalancingS.h \ + LoadBalancingC.h \ $(TAO_ROOT)/tao/corba.h \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/post.h \ @@ -711,10 +711,6 @@ realclean: clean $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - $(ACE_ROOT)/ace/Log_Msg.h \ - $(ACE_ROOT)/ace/Log_Record.h \ - $(ACE_ROOT)/ace/Log_Priority.h \ - $(ACE_ROOT)/ace/Log_Record.i \ $(TAO_ROOT)/tao/corbafwd.h \ $(ACE_ROOT)/ace/CDR_Stream.h \ $(ACE_ROOT)/ace/Message_Block.h \ @@ -722,6 +718,10 @@ realclean: clean $(ACE_ROOT)/ace/ACE.i \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ $(ACE_ROOT)/ace/Based_Pointer_T.h \ $(ACE_ROOT)/ace/Based_Pointer_T.i \ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ @@ -989,21 +989,21 @@ realclean: clean $(TAO_ROOT)/tao/MessagingS_T.i \ $(TAO_ROOT)/tao/MessagingS_T.cpp \ $(TAO_ROOT)/tao/MessagingS.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancing/LoadBalancing_export.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.cpp \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.i \ - LoadBalancing/LoadBalancing_export.h LoadBalancing/LoadBalancer_i.h \ - LoadBalancing/ReplicaProxy.h LoadBalancing/ReplicaProxy.i \ + LoadBalancing/LoadBalancing_export.h \ + LoadBalancingC.i \ + LoadBalancingS_T.h \ + LoadBalancingS_T.i \ + LoadBalancingS_T.cpp \ + LoadBalancingS.i \ + LoadBalancing/ReplicaProxy.h LoadBalancing/LoadBalancing_export.h \ + LoadBalancing/ReplicaProxy.i LoadBalancing/ReplicaLocator.h \ LoadBalancing/LoadBalancing_Strategy.h LoadBalancing/LoadBalancer_i.i -.obj/LoadBalancer_i.o .obj/LoadBalancer_i.so .shobj/LoadBalancer_i.o .shobj/LoadBalancer_i.so: LoadBalancing/LoadBalancer_i.cpp \ - LoadBalancing/LoadBalancer_i.h \ +.obj/LoadBalancing_Strategy.o .obj/LoadBalancing_Strategy.so .shobj/LoadBalancing_Strategy.o .shobj/LoadBalancing_Strategy.so: LoadBalancing/LoadBalancing_Strategy.cpp \ + LoadBalancing/LoadBalancing_Strategy.h \ $(ACE_ROOT)/ace/pre.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.h \ + LoadBalancingS.h \ + LoadBalancingC.h \ $(TAO_ROOT)/tao/corba.h \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/post.h \ @@ -1025,10 +1025,6 @@ realclean: clean $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - $(ACE_ROOT)/ace/Log_Msg.h \ - $(ACE_ROOT)/ace/Log_Record.h \ - $(ACE_ROOT)/ace/Log_Priority.h \ - $(ACE_ROOT)/ace/Log_Record.i \ $(TAO_ROOT)/tao/corbafwd.h \ $(ACE_ROOT)/ace/CDR_Stream.h \ $(ACE_ROOT)/ace/Message_Block.h \ @@ -1036,6 +1032,10 @@ realclean: clean $(ACE_ROOT)/ace/ACE.i \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ $(ACE_ROOT)/ace/Based_Pointer_T.h \ $(ACE_ROOT)/ace/Based_Pointer_T.i \ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ @@ -1303,21 +1303,20 @@ realclean: clean $(TAO_ROOT)/tao/MessagingS_T.i \ $(TAO_ROOT)/tao/MessagingS_T.cpp \ $(TAO_ROOT)/tao/MessagingS.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancing/LoadBalancing_export.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.cpp \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.i \ - LoadBalancing/ReplicaProxy.h LoadBalancing/LoadBalancing_export.h \ - LoadBalancing/ReplicaProxy.i LoadBalancing/DSI_ForwardingProxy.h \ - LoadBalancing/LoadBalancing_Strategy.h LoadBalancing/LoadBalancer_i.i + LoadBalancing/LoadBalancing_export.h \ + LoadBalancingC.i \ + LoadBalancingS_T.h \ + LoadBalancingS_T.i \ + LoadBalancingS_T.cpp \ + LoadBalancingS.i \ + LoadBalancing/LoadBalancing_export.h \ + LoadBalancing/LoadBalancing_Strategy.i -.obj/LoadBalancing_Strategy.o .obj/LoadBalancing_Strategy.so .shobj/LoadBalancing_Strategy.o .shobj/LoadBalancing_Strategy.so: LoadBalancing/LoadBalancing_Strategy.cpp \ - LoadBalancing/LoadBalancing_Strategy.h \ +.obj/ReplicaLocator.o .obj/ReplicaLocator.so .shobj/ReplicaLocator.o .shobj/ReplicaLocator.so: LoadBalancing/ReplicaLocator.cpp \ + LoadBalancing/ReplicaLocator.h \ $(ACE_ROOT)/ace/pre.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.h \ + LoadBalancingS.h \ + LoadBalancingC.h \ $(TAO_ROOT)/tao/corba.h \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/post.h \ @@ -1339,10 +1338,6 @@ realclean: clean $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - $(ACE_ROOT)/ace/Log_Msg.h \ - $(ACE_ROOT)/ace/Log_Record.h \ - $(ACE_ROOT)/ace/Log_Priority.h \ - $(ACE_ROOT)/ace/Log_Record.i \ $(TAO_ROOT)/tao/corbafwd.h \ $(ACE_ROOT)/ace/CDR_Stream.h \ $(ACE_ROOT)/ace/Message_Block.h \ @@ -1350,6 +1345,10 @@ realclean: clean $(ACE_ROOT)/ace/ACE.i \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ $(ACE_ROOT)/ace/Based_Pointer_T.h \ $(ACE_ROOT)/ace/Based_Pointer_T.i \ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ @@ -1617,21 +1616,21 @@ realclean: clean $(TAO_ROOT)/tao/MessagingS_T.i \ $(TAO_ROOT)/tao/MessagingS_T.cpp \ $(TAO_ROOT)/tao/MessagingS.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancing/LoadBalancing_export.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.cpp \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.i \ LoadBalancing/LoadBalancing_export.h \ - LoadBalancing/LoadBalancing_Strategy.i + LoadBalancingC.i \ + LoadBalancingS_T.h \ + LoadBalancingS_T.i \ + LoadBalancingS_T.cpp \ + LoadBalancingS.i \ + LoadBalancing/LoadBalancing_export.h LoadBalancing/LoadBalancer_i.h \ + LoadBalancing/ReplicaProxy.h LoadBalancing/ReplicaProxy.i \ + LoadBalancing/LoadBalancing_Strategy.h LoadBalancing/LoadBalancer_i.i .obj/ReplicaProxy.o .obj/ReplicaProxy.so .shobj/ReplicaProxy.o .shobj/ReplicaProxy.so: LoadBalancing/ReplicaProxy.cpp \ LoadBalancing/ReplicaProxy.h \ $(ACE_ROOT)/ace/pre.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.h \ - $(TAO_ROOT)/tao/corba.h \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/post.h \ $(ACE_ROOT)/ace/ace_wchar.h \ @@ -1652,23 +1651,7 @@ realclean: clean $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - $(ACE_ROOT)/ace/Log_Msg.h \ - $(ACE_ROOT)/ace/Log_Record.h \ - $(ACE_ROOT)/ace/Log_Priority.h \ - $(ACE_ROOT)/ace/Log_Record.i \ - $(TAO_ROOT)/tao/corbafwd.h \ - $(ACE_ROOT)/ace/CDR_Stream.h \ - $(ACE_ROOT)/ace/Message_Block.h \ - $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/ACE.i \ - $(ACE_ROOT)/ace/Malloc.h \ - $(ACE_ROOT)/ace/Malloc_Base.h \ - $(ACE_ROOT)/ace/Based_Pointer_T.h \ - $(ACE_ROOT)/ace/Based_Pointer_T.i \ - $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ - $(ACE_ROOT)/ace/Based_Pointer_Repository.h \ - $(ACE_ROOT)/ace/Singleton.h \ - $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ @@ -1682,6 +1665,23 @@ realclean: clean $(ACE_ROOT)/ace/Thread.i \ $(ACE_ROOT)/ace/Atomic_Op.i \ $(ACE_ROOT)/ace/Synch_T.cpp \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + LoadBalancingS.h \ + LoadBalancingC.h \ + $(TAO_ROOT)/tao/corba.h \ + $(TAO_ROOT)/tao/corbafwd.h \ + $(ACE_ROOT)/ace/CDR_Stream.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Based_Pointer_T.h \ + $(ACE_ROOT)/ace/Based_Pointer_T.i \ + $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ + $(ACE_ROOT)/ace/Based_Pointer_Repository.h \ + $(ACE_ROOT)/ace/Singleton.h \ $(ACE_ROOT)/ace/Singleton.i \ $(ACE_ROOT)/ace/Singleton.cpp \ $(ACE_ROOT)/ace/Object_Manager.h \ @@ -1930,22 +1930,22 @@ realclean: clean $(TAO_ROOT)/tao/MessagingS_T.i \ $(TAO_ROOT)/tao/MessagingS_T.cpp \ $(TAO_ROOT)/tao/MessagingS.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancing/LoadBalancing_export.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.cpp \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.i \ + LoadBalancing/LoadBalancing_export.h \ + LoadBalancingC.i \ + LoadBalancingS_T.h \ + LoadBalancingS_T.i \ + LoadBalancingS_T.cpp \ + LoadBalancingS.i \ LoadBalancing/LoadBalancing_export.h LoadBalancing/ReplicaProxy.i \ - LoadBalancing/LoadBalancer_i.h LoadBalancing/DSI_ForwardingProxy.h \ + LoadBalancing/LoadBalancer_i.h LoadBalancing/ReplicaLocator.h \ LoadBalancing/LoadBalancing_Strategy.h LoadBalancing/LoadBalancer_i.i .obj/Round_Robin_Strategy.o .obj/Round_Robin_Strategy.so .shobj/Round_Robin_Strategy.o .shobj/Round_Robin_Strategy.so: LoadBalancing/Round_Robin_Strategy.cpp \ LoadBalancing/Round_Robin_Strategy.h \ $(ACE_ROOT)/ace/pre.h \ LoadBalancing/LoadBalancing_Strategy.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.h \ + LoadBalancingS.h \ + LoadBalancingC.h \ $(TAO_ROOT)/tao/corba.h \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/post.h \ @@ -1967,10 +1967,6 @@ realclean: clean $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - $(ACE_ROOT)/ace/Log_Msg.h \ - $(ACE_ROOT)/ace/Log_Record.h \ - $(ACE_ROOT)/ace/Log_Priority.h \ - $(ACE_ROOT)/ace/Log_Record.i \ $(TAO_ROOT)/tao/corbafwd.h \ $(ACE_ROOT)/ace/CDR_Stream.h \ $(ACE_ROOT)/ace/Message_Block.h \ @@ -1978,6 +1974,10 @@ realclean: clean $(ACE_ROOT)/ace/ACE.i \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ $(ACE_ROOT)/ace/Based_Pointer_T.h \ $(ACE_ROOT)/ace/Based_Pointer_T.i \ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ @@ -2245,12 +2245,12 @@ realclean: clean $(TAO_ROOT)/tao/MessagingS_T.i \ $(TAO_ROOT)/tao/MessagingS_T.cpp \ $(TAO_ROOT)/tao/MessagingS.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancing/LoadBalancing_export.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.cpp \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.i \ + LoadBalancing/LoadBalancing_export.h \ + LoadBalancingC.i \ + LoadBalancingS_T.h \ + LoadBalancingS_T.i \ + LoadBalancingS_T.cpp \ + LoadBalancingS.i \ LoadBalancing/LoadBalancing_export.h LoadBalancing/ReplicaProxy.h \ LoadBalancing/ReplicaProxy.i @@ -2258,8 +2258,8 @@ realclean: clean LoadBalancing/Minimum_Dispersion.h \ $(ACE_ROOT)/ace/pre.h \ LoadBalancing/LoadBalancing_Strategy.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.h \ + LoadBalancingS.h \ + LoadBalancingC.h \ $(TAO_ROOT)/tao/corba.h \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/post.h \ @@ -2281,10 +2281,6 @@ realclean: clean $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - $(ACE_ROOT)/ace/Log_Msg.h \ - $(ACE_ROOT)/ace/Log_Record.h \ - $(ACE_ROOT)/ace/Log_Priority.h \ - $(ACE_ROOT)/ace/Log_Record.i \ $(TAO_ROOT)/tao/corbafwd.h \ $(ACE_ROOT)/ace/CDR_Stream.h \ $(ACE_ROOT)/ace/Message_Block.h \ @@ -2292,6 +2288,10 @@ realclean: clean $(ACE_ROOT)/ace/ACE.i \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ $(ACE_ROOT)/ace/Based_Pointer_T.h \ $(ACE_ROOT)/ace/Based_Pointer_T.i \ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ @@ -2559,12 +2559,12 @@ realclean: clean $(TAO_ROOT)/tao/MessagingS_T.i \ $(TAO_ROOT)/tao/MessagingS_T.cpp \ $(TAO_ROOT)/tao/MessagingS.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancing/LoadBalancing_export.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingC.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.h \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.i \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS_T.cpp \ - $(TAO_ROOT)/orbsvcs/orbsvcs/LoadBalancingS.i \ + LoadBalancing/LoadBalancing_export.h \ + LoadBalancingC.i \ + LoadBalancingS_T.h \ + LoadBalancingS_T.i \ + LoadBalancingS_T.cpp \ + LoadBalancingS.i \ LoadBalancing/LoadBalancing_export.h LoadBalancing/ReplicaProxy.h \ LoadBalancing/ReplicaProxy.i diff --git a/TAO/orbsvcs/orbsvcs/Minimum_Dispersion.cpp b/TAO/orbsvcs/orbsvcs/Minimum_Dispersion.cpp index 13a24bbe92b..37acf0d0490 100644 --- a/TAO/orbsvcs/orbsvcs/Minimum_Dispersion.cpp +++ b/TAO/orbsvcs/orbsvcs/Minimum_Dispersion.cpp @@ -10,16 +10,22 @@ ACE_RCSID(orbsvcs, Minimum_Dispersion, "$Id$") TAO_LB_Minimum_Dispersion_Strategy::TAO_LB_Minimum_Dispersion_Strategy (void) - : proxies_ () + : proxies_ (), + lock_ () { } TAO_LB_Minimum_Dispersion_Strategy::~TAO_LB_Minimum_Dispersion_Strategy (void) { - // @@ Ossama: more code that is not thread safe + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, + guard, + this->lock_)); + TAO_LB_ReplicaProxySetIterator begin = this->proxies_.begin (); TAO_LB_ReplicaProxySetIterator end = this->proxies_.end (); + // @@ Bad deactivation! + for (TAO_LB_ReplicaProxySetIterator i = begin; i != end; ++i) @@ -33,9 +39,18 @@ CORBA::Object_ptr TAO_LB_Minimum_Dispersion_Strategy::replica (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - // @@ Ossama: more code that is not thread safe - while (!this->proxies_.is_empty ()) + for ( ; ; ) { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + CORBA::Object::_nil ()); + + if (this->proxies_.is_empty ()) + // @@ What do we do if the set is empty? + ACE_THROW_RETURN (CORBA::OBJECT_NOT_EXIST (), + CORBA::Object::_nil ()); + TAO_LB_ReplicaProxySetIterator begin = this->proxies_.begin (); TAO_LB_ReplicaProxySetIterator end = this->proxies_.end (); @@ -52,25 +67,36 @@ TAO_LB_Minimum_Dispersion_Strategy::replica (CORBA::Environment &ACE_TRY_ENV) } } - // @@ Ossama: we should setup a timeout policy here... - ACE_TRY - { - // Before returning an object reference to the client - // validate it first. - CORBA::Object_var object = - proxy->replica (); - CORBA::Boolean non_existent = - object->_non_existent (ACE_TRY_ENV); - ACE_TRY_CHECK; - if (!non_existent) - { - return object._retn (); - } - } - ACE_CATCHANY - { - } - ACE_ENDTRY; + // Before returning an object reference to the client + // validate it first. + CORBA::Object_var object = + proxy->replica (); + + { + ACE_Reverse_Lock<ACE_SYNCH_MUTEX> reverse_lock (this->lock_); + + ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_SYNCH_MUTEX>, + reverse_guard, + reverse_lock, + CORBA::Object::_nil ()); + + // @@ Ossama: we should setup a timeout policy here... + ACE_TRY + { + CORBA::Boolean non_existent = + object->_non_existent (ACE_TRY_ENV); + ACE_TRY_CHECK; + if (!non_existent) + { + return object._retn (); + } + } + ACE_CATCHANY + { + } + ACE_ENDTRY; + } + // @@ Ossama: a bit melodramatic, we remove the object if *any* // exception is thrown. If the object really does not exist (we // get non_existent==1) then this is exactly what we want to do, @@ -78,22 +104,27 @@ TAO_LB_Minimum_Dispersion_Strategy::replica (CORBA::Environment &ACE_TRY_ENV) // something less drastic, or at least strategize it ;-) this->proxies_.remove (proxy); } - // @@ What do we do if the set is empty? - ACE_THROW_RETURN (CORBA::OBJECT_NOT_EXIST (), - CORBA::Object::_nil ()); } int TAO_LB_Minimum_Dispersion_Strategy::insert (TAO_LB_ReplicaProxy *proxy) { - // @@ Ossama: more code that is not thread safe + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + -1)); + return this->proxies_.insert (proxy); } int TAO_LB_Minimum_Dispersion_Strategy::remove (TAO_LB_ReplicaProxy *proxy) { - // @@ Ossama: more code that is not thread safe + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + -1)); + return this->proxies_.remove (proxy); } @@ -101,42 +132,63 @@ void TAO_LB_Minimum_Dispersion_Strategy::load_changed (TAO_LB_ReplicaProxy *proxy, CORBA::Environment &ACE_TRY_ENV) { - // @@ Ossama: more code that is not thread safe - if (this->proxies_.is_empty ()) - return; + int send_load_advisory = 0; - TAO_LB_ReplicaProxySetIterator begin = this->proxies_.begin (); - TAO_LB_ReplicaProxySetIterator end = this->proxies_.end (); + { + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, + guard, + this->lock_)); - float s = 0; - CORBA::ULong n = 0; - TAO_LB_ReplicaProxySetIterator i = begin; - for (;i != end; ++i) - { - s += (*i)->current_load (); - n++; - } + if (this->proxies_.is_empty ()) + return; + + TAO_LB_ReplicaProxySetIterator begin = this->proxies_.begin (); + TAO_LB_ReplicaProxySetIterator end = this->proxies_.end (); - float avg = (n == 0 ? s : s / n); - float cl = proxy->current_load (); + float s = 0; + CORBA::ULong n = 0; + TAO_LB_ReplicaProxySetIterator i = begin; + for (;i != end; ++i) + { + s += (*i)->current_load (); + n++; + } - if (avg == 0) - return; + float avg = (n == 0 ? s : s / n); + float cl = proxy->current_load (); - float relative_load = cl / avg; + if (avg == 0) + return; + + float relative_load = cl / avg; + + // @@ Ossama: Make the 1.5 factor adjustable, it is how much + // dispersion we tolerate before starting to send advisories. + if (relative_load > 1 + 1.5F / n) + { + proxy->has_high_load_ = 1; + send_load_advisory = 2; // 2 == Send high load advisory + } + + if (send_load_advisory == 1 && relative_load < 1 + 0.9F / n) + { + proxy->has_high_load_ = 0; + send_load_advisory = 1; // 1 == Send nominal load advisory + } + } // @@ Ossama: no debug messages in production code, my fault.... - ACE_DEBUG ((LM_DEBUG, "Load[%x] %f %f %f\n", - proxy, cl, avg, relative_load)); + // ACE_DEBUG ((LM_DEBUG, "Load[%x] %f %f %f\n", + // proxy, cl, avg, relative_load)); // @@ Ossama: Make the 1.5 factor adjustable, it is how much // dispersion we tolerate before starting to send advisories. - if (relative_load > 1 + 1.5F / n) + if (send_load_advisory == 2) { - proxy->has_high_load_ = 1; proxy->control_->high_load_advisory (ACE_TRY_ENV); ACE_CHECK; - return; + + return; // We may not throw an exception, so explicitly return. } // @@ Ossama: notice that we wait until the load is signifcantly @@ -145,9 +197,8 @@ TAO_LB_Minimum_Dispersion_Strategy::load_changed (TAO_LB_ReplicaProxy *proxy, // rejecting one client.... // @@ Ossama: make the 0.9 factor adjustable, at least at // construction time... - if (proxy->has_high_load_ && relative_load < 1 + 0.9F / n) + if (send_load_advisory == 1) { - proxy->has_high_load_ = 0; proxy->control_->nominal_load_advisory (ACE_TRY_ENV); ACE_CHECK; } diff --git a/TAO/orbsvcs/orbsvcs/Minimum_Dispersion.h b/TAO/orbsvcs/orbsvcs/Minimum_Dispersion.h index 278c96ef1fd..bd78e0acc1f 100644 --- a/TAO/orbsvcs/orbsvcs/Minimum_Dispersion.h +++ b/TAO/orbsvcs/orbsvcs/Minimum_Dispersion.h @@ -23,6 +23,7 @@ #include "LoadBalancing_Strategy.h" #include "orbsvcs/LoadBalancingS.h" #include "ace/Containers.h" +#include "ace/Synch.h" # if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -65,6 +66,10 @@ public: private: TAO_LB_ReplicaProxySet proxies_; // Set containing the ReplicaProxy servants. + + ACE_SYNCH_MUTEX lock_; + // Lock used to ensure atomic access to state retained by this + // class. }; #include "ace/post.h" diff --git a/TAO/orbsvcs/orbsvcs/ReplicaLocator.cpp b/TAO/orbsvcs/orbsvcs/ReplicaLocator.cpp new file mode 100644 index 00000000000..a3d067290cc --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/ReplicaLocator.cpp @@ -0,0 +1,50 @@ +// -*- C++ -*- + +// $Id$ + +#include "ReplicaLocator.h" +#include "LoadBalancer_i.h" + +ACE_RCSID(orbsvcs, ReplicaLocator, "$Id$") + +TAO_LB_ReplicaLocator::TAO_LB_ReplicaLocator (TAO_LB_LoadBalancer *lb) + : load_balancer_ (lb) // This pointer shouldn't be zero! +{ + // Nothing else +} + +PortableServer::Servant +TAO_LB_ReplicaLocator::preinvoke ( + const PortableServer::ObjectId & /* oid */, + PortableServer::POA_ptr /* adapter */, + const char * /* operation */, + PortableServer::ServantLocator::Cookie & /* the_cookie */, + CORBA_Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableServer::ForwardRequest)) +{ + if (this->load_balancer_ == 0) + ACE_THROW_RETURN (CORBA::INTERNAL (), 0); + + CORBA::Object_ptr replica = + this->load_balancer_->replica (ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + + // Throw a forward exception to force the client to redirect its + // requests to the Replica chosen by the LoadBalancer. + ACE_THROW_RETURN (PortableServer::ForwardRequest ( + CORBA::Object::_duplicate (replica)), + 0); +} + +void +TAO_LB_ReplicaLocator::postinvoke ( + const PortableServer::ObjectId & /* oid */, + PortableServer::POA_ptr /* adapter */, + const char * /* operation */, + PortableServer::ServantLocator::Cookie /* the_cookie */, + PortableServer::Servant /* the_servant */, + CORBA_Environment & /* ACE_TRY_ENV */) +{ + // No-op +} diff --git a/TAO/orbsvcs/orbsvcs/ReplicaLocator.h b/TAO/orbsvcs/orbsvcs/ReplicaLocator.h new file mode 100644 index 00000000000..4d79e393b98 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/ReplicaLocator.h @@ -0,0 +1,72 @@ +// -*- C++ -*- + +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// TAO_LoadBalancing +// +// = FILENAME +// ReplicaLocator.h +// +// = AUTHOR +// Ossama Othman <ossama@uci.edu> +// +// ============================================================================ + +#ifndef TAO_REPLICA_LOCATOR_H +#define TAO_REPLICA_LOCATOR_H + +#include "ace/pre.h" + +#include "orbsvcs/LoadBalancingS.h" +#include "LoadBalancing_export.h" + +# if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +# endif /* ACE_LACKS_PRAGMA_ONCE */ + +// Forward declaration. +class TAO_LB_LoadBalancer; + +class TAO_LoadBalancing_Export TAO_LB_ReplicaLocator + : public virtual POA_PortableServer::ServantLocator +{ + // = TITLE + // Class that provides request forwarding. + + // = DESCRIPTION + // This is a Servant Locator implementation that forwards + // requests to a replica returned by the Load Balancer. + +public: + TAO_LB_ReplicaLocator (TAO_LB_LoadBalancer *load_balancer); + // Constructor + + // = The ServantLocator methods + virtual PortableServer::Servant preinvoke ( + const PortableServer::ObjectId &oid, + PortableServer::POA_ptr adapter, + const char *operation, + PortableServer::ServantLocator::Cookie & the_cookie, + CORBA_Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableServer::ForwardRequest)); + + virtual void postinvoke ( + const PortableServer::ObjectId &oid, + PortableServer::POA_ptr adapter, + const char *operation, + PortableServer::ServantLocator::Cookie the_cookie, + PortableServer::Servant the_servant, + CORBA_Environment &ACE_TRY_ENV); + +private: + TAO_LB_LoadBalancer *load_balancer_; + // The load balancer implementation. +}; + +#include "ace/post.h" + +#endif /* TAO_REPLICA_LOCATOR_H */ diff --git a/TAO/orbsvcs/orbsvcs/ReplicaProxy.cpp b/TAO/orbsvcs/orbsvcs/ReplicaProxy.cpp index 5f3ba7f9c39..9e7bbff3759 100644 --- a/TAO/orbsvcs/orbsvcs/ReplicaProxy.cpp +++ b/TAO/orbsvcs/orbsvcs/ReplicaProxy.cpp @@ -13,6 +13,7 @@ ACE_RCSID(orbsvcs, ReplicaProxy, "$Id$") TAO_LB_ReplicaProxy::TAO_LB_ReplicaProxy (void) : has_high_load_ (0), + lock_ (), balancer_ (0), current_load_ (0), connected_ (0) @@ -28,6 +29,9 @@ TAO_LB_ReplicaProxy::current_load (CORBA::Float load, // @@ Ossama: this is the point were the load dampening should // happen. Probably strategized.... + // Do not lock here. Locking is done in the load_changed() method, + // below. + this->current_load_ = load; // ACE_DEBUG ((LM_DEBUG, "Load[%x] = %f\n", long(this), load)); @@ -39,7 +43,9 @@ TAO_LB_ReplicaProxy::disconnect (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((LoadBalancing::ReplicaProxy::NotConnected, CORBA::SystemException)) { - // @@ Ossama: this code is not thread safe... + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, + guard, + this->lock_)); if (this->connected_) { @@ -54,18 +60,18 @@ TAO_LB_ReplicaProxy::disconnect (CORBA::Environment &ACE_TRY_ENV) } } -void TAO_LB_ReplicaProxy::connect (TAO_LB_LoadBalancer *balancer, - LoadBalancing::ReplicaControl_ptr control, - CORBA::Object_ptr replica, - CORBA::Environment &ACE_TRY_ENV) +void +TAO_LB_ReplicaProxy::connect (TAO_LB_LoadBalancer *balancer, + LoadBalancing::ReplicaControl_ptr control, + CORBA::Object_ptr replica, + CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((LoadBalancing::ReplicaProxy::NilControl, LoadBalancing::ReplicaProxy::NilReplica, CORBA::SystemException)) { - // @@ Ossama: this is a perfect example of code that is not thread - // safe: what if we get a 'current_load' message in another thread? - // Or a disconnect() message from a misbehaving replica? Or two - // calls to connect? + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, + guard, + this->lock_)); if (balancer == 0) ACE_THROW (CORBA::BAD_PARAM ( diff --git a/TAO/orbsvcs/orbsvcs/ReplicaProxy.h b/TAO/orbsvcs/orbsvcs/ReplicaProxy.h index 34c11aa8e54..68e8245a2b1 100644 --- a/TAO/orbsvcs/orbsvcs/ReplicaProxy.h +++ b/TAO/orbsvcs/orbsvcs/ReplicaProxy.h @@ -20,6 +20,7 @@ #include "ace/pre.h" +#include "ace/Synch.h" #include "orbsvcs/LoadBalancingS.h" #include "LoadBalancing_export.h" @@ -100,6 +101,10 @@ private: // with the ReplicaProxy. private: + + ACE_SYNCH_MUTEX lock_; + // Mutex used to ensure access ReplicaProxy state is atomic. + CORBA::Object_var replica_; // Reference to the Object being load balanced. diff --git a/TAO/orbsvcs/orbsvcs/Round_Robin_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Round_Robin_Strategy.cpp index 55cb5fef9d6..b537a3d572c 100644 --- a/TAO/orbsvcs/orbsvcs/Round_Robin_Strategy.cpp +++ b/TAO/orbsvcs/orbsvcs/Round_Robin_Strategy.cpp @@ -11,12 +11,17 @@ ACE_RCSID(orbsvcs, Round_Robin_Strategy, "$Id$") TAO_LB_Round_Robin_Strategy::TAO_LB_Round_Robin_Strategy (void) : proxies_ (), - next_replica_ (this->proxies_.begin ()) + next_replica_ (this->proxies_.begin ()), + lock_ () { } TAO_LB_Round_Robin_Strategy::~TAO_LB_Round_Robin_Strategy (void) { + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, + guard, + this->lock_)); + // @@ Are the objects deactivated from the POA? And shouldn't this // be done by the LoadBalancing strategy *before* the destructor is // invoked? @@ -37,7 +42,11 @@ CORBA::Object_ptr TAO_LB_Round_Robin_Strategy::replica (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - // @@ Ossama: more code that is not thread safe + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + CORBA::Object::_nil ()); + if (this->proxies_.is_empty ()) { // @@ What do we do if the set is empty? @@ -77,7 +86,11 @@ TAO_LB_Round_Robin_Strategy::replica (CORBA::Environment &ACE_TRY_ENV) int TAO_LB_Round_Robin_Strategy::insert (TAO_LB_ReplicaProxy *proxy) { - // @@ Ossama: more code that is not thread safe + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + -1)); + int r = this->proxies_.insert (proxy); this->next_replica_ = this->proxies_.begin (); return r; @@ -86,7 +99,11 @@ TAO_LB_Round_Robin_Strategy::insert (TAO_LB_ReplicaProxy *proxy) int TAO_LB_Round_Robin_Strategy::remove (TAO_LB_ReplicaProxy *proxy) { - // @@ Ossama: more code that is not thread safe + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->lock_, + -1)); + int r = this->proxies_.remove (proxy); this->next_replica_ = this->proxies_.begin (); return r; diff --git a/TAO/orbsvcs/orbsvcs/Round_Robin_Strategy.h b/TAO/orbsvcs/orbsvcs/Round_Robin_Strategy.h index e8534da4d0d..8abedbc2edb 100644 --- a/TAO/orbsvcs/orbsvcs/Round_Robin_Strategy.h +++ b/TAO/orbsvcs/orbsvcs/Round_Robin_Strategy.h @@ -23,6 +23,7 @@ #include "LoadBalancing_Strategy.h" #include "orbsvcs/LoadBalancingS.h" #include "ace/Containers.h" +#include "ace/Synch.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -69,6 +70,9 @@ private: // Set containing the ReplicaProxy servants. ACE_Unbounded_Set_Iterator<TAO_LB_ReplicaProxy *> next_replica_; + + ACE_SYNCH_MUTEX lock_; + // Lock used to ensure access to state within this class is atomic. }; #include "ace/post.h" |