diff options
author | Abdullah Sowayan <sowayan@users.noreply.github.com> | 2008-10-21 20:47:43 +0000 |
---|---|---|
committer | Abdullah Sowayan <sowayan@users.noreply.github.com> | 2008-10-21 20:47:43 +0000 |
commit | d2911d5b9eb897d3da7d458ebf5ba8b998bc7763 (patch) | |
tree | 3158327d3787df5b439329fac177f20a12857c62 /TAO/DevGuideExamples/EventServices | |
parent | f8ea2bc5a4d98525f6f290d8272663e46aa1de74 (diff) | |
download | ATCD-d2911d5b9eb897d3da7d458ebf5ba8b998bc7763.tar.gz |
Tue Oct 21 19:10:21 UTC 2008 Abdullah Sowayan <abdullah.sowayan@lmco.com>
Diffstat (limited to 'TAO/DevGuideExamples/EventServices')
65 files changed, 3651 insertions, 0 deletions
diff --git a/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventConsumerMain.cpp b/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventConsumerMain.cpp new file mode 100644 index 00000000000..d7a67dbd214 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventConsumerMain.cpp @@ -0,0 +1,83 @@ +// EchoEventConsumerMain.cpp +// Main program for a PushConsumer of Echo events. + + +#include "EchoEventConsumer_i.h" + +#include <orbsvcs/CosEventCommC.h> +#include <orbsvcs/CosEventChannelAdminC.h> +#include <orbsvcs/CosNamingC.h> + +#include <iostream> + +const int EVENTS_TILL_SHUTDOWN = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Find the EchoEventChannel. + obj = root_context->resolve_str("CosEventService"); + + // Downcast the object reference to an EventChannel reference. + CosEventChannelAdmin::EventChannel_var echoEC = + CosEventChannelAdmin::EventChannel::_narrow(obj.in()); + if (CORBA::is_nil(echoEC.in())) { + std::cerr << "Could not narrow EchoEventChannel." << std::endl; + return 1; + } + std::cout << "Found the EchoEventChannel." << std::endl; + + // Get a ConsumerAdmin object from the EventChannel. + CosEventChannelAdmin::ConsumerAdmin_var consumerAdmin = + echoEC->for_consumers(); + + // Get a ProxyPushSupplier from the ConsumerAdmin. + CosEventChannelAdmin::ProxyPushSupplier_var supplier = + consumerAdmin->obtain_push_supplier(); + + // Instantiate an EchoEventConsumer_i servant. + EchoEventConsumer_i servant(orb.in(), supplier.in(), EVENTS_TILL_SHUTDOWN); + + // Register it with the RootPOA. + obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in()); + PortableServer::ObjectId_var oid = poa->activate_object(&servant); + CORBA::Object_var consumer_obj = poa->id_to_reference(oid.in()); + CosEventComm::PushConsumer_var consumer = + CosEventComm::PushConsumer::_narrow(consumer_obj.in()); + + // Connect to the ProxyPushSupplier, passing our PushConsumer object + // reference to it. + supplier->connect_push_consumer(consumer.in()); + + // Activate the POA via its POAManager. + PortableServer::POAManager_var poa_manager = poa->the_POAManager(); + poa_manager->activate(); + + std::cout << "Ready to receive events..." << std::endl; + + // Enter the ORB event loop. + orb->run(); + + // If we have reached this, we must be shutting down... + orb->destroy(); + + std::cout << "Test complete." << std::endl; + + return 0; + } + catch(const CORBA::Exception& ex) + { + std::cerr << "Consumer: Caught CORBA::Exception: " << ex << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventConsumer_i.cpp b/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventConsumer_i.cpp new file mode 100644 index 00000000000..cf1468ca9fa --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventConsumer_i.cpp @@ -0,0 +1,42 @@ +// Implements a PushConsumer. + +#include "EchoEventConsumer_i.h" +#include <tao/PortableServer/PS_CurrentC.h> +#include <iostream> + +EchoEventConsumer_i::EchoEventConsumer_i( + CORBA::ORB_ptr orb, + CosEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit) + : orb_(CORBA::ORB::_duplicate(orb)) + , supplier_(CosEventChannelAdmin::ProxyPushSupplier::_duplicate(supplier)) + , event_limit_(event_limit) +{ +} + +// Override the push() operation. +void EchoEventConsumer_i::push(const CORBA::Any & data) +{ + // Extract event data from the any. + const char* eventData; + if (data >>= eventData) + { + std::cout << "EchoEventConsumer_i::push(): Received event: " + << eventData << std::endl; + } + if (--event_limit_ <= 0) { + supplier_->disconnect_push_supplier(); + orb_->shutdown(0); + } +} + +// Override the disconnect_push_consumer() operation. +void EchoEventConsumer_i::disconnect_push_consumer() +{ + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventConsumer_i.h b/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventConsumer_i.h new file mode 100644 index 00000000000..483c17de403 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventConsumer_i.h @@ -0,0 +1,29 @@ +// EchoEventConsumer_i.h +// Implements a PushConsumer. + +#ifndef _EchoEventConsumer_i_h_ +#define _EchoEventConsumer_i_h_ + +#include <orbsvcs/CosEventCommS.h> // for POA_CosEventComm::PushConsumer +#include <orbsvcs/CosEventChannelAdminC.h> + +class EchoEventConsumer_i : public virtual POA_CosEventComm::PushConsumer +{ + public: + // Constructor + EchoEventConsumer_i(CORBA::ORB_ptr orb, + CosEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit); + + // Override operations from PushConsumer interface. + virtual void push(const CORBA::Any & data); + + virtual void disconnect_push_consumer(); + + private: + CORBA::ORB_var orb_; + CosEventChannelAdmin::ProxyPushSupplier_var supplier_; + int event_limit_; +}; + +#endif // _EchoEventConsumer_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventSupplierMain.cpp b/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventSupplierMain.cpp new file mode 100644 index 00000000000..713b8490c13 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_Basic/EchoEventSupplierMain.cpp @@ -0,0 +1,70 @@ +// Main program for a PushSupplier of Echo events. + +#include <orbsvcs/CosEventCommC.h> +#include <orbsvcs/CosEventChannelAdminC.h> +#include <orbsvcs/CosNamingC.h> + +#include <iostream> + +const int EVENT_DELAY_MS = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Find the EventChannel. + obj = root_context->resolve_str("CosEventService"); + + // Downcast the object reference to an EventChannel reference. + CosEventChannelAdmin::EventChannel_var echoEC = + CosEventChannelAdmin::EventChannel::_narrow(obj.in()); + if (CORBA::is_nil(echoEC.in())) { + std::cerr << "Could not resolve EchoEventChannel." << std::endl; + return 1; + } + + // Get a SupplierAdmin object from the EventChannel. + CosEventChannelAdmin::SupplierAdmin_var supplierAdmin = + echoEC->for_suppliers(); + + // Get a ProxyPushConsumer from the SupplierAdmin. + CosEventChannelAdmin::ProxyPushConsumer_var consumer = + supplierAdmin->obtain_push_consumer(); + + // Connect to the ProxyPushConsumer as a PushSupplier + // (passing a nil PushSupplier object reference to it because + // we don't care to be notified about disconnects). + consumer->connect_push_supplier(CosEventComm::PushSupplier::_nil()); + + // Create an event (just a string in this case). + const CORBA::String_var eventData = CORBA::string_dup("Hello, world."); + + // Send one event per second. (approx) + while (1) { + // Insert the event data into an any. + CORBA::Any any; + any <<= eventData; + + // Now push the event to the consumer + consumer->push(any); + + ACE_Time_Value event_delay(0, 1000 * EVENT_DELAY_MS); + orb->run(event_delay); + } + + return 0; + } + catch(const CORBA::Exception& ex) + { + std::cerr << "Supplier Caught CORBA::Exception. " << ex << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_Basic/OMG_Basic.mpc b/TAO/DevGuideExamples/EventServices/OMG_Basic/OMG_Basic.mpc new file mode 100644 index 00000000000..c625d77490d --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_Basic/OMG_Basic.mpc @@ -0,0 +1,18 @@ +project(*Supplier): namingexe, event_skel { + exename = EchoEventSupplier + includes += ../common + Source_Files { + EchoEventSupplierMain.cpp + } +} + +project(*Consumer): namingexe, event_skel { + exename = EchoEventConsumer + includes += ../common + + Source_Files { + EchoEventConsumerMain.cpp + EchoEventConsumer_i.cpp + } +} + diff --git a/TAO/DevGuideExamples/EventServices/OMG_Basic/README b/TAO/DevGuideExamples/EventServices/OMG_Basic/README new file mode 100644 index 00000000000..475f9c6e0a2 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_Basic/README @@ -0,0 +1,62 @@ +OMG Event Service + + +File: DevGuideExamples/EventServices/OMG_Basic/README + + +This directory contains a simple example of using the CosEvent service. +This example uses the push/push model: + + EchoEventSupplier ----> CosEvent_Service ----> EchoEventConsumer + +This example also works fine with the CosEvent_Service server. + +------------------------------------------------------------------------- + +Note: To test this, you must first run the Naming Service and the +CosEvent Service, e.g.: + +$TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -o ns.ior & +$TAO_ROOT/orbsvcs/CosEvent_Service/CosEvent_Service -ORBInitRef NameService=file://ns.ior& + +------------------------------------------------------------------------- + +EchoEventSupplerMain.cpp + + Main program for a PushSupplier. + + EchoEventSupplier -ORBInitRef NameService=file://ns.ior + + It will publish an event to the event channel every second. + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumerMain.cpp + + Main program for a PushConsumer. + + To run it: + + EchoEventConsumer -ORBInitRef NameService=file://ns.ior + + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumer_i.{h,cpp} + + Call which implements the CosEventComm::PushConsumer interface. + + + +Execution via Perl Script +------------------------- + +A Perl script has been created to automate the steps shown +above. This script can be run via the following command: + +./run_test.pl -ExeSubDir <Release> + + + diff --git a/TAO/DevGuideExamples/EventServices/OMG_Basic/run_test.pl b/TAO/DevGuideExamples/EventServices/OMG_Basic/run_test.pl new file mode 100644 index 00000000000..78ced481eb9 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_Basic/run_test.pl @@ -0,0 +1,61 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Env (ACE_ROOT); +use lib "$ACE_ROOT/bin"; +use PerlACE::Run_Test; + +$nsiorfile = PerlACE::LocalFile ("ns.ior"); +$esiorfile = PerlACE::LocalFile ("es.ior"); +$arg_ns_ref = "-ORBInitRef NameService=file://$nsiorfile"; + +unlink $nsiorfile; +unlink $esiorfile; + +# start Naming Service + +$NameService = "$ENV{TAO_ROOT}/orbsvcs/Naming_Service/Naming_Service"; +$NS = new PerlACE::Process($NameService, "-o $nsiorfile"); +$NS->Spawn(); +if (PerlACE::waitforfile_timed ($nsiorfile, 5) == -1) { + print STDERR "ERROR: cannot find file <$nsiorfile>\n"; + $NS->Kill(); + exit 1; +} + +# start Event Service +$EventService = "$ENV{TAO_ROOT}/orbsvcs/CosEvent_Service/CosEvent_Service"; +$ES = new PerlACE::Process($EventService, "-o $esiorfile $arg_ns_ref"); +$ES->Spawn(); +if (PerlACE::waitforfile_timed ($esiorfile, 5) == -1) { + print STDERR "ERROR: cannot find file <$esiorfile>\n"; + $ES->Kill(); + unlink $nsiorfile; + exit 1; +} + +# start EchoEventSupplier +$S = new PerlACE::Process("EchoEventSupplier", $arg_ns_ref); +$S->Spawn(); + +# start EchoEventConsumer +$C = new PerlACE::Process("EchoEventConsumer", $arg_ns_ref); +$C->Spawn(); + +$CRET = $C->WaitKill(60); +$S->Kill(); +$NS->Kill(); +$ES->Kill(); + +unlink $nsiorfile; +unlink $esiorfile; + +if ($CRET != 0) { + print STDERR "ERROR: Client returned <$CRET>\n"; + exit 1 ; +} + +exit 0; + + diff --git a/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventConsumerMain.cpp b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventConsumerMain.cpp new file mode 100644 index 00000000000..f8c9316ea86 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventConsumerMain.cpp @@ -0,0 +1,81 @@ +// EchoEventConsumerMain.cpp +// Main program for a PushConsumer of Echo events. + +#include "EchoEventConsumer_i.h" + +#include <orbsvcs/CosEventCommC.h> +#include <orbsvcs/CosEventChannelAdminC.h> +#include <orbsvcs/CosNamingC.h> + +#include <iostream> +const int EVENT_LIMIT = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Find the EchoEventChannel. + obj = root_context->resolve_str("CosEventService"); + + // Downcast the object reference to an EventChannel reference. + CosEventChannelAdmin::EventChannel_var echoEC = + CosEventChannelAdmin::EventChannel::_narrow(obj.in()); + if (CORBA::is_nil(echoEC.in())) { + std::cerr << "Could not narrow EchoEventChannel." << std::endl; + return 1; + } + std::cout << "Found the EchoEventChannel." << std::endl; + + // Get a ConsumerAdmin object from the EventChannel. + CosEventChannelAdmin::ConsumerAdmin_var consumerAdmin = + echoEC->for_consumers(); + + // Get a ProxyPushSupplier from the ConsumerAdmin. + CosEventChannelAdmin::ProxyPushSupplier_var supplier = + consumerAdmin->obtain_push_supplier(); + + // Instantiate an EchoEventConsumer_i servant. + EchoEventConsumer_i servant(orb.in(), supplier.in(), EVENT_LIMIT); + + // Register it with the RootPOA. + obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in()); + PortableServer::ObjectId_var oid = poa->activate_object(&servant); + CORBA::Object_var consumer_obj = poa->id_to_reference(oid.in()); + CosEventComm::PushConsumer_var consumer = + CosEventComm::PushConsumer::_narrow(consumer_obj.in()); + + // Connect to the ProxyPushSupplier, passing our PushConsumer object + // reference to it. + supplier->connect_push_consumer(consumer.in()); + + // Activate the POA via its POAManager. + PortableServer::POAManager_var poa_manager = poa->the_POAManager(); + poa_manager->activate(); + + std::cout << "Ready to receive events..." << std::endl; + + // Enter the ORB event loop. + orb->run(); + + // If we have reached this, we must be shutting down... + orb->destroy(); + + std::cout << "Test completed." << std::endl; + + return 0; + } + catch(const CORBA::Exception& exc) + { + std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventConsumer_i.cpp b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventConsumer_i.cpp new file mode 100644 index 00000000000..cf1468ca9fa --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventConsumer_i.cpp @@ -0,0 +1,42 @@ +// Implements a PushConsumer. + +#include "EchoEventConsumer_i.h" +#include <tao/PortableServer/PS_CurrentC.h> +#include <iostream> + +EchoEventConsumer_i::EchoEventConsumer_i( + CORBA::ORB_ptr orb, + CosEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit) + : orb_(CORBA::ORB::_duplicate(orb)) + , supplier_(CosEventChannelAdmin::ProxyPushSupplier::_duplicate(supplier)) + , event_limit_(event_limit) +{ +} + +// Override the push() operation. +void EchoEventConsumer_i::push(const CORBA::Any & data) +{ + // Extract event data from the any. + const char* eventData; + if (data >>= eventData) + { + std::cout << "EchoEventConsumer_i::push(): Received event: " + << eventData << std::endl; + } + if (--event_limit_ <= 0) { + supplier_->disconnect_push_supplier(); + orb_->shutdown(0); + } +} + +// Override the disconnect_push_consumer() operation. +void EchoEventConsumer_i::disconnect_push_consumer() +{ + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventConsumer_i.h b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventConsumer_i.h new file mode 100644 index 00000000000..483c17de403 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventConsumer_i.h @@ -0,0 +1,29 @@ +// EchoEventConsumer_i.h +// Implements a PushConsumer. + +#ifndef _EchoEventConsumer_i_h_ +#define _EchoEventConsumer_i_h_ + +#include <orbsvcs/CosEventCommS.h> // for POA_CosEventComm::PushConsumer +#include <orbsvcs/CosEventChannelAdminC.h> + +class EchoEventConsumer_i : public virtual POA_CosEventComm::PushConsumer +{ + public: + // Constructor + EchoEventConsumer_i(CORBA::ORB_ptr orb, + CosEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit); + + // Override operations from PushConsumer interface. + virtual void push(const CORBA::Any & data); + + virtual void disconnect_push_consumer(); + + private: + CORBA::ORB_var orb_; + CosEventChannelAdmin::ProxyPushSupplier_var supplier_; + int event_limit_; +}; + +#endif // _EchoEventConsumer_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventSupplierMain.cpp b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventSupplierMain.cpp new file mode 100644 index 00000000000..b35ac72fdf8 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/EchoEventSupplierMain.cpp @@ -0,0 +1,88 @@ +// Main program for a PushSupplier of Echo events. + +#include <orbsvcs/CosEventCommC.h> +#include <orbsvcs/CosEventChannelAdminC.h> +#include <orbsvcs/CosNamingC.h> +#include <orbsvcs/CosEvent/CEC_EventChannel.h> +#include <orbsvcs/CosEvent/CEC_Default_Factory.h> + +#include <iostream> +const int EVENT_DELAY_MS = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the CEC Factory so we can customize the CEC + TAO_CEC_Default_Factory::init_svcs (); + + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Get the root POA + CORBA::Object_var poa_object = orb->resolve_initial_references("RootPOA"); + if (CORBA::is_nil (poa_object.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize the POA.\n"), + -1); + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in ()); + PortableServer::POAManager_var poa_manager = root_poa->the_POAManager (); + poa_manager->activate (); + + // Create and activate the event channel servant + CosEventChannelAdmin::EventChannel_var echoEC; + + TAO_CEC_EventChannel_Attributes attr(root_poa.in(), root_poa.in()); + TAO_CEC_EventChannel* ec = new TAO_CEC_EventChannel(attr); + ec->activate(); + PortableServer::ObjectId_var oid = root_poa->activate_object(ec); + CORBA::Object_var ec_obj = root_poa->id_to_reference(oid.in()); + echoEC = CosEventChannelAdmin::EventChannel::_narrow(ec_obj.in()); + + // Bind the EventChannel. + CosNaming::Name_var name = root_context->to_name("CosEventService"); + root_context->rebind(name.in(), echoEC.in()); + + // Get a SupplierAdmin object from the EventChannel. + CosEventChannelAdmin::SupplierAdmin_var supplierAdmin = + echoEC->for_suppliers(); + + // Get a ProxyPushConsumer from the SupplierAdmin. + CosEventChannelAdmin::ProxyPushConsumer_var consumer = + supplierAdmin->obtain_push_consumer(); + + // Connect to the ProxyPushConsumer as a PushSupplier + // (passing a nil PushSupplier object reference to it because + // we don't care to be notified about disconnects). + consumer->connect_push_supplier(CosEventComm::PushSupplier::_nil()); + + // Create an event (just a string in this case). + const CORBA::String_var eventData = CORBA::string_dup("Hello, world."); + + while (1) { + // Insert the event data into an any. + CORBA::Any any; + any <<= eventData; + + // Now push the event to the consumer + consumer->push(any); + + ACE_Time_Value tv(0, 1000 * EVENT_DELAY_MS); + orb->run(tv); + } + orb->destroy(); + + return 0; + } + catch(const CORBA::Exception& exc) + { + std::cerr << "Caught unknown CORBA::Exception exception" << std::endl << exc << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/OMG_SupplierSideEC.mpc b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/OMG_SupplierSideEC.mpc new file mode 100644 index 00000000000..1caedc76b33 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/OMG_SupplierSideEC.mpc @@ -0,0 +1,19 @@ +project(*Supplier): namingexe, event_serv { + exename = EchoEventSupplier + includes += ../common + + Source_Files { + EchoEventSupplierMain.cpp + } +} + +project(*Consumer): namingexe, event_skel { + exename = EchoEventConsumer + includes += ../common + + Source_Files { + EchoEventConsumerMain.cpp + EchoEventConsumer_i.cpp + } +} + diff --git a/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/README b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/README new file mode 100644 index 00000000000..aea980a3a44 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/README @@ -0,0 +1,61 @@ +OMG Event Service + + +File: DevGuideExamples/EventServices/OMG_SupplierSideEC/README + + +This directory contains an example that extends the previous examples +so that the event supplier creates its own local event channel. All +other code is identical to that in EventServices/OMG_Basic. + + EchoEventSupplier (contains EC) ------> EchoEventConsumer + +By default, the supplier will create a "Native" EC. Passing -wrapper +will force creation of a "Wrapper" EC that will utilize a Real-Time +Event Channel (RTEC) as the underlying implementation. + +------------------------------------------------------------------------- + +Note: To test this, you must first run the Naming Service + +$TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -o ns.ior& + +------------------------------------------------------------------------- + +EchoEventSupplerMain.cpp + + Main program for a PushSupplier. + + EchoEventSupplier -ORBInitRef NameService=file://ns.ior + + It will create an event channel and publish an event to it every + second. + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumerMain.cpp + + Main program for a PushConsumer. + + To run it: + + EchoEventConsumer -ORBInitRef NameService=file://ns.ior + + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumer_i.{h,cpp} + + Call which implements the CosEventComm::PushConsumer interface. + + + +Exeuction via Perl Script +------------------------- + +A Perl script has been created to automate the steps shown +above. This script can be run via the following command: + +./run_test.pl diff --git a/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/run_test.pl b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/run_test.pl new file mode 100644 index 00000000000..80e9b7846c7 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_SupplierSideEC/run_test.pl @@ -0,0 +1,48 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Env (ACE_ROOT); +use lib "$ACE_ROOT/bin"; +use PerlACE::Run_Test; + +$iorfile = PerlACE::LocalFile ("ns.ior"); +$arg_ns_ref = "-ORBInitRef NameService=file://$iorfile"; + +unlink $iorfile; + +# start Naming Service +$NameService = "$ENV{TAO_ROOT}/orbsvcs/Naming_Service/Naming_Service"; +$NS = new PerlACE::Process($NameService, "-o $iorfile"); +$NS->Spawn(); +if (PerlACE::waitforfile_timed ($iorfile, 5) == -1) { + print STDERR "ERROR: cannot find file <$iorfile>\n"; + $NS->Kill(); + exit 1; +} + +# start EchoEventSupplier +$S = new PerlACE::Process("EchoEventSupplier", $arg_ns_ref); +$S->Spawn(); + +# Allow time for the supplier to register with the Naming Service +sleep(2); + +# start EchoEventConsumer +$C = new PerlACE::Process("EchoEventConsumer", $arg_ns_ref); +$CRET = $C->SpawnWaitKill(60); + +$S->Kill(); +$NS->Kill(); + +unlink $iorfile; + +if ($CRET != 0) { + print STDERR "ERROR: Client returned <$CRET>\n"; + exit 1 ; +} + + +exit 0; + + diff --git a/TAO/DevGuideExamples/EventServices/OMG_TypedEC/ConsumerMain.cpp b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/ConsumerMain.cpp new file mode 100644 index 00000000000..5fedaa7c3f5 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/ConsumerMain.cpp @@ -0,0 +1,103 @@ +// ConsumerMain.cpp +// Main program for a TypedPushConsumer of Messenger objects. + + +#include "Messenger_i.h" +#include "Consumer_i.h" + +#include <orbsvcs/CosTypedEventCommC.h> +#include <orbsvcs/CosTypedEventChannelAdminC.h> +#include <orbsvcs/CosNamingC.h> +#include <tao/AnyTypeCode/TypeCode.h> +#include <ace/OS_NS_stdio.h> +#include <iostream> + +const int EVENTS_TILL_SHUTDOWN = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Find the EventChannel. + obj = root_context->resolve_str("CosEventService"); + + // Downcast the object reference to an TypedEventChannel reference. + CosTypedEventChannelAdmin::TypedEventChannel_var ec = + CosTypedEventChannelAdmin::TypedEventChannel::_narrow(obj.in()); + if (CORBA::is_nil(ec.in())) { + std::cerr << "Could not narrow TypedEventChannel." << std::endl; + return 1; + } + std::cout << "Found the TypedEventChannel." << std::endl; + + // Get a ConsumerAdmin object from the EventChannel. + CosTypedEventChannelAdmin::TypedConsumerAdmin_var consumerAdmin = + ec->for_consumers(); + + // Get a ProxyPushSupplier from the ConsumerAdmin. + CosEventChannelAdmin::ProxyPushSupplier_var supplier = + consumerAdmin->obtain_typed_push_supplier(::_tc_Messenger->id()); + + // Instantiate an Messenger_i servant. + Messenger_i servant(orb.in(), supplier.in(), EVENTS_TILL_SHUTDOWN); + + // Register it with the RootPOA. + // Activate the POA here before we connect our consumer. + obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in()); + PortableServer::POAManager_var poa_manager = poa->the_POAManager(); + poa_manager->activate(); + + PortableServer::ObjectId_var oid = poa->activate_object(&servant); + CORBA::Object_var messenger_obj = poa->id_to_reference(oid.in()); + Consumer_i consumer_servant(orb.in(), messenger_obj.in()); + PortableServer::ObjectId_var cons_oid = + poa->activate_object(&consumer_servant); + CORBA::Object_var consumer_obj = poa->id_to_reference(cons_oid.in()); + CosTypedEventComm::TypedPushConsumer_var consumer = + CosTypedEventComm::TypedPushConsumer::_narrow(consumer_obj.in()); + + // Connect to the ProxyPushSupplier, passing our PushConsumer object + // reference to it. + supplier->connect_push_consumer(consumer.in()); + + std::cout << "Ready to receive events..." << std::endl; + + CORBA::String_var str = orb->object_to_string (consumer.in ()); + const char* ior_file_name = "Consumer.ior"; + FILE *output_file= ACE_OS::fopen (ACE_TEXT_CHAR_TO_TCHAR(ior_file_name), + ACE_LIB_TEXT("w")); + if (output_file == 0) { + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_file_name), + 1); + } + ACE_OS::fprintf (output_file, "%s", str.in ()); + ACE_OS::fclose (output_file); + + // Enter the ORB event loop. + orb->run(); + + // If we have reached this, we must be shutting down... + // Disconnect the ProxyPushSupplier. + orb->destroy(); + + std::cout << "Test complete." << std::endl; + + return 0; + } + catch(const CORBA::Exception& ex) + { + std::cerr << "Caught CORBA::Exception. " << std::endl << ex << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Consumer_i.cpp b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Consumer_i.cpp new file mode 100644 index 00000000000..57b72a2ee21 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Consumer_i.cpp @@ -0,0 +1,39 @@ +// Implements a PushConsumer. + +#include "Consumer_i.h" +#include <tao/PortableServer/PS_CurrentC.h> +#include <iostream> + +Consumer_i::Consumer_i(CORBA::ORB_ptr orb, + CORBA::Object_ptr obj) + : orb_(CORBA::ORB::_duplicate(orb)) + , object_(CORBA::Object::_duplicate(obj)) +{ +} + + +CORBA::Object_ptr +Consumer_i::get_typed_consumer () + throw (CORBA::SystemException) +{ + return CORBA::Object::_duplicate(object_.in()); +} + +// Override the push() operation. +void Consumer_i::push(const CORBA::Any &) + throw(CORBA::SystemException) +{ + throw CORBA::NO_IMPLEMENT (); +} + +// Override the disconnect_push_consumer() operation. +void Consumer_i::disconnect_push_consumer() + throw(CORBA::SystemException) +{ + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Consumer_i.h b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Consumer_i.h new file mode 100644 index 00000000000..d4d4388eaab --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Consumer_i.h @@ -0,0 +1,32 @@ +// Consumer_i.h +// Implements a TypedPushConsumer. + +#ifndef _Consumer_i_h_ +#define _Consumer_i_h_ + +#include <orbsvcs/CosTypedEventCommS.h> // for POA_CosTypedEventComm::TypedPushConsumer + +class Consumer_i +: public virtual POA_CosTypedEventComm::TypedPushConsumer +{ + public: + // Constructor + Consumer_i(CORBA::ORB_ptr orb, + CORBA::Object_ptr obj); + + // Override operations from TypedPushConsumer interface. + virtual CORBA::Object_ptr get_typed_consumer () + throw (CORBA::SystemException); + + virtual void push(const CORBA::Any & data) + throw(CORBA::SystemException); + + virtual void disconnect_push_consumer() + throw(CORBA::SystemException); + + private: + CORBA::ORB_var orb_; + CORBA::Object_var object_; +}; + +#endif // _Consumer_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Messenger.idl b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Messenger.idl new file mode 100644 index 00000000000..6f6cdf2f33c --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Messenger.idl @@ -0,0 +1,10 @@ +// Messenger.idl + +interface Messenger +{ + // Modified to make message an in parameter and + // remove the return value. + void send_message(in string user_name, + in string subject, + in string message); +}; diff --git a/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Messenger_i.cpp b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Messenger_i.cpp new file mode 100644 index 00000000000..5d91988b9a1 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Messenger_i.cpp @@ -0,0 +1,40 @@ +/* -*- C++ -*- $Id$ */ + +// ****** Code generated by the The ACE ORB (TAO) IDL Compiler ******* +// TAO and the TAO IDL Compiler have been developed by the Center for +// Distributed Object Computing at Washington University, St. Louis. +// +// Information about TAO is available at: +// http://www.cs.wustl.edu/~schmidt/TAO.html + +#include "Messenger_i.h" +#include <iostream> +// Implementation skeleton constructor +Messenger_i::Messenger_i (CORBA::ORB_ptr orb, + CosEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit) + : orb_(CORBA::ORB::_duplicate(orb)) + , supplier_(CosEventChannelAdmin::ProxyPushSupplier::_duplicate(supplier)) + , event_limit_(event_limit) +{ +} + +// Implementation skeleton destructor +Messenger_i::~Messenger_i (void) +{ +} + +void Messenger_i::send_message (const char * user_name, + const char * subject, + const char * message) + throw (CORBA::SystemException) +{ + std::cout << "Message from: " << user_name << std::endl; + std::cout << "Subject: " << subject << std::endl; + std::cout << "Message: " << message << std::endl; + + if (--event_limit_ <= 0) { + supplier_->disconnect_push_supplier(); + orb_->shutdown(0); + } +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Messenger_i.h b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Messenger_i.h new file mode 100644 index 00000000000..1b71a298e0a --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/Messenger_i.h @@ -0,0 +1,46 @@ +/* -*- C++ -*- $Id$ */ + +// ****** Code generated by the The ACE ORB (TAO) IDL Compiler ******* +// TAO and the TAO IDL Compiler have been developed by the Center for +// Distributed Object Computing at Washington University, St. Louis. +// +// Information about TAO is available at: +// http://www.cs.wustl.edu/~schmidt/TAO.html + +#ifndef MESSENGER_I_H_ +#define MESSENGER_I_H_ + +#include "MessengerS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include <orbsvcs/CosTypedEventChannelAdminC.h> + +//Class Messenger_i +class Messenger_i : public virtual POA_Messenger +{ +public: + //Constructor + Messenger_i (CORBA::ORB_ptr orb, + CosEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit); + + //Destructor + virtual ~Messenger_i (void); + + virtual void send_message (const char * user_name, + const char * subject, + const char * message) + throw (CORBA::SystemException); + +private: + CORBA::ORB_var orb_; + CosEventChannelAdmin::ProxyPushSupplier_var supplier_; + int event_limit_; + +}; + + +#endif /* MESSENGERI_H_ */ diff --git a/TAO/DevGuideExamples/EventServices/OMG_TypedEC/OMG_TypedEC.mpc b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/OMG_TypedEC.mpc new file mode 100644 index 00000000000..27c9419cc04 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/OMG_TypedEC.mpc @@ -0,0 +1,19 @@ +project(*Supplier): namingexe, event_skel { + exename = Supplier + includes += ../common + Source_Files { + SupplierMain.cpp + MessengerC.cpp + } +} + +project(*Consumer): namingexe, event_skel { + exename = Consumer + includes += ../common + + Source_Files { + ConsumerMain.cpp + Consumer_i.cpp + Messenger_i.cpp + } +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_TypedEC/README b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/README new file mode 100644 index 00000000000..bae2c8463e8 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/README @@ -0,0 +1,17 @@ + +$TAO_ROOT/DevGuideExamples/EventServices/OMG_TypedEC + +This example is a simple demonstration of Typed Event Channel usage. +The Messenger interface is slightly modified for use in this example +to allow suppliers to send messages to all connected consumers. To +run the example, execute the following commands: + +export InterfaceRepositoryIOR=file://ifr.ior +export NameServiceIOR=file://ns.ior +$TAO_ROOT/orbsvcs/IFR_Service/IFR_Service -o ifr.ior & +$ACE_ROOT/bin/tao_ifr Messenger.idl +$TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -o ns.ior & +$TAO_ROOT/orbsvcs/CosEvent_Service/CosEvent_Service -t & +./Consumer & +./Supplier + diff --git a/TAO/DevGuideExamples/EventServices/OMG_TypedEC/SupplierMain.cpp b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/SupplierMain.cpp new file mode 100644 index 00000000000..4d0567aefc1 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/SupplierMain.cpp @@ -0,0 +1,74 @@ +// Main program for a TypedPushSupplier of Messenger objects. + +#include <orbsvcs/CosTypedEventCommC.h> +#include <orbsvcs/CosTypedEventChannelAdminC.h> +#include <orbsvcs/CosNamingC.h> +#include <tao/AnyTypeCode/TypeCode.h> + +#include <iostream> +#include "MessengerC.h" + +const int EVENT_DELAY_MS = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Find the EventChannel. + obj = root_context->resolve_str("CosEventService"); + + // Downcast the object reference to a TypedEventChannel reference. + CosTypedEventChannelAdmin::TypedEventChannel_var ec = + CosTypedEventChannelAdmin::TypedEventChannel::_narrow(obj.in ()); + if (CORBA::is_nil(ec.in())) { + std::cerr << "Could not resolve TypedEventChannel." << std::endl; + return 1; + } + + // Get a SupplierAdmin object from the EventChannel. + CosTypedEventChannelAdmin::TypedSupplierAdmin_var supplierAdmin = + ec->for_suppliers(); + + // Get a ProxyPushConsumer from the SupplierAdmin. + CosTypedEventChannelAdmin::TypedProxyPushConsumer_var consumer = + supplierAdmin->obtain_typed_push_consumer(::_tc_Messenger->id()); + + // Connect to the ProxyPushConsumer as a PushSupplier + // (passing a nil PushSupplier object reference to it because + // we don't care to be notified about disconnects). + consumer->connect_push_supplier(CosEventComm::PushSupplier::_nil()); + + // Obtain the interface from the event channel + CORBA::Object_var messenger_obj = consumer->get_typed_consumer(); + + // Narrow the interface + Messenger_var messenger = Messenger::_narrow(messenger_obj.in () ); + + // Create an event (just a string in this case). + const CORBA::String_var eventData = CORBA::string_dup("Hello, world."); + + // Send one event per second. (approx) + while (1) { + messenger->send_message("King Lizard", + "Proclamations", + eventData.in()); + + ACE_Time_Value event_delay(0, 1000 * EVENT_DELAY_MS); + orb->run(event_delay); + } + + } + catch(const CORBA::Exception& ex) + { + std::cerr << "Caught CORBA::Exception. " << std::endl << ex << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/OMG_TypedEC/run_test.pl b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/run_test.pl new file mode 100644 index 00000000000..11adc6d33e6 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/OMG_TypedEC/run_test.pl @@ -0,0 +1,94 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Env (ACE_ROOT); +use lib "$ACE_ROOT/bin"; +use PerlACE::Run_Test; + +$nsiorfile = PerlACE::LocalFile ("ns.ior"); +$ifriorfile = PerlACE::LocalFile ("ifr.ior"); +$esiorfile = PerlACE::LocalFile ("es.ior"); +$consiorfile = PerlACE::LocalFile ("Consumer.ior"); +$arg_ns_ref = "-ORBInitRef NameService=file://$nsiorfile"; +$arg_ifr_ref = "-ORBInitRef InterfaceRepository=file://$ifriorfile"; + +unlink $nsiorfile; +unlink $ifriorfile; +unlink $esiorfile; +unlink $consiorfile; + +# start the Interface Repository Service +$IFRService = "$ENV{TAO_ROOT}/orbsvcs/IFR_Service/IFR_Service"; +$IF = new PerlACE::Process ($IFRService, "-o $ifriorfile"); +$IF->Spawn (); +if (PerlACE::waitforfile_timed ($ifriorfile, 15) == -1) { + print STDERR "ERROR: cannot find file <$ifriorfile>\n"; + $IF->Kill (); + exit 1; +} + +# load the IFR with the Messenger interface info +$TAO_IFR = "$ENV{ACE_ROOT}/bin/tao_ifr"; +$TI = new PerlACE::Process ($TAO_IFR, + "$arg_ifr_ref Messenger.idl"); +$TI->SpawnWaitKill (60); + +# start Naming Service +$NameService = "$ENV{TAO_ROOT}/orbsvcs/Naming_Service/Naming_Service"; +$NS = new PerlACE::Process($NameService, "-o $nsiorfile"); +$NS->Spawn(); +if (PerlACE::waitforfile_timed ($nsiorfile, 5) == -1) { + print STDERR "ERROR: cannot find file <$nsiorfile>\n"; + $NS->Kill(); + $IF->Kill (); + exit 1; +} + +# start Event Service +$EventService = "$ENV{TAO_ROOT}/orbsvcs/CosEvent_Service/CosEvent_Service"; +$ES = new PerlACE::Process($EventService, + "-t -o $esiorfile $arg_ns_ref $arg_ifr_ref"); +$ES->Spawn(); +if (PerlACE::waitforfile_timed ($esiorfile, 5) == -1) { + print STDERR "ERROR: cannot find file <$esiorfile>\n"; + $ES->Kill(); + $NS->Kill (); + $IF->Kill (); + unlink $nsiorfile; + exit 1; +} + +# start Consumer +$C = new PerlACE::Process("Consumer", "$arg_ns_ref"); +$C->Spawn(); +if (PerlACE::waitforfile_timed ($consiorfile, 5) == -1) { + print STDERR "ERROR: cannot find file <$consiorfile>\n"; + $ES->Kill(); + $NS->Kill (); + $IF->Kill (); + $C->Kill (); + exit 1; +} + +# start Supplier +$S = new PerlACE::Process("Supplier", "$arg_ns_ref"); +$S->Spawn(); + +$CRET = $C->WaitKill(60); +$S->Kill(); +$NS->Kill(); +$ES->Kill(); +$IF->Kill(); + +unlink $nsiorfile; +unlink $ifriorfile; +unlink $esiorfile; +unlink $consiorfile; + +if ($CRET != 0) { + print STDERR "ERROR: Client returned <$CRET>\n"; + exit 1 ; +} + +exit 0; diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventConsumerMain.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventConsumerMain.cpp new file mode 100644 index 00000000000..ba0a01976d4 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventConsumerMain.cpp @@ -0,0 +1,88 @@ +// Main program for a PushConsumer of Echo events. + +#include "EchoEventConsumer_i.h" + +#include <orbsvcs/RtecEventCommC.h> +#include <orbsvcs/RtecEventChannelAdminC.h> +#include <orbsvcs/Time_Utilities.h> +#include <orbsvcs/Event_Utilities.h> +#include <orbsvcs/CosNamingC.h> + +#include <iostream> +const int EVENT_LIMIT = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Find the EchoEventChannel. + obj = root_context->resolve_str("EventService"); + + // Downcast the object reference to an EventChannel reference. + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow(obj.in()); + if (CORBA::is_nil(ec.in())) { + std::cerr << "Could not narrow EchoEventChannel." << std::endl; + return 1; + } + std::cout << "EchoEventConsumerMain.cpp: Found the EchoEventChannel." << std::endl; + + // Obtain a reference to the consumer administration object. + RtecEventChannelAdmin::ConsumerAdmin_var admin = ec->for_consumers(); + + // Obtain a reference to the push supplier proxy. + RtecEventChannelAdmin::ProxyPushSupplier_var supplier = + admin->obtain_push_supplier(); + + // Get the RootPOA. + obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in()); + + // Instantiate an EchoEventConsumer_i servant. + EchoEventConsumer_i servant(orb.in(), supplier.in(), EVENT_LIMIT); + + // Register it with the RootPOA. + PortableServer::ObjectId_var oid = poa->activate_object(&servant); + CORBA::Object_var consumer_obj = poa->id_to_reference(oid.in()); + RtecEventComm::PushConsumer_var consumer = + RtecEventComm::PushConsumer::_narrow(consumer_obj.in()); + + // Connect as a consumer. + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (1); + qos.insert_type (ACE_ES_EVENT_ANY, // Event Type + 0); // handle to the rt_info + supplier->connect_push_consumer (consumer.in (), + qos.get_ConsumerQOS ()); + + // Activate the POA via its POAManager. + PortableServer::POAManager_var poa_manager = poa->the_POAManager(); + poa_manager->activate(); + + std::cout << "EchoEventConsumerMain.cpp: Ready to receive events..." << std::endl; + + // Enter the ORB event loop. + orb->run(); + + // If we have reached this, we must be shutting down... + // Disconnect the ProxyPushSupplier. + orb->destroy(); + + std::cout << "Test completed." << std::endl; + + return 0; + } + catch(const CORBA::Exception& ex) + { + std::cerr << "Caught CORBA::Exception" << std::endl << ex << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventConsumer_i.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventConsumer_i.cpp new file mode 100644 index 00000000000..08bb270168b --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventConsumer_i.cpp @@ -0,0 +1,56 @@ +// EchoEventConsumer_i.cpp +// Implements a PushConsumer. + +#include "EchoEventConsumer_i.h" +#include <tao/PortableServer/PS_CurrentC.h> +#include <ace/OS_NS_stdio.h> +#include <sstream> + +// Constructor duplicates the ORB reference. +EchoEventConsumer_i::EchoEventConsumer_i( + CORBA::ORB_ptr orb, + RtecEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit) + : orb_(CORBA::ORB::_duplicate(orb)) + , supplier_(RtecEventChannelAdmin::ProxyPushSupplier::_duplicate(supplier)) + , event_limit_(event_limit) +{ + // Nothing to do. +} + +// Implement the push() operation. +void EchoEventConsumer_i::push(const RtecEventComm::EventSet& events) + throw(CORBA::SystemException) +{ + // Loop through the events, looking for shutdown events. + for (u_int i = 0; i < events.length (); ++i) { + //ACE_OS::printf("."); + // Extract event data from the any. + const char* eventData; + std::ostringstream out; + out << "Received event," + << " type: " << events[i].header.type + << " source: " << events[i].header.source; + if (events[i].data.any_value >>= eventData) { + out << " text: " << eventData; + } + + ACE_OS::printf("%s\n", out.str().c_str()); // printf is synchronized + } + if (--event_limit_ <= 0) { + supplier_->disconnect_push_supplier(); + orb_->shutdown(0); + } +} + +// Implement the disconnect_push_consumer() operation. +void EchoEventConsumer_i::disconnect_push_consumer() + throw(CORBA::SystemException) +{ + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventConsumer_i.h b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventConsumer_i.h new file mode 100644 index 00000000000..c59aa7944a8 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventConsumer_i.h @@ -0,0 +1,31 @@ +// EchoEventConsumer_i.h +// Implements a PushConsumer. + +#ifndef _EchoEventConsumer_i_h_ +#define _EchoEventConsumer_i_h_ + +#include <orbsvcs/RtecEventCommS.h> // for POA_CosEventComm::PushConsumer +#include <orbsvcs/RtecEventChannelAdminC.h> + +class EchoEventConsumer_i : public virtual POA_RtecEventComm::PushConsumer +{ + public: + // Constructor + EchoEventConsumer_i(CORBA::ORB_ptr orb, + RtecEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit); + + // Override operations from PushConsumer interface. + virtual void push(const RtecEventComm::EventSet& events) + throw(CORBA::SystemException); + + virtual void disconnect_push_consumer() + throw(CORBA::SystemException); + + private: + CORBA::ORB_var orb_; + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_; + int event_limit_; +}; + +#endif // _EchoEventConsumer_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventSupplierMain.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventSupplierMain.cpp new file mode 100644 index 00000000000..57101632a30 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventSupplierMain.cpp @@ -0,0 +1,107 @@ +// EchoEventSupplierMain.cpp +// Main program for a PushSupplier of Echo events. + +#include "EchoEventSupplier_i.h" + +#include <orbsvcs/RtecEventCommC.h> +#include <orbsvcs/RtecEventChannelAdminC.h> +#include <orbsvcs/Time_Utilities.h> +#include <orbsvcs/Event_Utilities.h> +#include <orbsvcs/CosNamingC.h> + +#include <iostream> +const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1; +const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1; + +const int EVENT_DELAY_MS = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Get the Event Channel using Naming Services + obj = root_context->resolve_str("EventService"); + + // Downcast the object reference to an EventChannel reference. + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow(obj.in()); + if (CORBA::is_nil(ec.in())) { + std::cerr << "Could not resolve EchoEventChannel." << std::endl; + return 1; + } + + // Get a SupplierAdmin object from the EventChannel. + RtecEventChannelAdmin::SupplierAdmin_var admin = ec->for_suppliers(); + + // Get a ProxyPushConsumer from the SupplierAdmin. + RtecEventChannelAdmin::ProxyPushConsumer_var consumer = + admin->obtain_push_consumer(); + + // Get the RootPOA. + obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in()); + + // Instantiate an EchoEventSupplier_i servant. + EchoEventSupplier_i servant(orb.in()); + + // Register it with the RootPOA. + PortableServer::ObjectId_var oid = poa->activate_object(&servant); + CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in()); + RtecEventComm::PushSupplier_var supplier = + RtecEventComm::PushSupplier::_narrow(supplier_obj.in()); + + // Publish the events the supplier provides. + ACE_SupplierQOS_Factory qos; + qos.insert (MY_SOURCE_ID, // Supplier's unique id + MY_EVENT_TYPE, // Event type + 0, // handle to the rt_info structure + 1); // number of calls + + // Connect as a supplier of the published events. + consumer->connect_push_supplier (supplier.in (), + qos.get_SupplierQOS ()); + + // Activate the POA via its POAManager. + PortableServer::POAManager_var poa_manager = poa->the_POAManager(); + poa_manager->activate(); + + // Create an event (just a string in this case). + const CORBA::String_var eventData = CORBA::string_dup("Hello, world."); + + // Create an event set for one event + RtecEventComm::EventSet events (1); + events.length (1); + + // Initialize event header. + events[0].header.source = MY_SOURCE_ID; + events[0].header.type = MY_EVENT_TYPE; + + // Initialize data fields in event. + events[0].data.any_value <<= eventData; + + std::cout << "Supplier starting sending of events" << std::endl; + + while (1) { + consumer->push (events); + ACE_Time_Value tv(0, 1000 * EVENT_DELAY_MS); + orb->run(tv); + } + + orb->destroy(); + + return 0; + } + catch(const CORBA::Exception& ex) + { + std::cerr << "Supplier::main() Caught CORBA::Exception" << std::endl << ex << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventSupplier_i.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventSupplier_i.cpp new file mode 100644 index 00000000000..c2339c37ad1 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventSupplier_i.cpp @@ -0,0 +1,24 @@ +// EchoEventSupplier_i.cpp +// Implements a PushSupplier. + +#include "EchoEventSupplier_i.h" +#include <tao/PortableServer/PS_CurrentC.h> + +// Constructor duplicates the ORB reference. +EchoEventSupplier_i::EchoEventSupplier_i(CORBA::ORB_ptr orb) + : orb_(CORBA::ORB::_duplicate(orb)) +{ + // Nothing to do. +} + +// Override the disconnect_push_Supplier() operation. +void EchoEventSupplier_i::disconnect_push_supplier() + throw(CORBA::SystemException) +{ + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventSupplier_i.h b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventSupplier_i.h new file mode 100644 index 00000000000..b06f44cf8e3 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Basic/EchoEventSupplier_i.h @@ -0,0 +1,22 @@ +// EchoEventSupplier_i.h +// Implements a PushSupplier. + +#ifndef _EchoEventSupplier_i_h_ +#define _EchoEventSupplier_i_h_ + +#include <orbsvcs/RtecEventCommS.h> // for POA_CosEventComm::PushSupplier + +class EchoEventSupplier_i : public virtual POA_RtecEventComm::PushSupplier +{ + public: + // Constructor + EchoEventSupplier_i(CORBA::ORB_ptr orb); + + virtual void disconnect_push_supplier() + throw(CORBA::SystemException); + + private: + CORBA::ORB_var orb_; +}; + +#endif // _EchoEventSupplier_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Basic/README b/TAO/DevGuideExamples/EventServices/RTEC_Basic/README new file mode 100644 index 00000000000..5c4e89eacc8 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Basic/README @@ -0,0 +1,57 @@ +Real-Time Event Service + + +File: DevGuideExamples/EventServices/RTEC_Basic/README + + +This directory contains a simple example of using the RT Event service. +This example uses the push/push model: + + EchoEventSupplier ------> Event_Service ------> EchoEventConsumer + +------------------------------------------------------------------------- + +Note: To test this, you must first run the Naming Service and the +Event Service, e.g.: + +$TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -o ns.ior& +$TAO_ROOT/orbsvcs/Event_Service/Event_Service -ORBInitRef NameService=file://ns.ior& + +------------------------------------------------------------------------- + +EchoEventSupplerMain.cpp + + Main program for a PushSupplier. + + EchoEventSupplier -ORBInitRef NameService=file://ns.ior + + It will publish an event to the event channel every 1 second. + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumerMain.cpp + + Main program for a PushConsumer. + + To run it: + + EchoEventConsumer -ORBInitRef NameService=file://ns.ior + + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumer_i.{h,cpp} + + Call which implements the RtecEventComm::PushConsumer interface. + + + +Exeuction via Perl Script +------------------------- + +A Perl script has been created to automate the steps shown +above. This script can be run via the following command: + +./run_test.pl diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Basic/RTEC_Basic.mpc b/TAO/DevGuideExamples/EventServices/RTEC_Basic/RTEC_Basic.mpc new file mode 100644 index 00000000000..a67c5f1c819 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Basic/RTEC_Basic.mpc @@ -0,0 +1,20 @@ +project(*Supplier): namingexe, rteventexe, { + exename = EchoEventSupplier + includes += ../common + + Source_Files { + EchoEventSupplierMain.cpp + EchoEventSupplier_i.cpp + } +} + +project(*Consumer): namingexe, rteventexe, { + exename = EchoEventConsumer + includes += ../common + + Source_Files { + EchoEventConsumerMain.cpp + EchoEventConsumer_i.cpp + } +} + diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Basic/run_test.pl b/TAO/DevGuideExamples/EventServices/RTEC_Basic/run_test.pl new file mode 100644 index 00000000000..3fcc37a237c --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Basic/run_test.pl @@ -0,0 +1,60 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Env (ACE_ROOT); +use lib "$ACE_ROOT/bin"; +use PerlACE::Run_Test; + +$nsiorfile = PerlACE::LocalFile ("ns.ior"); +$esiorfile = PerlACE::LocalFile ("es.ior"); +$arg_ns_ref = "-ORBInitRef NameService=file://$nsiorfile"; + +unlink $nsiorfile; +unlink $esiorfile; + +# start Naming Service +$NameService = "$ENV{TAO_ROOT}/orbsvcs/Naming_Service/Naming_Service"; +$NS = new PerlACE::Process($NameService, "-o $nsiorfile"); +$NS->Spawn(); +if (PerlACE::waitforfile_timed ($nsiorfile, 5) == -1) { + print STDERR "ERROR: cannot find file <$nsiorfile>\n"; + $NS->Kill(); + exit 1; +} + +# start Event Service +$EventService = "$ENV{TAO_ROOT}/orbsvcs/Event_Service/Event_Service"; +$ES = new PerlACE::Process($EventService, "-o $esiorfile $arg_ns_ref"); +$ES->Spawn(); +if (PerlACE::waitforfile_timed ($esiorfile, 15) == -1) { + print STDERR "ERROR: cannot find file <$esiorfile>\n"; + $ES->Kill(); + unlink $nsiorfile; + exit 1; +} + +# start EchoEventSupplier +$S = new PerlACE::Process("EchoEventSupplier", $arg_ns_ref); +$S->Spawn(); + +# start EchoEventConsumer +$C = new PerlACE::Process("EchoEventConsumer", $arg_ns_ref); +$CRET = $C->SpawnWaitKill(60); + +$S->Kill(); +$NS->Kill(); +$ES->Kill(); + +unlink $nsiorfile; +unlink $esiorfile; + +if ($CRET != 0) { + print STDERR "ERROR: Client returned <$CRET>\n"; + exit 1 ; +} + + +exit 0; + + diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventConsumerMain.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventConsumerMain.cpp new file mode 100644 index 00000000000..d6b08f99a87 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventConsumerMain.cpp @@ -0,0 +1,103 @@ +// EchoEventConsumerMain.cpp +// Main program for a PushConsumer of Echo events. + +#include "EchoEventConsumer_i.h" + +#include <orbsvcs/RtecEventCommC.h> +#include <orbsvcs/RtecEventChannelAdminC.h> +#include <orbsvcs/Time_Utilities.h> +#include <orbsvcs/Event_Utilities.h> +#include <orbsvcs/CosNamingC.h> + +#include <iostream> +const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1; +const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1; + +const int EVENT_LIMIT = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + const char* ecname = "EventService"; + for (int i = 0; argv[i] != 0; i++) { + if (strcmp(argv[i], "-ecname") == 0) { + if (argv[i+1] != 0) { + ecname = argv[i+1]; + } else { + std::cerr << "Missing Event channel name" << std::endl; + } + } + } + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context + = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Find the EchoEventChannel. + obj = root_context->resolve_str(ecname); + + // Downcast the object reference to an EventChannel reference. + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow(obj.in()); + if (CORBA::is_nil(ec.in())) { + std::cerr << "Could not narrow EchoEventChannel." << std::endl; + return 1; + } + std::cout << "EchoEventConsumerMain.cpp: Found the EchoEventChannel." << std::endl; + + // Obtain a reference to the consumer administration object. + RtecEventChannelAdmin::ConsumerAdmin_var admin = ec->for_consumers(); + + // Obtain a reference to the push supplier proxy. + RtecEventChannelAdmin::ProxyPushSupplier_var supplier = + admin->obtain_push_supplier(); + + // Get the RootPOA. + obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in()); + + // Instantiate an EchoEventConsumer_i servant and register it + // with the RootPOA + EchoEventConsumer_i servant(orb.in(), supplier.in(), EVENT_LIMIT); + PortableServer::ObjectId_var oid = poa->activate_object(&servant); + CORBA::Object_var consumer_obj = poa->id_to_reference(oid.in()); + RtecEventComm::PushConsumer_var consumer = + RtecEventComm::PushConsumer::_narrow(consumer_obj.in()); + + // Connect as a consumer. + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (); + qos.insert (MY_SOURCE_ID, // Source ID + MY_EVENT_TYPE, // Event Type + 0); // handle to the rt_info + supplier->connect_push_consumer (consumer.in (), + qos.get_ConsumerQOS ()); + + // Activate the POA via its POAManager. + PortableServer::POAManager_var poa_manager = poa->the_POAManager(); + poa_manager->activate(); + + std::cout << "EchoEventConsumerMain.cpp: Ready to receive events..." << std::endl; + + // Enter the ORB event loop. + orb->run(); + + // If we have reached this, we must be shutting down... + // Disconnect the ProxyPushSupplier. + orb->destroy(); + + std::cout << "Test completed." << std::endl; + + return 0; + } + catch(const CORBA::Exception& exc) + { + std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl; + } + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventConsumer_i.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventConsumer_i.cpp new file mode 100644 index 00000000000..08bb270168b --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventConsumer_i.cpp @@ -0,0 +1,56 @@ +// EchoEventConsumer_i.cpp +// Implements a PushConsumer. + +#include "EchoEventConsumer_i.h" +#include <tao/PortableServer/PS_CurrentC.h> +#include <ace/OS_NS_stdio.h> +#include <sstream> + +// Constructor duplicates the ORB reference. +EchoEventConsumer_i::EchoEventConsumer_i( + CORBA::ORB_ptr orb, + RtecEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit) + : orb_(CORBA::ORB::_duplicate(orb)) + , supplier_(RtecEventChannelAdmin::ProxyPushSupplier::_duplicate(supplier)) + , event_limit_(event_limit) +{ + // Nothing to do. +} + +// Implement the push() operation. +void EchoEventConsumer_i::push(const RtecEventComm::EventSet& events) + throw(CORBA::SystemException) +{ + // Loop through the events, looking for shutdown events. + for (u_int i = 0; i < events.length (); ++i) { + //ACE_OS::printf("."); + // Extract event data from the any. + const char* eventData; + std::ostringstream out; + out << "Received event," + << " type: " << events[i].header.type + << " source: " << events[i].header.source; + if (events[i].data.any_value >>= eventData) { + out << " text: " << eventData; + } + + ACE_OS::printf("%s\n", out.str().c_str()); // printf is synchronized + } + if (--event_limit_ <= 0) { + supplier_->disconnect_push_supplier(); + orb_->shutdown(0); + } +} + +// Implement the disconnect_push_consumer() operation. +void EchoEventConsumer_i::disconnect_push_consumer() + throw(CORBA::SystemException) +{ + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventConsumer_i.h b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventConsumer_i.h new file mode 100644 index 00000000000..c59aa7944a8 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventConsumer_i.h @@ -0,0 +1,31 @@ +// EchoEventConsumer_i.h +// Implements a PushConsumer. + +#ifndef _EchoEventConsumer_i_h_ +#define _EchoEventConsumer_i_h_ + +#include <orbsvcs/RtecEventCommS.h> // for POA_CosEventComm::PushConsumer +#include <orbsvcs/RtecEventChannelAdminC.h> + +class EchoEventConsumer_i : public virtual POA_RtecEventComm::PushConsumer +{ + public: + // Constructor + EchoEventConsumer_i(CORBA::ORB_ptr orb, + RtecEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit); + + // Override operations from PushConsumer interface. + virtual void push(const RtecEventComm::EventSet& events) + throw(CORBA::SystemException); + + virtual void disconnect_push_consumer() + throw(CORBA::SystemException); + + private: + CORBA::ORB_var orb_; + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_; + int event_limit_; +}; + +#endif // _EchoEventConsumer_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventSupplierMain.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventSupplierMain.cpp new file mode 100644 index 00000000000..5ecf0c4c43f --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventSupplierMain.cpp @@ -0,0 +1,204 @@ +// EchoEventSupplierMain.cpp +// Main program for a PushSupplier of Echo events. + +#include "EchoEventSupplier_i.h" + +#include <orbsvcs/RtecEventCommC.h> +#include <orbsvcs/RtecEventChannelAdminC.h> +#include <orbsvcs/Time_Utilities.h> +#include <orbsvcs/Event_Utilities.h> +#include <orbsvcs/CosNamingC.h> +#include <orbsvcs/Event/EC_Event_Channel.h> +#include <orbsvcs/Event/EC_Gateway_IIOP.h> +#include <orbsvcs/Event/EC_Default_Factory.h> + +#include <ace/Thread_Manager.h> +#include <iostream> +#include <fstream> + +const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1; +const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1; + +const int EVENT_DELAY_MS = 10; + +ACE_THR_FUNC_RETURN orb_thread(void *orb_ptr) +{ + CORBA::ORB_var orb = CORBA::ORB::_duplicate((CORBA::ORB_ptr) orb_ptr); + orb->run(); + return 0; +} + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the EC Factory so we can customize the EC + TAO_EC_Default_Factory::init_svcs (); + + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + const char* ecname = "EventService"; + const char* remote_ecname = 0; + const char* iorfile = 0; + for (int i = 0; argv[i] != 0; i++) { + if (strcmp(argv[i], "-ecname") == 0) { + if (argv[i+1] != 0) { + i++; + ecname = argv[i]; + } else { + std::cerr << "Missing Event channel name" << std::endl; + } + } + if (strcmp(argv[i], "-gateway") == 0) { + if (argv[i+1] != 0) { + i++; + remote_ecname = argv[i]; + } else { + std::cerr << "Missing Event channel name" << std::endl; + } + } + if (strcmp(argv[i], "-iorfile") == 0) { + if (argv[i+1] != 0) { + i++; + iorfile = argv[i]; + } + } + } + + // Get the POA + CORBA::Object_var object = orb->resolve_initial_references ("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow (object.in ()); + PortableServer::POAManager_var poa_manager = poa->the_POAManager (); + poa_manager->activate (); + + // Spawn a thread for the orb + ACE_Thread_Manager *thread_mgr = ACE_Thread_Manager::instance(); + thread_mgr->spawn(orb_thread, orb.in()); + + // Create a local event channel and register it with the RootPOA. + TAO_EC_Event_Channel_Attributes attributes (poa.in (), poa.in ()); + TAO_EC_Event_Channel ec_impl (attributes); + ec_impl.activate (); + PortableServer::ObjectId_var oid = poa->activate_object(&ec_impl); + CORBA::Object_var ec_obj = poa->id_to_reference(oid.in()); + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow(ec_obj.in()); + + // Find the Naming Service. + object = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(object.in()); + CosNaming::Name_var name = root_context->to_name(ecname); + root_context->rebind(name.in(), ec.in()); + + // Get a SupplierAdmin object from the EventChannel. + RtecEventChannelAdmin::SupplierAdmin_var admin = ec->for_suppliers(); + + // Get a ProxyPushConsumer from the SupplierAdmin. + RtecEventChannelAdmin::ProxyPushConsumer_var consumer = + admin->obtain_push_consumer(); + + // Instantiate an EchoEventSupplier_i servant. + EchoEventSupplier_i servant(orb.in()); + + // Register it with the RootPOA. + oid = poa->activate_object(&servant); + CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in()); + RtecEventComm::PushSupplier_var supplier = + RtecEventComm::PushSupplier::_narrow(supplier_obj.in()); + + // Publish the events the supplier provides. + ACE_SupplierQOS_Factory qos; + qos.insert (MY_SOURCE_ID, // Supplier's unique id + MY_EVENT_TYPE, // Event type + 0, // handle to the rt_info structure + 1); // number of calls + + // Connect as a supplier of the published events. + consumer->connect_push_supplier (supplier.in (), + qos.get_SupplierQOS ()); + + // Create an event (just a string in this case). + const CORBA::String_var eventData = CORBA::string_dup(ecname); + + // Create an event set for one event + RtecEventComm::EventSet event (1); + event.length (1); + // Initialize event header. + event[0].header.source = MY_SOURCE_ID; + event[0].header.ttl = 1; + event[0].header.type = MY_EVENT_TYPE; + // Initialize data fields in event. + event[0].data.any_value <<= eventData; + + TAO_EC_Gateway_IIOP gateway; + int gateway_initialized = 0; + + std::cout << "Supplier starting sending of events.\n"; + + while (1) { + + consumer->push (event); + ACE_Time_Value tv(0, 1000 * EVENT_DELAY_MS); + orb->run(tv); + + if ((remote_ecname != 0) && (!gateway_initialized)) { + + try { + // Get the remote event channel object + CORBA::Object_var obj = root_context->resolve_str(remote_ecname); + RtecEventChannelAdmin::EventChannel_var remote_ec = + RtecEventChannelAdmin::EventChannel::_narrow(obj.in()); + + int ok = 0; + if (!CORBA::is_nil(remote_ec.in())) { + // Now check if we can talk to it... + try { + RtecEventChannelAdmin::SupplierAdmin_var adm = + remote_ec->for_suppliers(); + ok = 1; + } catch(const CORBA::UserException&) { + // What is the correct exception(s) to catch here? + } + } + + // There is a good remote event channel so initialize the + // gateway. + if (ok) { + gateway.init(remote_ec.in(), ec.in()); + + PortableServer::ObjectId_var gateway_oid = + poa->activate_object(&gateway); + CORBA::Object_var gateway_obj = + poa->id_to_reference(gateway_oid.in()); + RtecEventChannelAdmin::Observer_var obs = + RtecEventChannelAdmin::Observer::_narrow(gateway_obj.in()); + RtecEventChannelAdmin::Observer_Handle local_ec_obs_handle = + ec->append_observer (obs.in ()); + ACE_UNUSED_ARG (local_ec_obs_handle); + gateway_initialized = 1; + std::cout << "Gateway initialized\n"; + if (iorfile != 0) { + CORBA::String_var str = orb->object_to_string( ec.in() ); + std::ofstream iorFile( iorfile ); + iorFile << str.in() << std::endl; + iorFile.close(); + } + } + } catch(const CosNaming::NamingContext::NotFound&) { + // Try again later... + } + } + } + + orb->destroy(); + + return 0; + } + catch(const CORBA::Exception& exc) + { + std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventSupplier_i.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventSupplier_i.cpp new file mode 100644 index 00000000000..c2339c37ad1 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventSupplier_i.cpp @@ -0,0 +1,24 @@ +// EchoEventSupplier_i.cpp +// Implements a PushSupplier. + +#include "EchoEventSupplier_i.h" +#include <tao/PortableServer/PS_CurrentC.h> + +// Constructor duplicates the ORB reference. +EchoEventSupplier_i::EchoEventSupplier_i(CORBA::ORB_ptr orb) + : orb_(CORBA::ORB::_duplicate(orb)) +{ + // Nothing to do. +} + +// Override the disconnect_push_Supplier() operation. +void EchoEventSupplier_i::disconnect_push_supplier() + throw(CORBA::SystemException) +{ + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventSupplier_i.h b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventSupplier_i.h new file mode 100644 index 00000000000..b06f44cf8e3 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Federated/EchoEventSupplier_i.h @@ -0,0 +1,22 @@ +// EchoEventSupplier_i.h +// Implements a PushSupplier. + +#ifndef _EchoEventSupplier_i_h_ +#define _EchoEventSupplier_i_h_ + +#include <orbsvcs/RtecEventCommS.h> // for POA_CosEventComm::PushSupplier + +class EchoEventSupplier_i : public virtual POA_RtecEventComm::PushSupplier +{ + public: + // Constructor + EchoEventSupplier_i(CORBA::ORB_ptr orb); + + virtual void disconnect_push_supplier() + throw(CORBA::SystemException); + + private: + CORBA::ORB_var orb_; +}; + +#endif // _EchoEventSupplier_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Federated/README b/TAO/DevGuideExamples/EventServices/RTEC_Federated/README new file mode 100644 index 00000000000..c3c2e9b8576 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Federated/README @@ -0,0 +1,84 @@ +Real-Time Event Service + + +File: DevGuideExamples/EventServices/RTEC_Federated/README + + +This directory contains an example that shows how to create and +federate real-time event channels. + +------------------------------------------------------------------------- + +Note: To test this, you must first run the Naming Service, e.g.: + + $TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -o ns.ior& + +After running the naming service, start a couple of suppliers: + + ./EchoEventSupplier -ORBInitRef NameService=file://ns.ior -ORBSvcConf supplier.conf -ecname name1 -gateway name2 + ./EchoEventSupplier -ORBInitRef NameService=file://ns.ior -ORBSvcConf supplier.conf -ecname name2 -gateway name1 + +Now start some consumers: + + ./EchoEventConsumer -ORBInitRef NameService=file://ns.ior -ecname name1 + ./EchoEventConsumer -ORBInitRef NameService=file://ns.ior -ecname name2 + +It may be easiest to start these in separate windows. You should +see events from both suppliers on both event channels. + +------------------------------------------------------------------------- + +EchoEventSupplerMain.cpp + + Main program for a PushSupplier. + + EchoEventSupplier -ORBInitRef NameService=file://ns.ior -ORBSvcConf supplier.conf -ecname <name> -gateway <rname> -iorfile <file> + + This will create a local RTEC event channel and bind it under + the root context of the naming service with the name <name>. + It will also create a gateway that links from the remote event + channel bound under <rname> to the locally created event channel. + After initializing the local event channel, it will idle until + it locates the remote event channel, initialize the gateway, + and then publish an event to the local event channel every 10 + milliseconds. This event will contain the string <name> in the + any_value field. When the gateway is initialized, you'll see a + message stating "Gateway initialized". If you pass the -iorfile + parameter, this server also writes the EC's IOR to <file> when + the gateway is initialized. + + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumerMain.cpp + + Main program for a PushConsumer. + + To run it: + + EchoEventConsumer -ORBInitRef NameService=file://ns.ior -ecname <name> + + This will look for an event channel bound to <name> in the Root context + of the Naming Service. It will consume events from this channel and + print the type, source, and string contents contained in any_value. + + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumer_i.{h,cpp} + + Call which implements the RtecEventComm::PushConsumer interface. + + +Exeuction via Perl Script +------------------------- + +A Perl script has been created to automate the steps shown +above. This script can be run via the following command: + +./run_test.pl + + + diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Federated/RTEC_Federated.mpc b/TAO/DevGuideExamples/EventServices/RTEC_Federated/RTEC_Federated.mpc new file mode 100644 index 00000000000..718344d8d9e --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Federated/RTEC_Federated.mpc @@ -0,0 +1,20 @@ +project(*Supplier): namingexe, rteventexe, rtevent_serv { + exename = EchoEventSupplier + includes += ../common + + Source_Files { + EchoEventSupplierMain.cpp + EchoEventSupplier_i.cpp + } +} + +project(*Consumer): namingexe, rteventexe { + exename = EchoEventConsumer + includes += ../common + + Source_Files { + EchoEventConsumerMain.cpp + EchoEventConsumer_i.cpp + } +} + diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Federated/run_test.pl b/TAO/DevGuideExamples/EventServices/RTEC_Federated/run_test.pl new file mode 100644 index 00000000000..7f2282debce --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Federated/run_test.pl @@ -0,0 +1,94 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Env (ACE_ROOT); +use lib "$ACE_ROOT/bin"; +use PerlACE::Run_Test; + +$iorfile = PerlACE::LocalFile ("ns.ior"); +$ec1iorfile = PerlACE::LocalFile ("ec1.ior"); +$ec2iorfile = PerlACE::LocalFile ("ec2.ior"); +$arg_ns_ref = "-ORBInitRef NameService=file://$iorfile"; + +unlink $iorfile; +unlink $ec1iorfile; +unlink $ec2iorfile; + +# start Naming Service +$NameService = "$ENV{TAO_ROOT}/orbsvcs/Naming_Service/Naming_Service"; +$NS = new PerlACE::Process($NameService, "-o $iorfile"); +$NS->Spawn(); +if (PerlACE::waitforfile_timed ($iorfile, 10) == -1) { + print STDERR "ERROR: cannot find file <$iorfile>\n"; + $NS->Kill(); + exit 1; +} + +# start Supplier +if ( -e "supplier.conf" ) +{ + $supplier_conf_file = "supplier.conf"; +} +else{ + $supplier_conf_file = "../supplier.conf"; +} +$args1 = "-ORBSvcConf $supplier_conf_file -ecname ec1 -gateway ec2 -iorfile $ec1iorfile"; +$S1 = new PerlACE::Process("EchoEventSupplier", "$arg_ns_ref $args1"); +$S1->Spawn(); + +$args2 = "-ORBSvcConf $supplier_conf_file -ecname ec2 -gateway ec1 -iorfile $ec2iorfile"; +$S2 = new PerlACE::Process("EchoEventSupplier", "$arg_ns_ref $args2"); +$S2->Spawn(); + +if ((PerlACE::waitforfile_timed ($ec1iorfile, 15) == -1) || + (PerlACE::waitforfile_timed ($ec2iorfile, 2) == -1)) { + print STDERR "ERROR: cannot find files <$ec1iorfile> and <$ec2iorfile>\n"; + $NS->Kill(); + $S1->Kill(); + $S2->Kill(); + exit 1; +} + +$args3 = "-ecname ec1"; +$C1 = new PerlACE::Process("EchoEventConsumer", "$arg_ns_ref $args3"); +$C1->Spawn(); + +$args4 = "-ecname ec2"; +$C2 = new PerlACE::Process("EchoEventConsumer", "$arg_ns_ref $args4"); +$C2->Spawn(); + +if ($C1->WaitKill(60) == -1) { + print STDERR "consumer1 timedout \n"; + + $C2->Kill(); + $S1->Kill(); + $S2->Kill(); + $NS->Kill(); + + unlink $iorfile; + + exit 1; +} + +if ($C2->WaitKill(10) == -1) { + print STDERR "consumer2 timedout \n"; + + $S1->Kill(); + $S2->Kill(); + $NS->Kill(); + + unlink $iorfile; + + exit 1; +} + +$NS->Kill(); +$S1->Kill(); +$S2->Kill(); + +unlink $iorfile; +unlink $ec1iorfile; +unlink $ec2iorfile; + +exit 0; diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Federated/supplier.conf b/TAO/DevGuideExamples/EventServices/RTEC_Federated/supplier.conf new file mode 100644 index 00000000000..87ebf02489e --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Federated/supplier.conf @@ -0,0 +1,6 @@ +# $Id +# Use 5 dispatching threads and the rw wait strategy to resolve deadlock +# issues in the gateway at disconnect time. +static Resource_Factory "-ORBFlushingStrategy blocking" +static EC_Factory "-ECobserver basic -ECDispatching mt -ECDispatchingThreads 5" +static Client_Strategy_Factory "-ORBWaitStrategy rw -ORBTransportMuxStrategy exclusive" diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventConsumerMain.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventConsumerMain.cpp new file mode 100644 index 00000000000..c926df1a023 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventConsumerMain.cpp @@ -0,0 +1,91 @@ +// EchoEventConsumerMain.cpp +// Main program for a PushConsumer of Echo events. + +#include "EchoEventConsumer_i.h" + +#include <orbsvcs/RtecEventCommC.h> +#include <orbsvcs/RtecEventChannelAdminC.h> +#include <orbsvcs/Time_Utilities.h> +#include <orbsvcs/Event_Utilities.h> +#include <orbsvcs/CosNamingC.h> + +#include <iostream> +const RtecEventComm::EventSourceID MY_SUPPLIER_ID_START = ACE_ES_EVENT_SOURCE_ANY + 1; +const RtecEventComm::EventSourceID MY_SUPPLIER_ID_END = ACE_ES_EVENT_SOURCE_ANY + 3; +const RtecEventComm::EventType MY_EVENT_START = ACE_ES_EVENT_UNDEFINED + 1; +const RtecEventComm::EventType MY_EVENT_END = ACE_ES_EVENT_UNDEFINED + 4; + +const int EVENT_LIMIT = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = + CosNaming::NamingContextExt::_narrow(obj.in()); + + // Find the EchoEventChannel. + obj = root_context->resolve_str("EventService"); + // Downcast the object reference to an EventChannel reference. + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow(obj.in()); + if (CORBA::is_nil(ec.in())) { + std::cerr << "Could not narrow EchoEventChannel." << std::endl; + return 1; + } + std::cout << "EchoEventConsumerMain.cpp: Found the EchoEventChannel." << std::endl; + + // Obtain a reference to the consumer administration object. + RtecEventChannelAdmin::ConsumerAdmin_var admin = ec->for_consumers(); + + // Obtain a reference to the push supplier proxy. + RtecEventChannelAdmin::ProxyPushSupplier_var supplier = + admin->obtain_push_supplier(); + + // Get the RootPOA. + obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in()); + + // Instantiate an EchoEventConsumer_i servant and register it + // with the RootPOA + EchoEventConsumer_i servant(orb.in(), supplier.in(), EVENT_LIMIT); + PortableServer::ObjectId_var oid = poa->activate_object(&servant); + CORBA::Object_var consumer_obj = poa->id_to_reference(oid.in()); + RtecEventComm::PushConsumer_var consumer = + RtecEventComm::PushConsumer::_narrow(consumer_obj.in()); + + // Connect as a consumer. + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (1); + qos.insert_type (MY_EVENT_START, // Event Type + 0); // handle to the rt_info + supplier->connect_push_consumer (consumer.in (), + qos.get_ConsumerQOS ()); + + // Activate the POA via its POAManager. + PortableServer::POAManager_var poa_manager = poa->the_POAManager(); + poa_manager->activate(); + + std::cout << "EchoEventConsumerMain.cpp: Ready to receive events..." << std::endl; + + // Enter the ORB event loop. + orb->run(); + + // If we have reached this, we must be shutting down... + // Disconnect the ProxyPushSupplier. + orb->destroy(); + + return 0; + } + catch(const CORBA::Exception& exc) + { + std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventConsumer_i.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventConsumer_i.cpp new file mode 100644 index 00000000000..08bb270168b --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventConsumer_i.cpp @@ -0,0 +1,56 @@ +// EchoEventConsumer_i.cpp +// Implements a PushConsumer. + +#include "EchoEventConsumer_i.h" +#include <tao/PortableServer/PS_CurrentC.h> +#include <ace/OS_NS_stdio.h> +#include <sstream> + +// Constructor duplicates the ORB reference. +EchoEventConsumer_i::EchoEventConsumer_i( + CORBA::ORB_ptr orb, + RtecEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit) + : orb_(CORBA::ORB::_duplicate(orb)) + , supplier_(RtecEventChannelAdmin::ProxyPushSupplier::_duplicate(supplier)) + , event_limit_(event_limit) +{ + // Nothing to do. +} + +// Implement the push() operation. +void EchoEventConsumer_i::push(const RtecEventComm::EventSet& events) + throw(CORBA::SystemException) +{ + // Loop through the events, looking for shutdown events. + for (u_int i = 0; i < events.length (); ++i) { + //ACE_OS::printf("."); + // Extract event data from the any. + const char* eventData; + std::ostringstream out; + out << "Received event," + << " type: " << events[i].header.type + << " source: " << events[i].header.source; + if (events[i].data.any_value >>= eventData) { + out << " text: " << eventData; + } + + ACE_OS::printf("%s\n", out.str().c_str()); // printf is synchronized + } + if (--event_limit_ <= 0) { + supplier_->disconnect_push_supplier(); + orb_->shutdown(0); + } +} + +// Implement the disconnect_push_consumer() operation. +void EchoEventConsumer_i::disconnect_push_consumer() + throw(CORBA::SystemException) +{ + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventConsumer_i.h b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventConsumer_i.h new file mode 100644 index 00000000000..c59aa7944a8 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventConsumer_i.h @@ -0,0 +1,31 @@ +// EchoEventConsumer_i.h +// Implements a PushConsumer. + +#ifndef _EchoEventConsumer_i_h_ +#define _EchoEventConsumer_i_h_ + +#include <orbsvcs/RtecEventCommS.h> // for POA_CosEventComm::PushConsumer +#include <orbsvcs/RtecEventChannelAdminC.h> + +class EchoEventConsumer_i : public virtual POA_RtecEventComm::PushConsumer +{ + public: + // Constructor + EchoEventConsumer_i(CORBA::ORB_ptr orb, + RtecEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit); + + // Override operations from PushConsumer interface. + virtual void push(const RtecEventComm::EventSet& events) + throw(CORBA::SystemException); + + virtual void disconnect_push_consumer() + throw(CORBA::SystemException); + + private: + CORBA::ORB_var orb_; + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_; + int event_limit_; +}; + +#endif // _EchoEventConsumer_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventSupplierMain.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventSupplierMain.cpp new file mode 100644 index 00000000000..5232c12ffd0 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventSupplierMain.cpp @@ -0,0 +1,118 @@ +// EchoEventSupplierMain.cpp +// Main program for a PushSupplier of Echo events. + +#include "EchoEventSupplier_i.h" + +#include <orbsvcs/RtecEventCommC.h> // for Event Communication interfaces +#include <orbsvcs/RtecEventChannelAdminC.h> +#include <orbsvcs/Time_Utilities.h> +#include <orbsvcs/Event_Utilities.h> +#include <orbsvcs/CosNamingC.h> // for Naming Service interfaces + +#include <iostream> +const RtecEventComm::EventSourceID MY_SOURCE_ID_START = ACE_ES_EVENT_SOURCE_ANY + 1; +const RtecEventComm::EventSourceID MY_SOURCE_ID_END = ACE_ES_EVENT_SOURCE_ANY + 3; +const RtecEventComm::EventType MY_EVENT_START = ACE_ES_EVENT_UNDEFINED + 1; +const RtecEventComm::EventType MY_EVENT_END = ACE_ES_EVENT_UNDEFINED + 4; + +const int EVENT_DELAY_MS = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Get the Event Channel using Naming Services + obj = root_context->resolve_str("EventService"); + // Downcast the object reference to an EventChannel reference. + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow(obj.in()); + if (CORBA::is_nil(ec.in())) { + std::cerr << "Could not resolve EchoEventChannel." << std::endl; + return 1; + } + + // Get a SupplierAdmin object from the EventChannel. + RtecEventChannelAdmin::SupplierAdmin_var admin = ec->for_suppliers(); + + // Get a ProxyPushConsumer from the SupplierAdmin. + RtecEventChannelAdmin::ProxyPushConsumer_var consumer = + admin->obtain_push_consumer(); + + // Get the RootPOA. + obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in()); + + // Instantiate an EchoEventConsumer_i servant. + EchoEventSupplier_i servant(orb.in()); + + // Register it with the RootPOA. + PortableServer::ObjectId_var oid = poa->activate_object(&servant); + CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in()); + RtecEventComm::PushSupplier_var supplier = + RtecEventComm::PushSupplier::_narrow(supplier_obj.in()); + + // Publish the events the supplier provides. + ACE_SupplierQOS_Factory qos; + RtecEventComm::EventSourceID supplier_id = MY_SOURCE_ID_START; + for (; supplier_id <= MY_SOURCE_ID_END; ++supplier_id) { + RtecEventComm::EventType event_type = MY_EVENT_START; + for (; event_type <= MY_EVENT_END; ++event_type) + { + qos.insert (supplier_id, event_type, + 0, // handle to the rt_info structure + 1); // number of calls + } + } + + // Connect as a supplier of the published events. + consumer->connect_push_supplier (supplier.in (), qos.get_SupplierQOS ()); + + // Create an event (just a string in this case). + const CORBA::String_var eventData = CORBA::string_dup("Hello, world."); + + // Create an event set for one event + RtecEventComm::EventSet event (1); + event.length (1); + + // Initialize event header. + event[0].header.source = MY_SOURCE_ID_START; + event[0].header.type = MY_EVENT_START; + + // Initialize data fields in event. + event[0].data.any_value <<= eventData; + + std::cout << "Supplier starting sending events.\n"; + while (1) { + + event[0].header.type++; + if (event[0].header.type > MY_EVENT_END) { + event[0].header.type = MY_EVENT_START; + } + event[0].header.source++; + if (event[0].header.source > MY_SOURCE_ID_END) { + event[0].header.source = MY_SOURCE_ID_START; + } + consumer->push (event); + + ACE_Time_Value tv(0, EVENT_DELAY_MS * 1000); + orb->run(tv); + } + + orb->destroy(); + + return 0; + } + catch(const CORBA::Exception& exc) + { + std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventSupplier_i.cpp b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventSupplier_i.cpp new file mode 100644 index 00000000000..112e33922c5 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventSupplier_i.cpp @@ -0,0 +1,25 @@ +// EchoEventSupplier_i.cpp +// Implements a PushSupplier. + +#include "EchoEventSupplier_i.h" +#include <tao/PortableServer/PS_CurrentC.h> + +// Constructor duplicates the ORB reference. +EchoEventSupplier_i::EchoEventSupplier_i(CORBA::ORB_ptr orb) + : orb_(CORBA::ORB::_duplicate(orb)) +{ + // Nothing to do. +} + +// Override the disconnect_push_Supplier() operation. +void EchoEventSupplier_i::disconnect_push_supplier() + throw(CORBA::SystemException) +{ + + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventSupplier_i.h b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventSupplier_i.h new file mode 100644 index 00000000000..b06f44cf8e3 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Filter/EchoEventSupplier_i.h @@ -0,0 +1,22 @@ +// EchoEventSupplier_i.h +// Implements a PushSupplier. + +#ifndef _EchoEventSupplier_i_h_ +#define _EchoEventSupplier_i_h_ + +#include <orbsvcs/RtecEventCommS.h> // for POA_CosEventComm::PushSupplier + +class EchoEventSupplier_i : public virtual POA_RtecEventComm::PushSupplier +{ + public: + // Constructor + EchoEventSupplier_i(CORBA::ORB_ptr orb); + + virtual void disconnect_push_supplier() + throw(CORBA::SystemException); + + private: + CORBA::ORB_var orb_; +}; + +#endif // _EchoEventSupplier_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Filter/README b/TAO/DevGuideExamples/EventServices/RTEC_Filter/README new file mode 100644 index 00000000000..c1cd7a23fad --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Filter/README @@ -0,0 +1,57 @@ +Real-Time Event Service + + +File: DevGuideExamples/EventServices/RTEC_Filter/README + + +This directory contains some example code for the filters, correlations, +and timeouts features of the RT Event service. For now the code is all +in the EchoEventConsumerMain.cpp file. Simply, comment out the filters/ +timeouts/correlations required, remake, and run as below. + +------------------------------------------------------------------------- + +Note: To test this, you must first run the Naming Service and the +Event Service, e.g.: + +$TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -o ns.ior& +$TAO_ROOT/orbsvcs/Event_Service/Event_Service -ORBSvcConf ec.conf -ORBInitRef NameService=file://ns.ior& + +------------------------------------------------------------------------- + +EchoEventSupplerMain.cpp + + Main program for a PushSupplier. + + EchoEventSupplier -ORBInitRef NameService=file://ns.ior + + It will publish an event to the event channel every 1 second. + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumerMain.cpp + + Main program for a PushConsumer. + + To run it: + + EchoEventConsumer -ORBInitRef NameService=file://ns.ior + + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumer_i.{h,cpp} + + Call which implements the RtecEventComm::PushConsumer interface. + + + +Exeuction via Perl Script +------------------------- + +A Perl script has been created to automate the steps shown +above. This script can be run via the following command: + +./run_test.pl diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Filter/RTEC_Filter.mpc b/TAO/DevGuideExamples/EventServices/RTEC_Filter/RTEC_Filter.mpc new file mode 100644 index 00000000000..a67c5f1c819 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Filter/RTEC_Filter.mpc @@ -0,0 +1,20 @@ +project(*Supplier): namingexe, rteventexe, { + exename = EchoEventSupplier + includes += ../common + + Source_Files { + EchoEventSupplierMain.cpp + EchoEventSupplier_i.cpp + } +} + +project(*Consumer): namingexe, rteventexe, { + exename = EchoEventConsumer + includes += ../common + + Source_Files { + EchoEventConsumerMain.cpp + EchoEventConsumer_i.cpp + } +} + diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Filter/ec.conf b/TAO/DevGuideExamples/EventServices/RTEC_Filter/ec.conf new file mode 100644 index 00000000000..0d41159fbd9 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Filter/ec.conf @@ -0,0 +1,2 @@ +# $Id$ +static EC_Factory "-ECFiltering prefix" diff --git a/TAO/DevGuideExamples/EventServices/RTEC_Filter/run_test.pl b/TAO/DevGuideExamples/EventServices/RTEC_Filter/run_test.pl new file mode 100644 index 00000000000..c4fa28f1ec1 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_Filter/run_test.pl @@ -0,0 +1,61 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Env (ACE_ROOT); +use lib "$ACE_ROOT/bin"; +use PerlACE::Run_Test; + +$nsiorfile = PerlACE::LocalFile ("ns.ior"); +$esiorfile = PerlACE::LocalFile ("es.ior"); +$arg_ns_ref = "-ORBInitRef NameService=file://$nsiorfile"; + +unlink $nsiorfile; +unlink $esiorfile; + +# start Naming Service + +$NameService = "$ENV{TAO_ROOT}/orbsvcs/Naming_Service/Naming_Service"; +$NS = new PerlACE::Process($NameService, "-o $nsiorfile"); +$NS->Spawn(); +if (PerlACE::waitforfile_timed ($nsiorfile, 5) == -1) { + print STDERR "ERROR: cannot find file <$nsiorfile>\n"; + $NS->Kill(); + exit 1; +} + +# start Event Service +$EventService = "$ENV{TAO_ROOT}/orbsvcs/Event_Service/Event_Service"; +$ES = new PerlACE::Process($EventService, "-o $esiorfile $arg_ns_ref"); +$ES->Spawn(); +if (PerlACE::waitforfile_timed ($esiorfile, 15) == -1) { + print STDERR "ERROR: cannot find file <$esiorfile>\n"; + $ES->Kill(); + unlink $nsiorfile; + exit 1; +} + +# start EchoEventSupplier +$S = new PerlACE::Process("EchoEventSupplier", $arg_ns_ref); +$S->Spawn(); + +# start EchoEventConsumer +$C = new PerlACE::Process("EchoEventConsumer", $arg_ns_ref); +$C->Spawn(); + +$CRET = $C->WaitKill(60); +$S->Kill(); +$NS->Kill(); +$ES->Kill(); + +unlink $nsiorfile; +unlink $esiorfile; + +if ($CRET != 0) { + print STDERR "ERROR: Client returned <$CRET>\n"; + exit 1 ; +} + +exit 0; + + diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventConsumerMain.cpp b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventConsumerMain.cpp new file mode 100644 index 00000000000..e3a6d584cdc --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventConsumerMain.cpp @@ -0,0 +1,103 @@ +// EchoEventConsumerMain.cpp +// Main program for a PushConsumer of Echo events. + +#include "EchoEventConsumer_i.h" + +#include <orbsvcs/RtecEventCommC.h> +#include <orbsvcs/RtecEventChannelAdminC.h> +#include <orbsvcs/Time_Utilities.h> +#include <orbsvcs/Event_Utilities.h> +#include <orbsvcs/CosNamingC.h> + +#include <iostream> +const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1; +const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1; + +const int EVENT_LIMIT = 20; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + const char* ecname = "EventService"; + for (int i = 0; argv[i] != 0; i++) { + if (strcmp(argv[i], "-ecname") == 0) { + if (argv[i+1] != 0) { + ecname = argv[i+1]; + } else { + std::cerr << "Missing Event channel name" << std::endl; + } + } + } + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Find the EchoEventChannel. + obj = root_context->resolve_str(ecname); + + // Downcast the object reference to an EventChannel reference. + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow(obj.in()); + if (CORBA::is_nil(ec.in())) { + std::cerr << "Could not narrow EchoEventChannel." << std::endl; + return 1; + } + std::cout << "EchoEventConsumerMain.cpp: Found the EchoEventChannel." << std::endl; + + // Obtain a reference to the consumer administration object. + RtecEventChannelAdmin::ConsumerAdmin_var admin = ec->for_consumers(); + + // Obtain a reference to the push supplier proxy. + RtecEventChannelAdmin::ProxyPushSupplier_var supplier = + admin->obtain_push_supplier(); + + // Get the RootPOA. + obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in()); + + // Instantiate an EchoEventConsumer_i servant. + EchoEventConsumer_i servant(orb.in(), supplier.in(), EVENT_LIMIT); + + // Register it with the RootPOA. + PortableServer::ObjectId_var oid = poa->activate_object(&servant); + CORBA::Object_var consumer_obj = poa->id_to_reference(oid.in()); + RtecEventComm::PushConsumer_var consumer = + RtecEventComm::PushConsumer::_narrow(consumer_obj.in()); + + // Connect as a consumer. + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (); + qos.insert (MY_SOURCE_ID, // Source ID + MY_EVENT_TYPE, // Event Type + 0); // handle to the rt_info + supplier->connect_push_consumer (consumer.in (), + qos.get_ConsumerQOS ()); + + // Activate the POA via its POAManager. + PortableServer::POAManager_var poa_manager = poa->the_POAManager(); + poa_manager->activate(); + + std::cout << "EchoEventConsumerMain.cpp: Ready to receive events..." << std::endl; + + // Enter the ORB event loop. + orb->run(); + + // If we have reached this, we must be shutting down... + // Disconnect the ProxyPushSupplier. + orb->destroy(); + + return 0; + + } + catch(const CORBA::Exception& exc) + { + std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl; + } + + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventConsumer_i.cpp b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventConsumer_i.cpp new file mode 100644 index 00000000000..08bb270168b --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventConsumer_i.cpp @@ -0,0 +1,56 @@ +// EchoEventConsumer_i.cpp +// Implements a PushConsumer. + +#include "EchoEventConsumer_i.h" +#include <tao/PortableServer/PS_CurrentC.h> +#include <ace/OS_NS_stdio.h> +#include <sstream> + +// Constructor duplicates the ORB reference. +EchoEventConsumer_i::EchoEventConsumer_i( + CORBA::ORB_ptr orb, + RtecEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit) + : orb_(CORBA::ORB::_duplicate(orb)) + , supplier_(RtecEventChannelAdmin::ProxyPushSupplier::_duplicate(supplier)) + , event_limit_(event_limit) +{ + // Nothing to do. +} + +// Implement the push() operation. +void EchoEventConsumer_i::push(const RtecEventComm::EventSet& events) + throw(CORBA::SystemException) +{ + // Loop through the events, looking for shutdown events. + for (u_int i = 0; i < events.length (); ++i) { + //ACE_OS::printf("."); + // Extract event data from the any. + const char* eventData; + std::ostringstream out; + out << "Received event," + << " type: " << events[i].header.type + << " source: " << events[i].header.source; + if (events[i].data.any_value >>= eventData) { + out << " text: " << eventData; + } + + ACE_OS::printf("%s\n", out.str().c_str()); // printf is synchronized + } + if (--event_limit_ <= 0) { + supplier_->disconnect_push_supplier(); + orb_->shutdown(0); + } +} + +// Implement the disconnect_push_consumer() operation. +void EchoEventConsumer_i::disconnect_push_consumer() + throw(CORBA::SystemException) +{ + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventConsumer_i.h b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventConsumer_i.h new file mode 100644 index 00000000000..c59aa7944a8 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventConsumer_i.h @@ -0,0 +1,31 @@ +// EchoEventConsumer_i.h +// Implements a PushConsumer. + +#ifndef _EchoEventConsumer_i_h_ +#define _EchoEventConsumer_i_h_ + +#include <orbsvcs/RtecEventCommS.h> // for POA_CosEventComm::PushConsumer +#include <orbsvcs/RtecEventChannelAdminC.h> + +class EchoEventConsumer_i : public virtual POA_RtecEventComm::PushConsumer +{ + public: + // Constructor + EchoEventConsumer_i(CORBA::ORB_ptr orb, + RtecEventChannelAdmin::ProxyPushSupplier_ptr supplier, + int event_limit); + + // Override operations from PushConsumer interface. + virtual void push(const RtecEventComm::EventSet& events) + throw(CORBA::SystemException); + + virtual void disconnect_push_consumer() + throw(CORBA::SystemException); + + private: + CORBA::ORB_var orb_; + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_; + int event_limit_; +}; + +#endif // _EchoEventConsumer_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventSupplierMain.cpp b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventSupplierMain.cpp new file mode 100644 index 00000000000..1ea25b53d15 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventSupplierMain.cpp @@ -0,0 +1,243 @@ +// EchoEventSupplierMain.cpp +// Main program for a PushSupplier of Echo events. + +#include "EchoEventSupplier_i.h" +#include "SimpleAddressServer.h" + +#include <orbsvcs/RtecEventCommC.h> +#include <orbsvcs/RtecEventChannelAdminC.h> +#include <orbsvcs/Time_Utilities.h> +#include <orbsvcs/Event_Utilities.h> +#include <orbsvcs/CosNamingC.h> +#include <orbsvcs/Event/EC_Event_Channel.h> +#include <orbsvcs/Event/EC_Default_Factory.h> +#include <orbsvcs/Event/ECG_Mcast_EH.h> +#include <orbsvcs/Event/ECG_UDP_Sender.h> +#include <orbsvcs/Event/ECG_UDP_Receiver.h> +#include <orbsvcs/Event/ECG_UDP_Out_Endpoint.h> +#include <orbsvcs/Event/ECG_UDP_EH.h> + +#include <tao/ORB_Core.h> + +#include <ace/Auto_Ptr.h> +#include <iostream> +#include <fstream> + +const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1; +const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1; + +const int EVENT_DELAY_MS = 10; + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + // Initialize the EC Factory so we can customize the EC + TAO_EC_Default_Factory::init_svcs (); + + // Initialize the ORB. + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + const char* ecname = "EventService"; + const char* address = "localhost"; + const char* iorfile = 0; + u_short port = 12345; + u_short listenport = 12345; + int mcast = 1; + + for (int i = 0; argv[i] != 0; i++) { + if (strcmp(argv[i], "-ecname") == 0) { + if (argv[i+1] != 0) { + i++; + ecname = argv[i]; + } else { + std::cerr << "Missing Event channel name" << std::endl; + } + } else if (strcmp(argv[i], "-address") == 0) { + if (argv[i+1] != 0) { + i++; + address = argv[i]; + } else { + std::cerr << "Missing address" << std::endl; + } + } else if (strcmp(argv[i], "-port") == 0) { + if (argv[i+1] != 0) { + i++; + port = ACE_OS::atoi(argv[i]); + } else { + std::cerr << "Missing port" << std::endl; + } + } else if (strcmp(argv[i], "-listenport") == 0) { + if (argv[i+1] != 0) { + i++; + listenport = ACE_OS::atoi(argv[i]); + } else { + std::cerr << "Missing port" << std::endl; + } + } else if (strcmp(argv[i], "-iorfile") == 0) { + if (argv[i+1] != 0) { + i++; + iorfile = argv[i]; + } + } else if (strcmp(argv[i], "-udp") == 0) { + mcast = 0; + } + } + + // Get the POA + CORBA::Object_var object = orb->resolve_initial_references ("RootPOA"); + PortableServer::POA_var poa = PortableServer::POA::_narrow (object.in ()); + PortableServer::POAManager_var poa_manager = poa->the_POAManager (); + poa_manager->activate (); + + // Create a local event channel and register it + TAO_EC_Event_Channel_Attributes attributes (poa.in (), poa.in ()); + TAO_EC_Event_Channel ec_impl (attributes); + ec_impl.activate (); + PortableServer::ObjectId_var oid = poa->activate_object(&ec_impl); + CORBA::Object_var ec_obj = poa->id_to_reference(oid.in()); + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow(ec_obj.in()); + + // Find the Naming Service. + CORBA::Object_var obj = orb->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Bind the Event Channel using Naming Services + CosNaming::Name_var name = root_context->to_name(ecname); + root_context->rebind(name.in(), ec.in()); + + // Get a proxy push consumer from the EventChannel. + RtecEventChannelAdmin::SupplierAdmin_var admin = ec->for_suppliers(); + RtecEventChannelAdmin::ProxyPushConsumer_var consumer = + admin->obtain_push_consumer(); + + // Instantiate an EchoEventSupplier_i servant. + EchoEventSupplier_i servant(orb.in()); + + // Register it with the RootPOA. + oid = poa->activate_object(&servant); + CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in()); + RtecEventComm::PushSupplier_var supplier = + RtecEventComm::PushSupplier::_narrow(supplier_obj.in()); + + // Connect to the EC. + ACE_SupplierQOS_Factory qos; + qos.insert (MY_SOURCE_ID, MY_EVENT_TYPE, 0, 1); + consumer->connect_push_supplier (supplier.in (), qos.get_SupplierQOS ()); + + // Initialize the address server with the desired address. + // This will be used by the sender object and the multicast + // receiver. + ACE_INET_Addr send_addr (port, address); + SimpleAddressServer addr_srv_impl (send_addr); + + PortableServer::ObjectId_var addr_srv_oid = + poa->activate_object(&addr_srv_impl); + CORBA::Object_var addr_srv_obj = poa->id_to_reference(addr_srv_oid.in()); + RtecUDPAdmin::AddrServer_var addr_srv = + RtecUDPAdmin::AddrServer::_narrow(addr_srv_obj.in()); + + // Create and initialize the sender object + TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender = + TAO_ECG_UDP_Sender::create(); + TAO_ECG_UDP_Out_Endpoint endpoint; + if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1) { + std::cerr << "Cannot open send endpoint" << std::endl; + return 1; + } + + // TAO_ECG_UDP_Sender::init() takes a TAO_ECG_Refcounted_Endpoint. + // If we don't clone our endpoint and pass &endpoint, the sender will + // attempt to delete endpoint during shutdown. + TAO_ECG_UDP_Out_Endpoint* clone; + ACE_NEW_RETURN (clone, + TAO_ECG_UDP_Out_Endpoint (endpoint), + -1); + sender->init (ec.in (), addr_srv.in (), clone); + + // Setup the subscription and connect to the EC + ACE_ConsumerQOS_Factory cons_qos_fact; + cons_qos_fact.start_disjunction_group (); + cons_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0); + RtecEventChannelAdmin::ConsumerQOS sub = cons_qos_fact.get_ConsumerQOS (); + sender->connect (sub); + + // Create and initialize the receiver + TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver = + TAO_ECG_UDP_Receiver::create(); + + // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint. + // If we don't clone our endpoint and pass &endpoint, the receiver will + // attempt to delete endpoint during shutdown. + ACE_NEW_RETURN (clone, + TAO_ECG_UDP_Out_Endpoint (endpoint), + -1); + receiver->init (ec.in (), clone, addr_srv.in ()); + + // Setup the registration and connect to the event channel + ACE_SupplierQOS_Factory supp_qos_fact; + supp_qos_fact.insert (MY_SOURCE_ID, MY_EVENT_TYPE, 0, 1); + RtecEventChannelAdmin::SupplierQOS pub = supp_qos_fact.get_SupplierQOS (); + receiver->connect (pub); + + // Create the appropriate event handler and register it with the reactor + auto_ptr<ACE_Event_Handler> eh; + if (mcast) { + auto_ptr<TAO_ECG_Mcast_EH> mcast_eh(new TAO_ECG_Mcast_EH (receiver.in())); + mcast_eh->reactor (orb->orb_core ()->reactor ()); + mcast_eh->open (ec.in()); + ACE_AUTO_PTR_RESET(eh,mcast_eh.release(),ACE_Event_Handler); + //eh.reset(mcast_eh.release()); + } else { + auto_ptr<TAO_ECG_UDP_EH> udp_eh (new TAO_ECG_UDP_EH (receiver.in())); + udp_eh->reactor (orb->orb_core ()->reactor ()); + ACE_INET_Addr local_addr (listenport); + if (udp_eh->open (local_addr) == -1) { + std::cerr << "Cannot open EH" << std::endl; + } + ACE_AUTO_PTR_RESET(eh,udp_eh.release(),ACE_Event_Handler); + //eh.reset(udp_eh.release()); + } + + // Create an event (just a string in this case). + const CORBA::String_var eventData = CORBA::string_dup(ecname); + + // Create an event set for one event + RtecEventComm::EventSet event (1); + event.length (1); + + // Initialize event header. + event[0].header.source = MY_SOURCE_ID; + event[0].header.ttl = 1; + event[0].header.type = MY_EVENT_TYPE; + + // Initialize data fields in event. + event[0].data.any_value <<= eventData; + + if (iorfile != 0) { + CORBA::String_var str = orb->object_to_string( ec.in() ); + std::ofstream iorFile( iorfile ); + iorFile << str.in() << std::endl; + iorFile.close(); + } + std::cout << "Starting main loop" << std::endl; + + const int EVENT_DELAY_MS = 10; + + while (1) { + consumer->push (event); + + ACE_Time_Value tv(0, 1000 * EVENT_DELAY_MS); + orb->run(tv); + } + + orb->destroy(); + return 0; + } + catch(const CORBA::Exception& exc) + { + std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl; + } + return 1; +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventSupplier_i.cpp b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventSupplier_i.cpp new file mode 100644 index 00000000000..c2339c37ad1 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventSupplier_i.cpp @@ -0,0 +1,24 @@ +// EchoEventSupplier_i.cpp +// Implements a PushSupplier. + +#include "EchoEventSupplier_i.h" +#include <tao/PortableServer/PS_CurrentC.h> + +// Constructor duplicates the ORB reference. +EchoEventSupplier_i::EchoEventSupplier_i(CORBA::ORB_ptr orb) + : orb_(CORBA::ORB::_duplicate(orb)) +{ + // Nothing to do. +} + +// Override the disconnect_push_Supplier() operation. +void EchoEventSupplier_i::disconnect_push_supplier() + throw(CORBA::SystemException) +{ + // Deactivate this object. + CORBA::Object_var obj = orb_->resolve_initial_references("POACurrent"); + PortableServer::Current_var current = PortableServer::Current::_narrow(obj.in()); + PortableServer::POA_var poa = current->get_POA(); + PortableServer::ObjectId_var objectId = current->get_object_id(); + poa->deactivate_object(objectId.in()); +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventSupplier_i.h b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventSupplier_i.h new file mode 100644 index 00000000000..b06f44cf8e3 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/EchoEventSupplier_i.h @@ -0,0 +1,22 @@ +// EchoEventSupplier_i.h +// Implements a PushSupplier. + +#ifndef _EchoEventSupplier_i_h_ +#define _EchoEventSupplier_i_h_ + +#include <orbsvcs/RtecEventCommS.h> // for POA_CosEventComm::PushSupplier + +class EchoEventSupplier_i : public virtual POA_RtecEventComm::PushSupplier +{ + public: + // Constructor + EchoEventSupplier_i(CORBA::ORB_ptr orb); + + virtual void disconnect_push_supplier() + throw(CORBA::SystemException); + + private: + CORBA::ORB_var orb_; +}; + +#endif // _EchoEventSupplier_i_h_ diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/README b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/README new file mode 100644 index 00000000000..27b6aaee1e1 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/README @@ -0,0 +1,138 @@ +Real-Time Event Service + + +File: DevGuideExamples/EventServices/RTEC_MCast_Federated/README + + +This directory contains an example that shows how to create and +federate real-time event channels using the classes in EC_Gateway_UDP.h. +Depending on the options, it will use either UDP or multicast to link +the event channels. + +------------------------------------------------------------------------- + +Note: To run this example, you must first run the Naming Service, e.g.: + + $TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -o ns.ior& + +------------------------------------------------------------------------- + +To start the supplier/EC processes on a single host and federate them +using UDP, do the following (after starting the Naming_Service): + + ./EchoEventSupplier -ORBInitRef NameService=file://ns.ior -udp -ecname name1 -port 1233 -listenport 1234 + ./EchoEventSupplier -ORBInitRef NameService=file://ns.ior -udp -ecname name2 -port 1234 -listenport 1233 + +This will start two EC/supplier processes on the same node. One (name1) +will listen on port 1234 and send on port 1233. The other (name2) will +do the opposite. You should be able to use any available port as long as +the port and listenport options are symmetric (listenport of one process +must be the port of the other). The -address option can be used to +specify a supplier/EC process on another host. Here is an example of two +processes on different hosts: + +On node1: + ./EchoEventSupplier -ORBInitRef NameService=file://ns.ior -udp -ecname name1 -port 1233 -listenport 1234 -address node2 +On node2: + ./EchoEventSupplier -ORBInitRef NameService=file://ns.ior -udp -ecname name2 -port 1234 -listenport 1233 -address node1 + +When using UDP, this example is limited to federating two ECs. + +------------------------------------------------------------------------- + +To start the supplier/EC processes and federate them using multicast, do the +following (after starting the Naming_Service): + +./EchoEventSupplier -ORBInitRef NameService=file://ns.ior -ORBSvcConf supplier.conf -ecname name1 -address 224.9.9.2 -port 1234 +./EchoEventSupplier -ORBInitRef NameService=file://ns.ior -ORBSvcConf supplier.conf -ecname name2 -address 224.9.9.2 -port 1234 + +The -address and -port options should be passed a valid and available +multicast address and port. + +------------------------------------------------------------------------- + +To start the consumers, simply do the following (this works the same for +both types of federations): + +./EchoEventConsumer -ORBInitRef NameService=file://ns.ior -ecname name1 +./EchoEventConsumer -ORBInitRef NameService=file://ns.ior -ecname name2 + +It may be easiest to start these in separate windows. Each consumer +connects to one EC (specified by the -ecname option). You should see +events from both suppliers on each event channel (each supplier passes +events containing with the name of the EC they are using). + +------------------------------------------------------------------------- + +EchoEventSupplerMain.cpp + + Main program for a PushSupplier. + + EchoEventSupplier [ -ORBSvcConf supplier.conf ] [ -udp ] -ecname <name> + [ -address <address> ] [ -port <port> ] + [ -listenport <lport> ] + + This will create a local RTEC event channel and bind it under + the root context of the naming service with the name <name>. + It will also federate with remote event channels specified via + the other options. By default, it uses multicast to federate + the ECs (specifying -udp forces use of UDP). <address> is + the address of the remote EC when using UDP and the multicast + address when using multicast. <port> is the port to send + to when using UDP and the multicast port when using multicast. + <lport> is the port to listen on for UDP (and not used by + multicast. You must pass -ORBSvcConf supplier.conf when + using multicast so as to enable Observers. + + After initializing the local event channel and setting up the + connection for the federation, it publishes an event to the + local event channel every 10 milliseconds. This event will + contain the string <name> in the any_value field. + + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumerMain.cpp + + Main program for a PushConsumer. + + To run it: + + EchoEventConsumer -ecname <name> + + This will look for an event channel bound to <name> in the Root context + of the Naming Service. It will consume events from this channel and + print the type, source, and string contents contained in any_value. + + Use Control-C to kill the process. + +------------------------------------------------------------------------- + +EchoEventConsumer_i.{h,cpp} + + Call which implements the RtecEventComm::PushConsumer interface. + +------------------------------------------------------------------------- + +SimpleAddressServer.{h,cpp} + + This is a servant class that implements the RtecUDPAdmin::AddrServer + interface. It is used by the UDP/multicast senders to return an + address that they will send out particular events on. It is also + used by the multicast event handler, to determine which addresses + to listen to based on consumer subscriptions. This simple + implementation always returns the same address. + + + +Exeuction via Perl Script +------------------------- + +A Perl script has been created to automate the steps shown +above. This script can be run via the following command: + +./run_test.pl + +By default, this script uses multicast; pass -udp to the +script to use udp. diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/RTEC_MCast_Federated.mpc b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/RTEC_MCast_Federated.mpc new file mode 100644 index 00000000000..373818b574d --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/RTEC_MCast_Federated.mpc @@ -0,0 +1,21 @@ +project(*_Dev_Supplier): namingexe, rteventexe, rtevent_serv { + exename = EchoEventSupplier + includes += ../common + + Source_Files { + EchoEventSupplierMain.cpp + EchoEventSupplier_i.cpp + SimpleAddressServer.cpp + } +} + +project(*_Dev_Consumer): namingexe, rteventexe { + exename = EchoEventConsumer + includes += ../common + + Source_Files { + EchoEventConsumerMain.cpp + EchoEventConsumer_i.cpp + } +} + diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/SimpleAddressServer.cpp b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/SimpleAddressServer.cpp new file mode 100644 index 00000000000..8e593774e67 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/SimpleAddressServer.cpp @@ -0,0 +1,41 @@ +// SimpleAddressServer.cpp + +#include "SimpleAddressServer.h" +#include <ace/INET_Addr.h> +#include <ace/OS_NS_string.h> + +SimpleAddressServer::SimpleAddressServer (const ACE_INET_Addr& address) { +#if defined (ACE_HAS_IPV6) + if (address.get_type() == PF_INET6) + { + RtecUDPAdmin::UDP_Addr_v6 v6; + sockaddr_in6 *in6 = + reinterpret_cast<sockaddr_in6 *>(address.get_addr()); + ACE_OS::memcpy (v6.ipaddr,&in6->sin6_addr,16); + v6.port = address.get_port_number(); + this->address_.v6_addr (v6); + return; + } +#endif /* ACE_HAS_IPV6 */ + RtecUDPAdmin::UDP_Addr v4; + v4.ipaddr = address.get_ip_address (); + v4.port = address.get_port_number (); + this->address_.v4_addr (v4); +} + +void +SimpleAddressServer::get_addr (const RtecEventComm::EventHeader&, + RtecUDPAdmin::UDP_Addr& address) + throw (CORBA::SystemException) { + if (this->address_._d() == RtecUDPAdmin::Rtec_inet6) + throw CORBA::DATA_CONVERSION(0, CORBA::COMPLETED_YES); + address = this->address_.v4_addr(); +} + +void +SimpleAddressServer::get_address(const RtecEventComm::EventHeader&, + RtecUDPAdmin::UDP_Address& address) + throw (CORBA::SystemException) +{ + address = this->address_; +} diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/SimpleAddressServer.h b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/SimpleAddressServer.h new file mode 100644 index 00000000000..ae92a697c0e --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/SimpleAddressServer.h @@ -0,0 +1,27 @@ +// SimpleAddressServer.h + +#ifndef SIMPLEADDRESSSERVER_H +#define SIMPLEADDRESSSERVER_H + +#include <orbsvcs/RtecUDPAdminS.h> + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL +class ACE_INET_Addr; +ACE_END_VERSIONED_NAMESPACE_DECL + +class SimpleAddressServer : public POA_RtecUDPAdmin::AddrServer { +public: + SimpleAddressServer (const ACE_INET_Addr& address); + virtual void get_addr (const RtecEventComm::EventHeader& header, + RtecUDPAdmin::UDP_Addr& address) + throw (CORBA::SystemException); + + virtual void get_address(const RtecEventComm::EventHeader& header, + RtecUDPAdmin::UDP_Address& address) + throw (CORBA::SystemException); + +private: + RtecUDPAdmin::UDP_Address address_; +}; + +#endif diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/run_test.pl b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/run_test.pl new file mode 100644 index 00000000000..350558eb25b --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/run_test.pl @@ -0,0 +1,144 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Env (ACE_ROOT); +use lib "$ACE_ROOT/bin"; +use PerlACE::Run_Test; + +if (!defined $ENV{TAO_ROOT}) { + $ENV{TAO_ROOT} = "$ENV{ACE_ROOT}/TAO"; +} + +sub usage() { + print "Usage:\n"; + print " run_test [-h] [-debug]\n\n"; + print " -udp -- Federate using udp\n"; + print " -mcast -- Federate using multicast (the default)\n"; + print " -h -- Prints this information\n"; + print " -debug -- Sets the debug flag for the test\n"; + exit; +} + +my $udp = 0; +my $i = 0; +my $flags = ""; +while ($i <= $#ARGV) { + if ($ARGV[$i] eq "-h" || $ARGV[$i] eq "-help" || + $ARGV[$i] eq "--help" || $ARGV[$i] eq "-?") { + usage (); + } elsif ($ARGV[$i] eq "-debug") { + $flags .= " -ORBDebugLevel 10 "; + } elsif ($ARGV[$i] eq "-udp") { + $udp = 1; + } elsif ($ARGV[$i] eq "-mcast") { + $udp = 0; + } else { + print "ERROR: Unknown Option: ".$ARGV[$i]."\n\n"; + usage (); + } + $i++; +} + +if ($udp) { + print "Using UDP to link the event channels.\n\n"; +} else { + print "Using multicast to link the event channels.\n\n"; +} + +$nsiorfile = PerlACE::LocalFile ("ns.ior"); +$ec1iorfile = PerlACE::LocalFile ("ec1.ior"); +$ec2iorfile = PerlACE::LocalFile ("ec2.ior"); + +$arg_ns_ref = "-ORBInitRef NameService=file://$nsiorfile"; +$end_point = "-ORBEndpoint iiop://localhost"; +$ns_port = PerlACE::random_port(); + +unlink $nsiorfile; +unlink $ec1iorfile; +unlink $ec2iorfile; + +# start Naming Service + +$NameService = "$ENV{TAO_ROOT}/orbsvcs/Naming_Service/Naming_Service"; +$NS = new PerlACE::Process($NameService, "$flags -o $nsiorfile $end_point:$ns_port"); +$NS->Spawn(); +if (PerlACE::waitforfile_timed ($nsiorfile, 5) == -1) { + print STDERR "ERROR: cannot find file <$nsiorfile>\n"; + $NS->Kill(); + exit 1; +} + +# start EchoEventSupplier +my($port1) = 10001 + PerlACE::uniqueid() ; +my($port2) = 10001 + PerlACE::uniqueid() ; +my($mport) = 10001 + PerlACE::uniqueid() ; +if ( -e "supplier.conf" ) +{ + $supplier_conf_file = "supplier.conf"; +} +else{ + $supplier_conf_file = "../supplier.conf"; +} + +$args1 = "$flags $arg_ns_ref -ORBSvcConf $supplier_conf_file $end_point -iorfile $ec1iorfile"; +if ($udp) { + $args1 .= " -udp -ecname ec1 -port $port1 -listenport $port2 "; +} else { + $args1 .= " -ecname ec1 -address 224.9.9.2 -port $mport "; +} +$S1 = new PerlACE::Process("EchoEventSupplier", $args1); +$S1->Spawn(); + +$args2 = "$flags $arg_ns_ref -ORBSvcConf $supplier_conf_file $end_point -iorfile $ec2iorfile"; +if ($udp) { + $args2 .= " -udp -ecname ec2 -port $port2 -listenport $port1 "; +} else { + $args2 .= " -ecname ec2 -address 224.9.9.2 -port $mport "; +} +$S2 = new PerlACE::Process("EchoEventSupplier", $args2); +$S2->Spawn(); + +if ((PerlACE::waitforfile_timed ($ec1iorfile, 15) == -1) || + (PerlACE::waitforfile_timed ($ec2iorfile, 2) == -1)) { + print STDERR "ERROR: cannot find files <$ec1iorfile> and <$ec2iorfile>\n"; + $NS->Kill(); + $S1->Kill(); + $S2->Kill(); + exit 1; +} + +$args3 = "$flags $arg_ns_ref -ecname ec1 $end_point"; +$C1 = new PerlACE::Process("EchoEventConsumer", $args3); +$C1->Spawn(); + + +$args4 = "$flags $arg_ns_ref -ecname ec2 $end_point"; +$C2 = new PerlACE::Process("EchoEventConsumer", $args4); +$C2->Spawn(); + +if ($C1->WaitKill(30) == -1) { + $S1->Kill(); + $S2->Kill(); + $NS->Kill(); + $C2->Kill(); + + exit 1; +} + +if ($C2->WaitKill(5) == -1) { + $S1->Kill(); + $S2->Kill(); + $NS->Kill(); + exit 1; +} + +$NS->Kill(); +$S1->Kill(); +$S2->Kill(); + +unlink $nsiorfile; +unlink $ec1iorfile; +unlink $ec2iorfile; + +exit 0; diff --git a/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/supplier.conf b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/supplier.conf new file mode 100644 index 00000000000..d9eeea24f43 --- /dev/null +++ b/TAO/DevGuideExamples/EventServices/RTEC_MCast_Federated/supplier.conf @@ -0,0 +1,2 @@ +# $Id$ +static EC_Factory "-ECobserver basic" |