diff options
Diffstat (limited to 'modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp')
-rw-r--r-- | modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp | 833 |
1 files changed, 833 insertions, 0 deletions
diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp new file mode 100644 index 00000000000..1ef5b1ad889 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp @@ -0,0 +1,833 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CIAO_RTEvent.cpp + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * @author George Edwards <g.edwards@vanderbilt.edu> + */ +//============================================================================= + +#include "CIAO_RTEvent.h" +#include "ciao/CIAO_common.h" +#include "SimpleAddressServer.h" +#include "tao/ORB_Core.h" +#include "tao/AnyTypeCode/Any_Unknown_IDL_Type.h" +#include "orbsvcs/CosNamingC.h" + +#include <sstream> + +namespace CIAO +{ + + + RTEventService::RTEventService (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa, + const char * ec_name) : + orb_ (CORBA::ORB::_duplicate (orb)), + root_poa_ (PortableServer::POA::_duplicate (poa)) + { + this->create_rt_event_channel (ec_name); + } + + + RTEventService::~RTEventService (void) + { + } + + + Supplier_Config_ptr + RTEventService::create_supplier_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + RTEvent_Supplier_Config_impl * supplier_config = 0; + ACE_NEW_RETURN (supplier_config, + RTEvent_Supplier_Config_impl (this->root_poa_.in ()), + Supplier_Config::_nil ()); + RTEvent_Supplier_Config_var return_rtec = + supplier_config->_this (); + return return_rtec._retn (); + } + + + Consumer_Config_ptr + RTEventService::create_consumer_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + RTEvent_Consumer_Config_impl * consumer_config = 0; + ACE_NEW_RETURN (consumer_config, + RTEvent_Consumer_Config_impl (this->root_poa_.in ()), + Consumer_Config::_nil ()); + RTEvent_Consumer_Config_var return_rtec = + consumer_config->_this (); + return return_rtec._retn (); + } + + + // @@TODO: We might want to maintain a map for managing multiple proxy consumers + // to multiple event suppliers. + void + RTEventService::connect_event_supplier ( + Supplier_Config_ptr supplier_config) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 9) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventService::connect_event_supplier\n")); + } + + RTEvent_Supplier_Config_ptr rt_config = + RTEvent_Supplier_Config::_narrow (supplier_config); + + if (CORBA::is_nil (rt_config)) + { + throw CORBA::BAD_PARAM (); + } + + // Get a proxy push consumer from the EventChannel. + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + this->rt_event_channel_->for_suppliers (); + + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_push_consumer = + supplier_admin->obtain_push_consumer(); + + // Create and register supplier servant + RTEventServiceSupplier_impl * supplier_servant = 0; + ACE_NEW (supplier_servant, + RTEventServiceSupplier_impl (root_poa_.in ())); + RtecEventComm::PushSupplier_var push_supplier = + supplier_servant->_this (); + + RtecEventChannelAdmin::SupplierQOS_var qos = + rt_config->rt_event_qos (); + + ACE_SupplierQOS_Factory supplier_qos; + supplier_qos.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0, 1); + + supplier_qos.insert (ACE_ES_EVENT_SOURCE_ANY, + ACE_ES_EVENT_ANY, + 0, // handle to the rt_info structure + 1); + + proxy_push_consumer->connect_push_supplier (push_supplier.in (), + supplier_qos.get_SupplierQOS ()); + + + this->proxy_consumer_map_.bind ( + supplier_config->supplier_id (), + proxy_push_consumer._retn ()); + } + + void + RTEventService::connect_event_consumer ( + Consumer_Config_ptr consumer_config) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 9) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventService::connect_event_consumer\n")); + } + + RTEvent_Consumer_Config_ptr rt_config = + RTEvent_Consumer_Config::_narrow (consumer_config); + + if (CORBA::is_nil (rt_config)) + { + throw CORBA::BAD_PARAM (); + } + + Components::EventConsumerBase_var consumer = + consumer_config->consumer (); + + if (CORBA::is_nil (consumer.in ())) + ACE_DEBUG ((LM_DEBUG, "nil event consumer\n")); + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + this->rt_event_channel_->for_consumers (); + + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier = + consumer_admin->obtain_push_supplier (); + + // Create and register consumer servant + RTEventServiceConsumer_impl * consumer_servant = 0; + ACE_NEW (consumer_servant, + RTEventServiceConsumer_impl ( + root_poa_.in (), + consumer.in ())); + RtecEventComm::PushConsumer_var push_consumer = + consumer_servant->_this (); + + RtecEventChannelAdmin::ConsumerQOS_var qos = + rt_config->rt_event_qos (); + + ACE_DEBUG ((LM_DEBUG, "\n======== ConsumerQoS length is: %d\n\n", + qos->dependencies.length ())); + + if (qos->dependencies.length () == 0) + { + qos->dependencies.length (1); + qos->dependencies[0].event.header.type = ACE_ES_EVENT_ANY; + qos->dependencies[0].event.header.source = ACE_ES_EVENT_SOURCE_ANY; + qos->dependencies[0].rt_info = 0; + + ACE_DEBUG ((LM_DEBUG, "\n======== Normalized ConsumerQoS length is: %d\n\n", + qos->dependencies.length ())); + } + + proxy_supplier->connect_push_consumer (push_consumer.in (), + qos.in () + //qos_factory.get_ConsumerQOS () + ); + + ACE_CString consumer_id = + consumer_config->consumer_id (); + + this->proxy_supplier_map_.bind (consumer_id.c_str (), proxy_supplier._retn ()); + } + + void + RTEventService::disconnect_event_supplier ( + const char * connection_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)) + { + ACE_UNUSED_ARG (connection_id); + + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer; + + this->proxy_consumer_map_.unbind (connection_id, proxy_consumer); + + proxy_consumer->disconnect_push_consumer (); + } + + void + RTEventService::disconnect_event_consumer ( + const char * connection_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)) + { + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier; + + this->proxy_supplier_map_.unbind (connection_id, proxy_supplier); + + proxy_supplier->disconnect_push_supplier (); + } + + void + RTEventService::push_event ( + Components::EventBase * ev) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + ACE_UNUSED_ARG (ev); + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "------CIAO::RTEventService::push_event------\n")); + } + } + + void + RTEventService::ciao_push_event ( + Components::EventBase * ev, + const char * source_id, + CORBA::TypeCode_ptr tc) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::BadEventType)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "------CIAO::RTEventService::ciao_push_event------\n")); + } + RtecEventComm::EventSet events (1); + events.length (1); + + ACE_Hash<ACE_CString> hasher; + + events[0].header.source = hasher (source_id); + events[0].header.type = ACE_ES_EVENT_ANY; + events[0].header.ttl = 10; + + // We can't use the Any insert operator here, since it will put the + // EventBase typecode into the Any, and the actual eventtype's fields + // (if any) will get truncated when the Any is demarshaled. So the + // signature of this method has been changed to pass in the derived + // typecode, and TAO-specific methods are used to assign it as the + // Any's typecode and encode the value. This incurs an extra + // encoding, which we may want to try to optimize someday. + TAO_OutputCDR out; + out << ev; + TAO_InputCDR in (out); + TAO::Unknown_IDL_Type *unk = 0; + ACE_NEW (unk, + TAO::Unknown_IDL_Type (tc, in)); + events[0].data.any_value.replace (unk); + + ACE_DEBUG ((LM_DEBUG, "******* push event for source string: %s\n", source_id)); + ACE_DEBUG ((LM_DEBUG, "******* push event for source id: %i\n", events[0].header.source)); + + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer; + + if (this->proxy_consumer_map_.find (source_id, proxy_consumer) != 0) + { + ACE_DEBUG ((LM_DEBUG, + "CIAO (%P|%t) - RTEventService::ciao_push_event, " + "Error in finding the proxy consumer object.\n")); + throw Components::BadEventType (); + } + + proxy_consumer->push (events); + } + + void + RTEventService::create_rt_event_channel (const char * ec_name) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::EventService_Factory_impl::create_rt_event_channel\n")); + } + + TAO_EC_Default_Factory::init_svcs (); + + TAO_EC_Event_Channel_Attributes attributes (this->root_poa_.in (), + this->root_poa_.in ()); + TAO_EC_Event_Channel * ec_servant = 0; + ACE_NEW (ec_servant, TAO_EC_Event_Channel (attributes)); + ec_servant->activate (); + this->rt_event_channel_ = ec_servant->_this (); + + if (false) + { + // Find the Naming Service. + CORBA::Object_var obj = orb_->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Bind the Event Channel using Naming Services + CosNaming::Name_var name = root_context->to_name (ec_name); + ACE_DEBUG ((LM_DEBUG, "\nRegister naming: %s\n", ec_name)); + root_context->rebind (name.in(), rt_event_channel_.in()); + } + } + + ::CORBA::Boolean + RTEventService::create_addr_serv ( + const char * name, + ::CORBA::UShort port, + const char * address) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + ACE_DEBUG ((LM_ERROR, "Create an address server using port [%d]\n", port)); + + // Initialize the address server with the desired address. + // This will be used by the sender object and the multicast + // receiver. + ACE_INET_Addr send_addr (port, address); + + SimpleAddressServer * addr_srv_impl = new SimpleAddressServer (send_addr); + + PortableServer::ObjectId_var addr_srv_oid = + this->root_poa_->activate_object (addr_srv_impl); + CORBA::Object_var addr_srv_obj = + this->root_poa_->id_to_reference (addr_srv_oid.in()); + RtecUDPAdmin::AddrServer_var addr_srv = + RtecUDPAdmin::AddrServer::_narrow (addr_srv_obj.in()); + +/* + // First we convert the string into an INET address, then we + // convert that into the right IDL structure: + ACE_INET_Addr udp_addr (address); + ACE_DEBUG ((LM_DEBUG, + "udp mcast address is: %s\n", + address)); + RtecUDPAdmin::UDP_Addr addr; + addr.ipaddr = udp_addr.get_ip_address (); + addr.port = udp_addr.get_port_number (); + + // Now we create and activate the servant + SimpleAddressServer as_impl (addr); + RtecUDPAdmin::AddrServer_var addr_srv = + as_impl._this (); +*/ + + this->addr_serv_map_.bind ( + name, + RtecUDPAdmin::AddrServer::_duplicate (addr_srv.in ())); + + + return true; + } + + ::CORBA::Boolean + RTEventService::create_sender ( + const char * addr_serv_id) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + ACE_DEBUG ((LM_DEBUG, "Create a Sender object with addr_serv_id: %s\n",addr_serv_id )); + + // We need a local socket to send the data, open it and check + // that everything is OK: + TAO_ECG_Refcounted_Endpoint endpoint(new TAO_ECG_UDP_Out_Endpoint); + if (endpoint->dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "Cannot open send endpoint\n"), + 1); + } + + RtecUDPAdmin::AddrServer_var addr_srv; + if (this->addr_serv_map_.find (addr_serv_id, addr_srv) != 0) + return false; + + // Now we setup the sender: + TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender = TAO_ECG_UDP_Sender::create(); + sender->init (this->rt_event_channel_.in (), + addr_srv.in (), + endpoint); + + // Setup the subscription and connect to the EC + ACE_ConsumerQOS_Factory cons_qos_fact; + cons_qos_fact.start_disjunction_group (); + cons_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0); + RtecEventChannelAdmin::ConsumerQOS sub = cons_qos_fact.get_ConsumerQOS (); + sub.is_gateway = 1; + sender->connect (sub); + + return true; + } + + ::CORBA::Boolean + RTEventService::create_receiver ( + const char * addr_serv_id, + ::CORBA::Boolean is_multicast, + ::CORBA::UShort listen_port) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + ACE_DEBUG ((LM_DEBUG, "Create a receiver object with addr_serv_id: %s\n",addr_serv_id )); + + // Create and initialize the receiver + TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver = + TAO_ECG_UDP_Receiver::create(); + + // AddressServer is necessary when "multicast" is enabled, but not for "udp" + if (is_multicast) + { + TAO_ECG_UDP_Out_Endpoint endpoint; + if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_DEBUG ((LM_ERROR, "Cannot open send endpoint\n")); + return false; + } + + // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint. + // If we don't clone our endpoint and pass &endpoint, the receiver will + // attempt to delete endpoint during shutdown. + TAO_ECG_UDP_Out_Endpoint* clone; + ACE_NEW_RETURN (clone, + TAO_ECG_UDP_Out_Endpoint (endpoint), + false); + + RtecUDPAdmin::AddrServer_var addr_srv; + + if (this->addr_serv_map_.find (addr_serv_id, addr_srv) != 0) + return false; + + receiver->init (this->rt_event_channel_.in (), + clone, + addr_srv.in ()); + } + else + { + receiver->init (this->rt_event_channel_.in (), 0, 0); + } + + // Setup the registration and connect to the event channel + ACE_SupplierQOS_Factory supp_qos_fact; + supp_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0, 1); + RtecEventChannelAdmin::SupplierQOS pub = supp_qos_fact.get_SupplierQOS (); + receiver->connect (pub); + + // Create the appropriate event handler and register it with the reactor + + if (is_multicast) + { + auto_ptr<TAO_ECG_Mcast_EH> mcast_eh (new TAO_ECG_Mcast_EH (receiver.in())); + mcast_eh->reactor (this->orb_->orb_core ()->reactor ()); + mcast_eh->open (this->rt_event_channel_.in()); + mcast_eh.release(); + } + else + { + ACE_DEBUG ((LM_DEBUG, "\nUDP Event Handler Port [%d]\n", listen_port)); + + //auto_ptr<TAO_ECG_UDP_EH> udp_eh (new TAO_ECG_UDP_EH (receiver.in())); + TAO_ECG_UDP_EH * udp_eh = new TAO_ECG_UDP_EH (receiver.in()); + + udp_eh->reactor (this->orb_->orb_core ()->reactor ()); + + ACE_INET_Addr local_addr (listen_port); + if (udp_eh->open (local_addr) == -1) + { + ACE_DEBUG ((LM_ERROR, "Cannot open event handler on port [%d]\n", listen_port)); + return false; + } + //udp_eh.release (); + } + + return true; + } + + + ::RtecEventChannelAdmin::EventChannel_ptr + RTEventService::tao_rt_event_channel () + ACE_THROW_SPEC ((::CORBA::SystemException)) + { + return this->rt_event_channel_.in (); + } + + ////////////////////////////////////////////////////////////////////// + /// Supplier Servant Implementation + ////////////////////////////////////////////////////////////////////// + + RTEventServiceSupplier_impl::RTEventServiceSupplier_impl ( + PortableServer::POA_ptr poa) : + poa_ (PortableServer::POA::_duplicate (poa)) + { + } + + void + RTEventServiceSupplier_impl::disconnect_push_supplier (void) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } + + ////////////////////////////////////////////////////////////////////// + /// Consumer Servant Implementation + ////////////////////////////////////////////////////////////////////// + + RTEventServiceConsumer_impl::RTEventServiceConsumer_impl ( + PortableServer::POA_ptr poa, + Components::EventConsumerBase_ptr consumer) : + poa_ (PortableServer::POA::_duplicate (poa)), + event_consumer_ (Components::EventConsumerBase::_duplicate (consumer)) + { + } + + void + RTEventServiceConsumer_impl::push (const RtecEventComm::EventSet& events) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventServiceConsumer_impl::push\n")); + } + + for (size_t i = 0; i < events.length (); ++i) + { + std::ostringstream out; + out << "Received event," + << " type: " << events[i].header.type + << " source: " << events[i].header.source; + + ACE_OS::printf("%s\n", out.str().c_str()); // printf is synchronized + + Components::EventBase * ev = 0; + try + { + TAO::Unknown_IDL_Type *unk = + dynamic_cast<TAO::Unknown_IDL_Type *> (events[i].data.any_value.impl ()); + TAO_InputCDR for_reading (unk->_tao_get_cdr ()); + + if (for_reading >> ev) + { + ev->_add_ref (); + this->event_consumer_->push_event (ev); + } + else + { + ACE_ERROR ((LM_ERROR, "CIAO::RTEventServiceConsumer_impl::push(), " + "failed to extract event\n")); + } + } + catch (const CORBA::Exception& ex) + { + ACE_ERROR ((LM_ERROR, + "CORBA EXCEPTION caught\n")); + ex._tao_print_exception ("RTEventServiceConsumer_impl::push()\n"); + } + } + + } + + void + RTEventServiceConsumer_impl::disconnect_push_consumer (void) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventServiceConsumer_impl::disconnect_push_consumer\n")); + } + + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } + + + ////////////////////////////////////////////////////////////////////// + /// Supplier Config Implementation + ////////////////////////////////////////////////////////////////////// + + RTEvent_Supplier_Config_impl::RTEvent_Supplier_Config_impl (PortableServer::POA_ptr poa) : + service_type_ (RTEC), + poa_ (PortableServer::POA::_duplicate (poa)) + { + } + + RTEvent_Supplier_Config_impl::~RTEvent_Supplier_Config_impl (void) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Supplier_Config_impl::~RTEvent_Supplier_Config_impl\n")); + } + } + + void + RTEvent_Supplier_Config_impl::supplier_id ( + const char * supplier_id) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "supplier's id: %s\n", supplier_id)); + } + + this->supplier_id_ = supplier_id; + + ACE_Hash<ACE_CString> hasher; + + RtecEventComm::EventSourceID source_id = + hasher (this->supplier_id_.c_str ()); +/* + this->qos_.insert (source_id, + ACE_ES_EVENT_ANY, + 0, + 1); + +*/ + + this->qos_.insert (ACE_ES_EVENT_SOURCE_ANY, + ACE_ES_EVENT_ANY, + 0, // handle to the rt_info structure + 1); + + ACE_DEBUG ((LM_DEBUG, "supplier's source id is: %d\n", source_id)); + } + + CONNECTION_ID + RTEvent_Supplier_Config_impl::supplier_id () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return CORBA::string_dup (this->supplier_id_.c_str ()); + } + + EventServiceType + RTEvent_Supplier_Config_impl::service_type () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return this->service_type_; + } + + RtecEventChannelAdmin::SupplierQOS * + RTEvent_Supplier_Config_impl::rt_event_qos () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + RtecEventChannelAdmin::SupplierQOS * supplier_qos = 0; + ACE_NEW_RETURN (supplier_qos, + RtecEventChannelAdmin::SupplierQOS (this->qos_.get_SupplierQOS ()), + 0); + return supplier_qos; + } + + void + RTEvent_Supplier_Config_impl::destroy () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } + + ////////////////////////////////////////////////////////////////////// + /// Consumer Config Implementation + ////////////////////////////////////////////////////////////////////// + + RTEvent_Consumer_Config_impl::RTEvent_Consumer_Config_impl (PortableServer::POA_ptr poa) : + service_type_ (RTEC), + poa_ (PortableServer::POA::_duplicate (poa)) + { + } + + RTEvent_Consumer_Config_impl::~RTEvent_Consumer_Config_impl (void) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Consumer_Config_impl::~RTEvent_Consumer_Config_impl\n")); + } + + void + RTEvent_Consumer_Config_impl::start_conjunction_group ( + CORBA::Long size) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Consumer_Config_impl::start_conjunction_group\n")); + + this->qos_.start_conjunction_group (size); + } + + void + RTEvent_Consumer_Config_impl::start_disjunction_group ( + CORBA::Long size) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + // Note, since we only support basic builder here... + if (size == 0L) + this->qos_.start_disjunction_group (); + else + this->qos_.start_disjunction_group (size); + } + + void + RTEvent_Consumer_Config_impl::insert_source ( + const char * source_id) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + ACE_Hash<ACE_CString> hasher; + RtecEventComm::EventSourceID int_source_id = hasher (source_id); + + ACE_DEBUG ((LM_DEBUG, "******* the source string is: %s\n", source_id)); + ACE_DEBUG ((LM_DEBUG, "******* the source id is: %i\n", int_source_id)); + + this->qos_.insert_source (int_source_id, 0); + } + + void + RTEvent_Consumer_Config_impl::insert_type ( + ::CORBA::Long event_type) + ACE_THROW_SPEC ((::CORBA::SystemException)) + { + if (event_type == 0L) + this->qos_.insert_type (ACE_ES_EVENT_ANY, 0); + else + this->qos_.insert_type (event_type, + 0); + } + + void + RTEvent_Consumer_Config_impl::consumer_id ( + const char * consumer_id) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, + "RTEvent_Consumer_Config_impl::set_consumer_id:%s\n", + consumer_id)); + } + + this->consumer_id_ = consumer_id; + } + + + void + RTEvent_Consumer_Config_impl::consumer ( + Components::EventConsumerBase_ptr consumer) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + this->consumer_ = Components::EventConsumerBase::_duplicate (consumer); + } + + CONNECTION_ID + RTEvent_Consumer_Config_impl::consumer_id () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return CORBA::string_dup (this->consumer_id_.c_str ()); + } + + + EventServiceType + RTEvent_Consumer_Config_impl::service_type () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return this->service_type_; + } + + Components::EventConsumerBase_ptr + RTEvent_Consumer_Config_impl::consumer () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "RTEvent_Consumer_Config_impl::get_consumer\n")); + } + + return Components::EventConsumerBase::_duplicate (this->consumer_.in ()); + } + + RtecEventChannelAdmin::ConsumerQOS * + RTEvent_Consumer_Config_impl::rt_event_qos () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + RtecEventChannelAdmin::ConsumerQOS * consumer_qos = 0; + ACE_NEW_RETURN (consumer_qos, + RtecEventChannelAdmin::ConsumerQOS (this->qos_.get_ConsumerQOS ()), + 0); + + return consumer_qos; + } + + void + RTEvent_Consumer_Config_impl::destroy () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Consumer_Config_impl::destroy\n")); + } + + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } +} |