summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordhinton <dhinton@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2002-12-06 05:43:30 +0000
committerdhinton <dhinton@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2002-12-06 05:43:30 +0000
commit0b3df6bdba0e3b2ae82423898c839af308836fc6 (patch)
treee6e9d0a7da63a7610a2508e3e2352501783a5227
parentd47d873fe37cd7dc397b53436a0b1d9889d07be0 (diff)
downloadATCD-0b3df6bdba0e3b2ae82423898c839af308836fc6.tar.gz
Fri Dec 6 05:20:28 UTC 2002 Don Hinton <dhinton@ieee.org>
-rw-r--r--TAO/ChangeLog21
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp342
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h78
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.i14
-rw-r--r--TAO/orbsvcs/tests/EC_Custom_Marshal/Makefile2
-rw-r--r--TAO/orbsvcs/tests/EC_MT_Mcast/Makefile2
-rw-r--r--TAO/orbsvcs/tests/EC_Mcast/Makefile2
-rw-r--r--TAO/orbsvcs/tests/EC_Multiple/Makefile2
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/Makefile2
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Makefile2
-rw-r--r--TAO/orbsvcs/tests/Event/Performance/Makefile2
11 files changed, 324 insertions, 145 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index d6764e1a683..f883ef97c49 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,24 @@
+Fri Dec 6 05:20:28 UTC 2002 Don Hinton <dhinton@ieee.org>
+
+ * orbsvcs/orbsvcs/Event/ECG_Mcast_EH.{h,i,cpp}: Refactored
+ code to properly remove event handlers from the reactor. This
+ included refcounting the event handlers. Added ifdef for
+ ACE_LACKS_PERFECT_MULTICAST_FILTERING so that platforms that
+ bind the multicast address by default don't even try to
+ join multiple groups on a single socket before adding a new
+ socket. Also added a deactivator for the Observer class to
+ help simplify deactivation on shutdown and in case of exceptions.
+ Thanks to Russ Noseworthy for reporting this.
+
+ * orbsvcs/tests/EC_Custom_Marshal/Makefile:
+ * orbsvcs/tests/EC_MT_Mcast/Makefile:
+ * orbsvcs/tests/EC_Mcast/Makefile:
+ * orbsvcs/tests/EC_Multiple/Makefile:
+ * orbsvcs/tests/EC_Throughput/Makefile:
+ * orbsvcs/tests/Event/Basic/Makefile:
+ * orbsvcs/tests/Event/Performance/Makefile: Added link to
+ libTAO_Utils to pick up Implicit_Deactivator.
+
Thu Dec 5 14:42:21 2002 Balachandran Natarajan <bala@isis-server.isis.vanderbilt.edu>
* tests/Server_Connection_Purging/client.cpp: Fixed a warning in
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp
index 28b3e5896c7..6661680cbfc 100644
--- a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp
@@ -1,28 +1,67 @@
// $Id$
#include "orbsvcs/Event/ECG_Mcast_EH.h"
+
+#if !defined(__ACE_INLINE__)
+#include "ECG_Mcast_EH.i"
+#endif /* __ACE_INLINE__ */
+
#include "orbsvcs/Event/EC_Gateway_UDP.h"
#include "orbsvcs/Event_Service_Constants.h"
+#include "tao/Utils/Implicit_Deactivator.h"
#include "ace/Reactor.h"
-#if !defined(__ACE_INLINE__)
-#include "ECG_Mcast_EH.i"
-#endif /* __ACE_INLINE__ */
+ACE_RCSID(Event, ECG_Mcast_EH, "$Id$")
-ACE_RCSID(Event, ECG_UDP_EH, "$Id$")
+TAO_ECG_Mcast_Socket::TAO_ECG_Mcast_Socket (ACE_SOCK_Dgram_Mcast::options opts,
+ ACE_Lock *lock)
+ : ACE_SOCK_Dgram_Mcast (opts)
+ , TAO_Synch_Refcountable (lock, 1)
+{
+ if (!this->refcount_lock_)
+ ACE_NEW_NORETURN (this->refcount_lock_, ACE_Lock_Adapter<TAO_SYNCH_MUTEX>);
+ ACE_ASSERT (TAO_Synch_Refcountable::refcount () == 1);
+}
+
+TAO_ECG_Mcast_Socket::~TAO_ECG_Mcast_Socket (void)
+{
+}
+
+int
+TAO_ECG_Mcast_Socket::decrement (void)
+{
+ int count = TAO_Synch_Refcountable::decrement ();
+ ACE_ASSERT (count >= 0);
+ if (count == 0)
+ {
+ this->close ();
+ delete this;
+ }
+ return count;
+}
+
+// ****************************************************************
TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv,
- const ACE_TCHAR *net_if)
- : net_if_ (net_if?ACE_OS::strdup (net_if):0)
+ const ACE_TCHAR *net_if,
+ ACE_Lock *lock)
+ : net_if_ (ACE::strnew (net_if))
+ , lock_ (lock)
, receiver_ (recv)
, observer_ (this)
+ , handle_ (0)
{
+ if (!this->lock_)
+ ACE_NEW_NORETURN (this->lock_,ACE_Lock_Adapter <TAO_SYNCH_MUTEX>);
}
TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH (void)
{
- ACE_OS::free (this->net_if_);
+ if (this->handle_)
+ this->close ();
+ ACE::strdelete (this->net_if_);
+ delete this->lock_;
}
// @@ TODO Why have a return code *and* exceptions? Only one would do!
@@ -31,6 +70,12 @@ int
TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec
ACE_ENV_ARG_DECL)
{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, -1));
+
+ // Have we already been opened?
+ if (this->handle_)
+ return -1;
+
// @@ TODO Think about the exception safety (or lack thereof) of
// this code, what if the following operations fail?
@@ -38,10 +83,6 @@ TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec
// @@ TODO This activation should be configured by the application,
// it is too much policy to use _this().
- // @@ TODO Using a full instance instead of a pointer makes memory
- // management complex. The observer_ object should derive from
- // PortableServer::RefCountServantBase and use the POA to control
- // its lifetime.
RtecEventChannelAdmin::Observer_var obs =
this->observer_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
@@ -55,18 +96,8 @@ TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec
}
ACE_CATCH(CORBA::SystemException, ex)
{
- // @@ TODO This code is tedious and error prone, plus its
- // exceptions should be ignored (no way to recover from them),
- // we should encapsulate it in a Deactivator.
-
- PortableServer::POA_var poa =
- this->observer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- PortableServer::ObjectId_var id =
- poa->servant_to_id (&this->observer_ ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
+ TAO::Utils::Implicit_Deactivator deactivator (&this->observer_
+ ACE_ENV_ARG_PARAMETER);
ACE_RE_THROW;
}
@@ -79,65 +110,81 @@ TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec
int
TAO_ECG_Mcast_EH::close (ACE_ENV_SINGLE_ARG_DECL)
{
+ // handle_ is the Observer_Handle.
if (this->handle_ == 0)
return 0;
- RtecEventChannelAdmin::Observer_Handle h = this->handle_;
- this->handle_ = 0;
- this->ec_->remove_observer (h ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, -1));
+
+ // Double check they we're open.
+ if (this->handle_ == 0)
+ return 0;
- // @@ TODO If the first operation raises an exception then the
- // second one never executes!!!
- {
- PortableServer::POA_var poa =
- this->observer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- PortableServer::ObjectId_var id =
- poa->servant_to_id (&this->observer_ ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- }
-
- // Ignore the result, the handler could have been removed by a call
- // to update_consumer() or something similar.
- (void) this->reactor ()->remove_handler (this,
- ACE_Event_Handler::READ_MASK);
-
- size_t sockets_size = this->sockets_.size ();
- for (size_t i = 0; i != sockets_size; ++i)
+ Handle_Iterator h_iter = this->handles_.begin ();
+ while (h_iter != this->handles_.end ())
{
- // Ignore any errors that we may have when closing the socket,
- // there is nothing we can do about them at this point....
- (void) this->sockets_[i]->close();
- delete this->sockets_[i];
+ TAO_ECG_Mcast_Socket *socket = (*h_iter).int_id_;
+ ACE_HANDLE handle = (*h_iter).ext_id_;
+ if (this->reactor ()->remove_handler (handle,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("failed to remove mcast handler from ")
+ ACE_TEXT ("reactor.\n")));
+ socket->decrement ();
+ ++h_iter;
}
- this->sockets_.size (0);
- // Once all the sockets are closed they no longer are subscribed for
- // anything
+ // Once all the sockets are closed they no longer are subscribed for anything
this->subscriptions_.unbind_all ();
+ this->handles_.unbind_all ();
+
+ TAO::Utils::Implicit_Deactivator deactivator (&this->observer_
+ ACE_ENV_ARG_PARAMETER);
+
+ RtecEventChannelAdmin::Observer_Handle h = this->handle_;
+ this->handle_ = 0;
+ this->ec_->remove_observer (h ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
return 0;
}
+TAO_ECG_Mcast_Socket*
+TAO_ECG_Mcast_EH::find (ACE_HANDLE fd)
+{
+ TAO_ECG_Mcast_Socket *socket = 0;
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0));
+ if (this->handles_.find (fd, socket) == 0)
+ socket->increment ();
+ return socket;
+}
+
int
TAO_ECG_Mcast_EH::handle_input (ACE_HANDLE fd)
{
- // @@ TODO How expensive is this loop? Should we use a more
- // efficient data structure than an Array? For example a RB tree
- // or a hash map?
- size_t sockets_size = this->sockets_.size ();
- for (size_t i = 0; i != sockets_size; ++i)
+ TAO_ECG_Mcast_Socket *socket = this->find (fd);
+ if (socket)
{
- ACE_SOCK_Dgram_Mcast *socket = this->sockets_[i];
- if (socket->get_handle () == fd)
+ if (this->receiver_->handle_input (*socket) == -1)
{
- return this->receiver_->handle_input (*socket);
+ // Since we are working with UDP multicast sockets here, there aren't
+ // a whole lot of errors we can get that will make us want to close
+ // the close the socket, but if we return -1 to the reactor, that's
+ // exactly what will happend. Or, more precisely, the reactor will
+ // remove us from it's handle set and call back our handle_close()
+ // method. Since that's not what we generally want, we just log
+ // the message and always return 0.
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("handle_input() returned an error (%p)\n")));
}
+
+ // No matter what happened, we need to decrement the refcount that
+ // find() incremented.
+ socket->decrement ();
}
- return -1;
+
+ return 0;
}
void
@@ -182,8 +229,10 @@ TAO_ECG_Mcast_EH::compute_required_subscriptions (
int
TAO_ECG_Mcast_EH::delete_unwanted_subscriptions (
- Address_Set& multicast_addresses)
+ Address_Set& multicast_addresses)
{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0));
+
Subscriptions_Iterator j = this->subscriptions_.begin ();
while (j != this->subscriptions_.end ())
{
@@ -197,17 +246,29 @@ TAO_ECG_Mcast_EH::delete_unwanted_subscriptions (
continue;
}
- ACE_TCHAR buf[256];
- multicast_group.addr_to_string (buf, sizeof buf, 1);
- ACE_SOCK_Dgram_Mcast *socket = (*j).int_id_;
+ this->subscriptions_.unbind (multicast_group);
+ TAO_ECG_Mcast_Socket *socket = (*j).int_id_;
// Ignore errors, there is no appropriate policy to handle them
// at this layer.
// @@ TODO Consider if we should raise exceptions.
- socket->unsubscribe (multicast_group, this->net_if_);
+ ACE_HANDLE handle = socket->get_handle ();
+ socket->leave (multicast_group, this->net_if_);
+ if (this->reactor ()->remove_handler (handle,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL)
+ == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("failed to remove mcast handler from ")
+ ACE_TEXT ("reactor.\n")));
+
+ this->handles_.unbind (handle);
+ // We're done with it, so decrement the refcount so it can get
+ // deleted (either now or when it finishes an upcall.
+ socket->decrement ();
+
// Increment and then remove, this is a safe way to remove the
// element without invalidating the iterator.
++j;
- this->subscriptions_.unbind (multicast_group);
}
return 0;
}
@@ -216,28 +277,21 @@ int
TAO_ECG_Mcast_EH::subscribe_to_existing_socket (
ACE_INET_Addr& multicast_group)
{
- size_t sockets_size = this->sockets_.size ();
- int result = -1;
- for (size_t i = 0; i != sockets_size; ++i)
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, -1));
+ int result = 0;
+
+ Handle_Iterator h_iter = this->handles_.begin ();
+ while (h_iter != this->handles_.end ())
{
- ACE_SOCK_Dgram_Mcast *socket = this->sockets_[i];
- result =
- socket->subscribe (multicast_group, 1, this->net_if_);
+ TAO_ECG_Mcast_Socket *socket = (*h_iter).int_id_;
+ result = socket->join (multicast_group, 1, this->net_if_);
if (result != -1)
{
// Add the subscription to the subscription list
(void) this->subscriptions_.bind (multicast_group, socket);
break;
}
- /* assert(result == -1) */
- if (errno == ENOBUFS || errno == ETOOMANYREFS)
- {
- // The socket is full, try with the next one...
- continue;
- }
- // @@ TODO: There was an error, but the problem is not with
- // too many subscriptions... what we need to do is close the
- // socket? Or skip the subscription?
+ ++h_iter;
}
return result;
}
@@ -246,25 +300,27 @@ void
TAO_ECG_Mcast_EH::subscribe_to_new_socket(
ACE_INET_Addr& multicast_group)
{
- ACE_SOCK_Dgram_Mcast *socket;
- ACE_NEW (socket, ACE_SOCK_Dgram_Mcast);
- size_t sockets_size = this->sockets_.size ();
-
- if (this->sockets_.max_size () == sockets_size)
- {
- this->sockets_.max_size (sockets_size + 1);
- this->sockets_.size (sockets_size + 1);
- }
- this->sockets_[sockets_size] = socket;
- socket->subscribe (multicast_group, 1, this->net_if_);
-
+ TAO_ECG_Mcast_Socket *socket;
+ // Always bind the address, which means that we will only get traffic
+ // that was sent to the "single" group we joined and won't get any
+ // cross-talk.
+ ACE_NEW (socket, TAO_ECG_Mcast_Socket(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES));
+
+ socket->join (multicast_group, 1, this->net_if_);
+ ACE_HANDLE handle = socket->get_handle ();
+
+ // We only need the lock long enough to update the lists.
+ ACE_MT (ACE_GUARD (ACE_Lock, ace_mon, *this->lock_));
+ this->handles_.bind (handle, socket);
// @@ TODO: we have no way to handle this failure....
(void) this->subscriptions_.bind (multicast_group, socket);
+ ACE_MT (ace_mon.release ());
// @@ TODO: we have no way to handle this failure....
- (void) this->reactor ()->register_handler (socket->get_handle (),
- this,
- ACE_Event_Handler::READ_MASK);
+ if (this->reactor ()->register_handler (handle,
+ this,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("reg handle failed\n")));
}
void
@@ -279,19 +335,26 @@ TAO_ECG_Mcast_EH::add_new_subscriptions (
// This is the multicast group we want to add to...
ACE_INET_Addr multicast_group = *k;
- ACE_TCHAR buf[256];
- multicast_group.addr_to_string (buf, sizeof buf, 1);
-
// Try to find a socket that has space for another multicast
// group.
- int successful_subscription =
+ int successful_subscription = -1;
+
+ // It only makes sense to try subscribing to an existing
+ // socket if the OS supports it. Otherwise it will always
+ // fail since we bind the address by default and you can't
+ // join a multicast group that's different from the one
+ // bound to the socket.
+#if defined (ACE_LACKS_PERFECT_MULTICAST_FILTERING) \
+ && (ACE_LACKS_PERFECT_MULTICAST_FILTERING == 0)
+ successful_subscription =
this->subscribe_to_existing_socket (multicast_group);
+# endif /* ACE_LACKS_PERFECT_MULTICAST_FILTERING == 0 */
if (successful_subscription == -1)
{
// We try all the sockets and all of them appear to be full,
// we need to open a new one and add it to the Array...
- this->subscribe_to_new_socket(multicast_group);
+ this->subscribe_to_new_socket (multicast_group);
}
}
}
@@ -302,15 +365,18 @@ TAO_ECG_Mcast_EH::update_consumer (
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
- // @@ TODO This function turned out to be too long, we need to break
- // it up in pieces, maybe 3 or 4 private member functions....
+ // Make sure we are open for business.
+ if (!this->handle_)
+ return;
// 1) Figure out the list of multicast groups that we need to
// subscribe to
Address_Set multicast_addresses;
- this->compute_required_subscriptions (sub, multicast_addresses ACE_ENV_ARG_PARAMETER);
+ this->compute_required_subscriptions (sub,
+ multicast_addresses
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
// 2) To conserve OS and network resources we first unsubscribe from
@@ -330,7 +396,7 @@ TAO_ECG_Mcast_EH::update_consumer (
void
TAO_ECG_Mcast_EH::update_supplier (const RtecEventChannelAdmin::SupplierQOS&
ACE_ENV_ARG_DECL_NOT_USED)
- ACE_THROW_SPEC ((CORBA::SystemException))
+ ACE_THROW_SPEC ((CORBA::SystemException))
{
// Do nothing
}
@@ -362,36 +428,60 @@ TAO_ECG_Mcast_EH::Observer::update_supplier (
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
-template class ACE_Hash_Map_Entry<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *>;
+template class ACE_Hash_Map_Manager<ACE_INET_Addr, TAO_ECG_Mcast_Socket*, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Entry<ACE_INET_Addr, TAO_ECG_Mcast_Socket *>;
template class ACE_Hash<ACE_INET_Addr>;
template class ACE_Node<ACE_INET_Addr>;
template class ACE_Equal_To<ACE_INET_Addr>;
+template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
+
+template class ACE_Hash_Map_Manager<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Manager_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Entry<ACE_HANDLE, TAO_ECG_Mcast_Socket * >;
+template class ACE_Hash<ACE_HANDLE>;
+template class ACE_Node<ACE_HANDLE>;
+template class ACE_Equal_To<ACE_HANDLE>;
+template class ACE_Hash_Map_Iterator_Base_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Reverse_Iterator<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket*, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>;
+
template class ACE_Unbounded_Set<ACE_INET_Addr>;
-template class ACE_Array_Base<ACE_SOCK_Dgram_Mcast *>;
template class ACE_Unbounded_Set_Iterator<ACE_INET_Addr>;
-template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
-template class ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Entry<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *>
+#pragma instantiate ACE_Hash_Map_Manager<ACE_INET_Addr, TAO_ECG_Mcast_Socket *,ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Entry<ACE_INET_Addr, TAO_ECG_Mcast_Socket *>
#pragma instantiate ACE_Hash<ACE_INET_Addr>
#pragma instantiate ACE_Node<ACE_INET_Addr>
#pragma instantiate ACE_Equal_To<ACE_INET_Addr>
+#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Iterator<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr, TAO_ECG_Mcast_Socket *,ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
+
+#pragma instantiate ACE_Hash_Map_Manager<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Entry<ACE_HANDLE, TAO_ECG_Mcast_Socket * >
+#pragma instantiate ACE_Hash<ACE_HANDLE>
+#pragma instantiate ACE_Node<ACE_HANDLE>
+#pragma instantiate ACE_Equal_To<ACE_HANDLE>
+#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Iterator<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_HANDLE, TAO_Mcast_Socket*, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Reverse_Iterator<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Hash<ACE_HANDLE>, ACE_Equal_To<ACE_HANDLE>, ACE_Null_Mutex>
+
#pragma instantiate ACE_Unbounded_Set<ACE_INET_Addr>
-#pragma instantiate ACE_Array_Base<ACE_SOCK_Dgram_Mcast *>
#pragma instantiate ACE_Unbounded_Set_Iterator<ACE_INET_Addr>
-#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h
index fa9fc0dd703..dc0359e9089 100644
--- a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h
+++ b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h
@@ -6,6 +6,7 @@
*
* @author Carlos O'Ryan <coryan@uci.edu>
* @author Jaiganesh Balasubramanian <jai@doc.ece.uci.edu>
+ * @author and Don Hinton <dhinton@ieee.org>
*
* http://doc.ece.uci.edu/~coryan/EC/index.html
*
@@ -15,15 +16,50 @@
#include "ace/pre.h"
#include "orbsvcs/Event/event_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
#include "orbsvcs/RtecEventChannelAdminS.h"
#include "ace/Event_Handler.h"
#include "ace/Hash_Map_Manager.h"
#include "ace/Array_Base.h"
#include "ace/SOCK_Dgram_Mcast.h"
+#include "ace/Synch.h"
+#include "ace/Atomic_Op.h"
+#include "tao/Synch_Refcountable.h"
+//#include "tao/Utils/Servant_Var.h"
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
+/**
+ * @class TAO_ECG_Mcast_Socket
+ *
+ * @brief Refcounted wrapper for ACE_SOCK_Dgram_Mcast
+ *
+ * This class is a simple refcounted wrapper around ACE_SOCK_Dgram_Mcast that
+ * makes takes care of deleting itself if the refcount drops to zero.
+ */
+class TAO_RTEvent_Export TAO_ECG_Mcast_Socket :
+ public ACE_SOCK_Dgram_Mcast, private TAO_Synch_Refcountable
+{
+public:
+ /// Default Constructor
+ TAO_ECG_Mcast_Socket (ACE_SOCK_Dgram_Mcast::options opts =
+ ACE_SOCK_Dgram_Mcast::DEFOPTS,
+ ACE_Lock *lock = 0);
+
+ int increment (void);
+ int decrement (void);
+ int refcount (void) const;
+
+private:
+ /// We need a friend so that we have a private dtor>
+ friend class ACE_SOCK_Dgram_Mcast;
+
+ /// Destructor. Force it on the Heap, so we can call delete this in
+ /// decrement().
+ ~TAO_ECG_Mcast_Socket (void);
+};
class TAO_ECG_UDP_Receiver;
@@ -46,7 +82,8 @@ public:
* expected using <net_if>
*/
TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv,
- const ACE_TCHAR *net_if = 0);
+ const ACE_TCHAR *net_if = 0,
+ ACE_Lock *lock = 0);
/// Destructor
virtual ~TAO_ECG_Mcast_EH (void);
@@ -69,7 +106,7 @@ public:
int close (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
/// Reactor callbacks
- virtual int handle_input (ACE_HANDLE fd);
+ virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
/// The Observer methods
void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub
@@ -90,7 +127,9 @@ public:
* compilers.
*/
- class Observer : public POA_RtecEventChannelAdmin::Observer
+ class Observer
+ : public virtual POA_RtecEventChannelAdmin::Observer
+ , public virtual PortableServer::RefCountServantBase
{
public:
/// We report changes in the EC subscriptions to the event
@@ -113,6 +152,10 @@ public:
};
private:
+ /// Find the socket mapped to a particular fd. find() holds a lock and calls
+ /// increment () on the socket, if found, and returns a pointer to it.
+ TAO_ECG_Mcast_Socket *find (ACE_HANDLE fd);
+
typedef ACE_Unbounded_Set<ACE_INET_Addr> Address_Set;
/// Compute the list of multicast addresses that we need to be
@@ -178,15 +221,26 @@ private:
ACE_TCHAR *net_if_;
/// Define the collection used to keep the iterator
- typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex> Subscriptions_Map;
- typedef ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex> Subscriptions_Iterator;
+ typedef ACE_Hash_Map_Manager<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>
+ Subscriptions_Map;
+ typedef ACE_Hash_Map_Iterator<ACE_INET_Addr, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>
+ Subscriptions_Iterator;
- /// @@ Please describe this map and its role in the class!
+ /// Map of multicast address to sockets, used to manage the subscription
+ /// to multicast address by the update_{consumer|supplier} () methods.
Subscriptions_Map subscriptions_;
- /// The datagram used to receive the data.
- typedef ACE_Array_Base<ACE_SOCK_Dgram_Mcast*> Socket_List;
- Socket_List sockets_;
+ typedef ACE_Hash_Map_Manager<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>
+ Handle_Map;
+ typedef ACE_Hash_Map_Iterator<ACE_HANDLE, TAO_ECG_Mcast_Socket *, ACE_Null_Mutex>
+ Handle_Iterator;
+
+ /// Map of ACE_HANDLEs to sockets. Used to find the appropriate socket in
+ /// calls to handle_input ().
+ Handle_Map handles_;
+
+ /// The lock used to protect the lists.
+ ACE_Lock *lock_;
/// We callback to this object when a message arrives.
TAO_ECG_UDP_Receiver* receiver_;
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.i b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.i
index cfa1da318d3..d13f27b7c10 100644
--- a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.i
+++ b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.i
@@ -1 +1,15 @@
+/* -*- C++ -*- */
// $Id$
+
+int
+TAO_ECG_Mcast_Socket::increment (void)
+{
+ return TAO_Synch_Refcountable::increment ();
+}
+
+int
+TAO_ECG_Mcast_Socket::refcount (void) const
+{
+ return TAO_Synch_Refcountable::refcount ();
+}
+
diff --git a/TAO/orbsvcs/tests/EC_Custom_Marshal/Makefile b/TAO/orbsvcs/tests/EC_Custom_Marshal/Makefile
index 74f55ecdca2..abc3acc52fa 100644
--- a/TAO/orbsvcs/tests/EC_Custom_Marshal/Makefile
+++ b/TAO/orbsvcs/tests/EC_Custom_Marshal/Makefile
@@ -1,6 +1,6 @@
# $Id$
-LDLIBS= -lTAO_RTEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_PortableServer -lTAO
+LDLIBS= -lTAO_RTEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_PortableServer -lTAO -lTAO_Utils
ifndef TAO_ROOT
TAO_ROOT = $(ACE_ROOT)/TAO
diff --git a/TAO/orbsvcs/tests/EC_MT_Mcast/Makefile b/TAO/orbsvcs/tests/EC_MT_Mcast/Makefile
index 2bd88b5c1db..59daff5ebde 100644
--- a/TAO/orbsvcs/tests/EC_MT_Mcast/Makefile
+++ b/TAO/orbsvcs/tests/EC_MT_Mcast/Makefile
@@ -15,7 +15,7 @@ endif # ! TAO_ROOT
BIN2 = MCast
SRC=MCast.cpp Consumer.cpp Supplier.cpp AddrServer.cpp
-LDLIBS = -lTAO_RTEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO
+LDLIBS = -lTAO_RTEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO -lTAO_Utils
CPPFLAGS += -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs \
$(foreach svc, $(TAO_ORBSVCS), -DTAO_ORBSVCS_HAS_$(svc))
diff --git a/TAO/orbsvcs/tests/EC_Mcast/Makefile b/TAO/orbsvcs/tests/EC_Mcast/Makefile
index bda0daaf7d3..c139fc0ca44 100644
--- a/TAO/orbsvcs/tests/EC_Mcast/Makefile
+++ b/TAO/orbsvcs/tests/EC_Mcast/Makefile
@@ -1,7 +1,7 @@
# $Id$
SRC = $(BIN:%=%$(VAR).cpp)
-LDLIBS= -lTAO_RTEvent -lTAO_Svc_Utils -lTAO_Messaging -lTAO_PortableServer -lTAO
+LDLIBS= -lTAO_RTEvent -lTAO_Svc_Utils -lTAO_Messaging -lTAO_PortableServer -lTAO -lTAO_Utils
ifndef TAO_ROOT
TAO_ROOT = $(ACE_ROOT)/TAO
diff --git a/TAO/orbsvcs/tests/EC_Multiple/Makefile b/TAO/orbsvcs/tests/EC_Multiple/Makefile
index cc5a6c32e8e..9d7978a5a73 100644
--- a/TAO/orbsvcs/tests/EC_Multiple/Makefile
+++ b/TAO/orbsvcs/tests/EC_Multiple/Makefile
@@ -1,7 +1,7 @@
# $Id$
SRC = $(BIN:%=%$(VAR).cpp)
-LDLIBS= -lTAO_RTOLDEvent -lTAO_RTSchedEvent -lTAO_RTSched -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_PortableServer -lTAO
+LDLIBS= -lTAO_RTOLDEvent -lTAO_RTSchedEvent -lTAO_RTSched -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_PortableServer -lTAO -lTAO_Utils
ifndef TAO_ROOT
TAO_ROOT = $(ACE_ROOT)/TAO
diff --git a/TAO/orbsvcs/tests/EC_Throughput/Makefile b/TAO/orbsvcs/tests/EC_Throughput/Makefile
index d2b7a9ad0ee..c2bad07ac5d 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/Makefile
+++ b/TAO/orbsvcs/tests/EC_Throughput/Makefile
@@ -1,6 +1,6 @@
# $Id$
-LDLIBS = -lTAO_RTOLDEvent -lTAO_RTSched -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO
+LDLIBS = -lTAO_RTOLDEvent -lTAO_RTSched -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO -lTAO_Utils
ifndef TAO_ROOT
TAO_ROOT = $(ACE_ROOT)/TAO
diff --git a/TAO/orbsvcs/tests/Event/Basic/Makefile b/TAO/orbsvcs/tests/Event/Basic/Makefile
index 28ac0e04ae1..5099b310dde 100644
--- a/TAO/orbsvcs/tests/Event/Basic/Makefile
+++ b/TAO/orbsvcs/tests/Event/Basic/Makefile
@@ -29,7 +29,7 @@ BIN2 = Reconnect \
Random
PSRC=$(addsuffix .cpp,$(BIN2)) Schedule.cpp
-LDLIBS = -lECTests -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_Messaging -lTAO_IORTable -lTAO_PortableServer -lTAO
+LDLIBS = -lECTests -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_Messaging -lTAO_IORTable -lTAO_PortableServer -lTAO -lTAO_Utils
CPPFLAGS += -I../lib -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs \
$(foreach svc, $(TAO_ORBSVCS), -DTAO_ORBSVCS_HAS_$(svc))
diff --git a/TAO/orbsvcs/tests/Event/Performance/Makefile b/TAO/orbsvcs/tests/Event/Performance/Makefile
index 72ab2e9a3ae..508ae681dce 100644
--- a/TAO/orbsvcs/tests/Event/Performance/Makefile
+++ b/TAO/orbsvcs/tests/Event/Performance/Makefile
@@ -19,7 +19,7 @@ BIN2 = Throughput \
Latency_Server
PSRC=$(addsuffix .cpp,$(BIN2))
-LDLIBS = -lECTests -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO_Strategies -lTAO
+LDLIBS = -lECTests -lTAO_RTEvent -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO_Strategies -lTAO -lTAO_Utils
CPPFLAGS += -I../lib -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs \
$(foreach svc, $(TAO_ORBSVCS), -DTAO_ORBSVCS_HAS_$(svc))