summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/Event/Mcast/RTEC_MCast_Federated/EchoEventSupplierMain.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/Event/Mcast/RTEC_MCast_Federated/EchoEventSupplierMain.cpp')
-rw-r--r--TAO/orbsvcs/tests/Event/Mcast/RTEC_MCast_Federated/EchoEventSupplierMain.cpp245
1 files changed, 245 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/Event/Mcast/RTEC_MCast_Federated/EchoEventSupplierMain.cpp b/TAO/orbsvcs/tests/Event/Mcast/RTEC_MCast_Federated/EchoEventSupplierMain.cpp
new file mode 100644
index 00000000000..316493b3f4c
--- /dev/null
+++ b/TAO/orbsvcs/tests/Event/Mcast/RTEC_MCast_Federated/EchoEventSupplierMain.cpp
@@ -0,0 +1,245 @@
+// $Id$
+
+// EchoEventSupplierMain.cpp
+// Main program for a PushSupplier of Echo events.
+
+#include "EchoEventSupplier_i.h"
+#include "SimpleAddressServer.h"
+
+#include "orbsvcs/RtecEventCommC.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+#include "orbsvcs/Time_Utilities.h"
+#include "orbsvcs/Event_Utilities.h"
+#include "orbsvcs/CosNamingC.h"
+#include "orbsvcs/Event/EC_Event_Channel.h"
+#include "orbsvcs/Event/EC_Default_Factory.h"
+#include "orbsvcs/Event/ECG_Mcast_EH.h"
+#include "orbsvcs/Event/ECG_UDP_Sender.h"
+#include "orbsvcs/Event/ECG_UDP_Receiver.h"
+#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
+#include "orbsvcs/Event/ECG_UDP_EH.h"
+
+#include "tao/ORB_Core.h"
+
+#include "ace/Auto_Ptr.h"
+#include <iostream>
+#include <fstream>
+
+const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1;
+const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1;
+
+const int EVENT_DELAY_MS = 10;
+
+int main (int argc, char* argv[])
+{
+ try
+ {
+ // Initialize the EC Factory so we can customize the EC
+ TAO_EC_Default_Factory::init_svcs ();
+
+ // Initialize the ORB.
+ CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);
+
+ const char* ecname = "EventService";
+ const char* address = "localhost";
+ const char* iorfile = 0;
+ u_short port = 12345;
+ u_short listenport = 12345;
+ int mcast = 1;
+
+ for (int i = 0; argv[i] != 0; i++) {
+ if (strcmp(argv[i], "-ecname") == 0) {
+ if (argv[i+1] != 0) {
+ i++;
+ ecname = argv[i];
+ } else {
+ std::cerr << "Missing Event channel name" << std::endl;
+ }
+ } else if (strcmp(argv[i], "-address") == 0) {
+ if (argv[i+1] != 0) {
+ i++;
+ address = argv[i];
+ } else {
+ std::cerr << "Missing address" << std::endl;
+ }
+ } else if (strcmp(argv[i], "-port") == 0) {
+ if (argv[i+1] != 0) {
+ i++;
+ port = ACE_OS::atoi(argv[i]);
+ } else {
+ std::cerr << "Missing port" << std::endl;
+ }
+ } else if (strcmp(argv[i], "-listenport") == 0) {
+ if (argv[i+1] != 0) {
+ i++;
+ listenport = ACE_OS::atoi(argv[i]);
+ } else {
+ std::cerr << "Missing port" << std::endl;
+ }
+ } else if (strcmp(argv[i], "-iorfile") == 0) {
+ if (argv[i+1] != 0) {
+ i++;
+ iorfile = argv[i];
+ }
+ } else if (strcmp(argv[i], "-udp") == 0) {
+ mcast = 0;
+ }
+ }
+
+ // Get the POA
+ CORBA::Object_var object = orb->resolve_initial_references ("RootPOA");
+ PortableServer::POA_var poa = PortableServer::POA::_narrow (object.in ());
+ PortableServer::POAManager_var poa_manager = poa->the_POAManager ();
+ poa_manager->activate ();
+
+ // Create a local event channel and register it
+ TAO_EC_Event_Channel_Attributes attributes (poa.in (), poa.in ());
+ TAO_EC_Event_Channel ec_impl (attributes);
+ ec_impl.activate ();
+ PortableServer::ObjectId_var oid = poa->activate_object(&ec_impl);
+ CORBA::Object_var ec_obj = poa->id_to_reference(oid.in());
+ RtecEventChannelAdmin::EventChannel_var ec =
+ RtecEventChannelAdmin::EventChannel::_narrow(ec_obj.in());
+
+ // Find the Naming Service.
+ CORBA::Object_var obj = orb->resolve_initial_references("NameService");
+ CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in());
+
+ // Bind the Event Channel using Naming Services
+ CosNaming::Name_var name = root_context->to_name(ecname);
+ root_context->rebind(name.in(), ec.in());
+
+ // Get a proxy push consumer from the EventChannel.
+ RtecEventChannelAdmin::SupplierAdmin_var admin = ec->for_suppliers();
+ RtecEventChannelAdmin::ProxyPushConsumer_var consumer =
+ admin->obtain_push_consumer();
+
+ // Instantiate an EchoEventSupplier_i servant.
+ EchoEventSupplier_i servant(orb.in());
+
+ // Register it with the RootPOA.
+ oid = poa->activate_object(&servant);
+ CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in());
+ RtecEventComm::PushSupplier_var supplier =
+ RtecEventComm::PushSupplier::_narrow(supplier_obj.in());
+
+ // Connect to the EC.
+ ACE_SupplierQOS_Factory qos;
+ qos.insert (MY_SOURCE_ID, MY_EVENT_TYPE, 0, 1);
+ consumer->connect_push_supplier (supplier.in (), qos.get_SupplierQOS ());
+
+ // Initialize the address server with the desired address.
+ // This will be used by the sender object and the multicast
+ // receiver.
+ ACE_INET_Addr send_addr (port, address);
+ SimpleAddressServer addr_srv_impl (send_addr);
+
+ PortableServer::ObjectId_var addr_srv_oid =
+ poa->activate_object(&addr_srv_impl);
+ CORBA::Object_var addr_srv_obj = poa->id_to_reference(addr_srv_oid.in());
+ RtecUDPAdmin::AddrServer_var addr_srv =
+ RtecUDPAdmin::AddrServer::_narrow(addr_srv_obj.in());
+
+ // Create and initialize the sender object
+ TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender =
+ TAO_ECG_UDP_Sender::create();
+ TAO_ECG_UDP_Out_Endpoint endpoint;
+ if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1) {
+ std::cerr << "Cannot open send endpoint" << std::endl;
+ return 1;
+ }
+
+ // TAO_ECG_UDP_Sender::init() takes a TAO_ECG_Refcounted_Endpoint.
+ // If we don't clone our endpoint and pass &endpoint, the sender will
+ // attempt to delete endpoint during shutdown.
+ TAO_ECG_UDP_Out_Endpoint* clone;
+ ACE_NEW_RETURN (clone,
+ TAO_ECG_UDP_Out_Endpoint (endpoint),
+ -1);
+ sender->init (ec.in (), addr_srv.in (), clone);
+
+ // Setup the subscription and connect to the EC
+ ACE_ConsumerQOS_Factory cons_qos_fact;
+ cons_qos_fact.start_disjunction_group ();
+ cons_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0);
+ RtecEventChannelAdmin::ConsumerQOS sub = cons_qos_fact.get_ConsumerQOS ();
+ sender->connect (sub);
+
+ // Create and initialize the receiver
+ TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver =
+ TAO_ECG_UDP_Receiver::create();
+
+ // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint.
+ // If we don't clone our endpoint and pass &endpoint, the receiver will
+ // attempt to delete endpoint during shutdown.
+ ACE_NEW_RETURN (clone,
+ TAO_ECG_UDP_Out_Endpoint (endpoint),
+ -1);
+ receiver->init (ec.in (), clone, addr_srv.in ());
+
+ // Setup the registration and connect to the event channel
+ ACE_SupplierQOS_Factory supp_qos_fact;
+ supp_qos_fact.insert (MY_SOURCE_ID, MY_EVENT_TYPE, 0, 1);
+ RtecEventChannelAdmin::SupplierQOS pub = supp_qos_fact.get_SupplierQOS ();
+ receiver->connect (pub);
+
+ // Create the appropriate event handler and register it with the reactor
+ auto_ptr<ACE_Event_Handler> eh;
+ if (mcast) {
+ auto_ptr<TAO_ECG_Mcast_EH> mcast_eh(new TAO_ECG_Mcast_EH (receiver.in()));
+ mcast_eh->reactor (orb->orb_core ()->reactor ());
+ mcast_eh->open (ec.in());
+ ACE_AUTO_PTR_RESET(eh,mcast_eh.release(),ACE_Event_Handler);
+ //eh.reset(mcast_eh.release());
+ } else {
+ auto_ptr<TAO_ECG_UDP_EH> udp_eh (new TAO_ECG_UDP_EH (receiver.in()));
+ udp_eh->reactor (orb->orb_core ()->reactor ());
+ ACE_INET_Addr local_addr (listenport);
+ if (udp_eh->open (local_addr) == -1) {
+ std::cerr << "Cannot open EH" << std::endl;
+ }
+ ACE_AUTO_PTR_RESET(eh,udp_eh.release(),ACE_Event_Handler);
+ //eh.reset(udp_eh.release());
+ }
+
+ // Create an event (just a string in this case).
+ const CORBA::String_var eventData = CORBA::string_dup(ecname);
+
+ // Create an event set for one event
+ RtecEventComm::EventSet event (1);
+ event.length (1);
+
+ // Initialize event header.
+ event[0].header.source = MY_SOURCE_ID;
+ event[0].header.ttl = 1;
+ event[0].header.type = MY_EVENT_TYPE;
+
+ // Initialize data fields in event.
+ event[0].data.any_value <<= eventData;
+
+ if (iorfile != 0) {
+ CORBA::String_var str = orb->object_to_string( ec.in() );
+ std::ofstream iorFile( iorfile );
+ iorFile << str.in() << std::endl;
+ iorFile.close();
+ }
+ std::cout << "Starting main loop" << std::endl;
+
+ const int EVENT_DELAY_MS = 10;
+
+ while (1) {
+ consumer->push (event);
+
+ ACE_Time_Value tv(0, 1000 * EVENT_DELAY_MS);
+ orb->run(tv);
+ }
+
+ orb->destroy();
+ return 0;
+ }
+ catch (CORBA::Exception& exc)
+ {
+ std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl;
+ }
+ return 1;
+}