summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2002-04-04 04:26:10 +0000
committerbala <balanatarajan@users.noreply.github.com>2002-04-04 04:26:10 +0000
commit725e73703a67326ed06452ca975b5378ad84c03b (patch)
tree866f51c285d8f0d3c1cd2a53a0194d493825e847
parentddf00484367e672294353272d1c670964963ba00 (diff)
downloadATCD-725e73703a67326ed06452ca975b5378ad84c03b.tar.gz
ChangeLogTag: Wed Apr 3 22:18:38 2002 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a107
-rw-r--r--TAO/tao/Cache_Entries.cpp15
-rw-r--r--TAO/tao/Cache_Entries.h6
-rw-r--r--TAO/tao/Cache_Entries.inl6
-rw-r--r--TAO/tao/ChangeLog96
-rw-r--r--TAO/tao/Condition.cpp56
-rw-r--r--TAO/tao/Condition.h115
-rw-r--r--TAO/tao/Condition.inl50
-rw-r--r--TAO/tao/Exclusive_TMS.cpp18
-rw-r--r--TAO/tao/IIOP_Connection_Handler.cpp39
-rw-r--r--TAO/tao/IIOP_Transport.cpp158
-rw-r--r--TAO/tao/Invocation.cpp6
-rw-r--r--TAO/tao/Muxed_TMS.cpp27
-rw-r--r--TAO/tao/Muxed_TMS.h13
-rw-r--r--TAO/tao/Resource_Factory.cpp15
-rw-r--r--TAO/tao/Resource_Factory.h10
-rw-r--r--TAO/tao/Transport.cpp47
-rw-r--r--TAO/tao/Transport.h4
-rw-r--r--TAO/tao/Transport_Cache_Manager.cpp208
-rw-r--r--TAO/tao/Transport_Cache_Manager.h88
-rw-r--r--TAO/tao/Transport_Cache_Manager.inl60
-rw-r--r--TAO/tao/default_resource.cpp35
-rw-r--r--TAO/tao/default_resource.h7
23 files changed, 896 insertions, 290 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index 8cf5f0d3f21..c3fda4b0d8d 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,110 @@
+Wed Apr 3 22:18:38 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ Changes to wide variety of stuff from my branch. The checkin
+ contains the following:
+
+ 1. Couple of bug fixes (#1129 and #1164)
+ 2. A new policy to regulate the number of connections allowed by
+ the ORB for every QoS property.
+
+ The details of the checkin follows
+
+ Sun Mar 24 08:57:37 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Transport.cpp: Fix for bug 1164. When big messages are
+ being read, read it in a loop till you get all the bytes or till
+ you get an error. We used to do only one read before and looks
+ like this had performance impacts. Thanks to James Kanyok
+ <james.kanyok@lmco.com> for reporting the problem. This will go
+ into the main trunk once we get a feedback from James.
+
+ Tue Feb 19 07:45:05 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/IIOP_Connection_Handler.cpp: Added a fix for Jody ie. bug
+ #1129. Yet, to get a reply whether the fix is right or
+ wrong. Now, we close the socket with the first call to
+ handle_close () and delete it when the upcall count gets to
+ zero.
+
+ Mon Feb 11 14:03:18 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Transport.h: Removed the virtual declaration from the method
+ bidirectional_flag (). It shouldnt have been virtual to start
+ with.
+
+ * tao/Muxed_TMS.h: Removed some old comments
+
+ * tao/Muxed_TMS.cpp (request_id):
+ * tao/Exclusive_TMS.cpp: Added a condition within request_id
+ (). The request_id that is generated will obey BiDirGIOP rules,
+ ie. the originator will send even number requests and the
+ receiver send odd numbered requests. The old method of
+ generation was giving problems in MT cases.
+
+ * tao/Invocation.cpp: Minor cosmetic changes.
+ * tao/IIOP_Transport.cpp: Modify the request id only for the first
+ request during BiDir connection origination.
+
+
+ Thu Jan 31 11:23:59 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Cache_Entries.cpp: Did a cosmetic fix.
+
+ * tao/Cache_Entries.h:
+ * tao/Cache_Entries.inl: Added a new method incr_index () to the
+ TAO_CacheExtId class.
+
+ * tao/Resource_Factory.cpp:
+ * tao/Resource_Factory.h:Added two new methods,
+ max_muxed_connections () and locked_transport_cache (). The
+ former returns the number of user specified muxed connections
+ with a particular property. The latter returns a boolean value
+ to indicate whether the transport cache needs to have a lock or
+ not .
+
+ * tao/default_resource.cpp:
+ * tao/default_resource.h: Concrete implementations for the methods
+ declared in Resource_Factory.
+
+ * tao/Transport_Cache_Manager.cpp:
+ * tao/Transport_Cache_Manager.h:
+ * tao/Transport_Cache_Manager.inl: Added support for counted muxed
+ connections ie. the user can limit the number of remote
+ connections (with a particular property). If the thread
+ searching the cache for a free connection doesnt find one that
+ is free, the thread will wait on a condition variable for the
+ connection to be released. To accomodate this the following set
+ of changes were made
+
+ - Added a condition variable to the class
+
+ - Create the type of lock that needs to be used for the cache
+ locally and use that lock to create the condition variable. If
+ the cache is free of any locks we dont create a condition
+ variable.
+
+ - The creation of lock has been moved to the
+ Transport_Cache_Manager from the Resource_Factory.
+
+ - Added two new methods, wait_for_connection () and
+ is_wakeup_useful (). The wait_for_connection () blocks the
+ thread searching for connection, if there is a limit on the
+ number of muxed connections and if there have been enough
+ connections created with the same property. The
+ is_wakeup_useful () method is called by an unblocked thread to
+ check whether the transport that was returned is the one the
+ thread was waiting for.
+
+ - Removed a version of rebind () and a couple of version of
+ unbind () calls as they were not used.
+
+
+ * tao/Condition.h:
+ * tao/Condition.cpp:.
+ * tao/Condition.inl: A simple wrapper that wraps a
+ TAO_SYNCH_CONDITION that can be used with different types of
+ Mutex classes.
+
Tue Apr 3 12:44:51 UTC 2002 Don Hinton <dhinton@ieee.org>
* tao/docs/pluggable_protocols/index.html:
diff --git a/TAO/tao/Cache_Entries.cpp b/TAO/tao/Cache_Entries.cpp
index 9c7e17f0273..df610a378f6 100644
--- a/TAO/tao/Cache_Entries.cpp
+++ b/TAO/tao/Cache_Entries.cpp
@@ -24,13 +24,14 @@ TAO_Cache_IntId::~TAO_Cache_IntId (void)
TAO_Cache_IntId&
TAO_Cache_IntId::operator= (const TAO_Cache_IntId &rhs)
{
- if (this != &rhs) {
- this->recycle_state_ = rhs.recycle_state_;
+ if (this != &rhs)
+ {
+ this->recycle_state_ = rhs.recycle_state_;
+
+ TAO_Transport* old_transport = this->transport_;
+ this->transport_ = TAO_Transport::_duplicate (rhs.transport_);
+ TAO_Transport::release (old_transport);
+ }
- TAO_Transport* old_transport = this->transport_;
- this->transport_ = TAO_Transport::_duplicate (rhs.transport_);
- TAO_Transport::release (old_transport);
- }
return *this;
}
-
diff --git a/TAO/tao/Cache_Entries.h b/TAO/tao/Cache_Entries.h
index 0907db2f944..4a616d7b664 100644
--- a/TAO/tao/Cache_Entries.h
+++ b/TAO/tao/Cache_Entries.h
@@ -6,6 +6,7 @@
*
* $Id$
*
+ *
* @author Bala Natarajan <bala@cs.wustl.edu>
*/
//=============================================================================
@@ -106,7 +107,7 @@ private:
* <value> for a <key> in a hash table holding the state of the
* Transport Cache.
*/
-class TAO_Cache_ExtId
+class TAO_Export TAO_Cache_ExtId
{
public:
@@ -147,6 +148,9 @@ public:
/// but for the TAO_Transport_Cache_Manager class.
void index (CORBA::ULong index);
+ /// Increment the index value
+ void incr_index (void);
+
// = Accessors
/// Get the underlying the property pointer
TAO_Transport_Descriptor_Interface *property (void) const;
diff --git a/TAO/tao/Cache_Entries.inl b/TAO/tao/Cache_Entries.inl
index 1e6bbc118fb..ec19b1ed1e1 100644
--- a/TAO/tao/Cache_Entries.inl
+++ b/TAO/tao/Cache_Entries.inl
@@ -180,6 +180,12 @@ TAO_Cache_ExtId::index (CORBA::ULong index)
this->index_ = index;
}
+ACE_INLINE void
+TAO_Cache_ExtId::incr_index (void)
+{
+ ++this->index_;
+}
+
ACE_INLINE TAO_Transport_Descriptor_Interface *
TAO_Cache_ExtId::property (void) const
{
diff --git a/TAO/tao/ChangeLog b/TAO/tao/ChangeLog
new file mode 100644
index 00000000000..e581b088baa
--- /dev/null
+++ b/TAO/tao/ChangeLog
@@ -0,0 +1,96 @@
+Sun Mar 24 08:57:37 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Transport.cpp: Fix for bug 1164. When big messages are
+ being read, read it in a loop till you get all the bytes or till
+ you get an error. We used to do only one read before and looks
+ like this had performance impacts. Thanks to James Kanyok
+ <james.kanyok@lmco.com> for reporting the problem. This will go
+ into the main trunk once we get a feedback from James.
+
+Tue Feb 19 07:45:05 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/IIOP_Connection_Handler.cpp: Added a fix for Jody ie. bug
+ #1129. Yet, to get a reply whether the fix is right or
+ wrong. Now, we close the socket with the first call to
+ handle_close () and delete it when the upcall count gets to
+ zero.
+
+Mon Feb 11 14:03:18 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Transport.h: Removed the virtual declaration from the method
+ bidirectional_flag (). It shouldnt have been virtual to start
+ with.
+
+ * tao/Muxed_TMS.h: Removed some old comments
+
+ * tao/Muxed_TMS.cpp (request_id):
+ * tao/Exclusive_TMS.cpp: Added a condition within request_id
+ (). The request_id that is generated will obey BiDirGIOP rules,
+ ie. the originator will send even number requests and the
+ receiver send odd numbered requests. The old method of
+ generation was giving problems in MT cases.
+
+ * tao/Invocation.cpp: Minor cosmetic changes.
+ * tao/IIOP_Transport.cpp: Modify the request id only for the first
+ request during BiDir connection origination.
+
+
+Thu Jan 31 11:23:59 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Cache_Entries.cpp: Did a cosmetic fix.
+
+ * tao/Cache_Entries.h:
+ * tao/Cache_Entries.inl: Added a new method incr_index () to the
+ TAO_CacheExtId class.
+
+ * tao/Resource_Factory.cpp:
+ * tao/Resource_Factory.h:Added two new methods,
+ max_muxed_connections () and locked_transport_cache (). The
+ former returns the number of user specified muxed connections
+ with a particular property. The latter returns a boolean value
+ to indicate whether the transport cache needs to have a lock or
+ not .
+
+ * tao/default_resource.cpp:
+ * tao/default_resource.h: Concrete implementations for the methods
+ declared in Resource_Factory.
+
+ * tao/Transport_Cache_Manager.cpp:
+ * tao/Transport_Cache_Manager.h:
+ * tao/Transport_Cache_Manager.inl: Added support for counted muxed
+ connections ie. the user can limit the number of remote
+ connections (with a particular property). If the thread
+ searching the cache for a free connection doesnt find one that
+ is free, the thread will wait on a condition variable for the
+ connection to be released. To accomodate this the following set
+ of changes were made
+
+ - Added a condition variable to the class
+
+ - Create the type of lock that needs to be used for the cache
+ locally and use that lock to create the condition variable. If
+ the cache is free of any locks we dont create a condition
+ variable.
+
+ - The creation of lock has been moved to the
+ Transport_Cache_Manager from the Resource_Factory.
+
+ - Added two new methods, wait_for_connection () and
+ is_wakeup_useful (). The wait_for_connection () blocks the
+ thread searching for connection, if there is a limit on the
+ number of muxed connections and if there have been enough
+ connections created with the same property. The
+ is_wakeup_useful () method is called by an unblocked thread to
+ check whether the transport that was returned is the one the
+ thread was waiting for.
+
+ - Removed a version of rebind () and a couple of version of
+ unbind () calls as they were not used.
+
+
+ * tao/Condition.h:
+ * tao/Condition.cpp:.
+ * tao/Condition.inl: A simple wrapper that wraps a
+ TAO_SYNCH_CONDITION that can be used with different types of
+ Mutex classes.
+
diff --git a/TAO/tao/Condition.cpp b/TAO/tao/Condition.cpp
new file mode 100644
index 00000000000..d754007a77d
--- /dev/null
+++ b/TAO/tao/Condition.cpp
@@ -0,0 +1,56 @@
+#include "Condition.h"
+
+
+#if !defined (__ACE_INLINE__)
+# include "tao/Condition.inl"
+#endif /* __ACE_INLINE__ */
+
+
+ACE_RCSID (TAO,
+ Condition,
+ "$Id$")
+
+template <class MUTEX>
+TAO_Condition<MUTEX>::TAO_Condition (MUTEX &m)
+
+ : mutex_ (&m),
+ delete_lock_ (0),
+ cond_ (0)
+{
+ // @@todo: Need to add the allocatore here..
+ ACE_NEW (this->cond_,
+ TAO_SYNCH_CONDITION (*this->mutex_));
+}
+
+template <class MUTEX>
+TAO_Condition<MUTEX>::TAO_Condition (void)
+ : mutex_ (0),
+ delete_lock_ (0),
+ cond_ (0)
+
+{
+ // @@todo: Need to add the allocatore here..
+
+ ACE_NEW (this->mutex_,
+ MUTEX);
+
+ this->delete_lock_ = 1;
+
+ ACE_NEW (this->cond_,
+ TAO_SYNCH_CONDITION (*this->mutex_));
+}
+
+
+template <class MUTEX>
+TAO_Condition<MUTEX>::~TAO_Condition (void)
+{
+ if (this->remove () == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("TAO_Condition::~TAO_Condition")));
+
+ delete this->cond_;
+
+ if (this->delete_lock_)
+ delete this->mutex_;
+}
diff --git a/TAO/tao/Condition.h b/TAO/tao/Condition.h
new file mode 100644
index 00000000000..b9b85a4c817
--- /dev/null
+++ b/TAO/tao/Condition.h
@@ -0,0 +1,115 @@
+/* -*- C++ -*- */
+
+//=============================================================================
+/**
+ * @file Condition.h
+ *
+ * $Id$
+ *
+ * @author Douglas C. Schmidt <schmidt@cs.wustl.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_CONDITION_H
+#define TAO_CONDITION_H
+#include "ace/pre.h"
+
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+
+/**
+ * @class TAO_Condition
+ *
+ * @brief Same as to the ACE_Condition variable wrapper
+ *
+ * This class differs from ACE_Condition in that it uses a
+ * TAO_SYNCH_CONDITION instead of ACE_cond_t under the hood to
+ * provide blocking.
+ */
+template <class MUTEX>
+class TAO_Condition
+{
+public:
+
+ /// Useful typedef
+ typedef MUTEX LOCK;
+
+ // = Initialiation and termination methods.
+ /// Initialize the condition variable.
+ TAO_Condition (MUTEX &m);
+
+ /// A default constructor. Since no lock is provided by the user,
+ /// one will be created internally.
+ TAO_Condition (void);
+
+ /// Implicitly destroy the condition variable.
+ ~TAO_Condition (void);
+
+ // = Lock accessors.
+ /**
+ * Block on condition, or until absolute time-of-day has passed. If
+ * abstime == 0 use "blocking" <wait> semantics. Else, if <abstime>
+ * != 0 and the call times out before the condition is signaled
+ * <wait> returns -1 and sets errno to ETIME.
+ */
+ int wait (const ACE_Time_Value *abstime);
+
+ /// Block on condition.
+ int wait (void);
+
+ /**
+ * Block on condition or until absolute time-of-day has passed. If
+ * abstime == 0 use "blocking" wait() semantics on the <mutex>
+ * passed as a parameter (this is useful if you need to store the
+ * <Condition> in shared memory). Else, if <abstime> != 0 and the
+ * call times out before the condition is signaled <wait> returns -1
+ * and sets errno to ETIME.
+ */
+ int wait (MUTEX &mutex, const ACE_Time_Value *abstime = 0);
+
+ /// Signal one waiting thread.
+ int signal (void);
+
+ /// Signal *all* waiting threads.
+ int broadcast (void);
+
+ // = Utility methods.
+ /// Explicitly destroy the condition variable.
+ int remove (void);
+
+ /// Returns a reference to the underlying mutex_;
+ MUTEX *mutex (void);
+
+private:
+
+ /// Reference to mutex lock.
+ MUTEX *mutex_;
+
+ /// A flag to indicate whether the lock needs to be deleted.
+ int delete_lock_;
+
+ /// Condition variable.
+ TAO_SYNCH_CONDITION *cond_;
+
+ // = Prevent assignment and initialization.
+ ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Condition<MUTEX> &))
+ ACE_UNIMPLEMENTED_FUNC (TAO_Condition (const TAO_Condition<MUTEX> &))
+};
+
+#if defined (__ACE_INLINE__)
+#include "Condition.inl"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "Condition.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("Condition.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /*TAO_CONDITION_H*/
diff --git a/TAO/tao/Condition.inl b/TAO/tao/Condition.inl
new file mode 100644
index 00000000000..089bc0d54be
--- /dev/null
+++ b/TAO/tao/Condition.inl
@@ -0,0 +1,50 @@
+/* -*- C++ -*- */
+//$Id$
+template <class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::wait (void)
+{
+ return this->cond_->wait ();
+}
+
+template <class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::wait (MUTEX &mutex,
+ const ACE_Time_Value *abstime)
+{
+ return this->cond_->wait (mutex,
+ abstime);
+}
+
+// Peform an "alertable" timed wait. If the argument ABSTIME == 0
+// then we do a regular cond_wait(), else we do a timed wait for up to
+// ABSTIME using the Solaris cond_timedwait() function.
+
+template <class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::wait (const ACE_Time_Value *abstime)
+{
+ return this->wait (this->mutex_, abstime);
+}
+
+template<class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::remove (void)
+{
+ this->cond_->remove ();
+}
+
+template<class MUTEX> ACE_INLINE MUTEX *
+TAO_Condition<MUTEX>::mutex (void)
+{
+ // ACE_TRACE ("ACE_Condition<MUTEX>::mutex");
+ return this->mutex_;
+}
+
+template <class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::signal (void)
+{
+ return this->cond_->signal ();
+}
+
+template <class MUTEX> ACE_INLINE int
+TAO_Condition<MUTEX>::broadcast (void)
+{
+ return this->cond_->broadcast ();
+}
diff --git a/TAO/tao/Exclusive_TMS.cpp b/TAO/tao/Exclusive_TMS.cpp
index 95cd9d1bd09..0f17f8eddca 100644
--- a/TAO/tao/Exclusive_TMS.cpp
+++ b/TAO/tao/Exclusive_TMS.cpp
@@ -27,11 +27,27 @@ TAO_Exclusive_TMS::~TAO_Exclusive_TMS (void)
CORBA::ULong
TAO_Exclusive_TMS::request_id (void)
{
+ this->request_id_generator_++;
+
+ // if TAO_Transport::bidirectional_flag_
+ // == 1 --> originating side
+ // == 0 --> other side
+ // == -1 --> no bi-directional connection was negotiated
+ // The originating side must have an even request ID, and the other
+ // side must have an odd request ID. Make sure that is the case.
+ int bidir_flag =
+ this->transport_->bidirectional_flag ();
+
+ if ((bidir_flag == 1 && ACE_ODD (this->request_id_generator_))
+ || (bidir_flag == 0 && ACE_EVEN (this->request_id_generator_)))
+ ++this->request_id_generator_;
+
if (TAO_debug_level > 4)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) TAO_Exclusive_TMS::request_id - <%d>\n"),
this->request_id_generator_));
- return this->request_id_generator_++;
+
+ return this->request_id_generator_;
}
// Bind the handler with the request id.
diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp
index 50db44db5fb..5821c6ab8b4 100644
--- a/TAO/tao/IIOP_Connection_Handler.cpp
+++ b/TAO/tao/IIOP_Connection_Handler.cpp
@@ -22,6 +22,7 @@ ACE_RCSID (tao,
IIOP_Connection_Handler,
"$Id$")
+
TAO_IIOP_Connection_Handler::TAO_IIOP_Connection_Handler (ACE_Thread_Manager *t)
: TAO_IIOP_SVC_HANDLER (t, 0 , 0),
TAO_Connection_Handler (0),
@@ -206,7 +207,19 @@ TAO_IIOP_Connection_Handler::handle_close (ACE_HANDLE handle,
long upcalls = this->decr_pending_upcalls ();
- ACE_ASSERT (upcalls >= 0);
+ // Just return incase the upcall count goes below 0.
+ if (upcalls < 0)
+ return 0;
+
+ if (this->get_handle () != ACE_INVALID_HANDLE)
+ {
+ // Just close the socket irrespective of what the upcall count
+ // is.
+ this->peer().close ();
+
+ // Set the handle to be INVALID_HANDLE
+ this->set_handle (ACE_INVALID_HANDLE);
+ }
// If the upcall count is zero start the cleanup.
if (upcalls == 0)
@@ -238,16 +251,13 @@ TAO_IIOP_Connection_Handler::handle_close_i (void)
}
// Close the handle..
- if (this->get_handle () != ACE_INVALID_HANDLE)
- {
- // Remove the entry as it is invalid
- this->transport ()->purge_entry ();
+ // Remove the entry as it is invalid
+ this->transport ()->purge_entry ();
- // Signal the transport that we will no longer have
- // a reference to it. This will eventually call
- // TAO_Transport::release ().
- this->transport (0);
- }
+ // Signal the transport that we will no longer have
+ // a reference to it. This will eventually call
+ // TAO_Transport::release ().
+ this->transport (0);
// Follow usual Reactor-style lifecycle semantics and commit
// suicide.
@@ -353,8 +363,6 @@ TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE)
// The upcall is done. Bump down the reference count
upcalls = this->decr_pending_upcalls ();
- ACE_ASSERT (upcalls >= 0);
-
if (upcalls == 0)
{
this->handle_close_i ();
@@ -364,6 +372,13 @@ TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE)
// handle_close () which could be harmful.
retval = 0;
}
+ else if (upcalls < 0)
+ {
+ // As we have already performed the handle closing we dont want
+ // to return a -1. Doing so would make the reactor call
+ // handle_close () which could be harmful.
+ retval = 0;
+ }
if (retval == -1)
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index c4194fb4010..815d41e49b8 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -25,11 +25,12 @@
ACE_RCSID (tao, IIOP_Transport, "$Id$")
+
TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
- TAO_ORB_Core *orb_core,
- CORBA::Boolean flag)
+ TAO_ORB_Core *orb_core,
+ CORBA::Boolean flag)
: TAO_Transport (TAO_TAG_IIOP_PROFILE,
- orb_core)
+ orb_core)
, connection_handler_ (handler)
, messaging_object_ (0)
{
@@ -37,13 +38,13 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
{
// Use the lite version of the protocol
ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_Lite (orb_core));
+ TAO_GIOP_Message_Lite (orb_core));
}
else
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_Base (orb_core));
+ TAO_GIOP_Message_Base (orb_core));
}
}
@@ -66,11 +67,11 @@ TAO_IIOP_Transport::messaging_object (void)
ssize_t
TAO_IIOP_Transport::send_i (iovec *iov, int iovcnt,
- size_t &bytes_transferred,
- const ACE_Time_Value *max_wait_time)
+ size_t &bytes_transferred,
+ const ACE_Time_Value *max_wait_time)
{
ssize_t retval = this->connection_handler_->peer ().sendv (iov, iovcnt,
- max_wait_time);
+ max_wait_time);
if (retval > 0)
bytes_transferred = retval;
@@ -79,12 +80,12 @@ TAO_IIOP_Transport::send_i (iovec *iov, int iovcnt,
ssize_t
TAO_IIOP_Transport::recv_i (char *buf,
- size_t len,
- const ACE_Time_Value *max_wait_time)
+ size_t len,
+ const ACE_Time_Value *max_wait_time)
{
ssize_t n = this->connection_handler_->peer ().recv (buf,
- len,
- max_wait_time);
+ len,
+ max_wait_time);
// Do not print the error message if it is a timeout, which could
// occur in thread-per-connection.
@@ -93,16 +94,16 @@ TAO_IIOP_Transport::recv_i (char *buf,
errno != ETIME)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p \n"),
- ACE_TEXT ("TAO - read message failure ")
- ACE_TEXT ("recv_i () \n")));
+ ACE_TEXT ("TAO (%P|%t) - %p \n"),
+ ACE_TEXT ("TAO - read message failure ")
+ ACE_TEXT ("recv_i () \n")));
}
// Error handling
if (n == -1)
{
if (errno == EWOULDBLOCK)
- return 0;
+ return 0;
return -1;
@@ -126,8 +127,8 @@ TAO_IIOP_Transport::register_handler_i (void)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - IIOP_Transport::register_handler %d\n",
- this->id ()));
+ "TAO (%P|%t) - IIOP_Transport::register_handler %d\n",
+ this->id ()));
}
ACE_Reactor *r = this->orb_core_->reactor ();
@@ -142,25 +143,25 @@ TAO_IIOP_Transport::register_handler_i (void)
// Register the handler with the reactor
return r->register_handler (this->connection_handler_,
- ACE_Event_Handler::READ_MASK);
+ ACE_Event_Handler::READ_MASK);
}
int
TAO_IIOP_Transport::send_request (TAO_Stub *stub,
- TAO_ORB_Core *orb_core,
- TAO_OutputCDR &stream,
- int two_way,
- ACE_Time_Value *max_wait_time)
+ TAO_ORB_Core *orb_core,
+ TAO_OutputCDR &stream,
+ int two_way,
+ ACE_Time_Value *max_wait_time)
{
if (this->ws_->sending_request (orb_core,
- two_way) == -1)
+ two_way) == -1)
return -1;
if (this->send_message (stream,
- stub,
- two_way,
- max_wait_time) == -1)
+ stub,
+ two_way,
+ max_wait_time) == -1)
return -1;
@@ -169,9 +170,9 @@ TAO_IIOP_Transport::send_request (TAO_Stub *stub,
int
TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
- TAO_Stub *stub,
- int twoway,
- ACE_Time_Value *max_wait_time)
+ TAO_Stub *stub,
+ int twoway,
+ ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
if (this->messaging_object_->format_message (stream) != 0)
@@ -179,17 +180,17 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
// This guarantees to send all data (bytes) or return an error.
ssize_t n = this->send_message_i (stub,
- twoway,
- stream.begin (),
- max_wait_time);
+ twoway,
+ stream.begin (),
+ max_wait_time);
if (n == -1)
{
if (TAO_debug_level)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
- this->id (),
- ACE_TEXT ("send_message ()\n")));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
+ this->id (),
+ ACE_TEXT ("send_message ()\n")));
return -1;
}
@@ -199,8 +200,8 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
int
TAO_IIOP_Transport::generate_request_header (TAO_Operation_Details &opdetails,
- TAO_Target_Specification &spec,
- TAO_OutputCDR &msg)
+ TAO_Target_Specification &spec,
+ TAO_OutputCDR &msg)
{
// Check whether we have a Bi Dir IIOP policy set, whether the
// messaging objects are ready to handle bidirectional connections
@@ -214,24 +215,25 @@ TAO_IIOP_Transport::generate_request_header (TAO_Operation_Details &opdetails,
// Set the flag to 0 (i.e., originating side)
this->bidirectional_flag (0);
- }
- // Modify the request id if we have BiDirectional client/server
- // setup
- opdetails.modify_request_id (this->bidirectional_flag ());
+ // Modify the request id if we have BiDirectional client/server
+ // setup.
+ // @@ Is this needed at all?
+ opdetails.modify_request_id (this->bidirectional_flag ());
+ }
return TAO_Transport::generate_request_header (opdetails,
- spec,
- msg);
+ spec,
+ msg);
}
int
TAO_IIOP_Transport::messaging_init (CORBA::Octet major,
- CORBA::Octet minor)
+ CORBA::Octet minor)
{
this->messaging_object_->init (major,
- minor);
+ minor);
return 1;
}
@@ -272,12 +274,12 @@ TAO_IIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails)
{
// Check whether it is a IIOP acceptor
if ((*acceptor)->tag () == TAO_TAG_IIOP_PROFILE)
- {
- // @@ Why isn't the return value checked!
- // -Ossama
- this->get_listen_point (listen_point_list,
- *acceptor);
- }
+ {
+ // @@ Why isn't the return value checked!
+ // -Ossama
+ this->get_listen_point (listen_point_list,
+ *acceptor);
+ }
}
// We have the ListenPointList at this point. Create a output CDR
@@ -291,7 +293,7 @@ TAO_IIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails)
// Add this info in to the svc_list
opdetails.request_service_context ().set_context (IOP::BI_DIR_IIOP,
- cdr);
+ cdr);
return;
}
@@ -303,7 +305,7 @@ TAO_IIOP_Transport::get_listen_point (
{
TAO_IIOP_Acceptor *iiop_acceptor =
ACE_dynamic_cast (TAO_IIOP_Acceptor *,
- acceptor );
+ acceptor );
// Get the array of endpoints serviced by TAO_IIOP_Acceptor
const ACE_INET_Addr *endpoint_addr =
@@ -320,10 +322,10 @@ TAO_IIOP_Transport::get_listen_point (
== -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%P|%t) Could not resolve local ")
- ACE_TEXT ("host address in ")
- ACE_TEXT ("get_listen_point()\n")),
- -1);
+ ACE_TEXT ("(%P|%t) Could not resolve local ")
+ ACE_TEXT ("host address in ")
+ ACE_TEXT ("get_listen_point()\n")),
+ -1);
}
// Note: Looks like there is no point in sending the list of
@@ -333,13 +335,13 @@ TAO_IIOP_Transport::get_listen_point (
// Get the hostname for the local address
if (iiop_acceptor->hostname (this->orb_core_,
- local_addr,
- local_interface.out ()) == -1)
+ local_addr,
+ local_interface.out ()) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%P|%t) Could not resolve local host")
- ACE_TEXT (" name \n")),
- -1);
+ ACE_TEXT ("(%P|%t) Could not resolve local host")
+ ACE_TEXT (" name \n")),
+ -1);
}
for (size_t index = 0;
@@ -347,20 +349,20 @@ TAO_IIOP_Transport::get_listen_point (
index++)
{
if (local_addr.get_ip_address()
- == endpoint_addr[index].get_ip_address())
- {
- // Get the count of the number of elements
- CORBA::ULong len = listen_point_list.length ();
-
- // Increase the length by 1
- listen_point_list.length (len + 1);
-
- // We have the connection and the acceptor endpoint on the
- // same interface
- IIOP::ListenPoint &point = listen_point_list[len];
- point.host = CORBA::string_dup (local_interface.in ());
- point.port = endpoint_addr[index].get_port_number ();
- }
+ == endpoint_addr[index].get_ip_address())
+ {
+ // Get the count of the number of elements
+ CORBA::ULong len = listen_point_list.length ();
+
+ // Increase the length by 1
+ listen_point_list.length (len + 1);
+
+ // We have the connection and the acceptor endpoint on the
+ // same interface
+ IIOP::ListenPoint &point = listen_point_list[len];
+ point.host = CORBA::string_dup (local_interface.in ());
+ point.port = endpoint_addr[index].get_port_number ();
+ }
}
return 1;
diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp
index afa3c958af2..f9cacfbd200 100644
--- a/TAO/tao/Invocation.cpp
+++ b/TAO/tao/Invocation.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "Invocation.h"
#include "Stub.h"
#include "Profile.h"
@@ -564,8 +565,8 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request
// preallocated reply dispatcher.
// Bind.
-
- TAO_Transport_Mux_Strategy *tms = this->transport_->tms ();
+ TAO_Transport_Mux_Strategy *tms =
+ this->transport_->tms ();
TAO_Bind_Dispatcher_Guard dispatch_guard (
this->op_details_.request_id (),
@@ -1103,4 +1104,3 @@ template class auto_ptr<CORBA::Exception>;
#pragma instantiate auto_ptr<CORBA::Exception>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-
diff --git a/TAO/tao/Muxed_TMS.cpp b/TAO/tao/Muxed_TMS.cpp
index fb73ccba938..88d7207d0e7 100644
--- a/TAO/tao/Muxed_TMS.cpp
+++ b/TAO/tao/Muxed_TMS.cpp
@@ -30,7 +30,28 @@ TAO_Muxed_TMS::request_id (void)
// @@ What is a good error return value?
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon,
this->lock_, 0);
- return this->request_id_generator_++;
+
+ ++this->request_id_generator_;
+
+ // if TAO_Transport::bidirectional_flag_
+ // == 1 --> originating side
+ // == 0 --> other side
+ // == -1 --> no bi-directional connection was negotiated
+ // The originating side must have an even request ID, and the other
+ // side must have an odd request ID. Make sure that is the case.
+ int bidir_flag =
+ this->transport_->bidirectional_flag ();
+
+ if ((bidir_flag == 1 && ACE_ODD (this->request_id_generator_))
+ || (bidir_flag == 0 && ACE_EVEN (this->request_id_generator_)))
+ ++this->request_id_generator_;
+
+ if (TAO_debug_level > 4)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) TAO_Muxed_TMS::request_id - <%d>\n"),
+ this->request_id_generator_));
+
+ return this->request_id_generator_;
}
// Bind the dispatcher with the request id.
@@ -45,8 +66,8 @@ TAO_Muxed_TMS::bind_dispatcher (CORBA::ULong request_id,
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P | %t):TAO_Muxed_TMS::bind_dispatcher: ")
- ACE_TEXT ("bind dispatcher failed: result = %d\n"),
- result));
+ ACE_TEXT ("bind dispatcher failed: result = %d, request id = %d \n"),
+ result, request_id));
return -1;
}
diff --git a/TAO/tao/Muxed_TMS.h b/TAO/tao/Muxed_TMS.h
index 0824dc1d72f..124f5839ae2 100644
--- a/TAO/tao/Muxed_TMS.h
+++ b/TAO/tao/Muxed_TMS.h
@@ -83,19 +83,6 @@ protected:
/// Keep track of the orb core pointer. We need to this to create the
/// Reply Dispatchers.
TAO_ORB_Core *orb_core_;
-
- // @@ Commented for the time being, let the commented line stay for
- // sometime - Bala
- // TAO_GIOP_Message_State *message_state_;
- // Message state where the current input message is being read. This
- // is created at start of each incoming message. When that message
- // is read, the message is processed and for the next message a new
- // message state is created.
-
- // @@ Having members of type TAO_GIOP* indicates that we
- // (Transport_Mux_Strategy) are aware of the underlying messaging
- // protocol. But for the present let us close our eyes till we are
- // able to iterate on a use case - Bala.
};
#include "ace/post.h"
diff --git a/TAO/tao/Resource_Factory.cpp b/TAO/tao/Resource_Factory.cpp
index 92251adacbf..8711b484454 100644
--- a/TAO/tao/Resource_Factory.cpp
+++ b/TAO/tao/Resource_Factory.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "ace/Auto_Ptr.h"
#include "ace/Dynamic_Service.h"
@@ -165,6 +166,13 @@ TAO_Resource_Factory::purge_percentage (void) const
}
int
+TAO_Resource_Factory::max_muxed_connections (void) const
+{
+ return 0;
+}
+
+
+int
TAO_Resource_Factory::get_parser_names (char **&,
int &)
{
@@ -178,6 +186,13 @@ TAO_Resource_Factory::create_cached_connection_lock (void)
}
int
+TAO_Resource_Factory::locked_transport_cache (void)
+{
+ return 0;
+}
+
+
+int
TAO_Resource_Factory::load_default_protocols (void)
{
return 0;
diff --git a/TAO/tao/Resource_Factory.h b/TAO/tao/Resource_Factory.h
index cf9c7560395..6a137f593d3 100644
--- a/TAO/tao/Resource_Factory.h
+++ b/TAO/tao/Resource_Factory.h
@@ -181,12 +181,22 @@ public:
/// cache.
virtual int purge_percentage (void) const;
+ /// Return the number of muxed connections that are allowed for a
+ /// remote endpoint
+ virtual int max_muxed_connections (void) const;
+
virtual int get_parser_names (char **&names,
int &number_of_names);
/// Creates the lock for the lock needed in the Cache Map
+ /// @@todo: This method needs to go away as it doesnt make much
+ /// sense now.
virtual ACE_Lock *create_cached_connection_lock (void);
+ /// Should the transport cache have a lock or not? Return 1 if the
+ /// transport cache needs to be locked else return 0
+ virtual int locked_transport_cache (void);
+
/// Creates the flushing strategy. The new instance is owned by the
/// caller.
virtual TAO_Flushing_Strategy *create_flushing_strategy (void) = 0;
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index aae2a3a4adc..db34ab04558 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -27,7 +27,10 @@
# include "Transport.inl"
#endif /* __ACE_INLINE__ */
-ACE_RCSID(tao, Transport, "$Id$")
+ACE_RCSID (tao,
+ Transport,
+ "$Id$")
+
TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount)
: ACE_Refcountable (refcount)
@@ -994,16 +997,34 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
ACE_CDR::grow (&incoming,
payload);
- // .. do a read on the socket again.
- ssize_t n = this->recv (incoming.wr_ptr (),
- missing_data,
- max_wait_time);
+ ssize_t n = 0;
- if (TAO_debug_level > 6)
+ // As this used for transports where things are available in one
+ // shot this looping should not create any problems.
+ for (ssize_t bytes = missing_data;
+ bytes != 0;
+ bytes -= n)
{
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Read [%d] bytes on attempt \n",
- n));
+ // .. do a read on the socket again.
+ n = this->recv (incoming.wr_ptr (),
+ bytes,
+ max_wait_time);
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Read [%d] bytes on attempt \n",
+ n));
+ }
+
+ if (n == 0 ||
+ n == -1)
+ {
+ break;
+ }
+
+ incoming.wr_ptr (n);
+ missing_data -= n;
}
// If we got an error..
@@ -1019,16 +1040,10 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
return -1;
}
- // If we had gooten a EWOULDBLOCK n would be equal to zero. But we
+ // If we had gotten a EWOULDBLOCK n would be equal to zero. But we
// have to put the message in the queue anyway. So let us proceed
// to do that and return...
- // Move the write pointer
- incoming.wr_ptr (n);
-
- // ..Decrement
- missing_data -= n;
-
// Check to see if we have messages in queue or if we have missing
// data . AT this point we cannot have have semi-complete messages
// in the queue as they would have been taken care before. Put
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index f1a4d525b72..614b83a7050 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -260,8 +260,8 @@ public:
CORBA::Octet minor) = 0;
/// Get/Set the bidirectional flag
- virtual int bidirectional_flag (void) const;
- virtual void bidirectional_flag (int flag);
+ int bidirectional_flag (void) const;
+ void bidirectional_flag (int flag);
/// Fill in a handle_set with any associated handler's reactor handle.
/**
diff --git a/TAO/tao/Transport_Cache_Manager.cpp b/TAO/tao/Transport_Cache_Manager.cpp
index f5470abc336..9baebf6a41f 100644
--- a/TAO/tao/Transport_Cache_Manager.cpp
+++ b/TAO/tao/Transport_Cache_Manager.cpp
@@ -4,7 +4,8 @@
#include "tao/ORB_Core.h"
#include "tao/Resource_Factory.h"
#include "tao/Connection_Purging_Strategy.h"
-
+#include "tao/Condition.h"
+#include "ace/Synch_T.h"
#include "ace/Handle_Set.h"
#if !defined (__ACE_INLINE__)
@@ -16,23 +17,65 @@ ACE_RCSID (TAO,
Transport_Cache_Manager,
"$Id$")
+
TAO_Transport_Cache_Manager::TAO_Transport_Cache_Manager (TAO_ORB_Core &orb_core)
: percent_ (orb_core.resource_factory ()->purge_percentage ()),
purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ()),
cache_map_ (ACE_static_cast (size_t, ACE::max_handles ())),
- cache_lock_ (orb_core.resource_factory ()->create_cached_connection_lock ())
+ condition_ (0),
+ cache_lock_ (0),
+ muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ()),
+ no_waiting_threads_ (0),
+ last_entry_returned_ (0)
{
+ if (orb_core.resource_factory ()->locked_transport_cache ())
+ {
+ ACE_NEW (this->condition_,
+ TAO_Condition <TAO_SYNCH_MUTEX>);
+
+ ACE_NEW (this->cache_lock_,
+ ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (*this->condition_->mutex ()));
+ }
+ else
+ {
+ /// If the cache is not going to be locked then dont create a
+ /// condition variable. Make the <muxed_number_> to 0, else a
+ /// single thread could get into waiting mode
+ this->muxed_number_ = 0;
+ ACE_NEW (this->cache_lock_,
+ ACE_Lock_Adapter<ACE_SYNCH_NULL_MUTEX>);
+ }
}
TAO_Transport_Cache_Manager::~TAO_Transport_Cache_Manager (void)
{
+ // Wakeup all the waiting threads threads before we shutdown stuff
+ if (this->no_waiting_threads_)
+ this->condition_->broadcast ();
+
// Delete the lock that we have
- delete this->cache_lock_;
+ if (this->cache_lock_)
+ {
+ delete this->cache_lock_;
+ this->cache_lock_ = 0;
+ }
// Delete the purging strategy
- delete this->purging_strategy_;
+ if (this->purging_strategy_)
+ {
+ delete this->purging_strategy_;
+ this->purging_strategy_ = 0;
+ }
+
+ // Delete the condition variable
+ if (this->condition_)
+ {
+ delete this->condition_;
+ this->condition_ = 0;
+ }
}
+
int
TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id,
TAO_Cache_IntId &int_id)
@@ -63,8 +106,7 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id,
{
// The entry has been added to cache succesfully
// Add the cache_map_entry to the transport
- int_id.transport () ->cache_map_entry (entry);
-
+ int_id.transport ()->cache_map_entry (entry);
}
else if (retval == 1)
{
@@ -98,13 +140,10 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id,
return retval;
}
-// Used to be in the .inl, but moved here b/c the use of
-// TAO_Transport::_duplicate() caused an include file cycle with
-// inlining turned on.
int
TAO_Transport_Cache_Manager::find_transport (
- TAO_Transport_Descriptor_Interface *prop,
- TAO_Transport *&transport)
+ TAO_Transport_Descriptor_Interface *prop,
+ TAO_Transport *&transport)
{
if (prop == 0)
{
@@ -127,6 +166,30 @@ TAO_Transport_Cache_Manager::find_transport (
}
int
+TAO_Transport_Cache_Manager::find (const TAO_Cache_ExtId &key,
+ TAO_Cache_IntId &value)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
+ guard,
+ *this->cache_lock_,
+ -1));
+
+ int status = this->find_i (key,
+ value);
+
+ if (status == 0)
+ {
+ // Update the purging strategy information while we
+ // are holding our lock
+ this->purging_strategy_->update_item (value.transport ());
+ }
+
+ return status;
+}
+
+
+
+int
TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
TAO_Cache_IntId &value)
{
@@ -135,20 +198,19 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
// Get the entry from the Hash Map
int retval = 0;
- // Get the index of the <key>
- int index = key.index ();
-
// Make a temporary object. It does not do a copy.
TAO_Cache_ExtId tmp_key (key.property ());
while (retval == 0)
{
+ // Wait for a connection..
+ this->wait_for_connection (tmp_key);
+
// Look for an entry in the map
retval = this->cache_map_.find (tmp_key,
entry);
- // We have an entry in the map, check whether it is idle. If it
- // is idle it would be marked as busy.
+ // We have an entry in the map, check whether it is idle.
if (entry)
{
CORBA::Boolean idle =
@@ -174,7 +236,7 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
}
// Bump the index up
- tmp_key.index (++index);
+ tmp_key.incr_index ();
}
// If we are here then it is an error
@@ -189,32 +251,13 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key,
}
int
-TAO_Transport_Cache_Manager::rebind_i (const TAO_Cache_ExtId &key,
- const TAO_Cache_IntId &value)
-{
- return this->cache_map_.rebind (key,
- value);
-}
-
-int
-TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key)
-{
- return this->cache_map_.unbind (key);
-}
-
-int
-TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value)
-{
- return this->cache_map_.unbind (key,
- value);
-}
-
-int
TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry)
{
// First get the entry again (if at all things had changed in the
// cache map in the mean time)
+
+ // @todo: Is this required? Looks like a legacy one..
+
HASH_MAP_ENTRY *new_entry = 0;
int retval = this->cache_map_.find (entry->ext_id_,
new_entry);
@@ -225,6 +268,16 @@ TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry)
recycle_state (ACE_RECYCLABLE_IDLE_AND_PURGABLE);
entry = new_entry;
+
+ // Does any one need waking?
+ if (this->no_waiting_threads_)
+ {
+ // We returned this entry to the map
+ this->last_entry_returned_ = &new_entry->ext_id_;
+
+ // Wake up a thread
+ this->condition_->signal ();
+ }
}
else if (TAO_debug_level > 0 && retval != 0)
{
@@ -250,23 +303,10 @@ TAO_Transport_Cache_Manager::close_i (ACE_Handle_Set &reactor_registered,
if ((*iter).int_id_.recycle_state () != ACE_RECYCLABLE_CLOSED)
{
-#if 0
- // @@ This code from Connection_Cache_Manager disappeared
- // during the changeover; we need the functional equivalent back.
- // The problem is that with the locking stuff that we're putting
- // in to the Transport, we might want to encapsulate the whole
- // exercise of adding to the handle set in a method on the transport
- // rather than doing it here. That way, the locking is correct.
- if ((*iter).int_id_.handler ()->is_registered ())
- {
- reactor_registered.set_bit ((*iter).int_id_.handler ()->fetch_handle ());
- }
-#else
// Get the transport to fill its associated connection's handle in
// the handle sets.
(*iter).int_id_.transport ()->provide_handle (reactor_registered,
unregistered);
-#endif
// Inform the transport that has a reference to the entry in the
// map that we are *gone* now. So, the transport should not use
// the reference to the entry that he has, to acces us *at any
@@ -495,6 +535,70 @@ TAO_Transport_Cache_Manager::close_entries(DESCRIPTOR_SET& sorted_set,
sorted_set = 0;
}
+int
+TAO_Transport_Cache_Manager::wait_for_connection (TAO_Cache_ExtId &extid)
+{
+ if (this->muxed_number_ &&
+ this->muxed_number_ == extid.index ())
+ {
+ // If we have a limit on the number of muxed connections for
+ // a particular endpoint just wait to get the connection
+ ++this->no_waiting_threads_;
+
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Going to wait for connections.. \n"));
+
+ int ready_togo = 0;
+
+ while (ready_togo == 0)
+ {
+ this->condition_->wait ();
+
+ // Check whether we are waiting for this connection
+ ready_togo = this->is_wakeup_useful (extid);
+ }
+
+ // We are not waiting anymore
+ --this->no_waiting_threads_;
+ }
+
+ return 0;
+}
+
+int
+TAO_Transport_Cache_Manager::is_wakeup_useful (TAO_Cache_ExtId &extid)
+{
+ // Get the underlying property that we are looking for
+ TAO_Transport_Descriptor_Interface *prop = extid.property ();
+
+ // Just check the underlying property for equivalence. If the last
+ // connection that was returned had the same property just return
+ // 1.
+ if (this->last_entry_returned_ &&
+ prop->is_equivalent (this->last_entry_returned_->property ()))
+ {
+ // Set the index to be right so that we can pick teh connection
+ // right away..
+ extid.index (this->last_entry_returned_->index ());
+
+ // There is no more use for it ...
+ this->last_entry_returned_ = 0;
+
+ return 1;
+ }
+
+ // If there is an entry that was returned and if there are more
+ // threads just wake up the peer to check for the returned
+ // connection.
+ if (this->last_entry_returned_ &&
+ this->no_waiting_threads_ > 1)
+ {
+ this->condition_->signal ();
+ }
+
+ return 0;
+}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
diff --git a/TAO/tao/Transport_Cache_Manager.h b/TAO/tao/Transport_Cache_Manager.h
index 0454ab4fe2b..5a87d7f7e72 100644
--- a/TAO/tao/Transport_Cache_Manager.h
+++ b/TAO/tao/Transport_Cache_Manager.h
@@ -15,19 +15,26 @@
#define TAO_CONNECTION_CACHE_MANAGER_H
#include "ace/pre.h"
-#include "ace/Hash_Map_Manager_T.h"
+#include "tao/Cache_Entries.h"
+
#if !defined (ACE_LACKS_PRAGMA_ONCE)
#define ACE_LACKS_PRAGMA_ONCE
#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "ace/Hash_Map_Manager_T.h"
+#include "ace/Synch_T.h"
+
+// #include "tao/TAO_Export.h"
+// #include "tao/Cache_Entries.h"
+// #include "tao/Connection_Purging_Strategy.h"
-#include "tao/TAO_Export.h"
-#include "tao/Cache_Entries.h"
-#include "tao/Connection_Purging_Strategy.h"
class TAO_ORB_Core;
class ACE_Handle_Set;
class TAO_Resource_Factory;
+class TAO_Connection_Purging_Strategy;
+
+template <class ACE_COND_MUTEX> class TAO_Condition;
typedef ACE_Unbounded_Set<ACE_Event_Handler*> TAO_EventHandlerSet;
typedef ACE_Unbounded_Set_Iterator<ACE_Event_Handler*>
@@ -36,19 +43,16 @@ typedef ACE_Unbounded_Set_Iterator<ACE_Event_Handler*>
/**
* @class TAO_Transport_Cache_Manager
*
- * @brief The Transport Cache Manager for TAO.
+ * @brief The Transport Cache Manager for TAO
*
- * This class provides interfaces associating a TAO_Cache_ExtId and
- * TAO_Cache_IntId. This class manages a ACE_Hash_Map_Manager class
- * which is used as a container to Cache the connections. This class
- * protects the entries with a lock. The map can be updated only by
- * holding the lock.
+ * This class provides interfaces associating a TAO_Cache_ExtId
+ * & TAO_Cache_IntId. This class is wrapper around the
+ * ACE_Hash_Map_Manager class which is used as a container to Cache
+ * the connections. This class protects the entries with a lock. The
+ * map is updated only by holding the lock. The more compelling reason
+ * to have the lock in this class and not in the Hash_Map is that, we
+ * do quite a bit of work in this class for which we need a lock.
*
- * @note This class at present has an interface that may not be
- * needed. But, the interface has just been copied from the ACE
- * Hash Map Manager classes. The interface wold be pruned once I
- * get the purging stuff also in. Till then let the interface be
- * there as it is.
*/
class TAO_Export TAO_Transport_Cache_Manager
{
@@ -66,6 +70,9 @@ public:
typedef ACE_Hash_Map_Entry <TAO_Cache_ExtId,
TAO_Cache_IntId> HASH_MAP_ENTRY;
+ typedef TAO_Condition<TAO_SYNCH_MUTEX> CONDITION;
+
+ // == Public methods
/// Constructor
TAO_Transport_Cache_Manager (TAO_ORB_Core &orb_core);
@@ -117,19 +124,6 @@ private:
int find (const TAO_Cache_ExtId &key,
TAO_Cache_IntId &value);
- /// Reassociate the <key> with <value>. Grabs the lock and calls the
- /// implementation function find_i.
- int rebind (const TAO_Cache_ExtId &key,
- const TAO_Cache_IntId &value);
-
- /// Remove <key> from the cache.
- int unbind (const TAO_Cache_ExtId &key);
-
- /// Remove <key> from the cache, and return the <value> associated with
- /// <key>.
- int unbind (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value);
-
/**
* Non-Locking version and actual implementation of bind ()
* call. Calls bind on the Hash_Map_Manager that it holds. If the
@@ -150,17 +144,6 @@ private:
int find_i (const TAO_Cache_ExtId &key,
TAO_Cache_IntId &value);
- /// Non-locking version and actual implementation of rebind () call
- int rebind_i (const TAO_Cache_ExtId &key,
- const TAO_Cache_IntId &value);
-
- /// Non-locking version and actual implementation of unbind () call
- int unbind_i (const TAO_Cache_ExtId &key);
-
- /// Non-locking version and actual implementation of unbind () call
- int unbind_i (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value);
-
/// Non-locking version and actual implementation of make_idle ().
int make_idle_i (HASH_MAP_ENTRY *&entry);
@@ -213,7 +196,17 @@ private:
/// the required number of items in the set.
void close_entries (DESCRIPTOR_SET& sorted_set, int size);
+ /// Wait for connections if we have reached the limit on the number
+ /// of muxed connections. If not (ie. if we dont use a muxed
+ /// connection or if we have not reached the limit) this just
+ /// behaves as a no-op. <extid> has all the information about the
+ /// connection that is being searched.
+ int wait_for_connection (TAO_Cache_ExtId &extid);
+
+ /// Is the wakeup useful todo some work?
+ int is_wakeup_useful (TAO_Cache_ExtId &extid);
private:
+
/// The percentage of the cache to purge at one time
int percent_;
@@ -223,8 +216,23 @@ private:
/// The hash map that has the connections
HASH_MAP cache_map_;
- /// Lock for the map
+ /// The condition variable
+ CONDITION *condition_;
+
+ /// The lock that is used by the cache map
ACE_Lock *cache_lock_;
+
+ /// Number of allowed muxed connections
+ CORBA::ULong muxed_number_;
+
+ /// Number of threads waiting for connections
+ int no_waiting_threads_;
+
+ /// This is for optimization purposes. In a situation where number
+ /// of threads are waiting for connections, the last connection that
+ /// is put back is cached here. This should prevent all th threads
+ /// trying to search for their required entry.
+ TAO_Cache_ExtId *last_entry_returned_;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/Transport_Cache_Manager.inl b/TAO/tao/Transport_Cache_Manager.inl
index 0840e7961c9..601864ade4f 100644
--- a/TAO/tao/Transport_Cache_Manager.inl
+++ b/TAO/tao/Transport_Cache_Manager.inl
@@ -16,28 +16,6 @@ TAO_Transport_Cache_Manager::bind (TAO_Cache_ExtId &ext_id,
ACE_INLINE int
-TAO_Transport_Cache_Manager::find (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->cache_lock_,
- -1));
-
- int status = this->find_i (key,
- value);
-
- if (status == 0)
- {
- // Update the purging strategy information while we
- // are holding our lock
- this->purging_strategy_->update_item (value.transport ());
- }
- return status;
-}
-
-
-ACE_INLINE int
TAO_Transport_Cache_Manager::cache_transport (
TAO_Transport_Descriptor_Interface *prop,
TAO_Transport *transport)
@@ -52,44 +30,6 @@ TAO_Transport_Cache_Manager::cache_transport (
}
ACE_INLINE int
-TAO_Transport_Cache_Manager::rebind (const TAO_Cache_ExtId &key,
- const TAO_Cache_IntId &value)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->cache_lock_,
- -1));
-
- return this->rebind_i (key,
- value);
-}
-
-ACE_INLINE int
-TAO_Transport_Cache_Manager::unbind (const TAO_Cache_ExtId &key)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->cache_lock_,
- -1));
-
- return this->unbind_i (key);
-}
-
-ACE_INLINE int
-TAO_Transport_Cache_Manager::unbind (const TAO_Cache_ExtId &key,
- TAO_Cache_IntId &value)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->cache_lock_,
- -1));
-
- return this->unbind_i (key,
- value);
-}
-
-
-ACE_INLINE int
TAO_Transport_Cache_Manager::purge (void)
{
ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->cache_lock_, 0));
diff --git a/TAO/tao/default_resource.cpp b/TAO/tao/default_resource.cpp
index 7bc5ebe37a6..e280ccd5040 100644
--- a/TAO/tao/default_resource.cpp
+++ b/TAO/tao/default_resource.cpp
@@ -40,6 +40,7 @@ TAO_Default_Resource_Factory::TAO_Default_Resource_Factory (void)
connection_caching_type_ (TAO_CONNECTION_CACHING_STRATEGY),
cache_maximum_ (TAO_CONNECTION_CACHE_MAXIMUM),
purge_percentage_ (TAO_PURGE_PERCENT),
+ max_muxed_connections_ (0),
reactor_mask_signals_ (1),
dynamically_allocated_reactor_ (0),
options_processed_ (0),
@@ -350,7 +351,20 @@ TAO_Default_Resource_Factory::init (int argc, ACE_TCHAR *argv[])
this->report_option_value_error (ACE_LIB_TEXT("-ORBFlushingStrategy"), name);
}
}
- else if (ACE_OS::strncmp (argv[curarg], ACE_LIB_TEXT("-ORB"), 4) == 0)
+ else if (ACE_OS::strcasecmp (argv[curarg],
+ ACE_LIB_TEXT ("-ORBMuxedConnectionMax")) == 0)
+ {
+ curarg++;
+ if (curarg < argc)
+ this->max_muxed_connections_ =
+ ACE_OS::atoi (argv[curarg]);
+ else
+ this->report_option_value_error ("-ORBMuxedConnectionMax",
+ argv[curarg]);
+ }
+ else if (ACE_OS::strncmp (argv[curarg],
+ ACE_LIB_TEXT ("-ORB"),
+ 4) == 0)
{
// Can we assume there is an argument after the option?
// curarg++;
@@ -552,7 +566,7 @@ TAO_Default_Resource_Factory::init_protocol_factories (void)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_LIB_TEXT ("TAO (%P|%t) Unable to load ")
ACE_LIB_TEXT ("protocol <%s>, %p\n"),
- ACE_TEXT_CHAR_TO_TCHAR(name.c_str ()),
+ ACE_TEXT_CHAR_TO_TCHAR(name.c_str ()),
ACE_LIB_TEXT ("")),
-1);
}
@@ -739,6 +753,13 @@ TAO_Default_Resource_Factory::purge_percentage (void) const
return this->purge_percentage_;
}
+int
+TAO_Default_Resource_Factory::max_muxed_connections (void) const
+{
+ return this->max_muxed_connections_;
+}
+
+
ACE_Lock *
TAO_Default_Resource_Factory::create_cached_connection_lock (void)
{
@@ -756,6 +777,16 @@ TAO_Default_Resource_Factory::create_cached_connection_lock (void)
return the_lock;
}
+int
+TAO_Default_Resource_Factory::locked_transport_cache (void)
+{
+ if (this->cached_connection_lock_type_ == TAO_NULL_LOCK)
+ return 0;
+
+ return 1;
+}
+
+
TAO_Flushing_Strategy *
TAO_Default_Resource_Factory::create_flushing_strategy (void)
{
diff --git a/TAO/tao/default_resource.h b/TAO/tao/default_resource.h
index 76b5ab4a026..c0c11bba5a4 100644
--- a/TAO/tao/default_resource.h
+++ b/TAO/tao/default_resource.h
@@ -116,7 +116,9 @@ public:
virtual TAO_Resource_Factory::Caching_Strategy connection_caching_strategy_type (void) const;
virtual int cache_maximum (void) const;
virtual int purge_percentage (void) const;
+ virtual int max_muxed_connections (void) const;
virtual ACE_Lock *create_cached_connection_lock (void);
+ virtual int locked_transport_cache (void);
virtual TAO_Flushing_Strategy *create_flushing_strategy (void);
virtual TAO_Connection_Purging_Strategy *create_purging_strategy (void);
virtual TAO_LF_Strategy *create_lf_strategy (void);
@@ -169,6 +171,11 @@ protected:
/// demand.
int purge_percentage_;
+ /// Specifies the limit on the number of muxed connections
+ /// allowed per-property for the ORB. A value of 0 indicates no
+ /// limit
+ size_t max_muxed_connections_;
+
/// If <0> then we create reactors with signal handling disabled.
int reactor_mask_signals_;