diff options
author | dengg <dengg@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2006-10-11 00:53:59 +0000 |
---|---|---|
committer | dengg <dengg@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2006-10-11 00:53:59 +0000 |
commit | 0f03d1c15caca953f26943fe12d3caea1c89820e (patch) | |
tree | f8dd6bef9684ad98b07a44ec8669713edf77e8e1 | |
parent | b63ee9699615cb2b33623e312e009dd12206cc40 (diff) | |
download | ATCD-0f03d1c15caca953f26943fe12d3caea1c89820e.tar.gz |
Added interfaces and initial implementation to support federated event channels.
-rw-r--r-- | ciao/Deployment_Events.idl | 8 | ||||
-rw-r--r-- | ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp | 147 | ||||
-rw-r--r-- | ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h | 39 | ||||
-rw-r--r-- | ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl | 13 |
4 files changed, 200 insertions, 7 deletions
diff --git a/ciao/Deployment_Events.idl b/ciao/Deployment_Events.idl index 73e342d0ae7..de5e5a6a646 100644 --- a/ciao/Deployment_Events.idl +++ b/ciao/Deployment_Events.idl @@ -51,18 +51,24 @@ module CIAO struct AddrServer { string name; + unsigned short port; + string address; }; typedef sequence<AddrServer> AddrServers; struct UDPSender { string name; + string addr_serv_id; }; typedef sequence<UDPSender> UDPSenders; struct UPDReceiver { string name; + string addr_serv_id; + boolean is_multicast; + unsigned short listen_port; }; typedef sequence<UPDReceiver> UPDReceivers; @@ -82,9 +88,9 @@ module CIAO string svc_cfg_file; EventFilters filters; + AddrServers addr_srvs; UDPSenders senders; UPDReceivers receivers; - EventHandlers ehs; }; typedef sequence < EventServiceDeploymentDescription > EventServiceDeploymentDescriptions; diff --git a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp index e8099d54629..919ca0ab599 100644 --- a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp +++ b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp @@ -13,6 +13,8 @@ #include "CIAO_RTEvent.h" #include "ciao/CIAO_common.h" +#include "SimpleAddressServer.h" +#include <tao/ORB_Core.h> #include <sstream> @@ -317,6 +319,151 @@ namespace CIAO ACE_CHECK; } + ::CORBA::Boolean + RTEventService::create_addr_serv ( + const char * name, + ::CORBA::UShort port, + const char * address + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + // Initialize the address server with the desired address. + // This will be used by the sender object and the multicast + // receiver. + ACE_INET_Addr send_addr (port, address); + SimpleAddressServer addr_srv_impl (send_addr); + + PortableServer::ObjectId_var addr_srv_oid = + this->root_poa_->activate_object (&addr_srv_impl); + + CORBA::Object_var addr_srv_obj = + this->root_poa_->id_to_reference (addr_srv_oid.in()); + + RtecUDPAdmin::AddrServer_var addr_srv = + RtecUDPAdmin::AddrServer::_narrow (addr_srv_obj.in()); + + this->addr_serv_map_.bind ( + name, + RtecUDPAdmin::AddrServer::_duplicate (addr_srv.in ())); + + return true; + } + + ::CORBA::Boolean + RTEventService::create_sender ( + const char * addr_serv_id + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + // Create and initialize the sender object + TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender = + TAO_ECG_UDP_Sender::create(); + + TAO_ECG_UDP_Out_Endpoint endpoint; + + if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_DEBUG ((LM_ERROR, "Cannot open send endpoint\n")); + return false; + } + + // TAO_ECG_UDP_Sender::init() takes a TAO_ECG_Refcounted_Endpoint. + // If we don't clone our endpoint and pass &endpoint, the sender will + // attempt to delete endpoint during shutdown. + TAO_ECG_UDP_Out_Endpoint* clone; + ACE_NEW_RETURN (clone, + TAO_ECG_UDP_Out_Endpoint (endpoint), + false); + + RtecUDPAdmin::AddrServer_var addr_srv; + if (this->addr_serv_map_.find (addr_serv_id, addr_srv) != 0) + return false; + + sender->init (this->rt_event_channel_.in (), + addr_srv.in (), + clone); + + // Setup the subscription and connect to the EC + ACE_ConsumerQOS_Factory cons_qos_fact; + cons_qos_fact.start_disjunction_group (); + cons_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0); + RtecEventChannelAdmin::ConsumerQOS sub = cons_qos_fact.get_ConsumerQOS (); + sender->connect (sub); + + return true; + } + + ::CORBA::Boolean + RTEventService::create_receiver ( + const char * addr_serv_id, + ::CORBA::Boolean is_multicast, + ::CORBA::UShort listen_port + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + // Create and initialize the receiver + TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver = + TAO_ECG_UDP_Receiver::create(); + + TAO_ECG_UDP_Out_Endpoint endpoint; + if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_DEBUG ((LM_ERROR, "Cannot open send endpoint\n")); + return false; + } + + // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint. + // If we don't clone our endpoint and pass &endpoint, the receiver will + // attempt to delete endpoint during shutdown. + TAO_ECG_UDP_Out_Endpoint* clone; + ACE_NEW_RETURN (clone, + TAO_ECG_UDP_Out_Endpoint (endpoint), + false); + + RtecUDPAdmin::AddrServer_var addr_srv; + if (this->addr_serv_map_.find (addr_serv_id, addr_srv) != 0) + return false; + + receiver->init (this->rt_event_channel_.in (), + clone, + addr_srv.in ()); + + // Setup the registration and connect to the event channel + ACE_SupplierQOS_Factory supp_qos_fact; + supp_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0, 1); + RtecEventChannelAdmin::SupplierQOS pub = supp_qos_fact.get_SupplierQOS (); + receiver->connect (pub); + + // Create the appropriate event handler and register it with the reactor + auto_ptr<ACE_Event_Handler> eh; + + if (is_multicast) + { + auto_ptr<TAO_ECG_Mcast_EH> mcast_eh (new TAO_ECG_Mcast_EH (receiver.in())); + mcast_eh->reactor (this->orb_->orb_core ()->reactor ()); + mcast_eh->open (this->rt_event_channel_.in()); + ACE_AUTO_PTR_RESET (eh, mcast_eh.release(), ACE_Event_Handler); + } + else + { + auto_ptr<TAO_ECG_UDP_EH> udp_eh (new TAO_ECG_UDP_EH (receiver.in())); + udp_eh->reactor (this->orb_->orb_core ()->reactor ()); + + ACE_INET_Addr local_addr (listen_port); + if (udp_eh->open (local_addr) == -1) + { + ACE_DEBUG ((LM_ERROR, "Cannot open event handler.\n")); + return false; + } + ACE_AUTO_PTR_RESET (eh, udp_eh.release(), ACE_Event_Handler); + } + + return true; + } + ////////////////////////////////////////////////////////////////////// /// Supplier Servant Implementation diff --git a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h index 3d5ced528ba..e932ac3a8bb 100644 --- a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h +++ b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h @@ -28,6 +28,11 @@ #include "orbsvcs/orbsvcs/Event_Utilities.h" #include "orbsvcs/orbsvcs/Event/EC_Event_Channel.h" #include "orbsvcs/orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/ECG_Mcast_EH.h" +#include "orbsvcs/Event/ECG_UDP_Sender.h" +#include "orbsvcs/Event/ECG_UDP_Receiver.h" +#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" +#include "orbsvcs/Event/ECG_UDP_EH.h" #include "ace/Hash_Map_Manager.h" namespace CIAO @@ -102,6 +107,28 @@ namespace CIAO ::CORBA::SystemException, ::Components::BadEventType)); + virtual ::CORBA::Boolean create_addr_serv ( + const char * name, + ::CORBA::UShort port, + const char * address + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC (( + ::CORBA::SystemException)); + + virtual ::CORBA::Boolean create_sender ( + const char * addr_serv_id + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC (( + ::CORBA::SystemException)); + + virtual ::CORBA::Boolean create_receiver ( + const char * addr_serv_id, + ::CORBA::Boolean is_multicast, + ::CORBA::UShort listen_port + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC (( + ::CORBA::SystemException)); + private: // @@ (GD) This is the place where use could provide a parameter // which specifies the event channel service configuration file. @@ -159,6 +186,18 @@ namespace CIAO ACE_Equal_To<ACE_CString>, ACE_Null_Mutex> proxy_supplier_map_; + /** + * @var ACE_Hash_Map_Manager<> addr_serv_map_ + * + * A map which managers a set of address servers for event channel + * federation purpose. + */ + ACE_Hash_Map_Manager_Ex<ACE_CString, + RtecUDPAdmin::AddrServer_var, + ACE_Hash<ACE_CString>, + ACE_Equal_To<ACE_CString>, + ACE_Null_Mutex> addr_serv_map_; + }; /** diff --git a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl index 0fa81702a7c..a3c5d3c5eca 100644 --- a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl +++ b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl @@ -38,14 +38,15 @@ module CIAO interface CIAO_RT_Evnet_Service : CIAO_Event_Service { - void create_addr_server (); + boolean create_addr_serv (in string name, + in unsigned short port, + in string address); - void create_sender (); - void create_receiver (); - void create_event_handler (); + boolean create_sender (in string addr_serv_id); - void connect_sender (); - void connect_receiver(); + boolean create_receiver (in string addr_serv_id, + in boolean is_multicast, + in unsigned short listen_port); }; }; |