summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport_Cache_Manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport_Cache_Manager.cpp')
-rw-r--r--TAO/tao/Transport_Cache_Manager.cpp207
1 files changed, 155 insertions, 52 deletions
diff --git a/TAO/tao/Transport_Cache_Manager.cpp b/TAO/tao/Transport_Cache_Manager.cpp
index 30b54ed2744..6e89c48a75e 100644
--- a/TAO/tao/Transport_Cache_Manager.cpp
+++ b/TAO/tao/Transport_Cache_Manager.cpp
@@ -6,7 +6,8 @@
#include "tao/ORB_Core.h"
#include "tao/Resource_Factory.h"
#include "tao/Connection_Purging_Strategy.h"
-
+#include "tao/Condition.h"
+#include "ace/Synch_T.h"
#include "ace/Handle_Set.h"
#if !defined (__ACE_INLINE__)
@@ -18,23 +19,65 @@ ACE_RCSID (TAO,
Transport_Cache_Manager,
"$Id$")
+
TAO_Transport_Cache_Manager::TAO_Transport_Cache_Manager (TAO_ORB_Core &orb_core)
: percent_ (orb_core.resource_factory ()->purge_percentage ()),
purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ()),
cache_map_ (),
- cache_lock_ (orb_core.resource_factory ()->create_cached_connection_lock ())
+ condition_ (0),
+ cache_lock_ (0),
+ muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ()),
+ no_waiting_threads_ (0),
+ last_entry_returned_ (0)
{
+ if (orb_core.resource_factory ()->locked_transport_cache ())
+ {
+ ACE_NEW (this->condition_,
+ TAO_Condition <TAO_SYNCH_MUTEX>);
+
+ ACE_NEW (this->cache_lock_,
+ ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (*this->condition_->mutex ()));
+ }
+ else
+ {
+ /// If the cache is not going to be locked then dont create a
+ /// condition variable. Make the <muxed_number_> to 0, else a
+ /// single thread could get into waiting mode
+ this->muxed_number_ = 0;
+ ACE_NEW (this->cache_lock_,
+ ACE_Lock_Adapter<ACE_SYNCH_NULL_MUTEX>);
+ }
}
TAO_Transport_Cache_Manager::~TAO_Transport_Cache_Manager (void)
{
+ // Wakeup all the waiting threads threads before we shutdown stuff
+ if (this->no_waiting_threads_)
+ this->condition_->broadcast ();
+
// Delete the lock that we have
- delete this->cache_lock_;
+ if (this->cache_lock_)
+ {
+ delete this->cache_lock_;
+ this->cache_lock_ = 0;
+ }
// Delete the purging strategy
- delete this->purging_strategy_;
+ if (this->purging_strategy_)
+ {
+ delete this->purging_strategy_;
+ this->purging_strategy_ = 0;
+ }
+
+ // Delete the condition variable
+ if (this->condition_)
+ {
+ delete this->condition_;
+ this->condition_ = 0;
+ }
}
+
int
TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id,
TAO_Cache_IntId &int_id)
@@ -65,8 +108,7 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id,
{
// The entry has been added to cache succesfully
// Add the cache_map_entry to the transport
- int_id.transport () ->cache_map_entry (entry);
-
+ int_id.transport ()->cache_map_entry (entry);
}
else if (retval == 1)
{
@@ -100,13 +142,10 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id,
return retval;
}
-// Used to be in the .inl, but moved here b/c the use of
-// TAO_Transport::_duplicate() caused an include file cycle with
-// inlining turned on.
int
TAO_Transport_Cache_Manager::find_transport (
- TAO_Transport_Descriptor_Interface *prop,
- TAO_Transport *&transport)
+ TAO_Transport_Descriptor_Interface *prop,
+ TAO_Transport *&transport)
{
if (prop == 0)
{
@@ -129,6 +168,29 @@ TAO_Transport_Cache_Manager::find_transport (
}
int
+TAO_Transport_Cache_Manager::find (const TAO_Cache_ExtId &key,
+ TAO_Cache_IntId &value)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
+ guard,
+ *this->cache_lock_,
+ -1));
+
+ int status = this->find_i (key,
+ value);
+
+ if (status == 0)
+ {
+ // Update the purging strategy information while we
+ // are holding our lock
+ this->purging_strategy_->update_item (value.transport ());
+ }
+ return status;
+}
+
+
+
+int
TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
TAO_Cache_IntId &value)
{
@@ -137,20 +199,19 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
// Get the entry from the Hash Map
int retval = 0;
- // Get the index of the <key>
- int index = key.index ();
-
// Make a temporary object. It does not do a copy.
TAO_Cache_ExtId tmp_key (key.property ());
while (retval == 0)
{
+ // Wait for a connection..
+ this->wait_for_connection (tmp_key);
+
// Look for an entry in the map
retval = this->cache_map_.find (tmp_key,
entry);
- // We have an entry in the map, check whether it is idle. If it
- // is idle it would be marked as busy.
+ // We have an entry in the map, check whether it is idle.
if (entry)
{
CORBA::Boolean idle =
@@ -176,7 +237,7 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
}
// Bump the index up
- tmp_key.index (++index);
+ tmp_key.incr_index ();
}
// If we are here then it is an error
@@ -191,32 +252,13 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
}
int
-TAO_Transport_Cache_Manager::rebind_i (const TAO_Cache_ExtId &key,
- const TAO_Cache_IntId &value)
-{
- return this->cache_map_.rebind (key,
- value);
-}
-
-int
-TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key)
-{
- return this->cache_map_.unbind (key);
-}
-
-int
-TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value)
-{
- return this->cache_map_.unbind (key,
- value);
-}
-
-int
TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry)
{
// First get the entry again (if at all things had changed in the
// cache map in the mean time)
+
+ // @todo: Is this required? Looks like a legacy one..
+
HASH_MAP_ENTRY *new_entry = 0;
int retval = this->cache_map_.find (entry->ext_id_,
new_entry);
@@ -227,6 +269,16 @@ TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry)
recycle_state (ACE_RECYCLABLE_IDLE_AND_PURGABLE);
entry = new_entry;
+
+ // Does any one need waking?
+ if (this->no_waiting_threads_)
+ {
+ // We returned this entry to the map
+ this->last_entry_returned_ = &new_entry->ext_id_;
+
+ // Wake up a thread
+ this->condition_->signal ();
+ }
}
else if (TAO_debug_level > 0 && retval != 0)
{
@@ -252,23 +304,10 @@ TAO_Transport_Cache_Manager::close_i (ACE_Handle_Set &reactor_registered,
if ((*iter).int_id_.recycle_state () != ACE_RECYCLABLE_CLOSED)
{
-#if 0
- // @@ This code from Connection_Cache_Manager disappeared
- // during the changeover; we need the functional equivalent back.
- // The problem is that with the locking stuff that we're putting
- // in to the Transport, we might want to encapsulate the whole
- // exercise of adding to the handle set in a method on the transport
- // rather than doing it here. That way, the locking is correct.
- if ((*iter).int_id_.handler ()->is_registered ())
- {
- reactor_registered.set_bit ((*iter).int_id_.handler ()->fetch_handle ());
- }
-#else
// Get the transport to fill its associated connection's handle in
// the handle sets.
(*iter).int_id_.transport ()->provide_handle (reactor_registered,
unregistered);
-#endif
// Inform the transport that has a reference to the entry in the
// map that we are *gone* now. So, the transport should not use
// the reference to the entry that he has, to acces us *at any
@@ -497,6 +536,70 @@ TAO_Transport_Cache_Manager::close_entries(DESCRIPTOR_SET& sorted_set,
sorted_set = 0;
}
+int
+TAO_Transport_Cache_Manager::wait_for_connection (TAO_Cache_ExtId &extid)
+{
+ if (this->muxed_number_ &&
+ this->muxed_number_ == extid.index ())
+ {
+ // If we have a limit on the number of muxed connections for
+ // a particular endpoint just wait to get the connection
+ ++this->no_waiting_threads_;
+
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Going to wait for connections.. \n"));
+
+ int ready_togo = 0;
+
+ while (ready_togo == 0)
+ {
+ this->condition_->wait ();
+
+ // Check whether we are waiting for this connection
+ ready_togo = this->is_wakeup_useful (extid);
+ }
+
+ // We are not waiting anymore
+ --this->no_waiting_threads_;
+ }
+
+ return 0;
+}
+
+int
+TAO_Transport_Cache_Manager::is_wakeup_useful (TAO_Cache_ExtId &extid)
+{
+ // Get the underlying property that we are looking for
+ TAO_Transport_Descriptor_Interface *prop = extid.property ();
+
+ // Just check the underlying property for equivalence. If the last
+ // connection that was returned had the same property just return
+ // 1.
+ if (this->last_entry_returned_ &&
+ prop->is_equivalent (this->last_entry_returned_->property ()))
+ {
+ // Set the index to be right so that we can pick teh connection
+ // right away..
+ extid.index (this->last_entry_returned_->index ());
+
+ // There is no more use for it ...
+ this->last_entry_returned_ = 0;
+
+ return 1;
+ }
+
+ // If there is an entry that was returned and if there are more
+ // threads just wake up the peer to check for the returned
+ // connection.
+ if (this->last_entry_returned_ &&
+ this->no_waiting_threads_ > 1)
+ {
+ this->condition_->signal ();
+ }
+
+ return 0;
+}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)