summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordengg <dengg@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2006-10-11 00:53:59 +0000
committerdengg <dengg@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2006-10-11 00:53:59 +0000
commit0f03d1c15caca953f26943fe12d3caea1c89820e (patch)
treef8dd6bef9684ad98b07a44ec8669713edf77e8e1
parentb63ee9699615cb2b33623e312e009dd12206cc40 (diff)
downloadATCD-0f03d1c15caca953f26943fe12d3caea1c89820e.tar.gz
Added interfaces and initial implementation to support federated event channels.
-rw-r--r--ciao/Deployment_Events.idl8
-rw-r--r--ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp147
-rw-r--r--ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h39
-rw-r--r--ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl13
4 files changed, 200 insertions, 7 deletions
diff --git a/ciao/Deployment_Events.idl b/ciao/Deployment_Events.idl
index 73e342d0ae7..de5e5a6a646 100644
--- a/ciao/Deployment_Events.idl
+++ b/ciao/Deployment_Events.idl
@@ -51,18 +51,24 @@ module CIAO
struct AddrServer
{
string name;
+ unsigned short port;
+ string address;
};
typedef sequence<AddrServer> AddrServers;
struct UDPSender
{
string name;
+ string addr_serv_id;
};
typedef sequence<UDPSender> UDPSenders;
struct UPDReceiver
{
string name;
+ string addr_serv_id;
+ boolean is_multicast;
+ unsigned short listen_port;
};
typedef sequence<UPDReceiver> UPDReceivers;
@@ -82,9 +88,9 @@ module CIAO
string svc_cfg_file;
EventFilters filters;
+ AddrServers addr_srvs;
UDPSenders senders;
UPDReceivers receivers;
- EventHandlers ehs;
};
typedef sequence < EventServiceDeploymentDescription >
EventServiceDeploymentDescriptions;
diff --git a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp
index e8099d54629..919ca0ab599 100644
--- a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp
+++ b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp
@@ -13,6 +13,8 @@
#include "CIAO_RTEvent.h"
#include "ciao/CIAO_common.h"
+#include "SimpleAddressServer.h"
+#include <tao/ORB_Core.h>
#include <sstream>
@@ -317,6 +319,151 @@ namespace CIAO
ACE_CHECK;
}
+ ::CORBA::Boolean
+ RTEventService::create_addr_serv (
+ const char * name,
+ ::CORBA::UShort port,
+ const char * address
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((
+ ::CORBA::SystemException))
+ {
+ // Initialize the address server with the desired address.
+ // This will be used by the sender object and the multicast
+ // receiver.
+ ACE_INET_Addr send_addr (port, address);
+ SimpleAddressServer addr_srv_impl (send_addr);
+
+ PortableServer::ObjectId_var addr_srv_oid =
+ this->root_poa_->activate_object (&addr_srv_impl);
+
+ CORBA::Object_var addr_srv_obj =
+ this->root_poa_->id_to_reference (addr_srv_oid.in());
+
+ RtecUDPAdmin::AddrServer_var addr_srv =
+ RtecUDPAdmin::AddrServer::_narrow (addr_srv_obj.in());
+
+ this->addr_serv_map_.bind (
+ name,
+ RtecUDPAdmin::AddrServer::_duplicate (addr_srv.in ()));
+
+ return true;
+ }
+
+ ::CORBA::Boolean
+ RTEventService::create_sender (
+ const char * addr_serv_id
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((
+ ::CORBA::SystemException))
+ {
+ // Create and initialize the sender object
+ TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender =
+ TAO_ECG_UDP_Sender::create();
+
+ TAO_ECG_UDP_Out_Endpoint endpoint;
+
+ if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1)
+ {
+ ACE_DEBUG ((LM_ERROR, "Cannot open send endpoint\n"));
+ return false;
+ }
+
+ // TAO_ECG_UDP_Sender::init() takes a TAO_ECG_Refcounted_Endpoint.
+ // If we don't clone our endpoint and pass &endpoint, the sender will
+ // attempt to delete endpoint during shutdown.
+ TAO_ECG_UDP_Out_Endpoint* clone;
+ ACE_NEW_RETURN (clone,
+ TAO_ECG_UDP_Out_Endpoint (endpoint),
+ false);
+
+ RtecUDPAdmin::AddrServer_var addr_srv;
+ if (this->addr_serv_map_.find (addr_serv_id, addr_srv) != 0)
+ return false;
+
+ sender->init (this->rt_event_channel_.in (),
+ addr_srv.in (),
+ clone);
+
+ // Setup the subscription and connect to the EC
+ ACE_ConsumerQOS_Factory cons_qos_fact;
+ cons_qos_fact.start_disjunction_group ();
+ cons_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0);
+ RtecEventChannelAdmin::ConsumerQOS sub = cons_qos_fact.get_ConsumerQOS ();
+ sender->connect (sub);
+
+ return true;
+ }
+
+ ::CORBA::Boolean
+ RTEventService::create_receiver (
+ const char * addr_serv_id,
+ ::CORBA::Boolean is_multicast,
+ ::CORBA::UShort listen_port
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((
+ ::CORBA::SystemException))
+ {
+ // Create and initialize the receiver
+ TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver =
+ TAO_ECG_UDP_Receiver::create();
+
+ TAO_ECG_UDP_Out_Endpoint endpoint;
+ if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1)
+ {
+ ACE_DEBUG ((LM_ERROR, "Cannot open send endpoint\n"));
+ return false;
+ }
+
+ // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint.
+ // If we don't clone our endpoint and pass &endpoint, the receiver will
+ // attempt to delete endpoint during shutdown.
+ TAO_ECG_UDP_Out_Endpoint* clone;
+ ACE_NEW_RETURN (clone,
+ TAO_ECG_UDP_Out_Endpoint (endpoint),
+ false);
+
+ RtecUDPAdmin::AddrServer_var addr_srv;
+ if (this->addr_serv_map_.find (addr_serv_id, addr_srv) != 0)
+ return false;
+
+ receiver->init (this->rt_event_channel_.in (),
+ clone,
+ addr_srv.in ());
+
+ // Setup the registration and connect to the event channel
+ ACE_SupplierQOS_Factory supp_qos_fact;
+ supp_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0, 1);
+ RtecEventChannelAdmin::SupplierQOS pub = supp_qos_fact.get_SupplierQOS ();
+ receiver->connect (pub);
+
+ // Create the appropriate event handler and register it with the reactor
+ auto_ptr<ACE_Event_Handler> eh;
+
+ if (is_multicast)
+ {
+ auto_ptr<TAO_ECG_Mcast_EH> mcast_eh (new TAO_ECG_Mcast_EH (receiver.in()));
+ mcast_eh->reactor (this->orb_->orb_core ()->reactor ());
+ mcast_eh->open (this->rt_event_channel_.in());
+ ACE_AUTO_PTR_RESET (eh, mcast_eh.release(), ACE_Event_Handler);
+ }
+ else
+ {
+ auto_ptr<TAO_ECG_UDP_EH> udp_eh (new TAO_ECG_UDP_EH (receiver.in()));
+ udp_eh->reactor (this->orb_->orb_core ()->reactor ());
+
+ ACE_INET_Addr local_addr (listen_port);
+ if (udp_eh->open (local_addr) == -1)
+ {
+ ACE_DEBUG ((LM_ERROR, "Cannot open event handler.\n"));
+ return false;
+ }
+ ACE_AUTO_PTR_RESET (eh, udp_eh.release(), ACE_Event_Handler);
+ }
+
+ return true;
+ }
+
//////////////////////////////////////////////////////////////////////
/// Supplier Servant Implementation
diff --git a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h
index 3d5ced528ba..e932ac3a8bb 100644
--- a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h
+++ b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h
@@ -28,6 +28,11 @@
#include "orbsvcs/orbsvcs/Event_Utilities.h"
#include "orbsvcs/orbsvcs/Event/EC_Event_Channel.h"
#include "orbsvcs/orbsvcs/Event/EC_Default_Factory.h"
+#include "orbsvcs/Event/ECG_Mcast_EH.h"
+#include "orbsvcs/Event/ECG_UDP_Sender.h"
+#include "orbsvcs/Event/ECG_UDP_Receiver.h"
+#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
+#include "orbsvcs/Event/ECG_UDP_EH.h"
#include "ace/Hash_Map_Manager.h"
namespace CIAO
@@ -102,6 +107,28 @@ namespace CIAO
::CORBA::SystemException,
::Components::BadEventType));
+ virtual ::CORBA::Boolean create_addr_serv (
+ const char * name,
+ ::CORBA::UShort port,
+ const char * address
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((
+ ::CORBA::SystemException));
+
+ virtual ::CORBA::Boolean create_sender (
+ const char * addr_serv_id
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((
+ ::CORBA::SystemException));
+
+ virtual ::CORBA::Boolean create_receiver (
+ const char * addr_serv_id,
+ ::CORBA::Boolean is_multicast,
+ ::CORBA::UShort listen_port
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((
+ ::CORBA::SystemException));
+
private:
// @@ (GD) This is the place where use could provide a parameter
// which specifies the event channel service configuration file.
@@ -159,6 +186,18 @@ namespace CIAO
ACE_Equal_To<ACE_CString>,
ACE_Null_Mutex> proxy_supplier_map_;
+ /**
+ * @var ACE_Hash_Map_Manager<> addr_serv_map_
+ *
+ * A map which managers a set of address servers for event channel
+ * federation purpose.
+ */
+ ACE_Hash_Map_Manager_Ex<ACE_CString,
+ RtecUDPAdmin::AddrServer_var,
+ ACE_Hash<ACE_CString>,
+ ACE_Equal_To<ACE_CString>,
+ ACE_Null_Mutex> addr_serv_map_;
+
};
/**
diff --git a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl
index 0fa81702a7c..a3c5d3c5eca 100644
--- a/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl
+++ b/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl
@@ -38,14 +38,15 @@ module CIAO
interface CIAO_RT_Evnet_Service :
CIAO_Event_Service
{
- void create_addr_server ();
+ boolean create_addr_serv (in string name,
+ in unsigned short port,
+ in string address);
- void create_sender ();
- void create_receiver ();
- void create_event_handler ();
+ boolean create_sender (in string addr_serv_id);
- void connect_sender ();
- void connect_receiver();
+ boolean create_receiver (in string addr_serv_id,
+ in boolean is_multicast,
+ in unsigned short listen_port);
};
};