summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp303
1 files changed, 0 insertions, 303 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp
deleted file mode 100644
index e72dafca760..00000000000
--- a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.cpp
+++ /dev/null
@@ -1,303 +0,0 @@
-// $Id$
-
-#include "orbsvcs/Event/ECG_Mcast_EH.h"
-#include "orbsvcs/Event/ECG_UDP_Receiver.h"
-#include "orbsvcs/Event_Service_Constants.h"
-#include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
-#include "ace/Unbounded_Set.h"
-#include "ace/OS_NS_string.h"
-#include "ace/Reactor.h"
-#include "ace/os_include/os_fcntl.h"
-
-#if !defined(__ACE_INLINE__)
-#include "orbsvcs/Event/ECG_Mcast_EH.i"
-#endif /* __ACE_INLINE__ */
-
-ACE_RCSID(Event, ECG_Mcast_EH, "$Id$")
-
-TAO_BEGIN_VERSIONED_NAMESPACE_DECL
-
-TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv,
- const ACE_TCHAR *net_if,
- CORBA::ULong sz)
- : net_if_ (net_if ? ACE_OS::strdup (net_if) : 0)
- , subscriptions_ ()
- , receiver_ (recv)
- , recvbuf_size_ (sz)
- , observer_ ()
- , auto_observer_disconnect_ ()
-{
- ACE_ASSERT (this->receiver_);
-}
-
-TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH (void)
-{
- ACE_OS::free (this->net_if_);
-}
-
-void
-TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec
- ACE_ENV_ARG_DECL)
-{
- if (!this->receiver_)
- {
- // We are shut down.
- ACE_THROW (CORBA::INTERNAL());
- }
-
- if (CORBA::is_nil (ec))
- {
- ACE_ERROR ((LM_ERROR, "TAO_ECG_Mcast_EH::open(): "
- "nil ec argument"));
- ACE_THROW (CORBA::INTERNAL ());
- }
-
- // Create and activate Event Channel Observer.
- ACE_NEW (this->observer_,
- Observer (this));
-
- if (!this->observer_.in ())
- {
- ACE_THROW (CORBA::NO_MEMORY ());
- }
-
- TAO_EC_Object_Deactivator observer_deactivator;
- RtecEventChannelAdmin::Observer_var observer_ref;
- PortableServer::POA_var poa =
- this->observer_->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- activate (observer_ref,
- poa.in (),
- this->observer_.in (),
- observer_deactivator
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- RtecEventChannelAdmin::Observer_Handle handle =
- ec->append_observer (observer_ref.in ()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- this->observer_->set_deactivator (observer_deactivator);
- this->auto_observer_disconnect_.set_command
- (Observer_Disconnect_Command (handle, ec));
-}
-
-int
-TAO_ECG_Mcast_EH::shutdown (void)
-{
- // Already shut down.
- if (!this->receiver_)
- return -1;
-
- // Disconnect Observer from EC.
- this->auto_observer_disconnect_.execute ();
-
- // Shutdown the observer.
- if (this->observer_.in ())
- {
- this->observer_->shutdown ();
- this->observer_ = 0;
- }
-
- // Indicates that we are in a shutdown state.
- this->receiver_ = 0;
-
- // Deregister from reactor, close and clean up sockets.
- size_t subscriptions_size = this->subscriptions_.size ();
- for (size_t i = 0; i != subscriptions_size; ++i)
- {
- (void) this->reactor ()->remove_handler (
- this->subscriptions_[i].dgram->get_handle (),
- ACE_Event_Handler::READ_MASK);
- (void) this->subscriptions_[i].dgram->close();
- delete this->subscriptions_[i].dgram;
- }
- this->subscriptions_.size (0);
-
- return 0;
-}
-
-int
-TAO_ECG_Mcast_EH::handle_input (ACE_HANDLE fd)
-{
- size_t subscriptions_size = this->subscriptions_.size ();
- for (size_t i = 0; i != subscriptions_size; ++i)
- {
- ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
- if (socket->get_handle () == fd)
- {
- return this->receiver_->handle_input (*socket);
- }
- }
- return -1;
-}
-
-void
-TAO_ECG_Mcast_EH::update_consumer (
- const RtecEventChannelAdmin::ConsumerQOS& sub
- ACE_ENV_ARG_DECL)
- ACE_THROW_SPEC ((CORBA::SystemException))
-{
- Address_Set multicast_addresses;
-
- this->compute_required_subscriptions (sub,
- multicast_addresses
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- this->delete_unwanted_subscriptions (multicast_addresses);
-
- this->add_new_subscriptions (multicast_addresses);
-}
-
-void
-TAO_ECG_Mcast_EH::compute_required_subscriptions (
- const RtecEventChannelAdmin::ConsumerQOS& sub,
- Address_Set& multicast_addresses
- ACE_ENV_ARG_DECL)
- ACE_THROW_SPEC ((CORBA::SystemException))
-{
- CORBA::ULong count = sub.dependencies.length ();
- for (CORBA::ULong i = 0; i != count; ++i)
- {
- const RtecEventComm::EventHeader& header =
- sub.dependencies[i].event.header;
- if (0 < header.type && header.type < ACE_ES_EVENT_UNDEFINED)
- {
- continue;
- }
- RtecUDPAdmin::UDP_Addr addr;
-
- this->receiver_->get_addr (header, addr ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- ACE_INET_Addr inet_addr (addr.port, addr.ipaddr);
- // Ignore errors, if the element is in the set we simply ignore
- // the problem...
- (void) multicast_addresses.insert (inet_addr);
- }
-}
-
-int
-TAO_ECG_Mcast_EH::delete_unwanted_subscriptions (
- Address_Set& multicast_addresses)
-{
- for (size_t i = 0; i < this->subscriptions_.size (); ++i)
- {
- ACE_INET_Addr multicast_group = this->subscriptions_[i].mcast_addr;
- if (multicast_addresses.find (multicast_group))
- {
- // Remove from the list of subscriptions to be added,
- // because we already subscribe to it...
- (void) multicast_addresses.remove (multicast_group);
- continue;
- }
-
- // This subscription is no longer needed - remove from reactor,
- // close and delete the socket.
- ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
- (void) this->reactor ()->remove_handler (socket->get_handle (),
- ACE_Event_Handler::READ_MASK);
- (void) socket->close();
- delete socket;
- // Move the deleted subscription out of the <subscriptions_>
- // array by moving the last subscription in array into its place.
- this->subscriptions_[i] =
- this->subscriptions_[this->subscriptions_.size () - 1];
- this->subscriptions_.size (this->subscriptions_.size () - 1);
- --i;
- }
-
- return 0;
-}
-
-void
-TAO_ECG_Mcast_EH::add_new_subscriptions (Address_Set& multicast_addresses)
-{
- typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator;
- for (Address_Iterator k = multicast_addresses.begin ();
- k != multicast_addresses.end ();
- ++k)
- {
- Subscription new_subscription;
- new_subscription.mcast_addr = *k;
- ACE_NEW (new_subscription.dgram, ACE_SOCK_Dgram_Mcast);
-
- size_t subscriptions_size = this->subscriptions_.size ();
- this->subscriptions_.size (subscriptions_size + 1);
- this->subscriptions_[subscriptions_size] = new_subscription;
-
- ACE_SOCK_Dgram_Mcast *socket = new_subscription.dgram;
-
- if (socket->open (new_subscription.mcast_addr, this->net_if_, 1) == -1) {
- ACE_ERROR ((LM_ERROR,
- "Error: %d - Unable to open multicast socket\n",
- errno ));
- }
-
- if ( socket->enable (ACE_NONBLOCK) != 0 ) {
- ACE_ERROR ((LM_ERROR,
- "Error: %d - Unable to enable nonblocking on mcast_eh\n",
- errno ));
- }
-
- if (socket->join (new_subscription.mcast_addr, 1, this->net_if_) == -1) {
- ACE_ERROR ((LM_ERROR,
- "Error: %d - Unable to join multicast group\n",
- errno ));
- }
-
- if (this->recvbuf_size_ != 0
- && (((ACE_SOCK_Dgram *)socket)->set_option(SOL_SOCKET,
- SO_RCVBUF,
- (void *) &this->recvbuf_size_,
- sizeof (this->recvbuf_size_)) == -1)
- && errno != ENOTSUP )
- {
- ACE_ERROR ((LM_ERROR,
- "Error: %d - Unable to set mcast_eh recvbuf_size:%d\n",
- errno,
- this->recvbuf_size_));
- }
- (void) this->reactor ()->register_handler (
- socket->get_handle (),
- this,
- ACE_Event_Handler::READ_MASK);
- }
-}
-
-// ****************************************************************
-
-TAO_ECG_Mcast_EH::Observer::Observer (TAO_ECG_Mcast_EH* eh)
- : eh_ (eh)
-{
-}
-
-void
-TAO_ECG_Mcast_EH::Observer::update_consumer (
- const RtecEventChannelAdmin::ConsumerQOS& sub
- ACE_ENV_ARG_DECL)
- ACE_THROW_SPEC ((CORBA::SystemException))
-{
- if (this->eh_)
- this->eh_->update_consumer (sub ACE_ENV_ARG_PARAMETER);
-}
-
-void
-TAO_ECG_Mcast_EH::Observer::update_supplier (
- const RtecEventChannelAdmin::SupplierQOS&
- ACE_ENV_ARG_DECL_NOT_USED)
- ACE_THROW_SPEC ((CORBA::SystemException))
-{
-}
-
-void
-TAO_ECG_Mcast_EH::Observer::shutdown (void)
-{
- this->eh_ = 0;
- this->deactivator_.deactivate ();
-}
-
-TAO_END_VERSIONED_NAMESPACE_DECL