summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2002-01-31 20:25:36 +0000
committerbala <balanatarajan@users.noreply.github.com>2002-01-31 20:25:36 +0000
commit2347c840e0e9faeae9671dac0942a7b5bf0383c8 (patch)
tree2e0857853aef664805d28cc5df8ec942837b8085
parent7a088e42cd9ef3eca943d529c6a712c31a345179 (diff)
downloadATCD-2347c840e0e9faeae9671dac0942a7b5bf0383c8.tar.gz
ChangeLogTag: Thu Jan 31 11:23:59 2002 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--TAO/tao/Cache_Entries.cpp15
-rw-r--r--TAO/tao/Cache_Entries.h3
-rw-r--r--TAO/tao/Cache_Entries.inl6
-rw-r--r--TAO/tao/ChangeLog64
-rw-r--r--TAO/tao/Condition.cpp56
-rw-r--r--TAO/tao/Condition.h115
-rw-r--r--TAO/tao/Condition.inl50
-rw-r--r--TAO/tao/Resource_Factory.cpp14
-rw-r--r--TAO/tao/Resource_Factory.h10
-rw-r--r--TAO/tao/Transport_Cache_Manager.cpp207
-rw-r--r--TAO/tao/Transport_Cache_Manager.h86
-rw-r--r--TAO/tao/Transport_Cache_Manager.inl57
-rw-r--r--TAO/tao/default_resource.cpp29
-rw-r--r--TAO/tao/default_resource.h7
14 files changed, 560 insertions, 159 deletions
diff --git a/TAO/tao/Cache_Entries.cpp b/TAO/tao/Cache_Entries.cpp
index 9c7e17f0273..df610a378f6 100644
--- a/TAO/tao/Cache_Entries.cpp
+++ b/TAO/tao/Cache_Entries.cpp
@@ -24,13 +24,14 @@ TAO_Cache_IntId::~TAO_Cache_IntId (void)
TAO_Cache_IntId&
TAO_Cache_IntId::operator= (const TAO_Cache_IntId &rhs)
{
- if (this != &rhs) {
- this->recycle_state_ = rhs.recycle_state_;
+ if (this != &rhs)
+ {
+ this->recycle_state_ = rhs.recycle_state_;
+
+ TAO_Transport* old_transport = this->transport_;
+ this->transport_ = TAO_Transport::_duplicate (rhs.transport_);
+ TAO_Transport::release (old_transport);
+ }
- TAO_Transport* old_transport = this->transport_;
- this->transport_ = TAO_Transport::_duplicate (rhs.transport_);
- TAO_Transport::release (old_transport);
- }
return *this;
}
-
diff --git a/TAO/tao/Cache_Entries.h b/TAO/tao/Cache_Entries.h
index d1a2464d426..a51e504378d 100644
--- a/TAO/tao/Cache_Entries.h
+++ b/TAO/tao/Cache_Entries.h
@@ -156,6 +156,9 @@ public:
/// but for the TAO_Transport_Cache_Manager class.
void index (CORBA::ULong index);
+ /// Increment the index value
+ void incr_index (void);
+
// = Accessors
/// Get the underlying the property pointer
TAO_Transport_Descriptor_Interface *property (void) const;
diff --git a/TAO/tao/Cache_Entries.inl b/TAO/tao/Cache_Entries.inl
index 1e6bbc118fb..ec19b1ed1e1 100644
--- a/TAO/tao/Cache_Entries.inl
+++ b/TAO/tao/Cache_Entries.inl
@@ -180,6 +180,12 @@ TAO_Cache_ExtId::index (CORBA::ULong index)
this->index_ = index;
}
+ACE_INLINE void
+TAO_Cache_ExtId::incr_index (void)
+{
+ ++this->index_;
+}
+
ACE_INLINE TAO_Transport_Descriptor_Interface *
TAO_Cache_ExtId::property (void) const
{
diff --git a/TAO/tao/ChangeLog b/TAO/tao/ChangeLog
new file mode 100644
index 00000000000..1b94ce800d9
--- /dev/null
+++ b/TAO/tao/ChangeLog
@@ -0,0 +1,64 @@
+Thu Jan 31 11:23:59 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Cache_Entries.cpp: Did a cosmetic fix.
+
+ * tao/Cache_Entries.h:
+ * tao/Cache_Entries.inl: Added a new method incr_index () to the
+ TAO_CacheExtId class.
+
+ * tao/Resource_Factory.cpp:
+ * tao/Resource_Factory.h:Added two new methods,
+ max_muxed_connections () and locked_transport_cache (). The
+ former returns the number of user specified muxed connections
+ with a particular property. The latter returns a boolean value
+ to indicate whether the transport cache needs to have a lock or
+ not .
+
+ * tao/default_resource.cpp:
+ * tao/default_resource.h: Concrete implementations for the methods
+ declared in Resource_Factory.
+
+ * tao/Transport_Cache_Manager.cpp:
+ * tao/Transport_Cache_Manager.h:
+ * tao/Transport_Cache_Manager.inl: Added support for counted muxed
+ connections ie. the user can limit the number of remote
+ connections (with a particular property). If the thread
+ searching the cache for a free connection doesnt find one that
+ is free, the thread will wait on a condition variable for the
+ connection to be released. To accomodate this the following set
+ of changes were made
+
+ - Added a condition variable to the class
+
+ - Create the type of lock that needs to be used for the cache
+ locally and use that lock to create the condition variable. If
+ the cache is free of any locks we dont create a condition
+ variable.
+
+ - The creation of lock has been moved to the
+ Transport_Cache_Manager from the Resource_Factory.
+
+ - Added two new methods, wait_for_connection () and
+ is_wakeup_useful (). The wait_for_connection () blocks the
+ thread searching for connection, if there is a limit on the
+ number of muxed connections and if there have been enough
+ connections created with the same property. The
+ is_wakeup_useful () method is called by an unblocked thread to
+ check whether the transport that was returned is the one the
+ thread was waiting for.
+
+ - Removed a version of rebind () and a couple of version of
+ unbind () calls as they were not used.
+
+
+ * tao/Condition.h:
+ * tao/Condition.cpp:.
+ * tao/Condition.inl: A simple wrapper that wraps a
+ TAO_SYNCH_CONDITION that can be used with different types of
+ Mutex classes.
+
+
+
+
+
+
diff --git a/TAO/tao/Condition.cpp b/TAO/tao/Condition.cpp
new file mode 100644
index 00000000000..d754007a77d
--- /dev/null
+++ b/TAO/tao/Condition.cpp
@@ -0,0 +1,56 @@
+#include "Condition.h"
+
+
+#if !defined (__ACE_INLINE__)
+# include "tao/Condition.inl"
+#endif /* __ACE_INLINE__ */
+
+
+ACE_RCSID (TAO,
+ Condition,
+ "$Id$")
+
+template <class MUTEX>
+TAO_Condition<MUTEX>::TAO_Condition (MUTEX &m)
+
+ : mutex_ (&m),
+ delete_lock_ (0),
+ cond_ (0)
+{
+ // @@todo: Need to add the allocatore here..
+ ACE_NEW (this->cond_,
+ TAO_SYNCH_CONDITION (*this->mutex_));
+}
+
+template <class MUTEX>
+TAO_Condition<MUTEX>::TAO_Condition (void)
+ : mutex_ (0),
+ delete_lock_ (0),
+ cond_ (0)
+
+{
+ // @@todo: Need to add the allocatore here..
+
+ ACE_NEW (this->mutex_,
+ MUTEX);
+
+ this->delete_lock_ = 1;
+
+ ACE_NEW (this->cond_,
+ TAO_SYNCH_CONDITION (*this->mutex_));
+}
+
+
+template <class MUTEX>
+TAO_Condition<MUTEX>::~TAO_Condition (void)
+{
+ if (this->remove () == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("TAO_Condition::~TAO_Condition")));
+
+ delete this->cond_;
+
+ if (this->delete_lock_)
+ delete this->mutex_;
+}
diff --git a/TAO/tao/Condition.h b/TAO/tao/Condition.h
new file mode 100644
index 00000000000..b9b85a4c817
--- /dev/null
+++ b/TAO/tao/Condition.h
@@ -0,0 +1,115 @@
+/* -*- C++ -*- */
+
+//=============================================================================
+/**
+ * @file Condition.h
+ *
+ * $Id$
+ *
+ * @author Douglas C. Schmidt <schmidt@cs.wustl.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_CONDITION_H
+#define TAO_CONDITION_H
+#include "ace/pre.h"
+
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+
+/**
+ * @class TAO_Condition
+ *
+ * @brief Same as to the ACE_Condition variable wrapper
+ *
+ * This class differs from ACE_Condition in that it uses a
+ * TAO_SYNCH_CONDITION instead of ACE_cond_t under the hood to
+ * provide blocking.
+ */
+template <class MUTEX>
+class TAO_Condition
+{
+public:
+
+ /// Useful typedef
+ typedef MUTEX LOCK;
+
+ // = Initialiation and termination methods.
+ /// Initialize the condition variable.
+ TAO_Condition (MUTEX &m);
+
+ /// A default constructor. Since no lock is provided by the user,
+ /// one will be created internally.
+ TAO_Condition (void);
+
+ /// Implicitly destroy the condition variable.
+ ~TAO_Condition (void);
+
+ // = Lock accessors.
+ /**
+ * Block on condition, or until absolute time-of-day has passed. If
+ * abstime == 0 use "blocking" <wait> semantics. Else, if <abstime>
+ * != 0 and the call times out before the condition is signaled
+ * <wait> returns -1 and sets errno to ETIME.
+ */
+ int wait (const ACE_Time_Value *abstime);
+
+ /// Block on condition.
+ int wait (void);
+
+ /**
+ * Block on condition or until absolute time-of-day has passed. If
+ * abstime == 0 use "blocking" wait() semantics on the <mutex>
+ * passed as a parameter (this is useful if you need to store the
+ * <Condition> in shared memory). Else, if <abstime> != 0 and the
+ * call times out before the condition is signaled <wait> returns -1
+ * and sets errno to ETIME.
+ */
+ int wait (MUTEX &mutex, const ACE_Time_Value *abstime = 0);
+
+ /// Signal one waiting thread.
+ int signal (void);
+
+ /// Signal *all* waiting threads.
+ int broadcast (void);
+
+ // = Utility methods.
+ /// Explicitly destroy the condition variable.
+ int remove (void);
+
+ /// Returns a reference to the underlying mutex_;
+ MUTEX *mutex (void);
+
+private:
+
+ /// Reference to mutex lock.
+ MUTEX *mutex_;
+
+ /// A flag to indicate whether the lock needs to be deleted.
+ int delete_lock_;
+
+ /// Condition variable.
+ TAO_SYNCH_CONDITION *cond_;
+
+ // = Prevent assignment and initialization.
+ ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Condition<MUTEX> &))
+ ACE_UNIMPLEMENTED_FUNC (TAO_Condition (const TAO_Condition<MUTEX> &))
+};
+
+#if defined (__ACE_INLINE__)
+#include "Condition.inl"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "Condition.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("Condition.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /*TAO_CONDITION_H*/
diff --git a/TAO/tao/Condition.inl b/TAO/tao/Condition.inl
new file mode 100644
index 00000000000..089bc0d54be
--- /dev/null
+++ b/TAO/tao/Condition.inl
@@ -0,0 +1,50 @@
+/* -*- C++ -*- */
+//$Id$
+template <class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::wait (void)
+{
+ return this->cond_->wait ();
+}
+
+template <class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::wait (MUTEX &mutex,
+ const ACE_Time_Value *abstime)
+{
+ return this->cond_->wait (mutex,
+ abstime);
+}
+
+// Peform an "alertable" timed wait. If the argument ABSTIME == 0
+// then we do a regular cond_wait(), else we do a timed wait for up to
+// ABSTIME using the Solaris cond_timedwait() function.
+
+template <class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::wait (const ACE_Time_Value *abstime)
+{
+ return this->wait (this->mutex_, abstime);
+}
+
+template<class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::remove (void)
+{
+ this->cond_->remove ();
+}
+
+template<class MUTEX> ACE_INLINE MUTEX *
+TAO_Condition<MUTEX>::mutex (void)
+{
+ // ACE_TRACE ("ACE_Condition<MUTEX>::mutex");
+ return this->mutex_;
+}
+
+template <class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::signal (void)
+{
+ return this->cond_->signal ();
+}
+
+template <class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::broadcast (void)
+{
+ return this->cond_->broadcast ();
+}
diff --git a/TAO/tao/Resource_Factory.cpp b/TAO/tao/Resource_Factory.cpp
index 62795db02ba..9964e39bdb8 100644
--- a/TAO/tao/Resource_Factory.cpp
+++ b/TAO/tao/Resource_Factory.cpp
@@ -162,6 +162,13 @@ TAO_Resource_Factory::purge_percentage (void) const
}
int
+TAO_Resource_Factory::max_muxed_connections (void) const
+{
+ return 0;
+}
+
+
+int
TAO_Resource_Factory::get_parser_names (char **&,
int &)
{
@@ -175,6 +182,13 @@ TAO_Resource_Factory::create_cached_connection_lock (void)
}
int
+TAO_Resource_Factory::locked_transport_cache (void)
+{
+ return 0;
+}
+
+
+int
TAO_Resource_Factory::load_default_protocols (void)
{
return 0;
diff --git a/TAO/tao/Resource_Factory.h b/TAO/tao/Resource_Factory.h
index 9b4694a4d66..470971e6ff1 100644
--- a/TAO/tao/Resource_Factory.h
+++ b/TAO/tao/Resource_Factory.h
@@ -178,12 +178,22 @@ public:
/// cache.
virtual int purge_percentage (void) const;
+ /// Return the number of muxed connections that are allowed for a
+ /// remote endpoint
+ virtual int max_muxed_connections (void) const;
+
virtual int get_parser_names (char **&names,
int &number_of_names);
/// Creates the lock for the lock needed in the Cache Map
+ /// @@todo: This method needs to go away as it doesnt make much
+ /// sense now.
virtual ACE_Lock *create_cached_connection_lock (void);
+ /// Should the transport cache have a lock or not? Return 1 if the
+ /// transport cache needs to be locked else return 0
+ virtual int locked_transport_cache (void);
+
/// Creates the flushing strategy. The new instance is owned by the
/// caller.
virtual TAO_Flushing_Strategy *create_flushing_strategy (void) = 0;
diff --git a/TAO/tao/Transport_Cache_Manager.cpp b/TAO/tao/Transport_Cache_Manager.cpp
index 30b54ed2744..6e89c48a75e 100644
--- a/TAO/tao/Transport_Cache_Manager.cpp
+++ b/TAO/tao/Transport_Cache_Manager.cpp
@@ -6,7 +6,8 @@
#include "tao/ORB_Core.h"
#include "tao/Resource_Factory.h"
#include "tao/Connection_Purging_Strategy.h"
-
+#include "tao/Condition.h"
+#include "ace/Synch_T.h"
#include "ace/Handle_Set.h"
#if !defined (__ACE_INLINE__)
@@ -18,23 +19,65 @@ ACE_RCSID (TAO,
Transport_Cache_Manager,
"$Id$")
+
TAO_Transport_Cache_Manager::TAO_Transport_Cache_Manager (TAO_ORB_Core &orb_core)
: percent_ (orb_core.resource_factory ()->purge_percentage ()),
purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ()),
cache_map_ (),
- cache_lock_ (orb_core.resource_factory ()->create_cached_connection_lock ())
+ condition_ (0),
+ cache_lock_ (0),
+ muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ()),
+ no_waiting_threads_ (0),
+ last_entry_returned_ (0)
{
+ if (orb_core.resource_factory ()->locked_transport_cache ())
+ {
+ ACE_NEW (this->condition_,
+ TAO_Condition <TAO_SYNCH_MUTEX>);
+
+ ACE_NEW (this->cache_lock_,
+ ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (*this->condition_->mutex ()));
+ }
+ else
+ {
+ /// If the cache is not going to be locked then dont create a
+ /// condition variable. Make the <muxed_number_> to 0, else a
+ /// single thread could get into waiting mode
+ this->muxed_number_ = 0;
+ ACE_NEW (this->cache_lock_,
+ ACE_Lock_Adapter<ACE_SYNCH_NULL_MUTEX>);
+ }
}
TAO_Transport_Cache_Manager::~TAO_Transport_Cache_Manager (void)
{
+ // Wakeup all the waiting threads threads before we shutdown stuff
+ if (this->no_waiting_threads_)
+ this->condition_->broadcast ();
+
// Delete the lock that we have
- delete this->cache_lock_;
+ if (this->cache_lock_)
+ {
+ delete this->cache_lock_;
+ this->cache_lock_ = 0;
+ }
// Delete the purging strategy
- delete this->purging_strategy_;
+ if (this->purging_strategy_)
+ {
+ delete this->purging_strategy_;
+ this->purging_strategy_ = 0;
+ }
+
+ // Delete the condition variable
+ if (this->condition_)
+ {
+ delete this->condition_;
+ this->condition_ = 0;
+ }
}
+
int
TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id,
TAO_Cache_IntId &int_id)
@@ -65,8 +108,7 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id,
{
// The entry has been added to cache succesfully
// Add the cache_map_entry to the transport
- int_id.transport () ->cache_map_entry (entry);
-
+ int_id.transport ()->cache_map_entry (entry);
}
else if (retval == 1)
{
@@ -100,13 +142,10 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id,
return retval;
}
-// Used to be in the .inl, but moved here b/c the use of
-// TAO_Transport::_duplicate() caused an include file cycle with
-// inlining turned on.
int
TAO_Transport_Cache_Manager::find_transport (
- TAO_Transport_Descriptor_Interface *prop,
- TAO_Transport *&transport)
+ TAO_Transport_Descriptor_Interface *prop,
+ TAO_Transport *&transport)
{
if (prop == 0)
{
@@ -129,6 +168,29 @@ TAO_Transport_Cache_Manager::find_transport (
}
int
+TAO_Transport_Cache_Manager::find (const TAO_Cache_ExtId &key,
+ TAO_Cache_IntId &value)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
+ guard,
+ *this->cache_lock_,
+ -1));
+
+ int status = this->find_i (key,
+ value);
+
+ if (status == 0)
+ {
+ // Update the purging strategy information while we
+ // are holding our lock
+ this->purging_strategy_->update_item (value.transport ());
+ }
+ return status;
+}
+
+
+
+int
TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
TAO_Cache_IntId &value)
{
@@ -137,20 +199,19 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
// Get the entry from the Hash Map
int retval = 0;
- // Get the index of the <key>
- int index = key.index ();
-
// Make a temporary object. It does not do a copy.
TAO_Cache_ExtId tmp_key (key.property ());
while (retval == 0)
{
+ // Wait for a connection..
+ this->wait_for_connection (tmp_key);
+
// Look for an entry in the map
retval = this->cache_map_.find (tmp_key,
entry);
- // We have an entry in the map, check whether it is idle. If it
- // is idle it would be marked as busy.
+ // We have an entry in the map, check whether it is idle.
if (entry)
{
CORBA::Boolean idle =
@@ -176,7 +237,7 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
}
// Bump the index up
- tmp_key.index (++index);
+ tmp_key.incr_index ();
}
// If we are here then it is an error
@@ -191,32 +252,13 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
}
int
-TAO_Transport_Cache_Manager::rebind_i (const TAO_Cache_ExtId &key,
- const TAO_Cache_IntId &value)
-{
- return this->cache_map_.rebind (key,
- value);
-}
-
-int
-TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key)
-{
- return this->cache_map_.unbind (key);
-}
-
-int
-TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value)
-{
- return this->cache_map_.unbind (key,
- value);
-}
-
-int
TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry)
{
// First get the entry again (if at all things had changed in the
// cache map in the mean time)
+
+ // @todo: Is this required? Looks like a legacy one..
+
HASH_MAP_ENTRY *new_entry = 0;
int retval = this->cache_map_.find (entry->ext_id_,
new_entry);
@@ -227,6 +269,16 @@ TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry)
recycle_state (ACE_RECYCLABLE_IDLE_AND_PURGABLE);
entry = new_entry;
+
+ // Does any one need waking?
+ if (this->no_waiting_threads_)
+ {
+ // We returned this entry to the map
+ this->last_entry_returned_ = &new_entry->ext_id_;
+
+ // Wake up a thread
+ this->condition_->signal ();
+ }
}
else if (TAO_debug_level > 0 && retval != 0)
{
@@ -252,23 +304,10 @@ TAO_Transport_Cache_Manager::close_i (ACE_Handle_Set &reactor_registered,
if ((*iter).int_id_.recycle_state () != ACE_RECYCLABLE_CLOSED)
{
-#if 0
- // @@ This code from Connection_Cache_Manager disappeared
- // during the changeover; we need the functional equivalent back.
- // The problem is that with the locking stuff that we're putting
- // in to the Transport, we might want to encapsulate the whole
- // exercise of adding to the handle set in a method on the transport
- // rather than doing it here. That way, the locking is correct.
- if ((*iter).int_id_.handler ()->is_registered ())
- {
- reactor_registered.set_bit ((*iter).int_id_.handler ()->fetch_handle ());
- }
-#else
// Get the transport to fill its associated connection's handle in
// the handle sets.
(*iter).int_id_.transport ()->provide_handle (reactor_registered,
unregistered);
-#endif
// Inform the transport that has a reference to the entry in the
// map that we are *gone* now. So, the transport should not use
// the reference to the entry that he has, to acces us *at any
@@ -497,6 +536,70 @@ TAO_Transport_Cache_Manager::close_entries(DESCRIPTOR_SET& sorted_set,
sorted_set = 0;
}
+int
+TAO_Transport_Cache_Manager::wait_for_connection (TAO_Cache_ExtId &extid)
+{
+ if (this->muxed_number_ &&
+ this->muxed_number_ == extid.index ())
+ {
+ // If we have a limit on the number of muxed connections for
+ // a particular endpoint just wait to get the connection
+ ++this->no_waiting_threads_;
+
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Going to wait for connections.. \n"));
+
+ int ready_togo = 0;
+
+ while (ready_togo == 0)
+ {
+ this->condition_->wait ();
+
+ // Check whether we are waiting for this connection
+ ready_togo = this->is_wakeup_useful (extid);
+ }
+
+ // We are not waiting anymore
+ --this->no_waiting_threads_;
+ }
+
+ return 0;
+}
+
+int
+TAO_Transport_Cache_Manager::is_wakeup_useful (TAO_Cache_ExtId &extid)
+{
+ // Get the underlying property that we are looking for
+ TAO_Transport_Descriptor_Interface *prop = extid.property ();
+
+ // Just check the underlying property for equivalence. If the last
+ // connection that was returned had the same property just return
+ // 1.
+ if (this->last_entry_returned_ &&
+ prop->is_equivalent (this->last_entry_returned_->property ()))
+ {
+ // Set the index to be right so that we can pick teh connection
+ // right away..
+ extid.index (this->last_entry_returned_->index ());
+
+ // There is no more use for it ...
+ this->last_entry_returned_ = 0;
+
+ return 1;
+ }
+
+ // If there is an entry that was returned and if there are more
+ // threads just wake up the peer to check for the returned
+ // connection.
+ if (this->last_entry_returned_ &&
+ this->no_waiting_threads_ > 1)
+ {
+ this->condition_->signal ();
+ }
+
+ return 0;
+}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
diff --git a/TAO/tao/Transport_Cache_Manager.h b/TAO/tao/Transport_Cache_Manager.h
index 818bc643a42..c2fa52aa903 100644
--- a/TAO/tao/Transport_Cache_Manager.h
+++ b/TAO/tao/Transport_Cache_Manager.h
@@ -15,16 +15,15 @@
#define TAO_CONNECTION_CACHE_MANAGER_H
#include "ace/pre.h"
-#include "ace/Hash_Map_Manager_T.h"
+#include "tao/Cache_Entries.h"
+
#if !defined (ACE_LACKS_PRAGMA_ONCE)
#define ACE_LACKS_PRAGMA_ONCE
#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "ace/Hash_Map_Manager_T.h"
+#include "ace/Synch_T.h"
-#include "tao/TAO_Export.h"
-#include "tao/Cache_Entries.h"
-
-#include "tao/Connection_Purging_Strategy.h"
#if defined(_MSC_VER)
#if (_MSC_VER >= 1200)
@@ -36,7 +35,9 @@
class TAO_ORB_Core;
class ACE_Handle_Set;
class TAO_Resource_Factory;
+class TAO_Connection_Purging_Strategy;
+template <class ACE_COND_MUTEX> class TAO_Condition;
typedef ACE_Unbounded_Set<ACE_Event_Handler*> TAO_EventHandlerSet;
typedef ACE_Unbounded_Set_Iterator<ACE_Event_Handler*>
@@ -46,22 +47,18 @@ typedef ACE_Unbounded_Set_Iterator<ACE_Event_Handler*>
* @class TAO_Transport_Cache_Manager
*
* @brief The Transport Cache Manager for TAO
+ *
+ * This class provides interfaces associating a TAO_Cache_ExtId
+ * & TAO_Cache_IntId. This class is wrapper around the
+ * ACE_Hash_Map_Manager class which is used as a container to Cache
+ * the connections. This class protects the entries with a lock. The
+ * map is updated only by holding the lock. The more compelling reason
+ * to have the lock in this class and not in the Hash_Map is that, we
+ * do quite a bit of work in this class for which we need a lock.
+ *
*/
class TAO_Export TAO_Transport_Cache_Manager
{
-
- // = DESCRIPTION
- // This class provides interfaces associating a TAO_Cache_ExtId
- // & TAO_Cache_IntId. This class manages a ACE_Hash_Map_Manager
- // class which is used as a container to Cache the
- // connections. This class protects the entries with a lock. The
- // map can be updated only by holding the lock.
-
- // General Note: This class at present has an interface that may
- // not be needed. But, the interface has just been copied from
- // the ACE Hash Map Manager classes. The interface wold be
- // pruned once I get the purging stuff also in. Till then let
- // the interface be there as it is.
public:
// Some useful typedef's
@@ -79,6 +76,8 @@ public:
TAO_Cache_IntId>
HASH_MAP_ENTRY;
+ typedef TAO_Condition<TAO_SYNCH_MUTEX> CONDITION;
+
// == Public methods
/// Constructor
@@ -133,19 +132,6 @@ private:
int find (const TAO_Cache_ExtId &key,
TAO_Cache_IntId &value);
- /// Reassociate the <key> with <value>. Grabs the lock and calls the
- /// implementation function find_i.
- int rebind (const TAO_Cache_ExtId &key,
- const TAO_Cache_IntId &value);
-
- /// Remove <key> from the cache.
- int unbind (const TAO_Cache_ExtId &key);
-
- /// Remove <key> from the cache, and return the <value> associated with
- /// <key>.
- int unbind (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value);
-
/**
* Non-Locking version and actual implementation of bind ()
* call. Calls bind on the Hash_Map_Manager that it holds. If the
@@ -166,17 +152,6 @@ private:
int find_i (const TAO_Cache_ExtId &key,
TAO_Cache_IntId &value);
- /// Non-locking version and actual implementation of rebind () call
- int rebind_i (const TAO_Cache_ExtId &key,
- const TAO_Cache_IntId &value);
-
- /// Non-locking version and actual implementation of unbind () call
- int unbind_i (const TAO_Cache_ExtId &key);
-
- /// Non-locking version and actual implementation of unbind () call
- int unbind_i (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value);
-
/// Non-locking version and actual implementation of make_idle ().
int make_idle_i (HASH_MAP_ENTRY *&entry);
@@ -229,7 +204,17 @@ private:
/// the required number of items in the set.
void close_entries (DESCRIPTOR_SET& sorted_set, int size);
+ /// Wait for connections if we have reached the limit on the number
+ /// of muxed connections. If not (ie. if we dont use a muxed
+ /// connection or if we have not reached the limit) this just
+ /// behaves as a no-op. <extid> has all the information about the
+ /// connection that is being searched.
+ int wait_for_connection (TAO_Cache_ExtId &extid);
+
+ /// Is the wakeup useful todo some work?
+ int is_wakeup_useful (TAO_Cache_ExtId &extid);
private:
+
/// The percentage of the cache to purge at one time
int percent_;
@@ -239,8 +224,23 @@ private:
/// The hash map that has the connections
HASH_MAP cache_map_;
- /// Lock for the map
+ /// The condition variable
+ CONDITION *condition_;
+
+ /// The lock that is used by the cache map
ACE_Lock *cache_lock_;
+
+ /// Number of allowed muxed connections
+ CORBA::ULong muxed_number_;
+
+ /// Number of threads waiting for connections
+ int no_waiting_threads_;
+
+ /// This is for optimization purposes. In a situation where number
+ /// of threads are waiting for connections, the last connection that
+ /// is put back is cached here. This should prevent all th threads
+ /// trying to search for their required entry.
+ TAO_Cache_ExtId *last_entry_returned_;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/Transport_Cache_Manager.inl b/TAO/tao/Transport_Cache_Manager.inl
index 0840e7961c9..2f44d1d2543 100644
--- a/TAO/tao/Transport_Cache_Manager.inl
+++ b/TAO/tao/Transport_Cache_Manager.inl
@@ -15,26 +15,7 @@ TAO_Transport_Cache_Manager::bind (TAO_Cache_ExtId &ext_id,
}
-ACE_INLINE int
-TAO_Transport_Cache_Manager::find (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->cache_lock_,
- -1));
- int status = this->find_i (key,
- value);
-
- if (status == 0)
- {
- // Update the purging strategy information while we
- // are holding our lock
- this->purging_strategy_->update_item (value.transport ());
- }
- return status;
-}
ACE_INLINE int
@@ -52,44 +33,6 @@ TAO_Transport_Cache_Manager::cache_transport (
}
ACE_INLINE int
-TAO_Transport_Cache_Manager::rebind (const TAO_Cache_ExtId &key,
- const TAO_Cache_IntId &value)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->cache_lock_,
- -1));
-
- return this->rebind_i (key,
- value);
-}
-
-ACE_INLINE int
-TAO_Transport_Cache_Manager::unbind (const TAO_Cache_ExtId &key)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->cache_lock_,
- -1));
-
- return this->unbind_i (key);
-}
-
-ACE_INLINE int
-TAO_Transport_Cache_Manager::unbind (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->cache_lock_,
- -1));
-
- return this->unbind_i (key,
- value);
-}
-
-
-ACE_INLINE int
TAO_Transport_Cache_Manager::purge (void)
{
ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->cache_lock_, 0));
diff --git a/TAO/tao/default_resource.cpp b/TAO/tao/default_resource.cpp
index fb23a7c068f..ffecd31ed47 100644
--- a/TAO/tao/default_resource.cpp
+++ b/TAO/tao/default_resource.cpp
@@ -40,6 +40,7 @@ TAO_Default_Resource_Factory::TAO_Default_Resource_Factory (void)
connection_caching_type_ (TAO_CONNECTION_CACHING_STRATEGY),
cache_maximum_ (TAO_CONNECTION_CACHE_MAXIMUM),
purge_percentage_ (TAO_PURGE_PERCENT),
+ max_muxed_connections_ (0),
reactor_mask_signals_ (1),
dynamically_allocated_reactor_ (0),
options_processed_ (0),
@@ -351,6 +352,17 @@ TAO_Default_Resource_Factory::init (int argc, char *argv[])
this->report_option_value_error ("-ORBFlushingStrategy", name);
}
}
+ else if (ACE_OS::strcasecmp (argv[curarg],
+ "-ORBMuxedConnectionMax") == 0)
+ {
+ curarg++;
+ if (curarg < argc)
+ this->max_muxed_connections_ =
+ ACE_OS::atoi (argv[curarg]);
+ else
+ this->report_option_value_error ("-ORBMuxedConnectionMax",
+ argv[curarg]);
+ }
else if (ACE_OS::strncmp (argv[curarg], "-ORB", 4) == 0)
{
// Can we assume there is an argument after the option?
@@ -738,6 +750,13 @@ TAO_Default_Resource_Factory::purge_percentage (void) const
return this->purge_percentage_;
}
+int
+TAO_Default_Resource_Factory::max_muxed_connections (void) const
+{
+ return this->max_muxed_connections_;
+}
+
+
ACE_Lock *
TAO_Default_Resource_Factory::create_cached_connection_lock (void)
{
@@ -755,6 +774,16 @@ TAO_Default_Resource_Factory::create_cached_connection_lock (void)
return the_lock;
}
+int
+TAO_Default_Resource_Factory::locked_transport_cache (void)
+{
+ if (this->cached_connection_lock_type_ == TAO_NULL_LOCK)
+ return 0;
+
+ return 1;
+}
+
+
TAO_Flushing_Strategy *
TAO_Default_Resource_Factory::create_flushing_strategy (void)
{
diff --git a/TAO/tao/default_resource.h b/TAO/tao/default_resource.h
index e7097046bef..ee71341e4b8 100644
--- a/TAO/tao/default_resource.h
+++ b/TAO/tao/default_resource.h
@@ -116,7 +116,9 @@ public:
virtual TAO_Resource_Factory::Caching_Strategy connection_caching_strategy_type (void) const;
virtual int cache_maximum (void) const;
virtual int purge_percentage (void) const;
+ virtual int max_muxed_connections (void) const;
virtual ACE_Lock *create_cached_connection_lock (void);
+ virtual int locked_transport_cache (void);
virtual TAO_Flushing_Strategy *create_flushing_strategy (void);
virtual TAO_Connection_Purging_Strategy *create_purging_strategy (void);
virtual TAO_LF_Strategy *create_lf_strategy (void);
@@ -169,6 +171,11 @@ protected:
/// demand.
int purge_percentage_;
+ /// Specifies the limit on the number of muxed connections
+ /// allowed per-property for the ORB. A value of 0 indicates no
+ /// limit
+ size_t max_muxed_connections_;
+
/// If <0> then we create reactors with signal handling disabled.
int reactor_mask_signals_;