diff options
author | Johnny Willemsen <jwillemsen@remedy.nl> | 2006-10-26 10:56:54 +0000 |
---|---|---|
committer | Johnny Willemsen <jwillemsen@remedy.nl> | 2006-10-26 10:56:54 +0000 |
commit | 0c11f823acca02331a653428151039a3204bfc52 (patch) | |
tree | 75d3bcfd9f0f90599f4e3294faa9468021c687fe /TAO/orbsvcs/tests/Event | |
parent | 9586f6ee6890d53a8b99cf6d20c722dee6fdaf9d (diff) | |
download | ATCD-0c11f823acca02331a653428151039a3204bfc52.tar.gz |
Diffstat (limited to 'TAO/orbsvcs/tests/Event')
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/AddrServer.cpp | 19 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/AddrServer.h | 53 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/Consumer.cpp | 143 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/Consumer.h | 66 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/README | 26 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/RtEC_UDP.mpc | 26 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/Supplier.cpp | 108 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/Supplier.h | 64 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/Test.idl | 11 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/receiver.cpp | 366 | ||||
-rwxr-xr-x | TAO/orbsvcs/tests/Event/UDP/run_test.pl | 47 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/sender.cpp | 312 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/svc.conf | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/UDP/svc.conf.xml | 6 |
14 files changed, 1249 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/Event/UDP/AddrServer.cpp b/TAO/orbsvcs/tests/Event/UDP/AddrServer.cpp new file mode 100644 index 00000000000..05fd4d9c983 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/AddrServer.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#include "AddrServer.h" + +ACE_RCSID(EC_Examples, AddrServer, "$Id$") + +AddrServer::AddrServer (const RtecUDPAdmin::UDP_Addr& addr) + : addr_ (addr) +{ +} + +void +AddrServer::get_addr (const RtecEventComm::EventHeader&, + RtecUDPAdmin::UDP_Addr_out addr + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + addr = this->addr_; +} diff --git a/TAO/orbsvcs/tests/Event/UDP/AddrServer.h b/TAO/orbsvcs/tests/Event/UDP/AddrServer.h new file mode 100644 index 00000000000..8439914f22b --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/AddrServer.h @@ -0,0 +1,53 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Consumer +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef ADDRSERVER_H +#define ADDRSERVER_H +#include /**/ "ace/pre.h" + +#include "orbsvcs/RtecUDPAdminS.h" + +class AddrServer : public POA_RtecUDPAdmin::AddrServer +{ + // = TITLE + // A simple AddrServer + // + // = DESCRIPTION + // The EC is able to use multiple multicast groups to transmit its + // data, the is given control over the mapping between the Event + // (type,source) pair and the (ipaddr,port) pair using a + // AddrServer. + // This class implements a very simple server that simply maps the + // <type> component to the <ipaddr> and uses a fixed <port>, + // provided at initialization time. + // +public: + AddrServer (const RtecUDPAdmin::UDP_Addr& addr); + // Constructor + + // = The RtecUDPAdmin::AddrServer methods + virtual void get_addr (const RtecEventComm::EventHeader& header, + RtecUDPAdmin::UDP_Addr_out addr + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + RtecUDPAdmin::UDP_Addr addr_; + // The address +}; + +#include /**/ "ace/post.h" +#endif /* ADDRSERVER_H */ diff --git a/TAO/orbsvcs/tests/Event/UDP/Consumer.cpp b/TAO/orbsvcs/tests/Event/UDP/Consumer.cpp new file mode 100644 index 00000000000..596acc11938 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/Consumer.cpp @@ -0,0 +1,143 @@ +// $Id$ + +#include "Consumer.h" +#include "orbsvcs/RtecEventChannelAdminS.h" +#include "orbsvcs/Event_Service_Constants.h" + +#include "TestC.h" + +ACE_RCSID (EC_Examples, + Consumer, + "$Id$") + +Consumer::Consumer (bool valuetype) + : event_count_ (0), + valuetype_ (valuetype) +{ +} + +void +Consumer::connect (RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin + ACE_ENV_ARG_DECL) +{ + this->proxy_ = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + RtecEventComm::PushConsumer_var me = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Simple subscription, but usually the helper classes in + // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this. + RtecEventChannelAdmin::ConsumerQOS qos; + qos.is_gateway = 0; + + qos.dependencies.length (2); + RtecEventComm::EventHeader& h0 = + qos.dependencies[0].event.header; + h0.type = ACE_ES_DISJUNCTION_DESIGNATOR; + h0.source = 1; // The disjunction has one element + + RtecEventComm::EventHeader& h1 = + qos.dependencies[1].event.header; + h1.type = ACE_ES_EVENT_UNDEFINED; // first free event type + h1.source = ACE_ES_EVENT_SOURCE_ANY; // Any source is OK + + this->proxy_->connect_push_consumer (me.in (), qos + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Consumer::disconnect (ACE_ENV_SINGLE_ARG_DECL) +{ + ACE_TRY + { + // Disconnect from the proxy + this->proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Ignore exceptions + } + ACE_ENDTRY; + this->proxy_ = RtecEventChannelAdmin::ProxyPushSupplier::_nil (); + + // Deactivate this object + PortableServer::POA_var poa = + this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + // Get the Object Id used for the servant.. + PortableServer::ObjectId_var oid = + poa->servant_to_id (this ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + // Deactivate the object + poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Consumer::push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (events.length () == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Consumer (%P|%t) no events\n")); + return; + } + + this->event_count_ += events.length (); + + for (size_t i = 0; i < events.length (); ++i) + { + if (this->valuetype_) + { + ValueTypeData * test_data = 0; + if (events[i].data.any_value >>= test_data) + { + ACE_DEBUG ((LM_DEBUG, "Consumer (%P|%t): Received a message: %s\n", + test_data->data ())); + if (ACE_OS::strcmp (test_data->data (), "ACE/TAO/CIAO") != 0) + { + ACE_ERROR ((LM_ERROR, "Consumer (%P|%t): ERROR received not expected message\n")); + } + } + else + { + ACE_ERROR ((LM_ERROR, "Consumer (%P|%t): ERROR failed to extract valuetype data\n")); + } + } + else + { + char* mystring = 0; + if (events[i].data.any_value >>= mystring) + { + ACE_DEBUG ((LM_DEBUG, "Consumer (%P|%t): Received a message: %s\n", + mystring)); + if (ACE_OS::strcmp (mystring, "ACE/TAO/CIAO") != 0) + { + ACE_ERROR ((LM_ERROR, "Consumer (%P|%t): ERROR received not expected message\n")); + } + } + else + { + ACE_ERROR ((LM_ERROR, "Consumer (%P|%t): ERROR failed to extract string data\n")); + } + } + } + + ACE_DEBUG ((LM_DEBUG, + "Consumer (%P|%t): %d events received\n", + this->event_count_)); +} + +void +Consumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + diff --git a/TAO/orbsvcs/tests/Event/UDP/Consumer.h b/TAO/orbsvcs/tests/Event/UDP/Consumer.h new file mode 100644 index 00000000000..8d00da9a766 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/Consumer.h @@ -0,0 +1,66 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Consumer +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef CONSUMER_H +#define CONSUMER_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Consumer : public POA_RtecEventComm::PushConsumer +{ + // = TITLE + // Simple consumer object + // + // = DESCRIPTION + // This class is a consumer of events. + // It simply subscribes to one event type. + // +public: + Consumer (bool valuetype); + // Constructor + + void connect (RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin + ACE_ENV_ARG_DECL); + // Connect to the Event Channel + + void disconnect (ACE_ENV_SINGLE_ARG_DECL); + // Disconnect from the event channel + + // = The RtecEventComm::PushConsumer methods + + virtual void push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + +private: + CORBA::ULong event_count_; + // Keep track of the number of events received. + + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_; + // The proxy + + bool valuetype_; +}; + +#endif /* CONSUMER_H */ diff --git a/TAO/orbsvcs/tests/Event/UDP/README b/TAO/orbsvcs/tests/Event/UDP/README new file mode 100644 index 00000000000..55aad804e20 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/README @@ -0,0 +1,26 @@ +# $Id$ + + This directory contains a very simple example of a +multicast-based federation of event services. + + The example is a single process that contains: + +1) An event service +2) A supplier +3) A consumer +4) The gateways required to send and receive data through the + multicast group. + + The tests should be executed as follows: + +$ MCast + + If you need to set the multicast group and port you can use +the -m option: + +$ MCast -m 224.100.2.1:12345 + + Run the test in multiple machines on the same network. If +there is only one process you should only receive 1000 events in the +local consumer. If there is more than one machine you should receive +more events. diff --git a/TAO/orbsvcs/tests/Event/UDP/RtEC_UDP.mpc b/TAO/orbsvcs/tests/Event/UDP/RtEC_UDP.mpc new file mode 100644 index 00000000000..caf503219e2 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/RtEC_UDP.mpc @@ -0,0 +1,26 @@ +// -*- MPC -*- +// $Id$ + +project (* sender): orbsvcsexe, rtevent_serv, rtsched { + exename = sender + Source_Files { + AddrServer.cpp + Supplier.cpp + sender.cpp + } + IDL_Files { + Test.idl + } +} + +project (* receiver) : orbsvcsexe, rtevent_serv, rtsched { + exename = receiver + Source_Files { + AddrServer.cpp + Consumer.cpp + receiver.cpp + } + IDL_Files { + Test.idl + } +} diff --git a/TAO/orbsvcs/tests/Event/UDP/Supplier.cpp b/TAO/orbsvcs/tests/Event/UDP/Supplier.cpp new file mode 100644 index 00000000000..b12ce21200e --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/Supplier.cpp @@ -0,0 +1,108 @@ +// $Id$ + +#include "Supplier.h" +#include "orbsvcs/RtecEventChannelAdminS.h" +#include "orbsvcs/Event_Service_Constants.h" + +#include "TestC.h" + +ACE_RCSID (EC_Examples, + Supplier, + "$Id$") + +Supplier::Supplier (bool valuetype) : valuetype_ (valuetype) +{ +} + +void +Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin + ACE_ENV_ARG_DECL) +{ + this->proxy_ = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + RtecEventComm::PushSupplier_var me = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Simple publication, but usually the helper classes in + // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this. + RtecEventChannelAdmin::SupplierQOS qos; + qos.is_gateway = 0; + + qos.publications.length (1); + RtecEventComm::EventHeader& h0 = + qos.publications[0].event.header; + h0.type = ACE_ES_EVENT_UNDEFINED; // first free event type + h0.source = 1; // first free event source + + this->proxy_->connect_push_supplier (me.in (), qos + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Supplier::disconnect (ACE_ENV_SINGLE_ARG_DECL) +{ + // Disconnect from the EC + ACE_TRY + { + this->proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + } + ACE_ENDTRY; + + PortableServer::POA_var poa = + this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + PortableServer::ObjectId_var id = + poa->servant_to_id (this ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Supplier::perform_push (ACE_ENV_SINGLE_ARG_DECL) +{ + ACE_TRY + { + // The event type and source must match our publications + RtecEventComm::EventSet event (1); + event.length (1); + event[0].header.type = ACE_ES_EVENT_UNDEFINED; + event[0].header.source = 1; + // Avoid loops throught the event channel federations + event[0].header.ttl = 1; + + if (this->valuetype_) + { + OBV_ValueTypeData * test_data = 0; + ACE_NEW (test_data, OBV_ValueTypeData ()); + test_data->data ("ACE/TAO/CIAO"); + event[0].data.any_value <<= test_data; + } + else + { + event[0].data.any_value <<= CORBA::string_dup( "ACE/TAO/CIAO"); + } + + this->proxy_->push (event ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + } + ACE_ENDTRY; +} + +void +Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + diff --git a/TAO/orbsvcs/tests/Event/UDP/Supplier.h b/TAO/orbsvcs/tests/Event/UDP/Supplier.h new file mode 100644 index 00000000000..f773b9f9237 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/Supplier.h @@ -0,0 +1,64 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Supplier +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef SUPPLIER_H +#define SUPPLIER_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Supplier : public POA_RtecEventComm::PushSupplier +{ + // = TITLE + // Simple supplier object + // + // = DESCRIPTION + // This class is a supplier of events. + // It simply publishes one event type, when the perform_push() + // method is invoked it pushes the event through the event service + // +public: + Supplier (bool valuetype); + // Constructor + + void connect (RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin + ACE_ENV_ARG_DECL); + // Connect to the event channel + + void disconnect (ACE_ENV_SINGLE_ARG_DECL); + // Disconnect from the event channel + + void perform_push (ACE_ENV_SINGLE_ARG_DECL); + // Push a single event + + // = The RtecEventComm::PushSupplier methods + + virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + +private: + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_; + // The proxy + + bool valuetype_; +}; + +#endif /* SUPPLIER_H */ diff --git a/TAO/orbsvcs/tests/Event/UDP/Test.idl b/TAO/orbsvcs/tests/Event/UDP/Test.idl new file mode 100644 index 00000000000..fc7e50d4fd2 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/Test.idl @@ -0,0 +1,11 @@ +//$Id$: + +#ifndef TAO_RTEC_MCAST_TEST_IDL +#define TAO_RTEC_MCAST_TEST_IDL + +valuetype ValueTypeData +{ + public string data; +}; + +#endif /* TAO_RTEC_MCAST_TEST_IDL */ diff --git a/TAO/orbsvcs/tests/Event/UDP/receiver.cpp b/TAO/orbsvcs/tests/Event/UDP/receiver.cpp new file mode 100644 index 00000000000..440ec66bb30 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/receiver.cpp @@ -0,0 +1,366 @@ +// $Id$ + + +#include "Consumer.h" +#include "AddrServer.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/ECG_Mcast_EH.h" +#include "orbsvcs/Event/ECG_UDP_Sender.h" +#include "orbsvcs/Event/ECG_UDP_Receiver.h" +#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" +#include "tao/ORB_Core.h" +#include "ace/Get_Opt.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID (EC_Examples, + MCast, + "$Id$") + +const char *udp_mcast_address = + ACE_DEFAULT_MULTICAST_ADDR ":10001"; +bool valuetype = false; + +int parse_args (int argc, char *argv[]); + +int +main (int argc, char* argv[]) +{ + // Register the default factory in the Service Configurator. + // If your platform supports static constructors then you can + // simply using the ACE_STATIC_SVC_DEFINE() macro, unfortunately TAO + // must run on platforms where static constructors do not work well, + // so we have to explicitly invoke this function. + TAO_EC_Default_Factory::init_svcs (); + + // The exception macros are described in $ACE_ROOT/docs/exceptions.html + // and defined in $ACE_ROOT/ace/CORBA_macros.h. + // If your platform supports native exceptions, and TAO was compiled + // with native exception support then you can simply use try/catch + // and avoid the ACE_ENV_SINGLE_ARG_PARAMETER argument. + // Unfortunately many embedded systems cannot use exceptions due to + // the space and time overhead. + // + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // **************** HERE STARTS THE ORB SETUP + + // Create the ORB, pass the argv list for parsing. + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Parse the arguments, you usually want to do this after + // invoking ORB_init() because ORB_init() will remove all the + // -ORB options from the command line. + if (parse_args (argc, argv) == -1) + { + ACE_ERROR ((LM_ERROR, + "Usage: Service [-m udp_mcast_addr]\n")); + return 1; + } + + // This is the standard code to get access to the POA and + // activate it. + // The POA starts in the holding state, if it is not activated + // it will not process any requests. + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETS THE ORB SETUP + + // **************** HERE START THE LOCAL EVENT CHANNEL SETUP + + // This structure is used to define the startup time event + // channel configuration. + // This structure is described in + // + // $TAO_ROOT/docs/ec_options.html + // + TAO_EC_Event_Channel_Attributes attributes (poa.in (), + poa.in ()); + + // Create the Event Channel implementation class + TAO_EC_Event_Channel ec_impl (attributes); + + // Activate the Event Channel, depending on the configuration + // that may involve creating some threads. + // But it should always be invoked because several internal data + // structures are initialized at that point. + ec_impl.activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The event channel is activated as any other CORBA servant. + // In this case we use the simple implicit activation with the + // RootPOA + RtecEventChannelAdmin::EventChannel_var event_channel = + ec_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE LOCAL EVENT CHANNEL SETUP + + // **************** HERE STARTS THE FEDERATION SETUP + + // The next step is to setup the multicast gateways. + // There are two gateways involved, one sends the locally + // generated events to the federated peers, the second gateway + // receives multicast traffic and turns it into local events. + + // The sender requires a helper object to select what + // multicast group will carry what traffic, this is the + // so-called 'Address Server'. + // The intention is that advanced applications can use different + // multicast groups for different events, this can exploit + // network interfaces that filter unwanted multicast traffic. + // The helper object is accessed through an IDL interface, so it + // can reside remotely. + // In this example, and in many application, using a fixed + // multicast group is enough, and a local address server is the + // right approach. + + // First we convert the string into an INET address, then we + // convert that into the right IDL structure: + ACE_INET_Addr udp_addr (udp_mcast_address); + ACE_DEBUG ((LM_DEBUG, + "Multicast address is: %s\n", + udp_mcast_address)); + RtecUDPAdmin::UDP_Addr addr; + addr.ipaddr = udp_addr.get_ip_address (); + addr.port = udp_addr.get_port_number (); + + // Now we create and activate the servant + AddrServer as_impl (addr); + RtecUDPAdmin::AddrServer_var address_server = + as_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // We need a local socket to send the data, open it and check + // that everything is OK: + TAO_ECG_Refcounted_Endpoint endpoint(new TAO_ECG_UDP_Out_Endpoint); + if (endpoint->dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "Cannot open send endpoint\n"), + 1); + } + + // Now we setup the sender: + TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender = TAO_ECG_UDP_Sender::create(); + sender->init (event_channel.in (), + address_server.in (), + endpoint + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Now we connect the sender as a consumer of events, it will + // receive any event from any source and send it to the "right" + // multicast group, as defined by the address server set above: + RtecEventChannelAdmin::ConsumerQOS sub; + sub.is_gateway = 1; + + sub.dependencies.length (1); + sub.dependencies[0].event.header.type = + ACE_ES_EVENT_ANY; // first free event type + sub.dependencies[0].event.header.source = + ACE_ES_EVENT_SOURCE_ANY; // Any source is OK + + sender->connect (sub ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // To receive events we need to setup an event handler: + TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver = TAO_ECG_UDP_Receiver::create(); + TAO_ECG_Mcast_EH mcast_eh (&(*receiver)); + + // The event handler uses the ORB reactor to wait for multicast + // traffic: + mcast_eh.reactor (orb->orb_core ()->reactor ()); + + // The multicast Event Handler needs to know to what multicast + // groups it should listen to. To do so it becomes an observer + // with the event channel, to determine the list of events + // required by all the local consumer. + // Then it register for the multicast groups that carry those + // events: + mcast_eh.open (event_channel.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Again the receiver connects to the event channel as a + // supplier of events, using the Observer features to detect + // local consumers and their interests: + receiver->init (event_channel.in (), + endpoint, + address_server.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The Receiver is also a supplier of events. The exact type of + // events is only known to the application, because it depends + // on the traffic carried by all the multicast groups that the + // different event handlers subscribe to. + // In this example we choose to simply describe our publications + // using wilcards, any event from any source. More advanced + // application could use the Observer features in the event + // channel to update this information (and reduce the number of + // multicast groups that each receive subscribes to). + // In a future version the event channel could perform some of + // those tasks automatically + RtecEventChannelAdmin::SupplierQOS pub; + pub.publications.length (1); + pub.publications[0].event.header.type = ACE_ES_EVENT_ANY; + pub.publications[0].event.header.source = ACE_ES_EVENT_SOURCE_ANY; + pub.is_gateway = 1; + + receiver->connect (pub ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE FEDERATION SETUP + + // **************** HERE STARTS THE CLIENT SETUP + + // First let us create a consumer and connect it to the event + // channel + Consumer consumer (valuetype); + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + event_channel->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + consumer.connect (consumer_admin.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE CLIENT SETUP + + // **************** HERE STARTS THE EVENT LOOP + + // Wait for events, including incoming multicast data. + // We could also use orb->run(), but that will not let us + // terminate the application in a nice way. + for (int i = 0; i != 1000; ++i) + { + CORBA::Boolean there_is_work = + orb->work_pending (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + if (there_is_work) + { + // We use a TAO extension. The CORBA mechanism does not + // provide any decent way to control the duration of + // perform_work() or work_pending(), so just calling + // them results in a spin loop. + ACE_Time_Value tv (0, 50000); + orb->perform_work (tv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_Time_Value tv (0, 100000); + ACE_OS::sleep (tv); + } + + // **************** THAT COMPLETES THE EVENT LOOP + + // **************** HERE STARTS THE CLEANUP CODE + + consumer.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Now let us close the Receiver + receiver->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + int r = mcast_eh.shutdown (); + + if (r == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Closing MCast event handler\n"), 1); + } + + // The event channel must be destroyed, so it can release its + // resources, and inform all the clients that are still + // connected that it is going away. + event_channel->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Deactivating the event channel implementation is not strictly + // required, the POA will do it for us, but it is good manners: + { + // Using _this() activates with the default POA, we must gain + // access to that POA to deactivate the object. + // Notice that we 'know' that the default POA for this servant + // is the root POA, but the code is more robust if we don't + // rely on that. + PortableServer::POA_var poa = + ec_impl._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + // Get the Object Id used for the servant.. + PortableServer::ObjectId_var oid = + poa->servant_to_id (&ec_impl ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + // Deactivate the object + poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + // Now we can destroy the POA, the flags mean that we want to + // wait until the POA is really destroyed + poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Finally destroy the ORB + orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE CLEANUP CODE + + ACE_DEBUG ((LM_DEBUG, + "MCast example terminated\n")); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +// **************************************************************** + +int parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "vm:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'm': + udp_mcast_address = get_opts.opt_arg (); + break; + + case 'v': + valuetype = true; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "[-m udp_mcast_address]" + "[-v]" + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + diff --git a/TAO/orbsvcs/tests/Event/UDP/run_test.pl b/TAO/orbsvcs/tests/Event/UDP/run_test.pl new file mode 100755 index 00000000000..fd33d4ee5ad --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/run_test.pl @@ -0,0 +1,47 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib "$ENV{ACE_ROOT}/bin"; +use PerlACE::Run_Test; + +$status = 0; + +foreach $i (@ARGV) { + if ($i eq '-v') { + $valuetype = '-v'; + } +} +$S = new PerlACE::Process ("sender", + "$valuetype"); +$R = new PerlACE::Process ("receiver", + "$valuetype"); + +print STDOUT "Starting sender\n"; +$S->Spawn (); + +sleep 1; + +print STDOUT "Starting receiver\n"; +$R->Spawn (); + +sleep 1; + +$consumer = $S->WaitKill (150); + +if ($consumer != 0) { + print STDERR "ERROR: consumer returned $consumer\n"; + $status = 1; +} + +$receiver = $R->WaitKill (150); + +if ($receiver != 0) { + print STDERR "ERROR: receiver returned $receiver\n"; + $status = 1; +} + +exit $status; diff --git a/TAO/orbsvcs/tests/Event/UDP/sender.cpp b/TAO/orbsvcs/tests/Event/UDP/sender.cpp new file mode 100644 index 00000000000..ccec0abef4f --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/sender.cpp @@ -0,0 +1,312 @@ +// $Id$ + + +#include "Supplier.h" +#include "AddrServer.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/ECG_Mcast_EH.h" +#include "orbsvcs/Event/ECG_UDP_Sender.h" +#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" +#include "tao/ORB_Core.h" +#include "ace/Get_Opt.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID (EC_Examples, + MCast, + "$Id$") + +const char *udp_mcast_address = + ACE_DEFAULT_MULTICAST_ADDR ":10001"; +bool valuetype = false; + +int parse_args (int argc, char *argv[]); + +int +main (int argc, char* argv[]) +{ + // Register the default factory in the Service Configurator. + // If your platform supports static constructors then you can + // simply using the ACE_STATIC_SVC_DEFINE() macro, unfortunately TAO + // must run on platforms where static constructors do not work well, + // so we have to explicitly invoke this function. + TAO_EC_Default_Factory::init_svcs (); + + // The exception macros are described in $ACE_ROOT/docs/exceptions.html + // and defined in $ACE_ROOT/ace/CORBA_macros.h. + // If your platform supports native exceptions, and TAO was compiled + // with native exception support then you can simply use try/catch + // and avoid the ACE_ENV_SINGLE_ARG_PARAMETER argument. + // Unfortunately many embedded systems cannot use exceptions due to + // the space and time overhead. + // + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // **************** HERE STARTS THE ORB SETUP + + // Create the ORB, pass the argv list for parsing. + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Parse the arguments, you usually want to do this after + // invoking ORB_init() because ORB_init() will remove all the + // -ORB options from the command line. + if (parse_args (argc, argv) == -1) + { + ACE_ERROR ((LM_ERROR, + "Usage: Service [-m udp_mcast_addr]\n")); + return 1; + } + + // This is the standard code to get access to the POA and + // activate it. + // The POA starts in the holding state, if it is not activated + // it will not process any requests. + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETS THE ORB SETUP + + // **************** HERE START THE LOCAL EVENT CHANNEL SETUP + + // This structure is used to define the startup time event + // channel configuration. + // This structure is described in + // + // $TAO_ROOT/docs/ec_options.html + // + TAO_EC_Event_Channel_Attributes attributes (poa.in (), + poa.in ()); + + // Create the Event Channel implementation class + TAO_EC_Event_Channel ec_impl (attributes); + + // Activate the Event Channel, depending on the configuration + // that may involve creating some threads. + // But it should always be invoked because several internal data + // structures are initialized at that point. + ec_impl.activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The event channel is activated as any other CORBA servant. + // In this case we use the simple implicit activation with the + // RootPOA + RtecEventChannelAdmin::EventChannel_var event_channel = + ec_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE LOCAL EVENT CHANNEL SETUP + + // **************** HERE STARTS THE FEDERATION SETUP + + // The next step is to setup the multicast gateways. + // There are two gateways involved, one sends the locally + // generated events to the federated peers, the second gateway + // receives multicast traffic and turns it into local events. + + // The sender requires a helper object to select what + // multicast group will carry what traffic, this is the + // so-called 'Address Server'. + // The intention is that advanced applications can use different + // multicast groups for different events, this can exploit + // network interfaces that filter unwanted multicast traffic. + // The helper object is accessed through an IDL interface, so it + // can reside remotely. + // In this example, and in many application, using a fixed + // multicast group is enough, and a local address server is the + // right approach. + + // First we convert the string into an INET address, then we + // convert that into the right IDL structure: + ACE_INET_Addr udp_addr (udp_mcast_address); + ACE_DEBUG ((LM_DEBUG, + "Multicast address is: %s\n", + udp_mcast_address)); + RtecUDPAdmin::UDP_Addr addr; + addr.ipaddr = udp_addr.get_ip_address (); + addr.port = udp_addr.get_port_number (); + + // Now we create and activate the servant + AddrServer as_impl (addr); + RtecUDPAdmin::AddrServer_var address_server = + as_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // We need a local socket to send the data, open it and check + // that everything is OK: + TAO_ECG_Refcounted_Endpoint endpoint(new TAO_ECG_UDP_Out_Endpoint); + if (endpoint->dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "Cannot open send endpoint\n"), + 1); + } + + // Now we setup the sender: + TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender = TAO_ECG_UDP_Sender::create(); + sender->init (event_channel.in (), + address_server.in (), + endpoint + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Now we connect the sender as a consumer of events, it will + // receive any event from any source and send it to the "right" + // multicast group, as defined by the address server set above: + RtecEventChannelAdmin::ConsumerQOS sub; + sub.is_gateway = 1; + + sub.dependencies.length (1); + sub.dependencies[0].event.header.type = + ACE_ES_EVENT_ANY; // first free event type + sub.dependencies[0].event.header.source = + ACE_ES_EVENT_SOURCE_ANY; // Any source is OK + + sender->connect (sub ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE FEDERATION SETUP + + // **************** HERE STARTS THE CLIENT SETUP + + // And now create a supplier + Supplier supplier (valuetype); + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + supplier.connect (supplier_admin.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE CLIENT SETUP + + // **************** HERE STARTS THE EVENT LOOP + + // Wait for events, including incoming multicast data. + // We could also use orb->run(), but that will not let us + // terminate the application in a nice way. + for (int i = 0; i != 1000; ++i) + { + CORBA::Boolean there_is_work = + orb->work_pending (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + if (there_is_work) + { + // We use a TAO extension. The CORBA mechanism does not + // provide any decent way to control the duration of + // perform_work() or work_pending(), so just calling + // them results in a spin loop. + ACE_Time_Value tv (0, 50000); + orb->perform_work (tv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_Time_Value tv (0, 100000); + ACE_OS::sleep (tv); + supplier.perform_push (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + // **************** THAT COMPLETES THE EVENT LOOP + + // **************** HERE STARTS THE CLEANUP CODE + + // First the easy ones + supplier.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // And also close the sender of events + sender->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The event channel must be destroyed, so it can release its + // resources, and inform all the clients that are still + // connected that it is going away. + event_channel->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Deactivating the event channel implementation is not strictly + // required, the POA will do it for us, but it is good manners: + { + // Using _this() activates with the default POA, we must gain + // access to that POA to deactivate the object. + // Notice that we 'know' that the default POA for this servant + // is the root POA, but the code is more robust if we don't + // rely on that. + PortableServer::POA_var poa = + ec_impl._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + // Get the Object Id used for the servant.. + PortableServer::ObjectId_var oid = + poa->servant_to_id (&ec_impl ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + // Deactivate the object + poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + // Now we can destroy the POA, the flags mean that we want to + // wait until the POA is really destroyed + poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Finally destroy the ORB + orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE CLEANUP CODE + + ACE_DEBUG ((LM_DEBUG, + "MCast example terminated\n")); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +// **************************************************************** + +int parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "vm:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'm': + udp_mcast_address = get_opts.opt_arg (); + break; + + case 'v': + valuetype = true; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "[-m udp_mcast_address]" + "[-v]" + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + diff --git a/TAO/orbsvcs/tests/Event/UDP/svc.conf b/TAO/orbsvcs/tests/Event/UDP/svc.conf new file mode 100644 index 00000000000..d0297d4649e --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/svc.conf @@ -0,0 +1,2 @@ +# $Id$ +static EC_Factory "-ECObserver basic -ECProxyPushConsumerCollection mt:copy_on_write:list -ECProxyPushSupplierCollection mt:copy_on_write:list -ECDispatching reactive -ECScheduling null -ECFiltering prefix -ECSupplierFilter per-supplier" diff --git a/TAO/orbsvcs/tests/Event/UDP/svc.conf.xml b/TAO/orbsvcs/tests/Event/UDP/svc.conf.xml new file mode 100644 index 00000000000..159faa97abc --- /dev/null +++ b/TAO/orbsvcs/tests/Event/UDP/svc.conf.xml @@ -0,0 +1,6 @@ +<?xml version='1.0'?> +<!-- Converted from ./orbsvcs/examples/RtEC/MCast/svc.conf by svcconf-convert.pl --> +<ACE_Svc_Conf> + <!-- $Id$ --> + <static id="EC_Factory" params="-ECObserver basic -ECProxyPushConsumerCollection mt:copy_on_write:list -ECProxyPushSupplierCollection mt:copy_on_write:list -ECDispatching reactive -ECScheduling null -ECFiltering prefix -ECSupplierFilter per-supplier"/> +</ACE_Svc_Conf> |