diff options
Diffstat (limited to 'TAO/tao/Transport_Cache_Manager.cpp')
-rw-r--r-- | TAO/tao/Transport_Cache_Manager.cpp | 723 |
1 files changed, 723 insertions, 0 deletions
diff --git a/TAO/tao/Transport_Cache_Manager.cpp b/TAO/tao/Transport_Cache_Manager.cpp new file mode 100644 index 00000000000..de734fc9975 --- /dev/null +++ b/TAO/tao/Transport_Cache_Manager.cpp @@ -0,0 +1,723 @@ +//$Id$ +#include "tao/Transport_Cache_Manager.h" +#include "tao/Transport.h" +#include "tao/debug.h" +#include "tao/ORB_Core.h" +#include "tao/Connection_Purging_Strategy.h" +#include "tao/Client_Strategy_Factory.h" +#include "tao/Condition.h" +#include "tao/Wait_Strategy.h" +#include "ace/ACE.h" +#include "ace/Reactor.h" + +#if !defined (__ACE_INLINE__) +# include "tao/Transport_Cache_Manager.inl" +#endif /* __ACE_INLINE__ */ + + +ACE_RCSID (tao, + Transport_Cache_Manager, + "$Id$") + + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + Transport_Cache_Manager::Transport_Cache_Manager (TAO_ORB_Core &orb_core) + : percent_ (orb_core.resource_factory ()->purge_percentage ()) + , purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ()) + , cache_map_ (orb_core.resource_factory ()->cache_maximum ()) + , condition_ (0) + , cache_lock_ (0) + , muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ()) + , no_waiting_threads_ (0) + , last_entry_returned_ (0) + { + if (orb_core.resource_factory ()->locked_transport_cache ()) + { + ACE_NEW (this->condition_, + TAO_Condition <TAO_SYNCH_MUTEX>); + + ACE_NEW (this->cache_lock_, + ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (*this->condition_->mutex ())); + } + else + { + /// If the cache is not going to be locked then dont create a + /// condition variable. Make the <muxed_number_> to 0, else a + /// single thread could get into waiting mode + this->muxed_number_ = 0; + ACE_NEW (this->cache_lock_, + ACE_Lock_Adapter<ACE_SYNCH_NULL_MUTEX>); + } + } + + Transport_Cache_Manager::~Transport_Cache_Manager (void) + { + // Wakeup all the waiting threads threads before we shutdown stuff + if (this->no_waiting_threads_) + { + this->condition_->broadcast (); + } + + // Delete the lock that we have + if (this->cache_lock_) + { + delete this->cache_lock_; + this->cache_lock_ = 0; + } + + // Delete the purging strategy + if (this->purging_strategy_) + { + delete this->purging_strategy_; + this->purging_strategy_ = 0; + } + + // Delete the condition variable + if (this->condition_) + { + delete this->condition_; + this->condition_ = 0; + } + } + + + int + Transport_Cache_Manager::bind_i (Cache_ExtId &ext_id, + Cache_IntId &int_id) + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " + "0x%x -> 0x%x Transport[%d]\n", + ext_id.property (), + int_id.transport (), + int_id.transport ()->id ())); + } + + // Get the entry too + HASH_MAP_ENTRY *entry = 0; + + // Update the purging strategy information while we + // are holding our lock + this->purging_strategy_->update_item (int_id.transport ()); + + int retval = this->cache_map_.bind (ext_id, + int_id, + entry); + if (retval == 0) + { + // The entry has been added to cache succesfully + // Add the cache_map_entry to the transport + int_id.transport ()->cache_map_entry (entry); + } + else if (retval == 1) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " + "unable to bind in the first attempt. " + "Trying with a new index\n")); + } + + // There was an entry like this before, so let us do some + // minor adjustments and rebind + retval = this->get_last_index_bind (ext_id, + int_id, + entry); + if (retval == 0) + { + int_id.transport ()->cache_map_entry (entry); + } + } + + if (TAO_debug_level > 5 && retval != 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " + "unable to bind\n")); + } + else if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " + "cache size is [%d]\n", + this->current_size ())); + } + + return retval; + } + + int + Transport_Cache_Manager::find_transport ( + TAO_Transport_Descriptor_Interface *prop, + TAO_Transport *&transport) + { + if (prop == 0) + { + transport = 0; + return -1; + } + + // Compose the ExternId + Cache_ExtId ext_id (prop); + Cache_IntId int_id; + + int const retval = this->find (ext_id, + int_id); + if (retval == 0) + { + transport = int_id.relinquish_transport (); + + if (transport->wait_strategy ()->non_blocking () == 0 && + transport->orb_core ()->client_factory ()->use_cleanup_options ()) + { + ACE_Event_Handler * const eh = + transport->event_handler_i (); + + ACE_Reactor * const r = + transport->orb_core ()->reactor (); + + if (eh && + r->remove_handler (eh, + ACE_Event_Handler::READ_MASK | + ACE_Event_Handler::DONT_CALL) == -1) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_Transport_Cache_Manager[%d]") + ACE_TEXT ("::find_transport, remove_handler failed \n"), + transport->id ())); + } + else + { + transport->wait_strategy ()->is_registered (false); + } + } + } + + return retval; + } + + int + Transport_Cache_Manager::find (const Cache_ExtId &key, + Cache_IntId &value) + { + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->cache_lock_, + -1)); + + int const status = this->find_i (key, + value); + + if (status == 0) + { + // Update the purging strategy information while we + // are holding our lock + this->purging_strategy_->update_item (value.transport ()); + } + + return status; + } + + int + Transport_Cache_Manager::find_i (const Cache_ExtId &key, + Cache_IntId &value) + { + HASH_MAP_ENTRY *entry = 0; + + // Get the entry from the Hash Map + int retval = 0; + + // Make a temporary object. It does not do a copy. + Cache_ExtId tmp_key (key.property ()); + + while (retval == 0) + { + // Wait for a connection.. + this->wait_for_connection (tmp_key); + + // Look for an entry in the map + retval = this->cache_map_.find (tmp_key, + entry); + + // We have an entry in the map, check whether it is idle. + if (entry) + { + CORBA::Boolean idle = + this->is_entry_idle (entry); + + if (idle) + { + // Successfully found a TAO_Transport. + + entry->int_id_.recycle_state (ENTRY_BUSY); + + // NOTE: This assignment operator indirectly incurs two + // lock operations since it duplicates and releases + // TAO_Transport objects. + value = entry->int_id_; + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ") + ACE_TEXT("at index %d (Transport[%d]) - idle\n"), + entry->ext_id_.index (), + entry->int_id_.transport ()->id ())); + } + + return 0; + } + else if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ") + ACE_TEXT("at index %d (Transport[%d]) - not idle\n"), + entry->ext_id_.index (), + entry->int_id_.transport ()->id ())); + } + } + + // Bump the index up + tmp_key.incr_index (); + } + + // If we are here then it is an error + if (TAO_debug_level > 4 && retval != 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - Transport_Cache_Manager::find_i, " + "no idle transport is available\n")); + } + + return retval; + } + + int + Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry) + { + if (entry == 0) + return -1; + + entry->int_id_.recycle_state (ENTRY_IDLE_AND_PURGABLE); + + // Does any one need waking? + if (this->no_waiting_threads_) + { + // We returned this entry to the map + this->last_entry_returned_ = &entry->ext_id_; + + // Wake up a thread + this->condition_->signal (); + } + + return 0; + } + + int + Transport_Cache_Manager::update_entry (HASH_MAP_ENTRY *&entry) + { + if(entry == 0) + return -1; + + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->cache_lock_, -1)); + + if (entry == 0) + return -1; + + TAO_Connection_Purging_Strategy *st = this->purging_strategy_; + (void) st->update_item (entry->int_id_.transport ()); + + return 0; + } + + int + Transport_Cache_Manager::close_i (Connection_Handler_Set &handlers) + { + HASH_MAP_ITER end_iter = this->cache_map_.end (); + + for (HASH_MAP_ITER iter = this->cache_map_.begin (); + iter != end_iter; + ++iter) + { + // Get the transport to fill its associated connection's handler. + (*iter).int_id_.transport ()->provide_handler (handlers); + + // Inform the transport that has a reference to the entry in the + // map that we are *gone* now. So, the transport should not use + // the reference to the entry that he has, to acces us *at any + // time*. + (*iter).int_id_.transport ()->cache_map_entry (0); + } + + // Unbind all the entries in the map + this->cache_map_.unbind_all (); + + return 0; + } + + bool + Transport_Cache_Manager::blockable_client_transports_i ( + Connection_Handler_Set &h) + { + HASH_MAP_ITER end_iter = this->cache_map_.end (); + + for (HASH_MAP_ITER iter = this->cache_map_.begin (); + iter != end_iter; + ++iter) + { + // Get the transport to fill its associated connection's + // handler. + bool retval = + (*iter).int_id_.transport ()->provide_blockable_handler (h); + + // Do not mark the entry as closed if we don't have a + // blockable handler added + if (retval) + (*iter).int_id_.recycle_state (ENTRY_CLOSED); + } + + return true; + } + + int + Transport_Cache_Manager::purge_entry_i (HASH_MAP_ENTRY *&entry) + { + if (entry == 0) + { + return 0; + } + + // Remove the entry from the Map + int retval = this->cache_map_.unbind (entry); + + // Set the entry pointer to zero + entry = 0; + + return retval; + } + + void + Transport_Cache_Manager::mark_invalid_i (HASH_MAP_ENTRY *&entry) + { + if (entry == 0) + { + return; + } + + // Mark the entry as not usable + entry->int_id_.recycle_state (ENTRY_PURGABLE_BUT_NOT_IDLE); + } + + int + Transport_Cache_Manager::get_last_index_bind (Cache_ExtId &key, + Cache_IntId &val, + HASH_MAP_ENTRY *&entry) + { + CORBA::ULong ctr = entry->ext_id_.index (); + int retval = 0; + + while (retval == 0) + { + // Set the index + key.index (++ctr); + + // Check to see if an element exists in the Map. If it exists we + // loop, else we drop out of the loop + retval = this->cache_map_.find (key); + } + + // Now do a bind again with the new index + return this->cache_map_.bind (key, + val, + entry); + } + + + bool + Transport_Cache_Manager::is_entry_idle (HASH_MAP_ENTRY *&entry) + { + Cache_Entries_State entry_state = + entry->int_id_.recycle_state (); + + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::is_entry_idle, ") + ACE_TEXT("state is [%d]\n"), + entry_state)); + } + + if (entry_state == ENTRY_IDLE_AND_PURGABLE || + entry_state == ENTRY_IDLE_BUT_NOT_PURGABLE) + return true; + + return false; + } + +#if !defined (ACE_LACKS_QSORT) + int + Transport_Cache_Manager::cpscmp(const void* a, const void* b) + { + const HASH_MAP_ENTRY** left = (const HASH_MAP_ENTRY**)a; + const HASH_MAP_ENTRY** right = (const HASH_MAP_ENTRY**)b; + + if ((*left)->int_id_.transport ()->purging_order () < + (*right)->int_id_.transport ()->purging_order ()) + return -1; + + if ((*left)->int_id_.transport ()->purging_order () > + (*right)->int_id_.transport ()->purging_order ()) + return 1; + + return 0; + } +#endif /* ACE_LACKS_QSORT */ + + int + Transport_Cache_Manager::purge (void) + { + ACE_Unbounded_Stack<TAO_Transport*> transports_to_be_closed; + + { + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->cache_lock_, 0)); + + DESCRIPTOR_SET sorted_set = 0; + int sorted_size = this->fill_set_i (sorted_set); + + // Only call close_entries () if sorted_set != 0. It takes + // control of sorted_set and cleans up any allocated memory. If + // sorted_set == 0, then there is nothing to de-allocate. + if (sorted_set != 0) + { + // BEGIN FORMER close_entries + // Calculate the number of entries to purge + const int amount = (sorted_size * this->percent_) / 100; + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::") + ACE_TEXT ("purge, purging %d of %d cache entries\n"), + amount, + sorted_size)); + } + + int count = 0; + + for (int i = 0; count < amount && i < sorted_size; ++i) + { + if (this->is_entry_idle (sorted_set[i])) + { + sorted_set[i]->int_id_.recycle_state (ENTRY_BUSY); + + TAO_Transport* transport = + sorted_set[i]->int_id_.transport (); + transport->add_reference (); + + if (transports_to_be_closed.push (transport) != 0) + { + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("TAO (%P|%t) - ") + ACE_TEXT ("Unable to push transport %u ") + ACE_TEXT ("on the to-be-closed stack, so ") + ACE_TEXT ("it will leak\n"), + transport->id ())); + } + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - ") + ACE_TEXT ("Idle transport found in ") + ACE_TEXT ("cache: [%d] \n"), + transport->id ())); + } + + // Count this as a successful purged entry + count++; + } + } + + delete [] sorted_set; + sorted_set = 0; + // END FORMER close_entries + } + } + + // Now, without the lock held, lets go through and close all the transports. + TAO_Transport *transport = 0; + + while (! transports_to_be_closed.is_empty ()) + { + if (transports_to_be_closed.pop (transport) == 0) + { + if (transport) + { + transport->close_connection (); + transport->remove_reference (); + } + } + } + + return 0; + } + + + void + Transport_Cache_Manager::sort_set (DESCRIPTOR_SET& entries, + int current_size) + { +#if defined (ACE_LACKS_QSORT) + // Use insertion sort if we don't have qsort + for(int i = 1; i < current_size; ++i) + { + if (entries[i]->int_id_.transport ()->purging_order () < + entries[i - 1]->int_id_.transport ()->purging_order ()) + { + HASH_MAP_ENTRY* entry = entries[i]; + + for(int j = i; j > 0 && + entries[j - 1]->int_id_.transport ()->purging_order () > + entry->int_id_.transport ()->purging_order (); --j) + { + HASH_MAP_ENTRY* holder = entries[j]; + entries[j] = entries[j - 1]; + entries[j - 1] = holder; + } + } + } +#else + ACE_OS::qsort (entries, current_size, + sizeof (HASH_MAP_ENTRY*), (ACE_COMPARE_FUNC)cpscmp); +#endif /* ACE_LACKS_QSORT */ + } + + + int + Transport_Cache_Manager::fill_set_i (DESCRIPTOR_SET& sorted_set) + { + int current_size = 0; + int cache_maximum = this->purging_strategy_->cache_maximum (); + + // set sorted_set to 0. This signifies nothing to purge. + sorted_set = 0; + + // Do we need to worry about cache purging? + if (cache_maximum >= 0) + { + current_size = static_cast<int> (this->cache_map_.current_size ()); + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::fill_set_i, ") + ACE_TEXT("current_size = %d, cache_maximum = %d\n"), + current_size, cache_maximum)); + } + + if (current_size >= cache_maximum) + { + ACE_NEW_RETURN (sorted_set, HASH_MAP_ENTRY*[current_size], 0); + + HASH_MAP_ITER iter = this->cache_map_.begin (); + + for (int i = 0; i < current_size; ++i) + { + sorted_set[i] = &(*iter); + iter++; + } + + this->sort_set (sorted_set, current_size); + } + } + + return current_size; + } + + + int + Transport_Cache_Manager::wait_for_connection (Cache_ExtId &extid) + { + if (this->muxed_number_ && this->muxed_number_ == extid.index ()) + { + // If we have a limit on the number of muxed connections for + // a particular endpoint just wait to get the connection + ++this->no_waiting_threads_; + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager") + ACE_TEXT("::wait_for_connection, ") + ACE_TEXT("entering wait loop\n"))); + } + + int ready_togo = 0; + + while (ready_togo == 0) + { + this->condition_->wait (); + + // Check whether we are waiting for this connection + ready_togo = this->is_wakeup_useful (extid); + } + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::wait_for_connection, ") + ACE_TEXT("left wait loop\n"))); + } + + // We are not waiting anymore + --this->no_waiting_threads_; + } + + return 0; + } + + int + Transport_Cache_Manager::is_wakeup_useful (Cache_ExtId &extid) + { + // Get the underlying property that we are looking for + TAO_Transport_Descriptor_Interface *prop = extid.property (); + + // Just check the underlying property for equivalence. If the last + // connection that was returned had the same property just return + // 1. + if (this->last_entry_returned_ && + prop->is_equivalent (this->last_entry_returned_->property ())) + { + // Set the index to be right so that we can pick teh connection + // right away.. + extid.index (this->last_entry_returned_->index ()); + + // There is no more use for it ... + this->last_entry_returned_ = 0; + + return 1; + } + + // If there is an entry that was returned and if there are more + // threads just wake up the peer to check for the returned + // connection. + if (this->last_entry_returned_ && + this->no_waiting_threads_ > 1) + { + this->condition_->signal (); + } + + return 0; + } + +} + +TAO_END_VERSIONED_NAMESPACE_DECL |