diff options
Diffstat (limited to 'TAO/tao/Transport_Cache_Manager.cpp')
-rw-r--r-- | TAO/tao/Transport_Cache_Manager.cpp | 557 |
1 files changed, 253 insertions, 304 deletions
diff --git a/TAO/tao/Transport_Cache_Manager.cpp b/TAO/tao/Transport_Cache_Manager.cpp index 2ff69e5c90b..b1012a4a842 100644 --- a/TAO/tao/Transport_Cache_Manager.cpp +++ b/TAO/tao/Transport_Cache_Manager.cpp @@ -19,6 +19,12 @@ ACE_RCSID (tao, Transport_Cache_Manager, "$Id$") +// notes on debug level and LM_xxxx codes for transport cache +// TAO_debug_level > 0: recoverable error condition (LM_ERROR) +// TAO_debug_level > 4: normal transport cache operations (LM_INFO) +// TAO_debug_level > 6: detailed cache operations (LM_DEBUG) +// TAO_debug_level > 8: for debugging the cache itself (LM_DEBUG) + TAO_BEGIN_VERSIONED_NAMESPACE_DECL @@ -28,26 +34,15 @@ namespace TAO : percent_ (orb_core.resource_factory ()->purge_percentage ()) , purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ()) , cache_map_ (orb_core.resource_factory ()->cache_maximum ()) - , condition_ (0) , cache_lock_ (0) - , muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ()) - , no_waiting_threads_ (0) - , last_entry_returned_ (0) { if (orb_core.resource_factory ()->locked_transport_cache ()) { - ACE_NEW (this->condition_, - TAO_Condition <TAO_SYNCH_MUTEX>); - ACE_NEW (this->cache_lock_, - ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (*this->condition_->mutex ())); + ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (this->cache_map_mutex_)); } else { - /// If the cache is not going to be locked then dont create a - /// condition variable. Make the <muxed_number_> to 0, else a - /// single thread could get into waiting mode - this->muxed_number_ = 0; ACE_NEW (this->cache_lock_, ACE_Lock_Adapter<ACE_SYNCH_NULL_MUTEX>); } @@ -55,12 +50,6 @@ namespace TAO Transport_Cache_Manager::~Transport_Cache_Manager (void) { - // Wakeup all the waiting threads threads before we shutdown stuff - if (this->no_waiting_threads_) - { - this->condition_->broadcast (); - } - // Delete the lock that we have if (this->cache_lock_) { @@ -75,12 +64,6 @@ namespace TAO this->purging_strategy_ = 0; } - // Delete the condition variable - if (this->condition_) - { - delete this->condition_; - this->condition_ = 0; - } } @@ -88,13 +71,14 @@ namespace TAO Transport_Cache_Manager::bind_i (Cache_ExtId &ext_id, Cache_IntId &int_id) { - if (TAO_debug_level > 0) + if (TAO_debug_level > 4) { ACE_DEBUG ((LM_INFO, - ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::bind_i, ") - ACE_TEXT ("Transport[%d]; hash %d\n"), + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::bind_i: ") + ACE_TEXT ("Transport[%d] @ hash:index{%d:%d}\n"), int_id.transport ()->id (), - ext_id.hash () + ext_id.hash (), + ext_id.index () )); } @@ -104,207 +88,231 @@ namespace TAO // Update the purging strategy information while we // are holding our lock this->purging_strategy_->update_item (int_id.transport ()); - - int retval = this->cache_map_.bind (ext_id, int_id, entry); - if (retval == 0) - { - // The entry has been added to cache succesfully - // Add the cache_map_entry to the transport - int_id.transport ()->cache_map_entry (entry); - } - else if (retval == 1) + int retval = 0; + bool more_to_do = true; + while (more_to_do) { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " - "unable to bind in the first attempt. " - "Trying with a new index\n")); - } - - // There was an entry like this before, so let us do some - // minor adjustments and rebind - retval = this->get_last_index_bind (ext_id, int_id, entry); + retval = this->cache_map_.bind (ext_id, int_id, entry); if (retval == 0) { + // The entry has been added to cache succesfully + // Add the cache_map_entry to the transport int_id.transport ()->cache_map_entry (entry); + more_to_do = false; + } + else if (retval == 1) + { + if (entry->item ().transport () == int_id.transport ()) + { + // update the cache status + // we are already holding the lock, do not call set_entry_state + entry->item ().recycle_state (int_id.recycle_state ()); + retval = 0; + more_to_do = false; + } + else + { + ext_id.index (ext_id.index() + 1); + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::bind_i: ") + ACE_TEXT ("Unable to bind Transport[%d] @ hash:index{%d:%d}. ") + ACE_TEXT ("Trying with a new index \n"), + int_id.transport ()->id (), + ext_id.hash (), + ext_id.index () + )); + } + } + } + else + { + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " + "ERROR: unable to bind transport\n")); + } + more_to_do = false; } } - - if (TAO_debug_level > 5 && retval != 0) - { - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " - "unable to bind\n")); - } - else if (TAO_debug_level > 3) + if (retval == 0) { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " - "cache size is [%d]\n", - this->current_size ())); + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::bind_i: ") + ACE_TEXT ("Success Transport[%d] @ hash:index{%d:%d}. ") + ACE_TEXT ("Cache size is [%d]\n"), + int_id.transport ()->id (), + ext_id.hash (), + ext_id.index (), + this->current_size () + )); + } } return retval; } - int + Transport_Cache_Manager::Find_Result Transport_Cache_Manager::find_transport ( TAO_Transport_Descriptor_Interface *prop, - TAO_Transport *&transport) + TAO_Transport *&transport, + size_t &busy_count) { if (prop == 0) { transport = 0; - return -1; + return CACHE_FOUND_NONE; } - // Compose the ExternId - Cache_ExtId ext_id (prop); - Cache_IntId int_id; - - int const retval = this->find (ext_id, int_id); - if (retval == 0) + Transport_Cache_Manager::Find_Result find_result = this->find ( + prop, transport, busy_count); + if (find_result != CACHE_FOUND_NONE) { - transport = int_id.relinquish_transport (); - - if (transport->wait_strategy ()->non_blocking () == 0 && - transport->orb_core ()->client_factory ()->use_cleanup_options ()) + if (find_result == CACHE_FOUND_AVAILABLE) { - ACE_Event_Handler * const eh = transport->event_handler_i (); - ACE_Reactor * const r = transport->orb_core ()->reactor (); - - if (eh && - r->remove_handler (eh, - ACE_Event_Handler::READ_MASK | - ACE_Event_Handler::DONT_CALL) == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - TAO_Transport_Cache_Manager[%d]") - ACE_TEXT ("::find_transport, remove_handler failed \n"), - transport->id ())); - } - else + if (transport->wait_strategy ()->non_blocking () == 0 && + transport->orb_core ()->client_factory ()->use_cleanup_options ()) { - transport->wait_strategy ()->is_registered (false); - } - } - } + ACE_Event_Handler * const eh = + transport->event_handler_i (); - return retval; - } - - int - Transport_Cache_Manager::find (const Cache_ExtId &key, - Cache_IntId &value) - { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->cache_lock_, - -1)); + ACE_Reactor * const r = + transport->orb_core ()->reactor (); - int const status = this->find_i (key, value); - - if (status == 0) - { - // Update the purging strategy information while we - // are holding our lock - this->purging_strategy_->update_item (value.transport ()); + if (eh && + r->remove_handler (eh, + ACE_Event_Handler::READ_MASK | + ACE_Event_Handler::DONT_CALL) == -1) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_Transport_Cache_Manager[%d]") + ACE_TEXT ("::find_transport, remove_handler failed \n"), + transport->id ())); + } + else + { + transport->wait_strategy ()->is_registered (false); + } + } + } } - - return status; + return find_result; } - int - Transport_Cache_Manager::find_i (const Cache_ExtId &key, - Cache_IntId &value) + Transport_Cache_Manager::Find_Result + Transport_Cache_Manager::find_i ( + TAO_Transport_Descriptor_Interface *prop, + TAO_Transport *&transport, + size_t &busy_count) { - HASH_MAP_ENTRY *entry = 0; - // Get the entry from the Hash Map - int retval = 0; + Transport_Cache_Manager::Find_Result found = CACHE_FOUND_NONE; // Make a temporary object. It does not do a copy. - Cache_ExtId tmp_key (key.property ()); + Cache_ExtId key (prop); + HASH_MAP_ENTRY *entry = 0; + busy_count = 0; + int cache_status = 0; + HASH_MAP_ENTRY *found_entry = 0; - while (retval == 0) + // loop until we find a usable transport, or until we've checked + // all cached entries for this endpoint + while (found != CACHE_FOUND_AVAILABLE && cache_status == 0) { - // Wait for a connection.. - this->wait_for_connection (tmp_key); - - // Look for an entry in the map - retval = this->cache_map_.find (tmp_key, entry); - - // We have an entry in the map, check whether it is idle. - if (entry) + entry = 0; + cache_status = this->cache_map_.find (key, + entry); + if (cache_status == 0 && entry) { - CORBA::Boolean const idle = this->is_entry_idle (entry); - - if (idle) + if (this->is_entry_available (*entry)) { // Successfully found a TAO_Transport. - entry->int_id_.recycle_state (ENTRY_BUSY); + found = CACHE_FOUND_AVAILABLE; + found_entry = entry; + entry->item ().recycle_state (ENTRY_BUSY); - // NOTE: This assignment operator indirectly incurs two - // lock operations since it duplicates and releases - // TAO_Transport objects. - value = entry->int_id_; - - if (TAO_debug_level > 4) + if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ") - ACE_TEXT("at index %d (Transport[%d]) - idle\n"), - entry->ext_id_.index (), - entry->int_id_.transport ()->id ())); + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::find_i: ") + ACE_TEXT ("Found available Transport[%d] @hash:index {%d:%d}\n"), + entry->item ().transport ()->id (), + entry->ext_id_.hash (), + entry->ext_id_.index () + )); + } + } + else if (this->is_entry_connecting (*entry)) + { + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::find_i: ") + ACE_TEXT ("Found connecting Transport[%d] @hash:index {%d:%d}\n"), + entry->item ().transport ()->id (), + entry->ext_id_.hash (), + entry->ext_id_.index () + )); + } + // if this is the first interesting entry + if (found != CACHE_FOUND_CONNECTING) + { + found_entry = entry; + found = CACHE_FOUND_CONNECTING; } - - return 0; } - else if (TAO_debug_level > 6) + else + { + // if this is the first busy entry + if (found == CACHE_FOUND_NONE && busy_count == 0) + { + found_entry = entry; + found = CACHE_FOUND_BUSY; + } + busy_count += 1; + if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ") - ACE_TEXT("at index %d (Transport[%d]) - not idle\n"), - entry->ext_id_.index (), - entry->int_id_.transport ()->id ())); + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::find_i: ") + ACE_TEXT ("Found busy Transport[%d] @hash:index {%d:%d}\n"), + entry->item ().transport ()->id (), + entry->ext_id_.hash (), + entry->ext_id_.index () + )); } + } } // Bump the index up - tmp_key.incr_index (); + key.incr_index (); } - - // If we are here then it is an error - if (TAO_debug_level > 4 && retval != 0) - { - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Transport_Cache_Manager::find_i, " - "no idle transport is available\n")); - } - - return retval; + if (found_entry != 0) + { + transport = found_entry->item ().transport (); + transport->add_reference (); + if (found == CACHE_FOUND_AVAILABLE) + { + // Update the purging strategy information while we + // are holding our lock + this->purging_strategy_->update_item (transport); + } + } + return found; } int - Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry) + Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *entry) { if (entry == 0) return -1; - entry->int_id_.recycle_state (ENTRY_IDLE_AND_PURGABLE); - - // Does any one need waking? - if (this->no_waiting_threads_) - { - // We returned this entry to the map - this->last_entry_returned_ = &entry->ext_id_; - - // Wake up a thread - this->condition_->signal (); - } + entry->item ().recycle_state (ENTRY_IDLE_AND_PURGABLE); return 0; } @@ -323,7 +331,7 @@ namespace TAO return -1; TAO_Connection_Purging_Strategy *st = this->purging_strategy_; - (void) st->update_item (entry->int_id_.transport ()); + (void) st->update_item (entry->item ().transport ()); return 0; } @@ -395,7 +403,7 @@ namespace TAO } void - Transport_Cache_Manager::mark_invalid_i (HASH_MAP_ENTRY *&entry) + Transport_Cache_Manager::mark_invalid_i (HASH_MAP_ENTRY *entry) { if (entry == 0) { @@ -403,53 +411,58 @@ namespace TAO } // Mark the entry as not usable - entry->int_id_.recycle_state (ENTRY_PURGABLE_BUT_NOT_IDLE); + entry->item ().recycle_state (ENTRY_PURGABLE_BUT_NOT_IDLE); } - int - Transport_Cache_Manager::get_last_index_bind (Cache_ExtId &key, - Cache_IntId &val, - HASH_MAP_ENTRY *&entry) + bool + Transport_Cache_Manager::is_entry_available (const HASH_MAP_ENTRY &entry) { - CORBA::ULong ctr = entry->ext_id_.index (); - int retval = 0; + Cache_Entries_State entry_state = + entry.int_id_.recycle_state (); + bool result = (entry_state == ENTRY_IDLE_AND_PURGABLE || + entry_state == ENTRY_IDLE_BUT_NOT_PURGABLE); - while (retval == 0) - { - // Set the index - key.index (++ctr); + if (result && entry.int_id_.transport () != 0) + { + // if it's not connected, it's not available + result = entry.int_id_.is_connected_; + } - // Check to see if an element exists in the Map. If it exists we - // loop, else we drop out of the loop - retval = this->cache_map_.find (key); + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::is_entry_available:") + ACE_TEXT ("returns %s state is [%d]\n"), + (result?"True":"False"), + entry_state)); } - // Now do a bind again with the new index - return this->cache_map_.bind (key, - val, - entry); + return result; } - bool - Transport_Cache_Manager::is_entry_idle (HASH_MAP_ENTRY *&entry) + Transport_Cache_Manager::is_entry_connecting (const HASH_MAP_ENTRY &entry) { Cache_Entries_State entry_state = - entry->int_id_.recycle_state (); + entry.int_id_.recycle_state (); + bool result = (entry_state == ENTRY_CONNECTING); + if (!result && entry.int_id_.transport () != 0) + { + // if we're not connected, that counts, too. + // Can this happen? Not sure <wilsond@ociweb.com> + result = !entry.int_id_.is_connected_; + } - if (TAO_debug_level) + if (TAO_debug_level > 8) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::is_entry_idle, ") - ACE_TEXT("state is [%d]\n"), - entry_state)); + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::is_entry_connecting: ") + ACE_TEXT ("Returns %s, state is [%d]\n"), + (result?"True":"False"), + entry_state)); } - if (entry_state == ENTRY_IDLE_AND_PURGABLE || - entry_state == ENTRY_IDLE_BUT_NOT_PURGABLE) - return true; - - return false; + return result; } #if !defined (ACE_LACKS_QSORT) @@ -491,20 +504,20 @@ namespace TAO // Calculate the number of entries to purge int const amount = (sorted_size * this->percent_) / 100; - if (TAO_debug_level > 0) + if (TAO_debug_level > 4) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::") - ACE_TEXT ("purge, purging %d of %d cache entries\n"), - amount, - sorted_size)); + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::purge: ") + ACE_TEXT ("Purging %d of %d cache entries\n"), + amount, + sorted_size)); } int count = 0; for (int i = 0; count < amount && i < sorted_size; ++i) { - if (this->is_entry_idle (sorted_set[i])) + if (this->is_entry_available (*sorted_set[i])) { sorted_set[i]->int_id_.recycle_state (ENTRY_BUSY); @@ -512,22 +525,26 @@ namespace TAO sorted_set[i]->int_id_.transport (); transport->add_reference (); - if (transports_to_be_closed.push (transport) != 0) + if (TAO_debug_level > 4) { ACE_DEBUG ((LM_INFO, - ACE_TEXT ("TAO (%P|%t) - ") - ACE_TEXT ("Unable to push transport %u ") - ACE_TEXT ("on the to-be-closed stack, so ") - ACE_TEXT ("it will leak\n"), - transport->id ())); + ACE_TEXT ("TAO (%P|%t) Transport_Cache_Manager::purge ") + ACE_TEXT ("Purgable Transport[%d] found in ") + ACE_TEXT ("cache\n"), + transport->id ())); } - if (TAO_debug_level > 0) + if (transports_to_be_closed.push (transport) != 0) { - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - ") - ACE_TEXT ("Idle transport found in ") - ACE_TEXT ("cache: [%d] \n"), - transport->id ())); + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::purge: ") + ACE_TEXT ("Unable to push transport[%u] ") + ACE_TEXT ("on the to-be-closed stack, so ") + ACE_TEXT ("it will leak\n"), + transport->id ())); + } } // Count this as a successful purged entry @@ -555,6 +572,14 @@ namespace TAO } } } + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::purge: ") + ACE_TEXT ("Cache size after purging is [%d]\n"), + this->current_size () + )); + } return 0; } @@ -575,7 +600,7 @@ namespace TAO for(int j = i; j > 0 && entries[j - 1]->int_id_.transport ()->purging_order () > - entry->int_id_.transport ()->purging_order (); --j) + entry->item ().transport ()->purging_order (); --j) { HASH_MAP_ENTRY* holder = entries[j]; entries[j] = entries[j - 1]; @@ -602,14 +627,14 @@ namespace TAO // Do we need to worry about cache purging? if (cache_maximum >= 0) { - current_size = static_cast<int> (this->cache_map_.current_size ()); + current_size = static_cast<int> (this->current_size ()); - if (TAO_debug_level > 0) + if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::fill_set_i, ") - ACE_TEXT("current_size = %d, cache_maximum = %d\n"), - current_size, cache_maximum)); + ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::fill_set_i: ") + ACE_TEXT ("current_size = %d, cache_maximum = %d\n"), + current_size, cache_maximum)); } if (current_size >= cache_maximum) @@ -631,82 +656,6 @@ namespace TAO return current_size; } - - int - Transport_Cache_Manager::wait_for_connection (Cache_ExtId &extid) - { - if (this->muxed_number_ && this->muxed_number_ == extid.index ()) - { - // If we have a limit on the number of muxed connections for - // a particular endpoint just wait to get the connection - ++this->no_waiting_threads_; - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager") - ACE_TEXT("::wait_for_connection, ") - ACE_TEXT("entering wait loop\n"))); - } - - int ready_togo = 0; - - while (ready_togo == 0) - { - this->condition_->wait (); - - // Check whether we are waiting for this connection - ready_togo = this->is_wakeup_useful (extid); - } - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::wait_for_connection, ") - ACE_TEXT("left wait loop\n"))); - } - - // We are not waiting anymore - --this->no_waiting_threads_; - } - - return 0; - } - - int - Transport_Cache_Manager::is_wakeup_useful (Cache_ExtId &extid) - { - // Get the underlying property that we are looking for - TAO_Transport_Descriptor_Interface *prop = extid.property (); - - // Just check the underlying property for equivalence. If the last - // connection that was returned had the same property just return - // 1. - if (this->last_entry_returned_ && - prop->is_equivalent (this->last_entry_returned_->property ())) - { - // Set the index to be right so that we can pick teh connection - // right away.. - extid.index (this->last_entry_returned_->index ()); - - // There is no more use for it ... - this->last_entry_returned_ = 0; - - return 1; - } - - // If there is an entry that was returned and if there are more - // threads just wake up the peer to check for the returned - // connection. - if (this->last_entry_returned_ && - this->no_waiting_threads_ > 1) - { - this->condition_->signal (); - } - - return 0; - } - } TAO_END_VERSIONED_NAMESPACE_DECL |