diff options
author | wilsond <wilsond@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2007-08-01 16:41:24 +0000 |
---|---|---|
committer | wilsond <wilsond@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2007-08-01 16:41:24 +0000 |
commit | 7080c327984fcbb5b914275437bc5b5c72a873c2 (patch) | |
tree | 3d7a9ff3fac09e8978f7f43e0e099e671b4f85ed /TAO/tao/Transport_Cache_Manager.cpp | |
parent | 219b6ccbc8c406b5eba3731619f7aea36060bf65 (diff) | |
download | ATCD-7080c327984fcbb5b914275437bc5b5c72a873c2.tar.gz |
ChangeLogTag: Wed Aug 1 15:54:01 UTC 2007 Dale Wilson <wilsond@ociweb.com>
Diffstat (limited to 'TAO/tao/Transport_Cache_Manager.cpp')
-rw-r--r-- | TAO/tao/Transport_Cache_Manager.cpp | 368 |
1 files changed, 172 insertions, 196 deletions
diff --git a/TAO/tao/Transport_Cache_Manager.cpp b/TAO/tao/Transport_Cache_Manager.cpp index de734fc9975..ca19193907a 100644 --- a/TAO/tao/Transport_Cache_Manager.cpp +++ b/TAO/tao/Transport_Cache_Manager.cpp @@ -28,26 +28,15 @@ namespace TAO : percent_ (orb_core.resource_factory ()->purge_percentage ()) , purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ()) , cache_map_ (orb_core.resource_factory ()->cache_maximum ()) - , condition_ (0) , cache_lock_ (0) - , muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ()) - , no_waiting_threads_ (0) - , last_entry_returned_ (0) { if (orb_core.resource_factory ()->locked_transport_cache ()) { - ACE_NEW (this->condition_, - TAO_Condition <TAO_SYNCH_MUTEX>); - ACE_NEW (this->cache_lock_, - ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (*this->condition_->mutex ())); + ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (this->cache_map_mutex_)); } else { - /// If the cache is not going to be locked then dont create a - /// condition variable. Make the <muxed_number_> to 0, else a - /// single thread could get into waiting mode - this->muxed_number_ = 0; ACE_NEW (this->cache_lock_, ACE_Lock_Adapter<ACE_SYNCH_NULL_MUTEX>); } @@ -55,12 +44,6 @@ namespace TAO Transport_Cache_Manager::~Transport_Cache_Manager (void) { - // Wakeup all the waiting threads threads before we shutdown stuff - if (this->no_waiting_threads_) - { - this->condition_->broadcast (); - } - // Delete the lock that we have if (this->cache_lock_) { @@ -75,12 +58,6 @@ namespace TAO this->purging_strategy_ = 0; } - // Delete the condition variable - if (this->condition_) - { - delete this->condition_; - this->condition_ = 0; - } } @@ -92,10 +69,12 @@ namespace TAO { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " - "0x%x -> 0x%x Transport[%d]\n", + "0x%x {%d:%d} -> 0x%x Transport[%d]\n", ext_id.property (), - int_id.transport (), - int_id.transport ()->id ())); + ext_id.hash (), + ext_id.index (), + int_id.transport (), + int_id.transport ()->id ())); } // Get the entry too @@ -116,23 +95,40 @@ namespace TAO } else if (retval == 1) { - if (TAO_debug_level > 4) + // if this is already in the cache, just ignore the request + // this happens because some protocols bind their transport early + // to avoid duplication simultaneous connection attempts + if (entry != 0 && entry->int_id_.transport () == int_id.transport ()) { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " - "unable to bind in the first attempt. " - "Trying with a new index\n")); + // rebind this entry to update cache status + retval = this->cache_map_.rebind (ext_id, + int_id, + entry); + if(retval == 1) // 1 from rebind means replaced + { + retval = 0; + } } + else + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Cache_Manager::bind_i, " + "unable to bind in the first attempt. " + "Trying with a new index\n")); + } - // There was an entry like this before, so let us do some - // minor adjustments and rebind - retval = this->get_last_index_bind (ext_id, - int_id, - entry); - if (retval == 0) - { - int_id.transport ()->cache_map_entry (entry); - } + // There was an entry like this before, so let us do some + // minor adjustments and rebind + retval = this->get_last_index_bind (ext_id, + int_id, + entry); + if (retval == 0) + { + int_id.transport ()->cache_map_entry (entry); + } + } } if (TAO_debug_level > 5 && retval != 0) @@ -152,70 +148,73 @@ namespace TAO return retval; } - int + Transport_Cache_Manager::Find_Result Transport_Cache_Manager::find_transport ( TAO_Transport_Descriptor_Interface *prop, - TAO_Transport *&transport) + TAO_Transport *&transport, + unsigned int &busy_count) { if (prop == 0) { transport = 0; - return -1; + return CACHE_FOUND_NONE; } // Compose the ExternId Cache_ExtId ext_id (prop); Cache_IntId int_id; - int const retval = this->find (ext_id, - int_id); - if (retval == 0) + Transport_Cache_Manager::Find_Result find_result = this->find (ext_id, + int_id, busy_count); + if (find_result != CACHE_FOUND_NONE) { transport = int_id.relinquish_transport (); - - if (transport->wait_strategy ()->non_blocking () == 0 && - transport->orb_core ()->client_factory ()->use_cleanup_options ()) + if (find_result == CACHE_FOUND_AVAILABLE) { - ACE_Event_Handler * const eh = - transport->event_handler_i (); + if (transport->wait_strategy ()->non_blocking () == 0 && + transport->orb_core ()->client_factory ()->use_cleanup_options ()) + { + ACE_Event_Handler * const eh = + transport->event_handler_i (); - ACE_Reactor * const r = - transport->orb_core ()->reactor (); + ACE_Reactor * const r = + transport->orb_core ()->reactor (); - if (eh && - r->remove_handler (eh, - ACE_Event_Handler::READ_MASK | - ACE_Event_Handler::DONT_CALL) == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - TAO_Transport_Cache_Manager[%d]") - ACE_TEXT ("::find_transport, remove_handler failed \n"), - transport->id ())); - } - else - { - transport->wait_strategy ()->is_registered (false); + if (eh && + r->remove_handler (eh, + ACE_Event_Handler::READ_MASK | + ACE_Event_Handler::DONT_CALL) == -1) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_Transport_Cache_Manager[%d]") + ACE_TEXT ("::find_transport, remove_handler failed \n"), + transport->id ())); + } + else + { + transport->wait_strategy ()->is_registered (false); + } } } } - - return retval; + return find_result; } - int + Transport_Cache_Manager::Find_Result Transport_Cache_Manager::find (const Cache_ExtId &key, - Cache_IntId &value) + Cache_IntId &value, + unsigned int &busy_count) { ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->cache_lock_, - -1)); + Transport_Cache_Manager::CACHE_FOUND_NONE)); - int const status = this->find_i (key, - value); + Transport_Cache_Manager::Find_Result status = this->find_i (key, + value, busy_count); - if (status == 0) + if (status != CACHE_FOUND_NONE) { // Update the purging strategy information while we // are holding our lock @@ -225,37 +224,35 @@ namespace TAO return status; } - int + Transport_Cache_Manager::Find_Result Transport_Cache_Manager::find_i (const Cache_ExtId &key, - Cache_IntId &value) + Cache_IntId &value, + unsigned int & busy_count) { HASH_MAP_ENTRY *entry = 0; + busy_count = 0; // Get the entry from the Hash Map - int retval = 0; + Transport_Cache_Manager::Find_Result found = CACHE_FOUND_NONE; // Make a temporary object. It does not do a copy. Cache_ExtId tmp_key (key.property ()); + int cache_status = 0; - while (retval == 0) + // loop until we find a usable transport, or until we've checked + // all cached entries for this endpoint + while (found != CACHE_FOUND_AVAILABLE && cache_status == 0) { - // Wait for a connection.. - this->wait_for_connection (tmp_key); - - // Look for an entry in the map - retval = this->cache_map_.find (tmp_key, + entry = 0; + cache_status = this->cache_map_.find (tmp_key, entry); - - // We have an entry in the map, check whether it is idle. - if (entry) + if (cache_status == 0 && entry) { - CORBA::Boolean idle = - this->is_entry_idle (entry); - - if (idle) + if (this->is_entry_available (*entry)) { // Successfully found a TAO_Transport. + found = CACHE_FOUND_AVAILABLE; entry->int_id_.recycle_state (ENTRY_BUSY); // NOTE: This assignment operator indirectly incurs two @@ -267,36 +264,71 @@ namespace TAO { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ") - ACE_TEXT("at index %d (Transport[%d]) - idle\n"), + ACE_TEXT("{%d:%d} (Transport %x[%d]) - idle\n"), + entry->ext_id_.hash (), entry->ext_id_.index (), + entry->int_id_.transport (), entry->int_id_.transport ()->id ())); } - - return 0; } - else if (TAO_debug_level > 6) + else if (this->is_entry_connecting (*entry)) + { + if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ") - ACE_TEXT("at index %d (Transport[%d]) - not idle\n"), + ACE_TEXT("{%d:%d} (Transport %x[%d]) - connecting\n"), + entry->ext_id_.hash (), entry->ext_id_.index (), + entry->int_id_.transport (), entry->int_id_.transport ()->id ())); } + // if this is the first interesting entry + if (found != CACHE_FOUND_CONNECTING) + { + // NOTE: This assignment operator indirectly incurs two + // lock operations since it duplicates and releases + // TAO_Transport objects. + value = entry->int_id_; + found = CACHE_FOUND_CONNECTING; + } + } + else + { + // if this is the first busy entry + if (found == CACHE_FOUND_NONE && busy_count == 0) + { + value = entry->int_id_; + found = CACHE_FOUND_BUSY; + } + busy_count += 1; + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ") + ACE_TEXT("{%d:%d} (Transport %x[%d]) - busy\n"), + entry->ext_id_.hash (), + entry->ext_id_.index (), + entry->int_id_.transport (), + entry->int_id_.transport ()->id ())); + } + } } // Bump the index up tmp_key.incr_index (); } - // If we are here then it is an error - if (TAO_debug_level > 4 && retval != 0) + if (TAO_debug_level > 4 && found != CACHE_FOUND_AVAILABLE) { ACE_ERROR ((LM_ERROR, "TAO (%P|%t) - Transport_Cache_Manager::find_i, " - "no idle transport is available\n")); + "no idle transport is available for {%d}\n", + tmp_key.hash () + )); } - return retval; + return found; } int @@ -307,16 +339,6 @@ namespace TAO entry->int_id_.recycle_state (ENTRY_IDLE_AND_PURGABLE); - // Does any one need waking? - if (this->no_waiting_threads_) - { - // We returned this entry to the map - this->last_entry_returned_ = &entry->ext_id_; - - // Wake up a thread - this->condition_->signal (); - } - return 0; } @@ -443,24 +465,54 @@ namespace TAO bool - Transport_Cache_Manager::is_entry_idle (HASH_MAP_ENTRY *&entry) + Transport_Cache_Manager::is_entry_available (const HASH_MAP_ENTRY &entry) { Cache_Entries_State entry_state = - entry->int_id_.recycle_state (); + entry.int_id_.recycle_state (); + bool result = (entry_state == ENTRY_IDLE_AND_PURGABLE || + entry_state == ENTRY_IDLE_BUT_NOT_PURGABLE); + + if (result && entry.int_id_.transport () != 0) + { + // if it's not connected, it's not available + result = entry.int_id_.transport ()->is_connected(); + } if (TAO_debug_level) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::is_entry_idle, ") + ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::is_entry_available: %c ") + ACE_TEXT("state is [%d]\n"), + (result?'T':'F'), + entry_state)); + } + + return result; + } + + bool + Transport_Cache_Manager::is_entry_connecting (const HASH_MAP_ENTRY &entry) + { + Cache_Entries_State entry_state = + entry.int_id_.recycle_state (); + bool result = (entry_state == ENTRY_CONNECTING); + if (!result && entry.int_id_.transport () != 0) + { + // if we're not connected, that counts, too. + // Can this happen? Not sure <wilsond@ociweb.com> + result = !entry.int_id_.transport ()->is_connected(); + } + + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::is_entry_connecting: %c ") ACE_TEXT("state is [%d]\n"), + (result?'T':'F'), entry_state)); } - if (entry_state == ENTRY_IDLE_AND_PURGABLE || - entry_state == ENTRY_IDLE_BUT_NOT_PURGABLE) - return true; - - return false; + return result; } #if !defined (ACE_LACKS_QSORT) @@ -515,7 +567,7 @@ namespace TAO for (int i = 0; count < amount && i < sorted_size; ++i) { - if (this->is_entry_idle (sorted_set[i])) + if (this->is_entry_available (*sorted_set[i])) { sorted_set[i]->int_id_.recycle_state (ENTRY_BUSY); @@ -642,82 +694,6 @@ namespace TAO return current_size; } - - int - Transport_Cache_Manager::wait_for_connection (Cache_ExtId &extid) - { - if (this->muxed_number_ && this->muxed_number_ == extid.index ()) - { - // If we have a limit on the number of muxed connections for - // a particular endpoint just wait to get the connection - ++this->no_waiting_threads_; - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager") - ACE_TEXT("::wait_for_connection, ") - ACE_TEXT("entering wait loop\n"))); - } - - int ready_togo = 0; - - while (ready_togo == 0) - { - this->condition_->wait (); - - // Check whether we are waiting for this connection - ready_togo = this->is_wakeup_useful (extid); - } - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::wait_for_connection, ") - ACE_TEXT("left wait loop\n"))); - } - - // We are not waiting anymore - --this->no_waiting_threads_; - } - - return 0; - } - - int - Transport_Cache_Manager::is_wakeup_useful (Cache_ExtId &extid) - { - // Get the underlying property that we are looking for - TAO_Transport_Descriptor_Interface *prop = extid.property (); - - // Just check the underlying property for equivalence. If the last - // connection that was returned had the same property just return - // 1. - if (this->last_entry_returned_ && - prop->is_equivalent (this->last_entry_returned_->property ())) - { - // Set the index to be right so that we can pick teh connection - // right away.. - extid.index (this->last_entry_returned_->index ()); - - // There is no more use for it ... - this->last_entry_returned_ = 0; - - return 1; - } - - // If there is an entry that was returned and if there are more - // threads just wake up the peer to check for the returned - // connection. - if (this->last_entry_returned_ && - this->no_waiting_threads_ > 1) - { - this->condition_->signal (); - } - - return 0; - } - } TAO_END_VERSIONED_NAMESPACE_DECL |