summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp393
1 files changed, 393 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp
new file mode 100644
index 00000000000..8efc4b7a5d7
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp
@@ -0,0 +1,393 @@
+// $Id$
+
+#include "orbsvcs/Event/ECG_Mcast_EH.h"
+#include "orbsvcs/Event/EC_Gateway_UDP.h"
+#include "orbsvcs/Event_Service_Constants.h"
+
+#if !defined(__ACE_INLINE__)
+#include "ECG_Mcast_EH.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Event, ECG_UDP_EH, "$Id$")
+
+TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv,
+ const ACE_TCHAR *net_if)
+ : net_if_ (net_if?ACE_OS::strdup (net_if):0)
+ , receiver_ (recv)
+ , observer_ (this)
+{
+}
+
+TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH (void)
+{
+ ACE_OS::free (this->net_if_);
+}
+
+// @@ TODO Why have a return code *and* exceptions? Only one would do!
+
+int
+TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment& ACE_TRY_ENV)
+{
+ // @@ TODO Think about the exception safety (or lack thereof) of
+ // this code, what if the following operations fail?
+
+ this->ec_ = RtecEventChannelAdmin::EventChannel::_duplicate (ec);
+
+ // @@ TODO This activation should be configured by the application,
+ // it is too much policy to use _this().
+ // @@ TODO Using a full instance instead of a pointer makes memory
+ // management complex. The observer_ object should derive from
+ // PortableServer::RefCountServantBase and use the POA to control
+ // its lifetime.
+
+ RtecEventChannelAdmin::Observer_var obs =
+ this->observer_._this (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_TRY
+ {
+ this->handle_ =
+ this->ec_->append_observer (obs.in (), ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+ }
+ ACE_CATCH(CORBA::SystemException, ex)
+ {
+ // @@ TODO This code is tedious and error prone, plus its
+ // exceptions should be ignored (no way to recover from them),
+ // we should encapsulate it in a Deactivator.
+
+ PortableServer::POA_var poa =
+ this->observer_._default_POA (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->observer_, ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_RE_THROW;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+int
+TAO_ECG_Mcast_EH::close (CORBA::Environment& ACE_TRY_ENV)
+{
+ if (this->handle_ == 0)
+ return 0;
+
+ RtecEventChannelAdmin::Observer_Handle h = this->handle_;
+ this->handle_ = 0;
+ this->ec_->remove_observer (h, ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ // @@ TODO If the first operation raises an exception then the
+ // second one never executes!!!
+ {
+ PortableServer::POA_var poa =
+ this->observer_._default_POA (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->observer_, ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+ }
+
+ // Ignore the result, the handler could have been removed by a call
+ // to update_consumer() or something similar.
+ (void) this->reactor ()->remove_handler (this,
+ ACE_Event_Handler::READ_MASK);
+
+ size_t sockets_size = this->sockets_.size ();
+ for (size_t i = 0; i != sockets_size; ++i)
+ {
+ // Ignore any errors that we may have when closing the socket,
+ // there is nothing we can do about them at this point....
+ (void) this->sockets_[i]->close();
+ delete this->sockets_[i];
+ }
+ this->sockets_.size (0);
+
+ // Once all the sockets are closed they no longer are subscribed for
+ // anything
+ this->subscriptions_.unbind_all ();
+
+ return 0;
+}
+
+int
+TAO_ECG_Mcast_EH::handle_input (ACE_HANDLE fd)
+{
+ // @@ TODO How expensive is this loop? Should we use a more
+ // efficient data structure than an Array? For example a RB tree
+ // or a hash map?
+ size_t sockets_size = this->sockets_.size ();
+ for (size_t i = 0; i != sockets_size; ++i)
+ {
+ ACE_SOCK_Dgram_Mcast *socket = this->sockets_[i];
+ if (socket->get_handle () == fd)
+ {
+ return this->receiver_->handle_input (*socket);
+ }
+ }
+ return -1;
+}
+
+void
+TAO_ECG_Mcast_EH::compute_required_subscriptions (
+ const RtecEventChannelAdmin::ConsumerQOS& sub,
+ Address_Set& multicast_addresses,
+ CORBA::Environment& ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ CORBA::ULong count = sub.dependencies.length ();
+ for (CORBA::ULong i = 0; i != count; ++i)
+ {
+ const RtecEventComm::EventHeader& header =
+ sub.dependencies[i].event.header;
+ if (0 <= header.type && header.type < ACE_ES_EVENT_UNDEFINED)
+ {
+ continue;
+ }
+ RtecUDPAdmin::UDP_Addr addr;
+
+ // For the time being we are exception neutral, i.e., if an
+ // exception is raised at this point we simply propagate it.
+ // Notice that we haven't performed any changes to the state of
+ // the current class, so this is the easiest approach. The
+ // alternatives are:
+ // + Ignore the exception and continue with the operation: risky
+ // because we don't know if the exception indicated a
+ // temporary or permanent condition, and if it is the former
+ // we may loose an important multicast group, with no
+ // opportunity to recover it.
+ // + Close the MCast Event Handler completely, too much policy
+ // for this level.
+ this->receiver_->get_addr (header, addr, ACE_TRY_ENV);
+ ACE_CHECK;
+
+ ACE_INET_Addr inet_addr (addr.port, addr.ipaddr);
+ // Ignore errors, if the element is in the set we simply ignore
+ // the problem...
+ (void) multicast_addresses.insert (inet_addr);
+ }
+}
+
+int
+TAO_ECG_Mcast_EH::delete_unwanted_subscriptions (
+ Address_Set& multicast_addresses)
+{
+ Subscriptions_Iterator j = this->subscriptions_.begin ();
+ while (j != this->subscriptions_.end ())
+ {
+ ACE_INET_Addr multicast_group = (*j).ext_id_;
+ if (multicast_addresses.find (multicast_group))
+ {
+ // Remove from the list of subscriptions that should be
+ // added, because it is already there...
+ (void) multicast_addresses.remove (multicast_group);
+ ++j;
+ continue;
+ }
+
+ char buf[256];
+ multicast_group.addr_to_string (buf, sizeof buf, 1);
+ ACE_SOCK_Dgram_Mcast *socket = (*j).int_id_;
+ // Ignore errors, there is no appropriate policy to handle them
+ // at this layer.
+ // @@ TODO Consider if we should raise exceptions.
+ socket->unsubscribe (multicast_group, this->net_if_);
+ // Increment and then remove, this is a safe way to remove the
+ // element without invalidating the iterator.
+ ++j;
+ this->subscriptions_.unbind (multicast_group);
+ }
+ return 0;
+}
+
+int
+TAO_ECG_Mcast_EH::subscribe_to_existing_socket (
+ ACE_INET_Addr& multicast_group)
+{
+ size_t sockets_size = this->sockets_.size ();
+ int result = -1;
+ for (size_t i = 0; i != sockets_size; ++i)
+ {
+ ACE_SOCK_Dgram_Mcast *socket = this->sockets_[i];
+ result =
+ socket->subscribe (multicast_group, 1, this->net_if_);
+ if (result == 0)
+ {
+ // Add the subscription to the subscription list
+ (void) this->subscriptions_.bind (multicast_group, socket);
+ break;
+ }
+ /* assert(result == -1) */
+ if (errno == ENOBUFS || errno == ETOOMANYREFS)
+ {
+ // The socket is full, try with the next one...
+ continue;
+ }
+ // @@ TODO: There was an error, but the problem is not with
+ // too many subscriptions... what we need to do is close the
+ // socket? Or skip the subscription?
+ }
+ return result;
+}
+
+void
+TAO_ECG_Mcast_EH::subscribe_to_new_socket(
+ ACE_INET_Addr& multicast_group)
+{
+ ACE_SOCK_Dgram_Mcast *socket;
+ ACE_NEW (socket, ACE_SOCK_Dgram_Mcast);
+ size_t sockets_size = this->sockets_.size ();
+
+ if (this->sockets_.max_size () == sockets_size)
+ {
+ this->sockets_.max_size (sockets_size + 1);
+ this->sockets_.size (sockets_size + 1);
+ }
+ this->sockets_[sockets_size] = socket;
+ socket->subscribe (multicast_group, 1, this->net_if_);
+
+ // @@ TODO: we have no way to handle this failure....
+ (void) this->subscriptions_.bind (multicast_group, socket);
+
+ // @@ TODO: we have no way to handle this failure....
+ (void) this->reactor ()->register_handler (socket->get_handle (),
+ this,
+ ACE_Event_Handler::READ_MASK);
+}
+
+void
+TAO_ECG_Mcast_EH::add_new_subscriptions (
+ Address_Set& multicast_addresses)
+{
+ typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator;
+ for (Address_Iterator k = multicast_addresses.begin ();
+ k != multicast_addresses.end ();
+ ++k)
+ {
+ // This is the multicast group we want to add to...
+ ACE_INET_Addr multicast_group = *k;
+
+ char buf[256];
+ multicast_group.addr_to_string (buf, sizeof buf, 1);
+
+ // Try to find a socket that has space for another multicast
+ // group.
+ int successful_subscription =
+ this->subscribe_to_existing_socket (multicast_group);
+
+ if (successful_subscription == -1)
+ {
+ // We try all the sockets and all of them appear to be full,
+ // we need to open a new one and add it to the Array...
+ this->subscribe_to_new_socket(multicast_group);
+ }
+ }
+}
+
+void
+TAO_ECG_Mcast_EH::update_consumer (
+ const RtecEventChannelAdmin::ConsumerQOS& sub,
+ CORBA::Environment& ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ // @@ TODO This function turned out to be too long, we need to break
+ // it up in pieces, maybe 3 or 4 private member functions....
+
+ // 1) Figure out the list of multicast groups that we need to
+ // subscribe to
+
+ Address_Set multicast_addresses;
+
+ this->compute_required_subscriptions (sub, multicast_addresses, ACE_TRY_ENV);
+ ACE_CHECK;
+
+ // 2) To conserve OS and network resources we first unsubscribe from
+ // the multicast groups no longer wanted. This is done by
+ // iterating over the current set of multicast groups and
+ // removing those not present in the "multicast_addresses" set.
+
+ this->delete_unwanted_subscriptions (multicast_addresses);
+
+ // 3) After the loop above the "multicast_addresses" set contains
+ // only the new subscriptions... go ahead and add them to the
+ // sockets, opening sockets if needed.
+
+ this->add_new_subscriptions (multicast_addresses);
+}
+
+void
+TAO_ECG_Mcast_EH::update_supplier (const RtecEventChannelAdmin::SupplierQOS&,
+ CORBA::Environment&)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ // Do nothing
+}
+
+// ****************************************************************
+
+TAO_ECG_Mcast_EH::Observer::Observer (TAO_ECG_Mcast_EH* eh)
+ : eh_ (eh)
+{
+}
+
+void
+TAO_ECG_Mcast_EH::Observer::update_consumer (
+ const RtecEventChannelAdmin::ConsumerQOS& sub,
+ CORBA::Environment& ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ this->eh_->update_consumer (sub, ACE_TRY_ENV);
+}
+
+void
+TAO_ECG_Mcast_EH::Observer::update_supplier (
+ const RtecEventChannelAdmin::SupplierQOS& pub,
+ CORBA::Environment& ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ this->eh_->update_supplier (pub, ACE_TRY_ENV);
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Entry<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *>;
+template class ACE_Hash<ACE_INET_Addr>;
+template class ACE_Equal_To<ACE_INET_Addr>;
+template class ACE_Unbounded_Set<ACE_INET_Addr>;
+template class ACE_Array_Base<ACE_SOCK_Dgram_Mcast *>;
+template class ACE_Unbounded_Set_Iterator<ACE_INET_Addr>;
+template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Entry<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *>
+#pragma instantiate ACE_Hash<ACE_INET_Addr>
+#pragma instantiate ACE_Equal_To<ACE_INET_Addr>
+#pragma instantiate ACE_Unbounded_Set<ACE_INET_Addr>
+#pragma instantiate ACE_Array_Base<ACE_SOCK_Dgram_Mcast *>
+#pragma instantiate ACE_Unbounded_Set_Iterator<ACE_INET_Addr>
+#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>;
+#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */