diff options
author | bala <balanatarajan@users.noreply.github.com> | 2002-04-04 04:26:10 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2002-04-04 04:26:10 +0000 |
commit | 725e73703a67326ed06452ca975b5378ad84c03b (patch) | |
tree | 866f51c285d8f0d3c1cd2a53a0194d493825e847 | |
parent | ddf00484367e672294353272d1c670964963ba00 (diff) | |
download | ATCD-725e73703a67326ed06452ca975b5378ad84c03b.tar.gz |
ChangeLogTag: Wed Apr 3 22:18:38 2002 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r-- | TAO/ChangeLogs/ChangeLog-02a | 107 | ||||
-rw-r--r-- | TAO/tao/Cache_Entries.cpp | 15 | ||||
-rw-r--r-- | TAO/tao/Cache_Entries.h | 6 | ||||
-rw-r--r-- | TAO/tao/Cache_Entries.inl | 6 | ||||
-rw-r--r-- | TAO/tao/ChangeLog | 96 | ||||
-rw-r--r-- | TAO/tao/Condition.cpp | 56 | ||||
-rw-r--r-- | TAO/tao/Condition.h | 115 | ||||
-rw-r--r-- | TAO/tao/Condition.inl | 50 | ||||
-rw-r--r-- | TAO/tao/Exclusive_TMS.cpp | 18 | ||||
-rw-r--r-- | TAO/tao/IIOP_Connection_Handler.cpp | 39 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 158 | ||||
-rw-r--r-- | TAO/tao/Invocation.cpp | 6 | ||||
-rw-r--r-- | TAO/tao/Muxed_TMS.cpp | 27 | ||||
-rw-r--r-- | TAO/tao/Muxed_TMS.h | 13 | ||||
-rw-r--r-- | TAO/tao/Resource_Factory.cpp | 15 | ||||
-rw-r--r-- | TAO/tao/Resource_Factory.h | 10 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 47 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 4 | ||||
-rw-r--r-- | TAO/tao/Transport_Cache_Manager.cpp | 208 | ||||
-rw-r--r-- | TAO/tao/Transport_Cache_Manager.h | 88 | ||||
-rw-r--r-- | TAO/tao/Transport_Cache_Manager.inl | 60 | ||||
-rw-r--r-- | TAO/tao/default_resource.cpp | 35 | ||||
-rw-r--r-- | TAO/tao/default_resource.h | 7 |
23 files changed, 896 insertions, 290 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 8cf5f0d3f21..c3fda4b0d8d 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,110 @@ +Wed Apr 3 22:18:38 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + Changes to wide variety of stuff from my branch. The checkin + contains the following: + + 1. Couple of bug fixes (#1129 and #1164) + 2. A new policy to regulate the number of connections allowed by + the ORB for every QoS property. + + The details of the checkin follows + + Sun Mar 24 08:57:37 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Transport.cpp: Fix for bug 1164. When big messages are + being read, read it in a loop till you get all the bytes or till + you get an error. We used to do only one read before and looks + like this had performance impacts. Thanks to James Kanyok + <james.kanyok@lmco.com> for reporting the problem. This will go + into the main trunk once we get a feedback from James. + + Tue Feb 19 07:45:05 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/IIOP_Connection_Handler.cpp: Added a fix for Jody ie. bug + #1129. Yet, to get a reply whether the fix is right or + wrong. Now, we close the socket with the first call to + handle_close () and delete it when the upcall count gets to + zero. + + Mon Feb 11 14:03:18 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Transport.h: Removed the virtual declaration from the method + bidirectional_flag (). It shouldnt have been virtual to start + with. + + * tao/Muxed_TMS.h: Removed some old comments + + * tao/Muxed_TMS.cpp (request_id): + * tao/Exclusive_TMS.cpp: Added a condition within request_id + (). The request_id that is generated will obey BiDirGIOP rules, + ie. the originator will send even number requests and the + receiver send odd numbered requests. The old method of + generation was giving problems in MT cases. + + * tao/Invocation.cpp: Minor cosmetic changes. + * tao/IIOP_Transport.cpp: Modify the request id only for the first + request during BiDir connection origination. + + + Thu Jan 31 11:23:59 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Cache_Entries.cpp: Did a cosmetic fix. + + * tao/Cache_Entries.h: + * tao/Cache_Entries.inl: Added a new method incr_index () to the + TAO_CacheExtId class. + + * tao/Resource_Factory.cpp: + * tao/Resource_Factory.h:Added two new methods, + max_muxed_connections () and locked_transport_cache (). The + former returns the number of user specified muxed connections + with a particular property. The latter returns a boolean value + to indicate whether the transport cache needs to have a lock or + not . + + * tao/default_resource.cpp: + * tao/default_resource.h: Concrete implementations for the methods + declared in Resource_Factory. + + * tao/Transport_Cache_Manager.cpp: + * tao/Transport_Cache_Manager.h: + * tao/Transport_Cache_Manager.inl: Added support for counted muxed + connections ie. the user can limit the number of remote + connections (with a particular property). If the thread + searching the cache for a free connection doesnt find one that + is free, the thread will wait on a condition variable for the + connection to be released. To accomodate this the following set + of changes were made + + - Added a condition variable to the class + + - Create the type of lock that needs to be used for the cache + locally and use that lock to create the condition variable. If + the cache is free of any locks we dont create a condition + variable. + + - The creation of lock has been moved to the + Transport_Cache_Manager from the Resource_Factory. + + - Added two new methods, wait_for_connection () and + is_wakeup_useful (). The wait_for_connection () blocks the + thread searching for connection, if there is a limit on the + number of muxed connections and if there have been enough + connections created with the same property. The + is_wakeup_useful () method is called by an unblocked thread to + check whether the transport that was returned is the one the + thread was waiting for. + + - Removed a version of rebind () and a couple of version of + unbind () calls as they were not used. + + + * tao/Condition.h: + * tao/Condition.cpp:. + * tao/Condition.inl: A simple wrapper that wraps a + TAO_SYNCH_CONDITION that can be used with different types of + Mutex classes. + Tue Apr 3 12:44:51 UTC 2002 Don Hinton <dhinton@ieee.org> * tao/docs/pluggable_protocols/index.html: diff --git a/TAO/tao/Cache_Entries.cpp b/TAO/tao/Cache_Entries.cpp index 9c7e17f0273..df610a378f6 100644 --- a/TAO/tao/Cache_Entries.cpp +++ b/TAO/tao/Cache_Entries.cpp @@ -24,13 +24,14 @@ TAO_Cache_IntId::~TAO_Cache_IntId (void) TAO_Cache_IntId& TAO_Cache_IntId::operator= (const TAO_Cache_IntId &rhs) { - if (this != &rhs) { - this->recycle_state_ = rhs.recycle_state_; + if (this != &rhs) + { + this->recycle_state_ = rhs.recycle_state_; + + TAO_Transport* old_transport = this->transport_; + this->transport_ = TAO_Transport::_duplicate (rhs.transport_); + TAO_Transport::release (old_transport); + } - TAO_Transport* old_transport = this->transport_; - this->transport_ = TAO_Transport::_duplicate (rhs.transport_); - TAO_Transport::release (old_transport); - } return *this; } - diff --git a/TAO/tao/Cache_Entries.h b/TAO/tao/Cache_Entries.h index 0907db2f944..4a616d7b664 100644 --- a/TAO/tao/Cache_Entries.h +++ b/TAO/tao/Cache_Entries.h @@ -6,6 +6,7 @@ * * $Id$ * + * * @author Bala Natarajan <bala@cs.wustl.edu> */ //============================================================================= @@ -106,7 +107,7 @@ private: * <value> for a <key> in a hash table holding the state of the * Transport Cache. */ -class TAO_Cache_ExtId +class TAO_Export TAO_Cache_ExtId { public: @@ -147,6 +148,9 @@ public: /// but for the TAO_Transport_Cache_Manager class. void index (CORBA::ULong index); + /// Increment the index value + void incr_index (void); + // = Accessors /// Get the underlying the property pointer TAO_Transport_Descriptor_Interface *property (void) const; diff --git a/TAO/tao/Cache_Entries.inl b/TAO/tao/Cache_Entries.inl index 1e6bbc118fb..ec19b1ed1e1 100644 --- a/TAO/tao/Cache_Entries.inl +++ b/TAO/tao/Cache_Entries.inl @@ -180,6 +180,12 @@ TAO_Cache_ExtId::index (CORBA::ULong index) this->index_ = index; } +ACE_INLINE void +TAO_Cache_ExtId::incr_index (void) +{ + ++this->index_; +} + ACE_INLINE TAO_Transport_Descriptor_Interface * TAO_Cache_ExtId::property (void) const { diff --git a/TAO/tao/ChangeLog b/TAO/tao/ChangeLog new file mode 100644 index 00000000000..e581b088baa --- /dev/null +++ b/TAO/tao/ChangeLog @@ -0,0 +1,96 @@ +Sun Mar 24 08:57:37 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Transport.cpp: Fix for bug 1164. When big messages are + being read, read it in a loop till you get all the bytes or till + you get an error. We used to do only one read before and looks + like this had performance impacts. Thanks to James Kanyok + <james.kanyok@lmco.com> for reporting the problem. This will go + into the main trunk once we get a feedback from James. + +Tue Feb 19 07:45:05 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/IIOP_Connection_Handler.cpp: Added a fix for Jody ie. bug + #1129. Yet, to get a reply whether the fix is right or + wrong. Now, we close the socket with the first call to + handle_close () and delete it when the upcall count gets to + zero. + +Mon Feb 11 14:03:18 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Transport.h: Removed the virtual declaration from the method + bidirectional_flag (). It shouldnt have been virtual to start + with. + + * tao/Muxed_TMS.h: Removed some old comments + + * tao/Muxed_TMS.cpp (request_id): + * tao/Exclusive_TMS.cpp: Added a condition within request_id + (). The request_id that is generated will obey BiDirGIOP rules, + ie. the originator will send even number requests and the + receiver send odd numbered requests. The old method of + generation was giving problems in MT cases. + + * tao/Invocation.cpp: Minor cosmetic changes. + * tao/IIOP_Transport.cpp: Modify the request id only for the first + request during BiDir connection origination. + + +Thu Jan 31 11:23:59 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Cache_Entries.cpp: Did a cosmetic fix. + + * tao/Cache_Entries.h: + * tao/Cache_Entries.inl: Added a new method incr_index () to the + TAO_CacheExtId class. + + * tao/Resource_Factory.cpp: + * tao/Resource_Factory.h:Added two new methods, + max_muxed_connections () and locked_transport_cache (). The + former returns the number of user specified muxed connections + with a particular property. The latter returns a boolean value + to indicate whether the transport cache needs to have a lock or + not . + + * tao/default_resource.cpp: + * tao/default_resource.h: Concrete implementations for the methods + declared in Resource_Factory. + + * tao/Transport_Cache_Manager.cpp: + * tao/Transport_Cache_Manager.h: + * tao/Transport_Cache_Manager.inl: Added support for counted muxed + connections ie. the user can limit the number of remote + connections (with a particular property). If the thread + searching the cache for a free connection doesnt find one that + is free, the thread will wait on a condition variable for the + connection to be released. To accomodate this the following set + of changes were made + + - Added a condition variable to the class + + - Create the type of lock that needs to be used for the cache + locally and use that lock to create the condition variable. If + the cache is free of any locks we dont create a condition + variable. + + - The creation of lock has been moved to the + Transport_Cache_Manager from the Resource_Factory. + + - Added two new methods, wait_for_connection () and + is_wakeup_useful (). The wait_for_connection () blocks the + thread searching for connection, if there is a limit on the + number of muxed connections and if there have been enough + connections created with the same property. The + is_wakeup_useful () method is called by an unblocked thread to + check whether the transport that was returned is the one the + thread was waiting for. + + - Removed a version of rebind () and a couple of version of + unbind () calls as they were not used. + + + * tao/Condition.h: + * tao/Condition.cpp:. + * tao/Condition.inl: A simple wrapper that wraps a + TAO_SYNCH_CONDITION that can be used with different types of + Mutex classes. + diff --git a/TAO/tao/Condition.cpp b/TAO/tao/Condition.cpp new file mode 100644 index 00000000000..d754007a77d --- /dev/null +++ b/TAO/tao/Condition.cpp @@ -0,0 +1,56 @@ +#include "Condition.h" + + +#if !defined (__ACE_INLINE__) +# include "tao/Condition.inl" +#endif /* __ACE_INLINE__ */ + + +ACE_RCSID (TAO, + Condition, + "$Id$") + +template <class MUTEX> +TAO_Condition<MUTEX>::TAO_Condition (MUTEX &m) + + : mutex_ (&m), + delete_lock_ (0), + cond_ (0) +{ + // @@todo: Need to add the allocatore here.. + ACE_NEW (this->cond_, + TAO_SYNCH_CONDITION (*this->mutex_)); +} + +template <class MUTEX> +TAO_Condition<MUTEX>::TAO_Condition (void) + : mutex_ (0), + delete_lock_ (0), + cond_ (0) + +{ + // @@todo: Need to add the allocatore here.. + + ACE_NEW (this->mutex_, + MUTEX); + + this->delete_lock_ = 1; + + ACE_NEW (this->cond_, + TAO_SYNCH_CONDITION (*this->mutex_)); +} + + +template <class MUTEX> +TAO_Condition<MUTEX>::~TAO_Condition (void) +{ + if (this->remove () == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("TAO_Condition::~TAO_Condition"))); + + delete this->cond_; + + if (this->delete_lock_) + delete this->mutex_; +} diff --git a/TAO/tao/Condition.h b/TAO/tao/Condition.h new file mode 100644 index 00000000000..b9b85a4c817 --- /dev/null +++ b/TAO/tao/Condition.h @@ -0,0 +1,115 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file Condition.h + * + * $Id$ + * + * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> + */ +//============================================================================= + +#ifndef TAO_CONDITION_H +#define TAO_CONDITION_H +#include "ace/pre.h" + + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + + +/** + * @class TAO_Condition + * + * @brief Same as to the ACE_Condition variable wrapper + * + * This class differs from ACE_Condition in that it uses a + * TAO_SYNCH_CONDITION instead of ACE_cond_t under the hood to + * provide blocking. + */ +template <class MUTEX> +class TAO_Condition +{ +public: + + /// Useful typedef + typedef MUTEX LOCK; + + // = Initialiation and termination methods. + /// Initialize the condition variable. + TAO_Condition (MUTEX &m); + + /// A default constructor. Since no lock is provided by the user, + /// one will be created internally. + TAO_Condition (void); + + /// Implicitly destroy the condition variable. + ~TAO_Condition (void); + + // = Lock accessors. + /** + * Block on condition, or until absolute time-of-day has passed. If + * abstime == 0 use "blocking" <wait> semantics. Else, if <abstime> + * != 0 and the call times out before the condition is signaled + * <wait> returns -1 and sets errno to ETIME. + */ + int wait (const ACE_Time_Value *abstime); + + /// Block on condition. + int wait (void); + + /** + * Block on condition or until absolute time-of-day has passed. If + * abstime == 0 use "blocking" wait() semantics on the <mutex> + * passed as a parameter (this is useful if you need to store the + * <Condition> in shared memory). Else, if <abstime> != 0 and the + * call times out before the condition is signaled <wait> returns -1 + * and sets errno to ETIME. + */ + int wait (MUTEX &mutex, const ACE_Time_Value *abstime = 0); + + /// Signal one waiting thread. + int signal (void); + + /// Signal *all* waiting threads. + int broadcast (void); + + // = Utility methods. + /// Explicitly destroy the condition variable. + int remove (void); + + /// Returns a reference to the underlying mutex_; + MUTEX *mutex (void); + +private: + + /// Reference to mutex lock. + MUTEX *mutex_; + + /// A flag to indicate whether the lock needs to be deleted. + int delete_lock_; + + /// Condition variable. + TAO_SYNCH_CONDITION *cond_; + + // = Prevent assignment and initialization. + ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Condition<MUTEX> &)) + ACE_UNIMPLEMENTED_FUNC (TAO_Condition (const TAO_Condition<MUTEX> &)) +}; + +#if defined (__ACE_INLINE__) +#include "Condition.inl" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "Condition.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Condition.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /*TAO_CONDITION_H*/ diff --git a/TAO/tao/Condition.inl b/TAO/tao/Condition.inl new file mode 100644 index 00000000000..089bc0d54be --- /dev/null +++ b/TAO/tao/Condition.inl @@ -0,0 +1,50 @@ +/* -*- C++ -*- */ +//$Id$ +template <class MUTEX> ACE_INLINE int +TAO_Condition<MUTEX>::wait (void) +{ + return this->cond_->wait (); +} + +template <class MUTEX> ACE_INLINE int +TAO_Condition<MUTEX>::wait (MUTEX &mutex, + const ACE_Time_Value *abstime) +{ + return this->cond_->wait (mutex, + abstime); +} + +// Peform an "alertable" timed wait. If the argument ABSTIME == 0 +// then we do a regular cond_wait(), else we do a timed wait for up to +// ABSTIME using the Solaris cond_timedwait() function. + +template <class MUTEX> ACE_INLINE int +TAO_Condition<MUTEX>::wait (const ACE_Time_Value *abstime) +{ + return this->wait (this->mutex_, abstime); +} + +template<class MUTEX> ACE_INLINE int +TAO_Condition<MUTEX>::remove (void) +{ + this->cond_->remove (); +} + +template<class MUTEX> ACE_INLINE MUTEX * +TAO_Condition<MUTEX>::mutex (void) +{ + // ACE_TRACE ("ACE_Condition<MUTEX>::mutex"); + return this->mutex_; +} + +template <class MUTEX> ACE_INLINE int +TAO_Condition<MUTEX>::signal (void) +{ + return this->cond_->signal (); +} + +template <class MUTEX> ACE_INLINE int +TAO_Condition<MUTEX>::broadcast (void) +{ + return this->cond_->broadcast (); +} diff --git a/TAO/tao/Exclusive_TMS.cpp b/TAO/tao/Exclusive_TMS.cpp index 95cd9d1bd09..0f17f8eddca 100644 --- a/TAO/tao/Exclusive_TMS.cpp +++ b/TAO/tao/Exclusive_TMS.cpp @@ -27,11 +27,27 @@ TAO_Exclusive_TMS::~TAO_Exclusive_TMS (void) CORBA::ULong TAO_Exclusive_TMS::request_id (void) { + this->request_id_generator_++; + + // if TAO_Transport::bidirectional_flag_ + // == 1 --> originating side + // == 0 --> other side + // == -1 --> no bi-directional connection was negotiated + // The originating side must have an even request ID, and the other + // side must have an odd request ID. Make sure that is the case. + int bidir_flag = + this->transport_->bidirectional_flag (); + + if ((bidir_flag == 1 && ACE_ODD (this->request_id_generator_)) + || (bidir_flag == 0 && ACE_EVEN (this->request_id_generator_))) + ++this->request_id_generator_; + if (TAO_debug_level > 4) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Exclusive_TMS::request_id - <%d>\n"), this->request_id_generator_)); - return this->request_id_generator_++; + + return this->request_id_generator_; } // Bind the handler with the request id. diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp index 50db44db5fb..5821c6ab8b4 100644 --- a/TAO/tao/IIOP_Connection_Handler.cpp +++ b/TAO/tao/IIOP_Connection_Handler.cpp @@ -22,6 +22,7 @@ ACE_RCSID (tao, IIOP_Connection_Handler, "$Id$") + TAO_IIOP_Connection_Handler::TAO_IIOP_Connection_Handler (ACE_Thread_Manager *t) : TAO_IIOP_SVC_HANDLER (t, 0 , 0), TAO_Connection_Handler (0), @@ -206,7 +207,19 @@ TAO_IIOP_Connection_Handler::handle_close (ACE_HANDLE handle, long upcalls = this->decr_pending_upcalls (); - ACE_ASSERT (upcalls >= 0); + // Just return incase the upcall count goes below 0. + if (upcalls < 0) + return 0; + + if (this->get_handle () != ACE_INVALID_HANDLE) + { + // Just close the socket irrespective of what the upcall count + // is. + this->peer().close (); + + // Set the handle to be INVALID_HANDLE + this->set_handle (ACE_INVALID_HANDLE); + } // If the upcall count is zero start the cleanup. if (upcalls == 0) @@ -238,16 +251,13 @@ TAO_IIOP_Connection_Handler::handle_close_i (void) } // Close the handle.. - if (this->get_handle () != ACE_INVALID_HANDLE) - { - // Remove the entry as it is invalid - this->transport ()->purge_entry (); + // Remove the entry as it is invalid + this->transport ()->purge_entry (); - // Signal the transport that we will no longer have - // a reference to it. This will eventually call - // TAO_Transport::release (). - this->transport (0); - } + // Signal the transport that we will no longer have + // a reference to it. This will eventually call + // TAO_Transport::release (). + this->transport (0); // Follow usual Reactor-style lifecycle semantics and commit // suicide. @@ -353,8 +363,6 @@ TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE) // The upcall is done. Bump down the reference count upcalls = this->decr_pending_upcalls (); - ACE_ASSERT (upcalls >= 0); - if (upcalls == 0) { this->handle_close_i (); @@ -364,6 +372,13 @@ TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE) // handle_close () which could be harmful. retval = 0; } + else if (upcalls < 0) + { + // As we have already performed the handle closing we dont want + // to return a -1. Doing so would make the reactor call + // handle_close () which could be harmful. + retval = 0; + } if (retval == -1) diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index c4194fb4010..815d41e49b8 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -25,11 +25,12 @@ ACE_RCSID (tao, IIOP_Transport, "$Id$") + TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler, - TAO_ORB_Core *orb_core, - CORBA::Boolean flag) + TAO_ORB_Core *orb_core, + CORBA::Boolean flag) : TAO_Transport (TAO_TAG_IIOP_PROFILE, - orb_core) + orb_core) , connection_handler_ (handler) , messaging_object_ (0) { @@ -37,13 +38,13 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler, { // Use the lite version of the protocol ACE_NEW (this->messaging_object_, - TAO_GIOP_Message_Lite (orb_core)); + TAO_GIOP_Message_Lite (orb_core)); } else { // Use the normal GIOP object ACE_NEW (this->messaging_object_, - TAO_GIOP_Message_Base (orb_core)); + TAO_GIOP_Message_Base (orb_core)); } } @@ -66,11 +67,11 @@ TAO_IIOP_Transport::messaging_object (void) ssize_t TAO_IIOP_Transport::send_i (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *max_wait_time) + size_t &bytes_transferred, + const ACE_Time_Value *max_wait_time) { ssize_t retval = this->connection_handler_->peer ().sendv (iov, iovcnt, - max_wait_time); + max_wait_time); if (retval > 0) bytes_transferred = retval; @@ -79,12 +80,12 @@ TAO_IIOP_Transport::send_i (iovec *iov, int iovcnt, ssize_t TAO_IIOP_Transport::recv_i (char *buf, - size_t len, - const ACE_Time_Value *max_wait_time) + size_t len, + const ACE_Time_Value *max_wait_time) { ssize_t n = this->connection_handler_->peer ().recv (buf, - len, - max_wait_time); + len, + max_wait_time); // Do not print the error message if it is a timeout, which could // occur in thread-per-connection. @@ -93,16 +94,16 @@ TAO_IIOP_Transport::recv_i (char *buf, errno != ETIME) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p \n"), - ACE_TEXT ("TAO - read message failure ") - ACE_TEXT ("recv_i () \n"))); + ACE_TEXT ("TAO (%P|%t) - %p \n"), + ACE_TEXT ("TAO - read message failure ") + ACE_TEXT ("recv_i () \n"))); } // Error handling if (n == -1) { if (errno == EWOULDBLOCK) - return 0; + return 0; return -1; @@ -126,8 +127,8 @@ TAO_IIOP_Transport::register_handler_i (void) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - IIOP_Transport::register_handler %d\n", - this->id ())); + "TAO (%P|%t) - IIOP_Transport::register_handler %d\n", + this->id ())); } ACE_Reactor *r = this->orb_core_->reactor (); @@ -142,25 +143,25 @@ TAO_IIOP_Transport::register_handler_i (void) // Register the handler with the reactor return r->register_handler (this->connection_handler_, - ACE_Event_Handler::READ_MASK); + ACE_Event_Handler::READ_MASK); } int TAO_IIOP_Transport::send_request (TAO_Stub *stub, - TAO_ORB_Core *orb_core, - TAO_OutputCDR &stream, - int two_way, - ACE_Time_Value *max_wait_time) + TAO_ORB_Core *orb_core, + TAO_OutputCDR &stream, + int two_way, + ACE_Time_Value *max_wait_time) { if (this->ws_->sending_request (orb_core, - two_way) == -1) + two_way) == -1) return -1; if (this->send_message (stream, - stub, - two_way, - max_wait_time) == -1) + stub, + two_way, + max_wait_time) == -1) return -1; @@ -169,9 +170,9 @@ TAO_IIOP_Transport::send_request (TAO_Stub *stub, int TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, - TAO_Stub *stub, - int twoway, - ACE_Time_Value *max_wait_time) + TAO_Stub *stub, + int twoway, + ACE_Time_Value *max_wait_time) { // Format the message in the stream first if (this->messaging_object_->format_message (stream) != 0) @@ -179,17 +180,17 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, // This guarantees to send all data (bytes) or return an error. ssize_t n = this->send_message_i (stub, - twoway, - stream.begin (), - max_wait_time); + twoway, + stream.begin (), + max_wait_time); if (n == -1) { if (TAO_debug_level) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"), - this->id (), - ACE_TEXT ("send_message ()\n"))); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"), + this->id (), + ACE_TEXT ("send_message ()\n"))); return -1; } @@ -199,8 +200,8 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, int TAO_IIOP_Transport::generate_request_header (TAO_Operation_Details &opdetails, - TAO_Target_Specification &spec, - TAO_OutputCDR &msg) + TAO_Target_Specification &spec, + TAO_OutputCDR &msg) { // Check whether we have a Bi Dir IIOP policy set, whether the // messaging objects are ready to handle bidirectional connections @@ -214,24 +215,25 @@ TAO_IIOP_Transport::generate_request_header (TAO_Operation_Details &opdetails, // Set the flag to 0 (i.e., originating side) this->bidirectional_flag (0); - } - // Modify the request id if we have BiDirectional client/server - // setup - opdetails.modify_request_id (this->bidirectional_flag ()); + // Modify the request id if we have BiDirectional client/server + // setup. + // @@ Is this needed at all? + opdetails.modify_request_id (this->bidirectional_flag ()); + } return TAO_Transport::generate_request_header (opdetails, - spec, - msg); + spec, + msg); } int TAO_IIOP_Transport::messaging_init (CORBA::Octet major, - CORBA::Octet minor) + CORBA::Octet minor) { this->messaging_object_->init (major, - minor); + minor); return 1; } @@ -272,12 +274,12 @@ TAO_IIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails) { // Check whether it is a IIOP acceptor if ((*acceptor)->tag () == TAO_TAG_IIOP_PROFILE) - { - // @@ Why isn't the return value checked! - // -Ossama - this->get_listen_point (listen_point_list, - *acceptor); - } + { + // @@ Why isn't the return value checked! + // -Ossama + this->get_listen_point (listen_point_list, + *acceptor); + } } // We have the ListenPointList at this point. Create a output CDR @@ -291,7 +293,7 @@ TAO_IIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails) // Add this info in to the svc_list opdetails.request_service_context ().set_context (IOP::BI_DIR_IIOP, - cdr); + cdr); return; } @@ -303,7 +305,7 @@ TAO_IIOP_Transport::get_listen_point ( { TAO_IIOP_Acceptor *iiop_acceptor = ACE_dynamic_cast (TAO_IIOP_Acceptor *, - acceptor ); + acceptor ); // Get the array of endpoints serviced by TAO_IIOP_Acceptor const ACE_INET_Addr *endpoint_addr = @@ -320,10 +322,10 @@ TAO_IIOP_Transport::get_listen_point ( == -1) { ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%P|%t) Could not resolve local ") - ACE_TEXT ("host address in ") - ACE_TEXT ("get_listen_point()\n")), - -1); + ACE_TEXT ("(%P|%t) Could not resolve local ") + ACE_TEXT ("host address in ") + ACE_TEXT ("get_listen_point()\n")), + -1); } // Note: Looks like there is no point in sending the list of @@ -333,13 +335,13 @@ TAO_IIOP_Transport::get_listen_point ( // Get the hostname for the local address if (iiop_acceptor->hostname (this->orb_core_, - local_addr, - local_interface.out ()) == -1) + local_addr, + local_interface.out ()) == -1) { ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%P|%t) Could not resolve local host") - ACE_TEXT (" name \n")), - -1); + ACE_TEXT ("(%P|%t) Could not resolve local host") + ACE_TEXT (" name \n")), + -1); } for (size_t index = 0; @@ -347,20 +349,20 @@ TAO_IIOP_Transport::get_listen_point ( index++) { if (local_addr.get_ip_address() - == endpoint_addr[index].get_ip_address()) - { - // Get the count of the number of elements - CORBA::ULong len = listen_point_list.length (); - - // Increase the length by 1 - listen_point_list.length (len + 1); - - // We have the connection and the acceptor endpoint on the - // same interface - IIOP::ListenPoint &point = listen_point_list[len]; - point.host = CORBA::string_dup (local_interface.in ()); - point.port = endpoint_addr[index].get_port_number (); - } + == endpoint_addr[index].get_ip_address()) + { + // Get the count of the number of elements + CORBA::ULong len = listen_point_list.length (); + + // Increase the length by 1 + listen_point_list.length (len + 1); + + // We have the connection and the acceptor endpoint on the + // same interface + IIOP::ListenPoint &point = listen_point_list[len]; + point.host = CORBA::string_dup (local_interface.in ()); + point.port = endpoint_addr[index].get_port_number (); + } } return 1; diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index afa3c958af2..f9cacfbd200 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "Invocation.h" #include "Stub.h" #include "Profile.h" @@ -564,8 +565,8 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request // preallocated reply dispatcher. // Bind. - - TAO_Transport_Mux_Strategy *tms = this->transport_->tms (); + TAO_Transport_Mux_Strategy *tms = + this->transport_->tms (); TAO_Bind_Dispatcher_Guard dispatch_guard ( this->op_details_.request_id (), @@ -1103,4 +1104,3 @@ template class auto_ptr<CORBA::Exception>; #pragma instantiate auto_ptr<CORBA::Exception> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - diff --git a/TAO/tao/Muxed_TMS.cpp b/TAO/tao/Muxed_TMS.cpp index fb73ccba938..88d7207d0e7 100644 --- a/TAO/tao/Muxed_TMS.cpp +++ b/TAO/tao/Muxed_TMS.cpp @@ -30,7 +30,28 @@ TAO_Muxed_TMS::request_id (void) // @@ What is a good error return value? ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0); - return this->request_id_generator_++; + + ++this->request_id_generator_; + + // if TAO_Transport::bidirectional_flag_ + // == 1 --> originating side + // == 0 --> other side + // == -1 --> no bi-directional connection was negotiated + // The originating side must have an even request ID, and the other + // side must have an odd request ID. Make sure that is the case. + int bidir_flag = + this->transport_->bidirectional_flag (); + + if ((bidir_flag == 1 && ACE_ODD (this->request_id_generator_)) + || (bidir_flag == 0 && ACE_EVEN (this->request_id_generator_))) + ++this->request_id_generator_; + + if (TAO_debug_level > 4) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) TAO_Muxed_TMS::request_id - <%d>\n"), + this->request_id_generator_)); + + return this->request_id_generator_; } // Bind the dispatcher with the request id. @@ -45,8 +66,8 @@ TAO_Muxed_TMS::bind_dispatcher (CORBA::ULong request_id, if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P | %t):TAO_Muxed_TMS::bind_dispatcher: ") - ACE_TEXT ("bind dispatcher failed: result = %d\n"), - result)); + ACE_TEXT ("bind dispatcher failed: result = %d, request id = %d \n"), + result, request_id)); return -1; } diff --git a/TAO/tao/Muxed_TMS.h b/TAO/tao/Muxed_TMS.h index 0824dc1d72f..124f5839ae2 100644 --- a/TAO/tao/Muxed_TMS.h +++ b/TAO/tao/Muxed_TMS.h @@ -83,19 +83,6 @@ protected: /// Keep track of the orb core pointer. We need to this to create the /// Reply Dispatchers. TAO_ORB_Core *orb_core_; - - // @@ Commented for the time being, let the commented line stay for - // sometime - Bala - // TAO_GIOP_Message_State *message_state_; - // Message state where the current input message is being read. This - // is created at start of each incoming message. When that message - // is read, the message is processed and for the next message a new - // message state is created. - - // @@ Having members of type TAO_GIOP* indicates that we - // (Transport_Mux_Strategy) are aware of the underlying messaging - // protocol. But for the present let us close our eyes till we are - // able to iterate on a use case - Bala. }; #include "ace/post.h" diff --git a/TAO/tao/Resource_Factory.cpp b/TAO/tao/Resource_Factory.cpp index 92251adacbf..8711b484454 100644 --- a/TAO/tao/Resource_Factory.cpp +++ b/TAO/tao/Resource_Factory.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "ace/Auto_Ptr.h" #include "ace/Dynamic_Service.h" @@ -165,6 +166,13 @@ TAO_Resource_Factory::purge_percentage (void) const } int +TAO_Resource_Factory::max_muxed_connections (void) const +{ + return 0; +} + + +int TAO_Resource_Factory::get_parser_names (char **&, int &) { @@ -178,6 +186,13 @@ TAO_Resource_Factory::create_cached_connection_lock (void) } int +TAO_Resource_Factory::locked_transport_cache (void) +{ + return 0; +} + + +int TAO_Resource_Factory::load_default_protocols (void) { return 0; diff --git a/TAO/tao/Resource_Factory.h b/TAO/tao/Resource_Factory.h index cf9c7560395..6a137f593d3 100644 --- a/TAO/tao/Resource_Factory.h +++ b/TAO/tao/Resource_Factory.h @@ -181,12 +181,22 @@ public: /// cache. virtual int purge_percentage (void) const; + /// Return the number of muxed connections that are allowed for a + /// remote endpoint + virtual int max_muxed_connections (void) const; + virtual int get_parser_names (char **&names, int &number_of_names); /// Creates the lock for the lock needed in the Cache Map + /// @@todo: This method needs to go away as it doesnt make much + /// sense now. virtual ACE_Lock *create_cached_connection_lock (void); + /// Should the transport cache have a lock or not? Return 1 if the + /// transport cache needs to be locked else return 0 + virtual int locked_transport_cache (void); + /// Creates the flushing strategy. The new instance is owned by the /// caller. virtual TAO_Flushing_Strategy *create_flushing_strategy (void) = 0; diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index aae2a3a4adc..db34ab04558 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -27,7 +27,10 @@ # include "Transport.inl" #endif /* __ACE_INLINE__ */ -ACE_RCSID(tao, Transport, "$Id$") +ACE_RCSID (tao, + Transport, + "$Id$") + TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount) : ACE_Refcountable (refcount) @@ -994,16 +997,34 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, ACE_CDR::grow (&incoming, payload); - // .. do a read on the socket again. - ssize_t n = this->recv (incoming.wr_ptr (), - missing_data, - max_wait_time); + ssize_t n = 0; - if (TAO_debug_level > 6) + // As this used for transports where things are available in one + // shot this looping should not create any problems. + for (ssize_t bytes = missing_data; + bytes != 0; + bytes -= n) { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Read [%d] bytes on attempt \n", - n)); + // .. do a read on the socket again. + n = this->recv (incoming.wr_ptr (), + bytes, + max_wait_time); + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Read [%d] bytes on attempt \n", + n)); + } + + if (n == 0 || + n == -1) + { + break; + } + + incoming.wr_ptr (n); + missing_data -= n; } // If we got an error.. @@ -1019,16 +1040,10 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, return -1; } - // If we had gooten a EWOULDBLOCK n would be equal to zero. But we + // If we had gotten a EWOULDBLOCK n would be equal to zero. But we // have to put the message in the queue anyway. So let us proceed // to do that and return... - // Move the write pointer - incoming.wr_ptr (n); - - // ..Decrement - missing_data -= n; - // Check to see if we have messages in queue or if we have missing // data . AT this point we cannot have have semi-complete messages // in the queue as they would have been taken care before. Put diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index f1a4d525b72..614b83a7050 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -260,8 +260,8 @@ public: CORBA::Octet minor) = 0; /// Get/Set the bidirectional flag - virtual int bidirectional_flag (void) const; - virtual void bidirectional_flag (int flag); + int bidirectional_flag (void) const; + void bidirectional_flag (int flag); /// Fill in a handle_set with any associated handler's reactor handle. /** diff --git a/TAO/tao/Transport_Cache_Manager.cpp b/TAO/tao/Transport_Cache_Manager.cpp index f5470abc336..9baebf6a41f 100644 --- a/TAO/tao/Transport_Cache_Manager.cpp +++ b/TAO/tao/Transport_Cache_Manager.cpp @@ -4,7 +4,8 @@ #include "tao/ORB_Core.h" #include "tao/Resource_Factory.h" #include "tao/Connection_Purging_Strategy.h" - +#include "tao/Condition.h" +#include "ace/Synch_T.h" #include "ace/Handle_Set.h" #if !defined (__ACE_INLINE__) @@ -16,23 +17,65 @@ ACE_RCSID (TAO, Transport_Cache_Manager, "$Id$") + TAO_Transport_Cache_Manager::TAO_Transport_Cache_Manager (TAO_ORB_Core &orb_core) : percent_ (orb_core.resource_factory ()->purge_percentage ()), purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ()), cache_map_ (ACE_static_cast (size_t, ACE::max_handles ())), - cache_lock_ (orb_core.resource_factory ()->create_cached_connection_lock ()) + condition_ (0), + cache_lock_ (0), + muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ()), + no_waiting_threads_ (0), + last_entry_returned_ (0) { + if (orb_core.resource_factory ()->locked_transport_cache ()) + { + ACE_NEW (this->condition_, + TAO_Condition <TAO_SYNCH_MUTEX>); + + ACE_NEW (this->cache_lock_, + ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (*this->condition_->mutex ())); + } + else + { + /// If the cache is not going to be locked then dont create a + /// condition variable. Make the <muxed_number_> to 0, else a + /// single thread could get into waiting mode + this->muxed_number_ = 0; + ACE_NEW (this->cache_lock_, + ACE_Lock_Adapter<ACE_SYNCH_NULL_MUTEX>); + } } TAO_Transport_Cache_Manager::~TAO_Transport_Cache_Manager (void) { + // Wakeup all the waiting threads threads before we shutdown stuff + if (this->no_waiting_threads_) + this->condition_->broadcast (); + // Delete the lock that we have - delete this->cache_lock_; + if (this->cache_lock_) + { + delete this->cache_lock_; + this->cache_lock_ = 0; + } // Delete the purging strategy - delete this->purging_strategy_; + if (this->purging_strategy_) + { + delete this->purging_strategy_; + this->purging_strategy_ = 0; + } + + // Delete the condition variable + if (this->condition_) + { + delete this->condition_; + this->condition_ = 0; + } } + int TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id, TAO_Cache_IntId &int_id) @@ -63,8 +106,7 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id, { // The entry has been added to cache succesfully // Add the cache_map_entry to the transport - int_id.transport () ->cache_map_entry (entry); - + int_id.transport ()->cache_map_entry (entry); } else if (retval == 1) { @@ -98,13 +140,10 @@ TAO_Transport_Cache_Manager::bind_i (TAO_Cache_ExtId &ext_id, return retval; } -// Used to be in the .inl, but moved here b/c the use of -// TAO_Transport::_duplicate() caused an include file cycle with -// inlining turned on. int TAO_Transport_Cache_Manager::find_transport ( - TAO_Transport_Descriptor_Interface *prop, - TAO_Transport *&transport) + TAO_Transport_Descriptor_Interface *prop, + TAO_Transport *&transport) { if (prop == 0) { @@ -127,6 +166,30 @@ TAO_Transport_Cache_Manager::find_transport ( } int +TAO_Transport_Cache_Manager::find (const TAO_Cache_ExtId &key, + TAO_Cache_IntId &value) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->cache_lock_, + -1)); + + int status = this->find_i (key, + value); + + if (status == 0) + { + // Update the purging strategy information while we + // are holding our lock + this->purging_strategy_->update_item (value.transport ()); + } + + return status; +} + + + +int TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, TAO_Cache_IntId &value) { @@ -135,20 +198,19 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, // Get the entry from the Hash Map int retval = 0; - // Get the index of the <key> - int index = key.index (); - // Make a temporary object. It does not do a copy. TAO_Cache_ExtId tmp_key (key.property ()); while (retval == 0) { + // Wait for a connection.. + this->wait_for_connection (tmp_key); + // Look for an entry in the map retval = this->cache_map_.find (tmp_key, entry); - // We have an entry in the map, check whether it is idle. If it - // is idle it would be marked as busy. + // We have an entry in the map, check whether it is idle. if (entry) { CORBA::Boolean idle = @@ -174,7 +236,7 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, } // Bump the index up - tmp_key.index (++index); + tmp_key.incr_index (); } // If we are here then it is an error @@ -189,32 +251,13 @@ TAO_Transport_Cache_Manager::find_i (const TAO_Cache_ExtId &key, } int -TAO_Transport_Cache_Manager::rebind_i (const TAO_Cache_ExtId &key, - const TAO_Cache_IntId &value) -{ - return this->cache_map_.rebind (key, - value); -} - -int -TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key) -{ - return this->cache_map_.unbind (key); -} - -int -TAO_Transport_Cache_Manager::unbind_i (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value) -{ - return this->cache_map_.unbind (key, - value); -} - -int TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry) { // First get the entry again (if at all things had changed in the // cache map in the mean time) + + // @todo: Is this required? Looks like a legacy one.. + HASH_MAP_ENTRY *new_entry = 0; int retval = this->cache_map_.find (entry->ext_id_, new_entry); @@ -225,6 +268,16 @@ TAO_Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry) recycle_state (ACE_RECYCLABLE_IDLE_AND_PURGABLE); entry = new_entry; + + // Does any one need waking? + if (this->no_waiting_threads_) + { + // We returned this entry to the map + this->last_entry_returned_ = &new_entry->ext_id_; + + // Wake up a thread + this->condition_->signal (); + } } else if (TAO_debug_level > 0 && retval != 0) { @@ -250,23 +303,10 @@ TAO_Transport_Cache_Manager::close_i (ACE_Handle_Set &reactor_registered, if ((*iter).int_id_.recycle_state () != ACE_RECYCLABLE_CLOSED) { -#if 0 - // @@ This code from Connection_Cache_Manager disappeared - // during the changeover; we need the functional equivalent back. - // The problem is that with the locking stuff that we're putting - // in to the Transport, we might want to encapsulate the whole - // exercise of adding to the handle set in a method on the transport - // rather than doing it here. That way, the locking is correct. - if ((*iter).int_id_.handler ()->is_registered ()) - { - reactor_registered.set_bit ((*iter).int_id_.handler ()->fetch_handle ()); - } -#else // Get the transport to fill its associated connection's handle in // the handle sets. (*iter).int_id_.transport ()->provide_handle (reactor_registered, unregistered); -#endif // Inform the transport that has a reference to the entry in the // map that we are *gone* now. So, the transport should not use // the reference to the entry that he has, to acces us *at any @@ -495,6 +535,70 @@ TAO_Transport_Cache_Manager::close_entries(DESCRIPTOR_SET& sorted_set, sorted_set = 0; } +int +TAO_Transport_Cache_Manager::wait_for_connection (TAO_Cache_ExtId &extid) +{ + if (this->muxed_number_ && + this->muxed_number_ == extid.index ()) + { + // If we have a limit on the number of muxed connections for + // a particular endpoint just wait to get the connection + ++this->no_waiting_threads_; + + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Going to wait for connections.. \n")); + + int ready_togo = 0; + + while (ready_togo == 0) + { + this->condition_->wait (); + + // Check whether we are waiting for this connection + ready_togo = this->is_wakeup_useful (extid); + } + + // We are not waiting anymore + --this->no_waiting_threads_; + } + + return 0; +} + +int +TAO_Transport_Cache_Manager::is_wakeup_useful (TAO_Cache_ExtId &extid) +{ + // Get the underlying property that we are looking for + TAO_Transport_Descriptor_Interface *prop = extid.property (); + + // Just check the underlying property for equivalence. If the last + // connection that was returned had the same property just return + // 1. + if (this->last_entry_returned_ && + prop->is_equivalent (this->last_entry_returned_->property ())) + { + // Set the index to be right so that we can pick teh connection + // right away.. + extid.index (this->last_entry_returned_->index ()); + + // There is no more use for it ... + this->last_entry_returned_ = 0; + + return 1; + } + + // If there is an entry that was returned and if there are more + // threads just wake up the peer to check for the returned + // connection. + if (this->last_entry_returned_ && + this->no_waiting_threads_ > 1) + { + this->condition_->signal (); + } + + return 0; +} #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/TAO/tao/Transport_Cache_Manager.h b/TAO/tao/Transport_Cache_Manager.h index 0454ab4fe2b..5a87d7f7e72 100644 --- a/TAO/tao/Transport_Cache_Manager.h +++ b/TAO/tao/Transport_Cache_Manager.h @@ -15,19 +15,26 @@ #define TAO_CONNECTION_CACHE_MANAGER_H #include "ace/pre.h" -#include "ace/Hash_Map_Manager_T.h" +#include "tao/Cache_Entries.h" + #if !defined (ACE_LACKS_PRAGMA_ONCE) #define ACE_LACKS_PRAGMA_ONCE #endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "ace/Hash_Map_Manager_T.h" +#include "ace/Synch_T.h" + +// #include "tao/TAO_Export.h" +// #include "tao/Cache_Entries.h" +// #include "tao/Connection_Purging_Strategy.h" -#include "tao/TAO_Export.h" -#include "tao/Cache_Entries.h" -#include "tao/Connection_Purging_Strategy.h" class TAO_ORB_Core; class ACE_Handle_Set; class TAO_Resource_Factory; +class TAO_Connection_Purging_Strategy; + +template <class ACE_COND_MUTEX> class TAO_Condition; typedef ACE_Unbounded_Set<ACE_Event_Handler*> TAO_EventHandlerSet; typedef ACE_Unbounded_Set_Iterator<ACE_Event_Handler*> @@ -36,19 +43,16 @@ typedef ACE_Unbounded_Set_Iterator<ACE_Event_Handler*> /** * @class TAO_Transport_Cache_Manager * - * @brief The Transport Cache Manager for TAO. + * @brief The Transport Cache Manager for TAO * - * This class provides interfaces associating a TAO_Cache_ExtId and - * TAO_Cache_IntId. This class manages a ACE_Hash_Map_Manager class - * which is used as a container to Cache the connections. This class - * protects the entries with a lock. The map can be updated only by - * holding the lock. + * This class provides interfaces associating a TAO_Cache_ExtId + * & TAO_Cache_IntId. This class is wrapper around the + * ACE_Hash_Map_Manager class which is used as a container to Cache + * the connections. This class protects the entries with a lock. The + * map is updated only by holding the lock. The more compelling reason + * to have the lock in this class and not in the Hash_Map is that, we + * do quite a bit of work in this class for which we need a lock. * - * @note This class at present has an interface that may not be - * needed. But, the interface has just been copied from the ACE - * Hash Map Manager classes. The interface wold be pruned once I - * get the purging stuff also in. Till then let the interface be - * there as it is. */ class TAO_Export TAO_Transport_Cache_Manager { @@ -66,6 +70,9 @@ public: typedef ACE_Hash_Map_Entry <TAO_Cache_ExtId, TAO_Cache_IntId> HASH_MAP_ENTRY; + typedef TAO_Condition<TAO_SYNCH_MUTEX> CONDITION; + + // == Public methods /// Constructor TAO_Transport_Cache_Manager (TAO_ORB_Core &orb_core); @@ -117,19 +124,6 @@ private: int find (const TAO_Cache_ExtId &key, TAO_Cache_IntId &value); - /// Reassociate the <key> with <value>. Grabs the lock and calls the - /// implementation function find_i. - int rebind (const TAO_Cache_ExtId &key, - const TAO_Cache_IntId &value); - - /// Remove <key> from the cache. - int unbind (const TAO_Cache_ExtId &key); - - /// Remove <key> from the cache, and return the <value> associated with - /// <key>. - int unbind (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value); - /** * Non-Locking version and actual implementation of bind () * call. Calls bind on the Hash_Map_Manager that it holds. If the @@ -150,17 +144,6 @@ private: int find_i (const TAO_Cache_ExtId &key, TAO_Cache_IntId &value); - /// Non-locking version and actual implementation of rebind () call - int rebind_i (const TAO_Cache_ExtId &key, - const TAO_Cache_IntId &value); - - /// Non-locking version and actual implementation of unbind () call - int unbind_i (const TAO_Cache_ExtId &key); - - /// Non-locking version and actual implementation of unbind () call - int unbind_i (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value); - /// Non-locking version and actual implementation of make_idle (). int make_idle_i (HASH_MAP_ENTRY *&entry); @@ -213,7 +196,17 @@ private: /// the required number of items in the set. void close_entries (DESCRIPTOR_SET& sorted_set, int size); + /// Wait for connections if we have reached the limit on the number + /// of muxed connections. If not (ie. if we dont use a muxed + /// connection or if we have not reached the limit) this just + /// behaves as a no-op. <extid> has all the information about the + /// connection that is being searched. + int wait_for_connection (TAO_Cache_ExtId &extid); + + /// Is the wakeup useful todo some work? + int is_wakeup_useful (TAO_Cache_ExtId &extid); private: + /// The percentage of the cache to purge at one time int percent_; @@ -223,8 +216,23 @@ private: /// The hash map that has the connections HASH_MAP cache_map_; - /// Lock for the map + /// The condition variable + CONDITION *condition_; + + /// The lock that is used by the cache map ACE_Lock *cache_lock_; + + /// Number of allowed muxed connections + CORBA::ULong muxed_number_; + + /// Number of threads waiting for connections + int no_waiting_threads_; + + /// This is for optimization purposes. In a situation where number + /// of threads are waiting for connections, the last connection that + /// is put back is cached here. This should prevent all th threads + /// trying to search for their required entry. + TAO_Cache_ExtId *last_entry_returned_; }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Transport_Cache_Manager.inl b/TAO/tao/Transport_Cache_Manager.inl index 0840e7961c9..601864ade4f 100644 --- a/TAO/tao/Transport_Cache_Manager.inl +++ b/TAO/tao/Transport_Cache_Manager.inl @@ -16,28 +16,6 @@ TAO_Transport_Cache_Manager::bind (TAO_Cache_ExtId &ext_id, ACE_INLINE int -TAO_Transport_Cache_Manager::find (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->cache_lock_, - -1)); - - int status = this->find_i (key, - value); - - if (status == 0) - { - // Update the purging strategy information while we - // are holding our lock - this->purging_strategy_->update_item (value.transport ()); - } - return status; -} - - -ACE_INLINE int TAO_Transport_Cache_Manager::cache_transport ( TAO_Transport_Descriptor_Interface *prop, TAO_Transport *transport) @@ -52,44 +30,6 @@ TAO_Transport_Cache_Manager::cache_transport ( } ACE_INLINE int -TAO_Transport_Cache_Manager::rebind (const TAO_Cache_ExtId &key, - const TAO_Cache_IntId &value) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->cache_lock_, - -1)); - - return this->rebind_i (key, - value); -} - -ACE_INLINE int -TAO_Transport_Cache_Manager::unbind (const TAO_Cache_ExtId &key) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->cache_lock_, - -1)); - - return this->unbind_i (key); -} - -ACE_INLINE int -TAO_Transport_Cache_Manager::unbind (const TAO_Cache_ExtId &key, - TAO_Cache_IntId &value) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->cache_lock_, - -1)); - - return this->unbind_i (key, - value); -} - - -ACE_INLINE int TAO_Transport_Cache_Manager::purge (void) { ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->cache_lock_, 0)); diff --git a/TAO/tao/default_resource.cpp b/TAO/tao/default_resource.cpp index 7bc5ebe37a6..e280ccd5040 100644 --- a/TAO/tao/default_resource.cpp +++ b/TAO/tao/default_resource.cpp @@ -40,6 +40,7 @@ TAO_Default_Resource_Factory::TAO_Default_Resource_Factory (void) connection_caching_type_ (TAO_CONNECTION_CACHING_STRATEGY), cache_maximum_ (TAO_CONNECTION_CACHE_MAXIMUM), purge_percentage_ (TAO_PURGE_PERCENT), + max_muxed_connections_ (0), reactor_mask_signals_ (1), dynamically_allocated_reactor_ (0), options_processed_ (0), @@ -350,7 +351,20 @@ TAO_Default_Resource_Factory::init (int argc, ACE_TCHAR *argv[]) this->report_option_value_error (ACE_LIB_TEXT("-ORBFlushingStrategy"), name); } } - else if (ACE_OS::strncmp (argv[curarg], ACE_LIB_TEXT("-ORB"), 4) == 0) + else if (ACE_OS::strcasecmp (argv[curarg], + ACE_LIB_TEXT ("-ORBMuxedConnectionMax")) == 0) + { + curarg++; + if (curarg < argc) + this->max_muxed_connections_ = + ACE_OS::atoi (argv[curarg]); + else + this->report_option_value_error ("-ORBMuxedConnectionMax", + argv[curarg]); + } + else if (ACE_OS::strncmp (argv[curarg], + ACE_LIB_TEXT ("-ORB"), + 4) == 0) { // Can we assume there is an argument after the option? // curarg++; @@ -552,7 +566,7 @@ TAO_Default_Resource_Factory::init_protocol_factories (void) ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT ("TAO (%P|%t) Unable to load ") ACE_LIB_TEXT ("protocol <%s>, %p\n"), - ACE_TEXT_CHAR_TO_TCHAR(name.c_str ()), + ACE_TEXT_CHAR_TO_TCHAR(name.c_str ()), ACE_LIB_TEXT ("")), -1); } @@ -739,6 +753,13 @@ TAO_Default_Resource_Factory::purge_percentage (void) const return this->purge_percentage_; } +int +TAO_Default_Resource_Factory::max_muxed_connections (void) const +{ + return this->max_muxed_connections_; +} + + ACE_Lock * TAO_Default_Resource_Factory::create_cached_connection_lock (void) { @@ -756,6 +777,16 @@ TAO_Default_Resource_Factory::create_cached_connection_lock (void) return the_lock; } +int +TAO_Default_Resource_Factory::locked_transport_cache (void) +{ + if (this->cached_connection_lock_type_ == TAO_NULL_LOCK) + return 0; + + return 1; +} + + TAO_Flushing_Strategy * TAO_Default_Resource_Factory::create_flushing_strategy (void) { diff --git a/TAO/tao/default_resource.h b/TAO/tao/default_resource.h index 76b5ab4a026..c0c11bba5a4 100644 --- a/TAO/tao/default_resource.h +++ b/TAO/tao/default_resource.h @@ -116,7 +116,9 @@ public: virtual TAO_Resource_Factory::Caching_Strategy connection_caching_strategy_type (void) const; virtual int cache_maximum (void) const; virtual int purge_percentage (void) const; + virtual int max_muxed_connections (void) const; virtual ACE_Lock *create_cached_connection_lock (void); + virtual int locked_transport_cache (void); virtual TAO_Flushing_Strategy *create_flushing_strategy (void); virtual TAO_Connection_Purging_Strategy *create_purging_strategy (void); virtual TAO_LF_Strategy *create_lf_strategy (void); @@ -169,6 +171,11 @@ protected: /// demand. int purge_percentage_; + /// Specifies the limit on the number of muxed connections + /// allowed per-property for the ORB. A value of 0 indicates no + /// limit + size_t max_muxed_connections_; + /// If <0> then we create reactors with signal handling disabled. int reactor_mask_signals_; |