From 2347c840e0e9faeae9671dac0942a7b5bf0383c8 Mon Sep 17 00:00:00 2001 From: bala Date: Thu, 31 Jan 2002 20:25:36 +0000 Subject: ChangeLogTag: Thu Jan 31 11:23:59 2002 Balachandran Natarajan --- TAO/tao/Cache_Entries.cpp | 15 +-- TAO/tao/Cache_Entries.h | 3 + TAO/tao/Cache_Entries.inl | 6 ++ TAO/tao/ChangeLog | 64 +++++++++++ TAO/tao/Condition.cpp | 56 ++++++++++ TAO/tao/Condition.h | 115 ++++++++++++++++++++ TAO/tao/Condition.inl | 50 +++++++++ TAO/tao/Resource_Factory.cpp | 14 +++ TAO/tao/Resource_Factory.h | 10 ++ TAO/tao/Transport_Cache_Manager.cpp | 207 +++++++++++++++++++++++++++--------- TAO/tao/Transport_Cache_Manager.h | 86 +++++++-------- TAO/tao/Transport_Cache_Manager.inl | 57 ---------- TAO/tao/default_resource.cpp | 29 +++++ TAO/tao/default_resource.h | 7 ++ 14 files changed, 560 insertions(+), 159 deletions(-) create mode 100644 TAO/tao/ChangeLog create mode 100644 TAO/tao/Condition.cpp create mode 100644 TAO/tao/Condition.h create mode 100644 TAO/tao/Condition.inl diff --git a/TAO/tao/Cache_Entries.cpp b/TAO/tao/Cache_Entries.cpp index 9c7e17f0273..df610a378f6 100644 --- a/TAO/tao/Cache_Entries.cpp +++ b/TAO/tao/Cache_Entries.cpp @@ -24,13 +24,14 @@ TAO_Cache_IntId::~TAO_Cache_IntId (void) TAO_Cache_IntId& TAO_Cache_IntId::operator= (const TAO_Cache_IntId &rhs) { - if (this != &rhs) { - this->recycle_state_ = rhs.recycle_state_; + if (this != &rhs) + { + this->recycle_state_ = rhs.recycle_state_; + + TAO_Transport* old_transport = this->transport_; + this->transport_ = TAO_Transport::_duplicate (rhs.transport_); + TAO_Transport::release (old_transport); + } - TAO_Transport* old_transport = this->transport_; - this->transport_ = TAO_Transport::_duplicate (rhs.transport_); - TAO_Transport::release (old_transport); - } return *this; } - diff --git a/TAO/tao/Cache_Entries.h b/TAO/tao/Cache_Entries.h index d1a2464d426..a51e504378d 100644 --- a/TAO/tao/Cache_Entries.h +++ b/TAO/tao/Cache_Entries.h @@ -156,6 +156,9 @@ public: /// but for the TAO_Transport_Cache_Manager class. void index (CORBA::ULong index); + /// Increment the index value + void incr_index (void); + // = Accessors /// Get the underlying the property pointer TAO_Transport_Descriptor_Interface *property (void) const; diff --git a/TAO/tao/Cache_Entries.inl b/TAO/tao/Cache_Entries.inl index 1e6bbc118fb..ec19b1ed1e1 100644 --- a/TAO/tao/Cache_Entries.inl +++ b/TAO/tao/Cache_Entries.inl @@ -180,6 +180,12 @@ TAO_Cache_ExtId::index (CORBA::ULong index) this->index_ = index; } +ACE_INLINE void +TAO_Cache_ExtId::incr_index (void) +{ + ++this->index_; +} + ACE_INLINE TAO_Transport_Descriptor_Interface * TAO_Cache_ExtId::property (void) const { diff --git a/TAO/tao/ChangeLog b/TAO/tao/ChangeLog new file mode 100644 index 00000000000..1b94ce800d9 --- /dev/null +++ b/TAO/tao/ChangeLog @@ -0,0 +1,64 @@ +Thu Jan 31 11:23:59 2002 Balachandran Natarajan + + * tao/Cache_Entries.cpp: Did a cosmetic fix. + + * tao/Cache_Entries.h: + * tao/Cache_Entries.inl: Added a new method incr_index () to the + TAO_CacheExtId class. + + * tao/Resource_Factory.cpp: + * tao/Resource_Factory.h:Added two new methods, + max_muxed_connections () and locked_transport_cache (). The + former returns the number of user specified muxed connections + with a particular property. The latter returns a boolean value + to indicate whether the transport cache needs to have a lock or + not . + + * tao/default_resource.cpp: + * tao/default_resource.h: Concrete implementations for the methods + declared in Resource_Factory. + + * tao/Transport_Cache_Manager.cpp: + * tao/Transport_Cache_Manager.h: + * tao/Transport_Cache_Manager.inl: Added support for counted muxed + connections ie. the user can limit the number of remote + connections (with a particular property). If the thread + searching the cache for a free connection doesnt find one that + is free, the thread will wait on a condition variable for the + connection to be released. To accomodate this the following set + of changes were made + + - Added a condition variable to the class + + - Create the type of lock that needs to be used for the cache + locally and use that lock to create the condition variable. If + the cache is free of any locks we dont create a condition + variable. + + - The creation of lock has been moved to the + Transport_Cache_Manager from the Resource_Factory. + + - Added two new methods, wait_for_connection () and + is_wakeup_useful (). The wait_for_connection () blocks the + thread searching for connection, if there is a limit on the + number of muxed connections and if there have been enough + connections created with the same property. The + is_wakeup_useful () method is called by an unblocked thread to + check whether the transport that was returned is the one the + thread was waiting for. + + - Removed a version of rebind () and a couple of version of + unbind () calls as they were not used. + + + * tao/Condition.h: + * tao/Condition.cpp:. + * tao/Condition.inl: A simple wrapper that wraps a + TAO_SYNCH_CONDITION that can be used with different types of + Mutex classes. + + + + + + diff --git a/TAO/tao/Condition.cpp b/TAO/tao/Condition.cpp new file mode 100644 index 00000000000..d754007a77d --- /dev/null +++ b/TAO/tao/Condition.cpp @@ -0,0 +1,56 @@ +#include "Condition.h" + + +#if !defined (__ACE_INLINE__) +# include "tao/Condition.inl" +#endif /* __ACE_INLINE__ */ + + +ACE_RCSID (TAO, + Condition, + "$Id$") + +template +TAO_Condition::TAO_Condition (MUTEX &m) + + : mutex_ (&m), + delete_lock_ (0), + cond_ (0) +{ + // @@todo: Need to add the allocatore here.. + ACE_NEW (this->cond_, + TAO_SYNCH_CONDITION (*this->mutex_)); +} + +template +TAO_Condition::TAO_Condition (void) + : mutex_ (0), + delete_lock_ (0), + cond_ (0) + +{ + // @@todo: Need to add the allocatore here.. + + ACE_NEW (this->mutex_, + MUTEX); + + this->delete_lock_ = 1; + + ACE_NEW (this->cond_, + TAO_SYNCH_CONDITION (*this->mutex_)); +} + + +template +TAO_Condition::~TAO_Condition (void) +{ + if (this->remove () == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("TAO_Condition::~TAO_Condition"))); + + delete this->cond_; + + if (this->delete_lock_) + delete this->mutex_; +} diff --git a/TAO/tao/Condition.h b/TAO/tao/Condition.h new file mode 100644 index 00000000000..b9b85a4c817 --- /dev/null +++ b/TAO/tao/Condition.h @@ -0,0 +1,115 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file Condition.h + * + * $Id$ + * + * @author Douglas C. Schmidt + */ +//============================================================================= + +#ifndef TAO_CONDITION_H +#define TAO_CONDITION_H +#include "ace/pre.h" + + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + + +/** + * @class TAO_Condition + * + * @brief Same as to the ACE_Condition variable wrapper + * + * This class differs from ACE_Condition in that it uses a + * TAO_SYNCH_CONDITION instead of ACE_cond_t under the hood to + * provide blocking. + */ +template +class TAO_Condition +{ +public: + + /// Useful typedef + typedef MUTEX LOCK; + + // = Initialiation and termination methods. + /// Initialize the condition variable. + TAO_Condition (MUTEX &m); + + /// A default constructor. Since no lock is provided by the user, + /// one will be created internally. + TAO_Condition (void); + + /// Implicitly destroy the condition variable. + ~TAO_Condition (void); + + // = Lock accessors. + /** + * Block on condition, or until absolute time-of-day has passed. If + * abstime == 0 use "blocking" semantics. Else, if + * != 0 and the call times out before the condition is signaled + * returns -1 and sets errno to ETIME. + */ + int wait (const ACE_Time_Value *abstime); + + /// Block on condition. + int wait (void); + + /** + * Block on condition or until absolute time-of-day has passed. If + * abstime == 0 use "blocking" wait() semantics on the + * passed as a parameter (this is useful if you need to store the + * in shared memory). Else, if != 0 and the + * call times out before the condition is signaled returns -1 + * and sets errno to ETIME. + */ + int wait (MUTEX &mutex, const ACE_Time_Value *abstime = 0); + + /// Signal one waiting thread. + int signal (void); + + /// Signal *all* waiting threads. + int broadcast (void); + + // = Utility methods. + /// Explicitly destroy the condition variable. + int remove (void); + + /// Returns a reference to the underlying mutex_; + MUTEX *mutex (void); + +private: + + /// Reference to mutex lock. + MUTEX *mutex_; + + /// A flag to indicate whether the lock needs to be deleted. + int delete_lock_; + + /// Condition variable. + TAO_SYNCH_CONDITION *cond_; + + // = Prevent assignment and initialization. + ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Condition &)) + ACE_UNIMPLEMENTED_FUNC (TAO_Condition (const TAO_Condition &)) +}; + +#if defined (__ACE_INLINE__) +#include "Condition.inl" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "Condition.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Condition.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /*TAO_CONDITION_H*/ diff --git a/TAO/tao/Condition.inl b/TAO/tao/Condition.inl new file mode 100644 index 00000000000..089bc0d54be --- /dev/null +++ b/TAO/tao/Condition.inl @@ -0,0 +1,50 @@ +/* -*- C++ -*- */ +//$Id$ +template ACE_INLINE int +TAO_Condition::wait (void) +{ + return this->cond_->wait (); +} + +template ACE_INLINE int +TAO_Condition::wait (MUTEX &mutex, + const ACE_Time_Value *abstime) +{ + return this->cond_->wait (mutex, + abstime); +} + +// Peform an "alertable" timed wait. If the argument ABSTIME == 0 +// then we do a regular cond_wait(), else we do a timed wait for up to +// ABSTIME using the Solaris cond_timedwait() function. + +template ACE_INLINE int +TAO_Condition::wait (const ACE_Time_Value *abstime) +{ + return this->wait (this->mutex_, abstime); +} + +template ACE_INLINE int +TAO_Condition::remove (void) +{ + this->cond_->remove (); +} + +template ACE_INLINE MUTEX * +TAO_Condition::mutex (void) +{ + // ACE_TRACE ("ACE_Condition::mutex"); + return this->mutex_; +} + +template ACE_INLINE int +TAO_Condition::signal (void) +{ + return this->cond_->signal (); +} + +template ACE_INLINE int +TAO_Condition::broadcast (void) +{ + return this->cond_->broadcast (); +} diff --git a/TAO/tao/Resource_Factory.cpp b/TAO/tao/Resource_Factory.cpp index 62795db02ba..9964e39bdb8 100644 --- a/TAO/tao/Resource_Factory.cpp +++ b/TAO/tao/Resource_Factory.cpp @@ -161,6 +161,13 @@ TAO_Resource_Factory::purge_percentage (void) const return 0; } +int +TAO_Resource_Factory::max_muxed_connections (void) const +{ + return 0; +} + + int TAO_Resource_Factory::get_parser_names (char **&, int &) @@ -174,6 +181,13 @@ TAO_Resource_Factory::create_cached_connection_lock (void) return 0; } +int +TAO_Resource_Factory::locked_transport_cache (void) +{ + return 0; +} + + int TAO_Resource_Factory::load_default_protocols (void) { diff --git a/TAO/tao/Resource_Factory.h b/TAO/tao/Resource_Factory.h index 9b4694a4d66..470971e6ff1 100644 --- a/TAO/tao/Resource_Factory.h +++ b/TAO/tao/Resource_Factory.h @@ -178,12 +178,22 @@ public: /// cache. virtual int purge_percentage (void) const; + /// Return the number of muxed connections that are allowed for a + /// remote endpoint + virtual int max_muxed_connections (void) const; + virtual int get_parser_names (char **&names, int &number_of_names); /// Creates the lock for the lock needed in the Cache Map + /// @@todo: This method needs to go away as it doesnt make much + /// sense now. virtual ACE_Lock *create_cached_connection_lock (void); + /// Should the transport cache have a lock or not? Return 1 if the + /// transport cache needs to be locked else return 0 + virtual int locked_transport_cache (void); + /// Creates the flushing strategy. The new instance is owned by the /// caller. virtual TAO_Flushing_Strategy *create_flushing_strategy (void) = 0; diff --git a/TAO/tao/Transport_Cache_Manager.cpp b/TAO/tao/Transport_Cache_Manager.cpp index 30b54ed2744..6e89c48a75e 100644 --- a/TAO/tao/Transport_Cache_Manager.cpp +++ b/TAO/tao/Transport_Cache_Manager.cpp @@ -6,7 +6,8 @@ #include "tao/ORB_Core.h" #include "tao/Resource_Factory.h" #include "tao/Connection_Purging_Strategy.h" - +#include "tao/Condition.h" +#include "ace/Synch_T.h" #include "ace/Handle_Set.h" #if !defined (__ACE_INLINE__) @@ -18,23 +19,65 @@ ACE_RCSID (TAO, Transport_Cache_Manager, "$Id$") + TAO_Transport_Cache_Manager::TAO_Transport_Cache_Manager (TAO_ORB_Core &orb_core) : percent_ (orb_core.resource_factory ()->purge_percentage ()), purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ()), cache_map_ (), - cache_lock_ (orb_core.resource_factory ()->create_cached_connection_lock ()) + condition_ (0), + cache_lock_ (0), + muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ()), + no_waiting_threads_ (0), + last_entry_returned_ (0) { + if (orb_core.resource_factory ()->locked_transport_cache ()) + { + ACE_NEW (this->condition_, + TAO_Condition ); + + ACE_NEW (this->cache_lock_, + ACE_Lock_Adapter (*this->condition_->mutex ())); + } + else + { + /// If the cache is not going to be locked then dont create a + /// condition variable. Make the to 0, else a + /// single thread could get into waiting mode + this->muxed_number_ = 0; + ACE_NEW (this->cache_lock_, + ACE_Lock_Adapter); + } } TAO_Transport_Cache_Manager::~TAO_Transport_Cache_Manager (void) { + // Wakeup all the waiting threads threads before we shutdown stuff + if (this->no_waiting_threads_) + this->condition_->broadcast (); + // Delete the lock that we have - delete this->cache_lock_; + if (this->cache_lock_) + { + delete this->cache_lock_; + this->cache_lock_ = 0; + } // Delete the purging strategy - delete this->purging_strategy_; + if (this->purging_strategy_) + { + delete this->purging_strategy_; + this->purging_strategy_ = 0; + } + + // Delete the condition variable + if (this->condition_) + { + delete this->condition_; + this->condition_ = 0; + } } + int TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id, TAO_Cache_IntId &int_id) @@ -65,8 +108,7 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id, { // The entry has been added to cache succesfully // Add the cache_map_entry to the transport - int_id.transport () ->cache_map_entry (entry); - + int_id.transport ()->cache_map_entry (entry); } else if (retval == 1) { @@ -100,13 +142,10 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id, return retval; } -// Used to be in the .inl, but moved here b/c the use of -// TAO_Transport::_duplicate() caused an include file cycle with -// inlining turned on. int TAO_Transport_Cache_Manager::find_transport ( - TAO_Transport_Descriptor_Interface *prop, - TAO_Transport *&transport) + TAO_Transport_Descriptor_Interface *prop, + TAO_Transport *&transport) { if (prop == 0) { @@ -128,6 +167,29 @@ TAO_Transport_Cache_Manager::find_transport ( return retval; } +int +TAO_Transport_Cache_Manager::find (const TAO_Cache_ExtId &key, + TAO_Cache_IntId &value) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->cache_lock_, + -1)); + + int status = this->find_i (key, + value); + + if (status == 0) + { + // Update the purging strategy information while we + // are holding our lock + this->purging_strategy_->update_item (value.transport ()); + } + return status; +} + + + int TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, TAO_Cache_IntId &value) @@ -137,20 +199,19 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, // Get the entry from the Hash Map int retval = 0; - // Get the index of the - int index = key.index (); - // Make a temporary object. It does not do a copy. TAO_Cache_ExtId tmp_key (key.property ()); while (retval == 0) { + // Wait for a connection.. + this->wait_for_connection (tmp_key); + // Look for an entry in the map retval = this->cache_map_.find (tmp_key, entry); - // We have an entry in the map, check whether it is idle. If it - // is idle it would be marked as busy. + // We have an entry in the map, check whether it is idle. if (entry) { CORBA::Boolean idle = @@ -176,7 +237,7 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, } // Bump the index up - tmp_key.index (++index); + tmp_key.incr_index (); } // If we are here then it is an error @@ -190,33 +251,14 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, return retval; } -int -TAO_Transport_Cache_Manager::rebind_i (const TAO_Cache_ExtId &key, - const TAO_Cache_IntId &value) -{ - return this->cache_map_.rebind (key, - value); -} - -int -TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key) -{ - return this->cache_map_.unbind (key); -} - -int -TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value) -{ - return this->cache_map_.unbind (key, - value); -} - int TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry) { // First get the entry again (if at all things had changed in the // cache map in the mean time) + + // @todo: Is this required? Looks like a legacy one.. + HASH_MAP_ENTRY *new_entry = 0; int retval = this->cache_map_.find (entry->ext_id_, new_entry); @@ -227,6 +269,16 @@ TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry) recycle_state (ACE_RECYCLABLE_IDLE_AND_PURGABLE); entry = new_entry; + + // Does any one need waking? + if (this->no_waiting_threads_) + { + // We returned this entry to the map + this->last_entry_returned_ = &new_entry->ext_id_; + + // Wake up a thread + this->condition_->signal (); + } } else if (TAO_debug_level > 0 && retval != 0) { @@ -252,23 +304,10 @@ TAO_Transport_Cache_Manager::close_i (ACE_Handle_Set &reactor_registered, if ((*iter).int_id_.recycle_state () != ACE_RECYCLABLE_CLOSED) { -#if 0 - // @@ This code from Connection_Cache_Manager disappeared - // during the changeover; we need the functional equivalent back. - // The problem is that with the locking stuff that we're putting - // in to the Transport, we might want to encapsulate the whole - // exercise of adding to the handle set in a method on the transport - // rather than doing it here. That way, the locking is correct. - if ((*iter).int_id_.handler ()->is_registered ()) - { - reactor_registered.set_bit ((*iter).int_id_.handler ()->fetch_handle ()); - } -#else // Get the transport to fill its associated connection's handle in // the handle sets. (*iter).int_id_.transport ()->provide_handle (reactor_registered, unregistered); -#endif // Inform the transport that has a reference to the entry in the // map that we are *gone* now. So, the transport should not use // the reference to the entry that he has, to acces us *at any @@ -497,6 +536,70 @@ TAO_Transport_Cache_Manager::close_entries(DESCRIPTOR_SET& sorted_set, sorted_set = 0; } +int +TAO_Transport_Cache_Manager::wait_for_connection (TAO_Cache_ExtId &extid) +{ + if (this->muxed_number_ && + this->muxed_number_ == extid.index ()) + { + // If we have a limit on the number of muxed connections for + // a particular endpoint just wait to get the connection + ++this->no_waiting_threads_; + + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Going to wait for connections.. \n")); + + int ready_togo = 0; + + while (ready_togo == 0) + { + this->condition_->wait (); + + // Check whether we are waiting for this connection + ready_togo = this->is_wakeup_useful (extid); + } + + // We are not waiting anymore + --this->no_waiting_threads_; + } + + return 0; +} + +int +TAO_Transport_Cache_Manager::is_wakeup_useful (TAO_Cache_ExtId &extid) +{ + // Get the underlying property that we are looking for + TAO_Transport_Descriptor_Interface *prop = extid.property (); + + // Just check the underlying property for equivalence. If the last + // connection that was returned had the same property just return + // 1. + if (this->last_entry_returned_ && + prop->is_equivalent (this->last_entry_returned_->property ())) + { + // Set the index to be right so that we can pick teh connection + // right away.. + extid.index (this->last_entry_returned_->index ()); + + // There is no more use for it ... + this->last_entry_returned_ = 0; + + return 1; + } + + // If there is an entry that was returned and if there are more + // threads just wake up the peer to check for the returned + // connection. + if (this->last_entry_returned_ && + this->no_waiting_threads_ > 1) + { + this->condition_->signal (); + } + + return 0; +} #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/TAO/tao/Transport_Cache_Manager.h b/TAO/tao/Transport_Cache_Manager.h index 818bc643a42..c2fa52aa903 100644 --- a/TAO/tao/Transport_Cache_Manager.h +++ b/TAO/tao/Transport_Cache_Manager.h @@ -15,16 +15,15 @@ #define TAO_CONNECTION_CACHE_MANAGER_H #include "ace/pre.h" -#include "ace/Hash_Map_Manager_T.h" +#include "tao/Cache_Entries.h" + #if !defined (ACE_LACKS_PRAGMA_ONCE) #define ACE_LACKS_PRAGMA_ONCE #endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "ace/Hash_Map_Manager_T.h" +#include "ace/Synch_T.h" -#include "tao/TAO_Export.h" -#include "tao/Cache_Entries.h" - -#include "tao/Connection_Purging_Strategy.h" #if defined(_MSC_VER) #if (_MSC_VER >= 1200) @@ -36,7 +35,9 @@ class TAO_ORB_Core; class ACE_Handle_Set; class TAO_Resource_Factory; +class TAO_Connection_Purging_Strategy; +template class TAO_Condition; typedef ACE_Unbounded_Set TAO_EventHandlerSet; typedef ACE_Unbounded_Set_Iterator @@ -46,22 +47,18 @@ typedef ACE_Unbounded_Set_Iterator * @class TAO_Transport_Cache_Manager * * @brief The Transport Cache Manager for TAO + * + * This class provides interfaces associating a TAO_Cache_ExtId + * & TAO_Cache_IntId. This class is wrapper around the + * ACE_Hash_Map_Manager class which is used as a container to Cache + * the connections. This class protects the entries with a lock. The + * map is updated only by holding the lock. The more compelling reason + * to have the lock in this class and not in the Hash_Map is that, we + * do quite a bit of work in this class for which we need a lock. + * */ class TAO_Export TAO_Transport_Cache_Manager { - - // = DESCRIPTION - // This class provides interfaces associating a TAO_Cache_ExtId - // & TAO_Cache_IntId. This class manages a ACE_Hash_Map_Manager - // class which is used as a container to Cache the - // connections. This class protects the entries with a lock. The - // map can be updated only by holding the lock. - - // General Note: This class at present has an interface that may - // not be needed. But, the interface has just been copied from - // the ACE Hash Map Manager classes. The interface wold be - // pruned once I get the purging stuff also in. Till then let - // the interface be there as it is. public: // Some useful typedef's @@ -79,6 +76,8 @@ public: TAO_Cache_IntId> HASH_MAP_ENTRY; + typedef TAO_Condition CONDITION; + // == Public methods /// Constructor @@ -133,19 +132,6 @@ private: int find (const TAO_Cache_ExtId &key, TAO_Cache_IntId &value); - /// Reassociate the with . Grabs the lock and calls the - /// implementation function find_i. - int rebind (const TAO_Cache_ExtId &key, - const TAO_Cache_IntId &value); - - /// Remove from the cache. - int unbind (const TAO_Cache_ExtId &key); - - /// Remove from the cache, and return the associated with - /// . - int unbind (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value); - /** * Non-Locking version and actual implementation of bind () * call. Calls bind on the Hash_Map_Manager that it holds. If the @@ -166,17 +152,6 @@ private: int find_i (const TAO_Cache_ExtId &key, TAO_Cache_IntId &value); - /// Non-locking version and actual implementation of rebind () call - int rebind_i (const TAO_Cache_ExtId &key, - const TAO_Cache_IntId &value); - - /// Non-locking version and actual implementation of unbind () call - int unbind_i (const TAO_Cache_ExtId &key); - - /// Non-locking version and actual implementation of unbind () call - int unbind_i (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value); - /// Non-locking version and actual implementation of make_idle (). int make_idle_i (HASH_MAP_ENTRY *&entry); @@ -229,7 +204,17 @@ private: /// the required number of items in the set. void close_entries (DESCRIPTOR_SET& sorted_set, int size); + /// Wait for connections if we have reached the limit on the number + /// of muxed connections. If not (ie. if we dont use a muxed + /// connection or if we have not reached the limit) this just + /// behaves as a no-op. has all the information about the + /// connection that is being searched. + int wait_for_connection (TAO_Cache_ExtId &extid); + + /// Is the wakeup useful todo some work? + int is_wakeup_useful (TAO_Cache_ExtId &extid); private: + /// The percentage of the cache to purge at one time int percent_; @@ -239,8 +224,23 @@ private: /// The hash map that has the connections HASH_MAP cache_map_; - /// Lock for the map + /// The condition variable + CONDITION *condition_; + + /// The lock that is used by the cache map ACE_Lock *cache_lock_; + + /// Number of allowed muxed connections + CORBA::ULong muxed_number_; + + /// Number of threads waiting for connections + int no_waiting_threads_; + + /// This is for optimization purposes. In a situation where number + /// of threads are waiting for connections, the last connection that + /// is put back is cached here. This should prevent all th threads + /// trying to search for their required entry. + TAO_Cache_ExtId *last_entry_returned_; }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Transport_Cache_Manager.inl b/TAO/tao/Transport_Cache_Manager.inl index 0840e7961c9..2f44d1d2543 100644 --- a/TAO/tao/Transport_Cache_Manager.inl +++ b/TAO/tao/Transport_Cache_Manager.inl @@ -15,26 +15,7 @@ TAO_Transport_Cache_Manager::bind (TAO_Cache_ExtId &ext_id, } -ACE_INLINE int -TAO_Transport_Cache_Manager::find (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->cache_lock_, - -1)); - int status = this->find_i (key, - value); - - if (status == 0) - { - // Update the purging strategy information while we - // are holding our lock - this->purging_strategy_->update_item (value.transport ()); - } - return status; -} ACE_INLINE int @@ -51,44 +32,6 @@ TAO_Transport_Cache_Manager::cache_transport ( } -ACE_INLINE int -TAO_Transport_Cache_Manager::rebind (const TAO_Cache_ExtId &key, - const TAO_Cache_IntId &value) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->cache_lock_, - -1)); - - return this->rebind_i (key, - value); -} - -ACE_INLINE int -TAO_Transport_Cache_Manager::unbind (const TAO_Cache_ExtId &key) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->cache_lock_, - -1)); - - return this->unbind_i (key); -} - -ACE_INLINE int -TAO_Transport_Cache_Manager::unbind (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->cache_lock_, - -1)); - - return this->unbind_i (key, - value); -} - - ACE_INLINE int TAO_Transport_Cache_Manager::purge (void) { diff --git a/TAO/tao/default_resource.cpp b/TAO/tao/default_resource.cpp index fb23a7c068f..ffecd31ed47 100644 --- a/TAO/tao/default_resource.cpp +++ b/TAO/tao/default_resource.cpp @@ -40,6 +40,7 @@ TAO_Default_Resource_Factory::TAO_Default_Resource_Factory (void) connection_caching_type_ (TAO_CONNECTION_CACHING_STRATEGY), cache_maximum_ (TAO_CONNECTION_CACHE_MAXIMUM), purge_percentage_ (TAO_PURGE_PERCENT), + max_muxed_connections_ (0), reactor_mask_signals_ (1), dynamically_allocated_reactor_ (0), options_processed_ (0), @@ -351,6 +352,17 @@ TAO_Default_Resource_Factory::init (int argc, char *argv[]) this->report_option_value_error ("-ORBFlushingStrategy", name); } } + else if (ACE_OS::strcasecmp (argv[curarg], + "-ORBMuxedConnectionMax") == 0) + { + curarg++; + if (curarg < argc) + this->max_muxed_connections_ = + ACE_OS::atoi (argv[curarg]); + else + this->report_option_value_error ("-ORBMuxedConnectionMax", + argv[curarg]); + } else if (ACE_OS::strncmp (argv[curarg], "-ORB", 4) == 0) { // Can we assume there is an argument after the option? @@ -738,6 +750,13 @@ TAO_Default_Resource_Factory::purge_percentage (void) const return this->purge_percentage_; } +int +TAO_Default_Resource_Factory::max_muxed_connections (void) const +{ + return this->max_muxed_connections_; +} + + ACE_Lock * TAO_Default_Resource_Factory::create_cached_connection_lock (void) { @@ -755,6 +774,16 @@ TAO_Default_Resource_Factory::create_cached_connection_lock (void) return the_lock; } +int +TAO_Default_Resource_Factory::locked_transport_cache (void) +{ + if (this->cached_connection_lock_type_ == TAO_NULL_LOCK) + return 0; + + return 1; +} + + TAO_Flushing_Strategy * TAO_Default_Resource_Factory::create_flushing_strategy (void) { diff --git a/TAO/tao/default_resource.h b/TAO/tao/default_resource.h index e7097046bef..ee71341e4b8 100644 --- a/TAO/tao/default_resource.h +++ b/TAO/tao/default_resource.h @@ -116,7 +116,9 @@ public: virtual TAO_Resource_Factory::Caching_Strategy connection_caching_strategy_type (void) const; virtual int cache_maximum (void) const; virtual int purge_percentage (void) const; + virtual int max_muxed_connections (void) const; virtual ACE_Lock *create_cached_connection_lock (void); + virtual int locked_transport_cache (void); virtual TAO_Flushing_Strategy *create_flushing_strategy (void); virtual TAO_Connection_Purging_Strategy *create_purging_strategy (void); virtual TAO_LF_Strategy *create_lf_strategy (void); @@ -169,6 +171,11 @@ protected: /// demand. int purge_percentage_; + /// Specifies the limit on the number of muxed connections + /// allowed per-property for the ORB. A value of 0 indicates no + /// limit + size_t max_muxed_connections_; + /// If <0> then we create reactors with signal handling disabled. int reactor_mask_signals_; -- cgit v1.2.1