From 3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c Mon Sep 17 00:00:00 2001 From: "William R. Otte" Date: Mon, 24 Jul 2006 15:50:21 +0000 Subject: Repo restructuring --- TAO/orbsvcs/examples/RtEC/IIOPGateway/Consumer.cpp | 200 ++++++++ TAO/orbsvcs/examples/RtEC/IIOPGateway/Consumer.h | 57 +++ TAO/orbsvcs/examples/RtEC/IIOPGateway/EC.cpp | 159 +++++++ TAO/orbsvcs/examples/RtEC/IIOPGateway/EC.h | 39 ++ TAO/orbsvcs/examples/RtEC/IIOPGateway/Gateway.cpp | 185 ++++++++ TAO/orbsvcs/examples/RtEC/IIOPGateway/Gateway.h | 38 ++ TAO/orbsvcs/examples/RtEC/IIOPGateway/Makefile.am | 166 +++++++ TAO/orbsvcs/examples/RtEC/IIOPGateway/README | 19 + .../examples/RtEC/IIOPGateway/RtEC_IIOPGateway.mpc | 35 ++ TAO/orbsvcs/examples/RtEC/IIOPGateway/Supplier.cpp | 195 ++++++++ TAO/orbsvcs/examples/RtEC/IIOPGateway/Supplier.h | 46 ++ .../examples/RtEC/IIOPGateway/consumerec_crash.pl | 150 ++++++ TAO/orbsvcs/examples/RtEC/IIOPGateway/ec.conf | 4 + TAO/orbsvcs/examples/RtEC/IIOPGateway/ec.conf.xml | 5 + TAO/orbsvcs/examples/RtEC/IIOPGateway/gateway.conf | 6 + .../examples/RtEC/IIOPGateway/gateway.conf.xml | 5 + TAO/orbsvcs/examples/RtEC/IIOPGateway/run_test.pl | 124 +++++ TAO/orbsvcs/examples/RtEC/Kokyu/Consumer.cpp | 32 ++ TAO/orbsvcs/examples/RtEC/Kokyu/Consumer.h | 55 +++ TAO/orbsvcs/examples/RtEC/Kokyu/Makefile.am | 67 +++ TAO/orbsvcs/examples/RtEC/Kokyu/README | 66 +++ TAO/orbsvcs/examples/RtEC/Kokyu/RtECKokyu.mpc | 3 + TAO/orbsvcs/examples/RtEC/Kokyu/Service.cpp | 522 +++++++++++++++++++++ TAO/orbsvcs/examples/RtEC/Kokyu/Supplier.cpp | 71 +++ TAO/orbsvcs/examples/RtEC/Kokyu/Supplier.h | 87 ++++ TAO/orbsvcs/examples/RtEC/Kokyu/svc.conf | 3 + TAO/orbsvcs/examples/RtEC/Kokyu/svc.conf.xml | 7 + TAO/orbsvcs/examples/RtEC/MCast/AddrServer.cpp | 19 + TAO/orbsvcs/examples/RtEC/MCast/AddrServer.h | 53 +++ TAO/orbsvcs/examples/RtEC/MCast/Consumer.cpp | 104 ++++ TAO/orbsvcs/examples/RtEC/MCast/Consumer.h | 64 +++ TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp | 379 +++++++++++++++ TAO/orbsvcs/examples/RtEC/MCast/Makefile.am | 66 +++ TAO/orbsvcs/examples/RtEC/MCast/README | 26 + TAO/orbsvcs/examples/RtEC/MCast/RtEC_MCast.mpc | 5 + TAO/orbsvcs/examples/RtEC/MCast/Supplier.cpp | 94 ++++ TAO/orbsvcs/examples/RtEC/MCast/Supplier.h | 62 +++ TAO/orbsvcs/examples/RtEC/MCast/svc.conf | 2 + TAO/orbsvcs/examples/RtEC/MCast/svc.conf.xml | 6 + TAO/orbsvcs/examples/RtEC/Makefile.am | 17 + TAO/orbsvcs/examples/RtEC/Schedule/Consumer.cpp | 32 ++ TAO/orbsvcs/examples/RtEC/Schedule/Consumer.h | 55 +++ TAO/orbsvcs/examples/RtEC/Schedule/Makefile.am | 65 +++ TAO/orbsvcs/examples/RtEC/Schedule/README | 23 + .../examples/RtEC/Schedule/RtEC_Schedule.mpc | 5 + TAO/orbsvcs/examples/RtEC/Schedule/Schedule.h | 42 ++ TAO/orbsvcs/examples/RtEC/Schedule/Service.cpp | 408 ++++++++++++++++ TAO/orbsvcs/examples/RtEC/Schedule/Supplier.cpp | 18 + TAO/orbsvcs/examples/RtEC/Schedule/Supplier.h | 54 +++ TAO/orbsvcs/examples/RtEC/Schedule/svc.conf | 2 + TAO/orbsvcs/examples/RtEC/Schedule/svc.conf.xml | 6 + TAO/orbsvcs/examples/RtEC/Simple/Consumer.cpp | 162 +++++++ TAO/orbsvcs/examples/RtEC/Simple/Consumer.h | 60 +++ TAO/orbsvcs/examples/RtEC/Simple/Makefile.am | 130 +++++ TAO/orbsvcs/examples/RtEC/Simple/README | 15 + TAO/orbsvcs/examples/RtEC/Simple/RtEC_Simple.mpc | 20 + TAO/orbsvcs/examples/RtEC/Simple/Service.cpp | 141 ++++++ TAO/orbsvcs/examples/RtEC/Simple/Supplier.cpp | 152 ++++++ TAO/orbsvcs/examples/RtEC/Simple/Supplier.h | 51 ++ TAO/orbsvcs/examples/RtEC/Simple/ec.conf | 2 + TAO/orbsvcs/examples/RtEC/Simple/ec.conf.xml | 6 + TAO/orbsvcs/examples/RtEC/Simple/run_test.pl | 83 ++++ 62 files changed, 4775 insertions(+) create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/Consumer.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/Consumer.h create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/EC.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/EC.h create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/Gateway.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/Gateway.h create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/Makefile.am create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/README create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/RtEC_IIOPGateway.mpc create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/Supplier.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/Supplier.h create mode 100755 TAO/orbsvcs/examples/RtEC/IIOPGateway/consumerec_crash.pl create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/ec.conf create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/ec.conf.xml create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/gateway.conf create mode 100644 TAO/orbsvcs/examples/RtEC/IIOPGateway/gateway.conf.xml create mode 100755 TAO/orbsvcs/examples/RtEC/IIOPGateway/run_test.pl create mode 100644 TAO/orbsvcs/examples/RtEC/Kokyu/Consumer.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/Kokyu/Consumer.h create mode 100644 TAO/orbsvcs/examples/RtEC/Kokyu/Makefile.am create mode 100644 TAO/orbsvcs/examples/RtEC/Kokyu/README create mode 100644 TAO/orbsvcs/examples/RtEC/Kokyu/RtECKokyu.mpc create mode 100644 TAO/orbsvcs/examples/RtEC/Kokyu/Service.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/Kokyu/Supplier.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/Kokyu/Supplier.h create mode 100644 TAO/orbsvcs/examples/RtEC/Kokyu/svc.conf create mode 100644 TAO/orbsvcs/examples/RtEC/Kokyu/svc.conf.xml create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/AddrServer.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/AddrServer.h create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/Consumer.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/Consumer.h create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/Makefile.am create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/README create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/RtEC_MCast.mpc create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/Supplier.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/Supplier.h create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/svc.conf create mode 100644 TAO/orbsvcs/examples/RtEC/MCast/svc.conf.xml create mode 100644 TAO/orbsvcs/examples/RtEC/Makefile.am create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/Consumer.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/Consumer.h create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/Makefile.am create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/README create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/RtEC_Schedule.mpc create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/Schedule.h create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/Service.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/Supplier.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/Supplier.h create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/svc.conf create mode 100644 TAO/orbsvcs/examples/RtEC/Schedule/svc.conf.xml create mode 100644 TAO/orbsvcs/examples/RtEC/Simple/Consumer.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/Simple/Consumer.h create mode 100644 TAO/orbsvcs/examples/RtEC/Simple/Makefile.am create mode 100644 TAO/orbsvcs/examples/RtEC/Simple/README create mode 100644 TAO/orbsvcs/examples/RtEC/Simple/RtEC_Simple.mpc create mode 100644 TAO/orbsvcs/examples/RtEC/Simple/Service.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/Simple/Supplier.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/Simple/Supplier.h create mode 100644 TAO/orbsvcs/examples/RtEC/Simple/ec.conf create mode 100644 TAO/orbsvcs/examples/RtEC/Simple/ec.conf.xml create mode 100755 TAO/orbsvcs/examples/RtEC/Simple/run_test.pl (limited to 'TAO/orbsvcs/examples/RtEC') diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/Consumer.cpp b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Consumer.cpp new file mode 100644 index 00000000000..05b214aabbc --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Consumer.cpp @@ -0,0 +1,200 @@ +// $Id$ + +#include "Consumer.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/CosNamingC.h" +#include "ace/Arg_Shifter.h" +#include "ace/OS_NS_string.h" + +ACE_RCSID (EC_Examples, + Consumer, + "$Id$") + +const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1; +const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1; + +static const char* ecname = 0; + +int +main (int argc, char* argv[]) +{ + Consumer consumer; + + return consumer.run (argc, argv); +} + +// **************************************************************** + +Consumer::Consumer (void) + : event_count_ (0) +{ +} + +int +Consumer::run (int argc, char* argv[]) +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // First parse our command line options + if (this->parse_args(argc, argv) != 0) + { + return -1; + } + + // ORB initialization boiler plate... + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Do *NOT* make a copy because we don't want the ORB to outlive + // the run() method. + this->orb_ = orb.in (); + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Obtain the event channel from the naming service + CORBA::Object_var naming_obj = + orb->resolve_initial_references ("NameService" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the Naming Service.\n"), + 1); + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNaming::Name name (1); + name.length (1); + name[0].id = CORBA::string_dup (ecname); + + CORBA::Object_var ec_obj = + naming_context->resolve (name ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var event_channel = + RtecEventChannelAdmin::EventChannel::_narrow (ec_obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (event_channel.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get Event Channel.\n"), + 1); + + // The canonical protocol to connect to the EC + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + event_channel->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushConsumer_var consumer = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (); + qos.insert (MY_SOURCE_ID, // Source ID + MY_EVENT_TYPE, // Event Type + 0); // handle to the rt_info + for (int i = 0; i < 10; i++) + { + qos.insert (MY_SOURCE_ID + i, // Source ID + MY_EVENT_TYPE + i, // Event Type + 0); // handle to the rt_info + } + supplier->connect_push_consumer (consumer.in (), qos + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Wait for events, using work_pending()/perform_work() may help + // or using another thread, this example is too simple for that. + orb->run (); + + // We don't do any cleanup, it is hard to do it after shutdown, + // and would complicate the example; plus it is almost + // impossible to do cleanup after ORB->run() because the POA is + // in the holding state. Applications should use + // work_pending()/perform_work() to do more interesting stuff. + // Check the supplier for the proper way to do cleanup. + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Consumer::run"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +void +Consumer::push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (events.length () == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Consumer (%P|%t) no events\n")); + return; + } + + this->event_count_ += events.length (); + if (this->event_count_ % 100 == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Consumer (%P|%t): %d events received\n", + this->event_count_)); + } +} + +void +Consumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // In this example we shutdown the ORB when we disconnect from the + // EC (or rather the EC disconnects from us), but this doesn't have + // to be the case.... + this->orb_->shutdown (0 ACE_ENV_ARG_PARAMETER); +} + +int +Consumer::parse_args (int argc, char *argv[]) +{ + ACE_Arg_Shifter arg_shifter (argc, argv); + + while (arg_shifter.is_anything_left ()) + { + const char *arg = arg_shifter.get_current (); + + if (ACE_OS::strcmp (arg, "-e") == 0) + { + arg_shifter.consume_arg (); + ecname = arg_shifter.get_current (); + } + + arg_shifter.ignore_arg (); + } + + // Indicates sucessful parsing of the command line + return 0; +} + diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/Consumer.h b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Consumer.h new file mode 100644 index 00000000000..c8268e1c3ea --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Consumer.h @@ -0,0 +1,57 @@ +/* -*- C++ -*- */ +/** + * @file Consumer.h + * + * $Id$ + * + * @author Carlos O'Ryan (coryan@cs.wustl.edu) + * + * Consumer + */ + +#ifndef CONSUMER_H +#define CONSUMER_H + +#include "orbsvcs/RtecEventCommS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class Consumer + * + * @brief Simple consumer object + * + * This class is a consumer of events. It simply registers for one event type. + */ +class Consumer : public POA_RtecEventComm::PushConsumer +{ +public: + /// Constructor + Consumer (void); + + /// Run the test + int run (int argc, char* argv[]); + + // = The RtecEventComm::PushConsumer methods + + // The skeleton methods. + virtual void push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + int parse_args (int argc, char *argv[]); + + /// Keep track of the number of events received. + CORBA::ULong event_count_; + + /// The orb, just a pointer because the ORB does not outlive the + /// run() method... + CORBA::ORB_ptr orb_; +}; + +#endif /* CONSUMER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/EC.cpp b/TAO/orbsvcs/examples/RtEC/IIOPGateway/EC.cpp new file mode 100644 index 00000000000..a25e9f67569 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/EC.cpp @@ -0,0 +1,159 @@ +// $Id$ + +#include "EC.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/CosNamingC.h" +#include "orbsvcs/Event/EC_Gateway.h" +#include "ace/Arg_Shifter.h" + +ACE_RCSID (EC_Examples, + Supplier, + "$Id$") + +static const char* ecname = 0; + +int +main (int argc, char* argv[]) +{ + EC channel; + + return channel.run (argc, argv); +} + +// **************************************************************** + +EC::EC (void) +{ +} + +int +EC::run (int argc, char* argv[]) +{ + TAO_EC_Default_Factory::init_svcs (); + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // First parse our command line options + if (this->parse_args(argc, argv) != 0) + { + return -1; + } + + // ORB initialization boiler plate... + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POA_var rootpoa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POAManager_var root_poa_manager = + rootpoa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Create persistent POA + CORBA::PolicyList policies (2); + policies.length (2); + + policies[0] = + rootpoa->create_id_assignment_policy (PortableServer::USER_ID + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + policies[1] = + rootpoa->create_lifespan_policy (PortableServer::PERSISTENT + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_CString poaname = "POA"; + PortableServer::POA_var child_poa_ = + rootpoa->create_POA (poaname.c_str (), + root_poa_manager.in (), + policies + ACE_ENV_ARG_PARAMETER); + + // Create a local event channel and register it with the RootPOA. + TAO_EC_Event_Channel_Attributes attributes (rootpoa.in (), rootpoa.in ()); + attributes.consumer_reconnect = 1; + attributes.supplier_reconnect = 1; + + TAO_EC_Event_Channel ec_impl (attributes); + ec_impl.activate (); + + PortableServer::ObjectId_var ecId = PortableServer::string_to_ObjectId(ecname); + + child_poa_->activate_object_with_id(ecId.in(), &ec_impl ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var ec_obj = child_poa_->id_to_reference(ecId.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow(ec_obj.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Find the Naming Service. + object = orb->resolve_initial_references("NameService" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNaming::NamingContextExt_var naming_context = + CosNaming::NamingContextExt::_narrow(object.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Create a name. + CosNaming::Name name; + name.length (1); + name[0].id = CORBA::string_dup (ecname); + name[0].kind = CORBA::string_dup (""); + + // Register with the name server + naming_context->rebind (name, ec.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + root_poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Wait for events, using work_pending()/perform_work() may help + // or using another thread, this example is too simple for that. + orb->run (); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "EC::run"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +int +EC::parse_args (int argc, char *argv[]) +{ + ACE_Arg_Shifter arg_shifter (argc, argv); + + while (arg_shifter.is_anything_left ()) + { + const char *arg = arg_shifter.get_current (); + + if (ACE_OS::strcmp (arg, "-e") == 0) + { + arg_shifter.consume_arg (); + ecname = arg_shifter.get_current (); + } + + arg_shifter.ignore_arg (); + } + + // Indicates sucessful parsing of the command line + return 0; +} + diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/EC.h b/TAO/orbsvcs/examples/RtEC/IIOPGateway/EC.h new file mode 100644 index 00000000000..2734de1dcd0 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/EC.h @@ -0,0 +1,39 @@ +/* -*- C++ -*- */ +/** + * @file EC.h + * + * $Id$ + * + * @author Carlos O'Ryan (coryan@cs.wustl.edu) + * + * Event channel + */ + +#ifndef EC_H +#define EC_H + +#include "orbsvcs/RtecEventCommS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class EC + * + * @brief Simple event channel + */ +class EC +{ +public: + /// Constructor + EC (void); + + /// Run the test + int run (int argc, char* argv[]); + +private: + int parse_args (int argc, char *argv[]); +}; + +#endif /* EC_H */ diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/Gateway.cpp b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Gateway.cpp new file mode 100644 index 00000000000..c1be767ffd8 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Gateway.cpp @@ -0,0 +1,185 @@ +// $Id$ + +#include "Gateway.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/CosNamingC.h" +#include "orbsvcs/Event/EC_Gateway_IIOP.h" +#include "orbsvcs/Event/EC_Gateway_IIOP_Factory.h" +#include "ace/Arg_Shifter.h" +#include "ace/Dynamic_Service.h" + +ACE_RCSID (EC_Examples, + Supplier, + "$Id$") + +static const char* supplierec = 0; +static const char* consumerec = 0; + +int +main (int argc, char* argv[]) +{ + Gateway gateway; + + return gateway.run (argc, argv); +} + +// **************************************************************** + +Gateway::Gateway (void) +{ +} + +int +Gateway::run (int argc, char* argv[]) +{ + TAO_EC_Gateway_IIOP_Factory::init_svcs (); + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // First parse our command line options + if (this->parse_args(argc, argv) != 0) + { + return -1; + } + + // ORB initialization boiler plate... + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Obtain the event channel from the naming service + CORBA::Object_var naming_obj = + orb->resolve_initial_references ("NameService" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the Naming Service.\n"), + 1); + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNaming::Name supplierecname (1); + supplierecname.length (1); + supplierecname[0].id = CORBA::string_dup (supplierec); + + CORBA::Object_var supplierec_obj = + naming_context->resolve (supplierecname ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNaming::Name consumerecname (1); + consumerecname.length (1); + consumerecname[0].id = CORBA::string_dup (consumerec); + + CORBA::Object_var consumerec_obj = + naming_context->resolve (consumerecname ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var supplier_event_channel = + RtecEventChannelAdmin::EventChannel::_narrow (supplierec_obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (supplier_event_channel.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the supplier event channel.\n"), + 1); + + RtecEventChannelAdmin::EventChannel_var consumer_event_channel = + RtecEventChannelAdmin::EventChannel::_narrow (consumerec_obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (consumer_event_channel.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the consumer event channel.\n"), + 1); + + TAO_EC_Gateway_IIOP gateway; + + gateway.init(supplier_event_channel.in(), consumer_event_channel.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::ObjectId_var gateway_oid = + poa->activate_object(&gateway ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var gateway_obj = + poa->id_to_reference(gateway_oid.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::Observer_var obs = + RtecEventChannelAdmin::Observer::_narrow(gateway_obj.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::Observer_Handle local_ec_obs_handle = + consumer_event_channel->append_observer (obs.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Wait for events, using work_pending()/perform_work() may help + // or using another thread, this example is too simple for that. + orb->run (); + + consumer_event_channel->remove_observer (local_ec_obs_handle + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + poa->deactivate_object (gateway_oid.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Destroy the POA + poa->destroy (1, 0 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Gateway::run"); + return 1; + } + ACE_ENDTRY; + + return 0; +} + +int +Gateway::parse_args (int argc, char *argv[]) +{ + ACE_Arg_Shifter arg_shifter (argc, argv); + + while (arg_shifter.is_anything_left ()) + { + const char *arg = arg_shifter.get_current (); + + if (ACE_OS::strcmp (arg, "-s") == 0) + { + arg_shifter.consume_arg (); + supplierec = arg_shifter.get_current (); + } + if (ACE_OS::strcmp (arg, "-c") == 0) + { + arg_shifter.consume_arg (); + consumerec = arg_shifter.get_current (); + } + + arg_shifter.ignore_arg (); + } + // Indicates sucessful parsing of the command line + return 0; +} + diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/Gateway.h b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Gateway.h new file mode 100644 index 00000000000..6c7d931d988 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Gateway.h @@ -0,0 +1,38 @@ +/* -*- C++ -*- */ +/** + * @file Gateway.h + * + * $Id$ + * + * @author Johnny Willemsen (jwillemsen@remedy.nl) + * + * IIOP Gateway + */ +#ifndef GATEWAY_H +#define GATEWAY_H + +#include "orbsvcs/RtecEventCommS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class Gateway + * + * @brief Simple gateway + */ +class Gateway +{ +public: + /// Constructor + Gateway (void); + + /// Run the test + int run (int argc, char* argv[]); + +private: + int parse_args (int argc, char *argv[]); +}; + +#endif /* GATEWAY_H */ diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/Makefile.am b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Makefile.am new file mode 100644 index 00000000000..73c1ace16cb --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Makefile.am @@ -0,0 +1,166 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ../bin/mwc.pl -type automake -noreldefs TAO.mwc + +ACE_BUILDDIR = $(top_builddir)/.. +ACE_ROOT = $(top_srcdir)/.. +TAO_BUILDDIR = $(top_builddir) +TAO_ROOT = $(top_srcdir) + +noinst_PROGRAMS = + +## Makefile.RtEC_IIOPGateway.am + +if BUILD_CORBA_MESSAGING +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS += Gateway + +Gateway_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +Gateway_SOURCES = \ + Gateway.cpp \ + Gateway.h + +Gateway_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Serv.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO +endif BUILD_CORBA_MESSAGING + +## Makefile.RtEC_IIOPGateway_Consumer.am + +if BUILD_CORBA_MESSAGING + +noinst_PROGRAMS += Consumer + +Consumer_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +Consumer_SOURCES = \ + Consumer.cpp \ + Consumer.h + +Consumer_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif BUILD_CORBA_MESSAGING + +## Makefile.RtEC_IIOPGateway_EC.am + +if BUILD_CORBA_MESSAGING +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS += EC + +EC_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +EC_SOURCES = \ + EC.cpp \ + EC.h + +EC_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Serv.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO +endif BUILD_CORBA_MESSAGING + +## Makefile.RtEC_IIOPGateway_Supplier.am + +if BUILD_CORBA_MESSAGING + +noinst_PROGRAMS += Supplier + +Supplier_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +Supplier_SOURCES = \ + Supplier.cpp \ + Supplier.h + +Supplier_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif BUILD_CORBA_MESSAGING + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/README b/TAO/orbsvcs/examples/RtEC/IIOPGateway/README new file mode 100644 index 00000000000..501bc694e31 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/README @@ -0,0 +1,19 @@ +# $Id$ + + This directory contains an example of the real-time event service +and the IIOP Gateway. There are four executables, the consumer, supplier, +the channel and the gateway. + + The idea is that we have a supplier that pushes to event channel +channel1, this channel1 pushes the events to the gateway, the gateway to +channel2 and then channel2 to the real consumer. + + Run using the run_test.pl script. + + In the script consumerec_crash script channel2 and the consumer +are killed and restarted after a few seconds. When the gateway uses the +reconnect policy you will see that after the timeout in the gateway has +expired the consumer receives events again. + + More advanced tests are available in +$TAO_ROOT/orbsvcs/tests/Event and $TAO_ROOT/orbsvcs/EC_* diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/RtEC_IIOPGateway.mpc b/TAO/orbsvcs/examples/RtEC/IIOPGateway/RtEC_IIOPGateway.mpc new file mode 100644 index 00000000000..2ece2c2a38d --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/RtEC_IIOPGateway.mpc @@ -0,0 +1,35 @@ +// -*- MPC -*- +// $Id$ + +project(*Consumer): messaging, rteventexe, naming { + requires += corba_messaging + + Source_Files { + Consumer.cpp + } +} + +project(*Supplier): messaging, rteventexe, naming { + requires += corba_messaging + + Source_Files { + Supplier.cpp + } +} + +project(*): messaging, rteventexe, rtevent_serv, naming { + requires += corba_messaging + + Source_Files { + Gateway.cpp + } +} + +project(*EC): messaging, rteventexe, rtevent_serv, naming { + requires += corba_messaging + + Source_Files { + EC.cpp + } +} + diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Supplier.cpp new file mode 100644 index 00000000000..aad09c037e8 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Supplier.cpp @@ -0,0 +1,195 @@ +// $Id$ + +#include "Supplier.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/CosNamingC.h" +#include "ace/Arg_Shifter.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID (EC_Examples, + Supplier, + "$Id$") + +const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1; +const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1; + +static const char* ecname = 0; + +int +main (int argc, char* argv[]) +{ + Supplier supplier; + + return supplier.run (argc, argv); +} + +// **************************************************************** + +Supplier::Supplier (void) +{ +} + +int +Supplier::run (int argc, char* argv[]) +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // First parse our command line options + if (this->parse_args(argc, argv) != 0) + { + return -1; + } + + // ORB initialization boiler plate... + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Obtain the event channel from the naming service + CORBA::Object_var naming_obj = + orb->resolve_initial_references ("NameService" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the Naming Service.\n"), + 1); + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNaming::Name name (1); + name.length (1); + name[0].id = CORBA::string_dup (ecname); + + CORBA::Object_var ec_obj = + naming_context->resolve (name ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var event_channel = + RtecEventChannelAdmin::EventChannel::_narrow (ec_obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The canonical protocol to connect to the EC + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushConsumer_var consumer = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushSupplier_var supplier = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Publish the events the supplier provides. + ACE_SupplierQOS_Factory qos; + qos.insert (MY_SOURCE_ID, // Supplier's unique id + MY_EVENT_TYPE, // Event type + 0, // handle to the rt_info structure + 1); // number of calls + + consumer->connect_push_supplier (supplier.in (), qos + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Push the events... + ACE_Time_Value sleep_time (0, 10000); // 10 milliseconds + + RtecEventComm::EventSet event (1); + event.length (1); + for (int j = 0; j < 1; j++) + { + event[j].header.source = MY_SOURCE_ID; + event[j].header.ttl = 1; + event[j].header.type = MY_EVENT_TYPE; + } + + for (int i = 1; i != 4000; ++i) + { + if (i % 100 == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Supplier (%P|%t): %d events send\n", + i)); + } + consumer->push (event ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_OS::sleep (sleep_time); + } + + // Disconnect from the EC + consumer->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Destroy the EC.... + event_channel->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Deactivate this object... + PortableServer::ObjectId_var id = + poa->servant_to_id (this ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Destroy the POA + poa->destroy (1, 0 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Supplier::run"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +void +Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +int +Supplier::parse_args (int argc, char *argv[]) +{ + ACE_Arg_Shifter arg_shifter (argc, argv); + + while (arg_shifter.is_anything_left ()) + { + const char *arg = arg_shifter.get_current (); + + if (ACE_OS::strcmp (arg, "-e") == 0) + { + arg_shifter.consume_arg (); + ecname = arg_shifter.get_current (); + } + + arg_shifter.ignore_arg (); + } + + // Indicates sucessful parsing of the command line + return 0; +} + diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/Supplier.h b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Supplier.h new file mode 100644 index 00000000000..98e87ec39d2 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/Supplier.h @@ -0,0 +1,46 @@ +/* -*- C++ -*- */ +/** + * @file Supplier.h + * + * $Id$ + * + * @author Carlos O'Ryan (coryan@cs.wustl.edu) + * + * IIOP Gateway + */ +#ifndef SUPPLIER_H +#define SUPPLIER_H + +#include "orbsvcs/RtecEventCommS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class Supplier + * + * @brief Simple supplier object + * + * This class is a supplier of events. It simply publishes one event type. + */ +class Supplier : public POA_RtecEventComm::PushSupplier +{ +public: + /// Constructor + Supplier (void); + + /// Run the test + int run (int argc, char* argv[]); + + // = The RtecEventComm::PushSupplier methods + + /// The skeleton methods. + virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + int parse_args (int argc, char *argv[]); +}; + +#endif /* SUPPLIER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/consumerec_crash.pl b/TAO/orbsvcs/examples/RtEC/IIOPGateway/consumerec_crash.pl new file mode 100755 index 00000000000..3267bd38a30 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/consumerec_crash.pl @@ -0,0 +1,150 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib '../../../../../bin'; +use PerlACE::Run_Test; + +$status = 0; + +$ns_ior = PerlACE::LocalFile ("ns.ior"); +$conffile = PerlACE::LocalFile ("ec" . "$PerlACE::svcconf_ext"); +$gatewayconffile = PerlACE::LocalFile ("gateway" . "$PerlACE::svcconf_ext"); + +unlink $ns_ior; + +$NS = new PerlACE::Process ("../../../Naming_Service/Naming_Service", + "-o $ns_ior"); + +$T1 = new PerlACE::Process ("EC", + "-ORBInitRef NameService=file://$ns_ior " + . "-ORBsvcconf $conffile " +# . "-ORBDebug -ORBDebugLevel 10 -ORBLogFile supplierec.log " + . "-e channel1 "); + +$T2 = new PerlACE::Process ("EC", + "-ORBInitRef NameService=file://$ns_ior -ORBEndpoint iiop://localhost:6000 " + . "-ORBsvcconf $conffile " +# . "-ORBDebug -ORBDebugLevel 10 -ORBLogFile consumerec.log " + . "-e channel2 "); + +$G = new PerlACE::Process ("Gateway", + "-ORBInitRef NameService=file://$ns_ior " + . "-ORBSvcconf $gatewayconffile " + . "-c channel2 " +# . "-ORBDebug -ORBDebugLevel 10 -ORBLogFile gateway.log " + . "-s channel1 "); + +$C = new PerlACE::Process ("Consumer", + "-ORBInitRef NameService=file://$ns_ior " +# . "-ORBDebug -ORBDebugLevel 10 -ORBLogFile consumer.log " + . "-e channel2 "); + +$S = new PerlACE::Process ("Supplier", + "-ORBInitRef NameService=file://$ns_ior " +# . "-ORBDebug -ORBDebugLevel 10 -ORBLogFile supplier.log " + . "-e channel1 "); + +print STDOUT "Starting name server\n"; +$NS->Spawn (); + +if (PerlACE::waitforfile_timed ($ns_ior, 15) == -1) { + print STDERR "ERROR: cannot find file <$ns_ior>\n"; + $NS->Kill (); + exit 1; +} + +print STDOUT "Starting event channel 1\n"; +$T1->Spawn (); + +sleep 2; + +print STDOUT "Starting event channel 2\n"; +$T2->Spawn (); + +sleep 2; + +print STDOUT "Starting gateway\n"; +$G->Spawn (); + +sleep 2; + +print STDOUT "Starting consumer\n"; +$C->Spawn (); + +sleep 1; + +print STDOUT "Starting supplier\n"; +#$supplier = $S->SpawnWaitKill (12000); +$S->Spawn(); + +sleep 1; + +if ($supplier != 0) { + print STDERR "ERROR: supplier returned $supplier\n"; + $status = 1; +} + +#$consumer = $C->WaitKill (10); +# +#if ($consumer != 0) { +# print STDERR "ERROR: consumer returned $consumer\n"; +# $status = 1; +#} + +print STDOUT "Terminating event channel 2 and consumer in 10 seconds...\n"; +#$service = $T2->TerminateWaitKill (5); +$service = $T2->WaitKill (10); +$C->Kill(); + +if ($service != 0) { + print STDERR "ERROR: service returned $service\n"; + $status = 1; +} + +sleep 10; + +print STDOUT "Starting event channel 2 again...\n"; +$T2->Spawn (); + +sleep 2; + +print STDOUT "Starting consumer again...\n"; +$C->Spawn (); + +#$supplier = $C->WaitKill (15); +# +#if ($supplier != 0) { +# print STDERR "ERROR: supplier returned $supplier\n"; +# $status = 1; +#} + +print STDOUT "1500 seconds before termination...\n"; +sleep 1500; + +print STDOUT "Terminating supplier...\n"; +$S->TerminateWaitKill (5); + +print STDOUT "Terminating consumer...\n"; +$C->TerminateWaitKill (5); + +print STDOUT "Terminating gateway...\n"; +$G->TerminateWaitKill (5); + +print STDOUT "Terminating event channels...\n"; +$T1->TerminateWaitKill (5); +$T2->TerminateWaitKill (5); + +$nserver = $NS->TerminateWaitKill (5); + +if ($nserver != 0) { + print STDERR "ERROR: name server returned $nserver\n"; + $status = 1; +} + +unlink $ns_ior; + +exit $status; diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/ec.conf b/TAO/orbsvcs/examples/RtEC/IIOPGateway/ec.conf new file mode 100644 index 00000000000..73633a3f946 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/ec.conf @@ -0,0 +1,4 @@ +# $Id$ +#static EC_Factory "-ECobserver basic -ECDispatching reactive -ECFiltering basic -ECSupplierFiltering per-supplier -ECProxyConsumerLock thread -ECProxySupplierLock thread -ECConsumerControl reactive -ECSupplierControl reactive -ECConsumerControlPeriod 50000 -ECSupplierControlPeriod 50000" +static EC_Factory "-ECobserver basic " +#static EC_Gateway_IIOP "-ECGIIOPConsumerECControl reactive" diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/ec.conf.xml b/TAO/orbsvcs/examples/RtEC/IIOPGateway/ec.conf.xml new file mode 100644 index 00000000000..8c526f5a8cb --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/ec.conf.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/gateway.conf b/TAO/orbsvcs/examples/RtEC/IIOPGateway/gateway.conf new file mode 100644 index 00000000000..8319c781e4e --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/gateway.conf @@ -0,0 +1,6 @@ +# $Id$ +#static EC_Factory "-ECobserver basic -ECDispatching reactive -ECFiltering basic -ECSupplierFiltering per-supplier -ECProxyConsumerLock thread -ECProxySupplierLock thread -ECConsumerControl reactive -ECSupplierControl reactive -ECConsumerControlPeriod 50000 -ECSupplierControlPeriod 50000" +#static EC_Factory "-ECobserver basic " +#dynamic Logger Service_Object * ACE:_make_ACE_Logging_Strategy() "-f STDERR|VERBOSE_LITE" +static EC_Gateway_IIOP_Factory "-ECGIIOPConsumerECControl reconnect -ECGIIOPConsumerECControlPeriod 1000000 -ECGIIOPConsumerECControlTimeout 500000 -ECGIIOPUseConsumerProxyMap 0" +#static EC_Gateway_IIOP_Factory "-ECGIIOPConsumerECControl reactive -ECGIIOPConsumerECControlPeriod 100000 -ECGIIOPConsumerECControlTimeout 50000" diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/gateway.conf.xml b/TAO/orbsvcs/examples/RtEC/IIOPGateway/gateway.conf.xml new file mode 100644 index 00000000000..39ef790c9d7 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/gateway.conf.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/TAO/orbsvcs/examples/RtEC/IIOPGateway/run_test.pl b/TAO/orbsvcs/examples/RtEC/IIOPGateway/run_test.pl new file mode 100755 index 00000000000..ba1896e9eb6 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/IIOPGateway/run_test.pl @@ -0,0 +1,124 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib '../../../../../bin'; +use PerlACE::Run_Test; + +$status = 0; + +$ns_ior = PerlACE::LocalFile ("ns.ior"); +$conffile = PerlACE::LocalFile ("ec" . "$PerlACE::svcconf_ext"); +$gatewayconffile = PerlACE::LocalFile ("gateway" . "$PerlACE::svcconf_ext"); + +unlink $ns_ior; + +$NS = new PerlACE::Process ("../../../Naming_Service/Naming_Service", + "-o $ns_ior "); + +$T1 = new PerlACE::Process ("EC", + "-ORBInitRef NameService=file://$ns_ior " +# . "-ORBDebug -ORBDebugLevel 10 -ORBLogFile supplierec.log " + . "-ORBsvcconf $conffile " + . "-e channel1 "); + +$T2 = new PerlACE::Process ("EC", + "-ORBInitRef NameService=file://$ns_ior " +# . "-ORBDebug -ORBDebugLevel 10 -ORBLogFile consumerec.log " + . "-ORBsvcconf $conffile " + . "-e channel2 "); + +$G = new PerlACE::Process ("Gateway", + "-ORBInitRef NameService=file://$ns_ior " +# . "-ORBDebug -ORBDebugLevel 10 -ORBLogFile gateway.log " + . "-ORBSvcconf $gatewayconffile " + . "-c channel2 " + . "-s channel1 "); + +$C = new PerlACE::Process ("Consumer", + "-ORBInitRef NameService=file://$ns_ior " +# . "-ORBDebug -ORBDebugLevel 10 -ORBLogFile consumer.log " + . "-e channel2 "); + +$S = new PerlACE::Process ("Supplier", + "-ORBInitRef NameService=file://$ns_ior " +# . "-ORBDebug -ORBDebugLevel 10 -ORBLogFile supplier.log " + . "-e channel1 "); + +print STDOUT "Starting name server\n"; +$NS->Spawn (); + +if (PerlACE::waitforfile_timed ($ns_ior, 15) == -1) { + print STDERR "ERROR: cannot find file <$ns_ior>\n"; + $NS->Kill (); + exit 1; +} + +print STDOUT "Starting event channel 1\n"; +$T1->Spawn (); + +sleep 2; + +print STDOUT "Starting event channel 2\n"; +$T2->Spawn (); + +sleep 2; + +print STDOUT "Starting gateway\n"; +$G->Spawn (); + +sleep 2; + +print STDOUT "Starting consumer\n"; +$C->Spawn (); + +sleep 1; + +print STDOUT "Starting supplier\n"; +#$supplier = $S->SpawnWaitKill (12000); +$S->Spawn(); + +sleep 1; + +if ($supplier != 0) { + print STDERR "ERROR: supplier returned $supplier\n"; + $status = 1; +} + +$consumer = $C->WaitKill (10); + +if ($consumer != 0) { + print STDERR "ERROR: consumer returned $consumer\n"; + $status = 1; +} + +$service = $T2->TerminateWaitKill (5); + +if ($service != 0) { + print STDERR "ERROR: service returned $service\n"; + $status = 1; +} + +#$supplier = $C->WaitKill (15); +# +#if ($supplier != 0) { +# print STDERR "ERROR: supplier returned $supplier\n"; +# $status = 1; +#} + +print STDOUT "15 seconds before termination...\n"; +sleep 15; + +$nserver = $NS->TerminateWaitKill (5); + +if ($nserver != 0) { + print STDERR "ERROR: name server returned $nserver\n"; + $status = 1; +} + +unlink $ns_ior; + +exit $status; diff --git a/TAO/orbsvcs/examples/RtEC/Kokyu/Consumer.cpp b/TAO/orbsvcs/examples/RtEC/Kokyu/Consumer.cpp new file mode 100644 index 00000000000..4f23249c145 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Kokyu/Consumer.cpp @@ -0,0 +1,32 @@ +// $Id$ + +#include "Consumer.h" + +ACE_RCSID(EC_Examples, Consumer, "$Id$") + +Consumer::Consumer (void) +{ +} + +void +Consumer::push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (events.length () == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Consumer (%P|%t) no events\n")); + return; + } + + ACE_DEBUG ((LM_DEBUG, "Consumer (%P|%t) we received event type %d\n", + events[0].header.type)); +} + +void +Consumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + diff --git a/TAO/orbsvcs/examples/RtEC/Kokyu/Consumer.h b/TAO/orbsvcs/examples/RtEC/Kokyu/Consumer.h new file mode 100644 index 00000000000..bdbdbaad894 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Kokyu/Consumer.h @@ -0,0 +1,55 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Consumer +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef CONSUMER_H +#define CONSUMER_H + +#include "orbsvcs/RtecEventCommS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Consumer : public POA_RtecEventComm::PushConsumer +{ + // = TITLE + // Simple consumer object + // + // = DESCRIPTION + // This class is a consumer of events. + // It simply register for two event typesone event type + // The class is just a helper to simplify common tasks in EC + // tests, such as subscribing for a range of events, disconnecting + // from the EC, informing the driver of shutdown messages, etc. + // + // There are several ways to connect and disconnect this class, + // and it is up to the driver program to use the right one. + // +public: + Consumer (void); + // Constructor + + // = The RtecEventComm::PushConsumer methods + + virtual void push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. +}; + +#endif /* CONSUMER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/Kokyu/Makefile.am b/TAO/orbsvcs/examples/RtEC/Kokyu/Makefile.am new file mode 100644 index 00000000000..f2e00e44bc0 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Kokyu/Makefile.am @@ -0,0 +1,67 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ../bin/mwc.pl -type automake -noreldefs TAO.mwc + +ACE_BUILDDIR = $(top_builddir)/.. +ACE_ROOT = $(top_srcdir)/.. +TAO_BUILDDIR = $(top_builddir) +TAO_ROOT = $(top_srcdir) + + +## Makefile.RtECKokyu.am + +if BUILD_CORBA_MESSAGING +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS = Service + +Service_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs \ + -I$(ACE_ROOT)/Kokyu + +Service_SOURCES = \ + Consumer.cpp \ + Service.cpp \ + Supplier.cpp \ + Consumer.h \ + Supplier.h + +Service_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Serv.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTKokyuEvent.la \ + $(ACE_BUILDDIR)/Kokyu/libKokyu.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTSched.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO +endif BUILD_CORBA_MESSAGING + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/TAO/orbsvcs/examples/RtEC/Kokyu/README b/TAO/orbsvcs/examples/RtEC/Kokyu/README new file mode 100644 index 00000000000..f7a98f7acc7 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Kokyu/README @@ -0,0 +1,66 @@ +# $Id$ + +Shows how to use the scheduling service in conjunction with +the real-time event channel. The test also uses the Kokyu +dispatching module within the RTEC, which provides the +dispatching queues for the isolation of events based on +their preemption priority generated by the scheduler. The +test has two consumers and two suppliers. The test also +demonstrates how to use timers in the EC to trigger timeout +events for timeout consumers which inturn act as suppliers +to other consumers. The following shows the test setup. + + +LO_CRIT |-----| +1Hz EC Timer1 ----> TimerConsumer1 ---> Supplier1 --->| |---> Consumer1 + | EC | + | | +1/3Hz EC Timer2 ----> TimerConsumer2 ---> Supplier2 --->| |---> Consumer2 +HI_CRIT |-----| + +The event-channel cooperates with the scheduling service to +compute a schedule and assign priorities to each event. The event +channel will use different queues for those events, each queue +serviced by threads at different priorities. In the above +test case, there will be two dispatching queues, one for each +flow. The 1Hz flow will have higher priority than the 1/3Hz flow +wirh plain RMS scheduling. With MUF scheduling, the HI_CRIT +flow will have higher priority than the LO_CRIT flow. + +The example can be run as follows: + +$ ./Service -s + +Please make sure you run the example with root privileges. + +Expected output for RMS +----------------------- +You should see the 1Hz events dispatched by a higher priority +thread than the 1/3Hz events. Sample output is shown below. Here +2051 is the thread id of the thread dispatching 1/3Hz events +and 1026 is the thread id of the thread dispatching 1Hz events. +The latter runs at a higher real-time thread priority than the +former under RMS scheduling strategy. + +Consumer (27703|2051) we received event type 17 +Consumer (27703|1026) we received event type 16 +Consumer (27703|1026) we received event type 16 +Consumer (27703|1026) we received event type 16 +Consumer (27703|2051) we received event type 17 + +Expected output for MUF +----------------------- +You should see the 1/3Hz events dispatched by a higher priority +thread than the 1Hz events since the former is more critical +than the latter. Sample output is shown below. Here +2051 is the thread id of the thread dispatching 1Hz events +and 1026 is the thread id of the thread dispatching 1/3Hz events. +The latter runs at a higher real-time thread priority than the +former under MUF scheduling strategy. + +Consumer (28191|2051) we received event type 16 +Consumer (28191|2051) we received event type 16 +Consumer (28191|2051) we received event type 16 +Consumer (28191|1026) we received event type 17 +Consumer (28191|2051) we received event type 16 + diff --git a/TAO/orbsvcs/examples/RtEC/Kokyu/RtECKokyu.mpc b/TAO/orbsvcs/examples/RtEC/Kokyu/RtECKokyu.mpc new file mode 100644 index 00000000000..88d36ead38f --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Kokyu/RtECKokyu.mpc @@ -0,0 +1,3 @@ +// $Id$ +project: orbsvcsexe, rtkokyuevent, rtevent_serv { +} diff --git a/TAO/orbsvcs/examples/RtEC/Kokyu/Service.cpp b/TAO/orbsvcs/examples/RtEC/Kokyu/Service.cpp new file mode 100644 index 00000000000..3a01a04e9c0 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Kokyu/Service.cpp @@ -0,0 +1,522 @@ +// $Id$ + +#include "orbsvcs/Sched/Reconfig_Scheduler.h" +#include "orbsvcs/Runtime_Scheduler.h" +//#include "orbsvcs/Event/Module_Factory.h" +//#include "orbsvcs/Event/Event_Channel.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/Scheduler_Factory.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/EC_Kokyu_Factory.h" +#include "Consumer.h" +#include "Supplier.h" + + +#include "ace/Get_Opt.h" +#include "ace/Sched_Params.h" +#include "ace/Auto_Ptr.h" +#include "ace/SString.h" +#include "ace/OS_NS_strings.h" +#include "ace/Thread.h" + +ACE_RCSID(EC_Examples, Service, "$Id$") + +namespace +{ + int config_run = 0; + ACE_CString sched_type ="rms"; +} + +inline RtecScheduler::Period_t time_val_to_period (const ACE_Time_Value &tv) +{ + //100s of nanoseconds + return (tv.sec () * 1000000 + tv.usec ())*10; +} + +int parse_args (int argc, char *argv[]); + +typedef TAO_Reconfig_Scheduler RECONFIG_RMS_SCHED_TYPE; + +typedef TAO_Reconfig_Scheduler RECONFIG_MUF_SCHED_TYPE; + +int +main (int argc, char* argv[]) +{ + //TAO_EC_Default_Factory::init_svcs (); + + TAO_EC_Kokyu_Factory::init_svcs (); + + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // ORB initialization boiler plate... + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) == -1) + { + ACE_ERROR ((LM_ERROR, + "Usage: Service [-o IOR_file_name]\n")); + return 1; + } + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************************************************************** + + // Create an scheduling service + POA_RtecScheduler::Scheduler* sched_impl = 0; + + if (ACE_OS::strcasecmp(sched_type.c_str(), "rms") == 0) + { + ACE_DEBUG ((LM_DEBUG, "Creating RMS scheduler\n")); + ACE_NEW_RETURN (sched_impl, + RECONFIG_RMS_SCHED_TYPE, + 1); + } + else if (ACE_OS::strcasecmp(sched_type.c_str(), "muf") == 0) + { + ACE_DEBUG ((LM_DEBUG, "Creating MUF scheduler\n")); + ACE_NEW_RETURN (sched_impl, + RECONFIG_MUF_SCHED_TYPE, + 1); + } + + RtecScheduler::Scheduler_var scheduler = + sched_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************************************************************** + TAO_EC_Event_Channel_Attributes attributes (poa.in (), + poa.in ()); + attributes.scheduler = scheduler.in (); // no need to dup + + TAO_EC_Event_Channel ec_impl (attributes); + RtecEventChannelAdmin::EventChannel_var event_channel = + ec_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + // **************************************************************** + + // Create a consumer, intialize its RT_Info structures, and + // connnect to the event channel.... + + Consumer consumer_impl1, consumer_impl2; + + RtecScheduler::handle_t consumer1_rt_info = + scheduler->create ("consumer1" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecScheduler::handle_t consumer2_rt_info = + scheduler->create ("consumer2" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + //consumer's rate will get propagated from the supplier. + //so no need to specify a period here. Specifying + //criticality is crucial since it propagates from + //consumer to supplier. + ACE_Time_Value tv (0,0); + TimeBase::TimeT tmp; + ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv); + scheduler->set (consumer1_rt_info, + RtecScheduler::VERY_LOW_CRITICALITY, + tmp, tmp, tmp, + time_val_to_period (tv), + RtecScheduler::VERY_LOW_IMPORTANCE, + tmp, + 0, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + scheduler->set (consumer2_rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + tmp, tmp, tmp, + time_val_to_period (tv), + RtecScheduler::VERY_HIGH_IMPORTANCE, + tmp, + 0, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_ConsumerQOS_Factory consumer_qos1, consumer_qos2; + //consumer_qos.start_disjunction_group (); + // The types int the range [0,ACE_ES_EVENT_UNDEFINED) are + // reserved for the EC... + consumer_qos1.insert_type (ACE_ES_EVENT_UNDEFINED, + consumer1_rt_info); + + RtecEventChannelAdmin::ConsumerQOS qos = + consumer_qos1.get_ConsumerQOS (); +/* + for (int i=0;ifor_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy1 = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy2 = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushConsumer_var consumer1 = + consumer_impl1._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushConsumer_var consumer2 = + consumer_impl2._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "connecting consumers\n")); + ACE_DEBUG ((LM_DEBUG, "connecting consumer1\n")); + supplier_proxy1->connect_push_consumer (consumer1.in (), + consumer_qos1.get_ConsumerQOS () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "connecting consumer2\n")); + supplier_proxy2->connect_push_consumer (consumer2.in (), + consumer_qos2.get_ConsumerQOS () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "consumers connected\n")); + + // **************************************************************** + + RtecScheduler::handle_t supplier1_rt_info = + scheduler->create ("supplier1" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecScheduler::handle_t supplier2_rt_info = + scheduler->create ("supplier2" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::EventSourceID supplier_id1 = 1, supplier_id2 = 2; + ACE_SupplierQOS_Factory supplier_qos1, supplier_qos2; + supplier_qos1.insert (supplier_id1, + ACE_ES_EVENT_UNDEFINED, + supplier1_rt_info, + 1 /* number of calls, but what does that mean? */); + supplier_qos2.insert (supplier_id2, + ACE_ES_EVENT_UNDEFINED + 1, + supplier2_rt_info, + 1 /* number of calls, but what does that mean? */); + + // The canonical protocol to connect to the EC + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy1 = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy2 = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + Supplier supplier_impl1(supplier_id1, consumer_proxy1.in ()); + Supplier supplier_impl2(supplier_id2, consumer_proxy2.in ()); + + RtecEventComm::PushSupplier_var supplier1 = + supplier_impl1._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushSupplier_var supplier2 = + supplier_impl2._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "connecting suppliers\n")); + ACE_DEBUG ((LM_DEBUG, "connecting supplier1\n")); + consumer_proxy1->connect_push_supplier (supplier1.in (), + supplier_qos1.get_SupplierQOS () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "connecting supplier2\n")); + consumer_proxy2->connect_push_supplier (supplier2.in (), + supplier_qos2.get_SupplierQOS () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_DEBUG ((LM_DEBUG, "suppliers connected\n")); + + // **************************************************************** + + //Timer Registration part + + //Timeout consumers for the two suppliers. + Timeout_Consumer timeout_consumer_impl1(&supplier_impl1); + Timeout_Consumer timeout_consumer_impl2(&supplier_impl2); + + RtecScheduler::handle_t supplier1_timeout_consumer_rt_info = + scheduler->create ("supplier1_timeout_consumer" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + //Period = 1sec + tv.set (1,0); + ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv); + + scheduler->set (supplier1_timeout_consumer_rt_info, + RtecScheduler::VERY_LOW_CRITICALITY, + tmp, tmp, tmp, + time_val_to_period (tv), + RtecScheduler::VERY_LOW_IMPORTANCE, + tmp, + 0, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecScheduler::handle_t supplier2_timeout_consumer_rt_info = + scheduler->create ("supplier2_timeout_consumer" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + //Period = 3sec + tv.set (3, 0); + ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv); + + scheduler->set (supplier2_timeout_consumer_rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + tmp, tmp, tmp, + time_val_to_period (tv), + RtecScheduler::VERY_HIGH_IMPORTANCE, + tmp, + 0, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_ConsumerQOS_Factory timer_qos1, timer_qos2; + timer_qos1.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, + 10000000, //in 100s of nanosec + supplier1_timeout_consumer_rt_info); + timer_qos2.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, + 30000000, //in 100s of nanosec + supplier2_timeout_consumer_rt_info); + + RtecEventChannelAdmin::ProxyPushSupplier_var timeout_supplier_proxy1 = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushSupplier_var timeout_supplier_proxy2 = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushConsumer_var safe_timeout_consumer1 = + timeout_consumer_impl1._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushConsumer_var safe_timeout_consumer2 = + timeout_consumer_impl2._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "connecting timeout consumers\n")); + timeout_supplier_proxy1-> + connect_push_consumer (safe_timeout_consumer1.in (), + timer_qos1.get_ConsumerQOS () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + timeout_supplier_proxy2-> + connect_push_consumer (safe_timeout_consumer2.in (), + timer_qos2.get_ConsumerQOS () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "timeout consumers connected\n")); + + // **************************************************************** + //Registering dependency between timeout consumers and our suppliers + //with the scheduler + + scheduler->add_dependency (supplier1_timeout_consumer_rt_info, + supplier1_rt_info, + 1, + RtecBase::TWO_WAY_CALL + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + scheduler->add_dependency (supplier2_timeout_consumer_rt_info, + supplier2_rt_info, + 1, + RtecBase::TWO_WAY_CALL + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************************************************************** + + // At this point the consumer and supplier are connected to the + // EC, they have provided their QoS info to the Scheduling + // Service and the EC has informed the Scheduler about the + // dependencies between them. + // We can now compute the schedule for this configuration... + + // The schedule is returned in this variables.... + + ACE_DEBUG ((LM_DEBUG, "Computing schedule\n")); + RtecScheduler::RT_Info_Set_var infos; + RtecScheduler::Config_Info_Set_var configs; + RtecScheduler::Dependency_Set_var dependencies; + RtecScheduler::Scheduling_Anomaly_Set unsafe_anomalies; + RtecScheduler::Scheduling_Anomaly_Set_var anomalies; + + scheduler->get_rt_info_set (infos.out() ); + scheduler->get_dependency_set (dependencies.out() ); + scheduler->get_config_info_set (configs.out() ); + + ACE_DEBUG ((LM_DEBUG, "Printing intermediate results\n")); + ACE_Scheduler_Factory::dump_schedule (infos.in (), + dependencies.in (), + configs.in (), + unsafe_anomalies, + "schedule.out"); + + // Obtain the range of valid priorities in the current + // platform, the scheduler hard-code this values in the + // generated file, but in the future we may just use the + // "logical" priorities and define the mapping to OS + // priorities at run-time. + int min_os_priority = + ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD); + int max_os_priority = + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD); + scheduler->compute_scheduling (min_os_priority, + max_os_priority, + infos.out (), + dependencies.out (), + configs.out (), + anomalies.out () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Dump the schedule to a file.. + ACE_Scheduler_Factory::dump_schedule (infos.in (), + dependencies.in (), + configs.in (), + anomalies.in (), + "schedule.out"); + + // **************************************************************** + ACE_DEBUG ((LM_DEBUG, "Pushing events\n")); + + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + + int prio = ACE_Sched_Params::priority_max (ACE_SCHED_FIFO); + ACE_OS::thr_setprio (thr_handle, prio); + +// // Generate a few events.... +// RtecEventComm::EventSet event1 (1); +// event1.length (1); +// event1[0].header.type = ACE_ES_EVENT_UNDEFINED; +// event1[0].header.source = supplier_id1; +// event1[0].header.ttl = 1; + +// RtecEventComm::EventSet event2 (1); +// event2.length (1); +// event2[0].header.type = ACE_ES_EVENT_UNDEFINED + 1; +// event2[0].header.source = supplier_id2; +// event2[0].header.ttl = 1; + +// for (int i = 0; i != 200; ++i) +// { +// if (i % 2 == 0) +// { +// consumer_proxy1->push (event1 ACE_ENV_ARG_PARAMETER); +// ACE_TRY_CHECK; +// } +// else +// { +// consumer_proxy2->push (event2 ACE_ENV_ARG_PARAMETER); +// ACE_TRY_CHECK; +// } + +// ACE_Time_Value rate (0, 10000); +// ACE_OS::sleep (rate); +// } + + ACE_DEBUG ((LM_DEBUG, "(%t) activating EC\n")); + ec_impl.activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_DEBUG ((LM_DEBUG, "EC activated\n")); + + orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); + + // **************************************************************** + + // We should do a lot of cleanup (disconnect from the EC, + // deactivate all the objects with the POA, etc.) but this is + // just a simple demo so we are going to be lazy. + + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +// **************************************************************** + +int parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "cs:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 's': + sched_type = ACE_TEXT_ALWAYS_CHAR(get_opts.opt_arg ()); + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s %s" + "\n", + argv [0], + "-s "), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + diff --git a/TAO/orbsvcs/examples/RtEC/Kokyu/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/Kokyu/Supplier.cpp new file mode 100644 index 00000000000..11b8f70666b --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Kokyu/Supplier.cpp @@ -0,0 +1,71 @@ +// $Id$ + +#include "Supplier.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/RtecEventCommC.h" + +ACE_RCSID(EC_Examples, Supplier, "$Id$") + +Supplier::Supplier (RtecEventComm::EventSourceID id, + const RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer_proxy) +:id_ (id), + consumer_proxy_ (consumer_proxy) +{ +} + +void +Supplier::timeout_occured (ACE_ENV_SINGLE_ARG_DECL) +{ + RtecEventComm::EventSet event (1); + if (id_ == 1) + { + event.length (1); + event[0].header.type = ACE_ES_EVENT_UNDEFINED; + event[0].header.source = id_; + event[0].header.ttl = 1; + } + else + { + event.length (1); + event[0].header.type = ACE_ES_EVENT_UNDEFINED + 1; + event[0].header.source = id_; + event[0].header.ttl = 1; + } + + consumer_proxy_->push (event ACE_ENV_ARG_PARAMETER); +} + +void +Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +Timeout_Consumer::Timeout_Consumer (Supplier* supplier) + :supplier_impl_ (supplier) +{ +} + +void +Timeout_Consumer::push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (events.length () == 0) + { + ACE_DEBUG ((LM_DEBUG, + "TimeoutConsumer (%t) no events\n")); + return; + } + + ACE_DEBUG ((LM_DEBUG, "(%t) Timeout Event received\n")); + supplier_impl_->timeout_occured (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +void +Timeout_Consumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + diff --git a/TAO/orbsvcs/examples/RtEC/Kokyu/Supplier.h b/TAO/orbsvcs/examples/RtEC/Kokyu/Supplier.h new file mode 100644 index 00000000000..977586a3ec0 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Kokyu/Supplier.h @@ -0,0 +1,87 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Supplier +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef SUPPLIER_H +#define SUPPLIER_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/Event/EC_Event_Channel.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Supplier : public POA_RtecEventComm::PushSupplier +{ + // = TITLE + // Simple supplier object + // + // = DESCRIPTION + // This class is a supplier of events. + // It simply register for two event typesone event type + // The class is just a helper to simplify common tasks in EC + // tests, such as subscribing for a range of events, disconnecting + // from the EC, informing the driver of shutdown messages, etc. + // + // There are several ways to connect and disconnect this class, + // and it is up to the driver program to use the right one. + // +public: + Supplier (RtecEventComm::EventSourceID id, + const RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer_proxy); + // Constructor + + // = The RtecEventComm::PushSupplier methods + + virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + + void timeout_occured (ACE_ENV_SINGLE_ARG_DECL); + +private: + RtecEventComm::EventSourceID id_; + const RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer_proxy_; +}; + +class Timeout_Consumer : public POA_RtecEventComm::PushConsumer +{ + // = TITLE + // Timer consumer object + // + // = DESCRIPTION + // This class is a consumer of timeout events from EC. + // It registers for timeout event with EC and calls + // the + // +public: + Timeout_Consumer (Supplier * supplier_impl); + // Constructor + + // = The RtecEventComm::PushConsumer methods + + virtual void push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + +private: + Supplier *supplier_impl_; +}; + +#endif /* SUPPLIER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/Kokyu/svc.conf b/TAO/orbsvcs/examples/RtEC/Kokyu/svc.conf new file mode 100644 index 00000000000..60b006dd1ef --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Kokyu/svc.conf @@ -0,0 +1,3 @@ +# $Id$ +#change SCHED_OTHER to SCHED_FIFO or SCHED_RR to run in Real time class +static EC_Factory "-ECProxyPushConsumerCollection mt:immediate:list -ECProxyPushSupplierCollection mt:immediate:list -ECdispatching kokyu SCHED_OTHER -ECscheduling kokyu -ECfiltering kokyu -ECproxyconsumerlock thread -ECproxysupplierlock thread -ECsupplierfiltering per-supplier" diff --git a/TAO/orbsvcs/examples/RtEC/Kokyu/svc.conf.xml b/TAO/orbsvcs/examples/RtEC/Kokyu/svc.conf.xml new file mode 100644 index 00000000000..41409e40ec9 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Kokyu/svc.conf.xml @@ -0,0 +1,7 @@ + + + + + + + diff --git a/TAO/orbsvcs/examples/RtEC/MCast/AddrServer.cpp b/TAO/orbsvcs/examples/RtEC/MCast/AddrServer.cpp new file mode 100644 index 00000000000..05fd4d9c983 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/AddrServer.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#include "AddrServer.h" + +ACE_RCSID(EC_Examples, AddrServer, "$Id$") + +AddrServer::AddrServer (const RtecUDPAdmin::UDP_Addr& addr) + : addr_ (addr) +{ +} + +void +AddrServer::get_addr (const RtecEventComm::EventHeader&, + RtecUDPAdmin::UDP_Addr_out addr + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + addr = this->addr_; +} diff --git a/TAO/orbsvcs/examples/RtEC/MCast/AddrServer.h b/TAO/orbsvcs/examples/RtEC/MCast/AddrServer.h new file mode 100644 index 00000000000..8439914f22b --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/AddrServer.h @@ -0,0 +1,53 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Consumer +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef ADDRSERVER_H +#define ADDRSERVER_H +#include /**/ "ace/pre.h" + +#include "orbsvcs/RtecUDPAdminS.h" + +class AddrServer : public POA_RtecUDPAdmin::AddrServer +{ + // = TITLE + // A simple AddrServer + // + // = DESCRIPTION + // The EC is able to use multiple multicast groups to transmit its + // data, the is given control over the mapping between the Event + // (type,source) pair and the (ipaddr,port) pair using a + // AddrServer. + // This class implements a very simple server that simply maps the + // component to the and uses a fixed , + // provided at initialization time. + // +public: + AddrServer (const RtecUDPAdmin::UDP_Addr& addr); + // Constructor + + // = The RtecUDPAdmin::AddrServer methods + virtual void get_addr (const RtecEventComm::EventHeader& header, + RtecUDPAdmin::UDP_Addr_out addr + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + RtecUDPAdmin::UDP_Addr addr_; + // The address +}; + +#include /**/ "ace/post.h" +#endif /* ADDRSERVER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/MCast/Consumer.cpp b/TAO/orbsvcs/examples/RtEC/MCast/Consumer.cpp new file mode 100644 index 00000000000..b85abca57ee --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/Consumer.cpp @@ -0,0 +1,104 @@ +// $Id$ + +#include "Consumer.h" +#include "orbsvcs/RtecEventChannelAdminS.h" +#include "orbsvcs/Event_Service_Constants.h" + +ACE_RCSID (EC_Examples, + Consumer, + "$Id$") + +Consumer::Consumer (void) + : event_count_ (0) +{ +} + +void +Consumer::connect (RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin + ACE_ENV_ARG_DECL) +{ + this->proxy_ = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + RtecEventComm::PushConsumer_var me = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Simple subscription, but usually the helper classes in + // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this. + RtecEventChannelAdmin::ConsumerQOS qos; + qos.is_gateway = 0; + + qos.dependencies.length (2); + RtecEventComm::EventHeader& h0 = + qos.dependencies[0].event.header; + h0.type = ACE_ES_DISJUNCTION_DESIGNATOR; + h0.source = 1; // The disjunction has one element + + RtecEventComm::EventHeader& h1 = + qos.dependencies[1].event.header; + h1.type = ACE_ES_EVENT_UNDEFINED; // first free event type + h1.source = ACE_ES_EVENT_SOURCE_ANY; // Any source is OK + + this->proxy_->connect_push_consumer (me.in (), qos + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Consumer::disconnect (ACE_ENV_SINGLE_ARG_DECL) +{ + ACE_TRY + { + // Disconnect from the proxy + this->proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Ignore exceptions + } + ACE_ENDTRY; + this->proxy_ = RtecEventChannelAdmin::ProxyPushSupplier::_nil (); + + // Deactivate this object + PortableServer::POA_var poa = + this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + // Get the Object Id used for the servant.. + PortableServer::ObjectId_var oid = + poa->servant_to_id (this ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + // Deactivate the object + poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Consumer::push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (events.length () == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Consumer (%P|%t) no events\n")); + return; + } + + this->event_count_ += events.length (); + if (this->event_count_ % 100 == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Consumer (%P|%t): %d events received\n", + this->event_count_)); + } +} + +void +Consumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + diff --git a/TAO/orbsvcs/examples/RtEC/MCast/Consumer.h b/TAO/orbsvcs/examples/RtEC/MCast/Consumer.h new file mode 100644 index 00000000000..536404b824b --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/Consumer.h @@ -0,0 +1,64 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Consumer +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef CONSUMER_H +#define CONSUMER_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Consumer : public POA_RtecEventComm::PushConsumer +{ + // = TITLE + // Simple consumer object + // + // = DESCRIPTION + // This class is a consumer of events. + // It simply subscribes to one event type. + // +public: + Consumer (void); + // Constructor + + void connect (RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin + ACE_ENV_ARG_DECL); + // Connect to the Event Channel + + void disconnect (ACE_ENV_SINGLE_ARG_DECL); + // Disconnect from the event channel + + // = The RtecEventComm::PushConsumer methods + + virtual void push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + +private: + CORBA::ULong event_count_; + // Keep track of the number of events received. + + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_; + // The proxy +}; + +#endif /* CONSUMER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp b/TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp new file mode 100644 index 00000000000..f61cb958eeb --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp @@ -0,0 +1,379 @@ +// $Id$ + + +#include "Consumer.h" +#include "Supplier.h" +#include "AddrServer.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/ECG_Mcast_EH.h" +#include "orbsvcs/Event/ECG_UDP_Sender.h" +#include "orbsvcs/Event/ECG_UDP_Receiver.h" +#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" +#include "tao/ORB_Core.h" +#include "ace/Get_Opt.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID (EC_Examples, + MCast, + "$Id$") + +const char *udp_mcast_address = + ACE_DEFAULT_MULTICAST_ADDR ":10001"; + +int parse_args (int argc, char *argv[]); + +int +main (int argc, char* argv[]) +{ + // Register the default factory in the Service Configurator. + // If your platform supports static constructors then you can + // simply using the ACE_STATIC_SVC_DEFINE() macro, unfortunately TAO + // must run on platforms where static constructors do not work well, + // so we have to explicitly invoke this function. + TAO_EC_Default_Factory::init_svcs (); + + // The exception macros are described in $ACE_ROOT/docs/exceptions.html + // and defined in $ACE_ROOT/ace/CORBA_macros.h. + // If your platform supports native exceptions, and TAO was compiled + // with native exception support then you can simply use try/catch + // and avoid the ACE_ENV_SINGLE_ARG_PARAMETER argument. + // Unfortunately many embedded systems cannot use exceptions due to + // the space and time overhead. + // + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // **************** HERE STARTS THE ORB SETUP + + // Create the ORB, pass the argv list for parsing. + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Parse the arguments, you usually want to do this after + // invoking ORB_init() because ORB_init() will remove all the + // -ORB options from the command line. + if (parse_args (argc, argv) == -1) + { + ACE_ERROR ((LM_ERROR, + "Usage: Service [-m udp_mcast_addr]\n")); + return 1; + } + + // This is the standard code to get access to the POA and + // activate it. + // The POA starts in the holding state, if it is not activated + // it will not process any requests. + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETS THE ORB SETUP + + // **************** HERE START THE LOCAL EVENT CHANNEL SETUP + + // This structure is used to define the startup time event + // channel configuration. + // This structure is described in + // + // $TAO_ROOT/docs/ec_options.html + // + TAO_EC_Event_Channel_Attributes attributes (poa.in (), + poa.in ()); + + // Create the Event Channel implementation class + TAO_EC_Event_Channel ec_impl (attributes); + + // Activate the Event Channel, depending on the configuration + // that may involve creating some threads. + // But it should always be invoked because several internal data + // structures are initialized at that point. + ec_impl.activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The event channel is activated as any other CORBA servant. + // In this case we use the simple implicit activation with the + // RootPOA + RtecEventChannelAdmin::EventChannel_var event_channel = + ec_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE LOCAL EVENT CHANNEL SETUP + + // **************** HERE STARTS THE FEDERATION SETUP + + // The next step is to setup the multicast gateways. + // There are two gateways involved, one sends the locally + // generated events to the federated peers, the second gateway + // receives multicast traffic and turns it into local events. + + // The sender requires a helper object to select what + // multicast group will carry what traffic, this is the + // so-called 'Address Server'. + // The intention is that advanced applications can use different + // multicast groups for different events, this can exploit + // network interfaces that filter unwanted multicast traffic. + // The helper object is accessed through an IDL interface, so it + // can reside remotely. + // In this example, and in many application, using a fixed + // multicast group is enough, and a local address server is the + // right approach. + + // First we convert the string into an INET address, then we + // convert that into the right IDL structure: + ACE_INET_Addr udp_addr (udp_mcast_address); + ACE_DEBUG ((LM_DEBUG, + "Multicast address is: %s\n", + udp_mcast_address)); + RtecUDPAdmin::UDP_Addr addr; + addr.ipaddr = udp_addr.get_ip_address (); + addr.port = udp_addr.get_port_number (); + + // Now we create and activate the servant + AddrServer as_impl (addr); + RtecUDPAdmin::AddrServer_var address_server = + as_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // We need a local socket to send the data, open it and check + // that everything is OK: + TAO_ECG_Refcounted_Endpoint endpoint(new TAO_ECG_UDP_Out_Endpoint); + if (endpoint->dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "Cannot open send endpoint\n"), + 1); + } + + // Now we setup the sender: + TAO_EC_Servant_Var sender = TAO_ECG_UDP_Sender::create(); + sender->init (event_channel.in (), + address_server.in (), + endpoint + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Now we connect the sender as a consumer of events, it will + // receive any event from any source and send it to the "right" + // multicast group, as defined by the address server set above: + RtecEventChannelAdmin::ConsumerQOS sub; + sub.is_gateway = 1; + + sub.dependencies.length (1); + sub.dependencies[0].event.header.type = + ACE_ES_EVENT_ANY; // first free event type + sub.dependencies[0].event.header.source = + ACE_ES_EVENT_SOURCE_ANY; // Any source is OK + + sender->connect (sub ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // To receive events we need to setup an event handler: + TAO_EC_Servant_Var receiver = TAO_ECG_UDP_Receiver::create(); + TAO_ECG_Mcast_EH mcast_eh (&(*receiver)); + + // The event handler uses the ORB reactor to wait for multicast + // traffic: + mcast_eh.reactor (orb->orb_core ()->reactor ()); + + // The multicast Event Handler needs to know to what multicast + // groups it should listen to. To do so it becomes an observer + // with the event channel, to determine the list of events + // required by all the local consumer. + // Then it register for the multicast groups that carry those + // events: + mcast_eh.open (event_channel.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Again the receiver connects to the event channel as a + // supplier of events, using the Observer features to detect + // local consumers and their interests: + receiver->init (event_channel.in (), + endpoint, + address_server.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The Receiver is also a supplier of events. The exact type of + // events is only known to the application, because it depends + // on the traffic carried by all the multicast groups that the + // different event handlers subscribe to. + // In this example we choose to simply describe our publications + // using wilcards, any event from any source. More advanced + // application could use the Observer features in the event + // channel to update this information (and reduce the number of + // multicast groups that each receive subscribes to). + // In a future version the event channel could perform some of + // those tasks automatically + RtecEventChannelAdmin::SupplierQOS pub; + pub.publications.length (1); + pub.publications[0].event.header.type = ACE_ES_EVENT_ANY; + pub.publications[0].event.header.source = ACE_ES_EVENT_SOURCE_ANY; + pub.is_gateway = 1; + + receiver->connect (pub ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE FEDERATION SETUP + + // **************** HERE STARTS THE CLIENT SETUP + + // First let us create a consumer and connect it to the event + // channel + Consumer consumer; + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + event_channel->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + consumer.connect (consumer_admin.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // And now create a supplier + Supplier supplier; + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + supplier.connect (supplier_admin.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE CLIENT SETUP + + // **************** HERE STARTS THE EVENT LOOP + + // Wait for events, including incoming multicast data. + // We could also use orb->run(), but that will not let us + // terminate the application in a nice way. + for (int i = 0; i != 1000; ++i) + { + CORBA::Boolean there_is_work = + orb->work_pending (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + if (there_is_work) + { + // We use a TAO extension. The CORBA mechanism does not + // provide any decent way to control the duration of + // perform_work() or work_pending(), so just calling + // them results in a spin loop. + ACE_Time_Value tv (0, 50000); + orb->perform_work (tv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_Time_Value tv (0, 100000); + ACE_OS::sleep (tv); + supplier.perform_push (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + // **************** THAT COMPLETES THE EVENT LOOP + + // **************** HERE STARTS THE CLEANUP CODE + + // First the easy ones + supplier.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + consumer.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Now let us close the Receiver + receiver->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + int r = mcast_eh.shutdown (); + + if (r == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Closing MCast event handler\n"), 1); + } + + // And also close the sender of events + sender->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The event channel must be destroyed, so it can release its + // resources, and inform all the clients that are still + // connected that it is going away. + event_channel->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Deactivating the event channel implementation is not strictly + // required, the POA will do it for us, but it is good manners: + { + // Using _this() activates with the default POA, we must gain + // access to that POA to deactivate the object. + // Notice that we 'know' that the default POA for this servant + // is the root POA, but the code is more robust if we don't + // rely on that. + PortableServer::POA_var poa = + ec_impl._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + // Get the Object Id used for the servant.. + PortableServer::ObjectId_var oid = + poa->servant_to_id (&ec_impl ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + // Deactivate the object + poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + // Now we can destroy the POA, the flags mean that we want to + // wait until the POA is really destroyed + poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Finally destroy the ORB + orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE CLEANUP CODE + + ACE_DEBUG ((LM_DEBUG, + "MCast example terminated\n")); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +// **************************************************************** + +int parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "m:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'm': + udp_mcast_address = get_opts.opt_arg (); + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "[-m udp_mcast_address]" + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + diff --git a/TAO/orbsvcs/examples/RtEC/MCast/Makefile.am b/TAO/orbsvcs/examples/RtEC/MCast/Makefile.am new file mode 100644 index 00000000000..48805140016 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/Makefile.am @@ -0,0 +1,66 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ../bin/mwc.pl -type automake -noreldefs TAO.mwc + +ACE_BUILDDIR = $(top_builddir)/.. +ACE_ROOT = $(top_srcdir)/.. +TAO_BUILDDIR = $(top_builddir) +TAO_ROOT = $(top_srcdir) + + +## Makefile.RtEC_MCast.am + +if BUILD_CORBA_MESSAGING +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS = MCast + +MCast_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +MCast_SOURCES = \ + AddrServer.cpp \ + Consumer.cpp \ + MCast.cpp \ + Supplier.cpp \ + AddrServer.h \ + Consumer.h \ + Supplier.h + +MCast_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTSched.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Serv.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO +endif BUILD_CORBA_MESSAGING + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/TAO/orbsvcs/examples/RtEC/MCast/README b/TAO/orbsvcs/examples/RtEC/MCast/README new file mode 100644 index 00000000000..55aad804e20 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/README @@ -0,0 +1,26 @@ +# $Id$ + + This directory contains a very simple example of a +multicast-based federation of event services. + + The example is a single process that contains: + +1) An event service +2) A supplier +3) A consumer +4) The gateways required to send and receive data through the + multicast group. + + The tests should be executed as follows: + +$ MCast + + If you need to set the multicast group and port you can use +the -m option: + +$ MCast -m 224.100.2.1:12345 + + Run the test in multiple machines on the same network. If +there is only one process you should only receive 1000 events in the +local consumer. If there is more than one machine you should receive +more events. diff --git a/TAO/orbsvcs/examples/RtEC/MCast/RtEC_MCast.mpc b/TAO/orbsvcs/examples/RtEC/MCast/RtEC_MCast.mpc new file mode 100644 index 00000000000..ebe69a8ddbc --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/RtEC_MCast.mpc @@ -0,0 +1,5 @@ +// -*- MPC -*- +// $Id$ + +project : orbsvcsexe, rtevent_serv, rtsched { +} diff --git a/TAO/orbsvcs/examples/RtEC/MCast/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/MCast/Supplier.cpp new file mode 100644 index 00000000000..f835dcb6daa --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/Supplier.cpp @@ -0,0 +1,94 @@ +// $Id$ + +#include "Supplier.h" +#include "orbsvcs/RtecEventChannelAdminS.h" +#include "orbsvcs/Event_Service_Constants.h" + +ACE_RCSID (EC_Examples, + Supplier, + "$Id$") + +Supplier::Supplier (void) +{ +} + +void +Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin + ACE_ENV_ARG_DECL) +{ + this->proxy_ = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + RtecEventComm::PushSupplier_var me = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Simple publication, but usually the helper classes in + // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this. + RtecEventChannelAdmin::SupplierQOS qos; + qos.is_gateway = 0; + + qos.publications.length (1); + RtecEventComm::EventHeader& h0 = + qos.publications[0].event.header; + h0.type = ACE_ES_EVENT_UNDEFINED; // first free event type + h0.source = 1; // first free event source + + this->proxy_->connect_push_supplier (me.in (), qos + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Supplier::disconnect (ACE_ENV_SINGLE_ARG_DECL) +{ + // Disconnect from the EC + ACE_TRY + { + this->proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + } + ACE_ENDTRY; + + PortableServer::POA_var poa = + this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + PortableServer::ObjectId_var id = + poa->servant_to_id (this ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Supplier::perform_push (ACE_ENV_SINGLE_ARG_DECL) +{ + ACE_TRY + { + // The event type and source must match our publications + RtecEventComm::EventSet event (1); + event.length (1); + event[0].header.type = ACE_ES_EVENT_UNDEFINED; + event[0].header.source = 1; + // Avoid loops throught the event channel federations + event[0].header.ttl = 1; + + this->proxy_->push (event ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + } + ACE_ENDTRY; +} + +void +Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + diff --git a/TAO/orbsvcs/examples/RtEC/MCast/Supplier.h b/TAO/orbsvcs/examples/RtEC/MCast/Supplier.h new file mode 100644 index 00000000000..7c591f5fdb7 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/Supplier.h @@ -0,0 +1,62 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Supplier +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef SUPPLIER_H +#define SUPPLIER_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Supplier : public POA_RtecEventComm::PushSupplier +{ + // = TITLE + // Simple supplier object + // + // = DESCRIPTION + // This class is a supplier of events. + // It simply publishes one event type, when the perform_push() + // method is invoked it pushes the event through the event service + // +public: + Supplier (void); + // Constructor + + void connect (RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin + ACE_ENV_ARG_DECL); + // Connect to the event channel + + void disconnect (ACE_ENV_SINGLE_ARG_DECL); + // Disconnect from the event channel + + void perform_push (ACE_ENV_SINGLE_ARG_DECL); + // Push a single event + + // = The RtecEventComm::PushSupplier methods + + virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + +private: + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_; + // The proxy +}; + +#endif /* SUPPLIER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/MCast/svc.conf b/TAO/orbsvcs/examples/RtEC/MCast/svc.conf new file mode 100644 index 00000000000..d0297d4649e --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/svc.conf @@ -0,0 +1,2 @@ +# $Id$ +static EC_Factory "-ECObserver basic -ECProxyPushConsumerCollection mt:copy_on_write:list -ECProxyPushSupplierCollection mt:copy_on_write:list -ECDispatching reactive -ECScheduling null -ECFiltering prefix -ECSupplierFilter per-supplier" diff --git a/TAO/orbsvcs/examples/RtEC/MCast/svc.conf.xml b/TAO/orbsvcs/examples/RtEC/MCast/svc.conf.xml new file mode 100644 index 00000000000..159faa97abc --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/svc.conf.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/TAO/orbsvcs/examples/RtEC/Makefile.am b/TAO/orbsvcs/examples/RtEC/Makefile.am new file mode 100644 index 00000000000..7fc730d6850 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Makefile.am @@ -0,0 +1,17 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ../bin/mwc.pl -type automake -noreldefs TAO.mwc + +SUBDIRS = \ + IIOPGateway \ + Kokyu \ + MCast \ + Schedule \ + Simple + diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/Consumer.cpp b/TAO/orbsvcs/examples/RtEC/Schedule/Consumer.cpp new file mode 100644 index 00000000000..4f23249c145 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/Consumer.cpp @@ -0,0 +1,32 @@ +// $Id$ + +#include "Consumer.h" + +ACE_RCSID(EC_Examples, Consumer, "$Id$") + +Consumer::Consumer (void) +{ +} + +void +Consumer::push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (events.length () == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Consumer (%P|%t) no events\n")); + return; + } + + ACE_DEBUG ((LM_DEBUG, "Consumer (%P|%t) we received event type %d\n", + events[0].header.type)); +} + +void +Consumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/Consumer.h b/TAO/orbsvcs/examples/RtEC/Schedule/Consumer.h new file mode 100644 index 00000000000..bdbdbaad894 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/Consumer.h @@ -0,0 +1,55 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Consumer +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef CONSUMER_H +#define CONSUMER_H + +#include "orbsvcs/RtecEventCommS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Consumer : public POA_RtecEventComm::PushConsumer +{ + // = TITLE + // Simple consumer object + // + // = DESCRIPTION + // This class is a consumer of events. + // It simply register for two event typesone event type + // The class is just a helper to simplify common tasks in EC + // tests, such as subscribing for a range of events, disconnecting + // from the EC, informing the driver of shutdown messages, etc. + // + // There are several ways to connect and disconnect this class, + // and it is up to the driver program to use the right one. + // +public: + Consumer (void); + // Constructor + + // = The RtecEventComm::PushConsumer methods + + virtual void push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. +}; + +#endif /* CONSUMER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/Makefile.am b/TAO/orbsvcs/examples/RtEC/Schedule/Makefile.am new file mode 100644 index 00000000000..84401da731f --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/Makefile.am @@ -0,0 +1,65 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ../bin/mwc.pl -type automake -noreldefs TAO.mwc + +ACE_BUILDDIR = $(top_builddir)/.. +ACE_ROOT = $(top_srcdir)/.. +TAO_BUILDDIR = $(top_builddir) +TAO_ROOT = $(top_srcdir) + + +## Makefile.RtEC_Schedule.am + +if BUILD_CORBA_MESSAGING +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS = Service + +Service_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +Service_SOURCES = \ + Consumer.cpp \ + Service.cpp \ + Supplier.cpp \ + Consumer.h \ + Schedule.h \ + Supplier.h + +Service_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTSched.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Serv.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO +endif BUILD_CORBA_MESSAGING + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/README b/TAO/orbsvcs/examples/RtEC/Schedule/README new file mode 100644 index 00000000000..8435319a51b --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/README @@ -0,0 +1,23 @@ +# $Id$ + + Shows how to use the scheduling service in conjunction with +the real-time event channel. The test has a single consumer that +subscribes for two event types, using different RT_Infos for each. A +supplier pushes those events also using different RT_Infos. + The event-channel cooperates with the scheduling service to +compute a schedule and assign priorities to each event. The event +channel will use different queues for those events, each queue +serviced by threads at different priorities. + + The example can be run using a pre-computed schedule: + +$ ./Service + + or it can generate the schedule again: + +$ ./Service -c + + + NOTE: the current version uses the old event channel because +we haven't coimpleted the integration between the new EC and the +scheduling service. diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/RtEC_Schedule.mpc b/TAO/orbsvcs/examples/RtEC/Schedule/RtEC_Schedule.mpc new file mode 100644 index 00000000000..ebe69a8ddbc --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/RtEC_Schedule.mpc @@ -0,0 +1,5 @@ +// -*- MPC -*- +// $Id$ + +project : orbsvcsexe, rtevent_serv, rtsched { +} diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/Schedule.h b/TAO/orbsvcs/examples/RtEC/Schedule/Schedule.h new file mode 100644 index 00000000000..3a6d4463fa3 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/Schedule.h @@ -0,0 +1,42 @@ +// $Id$ + +// This file was automatically generated by the Scheduler_Factory. +// Before editing the file please consider generating it again. + +#include "orbsvcs/Scheduler_Factory.h" + + +// There were no scheduling anomalies. + + +static ACE_Scheduler_Factory::POD_RT_Info infos[] = { +{"Dispatching_Task-250000.us", 1, 0, 0, 0, 250000, (RtecScheduler::Criticality_t) 0, (RtecScheduler::Importance_t) 0, 0, 1, 58, 4, 1, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{"Dispatching_Task-500000.us", 2, 0, 0, 0, 500000, (RtecScheduler::Criticality_t) 0, (RtecScheduler::Importance_t) 0, 0, 1, 58, 5, 1, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{"Dispatching_Task-1000000.us", 3, 0, 0, 0, 1000000, (RtecScheduler::Criticality_t) 0, (RtecScheduler::Importance_t) 0, 0, 1, 58, 6, 1, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{"Dispatching_Task-2000000.us", 4, 0, 0, 0, 2000000, (RtecScheduler::Criticality_t) 0, (RtecScheduler::Importance_t) 0, 0, 1, 58, 7, 1, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{"Dispatching_Task-10000000.us", 5, 0, 0, 0, 10000000, (RtecScheduler::Criticality_t) 0, (RtecScheduler::Importance_t) 0, 0, 1, 58, 8, 1, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{ "consumer_event_1", 6, 20000, 20000, 20000, 0, (RtecScheduler::Criticality_t) 4, (RtecScheduler::Importance_t) 0, 20000, 0, 59, 0, 0, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{ "consumer_event_2", 7, 10000, 10000, 10000, 0, (RtecScheduler::Criticality_t) 0, (RtecScheduler::Importance_t) 0, 10000, 0, 58, 1, 1, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{"(consumer_event_1#rep||consumer_event_2#rep)", 8, 0, 0, 0, 0, (RtecScheduler::Criticality_t) 0, (RtecScheduler::Importance_t) 0, 0, 0, 58, 3, 1, (RtecScheduler::Info_Type_t) 2 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{"consumer_event_1#rep", 9, 0, 0, 0, 0, (RtecScheduler::Criticality_t) 0, (RtecScheduler::Importance_t) 0, 0, 0, 58, 0, 1, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{"consumer_event_2#rep", 10, 0, 0, 0, 0, (RtecScheduler::Criticality_t) 0, (RtecScheduler::Importance_t) 0, 0, 0, 58, 2, 1, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{ "supplier_event_1", 11, 0, 0, 0, 100000, (RtecScheduler::Criticality_t) 4, (RtecScheduler::Importance_t) 0, 0, 1, 59, 1, 0, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED }, +{ "supplier_event_2", 12, 0, 0, 0, 200000, (RtecScheduler::Criticality_t) 4, (RtecScheduler::Importance_t) 0, 0, 1, 59, 2, 0, (RtecScheduler::Info_Type_t) 0 , (RtecScheduler::RT_Info_Enabled_Type_t) RtecScheduler::RT_INFO_ENABLED } +}; + +static int infos_size = sizeof(infos)/sizeof(infos[0]); + + +static ACE_Scheduler_Factory::POD_Config_Info configs[] = { + { 0, 59, (RtecScheduler::Dispatching_Type_t) 2 }, + { 1, 58, (RtecScheduler::Dispatching_Type_t) 2 } +}; + +static int configs_size = sizeof(configs)/sizeof(configs[0]); + + +// This sets up Scheduler_Factory to use the runtime version. +int scheduler_factory_setup = + ACE_Scheduler_Factory::use_runtime (configs_size, configs, infos_size, infos); + +// EOF diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/Service.cpp b/TAO/orbsvcs/examples/RtEC/Schedule/Service.cpp new file mode 100644 index 00000000000..bb27f0bad68 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/Service.cpp @@ -0,0 +1,408 @@ +// $Id$ + +#include "orbsvcs/Sched/Reconfig_Scheduler.h" +#include "orbsvcs/Runtime_Scheduler.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "Consumer.h" +#include "Supplier.h" + +#include "Schedule.h" + +#include "ace/Get_Opt.h" +#include "ace/Sched_Params.h" +#include "ace/Auto_Ptr.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID(EC_Examples, Service, "$Id$") + +int config_run = 0; + +int parse_args (int argc, char *argv[]); + +typedef TAO_Reconfig_Scheduler RECONFIG_SCHED_TYPE; + +int +main (int argc, char* argv[]) +{ + TAO_EC_Default_Factory::init_svcs (); + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // ORB initialization boiler plate... + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) == -1) + { + ACE_ERROR ((LM_ERROR, + "Usage: Service [-o IOR_file_name]\n")); + return 1; + } + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************************************************************** + +#if 0 + // Obtain a reference to the naming service... + CORBA::Object_var naming_obj = + orb->resolve_initial_references ("NameService" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; +#endif /* 0 */ + + // **************************************************************** + + // Create an scheduling service + POA_RtecScheduler::Scheduler* sched_impl = 0; + if (config_run) + { + ACE_NEW_RETURN (sched_impl, + RECONFIG_SCHED_TYPE, + 1); + } + else + { + ACE_NEW_RETURN (sched_impl, + RECONFIG_SCHED_TYPE (configs_size, + configs, + infos_size, + infos, + 0, 0, + 0), + 1); + } + + RtecScheduler::Scheduler_var scheduler = + sched_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + +#if 0 + // Bind the scheduler with the naming service so clients + // (consumers and suppliers) can resolve it, some (old) + // implementations of the EC will try to do the same thing + // (yikes!) + CosNaming::Name schedule_name (1); + schedule_name.length (1); + schedule_name[0].id = CORBA::string_dup ("ScheduleService"); + // Register the servant with the Naming Context.... + naming_context->rebind (schedule_name, scheduler.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; +#endif /* 0 */ + + // **************************************************************** + + TAO_EC_Event_Channel_Attributes attributes (poa.in (), + poa.in ()); + attributes.scheduler = scheduler.in (); // no need to dup + + TAO_EC_Event_Channel ec_impl (attributes); + ACE_DEBUG ((LM_DEBUG, "activating EC\n")); + ec_impl.activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_DEBUG ((LM_DEBUG, "EC activated\n")); + + RtecEventChannelAdmin::EventChannel_var event_channel = + ec_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // **************************************************************** + + // Create a consumer, intialize its RT_Info structures, and + // connnect to the event channel.... + + Consumer consumer_impl; + + RtecScheduler::handle_t consumer_rt_info1 = + scheduler->create ("consumer_event_1" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Let's say that the execution time for event 1 is 2 + // milliseconds... + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + scheduler->set (consumer_rt_info1, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 0, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecScheduler::handle_t consumer_rt_info2 = + scheduler->create ("consumer_event_2" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Let's say that the execution time for event 2 is 1 + // milliseconds... + tv.set (0, 1000); + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + scheduler->set (consumer_rt_info2, + RtecScheduler::VERY_LOW_CRITICALITY, + time, time, time, + 0, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_ConsumerQOS_Factory consumer_qos; + consumer_qos.start_disjunction_group (); + // The types int the range [0,ACE_ES_EVENT_UNDEFINED) are + // reserved for the EC... + consumer_qos.insert_type (ACE_ES_EVENT_UNDEFINED, + consumer_rt_info1); + consumer_qos.insert_type (ACE_ES_EVENT_UNDEFINED + 1, + consumer_rt_info2); + + // The canonical protocol to connect to the EC + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + event_channel->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushConsumer_var consumer = + consumer_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "connecting consumer\n")); + supplier_proxy->connect_push_consumer (consumer.in (), + consumer_qos.get_ConsumerQOS () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_DEBUG ((LM_DEBUG, "consumer connected\n")); + + // **************************************************************** + + Supplier supplier_impl; + + RtecScheduler::handle_t supplier_rt_info1 = + scheduler->create ("supplier_event_1" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The execution times are set to reasonable values, but + // actually they are changed on the real execution, i.e. we + // lie to the scheduler to obtain right priorities; but we + // don't care if the set is schedulable. + tv.set (0, 10000); + TimeBase::TimeT tmp; + ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv); + RtecScheduler::Period_t rate = ACE_U64_TO_U32(tmp); + + scheduler->set (supplier_rt_info1, + RtecScheduler::VERY_HIGH_CRITICALITY, + 0, 0, 0, + rate, + RtecScheduler::VERY_LOW_IMPORTANCE, + 0, + 1, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecScheduler::handle_t supplier_rt_info2 = + scheduler->create ("supplier_event_2" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The execution times are set to reasonable values, but + // actually they are changed on the real execution, i.e. we + // lie to the scheduler to obtain right priorities; but we + // don't care if the set is schedulable. + tv.set (0, 20000); + ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv); + rate = ACE_U64_TO_U32(tmp); + + scheduler->set (supplier_rt_info2, + RtecScheduler::VERY_HIGH_CRITICALITY, + 0, 0, 0, + rate, + RtecScheduler::VERY_LOW_IMPORTANCE, + 0, + 1, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::EventSourceID supplier_id = 1; + ACE_SupplierQOS_Factory supplier_qos; + supplier_qos.insert (supplier_id, + ACE_ES_EVENT_UNDEFINED, + supplier_rt_info1, + 1 /* number of calls, but what does that mean? */); + supplier_qos.insert (supplier_id, + ACE_ES_EVENT_UNDEFINED + 1, + supplier_rt_info2, + 1 /* number of calls, but what does that mean? */); + + // The canonical protocol to connect to the EC + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushSupplier_var supplier = + supplier_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "connecting supplier\n")); + consumer_proxy->connect_push_supplier (supplier.in (), + supplier_qos.get_SupplierQOS () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_DEBUG ((LM_DEBUG, "supplier connected\n")); + + // **************************************************************** + + // At this point the consumer and supplier are connected to the + // EC, they have provided their QoS info to the Scheduling + // Service and the EC has informed the Scheduler about the + // dependencies between them. + // We can now compute the schedule for this configuration... + + // The schedule is returned in this variables.... + + if (config_run) + { + ACE_DEBUG ((LM_DEBUG, "Computing schedule\n")); + RtecScheduler::RT_Info_Set_var infos; + RtecScheduler::Dependency_Set_var deps; + RtecScheduler::Config_Info_Set_var configs; + RtecScheduler::Scheduling_Anomaly_Set_var anomalies; + + // Obtain the range of valid priorities in the current + // platform, the scheduler hard-code this values in the + // generated file, but in the future we may just use the + // "logical" priorities and define the mapping to OS + // priorities at run-time. + int min_os_priority = + ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD); + int max_os_priority = + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD); + scheduler->compute_scheduling (min_os_priority, + max_os_priority, + infos.out (), + deps.out (), + configs.out (), + anomalies.out () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Dump the schedule to a file.. + ACE_Scheduler_Factory::dump_schedule (infos.in (), + deps.in (), + configs.in (), + anomalies.in (), + "schedule.out"); + } + + // **************************************************************** + + ACE_DEBUG ((LM_DEBUG, "Pushing events\n")); + + // Generate a few events.... + + RtecEventComm::EventSet event1 (1); + event1.length (1); + event1[0].header.type = ACE_ES_EVENT_UNDEFINED; + event1[0].header.source = supplier_id; + event1[0].header.ttl = 1; + + RtecEventComm::EventSet event2 (1); + event2.length (1); + event2[0].header.type = ACE_ES_EVENT_UNDEFINED + 1; + event2[0].header.source = supplier_id; + event2[0].header.ttl = 1; + + for (int i = 0; i != 200; ++i) + { + if (i % 2 == 0) + { + consumer_proxy->push (event1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + else + { + consumer_proxy->push (event2 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + ACE_Time_Value rate (0, 10000); + ACE_OS::sleep (rate); + } + + // **************************************************************** + + // We should do a lot of cleanup (disconnect from the EC, + // deactivate all the objects with the POA, etc.) but this is + // just a simple demo so we are going to be lazy. + + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +// **************************************************************** + +int parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "c"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'c': + config_run = 1; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-c (config run)" + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/Schedule/Supplier.cpp new file mode 100644 index 00000000000..947378d616a --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/Supplier.cpp @@ -0,0 +1,18 @@ +// $Id$ + +#include "Supplier.h" + +ACE_RCSID (EC_Examples, + Supplier, + "$Id$") + +Supplier::Supplier (void) +{ +} + +void +Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/Supplier.h b/TAO/orbsvcs/examples/RtEC/Schedule/Supplier.h new file mode 100644 index 00000000000..b0391f7602b --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/Supplier.h @@ -0,0 +1,54 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Supplier +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef SUPPLIER_H +#define SUPPLIER_H + +#include "orbsvcs/RtecEventCommS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Supplier : public POA_RtecEventComm::PushSupplier +{ + // = TITLE + // Simple supplier object + // + // = DESCRIPTION + // This class is a supplier of events. + // It simply register for two event typesone event type + // The class is just a helper to simplify common tasks in EC + // tests, such as subscribing for a range of events, disconnecting + // from the EC, informing the driver of shutdown messages, etc. + // + // There are several ways to connect and disconnect this class, + // and it is up to the driver program to use the right one. + // +public: + Supplier (void); + // Constructor + + // = The RtecEventComm::PushSupplier methods + + virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + +private: +}; + +#endif /* SUPPLIER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/svc.conf b/TAO/orbsvcs/examples/RtEC/Schedule/svc.conf new file mode 100644 index 00000000000..c0bbc7fc673 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/svc.conf @@ -0,0 +1,2 @@ +# $Id$ +static EC_Factory "-ECProxyPushConsumerCollection mt:immediate:list -ECProxyPushSupplierCollection mt:immediate:list -ECdispatching priority -ECscheduling priority -ECfiltering priority -ECproxyconsumerlock thread -ECproxysupplierlock thread -ECsupplierfiltering per-supplier" diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/svc.conf.xml b/TAO/orbsvcs/examples/RtEC/Schedule/svc.conf.xml new file mode 100644 index 00000000000..8d634c164f8 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Schedule/svc.conf.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/TAO/orbsvcs/examples/RtEC/Simple/Consumer.cpp b/TAO/orbsvcs/examples/RtEC/Simple/Consumer.cpp new file mode 100644 index 00000000000..e8af9a77ed8 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/Consumer.cpp @@ -0,0 +1,162 @@ +// $Id$ + +#include "Consumer.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/CosNamingC.h" + +ACE_RCSID (EC_Examples, + Consumer, + "$Id$") + +int +main (int argc, char* argv[]) +{ + Consumer consumer; + + return consumer.run (argc, argv); +} + +// **************************************************************** + +Consumer::Consumer (void) + : event_count_ (0) +{ +} + +int +Consumer::run (int argc, char* argv[]) +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // ORB initialization boiler plate... + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Do *NOT* make a copy because we don't want the ORB to outlive + // the run() method. + this->orb_ = orb.in (); + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Obtain the event channel from the naming service + CORBA::Object_var naming_obj = + orb->resolve_initial_references ("NameService" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the Naming Service.\n"), + 1); + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNaming::Name name (1); + name.length (1); + name[0].id = CORBA::string_dup ("EventService"); + + CORBA::Object_var ec_obj = + naming_context->resolve (name ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var event_channel = + RtecEventChannelAdmin::EventChannel::_narrow (ec_obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The canonical protocol to connect to the EC + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + event_channel->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushConsumer_var consumer = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Simple subscription, but usually the helper classes in + // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this. + RtecEventChannelAdmin::ConsumerQOS qos; + qos.dependencies.length (2); + RtecEventComm::EventHeader& h0 = + qos.dependencies[0].event.header; + h0.type = ACE_ES_DISJUNCTION_DESIGNATOR; + h0.source = ACE_ES_EVENT_SOURCE_ANY; + + RtecEventComm::EventHeader& h1 = + qos.dependencies[1].event.header; + h1.type = ACE_ES_EVENT_UNDEFINED; // first free event type + h1.source = ACE_ES_EVENT_SOURCE_ANY; + + supplier->connect_push_consumer (consumer.in (), qos + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Wait for events, using work_pending()/perform_work() may help + // or using another thread, this example is too simple for that. + orb->run (); + + // We don't do any cleanup, it is hard to do it after shutdown, + // and would complicate the example; plus it is almost + // impossible to do cleanup after ORB->run() because the POA is + // in the holding state. Applications should use + // work_pending()/perform_work() to do more interesting stuff. + // Check the supplier for the proper way to do cleanup. + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Consumer::run"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +void +Consumer::push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (events.length () == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Consumer (%P|%t) no events\n")); + return; + } + + this->event_count_ += events.length (); + if (this->event_count_ % 100 == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Consumer (%P|%t): %d events received\n", + this->event_count_)); + } +} + +void +Consumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // In this example we shutdown the ORB when we disconnect from the + // EC (or rather the EC disconnects from us), but this doesn't have + // to be the case.... + this->orb_->shutdown (0 ACE_ENV_ARG_PARAMETER); +} + diff --git a/TAO/orbsvcs/examples/RtEC/Simple/Consumer.h b/TAO/orbsvcs/examples/RtEC/Simple/Consumer.h new file mode 100644 index 00000000000..19005b96ec1 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/Consumer.h @@ -0,0 +1,60 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Consumer +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef CONSUMER_H +#define CONSUMER_H + +#include "orbsvcs/RtecEventCommS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Consumer : public POA_RtecEventComm::PushConsumer +{ + // = TITLE + // Simple consumer object + // + // = DESCRIPTION + // This class is a consumer of events. + // It simply registers for one event type. + // +public: + Consumer (void); + // Constructor + + int run (int argc, char* argv[]); + // Run the test + + // = The RtecEventComm::PushConsumer methods + + virtual void push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + +private: + CORBA::ULong event_count_; + // Keep track of the number of events received. + + CORBA::ORB_ptr orb_; + // The orb, just a pointer because the ORB does not outlive the + // run() method... +}; + +#endif /* CONSUMER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/Simple/Makefile.am b/TAO/orbsvcs/examples/RtEC/Simple/Makefile.am new file mode 100644 index 00000000000..d9066e3eddc --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/Makefile.am @@ -0,0 +1,130 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ../bin/mwc.pl -type automake -noreldefs TAO.mwc + +ACE_BUILDDIR = $(top_builddir)/.. +ACE_ROOT = $(top_srcdir)/.. +TAO_BUILDDIR = $(top_builddir) +TAO_ROOT = $(top_srcdir) + +noinst_PROGRAMS = + +## Makefile.RtEC_Simple_Consumer.am + +if BUILD_CORBA_MESSAGING + +noinst_PROGRAMS += Consumer + +Consumer_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +Consumer_SOURCES = \ + Consumer.cpp \ + Consumer.h + +Consumer_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif BUILD_CORBA_MESSAGING + +## Makefile.RtEC_Simple_Service.am + +if BUILD_CORBA_MESSAGING +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS += Service + +Service_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +Service_SOURCES = \ + Service.cpp \ + Consumer.h \ + Supplier.h + +Service_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Serv.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO +endif BUILD_CORBA_MESSAGING + +## Makefile.RtEC_Simple_Supplier.am + +if BUILD_CORBA_MESSAGING + +noinst_PROGRAMS += Supplier + +Supplier_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +Supplier_SOURCES = \ + Supplier.cpp \ + Supplier.h + +Supplier_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif BUILD_CORBA_MESSAGING + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/TAO/orbsvcs/examples/RtEC/Simple/README b/TAO/orbsvcs/examples/RtEC/Simple/README new file mode 100644 index 00000000000..96dae4b1cca --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/README @@ -0,0 +1,15 @@ +# $Id$ + + This directory contains possibly the most simple example of +the real-time event service. It contains three executables; a +consumer, supplier and a program to create the event channel itself. + + Run using the run_test.pl script or: + +$ ../../../Naming_Service/Naming_Service +$ ./Service +$ ./Consumer +$ ./Supplier + + more advanced tests are available in +$TAO_ROOT/orbsvcs/tests/Event and $TAO_ROOT/orbsvcs/tests/EC_* diff --git a/TAO/orbsvcs/examples/RtEC/Simple/RtEC_Simple.mpc b/TAO/orbsvcs/examples/RtEC/Simple/RtEC_Simple.mpc new file mode 100644 index 00000000000..b98ff0f88e4 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/RtEC_Simple.mpc @@ -0,0 +1,20 @@ +// -*- MPC -*- +// $Id$ + +project(*Service) : orbsvcsexe, rtevent_serv, naming { + source_files { + Service.cpp + } +} + +project(*Supplier) : orbsvcsexe, rtevent_skel, naming { + source_files { + Supplier.cpp + } +} + +project(*Consumer) : orbsvcsexe, rtevent_skel, naming { + source_files { + Consumer.cpp + } +} diff --git a/TAO/orbsvcs/examples/RtEC/Simple/Service.cpp b/TAO/orbsvcs/examples/RtEC/Simple/Service.cpp new file mode 100644 index 00000000000..a3bd5e5740d --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/Service.cpp @@ -0,0 +1,141 @@ +// $Id$ + +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "ace/Get_Opt.h" +#include "orbsvcs/CosNamingC.h" +#include "ace/OS_NS_stdio.h" + +ACE_RCSID (EC_Examples, + Service, + "$Id$") + +const char *ior_output_file = "ec.ior"; + +int parse_args (int argc, char *argv[]); + +int +main (int argc, char* argv[]) +{ + TAO_EC_Default_Factory::init_svcs (); + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // ORB initialization boiler plate... + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Obtain the naming service + CORBA::Object_var naming_obj = + orb->resolve_initial_references ("NameService" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the Naming Service.\n"), + 1); + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + TAO_EC_Event_Channel_Attributes attributes (poa.in (), + poa.in ()); + + TAO_EC_Event_Channel ec_impl (attributes); + ec_impl.activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var event_channel = + ec_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Create a name. + CosNaming::Name name; + name.length (1); + name[0].id = CORBA::string_dup ("EventService"); + name[0].kind = CORBA::string_dup (""); + + // Register with the name server + naming_context->bind (name, event_channel.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Example code: How to write ior to file + CORBA::String_var ior = + orb->object_to_string (event_channel.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ())); + // If the ior_output_file exists, output the ior to it + if (ior_output_file != 0) + { + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + } + + // Wait for events, using work_pending()/perform_work() may help + // or using another thread, this example is too simple for that. + orb->run (); + + // We don't do any cleanup, it is hard to do it after shutdown, + // and would complicate the example; plus it is almost + // impossible to do cleanup after ORB->run() because the POA is + // in the holding state. Applications should use + // work_pending()/perform_work() to do more interesting stuff. + // Check the supplier for the proper way to do cleanup. + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +// **************************************************************** + +int parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "o:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'o': + ior_output_file = get_opts.opt_arg (); + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-o " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + diff --git a/TAO/orbsvcs/examples/RtEC/Simple/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/Simple/Supplier.cpp new file mode 100644 index 00000000000..21ced08639e --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/Supplier.cpp @@ -0,0 +1,152 @@ +// $Id$ + +#include "Supplier.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/CosNamingC.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID (EC_Examples, + Supplier, + "$Id$") + +int +main (int argc, char* argv[]) +{ + Supplier supplier; + + return supplier.run (argc, argv); +} + +// **************************************************************** + +Supplier::Supplier (void) +{ +} + +int +Supplier::run (int argc, char* argv[]) +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // ORB initialization boiler plate... + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Obtain the event channel from the naming service + CORBA::Object_var naming_obj = + orb->resolve_initial_references ("NameService" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the Naming Service.\n"), + 1); + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNaming::Name name (1); + name.length (1); + name[0].id = CORBA::string_dup ("EventService"); + + CORBA::Object_var ec_obj = + naming_context->resolve (name ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var event_channel = + RtecEventChannelAdmin::EventChannel::_narrow (ec_obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // The canonical protocol to connect to the EC + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushConsumer_var consumer = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventComm::PushSupplier_var supplier = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Simple publication, but usually the helper classes in + // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this. + RtecEventChannelAdmin::SupplierQOS qos; + qos.publications.length (1); + RtecEventComm::EventHeader& h0 = + qos.publications[0].event.header; + h0.type = ACE_ES_EVENT_UNDEFINED; // first free event type + h0.source = 1; // first free event source + + consumer->connect_push_supplier (supplier.in (), qos + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Push the events... + ACE_Time_Value sleep_time (0, 10000); // 10 milliseconds + + RtecEventComm::EventSet event (1); + event.length (1); + event[0].header.type = ACE_ES_EVENT_UNDEFINED; + event[0].header.source = 1; + event[0].header.ttl = 1; + + for (int i = 0; i != 2000; ++i) + { + consumer->push (event ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_OS::sleep (sleep_time); + } + + // Disconnect from the EC + consumer->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Destroy the EC.... + event_channel->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Deactivate this object... + PortableServer::ObjectId_var id = + poa->servant_to_id (this ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Destroy the POA + poa->destroy (1, 0 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Supplier::run"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +void +Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + diff --git a/TAO/orbsvcs/examples/RtEC/Simple/Supplier.h b/TAO/orbsvcs/examples/RtEC/Simple/Supplier.h new file mode 100644 index 00000000000..b1fce544ccc --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/Supplier.h @@ -0,0 +1,51 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// Supplier +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef SUPPLIER_H +#define SUPPLIER_H + +#include "orbsvcs/RtecEventCommS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Supplier : public POA_RtecEventComm::PushSupplier +{ + // = TITLE + // Simple supplier object + // + // = DESCRIPTION + // This class is a supplier of events. + // It simply publishes one event type. + // +public: + Supplier (void); + // Constructor + + int run (int argc, char* argv[]); + // Run the test + + // = The RtecEventComm::PushSupplier methods + + virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + +private: +}; + +#endif /* SUPPLIER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/Simple/ec.conf b/TAO/orbsvcs/examples/RtEC/Simple/ec.conf new file mode 100644 index 00000000000..d3d61260f68 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/ec.conf @@ -0,0 +1,2 @@ +# $Id$ +static EC_Factory "-ECDispatching reactive -ECFiltering basic -ECSupplierFiltering per-supplier -ECProxyConsumerLock thread -ECProxySupplierLock thread -ECConsumerControl reactive -ECSupplierControl reactive -ECConsumerControlPeriod 50000 -ECSupplierControlPeriod 50000" diff --git a/TAO/orbsvcs/examples/RtEC/Simple/ec.conf.xml b/TAO/orbsvcs/examples/RtEC/Simple/ec.conf.xml new file mode 100644 index 00000000000..63807cba8d3 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/ec.conf.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/TAO/orbsvcs/examples/RtEC/Simple/run_test.pl b/TAO/orbsvcs/examples/RtEC/Simple/run_test.pl new file mode 100755 index 00000000000..ca45c8b972e --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/Simple/run_test.pl @@ -0,0 +1,83 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib '../../../../../bin'; +use PerlACE::Run_Test; + +$status = 0; + +$ns_ior = PerlACE::LocalFile ("ns.ior"); +$conffile = PerlACE::LocalFile ("ec" . "$PerlACE::svcconf_ext"); + +unlink $ns_ior; + +$NS = new PerlACE::Process ("../../../Naming_Service/Naming_Service", + "-o $ns_ior "); + +$T = new PerlACE::Process ("Service", + "-ORBInitRef NameService=file://$ns_ior " + . "-ORBsvcconf $conffile"); + +$C = new PerlACE::Process ("Consumer", + "-ORBInitRef NameService=file://$ns_ior "); + +$S = new PerlACE::Process ("Supplier", + "-ORBInitRef NameService=file://$ns_ior "); + + + +print STDOUT "Starting name server\n"; +$NS->Spawn (); + +if (PerlACE::waitforfile_timed ($ns_ior, 15) == -1) { + print STDERR "ERROR: cannot find file <$ns_ior>\n"; + $NS->Kill (); + exit 1; +} + +print STDOUT "Starting event server\n"; +$T->Spawn (); + +sleep 2; + +print STDOUT "Starting consumer\n"; +$C->Spawn (); + +sleep 1; + +print STDOUT "Starting supplier\n"; +$supplier = $S->SpawnWaitKill (120); + +if ($supplier != 0) { + print STDERR "ERROR: supplier returned $supplier\n"; + $status = 1; +} + +$consumer = $C->WaitKill (15); + +if ($consumer != 0) { + print STDERR "ERROR: consumer returned $consumer\n"; + $status = 1; +} + +$service = $T->TerminateWaitKill (5); + +if ($service != 0) { + print STDERR "ERROR: service returned $service\n"; + $status = 1; +} + +$nserver = $NS->TerminateWaitKill (5); + +if ($nserver != 0) { + print STDERR "ERROR: name server returned $nserver\n"; + $status = 1; +} + +unlink $ns_ior; + +exit $status; -- cgit v1.2.1