diff options
Diffstat (limited to 'modules/CIAO/ciaosvcs/Events/CIAO_RTEC')
7 files changed, 1406 insertions, 0 deletions
diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEVENT_Export.h b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEVENT_Export.h new file mode 100644 index 00000000000..d79e4a4581f --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEVENT_Export.h @@ -0,0 +1,58 @@ + +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by generate_export_file.pl CIAO_RTEVENT +// ------------------------------ +#ifndef CIAO_RTEVENT_EXPORT_H +#define CIAO_RTEVENT_EXPORT_H + +#include "ace/config-all.h" + +#if defined (ACE_AS_STATIC_LIBS) && !defined (CIAO_RTEVENT_HAS_DLL) +# define CIAO_RTEVENT_HAS_DLL 0 +#endif /* ACE_AS_STATIC_LIBS && CIAO_RTEVENT_HAS_DLL */ + +#if !defined (CIAO_RTEVENT_HAS_DLL) +# define CIAO_RTEVENT_HAS_DLL 1 +#endif /* ! CIAO_RTEVENT_HAS_DLL */ + +#if defined (CIAO_RTEVENT_HAS_DLL) && (CIAO_RTEVENT_HAS_DLL == 1) +# if defined (CIAO_RTEVENT_BUILD_DLL) +# define CIAO_RTEVENT_Export ACE_Proper_Export_Flag +# define CIAO_RTEVENT_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define CIAO_RTEVENT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else /* CIAO_RTEVENT_BUILD_DLL */ +# define CIAO_RTEVENT_Export ACE_Proper_Import_Flag +# define CIAO_RTEVENT_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define CIAO_RTEVENT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* CIAO_RTEVENT_BUILD_DLL */ +#else /* CIAO_RTEVENT_HAS_DLL == 1 */ +# define CIAO_RTEVENT_Export +# define CIAO_RTEVENT_SINGLETON_DECLARATION(T) +# define CIAO_RTEVENT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* CIAO_RTEVENT_HAS_DLL == 1 */ + +// Set CIAO_RTEVENT_NTRACE = 0 to turn on library specific tracing even if +// tracing is turned off for ACE. +#if !defined (CIAO_RTEVENT_NTRACE) +# if (ACE_NTRACE == 1) +# define CIAO_RTEVENT_NTRACE 1 +# else /* (ACE_NTRACE == 1) */ +# define CIAO_RTEVENT_NTRACE 0 +# endif /* (ACE_NTRACE == 1) */ +#endif /* !CIAO_RTEVENT_NTRACE */ + +#if (CIAO_RTEVENT_NTRACE == 1) +# define CIAO_RTEVENT_TRACE(X) +#else /* (CIAO_RTEVENT_NTRACE == 1) */ +# if !defined (ACE_HAS_TRACE) +# define ACE_HAS_TRACE +# endif /* ACE_HAS_TRACE */ +# define CIAO_RTEVENT_TRACE(X) ACE_TRACE_IMPL(X) +# include "ace/Trace.h" +#endif /* (CIAO_RTEVENT_NTRACE == 1) */ + +#endif /* CIAO_RTEVENT_EXPORT_H */ + +// End of auto generated file. diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp new file mode 100644 index 00000000000..1ef5b1ad889 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp @@ -0,0 +1,833 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CIAO_RTEvent.cpp + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * @author George Edwards <g.edwards@vanderbilt.edu> + */ +//============================================================================= + +#include "CIAO_RTEvent.h" +#include "ciao/CIAO_common.h" +#include "SimpleAddressServer.h" +#include "tao/ORB_Core.h" +#include "tao/AnyTypeCode/Any_Unknown_IDL_Type.h" +#include "orbsvcs/CosNamingC.h" + +#include <sstream> + +namespace CIAO +{ + + + RTEventService::RTEventService (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa, + const char * ec_name) : + orb_ (CORBA::ORB::_duplicate (orb)), + root_poa_ (PortableServer::POA::_duplicate (poa)) + { + this->create_rt_event_channel (ec_name); + } + + + RTEventService::~RTEventService (void) + { + } + + + Supplier_Config_ptr + RTEventService::create_supplier_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + RTEvent_Supplier_Config_impl * supplier_config = 0; + ACE_NEW_RETURN (supplier_config, + RTEvent_Supplier_Config_impl (this->root_poa_.in ()), + Supplier_Config::_nil ()); + RTEvent_Supplier_Config_var return_rtec = + supplier_config->_this (); + return return_rtec._retn (); + } + + + Consumer_Config_ptr + RTEventService::create_consumer_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + RTEvent_Consumer_Config_impl * consumer_config = 0; + ACE_NEW_RETURN (consumer_config, + RTEvent_Consumer_Config_impl (this->root_poa_.in ()), + Consumer_Config::_nil ()); + RTEvent_Consumer_Config_var return_rtec = + consumer_config->_this (); + return return_rtec._retn (); + } + + + // @@TODO: We might want to maintain a map for managing multiple proxy consumers + // to multiple event suppliers. + void + RTEventService::connect_event_supplier ( + Supplier_Config_ptr supplier_config) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 9) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventService::connect_event_supplier\n")); + } + + RTEvent_Supplier_Config_ptr rt_config = + RTEvent_Supplier_Config::_narrow (supplier_config); + + if (CORBA::is_nil (rt_config)) + { + throw CORBA::BAD_PARAM (); + } + + // Get a proxy push consumer from the EventChannel. + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + this->rt_event_channel_->for_suppliers (); + + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_push_consumer = + supplier_admin->obtain_push_consumer(); + + // Create and register supplier servant + RTEventServiceSupplier_impl * supplier_servant = 0; + ACE_NEW (supplier_servant, + RTEventServiceSupplier_impl (root_poa_.in ())); + RtecEventComm::PushSupplier_var push_supplier = + supplier_servant->_this (); + + RtecEventChannelAdmin::SupplierQOS_var qos = + rt_config->rt_event_qos (); + + ACE_SupplierQOS_Factory supplier_qos; + supplier_qos.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0, 1); + + supplier_qos.insert (ACE_ES_EVENT_SOURCE_ANY, + ACE_ES_EVENT_ANY, + 0, // handle to the rt_info structure + 1); + + proxy_push_consumer->connect_push_supplier (push_supplier.in (), + supplier_qos.get_SupplierQOS ()); + + + this->proxy_consumer_map_.bind ( + supplier_config->supplier_id (), + proxy_push_consumer._retn ()); + } + + void + RTEventService::connect_event_consumer ( + Consumer_Config_ptr consumer_config) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 9) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventService::connect_event_consumer\n")); + } + + RTEvent_Consumer_Config_ptr rt_config = + RTEvent_Consumer_Config::_narrow (consumer_config); + + if (CORBA::is_nil (rt_config)) + { + throw CORBA::BAD_PARAM (); + } + + Components::EventConsumerBase_var consumer = + consumer_config->consumer (); + + if (CORBA::is_nil (consumer.in ())) + ACE_DEBUG ((LM_DEBUG, "nil event consumer\n")); + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + this->rt_event_channel_->for_consumers (); + + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier = + consumer_admin->obtain_push_supplier (); + + // Create and register consumer servant + RTEventServiceConsumer_impl * consumer_servant = 0; + ACE_NEW (consumer_servant, + RTEventServiceConsumer_impl ( + root_poa_.in (), + consumer.in ())); + RtecEventComm::PushConsumer_var push_consumer = + consumer_servant->_this (); + + RtecEventChannelAdmin::ConsumerQOS_var qos = + rt_config->rt_event_qos (); + + ACE_DEBUG ((LM_DEBUG, "\n======== ConsumerQoS length is: %d\n\n", + qos->dependencies.length ())); + + if (qos->dependencies.length () == 0) + { + qos->dependencies.length (1); + qos->dependencies[0].event.header.type = ACE_ES_EVENT_ANY; + qos->dependencies[0].event.header.source = ACE_ES_EVENT_SOURCE_ANY; + qos->dependencies[0].rt_info = 0; + + ACE_DEBUG ((LM_DEBUG, "\n======== Normalized ConsumerQoS length is: %d\n\n", + qos->dependencies.length ())); + } + + proxy_supplier->connect_push_consumer (push_consumer.in (), + qos.in () + //qos_factory.get_ConsumerQOS () + ); + + ACE_CString consumer_id = + consumer_config->consumer_id (); + + this->proxy_supplier_map_.bind (consumer_id.c_str (), proxy_supplier._retn ()); + } + + void + RTEventService::disconnect_event_supplier ( + const char * connection_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)) + { + ACE_UNUSED_ARG (connection_id); + + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer; + + this->proxy_consumer_map_.unbind (connection_id, proxy_consumer); + + proxy_consumer->disconnect_push_consumer (); + } + + void + RTEventService::disconnect_event_consumer ( + const char * connection_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)) + { + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier; + + this->proxy_supplier_map_.unbind (connection_id, proxy_supplier); + + proxy_supplier->disconnect_push_supplier (); + } + + void + RTEventService::push_event ( + Components::EventBase * ev) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + ACE_UNUSED_ARG (ev); + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "------CIAO::RTEventService::push_event------\n")); + } + } + + void + RTEventService::ciao_push_event ( + Components::EventBase * ev, + const char * source_id, + CORBA::TypeCode_ptr tc) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::BadEventType)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "------CIAO::RTEventService::ciao_push_event------\n")); + } + RtecEventComm::EventSet events (1); + events.length (1); + + ACE_Hash<ACE_CString> hasher; + + events[0].header.source = hasher (source_id); + events[0].header.type = ACE_ES_EVENT_ANY; + events[0].header.ttl = 10; + + // We can't use the Any insert operator here, since it will put the + // EventBase typecode into the Any, and the actual eventtype's fields + // (if any) will get truncated when the Any is demarshaled. So the + // signature of this method has been changed to pass in the derived + // typecode, and TAO-specific methods are used to assign it as the + // Any's typecode and encode the value. This incurs an extra + // encoding, which we may want to try to optimize someday. + TAO_OutputCDR out; + out << ev; + TAO_InputCDR in (out); + TAO::Unknown_IDL_Type *unk = 0; + ACE_NEW (unk, + TAO::Unknown_IDL_Type (tc, in)); + events[0].data.any_value.replace (unk); + + ACE_DEBUG ((LM_DEBUG, "******* push event for source string: %s\n", source_id)); + ACE_DEBUG ((LM_DEBUG, "******* push event for source id: %i\n", events[0].header.source)); + + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer; + + if (this->proxy_consumer_map_.find (source_id, proxy_consumer) != 0) + { + ACE_DEBUG ((LM_DEBUG, + "CIAO (%P|%t) - RTEventService::ciao_push_event, " + "Error in finding the proxy consumer object.\n")); + throw Components::BadEventType (); + } + + proxy_consumer->push (events); + } + + void + RTEventService::create_rt_event_channel (const char * ec_name) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::EventService_Factory_impl::create_rt_event_channel\n")); + } + + TAO_EC_Default_Factory::init_svcs (); + + TAO_EC_Event_Channel_Attributes attributes (this->root_poa_.in (), + this->root_poa_.in ()); + TAO_EC_Event_Channel * ec_servant = 0; + ACE_NEW (ec_servant, TAO_EC_Event_Channel (attributes)); + ec_servant->activate (); + this->rt_event_channel_ = ec_servant->_this (); + + if (false) + { + // Find the Naming Service. + CORBA::Object_var obj = orb_->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Bind the Event Channel using Naming Services + CosNaming::Name_var name = root_context->to_name (ec_name); + ACE_DEBUG ((LM_DEBUG, "\nRegister naming: %s\n", ec_name)); + root_context->rebind (name.in(), rt_event_channel_.in()); + } + } + + ::CORBA::Boolean + RTEventService::create_addr_serv ( + const char * name, + ::CORBA::UShort port, + const char * address) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + ACE_DEBUG ((LM_ERROR, "Create an address server using port [%d]\n", port)); + + // Initialize the address server with the desired address. + // This will be used by the sender object and the multicast + // receiver. + ACE_INET_Addr send_addr (port, address); + + SimpleAddressServer * addr_srv_impl = new SimpleAddressServer (send_addr); + + PortableServer::ObjectId_var addr_srv_oid = + this->root_poa_->activate_object (addr_srv_impl); + CORBA::Object_var addr_srv_obj = + this->root_poa_->id_to_reference (addr_srv_oid.in()); + RtecUDPAdmin::AddrServer_var addr_srv = + RtecUDPAdmin::AddrServer::_narrow (addr_srv_obj.in()); + +/* + // First we convert the string into an INET address, then we + // convert that into the right IDL structure: + ACE_INET_Addr udp_addr (address); + ACE_DEBUG ((LM_DEBUG, + "udp mcast address is: %s\n", + address)); + RtecUDPAdmin::UDP_Addr addr; + addr.ipaddr = udp_addr.get_ip_address (); + addr.port = udp_addr.get_port_number (); + + // Now we create and activate the servant + SimpleAddressServer as_impl (addr); + RtecUDPAdmin::AddrServer_var addr_srv = + as_impl._this (); +*/ + + this->addr_serv_map_.bind ( + name, + RtecUDPAdmin::AddrServer::_duplicate (addr_srv.in ())); + + + return true; + } + + ::CORBA::Boolean + RTEventService::create_sender ( + const char * addr_serv_id) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + ACE_DEBUG ((LM_DEBUG, "Create a Sender object with addr_serv_id: %s\n",addr_serv_id )); + + // We need a local socket to send the data, open it and check + // that everything is OK: + TAO_ECG_Refcounted_Endpoint endpoint(new TAO_ECG_UDP_Out_Endpoint); + if (endpoint->dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "Cannot open send endpoint\n"), + 1); + } + + RtecUDPAdmin::AddrServer_var addr_srv; + if (this->addr_serv_map_.find (addr_serv_id, addr_srv) != 0) + return false; + + // Now we setup the sender: + TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender = TAO_ECG_UDP_Sender::create(); + sender->init (this->rt_event_channel_.in (), + addr_srv.in (), + endpoint); + + // Setup the subscription and connect to the EC + ACE_ConsumerQOS_Factory cons_qos_fact; + cons_qos_fact.start_disjunction_group (); + cons_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0); + RtecEventChannelAdmin::ConsumerQOS sub = cons_qos_fact.get_ConsumerQOS (); + sub.is_gateway = 1; + sender->connect (sub); + + return true; + } + + ::CORBA::Boolean + RTEventService::create_receiver ( + const char * addr_serv_id, + ::CORBA::Boolean is_multicast, + ::CORBA::UShort listen_port) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + ACE_DEBUG ((LM_DEBUG, "Create a receiver object with addr_serv_id: %s\n",addr_serv_id )); + + // Create and initialize the receiver + TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver = + TAO_ECG_UDP_Receiver::create(); + + // AddressServer is necessary when "multicast" is enabled, but not for "udp" + if (is_multicast) + { + TAO_ECG_UDP_Out_Endpoint endpoint; + if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_DEBUG ((LM_ERROR, "Cannot open send endpoint\n")); + return false; + } + + // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint. + // If we don't clone our endpoint and pass &endpoint, the receiver will + // attempt to delete endpoint during shutdown. + TAO_ECG_UDP_Out_Endpoint* clone; + ACE_NEW_RETURN (clone, + TAO_ECG_UDP_Out_Endpoint (endpoint), + false); + + RtecUDPAdmin::AddrServer_var addr_srv; + + if (this->addr_serv_map_.find (addr_serv_id, addr_srv) != 0) + return false; + + receiver->init (this->rt_event_channel_.in (), + clone, + addr_srv.in ()); + } + else + { + receiver->init (this->rt_event_channel_.in (), 0, 0); + } + + // Setup the registration and connect to the event channel + ACE_SupplierQOS_Factory supp_qos_fact; + supp_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0, 1); + RtecEventChannelAdmin::SupplierQOS pub = supp_qos_fact.get_SupplierQOS (); + receiver->connect (pub); + + // Create the appropriate event handler and register it with the reactor + + if (is_multicast) + { + auto_ptr<TAO_ECG_Mcast_EH> mcast_eh (new TAO_ECG_Mcast_EH (receiver.in())); + mcast_eh->reactor (this->orb_->orb_core ()->reactor ()); + mcast_eh->open (this->rt_event_channel_.in()); + mcast_eh.release(); + } + else + { + ACE_DEBUG ((LM_DEBUG, "\nUDP Event Handler Port [%d]\n", listen_port)); + + //auto_ptr<TAO_ECG_UDP_EH> udp_eh (new TAO_ECG_UDP_EH (receiver.in())); + TAO_ECG_UDP_EH * udp_eh = new TAO_ECG_UDP_EH (receiver.in()); + + udp_eh->reactor (this->orb_->orb_core ()->reactor ()); + + ACE_INET_Addr local_addr (listen_port); + if (udp_eh->open (local_addr) == -1) + { + ACE_DEBUG ((LM_ERROR, "Cannot open event handler on port [%d]\n", listen_port)); + return false; + } + //udp_eh.release (); + } + + return true; + } + + + ::RtecEventChannelAdmin::EventChannel_ptr + RTEventService::tao_rt_event_channel () + ACE_THROW_SPEC ((::CORBA::SystemException)) + { + return this->rt_event_channel_.in (); + } + + ////////////////////////////////////////////////////////////////////// + /// Supplier Servant Implementation + ////////////////////////////////////////////////////////////////////// + + RTEventServiceSupplier_impl::RTEventServiceSupplier_impl ( + PortableServer::POA_ptr poa) : + poa_ (PortableServer::POA::_duplicate (poa)) + { + } + + void + RTEventServiceSupplier_impl::disconnect_push_supplier (void) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } + + ////////////////////////////////////////////////////////////////////// + /// Consumer Servant Implementation + ////////////////////////////////////////////////////////////////////// + + RTEventServiceConsumer_impl::RTEventServiceConsumer_impl ( + PortableServer::POA_ptr poa, + Components::EventConsumerBase_ptr consumer) : + poa_ (PortableServer::POA::_duplicate (poa)), + event_consumer_ (Components::EventConsumerBase::_duplicate (consumer)) + { + } + + void + RTEventServiceConsumer_impl::push (const RtecEventComm::EventSet& events) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventServiceConsumer_impl::push\n")); + } + + for (size_t i = 0; i < events.length (); ++i) + { + std::ostringstream out; + out << "Received event," + << " type: " << events[i].header.type + << " source: " << events[i].header.source; + + ACE_OS::printf("%s\n", out.str().c_str()); // printf is synchronized + + Components::EventBase * ev = 0; + try + { + TAO::Unknown_IDL_Type *unk = + dynamic_cast<TAO::Unknown_IDL_Type *> (events[i].data.any_value.impl ()); + TAO_InputCDR for_reading (unk->_tao_get_cdr ()); + + if (for_reading >> ev) + { + ev->_add_ref (); + this->event_consumer_->push_event (ev); + } + else + { + ACE_ERROR ((LM_ERROR, "CIAO::RTEventServiceConsumer_impl::push(), " + "failed to extract event\n")); + } + } + catch (const CORBA::Exception& ex) + { + ACE_ERROR ((LM_ERROR, + "CORBA EXCEPTION caught\n")); + ex._tao_print_exception ("RTEventServiceConsumer_impl::push()\n"); + } + } + + } + + void + RTEventServiceConsumer_impl::disconnect_push_consumer (void) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventServiceConsumer_impl::disconnect_push_consumer\n")); + } + + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } + + + ////////////////////////////////////////////////////////////////////// + /// Supplier Config Implementation + ////////////////////////////////////////////////////////////////////// + + RTEvent_Supplier_Config_impl::RTEvent_Supplier_Config_impl (PortableServer::POA_ptr poa) : + service_type_ (RTEC), + poa_ (PortableServer::POA::_duplicate (poa)) + { + } + + RTEvent_Supplier_Config_impl::~RTEvent_Supplier_Config_impl (void) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Supplier_Config_impl::~RTEvent_Supplier_Config_impl\n")); + } + } + + void + RTEvent_Supplier_Config_impl::supplier_id ( + const char * supplier_id) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "supplier's id: %s\n", supplier_id)); + } + + this->supplier_id_ = supplier_id; + + ACE_Hash<ACE_CString> hasher; + + RtecEventComm::EventSourceID source_id = + hasher (this->supplier_id_.c_str ()); +/* + this->qos_.insert (source_id, + ACE_ES_EVENT_ANY, + 0, + 1); + +*/ + + this->qos_.insert (ACE_ES_EVENT_SOURCE_ANY, + ACE_ES_EVENT_ANY, + 0, // handle to the rt_info structure + 1); + + ACE_DEBUG ((LM_DEBUG, "supplier's source id is: %d\n", source_id)); + } + + CONNECTION_ID + RTEvent_Supplier_Config_impl::supplier_id () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return CORBA::string_dup (this->supplier_id_.c_str ()); + } + + EventServiceType + RTEvent_Supplier_Config_impl::service_type () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return this->service_type_; + } + + RtecEventChannelAdmin::SupplierQOS * + RTEvent_Supplier_Config_impl::rt_event_qos () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + RtecEventChannelAdmin::SupplierQOS * supplier_qos = 0; + ACE_NEW_RETURN (supplier_qos, + RtecEventChannelAdmin::SupplierQOS (this->qos_.get_SupplierQOS ()), + 0); + return supplier_qos; + } + + void + RTEvent_Supplier_Config_impl::destroy () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } + + ////////////////////////////////////////////////////////////////////// + /// Consumer Config Implementation + ////////////////////////////////////////////////////////////////////// + + RTEvent_Consumer_Config_impl::RTEvent_Consumer_Config_impl (PortableServer::POA_ptr poa) : + service_type_ (RTEC), + poa_ (PortableServer::POA::_duplicate (poa)) + { + } + + RTEvent_Consumer_Config_impl::~RTEvent_Consumer_Config_impl (void) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Consumer_Config_impl::~RTEvent_Consumer_Config_impl\n")); + } + + void + RTEvent_Consumer_Config_impl::start_conjunction_group ( + CORBA::Long size) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Consumer_Config_impl::start_conjunction_group\n")); + + this->qos_.start_conjunction_group (size); + } + + void + RTEvent_Consumer_Config_impl::start_disjunction_group ( + CORBA::Long size) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + // Note, since we only support basic builder here... + if (size == 0L) + this->qos_.start_disjunction_group (); + else + this->qos_.start_disjunction_group (size); + } + + void + RTEvent_Consumer_Config_impl::insert_source ( + const char * source_id) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + ACE_Hash<ACE_CString> hasher; + RtecEventComm::EventSourceID int_source_id = hasher (source_id); + + ACE_DEBUG ((LM_DEBUG, "******* the source string is: %s\n", source_id)); + ACE_DEBUG ((LM_DEBUG, "******* the source id is: %i\n", int_source_id)); + + this->qos_.insert_source (int_source_id, 0); + } + + void + RTEvent_Consumer_Config_impl::insert_type ( + ::CORBA::Long event_type) + ACE_THROW_SPEC ((::CORBA::SystemException)) + { + if (event_type == 0L) + this->qos_.insert_type (ACE_ES_EVENT_ANY, 0); + else + this->qos_.insert_type (event_type, + 0); + } + + void + RTEvent_Consumer_Config_impl::consumer_id ( + const char * consumer_id) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, + "RTEvent_Consumer_Config_impl::set_consumer_id:%s\n", + consumer_id)); + } + + this->consumer_id_ = consumer_id; + } + + + void + RTEvent_Consumer_Config_impl::consumer ( + Components::EventConsumerBase_ptr consumer) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + this->consumer_ = Components::EventConsumerBase::_duplicate (consumer); + } + + CONNECTION_ID + RTEvent_Consumer_Config_impl::consumer_id () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return CORBA::string_dup (this->consumer_id_.c_str ()); + } + + + EventServiceType + RTEvent_Consumer_Config_impl::service_type () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return this->service_type_; + } + + Components::EventConsumerBase_ptr + RTEvent_Consumer_Config_impl::consumer () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "RTEvent_Consumer_Config_impl::get_consumer\n")); + } + + return Components::EventConsumerBase::_duplicate (this->consumer_.in ()); + } + + RtecEventChannelAdmin::ConsumerQOS * + RTEvent_Consumer_Config_impl::rt_event_qos () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + RtecEventChannelAdmin::ConsumerQOS * consumer_qos = 0; + ACE_NEW_RETURN (consumer_qos, + RtecEventChannelAdmin::ConsumerQOS (this->qos_.get_ConsumerQOS ()), + 0); + + return consumer_qos; + } + + void + RTEvent_Consumer_Config_impl::destroy () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Consumer_Config_impl::destroy\n")); + } + + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } +} diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h new file mode 100644 index 00000000000..7737643b0a4 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h @@ -0,0 +1,359 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CIAO_RTEvent.h + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * @author George Edwards <g.edwards@vanderbilt.edu> + */ +//============================================================================= + +#ifndef CIAO_RTEVENT_H +#define CIAO_RTEVENT_H +#include /**/ "ace/pre.h" + +#include "ace/config-all.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "CIAO_RTEVENT_Export.h" +#include "ciaosvcs/Events/CIAO_Events_Base/CIAO_EventServiceBase.h" +#include "CIAO_RTEventS.h" + +#include "orbsvcs/orbsvcs/Event_Utilities.h" +#include "orbsvcs/orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/ECG_Mcast_EH.h" +#include "orbsvcs/Event/ECG_UDP_Sender.h" +#include "orbsvcs/Event/ECG_UDP_Receiver.h" +#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" +#include "orbsvcs/Event/ECG_UDP_EH.h" +#include "ace/Hash_Map_Manager.h" + +namespace CIAO +{ + + /** + * @class RTEventService + * + * An implementation of EventServiceBase using the RT event channel. + * + * @@ (GD) There should be a place where the deployment tool could + * specify the RT Event Channel service configuration file. + * This should be the place where the RtecEventChannel servant was + * first time initialized. + */ + class CIAO_RTEVENT_Export RTEventService : + public virtual EventServiceBase, + public virtual POA_CIAO::CIAO_RT_Event_Service + { + public: + + RTEventService (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa, + const char * ec_name); + + virtual ~RTEventService (void); + + virtual Supplier_Config_ptr + create_supplier_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual Consumer_Config_ptr + create_consumer_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void connect_event_supplier ( + CIAO::Supplier_Config_ptr supplier_config) + ACE_THROW_SPEC (( + CORBA::SystemException)); + + virtual void connect_event_consumer ( + CIAO::Consumer_Config_ptr consumer_config) + ACE_THROW_SPEC (( + CORBA::SystemException)); + + virtual void + disconnect_event_supplier ( + const char * consumer_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)); + + virtual void disconnect_event_consumer ( + const char * connection_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)); + + virtual void push_event ( + Components::EventBase * ev) + ACE_THROW_SPEC (( + CORBA::SystemException)); + + virtual void ciao_push_event ( + Components::EventBase * evt, + const char * source_id, + CORBA::TypeCode_ptr tc) + ACE_THROW_SPEC (( + ::CORBA::SystemException, + ::Components::BadEventType)); + + virtual ::CORBA::Boolean create_addr_serv ( + const char * name, + ::CORBA::UShort port, + const char * address) + ACE_THROW_SPEC (( + ::CORBA::SystemException)); + + virtual ::CORBA::Boolean create_sender ( + const char * addr_serv_id) + ACE_THROW_SPEC (( + ::CORBA::SystemException)); + + virtual ::CORBA::Boolean create_receiver ( + const char * addr_serv_id, + ::CORBA::Boolean is_multicast, + ::CORBA::UShort listen_port) + ACE_THROW_SPEC (( + ::CORBA::SystemException)); + + virtual ::RtecEventChannelAdmin::EventChannel_ptr tao_rt_event_channel ( + ) + ACE_THROW_SPEC ((::CORBA::SystemException)); + + private: + // @@ (GD) This is the place where use could provide a parameter + // which specifies the event channel service configuration file. + void create_rt_event_channel (const char * ec_name) + ACE_THROW_SPEC (( + CORBA::SystemException)); + + private: + + /// Reference to the ORB + CORBA::ORB_var orb_; + + /// Reference to the Root POA + PortableServer::POA_var root_poa_; + + /** + * @var RtecEventChannelAdmin::EventChannel_var rt_event_channel_ + * + * Reference to the RT event channel. + */ + RtecEventChannelAdmin::EventChannel_var rt_event_channel_; + + /** + * @var ACE_Hash_Map_Manager<> proxy_supplier_map_ + * + * Mapping of each event sink to a proxy supplier for disconnect purposes. + */ + ACE_Hash_Map_Manager_Ex<ACE_CString, + RtecEventChannelAdmin::ProxyPushConsumer_var, + ACE_Hash<ACE_CString>, + ACE_Equal_To<ACE_CString>, + ACE_Null_Mutex> proxy_consumer_map_; + + /** + * @var ACE_Hash_Map_Manager<> proxy_supplier_map_ + * + * Mapping of each event sink to a proxy supplier for disconnect purposes. + */ + ACE_Hash_Map_Manager_Ex<ACE_CString, + RtecEventChannelAdmin::ProxyPushSupplier_var, + ACE_Hash<ACE_CString>, + ACE_Equal_To<ACE_CString>, + ACE_Null_Mutex> proxy_supplier_map_; + + /** + * @var ACE_Hash_Map_Manager<> addr_serv_map_ + * + * A map which managers a set of address servers for event channel + * federation purpose. + */ + ACE_Hash_Map_Manager_Ex<ACE_CString, + RtecUDPAdmin::AddrServer_var, + ACE_Hash<ACE_CString>, + ACE_Equal_To<ACE_CString>, + ACE_Null_Mutex> addr_serv_map_; + + }; + + /** + * @class RTEventServiceSupplier_impl + * + * An implementation of the PushSupplier interface. + */ + class RTEventServiceSupplier_impl : + public virtual POA_RtecEventComm::PushSupplier + { + public: + + RTEventServiceSupplier_impl ( + PortableServer::POA_ptr poa); + + virtual void disconnect_push_supplier () + ACE_THROW_SPEC (( + CORBA::SystemException)); + + private: + + PortableServer::POA_var poa_; + }; + + /** + * @class RTEventServiceConsumer_impl + * + * An implementation of the PushConsumer interface. + */ + class RTEventServiceConsumer_impl : + public virtual POA_RtecEventComm::PushConsumer + { + public: + + RTEventServiceConsumer_impl ( + PortableServer::POA_ptr poa, + Components::EventConsumerBase_ptr consumer); + + virtual void push ( + const RtecEventComm::EventSet& events) + ACE_THROW_SPEC (( + CORBA::SystemException)); + + virtual void disconnect_push_consumer () + ACE_THROW_SPEC (( + CORBA::SystemException)); + + private: + + PortableServer::POA_var poa_; + + Components::EventConsumerBase_var event_consumer_; + }; + + /** + * @class RTEvent_Consumer_Config_impl + * + * Implementation of the RTEvent_Consumer_Config IDL interface that + * configures TAO's RT event channel. An object of this type will be returned + * from @c CIAO::Container::create_consumer_config () when @c RTEC is + * specified as the event service type. + */ + class RTEvent_Consumer_Config_impl : + public virtual POA_CIAO::RTEvent_Consumer_Config, + public virtual Event_Consumer_Config_Base + { + + public: + RTEvent_Consumer_Config_impl (PortableServer::POA_ptr poa); + + virtual ~RTEvent_Consumer_Config_impl (void); + + virtual void start_conjunction_group (CORBA::Long size) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void start_disjunction_group (CORBA::Long size) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void insert_source (const char * source_id) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void insert_type (::CORBA::Long event_type) + ACE_THROW_SPEC ((::CORBA::SystemException)); + + virtual void consumer_id (const char * consumer_id) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual CONNECTION_ID consumer_id () + ACE_THROW_SPEC ((CORBA::SystemException)); + + //virtual void supplier_id (const char * supplier_id) + // ACE_THROW_SPEC ((CORBA::SystemException)); + + //virtual CONNECTION_ID supplier_id () + // ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void consumer (Components::EventConsumerBase_ptr consumer) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual Components::EventConsumerBase_ptr consumer () + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual EventServiceType service_type () + ACE_THROW_SPEC ((CORBA::SystemException)); + + //@@ (GD) There should be a place where the deployment tool could + // set up the rt_event_qos properties for Consumer Config. + + virtual RtecEventChannelAdmin::ConsumerQOS * rt_event_qos () + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void destroy () + ACE_THROW_SPEC ((CORBA::SystemException)); + + private: + + ACE_CString consumer_id_; + + Components::EventConsumerBase_var consumer_; + + EventServiceType service_type_; + + ACE_ConsumerQOS_Factory qos_; + + PortableServer::POA_var poa_; + }; + + /** + * @class RTEvent_Supplier_Config_impl + * + * Implementation of the RTEvent_Supplier_Config IDL interface that + * configures TAO's RT event channel. An object of this type will be returned + * from @c CIAO::Container::create_supplier_config () when @c RTEC is + * specified as the event service type. + */ + class RTEvent_Supplier_Config_impl : + public virtual POA_CIAO::RTEvent_Supplier_Config + { + public: + RTEvent_Supplier_Config_impl (PortableServer::POA_ptr poa); + + virtual ~RTEvent_Supplier_Config_impl (void); + + void supplier_id (const char * supplier_id) + ACE_THROW_SPEC ((CORBA::SystemException)); + + CONNECTION_ID supplier_id () + ACE_THROW_SPEC ((CORBA::SystemException)); + + EventServiceType service_type () + ACE_THROW_SPEC ((CORBA::SystemException)); + + //@@ (GD) There should be a place where the deployment tool could + // set up the rt_event_qos properties for Supplier Config. + + RtecEventChannelAdmin::SupplierQOS * rt_event_qos () + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void destroy () + ACE_THROW_SPEC ((CORBA::SystemException)); + + private: + ACE_CString supplier_id_; + + EventServiceType service_type_; + + ACE_SupplierQOS_Factory qos_; + + PortableServer::POA_var poa_; + }; +} + +#include /**/ "ace/post.h" +#endif /* CIAO_RTEVENT_H */ diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl new file mode 100644 index 00000000000..072bf1f89a1 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl @@ -0,0 +1,54 @@ +// $Id$ + +/** + * @file CIAO_RTEvent.idl + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * @author George Edwards + * + * @brief Interfaces for configuring CIAO's RT event channel. + */ + +#include <ciaosvcs/Events/CIAO_Events_Base/CIAO_Events.idl> +#include <orbsvcs/orbsvcs/RtecEventChannelAdmin.idl> + +module CIAO +{ + interface RTEvent_Consumer_Config : + Consumer_Config + { + //void start_logical_and_group (in long size); + + //void start_negation (); + + //void insert_bitmasked_value (in long source_mask, + // in long type_mask, + // in long source_value, + // in long type_value); + + readonly attribute RtecEventChannelAdmin::ConsumerQOS rt_event_qos; + }; + + interface RTEvent_Supplier_Config : + Supplier_Config + { + readonly attribute RtecEventChannelAdmin::SupplierQOS rt_event_qos; + }; + + interface CIAO_RT_Event_Service : + CIAO_Event_Service + { + boolean create_addr_serv (in string name, + in unsigned short port, + in string address); + + boolean create_sender (in string addr_serv_id); + + boolean create_receiver (in string addr_serv_id, + in boolean is_multicast, + in unsigned short listen_port); + + RtecEventChannelAdmin::EventChannel tao_rt_event_channel (); + }; + +}; diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.mpc b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.mpc new file mode 100644 index 00000000000..40ba0138f4b --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.mpc @@ -0,0 +1,27 @@ +// -*- MPC -*- +// $Id$ + +project (CIAO_RTEvent) : naming, rtevent_serv, ciao_events_base_dnc { + + sharedname = CIAO_RTEvent + idlflags += -Wb,export_include=CIAO_RTEVENT_Export.h -Wb,export_macro=CIAO_RTEVENT_Export + dynamicflags = CIAO_RTEVENT_BUILD_DLL + + IDL_Files { + CIAO_RTEvent.idl + } + + Source_Files { + CIAO_RTEvent.cpp + CIAO_RTEventC.cpp + CIAO_RTEventS.cpp + SimpleAddressServer.cpp + } + + Header_Files { + CIAO_RTEvent.h + CIAO_RTEventC.h + CIAO_RTEventS.h + SimpleAddressServer.h + } +} diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.cpp b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.cpp new file mode 100644 index 00000000000..cdce7218291 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.cpp @@ -0,0 +1,37 @@ +/* $Id$ */ + +#include "SimpleAddressServer.h" +#include <ace/INET_Addr.h> +#include <ace/OS_NS_string.h> + +SimpleAddressServer::SimpleAddressServer (const ACE_INET_Addr& address) { + this->address_.ipaddr = address.get_ip_address (); + this->address_.port = address.get_port_number (); +} + +SimpleAddressServer::SimpleAddressServer (const RtecUDPAdmin::UDP_Addr& addr) + : address_ (addr) +{ +} + +void +SimpleAddressServer::get_addr (const RtecEventComm::EventHeader&, + RtecUDPAdmin::UDP_Addr& address) + throw (CORBA::SystemException) { + address = this->address_; +} +/* +void +SimpleAddressServer::get_ip_address (const RtecEventComm::EventHeader&, + RtecUDPAdmin::UDP_IP_Address_out address) + throw (CORBA::SystemException) { + address = new RtecUDPAdmin::UDP_IP_Address; + + ACE_INET_Addr x (this->address_.port, + static_cast<ACE_UINT32>(this->address_.ipaddr)); + address->length (x.get_addr_size ()); + ACE_OS::memcpy (address->get_buffer (), + x.get_addr (), + x.get_addr_size ()); +} +*/ diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.h b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.h new file mode 100644 index 00000000000..5a69d3171dc --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.h @@ -0,0 +1,38 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file SimpleAddressServer.h + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + */ +//============================================================================= + +#ifndef SIMPLEADDRESSSERVER_H +#define SIMPLEADDRESSSERVER_H + +#include <orbsvcs/RtecUDPAdminS.h> + +class SimpleAddressServer : public POA_RtecUDPAdmin::AddrServer { +public: + SimpleAddressServer (const ACE_INET_Addr& address); + + SimpleAddressServer (const RtecUDPAdmin::UDP_Addr& addr); + + virtual void get_addr (const RtecEventComm::EventHeader& header, + RtecUDPAdmin::UDP_Addr& address) + throw (CORBA::SystemException); + + +/* + virtual void get_ip_address (const RtecEventComm::EventHeader& header, + RtecUDPAdmin::UDP_IP_Address_out address) + throw (CORBA::SystemException); +*/ +private: + RtecUDPAdmin::UDP_Addr address_; +}; + +#endif |