diff options
Diffstat (limited to 'TAO/tao/Transport_Cache_Manager.cpp')
-rw-r--r-- | TAO/tao/Transport_Cache_Manager.cpp | 207 |
1 files changed, 155 insertions, 52 deletions
diff --git a/TAO/tao/Transport_Cache_Manager.cpp b/TAO/tao/Transport_Cache_Manager.cpp index 30b54ed2744..6e89c48a75e 100644 --- a/TAO/tao/Transport_Cache_Manager.cpp +++ b/TAO/tao/Transport_Cache_Manager.cpp @@ -6,7 +6,8 @@ #include "tao/ORB_Core.h" #include "tao/Resource_Factory.h" #include "tao/Connection_Purging_Strategy.h" - +#include "tao/Condition.h" +#include "ace/Synch_T.h" #include "ace/Handle_Set.h" #if !defined (__ACE_INLINE__) @@ -18,23 +19,65 @@ ACE_RCSID (TAO, Transport_Cache_Manager, "$Id$") + TAO_Transport_Cache_Manager::TAO_Transport_Cache_Manager (TAO_ORB_Core &orb_core) : percent_ (orb_core.resource_factory ()->purge_percentage ()), purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ()), cache_map_ (), - cache_lock_ (orb_core.resource_factory ()->create_cached_connection_lock ()) + condition_ (0), + cache_lock_ (0), + muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ()), + no_waiting_threads_ (0), + last_entry_returned_ (0) { + if (orb_core.resource_factory ()->locked_transport_cache ()) + { + ACE_NEW (this->condition_, + TAO_Condition <TAO_SYNCH_MUTEX>); + + ACE_NEW (this->cache_lock_, + ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (*this->condition_->mutex ())); + } + else + { + /// If the cache is not going to be locked then dont create a + /// condition variable. Make the <muxed_number_> to 0, else a + /// single thread could get into waiting mode + this->muxed_number_ = 0; + ACE_NEW (this->cache_lock_, + ACE_Lock_Adapter<ACE_SYNCH_NULL_MUTEX>); + } } TAO_Transport_Cache_Manager::~TAO_Transport_Cache_Manager (void) { + // Wakeup all the waiting threads threads before we shutdown stuff + if (this->no_waiting_threads_) + this->condition_->broadcast (); + // Delete the lock that we have - delete this->cache_lock_; + if (this->cache_lock_) + { + delete this->cache_lock_; + this->cache_lock_ = 0; + } // Delete the purging strategy - delete this->purging_strategy_; + if (this->purging_strategy_) + { + delete this->purging_strategy_; + this->purging_strategy_ = 0; + } + + // Delete the condition variable + if (this->condition_) + { + delete this->condition_; + this->condition_ = 0; + } } + int TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id, TAO_Cache_IntId &int_id) @@ -65,8 +108,7 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id, { // The entry has been added to cache succesfully // Add the cache_map_entry to the transport - int_id.transport () ->cache_map_entry (entry); - + int_id.transport ()->cache_map_entry (entry); } else if (retval == 1) { @@ -100,13 +142,10 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id, return retval; } -// Used to be in the .inl, but moved here b/c the use of -// TAO_Transport::_duplicate() caused an include file cycle with -// inlining turned on. int TAO_Transport_Cache_Manager::find_transport ( - TAO_Transport_Descriptor_Interface *prop, - TAO_Transport *&transport) + TAO_Transport_Descriptor_Interface *prop, + TAO_Transport *&transport) { if (prop == 0) { @@ -129,6 +168,29 @@ TAO_Transport_Cache_Manager::find_transport ( } int +TAO_Transport_Cache_Manager::find (const TAO_Cache_ExtId &key, + TAO_Cache_IntId &value) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->cache_lock_, + -1)); + + int status = this->find_i (key, + value); + + if (status == 0) + { + // Update the purging strategy information while we + // are holding our lock + this->purging_strategy_->update_item (value.transport ()); + } + return status; +} + + + +int TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, TAO_Cache_IntId &value) { @@ -137,20 +199,19 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, // Get the entry from the Hash Map int retval = 0; - // Get the index of the <key> - int index = key.index (); - // Make a temporary object. It does not do a copy. TAO_Cache_ExtId tmp_key (key.property ()); while (retval == 0) { + // Wait for a connection.. + this->wait_for_connection (tmp_key); + // Look for an entry in the map retval = this->cache_map_.find (tmp_key, entry); - // We have an entry in the map, check whether it is idle. If it - // is idle it would be marked as busy. + // We have an entry in the map, check whether it is idle. if (entry) { CORBA::Boolean idle = @@ -176,7 +237,7 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, } // Bump the index up - tmp_key.index (++index); + tmp_key.incr_index (); } // If we are here then it is an error @@ -191,32 +252,13 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, } int -TAO_Transport_Cache_Manager::rebind_i (const TAO_Cache_ExtId &key, - const TAO_Cache_IntId &value) -{ - return this->cache_map_.rebind (key, - value); -} - -int -TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key) -{ - return this->cache_map_.unbind (key); -} - -int -TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value) -{ - return this->cache_map_.unbind (key, - value); -} - -int TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry) { // First get the entry again (if at all things had changed in the // cache map in the mean time) + + // @todo: Is this required? Looks like a legacy one.. + HASH_MAP_ENTRY *new_entry = 0; int retval = this->cache_map_.find (entry->ext_id_, new_entry); @@ -227,6 +269,16 @@ TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry) recycle_state (ACE_RECYCLABLE_IDLE_AND_PURGABLE); entry = new_entry; + + // Does any one need waking? + if (this->no_waiting_threads_) + { + // We returned this entry to the map + this->last_entry_returned_ = &new_entry->ext_id_; + + // Wake up a thread + this->condition_->signal (); + } } else if (TAO_debug_level > 0 && retval != 0) { @@ -252,23 +304,10 @@ TAO_Transport_Cache_Manager::close_i (ACE_Handle_Set &reactor_registered, if ((*iter).int_id_.recycle_state () != ACE_RECYCLABLE_CLOSED) { -#if 0 - // @@ This code from Connection_Cache_Manager disappeared - // during the changeover; we need the functional equivalent back. - // The problem is that with the locking stuff that we're putting - // in to the Transport, we might want to encapsulate the whole - // exercise of adding to the handle set in a method on the transport - // rather than doing it here. That way, the locking is correct. - if ((*iter).int_id_.handler ()->is_registered ()) - { - reactor_registered.set_bit ((*iter).int_id_.handler ()->fetch_handle ()); - } -#else // Get the transport to fill its associated connection's handle in // the handle sets. (*iter).int_id_.transport ()->provide_handle (reactor_registered, unregistered); -#endif // Inform the transport that has a reference to the entry in the // map that we are *gone* now. So, the transport should not use // the reference to the entry that he has, to acces us *at any @@ -497,6 +536,70 @@ TAO_Transport_Cache_Manager::close_entries(DESCRIPTOR_SET& sorted_set, sorted_set = 0; } +int +TAO_Transport_Cache_Manager::wait_for_connection (TAO_Cache_ExtId &extid) +{ + if (this->muxed_number_ && + this->muxed_number_ == extid.index ()) + { + // If we have a limit on the number of muxed connections for + // a particular endpoint just wait to get the connection + ++this->no_waiting_threads_; + + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Going to wait for connections.. \n")); + + int ready_togo = 0; + + while (ready_togo == 0) + { + this->condition_->wait (); + + // Check whether we are waiting for this connection + ready_togo = this->is_wakeup_useful (extid); + } + + // We are not waiting anymore + --this->no_waiting_threads_; + } + + return 0; +} + +int +TAO_Transport_Cache_Manager::is_wakeup_useful (TAO_Cache_ExtId &extid) +{ + // Get the underlying property that we are looking for + TAO_Transport_Descriptor_Interface *prop = extid.property (); + + // Just check the underlying property for equivalence. If the last + // connection that was returned had the same property just return + // 1. + if (this->last_entry_returned_ && + prop->is_equivalent (this->last_entry_returned_->property ())) + { + // Set the index to be right so that we can pick teh connection + // right away.. + extid.index (this->last_entry_returned_->index ()); + + // There is no more use for it ... + this->last_entry_returned_ = 0; + + return 1; + } + + // If there is an entry that was returned and if there are more + // threads just wake up the peer to check for the returned + // connection. + if (this->last_entry_returned_ && + this->no_waiting_threads_ > 1) + { + this->condition_->signal (); + } + + return 0; +} #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) |