diff options
Diffstat (limited to 'TAO')
-rw-r--r-- | TAO/ChangeLog | 21 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp | 342 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h | 78 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.i | 14 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Custom_Marshal/Makefile | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_MT_Mcast/Makefile | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Mcast/Makefile | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Multiple/Makefile | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Throughput/Makefile | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/Basic/Makefile | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/Performance/Makefile | 2 |
11 files changed, 324 insertions, 145 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index d6764e1a683..f883ef97c49 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,24 @@ +Fri Dec 6 05:20:28 UTC 2002 Don Hinton <dhinton@ieee.org> + + * orbsvcs/orbsvcs/Event/ECG_Mcast_EH.{h,i,cpp}: Refactored + code to properly remove event handlers from the reactor. This + included refcounting the event handlers. Added ifdef for + ACE_LACKS_PERFECT_MULTICAST_FILTERING so that platforms that + bind the multicast address by default don't even try to + join multiple groups on a single socket before adding a new + socket. Also added a deactivator for the Observer class to + help simplify deactivation on shutdown and in case of exceptions. + Thanks to Russ Noseworthy for reporting this. + + * orbsvcs/tests/EC_Custom_Marshal/Makefile: + * orbsvcs/tests/EC_MT_Mcast/Makefile: + * orbsvcs/tests/EC_Mcast/Makefile: + * orbsvcs/tests/EC_Multiple/Makefile: + * orbsvcs/tests/EC_Throughput/Makefile: + * orbsvcs/tests/Event/Basic/Makefile: + * orbsvcs/tests/Event/Performance/Makefile: Added link to + libTAO_Utils to pick up Implicit_Deactivator. + Thu Dec 5 14:42:21 2002 Balachandran Natarajan <bala@isis-server.isis.vanderbilt.edu> * tests/Server_Connection_Purging/client.cpp: Fixed a warning in diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp index 28b3e5896c7..6661680cbfc 100644 --- a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp @@ -1,28 +1,67 @@ // $Id$ #include "orbsvcs/Event/ECG_Mcast_EH.h" + +#if !defined(__ACE_INLINE__) +#include "ECG_Mcast_EH.i" +#endif /* __ACE_INLINE__ */ + #include "orbsvcs/Event/EC_Gateway_UDP.h" #include "orbsvcs/Event_Service_Constants.h" +#include "tao/Utils/Implicit_Deactivator.h" #include "ace/Reactor.h" -#if !defined(__ACE_INLINE__) -#include "ECG_Mcast_EH.i" -#endif /* __ACE_INLINE__ */ +ACE_RCSID(Event, ECG_Mcast_EH, "$Id$") -ACE_RCSID(Event, ECG_UDP_EH, "$Id$") +TAO_ECG_Mcast_Socket::TAO_ECG_Mcast_Socket (ACE_SOCK_Dgram_Mcast::options opts, + ACE_Lock *lock) + : ACE_SOCK_Dgram_Mcast (opts) + , TAO_Synch_Refcountable (lock, 1) +{ + if (!this->refcount_lock_) + ACE_NEW_NORETURN (this->refcount_lock_, ACE_Lock_Adapter<TAO_SYNCH_MUTEX>); + ACE_ASSERT (TAO_Synch_Refcountable::refcount () == 1); +} + +TAO_ECG_Mcast_Socket::~TAO_ECG_Mcast_Socket (void) +{ +} + +int +TAO_ECG_Mcast_Socket::decrement (void) +{ + int count = TAO_Synch_Refcountable::decrement (); + ACE_ASSERT (count >= 0); + if (count == 0) + { + this->close (); + delete this; + } + return count; +} + +// **************************************************************** TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv, - const ACE_TCHAR *net_if) - : net_if_ (net_if?ACE_OS::strdup (net_if):0) + const ACE_TCHAR *net_if, + ACE_Lock *lock) + : net_if_ (ACE::strnew (net_if)) + , lock_ (lock) , receiver_ (recv) , observer_ (this) + , handle_ (0) { + if (!this->lock_) + ACE_NEW_NORETURN (this->lock_,ACE_Lock_Adapter <TAO_SYNCH_MUTEX>); } TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH (void) { - ACE_OS::free (this->net_if_); + if (this->handle_) + this->close (); + ACE::strdelete (this->net_if_); + delete this->lock_; } // @@ TODO Why have a return code *and* exceptions? Only one would do! @@ -31,6 +70,12 @@ int TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec ACE_ENV_ARG_DECL) { + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, -1)); + + // Have we already been opened? + if (this->handle_) + return -1; + // @@ TODO Think about the exception safety (or lack thereof) of // this code, what if the following operations fail? @@ -38,10 +83,6 @@ TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec // @@ TODO This activation should be configured by the application, // it is too much policy to use _this(). - // @@ TODO Using a full instance instead of a pointer makes memory - // management complex. The observer_ object should derive from - // PortableServer::RefCountServantBase and use the POA to control - // its lifetime. RtecEventChannelAdmin::Observer_var obs = this->observer_._this (ACE_ENV_SINGLE_ARG_PARAMETER); @@ -55,18 +96,8 @@ TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec } ACE_CATCH(CORBA::SystemException, ex) { - // @@ TODO This code is tedious and error prone, plus its - // exceptions should be ignored (no way to recover from them), - // we should encapsulate it in a Deactivator. - - PortableServer::POA_var poa = - this->observer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - PortableServer::ObjectId_var id = - poa->servant_to_id (&this->observer_ ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); + TAO::Utils::Implicit_Deactivator deactivator (&this->observer_ + ACE_ENV_ARG_PARAMETER); ACE_RE_THROW; } @@ -79,65 +110,81 @@ TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec int TAO_ECG_Mcast_EH::close (ACE_ENV_SINGLE_ARG_DECL) { + // handle_ is the Observer_Handle. if (this->handle_ == 0) return 0; - RtecEventChannelAdmin::Observer_Handle h = this->handle_; - this->handle_ = 0; - this->ec_->remove_observer (h ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, -1)); + + // Double check they we're open. + if (this->handle_ == 0) + return 0; - // @@ TODO If the first operation raises an exception then the - // second one never executes!!! - { - PortableServer::POA_var poa = - this->observer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - PortableServer::ObjectId_var id = - poa->servant_to_id (&this->observer_ ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - } - - // Ignore the result, the handler could have been removed by a call - // to update_consumer() or something similar. - (void) this->reactor ()->remove_handler (this, - ACE_Event_Handler::READ_MASK); - - size_t sockets_size = this->sockets_.size (); - for (size_t i = 0; i != sockets_size; ++i) + Handle_Iterator h_iter = this->handles_.begin (); + while (h_iter != this->handles_.end ()) { - // Ignore any errors that we may have when closing the socket, - // there is nothing we can do about them at this point.... - (void) this->sockets_[i]->close(); - delete this->sockets_[i]; + TAO_ECG_Mcast_Socket *socket = (*h_iter).int_id_; + ACE_HANDLE handle = (*h_iter).ext_id_; + if (this->reactor ()->remove_handler (handle, + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("failed to remove mcast handler from ") + ACE_TEXT ("reactor.\n"))); + socket->decrement (); + ++h_iter; } - this->sockets_.size (0); - // Once all the sockets are closed they no longer are subscribed for - // anything + // Once all the sockets are closed they no longer are subscribed for anything this->subscriptions_.unbind_all (); + this->handles_.unbind_all (); + + TAO::Utils::Implicit_Deactivator deactivator (&this->observer_ + ACE_ENV_ARG_PARAMETER); + + RtecEventChannelAdmin::Observer_Handle h = this->handle_; + this->handle_ = 0; + this->ec_->remove_observer (h ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); return 0; } +TAO_ECG_Mcast_Socket* +TAO_ECG_Mcast_EH::find (ACE_HANDLE fd) +{ + TAO_ECG_Mcast_Socket *socket = 0; + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0)); + if (this->handles_.find (fd, socket) == 0) + socket->increment (); + return socket; +} + int TAO_ECG_Mcast_EH::handle_input (ACE_HANDLE fd) { - // @@ TODO How expensive is this loop? Should we use a more - // efficient data structure than an Array? For example a RB tree - // or a hash map? - size_t sockets_size = this->sockets_.size (); - for (size_t i = 0; i != sockets_size; ++i) + TAO_ECG_Mcast_Socket *socket = this->find (fd); + if (socket) { - ACE_SOCK_Dgram_Mcast *socket = this->sockets_[i]; - if (socket->get_handle () == fd) + if (this->receiver_->handle_input (*socket) == -1) { - return this->receiver_->handle_input (*socket); + // Since we are working with UDP multicast sockets here, there aren't + // a whole lot of errors we can get that will make us want to close + // the close the socket, but if we return -1 to the reactor, that's + // exactly what will happend. Or, more precisely, the reactor will + // remove us from it's handle set and call back our handle_close() + // method. Since that's not what we generally want, we just log + // the message and always return 0. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("handle_input() returned an error (%p)\n"))); } + + // No matter what happened, we need to decrement the refcount that + // find() incremented. + socket->decrement (); } - return -1; + + return 0; } void @@ -182,8 +229,10 @@ TAO_ECG_Mcast_EH::compute_required_subscriptions ( int TAO_ECG_Mcast_EH::delete_unwanted_subscriptions ( - Address_Set& multicast_addresses) + Address_Set& multicast_addresses) { + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0)); + Subscriptions_Iterator j = this->subscriptions_.begin (); while (j != this->subscriptions_.end ()) { @@ -197,17 +246,29 @@ TAO_ECG_Mcast_EH::delete_unwanted_subscriptions ( continue; } - ACE_TCHAR buf[256]; - multicast_group.addr_to_string (buf, sizeof buf, 1); - ACE_SOCK_Dgram_Mcast *socket = (*j).int_id_; + this->subscriptions_.unbind (multicast_group); + TAO_ECG_Mcast_Socket *socket = (*j).int_id_; // Ignore errors, there is no appropriate policy to handle them // at this layer. // @@ TODO Consider if we should raise exceptions. - socket->unsubscribe (multicast_group, this->net_if_); + ACE_HANDLE handle = socket->get_handle (); + socket->leave (multicast_group, this->net_if_); + if (this->reactor ()->remove_handler (handle, + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL) + == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("failed to remove mcast handler from ") + ACE_TEXT ("reactor.\n"))); + + this->handles_.unbind (handle); + // We're done with it, so decrement the refcount so it can get + // deleted (either now or when it finishes an upcall. + socket->decrement (); + // Increment and then remove, this is a safe way to remove the // element without invalidating the iterator. ++j; - this->subscriptions_.unbind (multicast_group); } return 0; } @@ -216,28 +277,21 @@ int TAO_ECG_Mcast_EH::subscribe_to_existing_socket ( ACE_INET_Addr& multicast_group) { - size_t sockets_size = this->sockets_.size (); - int result = -1; - for (size_t i = 0; i != sockets_size; ++i) + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, -1)); + int result = 0; + + Handle_Iterator h_iter = this->handles_.begin (); + while (h_iter != this->handles_.end ()) { - ACE_SOCK_Dgram_Mcast *socket = this->sockets_[i]; - result = - socket->subscribe (multicast_group, 1, this->net_if_); + TAO_ECG_Mcast_Socket *socket = (*h_iter).int_id_; + result = socket->join (multicast_group, 1, this->net_if_); if (result != -1) { // Add the subscription to the subscription list (void) this->subscriptions_.bind (multicast_group, socket); break; } - /* assert(result == -1) */ - if (errno == ENOBUFS || errno == ETOOMANYREFS) - { - // The socket is full, try with the next one... - continue; - } - // @@ TODO: There was an error, but the problem is not with - // too many subscriptions... what we need to do is close the - // socket? Or skip the subscription? + ++h_iter; } return result; } @@ -246,25 +300,27 @@ void TAO_ECG_Mcast_EH::subscribe_to_new_socket( ACE_INET_Addr& multicast_group) { - ACE_SOCK_Dgram_Mcast *socket; - ACE_NEW (socket, ACE_SOCK_Dgram_Mcast); - size_t sockets_size = this->sockets_.size (); - - if (this->sockets_.max_size () == sockets_size) - { - this->sockets_.max_size (sockets_size + 1); - this->sockets_.size (sockets_size + 1); - } - this->sockets_[sockets_size] = socket; - socket->subscribe (multicast_group, 1, this->net_if_); - + TAO_ECG_Mcast_Socket *socket; + // Always bind the address, which means that we will only get traffic + // that was sent to the "single" group we joined and won't get any + // cross-talk. + ACE_NEW (socket, TAO_ECG_Mcast_Socket(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES)); + + socket->join (multicast_group, 1, this->net_if_); + ACE_HANDLE handle = socket->get_handle (); + + // We only need the lock long enough to update the lists. + ACE_MT (ACE_GUARD (ACE_Lock, ace_mon, *this->lock_)); + this->handles_.bind (handle, socket); // @@ TODO: we have no way to handle this failure.... (void) this->subscriptions_.bind (multicast_group, socket); + ACE_MT (ace_mon.release ()); // @@ TODO: we have no way to handle this failure.... - (void) this->reactor ()->register_handler (socket->get_handle (), - this, - ACE_Event_Handler::READ_MASK); + if (this->reactor ()->register_handler (handle, + this, + ACE_Event_Handler::READ_MASK) == -1) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("reg handle failed\n"))); } void @@ -279,19 +335,26 @@ TAO_ECG_Mcast_EH::add_new_subscriptions ( // This is the multicast group we want to add to... ACE_INET_Addr multicast_group = *k; - ACE_TCHAR buf[256]; - multicast_group.addr_to_string (buf, sizeof buf, 1); - // Try to find a socket that has space for another multicast // group. - int successful_subscription = + int successful_subscription = -1; + + // It only makes sense to try subscribing to an existing + // socket if the OS supports it. Otherwise it will always + // fail since we bind the address by default and you can't + // join a multicast group that's different from the one + // bound to the socket. +#if defined (ACE_LACKS_PERFECT_MULTICAST_FILTERING) \ + && (ACE_LACKS_PERFECT_MULTICAST_FILTERING == 0) + successful_subscription = this->subscribe_to_existing_socket (multicast_group); +# endif /* ACE_LACKS_PERFECT_MULTICAST_FILTERING == 0 */ if (successful_subscription == -1) { // We try all the sockets and all of them appear to be full, // we need to open a new one and add it to the Array... - this->subscribe_to_new_socket(multicast_group); + this->subscribe_to_new_socket (multicast_group); } } } @@ -302,15 +365,18 @@ TAO_ECG_Mcast_EH::update_consumer ( ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)) { - // @@ TODO This function turned out to be too long, we need to break - // it up in pieces, maybe 3 or 4 private member functions.... + // Make sure we are open for business. + if (!this->handle_) + return; // 1) Figure out the list of multicast groups that we need to // subscribe to Address_Set multicast_addresses; - this->compute_required_subscriptions (sub, multicast_addresses ACE_ENV_ARG_PARAMETER); + this->compute_required_subscriptions (sub, + multicast_addresses + ACE_ENV_ARG_PARAMETER); ACE_CHECK; // 2) To conserve OS and network resources we first unsubscribe from @@ -330,7 +396,7 @@ TAO_ECG_Mcast_EH::update_consumer ( void TAO_ECG_Mcast_EH::update_supplier (const RtecEventChannelAdmin::SupplierQOS& ACE_ENV_ARG_DECL_NOT_USED) - ACE_THROW_SPEC ((CORBA::SystemException)) + ACE_THROW_SPEC ((CORBA::SystemException)) { // Do nothing } @@ -362,36 +428,60 @@ TAO_ECG_Mcast_EH::Observer::update_supplier ( #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>; -template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>; -template class ACE_Hash_Map_Entry<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *>; +template class ACE_Hash_Map_Manager<ACE_INET_Addr, TAO_ECG_Mcast_Socket*, ACE_Null_Mutex>; +template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Entry<ACE_INET_Addr, TAO_ECG_Mcast_Socket *>; template class ACE_Hash<ACE_INET_Addr>; template class ACE_Node<ACE_INET_Addr>; template class ACE_Equal_To<ACE_INET_Addr>; +template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>; +template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>; + +template class ACE_Hash_Map_Manager<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>; +template class ACE_Hash_Map_Manager_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Entry<ACE_HANDLE, TAO_ECG_Mcast_Socket * >; +template class ACE_Hash<ACE_HANDLE>; +template class ACE_Node<ACE_HANDLE>; +template class ACE_Equal_To<ACE_HANDLE>; +template class ACE_Hash_Map_Iterator_Base_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Reverse_Iterator<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>; +template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket*, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>; + template class ACE_Unbounded_Set<ACE_INET_Addr>; -template class ACE_Array_Base<ACE_SOCK_Dgram_Mcast *>; template class ACE_Unbounded_Set_Iterator<ACE_INET_Addr>; -template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>; -template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>; -template class ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>; -template class ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>; -template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>; #elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex> -#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex> -#pragma instantiate ACE_Hash_Map_Entry<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *> +#pragma instantiate ACE_Hash_Map_Manager<ACE_INET_Addr, TAO_ECG_Mcast_Socket *,ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Entry<ACE_INET_Addr, TAO_ECG_Mcast_Socket *> #pragma instantiate ACE_Hash<ACE_INET_Addr> #pragma instantiate ACE_Node<ACE_INET_Addr> #pragma instantiate ACE_Equal_To<ACE_INET_Addr> +#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Iterator<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr, TAO_ECG_Mcast_Socket *,ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex> + +#pragma instantiate ACE_Hash_Map_Manager<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Entry<ACE_HANDLE, TAO_ECG_Mcast_Socket * > +#pragma instantiate ACE_Hash<ACE_HANDLE> +#pragma instantiate ACE_Node<ACE_HANDLE> +#pragma instantiate ACE_Equal_To<ACE_HANDLE> +#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Iterator<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_HANDLE, TAO_Mcast_Socket*, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Reverse_Iterator<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex> + #pragma instantiate ACE_Unbounded_Set<ACE_INET_Addr> -#pragma instantiate ACE_Array_Base<ACE_SOCK_Dgram_Mcast *> #pragma instantiate ACE_Unbounded_Set_Iterator<ACE_INET_Addr> -#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex> -#pragma instantiate ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex> -#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex> -#pragma instantiate ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex> -#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h index fa9fc0dd703..dc0359e9089 100644 --- a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h +++ b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h @@ -6,6 +6,7 @@ * * @author Carlos O'Ryan <coryan@uci.edu> * @author Jaiganesh Balasubramanian <jai@doc.ece.uci.edu> + * @author and Don Hinton <dhinton@ieee.org> * * http://doc.ece.uci.edu/~coryan/EC/index.html * @@ -15,15 +16,50 @@ #include "ace/pre.h" #include "orbsvcs/Event/event_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + #include "orbsvcs/RtecEventChannelAdminS.h" #include "ace/Event_Handler.h" #include "ace/Hash_Map_Manager.h" #include "ace/Array_Base.h" #include "ace/SOCK_Dgram_Mcast.h" +#include "ace/Synch.h" +#include "ace/Atomic_Op.h" +#include "tao/Synch_Refcountable.h" +//#include "tao/Utils/Servant_Var.h" -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ +/** + * @class TAO_ECG_Mcast_Socket + * + * @brief Refcounted wrapper for ACE_SOCK_Dgram_Mcast + * + * This class is a simple refcounted wrapper around ACE_SOCK_Dgram_Mcast that + * makes takes care of deleting itself if the refcount drops to zero. + */ +class TAO_RTEvent_Export TAO_ECG_Mcast_Socket : + public ACE_SOCK_Dgram_Mcast, private TAO_Synch_Refcountable +{ +public: + /// Default Constructor + TAO_ECG_Mcast_Socket (ACE_SOCK_Dgram_Mcast::options opts = + ACE_SOCK_Dgram_Mcast::DEFOPTS, + ACE_Lock *lock = 0); + + int increment (void); + int decrement (void); + int refcount (void) const; + +private: + /// We need a friend so that we have a private dtor> + friend class ACE_SOCK_Dgram_Mcast; + + /// Destructor. Force it on the Heap, so we can call delete this in + /// decrement(). + ~TAO_ECG_Mcast_Socket (void); +}; class TAO_ECG_UDP_Receiver; @@ -46,7 +82,8 @@ public: * expected using <net_if> */ TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv, - const ACE_TCHAR *net_if = 0); + const ACE_TCHAR *net_if = 0, + ACE_Lock *lock = 0); /// Destructor virtual ~TAO_ECG_Mcast_EH (void); @@ -69,7 +106,7 @@ public: int close (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); /// Reactor callbacks - virtual int handle_input (ACE_HANDLE fd); + virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); /// The Observer methods void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub @@ -90,7 +127,9 @@ public: * compilers. */ - class Observer : public POA_RtecEventChannelAdmin::Observer + class Observer + : public virtual POA_RtecEventChannelAdmin::Observer + , public virtual PortableServer::RefCountServantBase { public: /// We report changes in the EC subscriptions to the event @@ -113,6 +152,10 @@ public: }; private: + /// Find the socket mapped to a particular fd. find() holds a lock and calls + /// increment () on the socket, if found, and returns a pointer to it. + TAO_ECG_Mcast_Socket *find (ACE_HANDLE fd); + typedef ACE_Unbounded_Set<ACE_INET_Addr> Address_Set; /// Compute the list of multicast addresses that we need to be @@ -178,15 +221,26 @@ private: ACE_TCHAR *net_if_; /// Define the collection used to keep the iterator - typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex> Subscriptions_Map; - typedef ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex> Subscriptions_Iterator; + typedef ACE_Hash_Map_Manager<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex> + Subscriptions_Map; + typedef ACE_Hash_Map_Iterator<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex> + Subscriptions_Iterator; - /// @@ Please describe this map and its role in the class! + /// Map of multicast address to sockets, used to manage the subscription + /// to multicast address by the update_{consumer|supplier} () methods. Subscriptions_Map subscriptions_; - /// The datagram used to receive the data. - typedef ACE_Array_Base<ACE_SOCK_Dgram_Mcast*> Socket_List; - Socket_List sockets_; + typedef ACE_Hash_Map_Manager<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex> + Handle_Map; + typedef ACE_Hash_Map_Iterator<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex> + Handle_Iterator; + + /// Map of ACE_HANDLEs to sockets. Used to find the appropriate socket in + /// calls to handle_input (). + Handle_Map handles_; + + /// The lock used to protect the lists. + ACE_Lock *lock_; /// We callback to this object when a message arrives. TAO_ECG_UDP_Receiver* receiver_; diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.i b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.i index cfa1da318d3..d13f27b7c10 100644 --- a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.i +++ b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.i @@ -1 +1,15 @@ +/* -*- C++ -*- */ // $Id$ + +int +TAO_ECG_Mcast_Socket::increment (void) +{ + return TAO_Synch_Refcountable::increment (); +} + +int +TAO_ECG_Mcast_Socket::refcount (void) const +{ + return TAO_Synch_Refcountable::refcount (); +} + diff --git a/TAO/orbsvcs/tests/EC_Custom_Marshal/Makefile b/TAO/orbsvcs/tests/EC_Custom_Marshal/Makefile index 74f55ecdca2..abc3acc52fa 100644 --- a/TAO/orbsvcs/tests/EC_Custom_Marshal/Makefile +++ b/TAO/orbsvcs/tests/EC_Custom_Marshal/Makefile @@ -1,6 +1,6 @@ # $Id$ -LDLIBS= -lTAO_RTEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_PortableServer -lTAO +LDLIBS= -lTAO_RTEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_PortableServer -lTAO -lTAO_Utils ifndef TAO_ROOT TAO_ROOT = $(ACE_ROOT)/TAO diff --git a/TAO/orbsvcs/tests/EC_MT_Mcast/Makefile b/TAO/orbsvcs/tests/EC_MT_Mcast/Makefile index 2bd88b5c1db..59daff5ebde 100644 --- a/TAO/orbsvcs/tests/EC_MT_Mcast/Makefile +++ b/TAO/orbsvcs/tests/EC_MT_Mcast/Makefile @@ -15,7 +15,7 @@ endif # ! TAO_ROOT BIN2 = MCast SRC=MCast.cpp Consumer.cpp Supplier.cpp AddrServer.cpp -LDLIBS = -lTAO_RTEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO +LDLIBS = -lTAO_RTEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO -lTAO_Utils CPPFLAGS += -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs \ $(foreach svc, $(TAO_ORBSVCS), -DTAO_ORBSVCS_HAS_$(svc)) diff --git a/TAO/orbsvcs/tests/EC_Mcast/Makefile b/TAO/orbsvcs/tests/EC_Mcast/Makefile index bda0daaf7d3..c139fc0ca44 100644 --- a/TAO/orbsvcs/tests/EC_Mcast/Makefile +++ b/TAO/orbsvcs/tests/EC_Mcast/Makefile @@ -1,7 +1,7 @@ # $Id$ SRC = $(BIN:%=%$(VAR).cpp) -LDLIBS= -lTAO_RTEvent -lTAO_Svc_Utils -lTAO_Messaging -lTAO_PortableServer -lTAO +LDLIBS= -lTAO_RTEvent -lTAO_Svc_Utils -lTAO_Messaging -lTAO_PortableServer -lTAO -lTAO_Utils ifndef TAO_ROOT TAO_ROOT = $(ACE_ROOT)/TAO diff --git a/TAO/orbsvcs/tests/EC_Multiple/Makefile b/TAO/orbsvcs/tests/EC_Multiple/Makefile index cc5a6c32e8e..9d7978a5a73 100644 --- a/TAO/orbsvcs/tests/EC_Multiple/Makefile +++ b/TAO/orbsvcs/tests/EC_Multiple/Makefile @@ -1,7 +1,7 @@ # $Id$ SRC = $(BIN:%=%$(VAR).cpp) -LDLIBS= -lTAO_RTOLDEvent -lTAO_RTSchedEvent -lTAO_RTSched -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_PortableServer -lTAO +LDLIBS= -lTAO_RTOLDEvent -lTAO_RTSchedEvent -lTAO_RTSched -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_PortableServer -lTAO -lTAO_Utils ifndef TAO_ROOT TAO_ROOT = $(ACE_ROOT)/TAO diff --git a/TAO/orbsvcs/tests/EC_Throughput/Makefile b/TAO/orbsvcs/tests/EC_Throughput/Makefile index d2b7a9ad0ee..c2bad07ac5d 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/Makefile +++ b/TAO/orbsvcs/tests/EC_Throughput/Makefile @@ -1,6 +1,6 @@ # $Id$ -LDLIBS = -lTAO_RTOLDEvent -lTAO_RTSched -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO +LDLIBS = -lTAO_RTOLDEvent -lTAO_RTSched -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO -lTAO_Utils ifndef TAO_ROOT TAO_ROOT = $(ACE_ROOT)/TAO diff --git a/TAO/orbsvcs/tests/Event/Basic/Makefile b/TAO/orbsvcs/tests/Event/Basic/Makefile index 28ac0e04ae1..5099b310dde 100644 --- a/TAO/orbsvcs/tests/Event/Basic/Makefile +++ b/TAO/orbsvcs/tests/Event/Basic/Makefile @@ -29,7 +29,7 @@ BIN2 = Reconnect \ Random PSRC=$(addsuffix .cpp,$(BIN2)) Schedule.cpp -LDLIBS = -lECTests -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_Messaging -lTAO_IORTable -lTAO_PortableServer -lTAO +LDLIBS = -lECTests -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_Messaging -lTAO_IORTable -lTAO_PortableServer -lTAO -lTAO_Utils CPPFLAGS += -I../lib -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs \ $(foreach svc, $(TAO_ORBSVCS), -DTAO_ORBSVCS_HAS_$(svc)) diff --git a/TAO/orbsvcs/tests/Event/Performance/Makefile b/TAO/orbsvcs/tests/Event/Performance/Makefile index 72ab2e9a3ae..508ae681dce 100644 --- a/TAO/orbsvcs/tests/Event/Performance/Makefile +++ b/TAO/orbsvcs/tests/Event/Performance/Makefile @@ -19,7 +19,7 @@ BIN2 = Throughput \ Latency_Server PSRC=$(addsuffix .cpp,$(BIN2)) -LDLIBS = -lECTests -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO_Strategies -lTAO +LDLIBS = -lECTests -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO_Strategies -lTAO -lTAO_Utils CPPFLAGS += -I../lib -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs \ $(foreach svc, $(TAO_ORBSVCS), -DTAO_ORBSVCS_HAS_$(svc)) |