diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-04-24 22:28:54 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-04-24 22:28:54 +0000 |
commit | b466b3aeaa8d8ea67e206d944d25c33a3e5e7418 (patch) | |
tree | 9aa2cc6bf8dd6a05e7c6c3c83bba9edf0d9ec93f /TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp | |
parent | b4b2247467a3cb12c84bf763026b83b8826c0b11 (diff) | |
download | ATCD-b466b3aeaa8d8ea67e206d944d25c33a3e5e7418.tar.gz |
ChangeLogTag:Mon Apr 24 15:24:19 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp')
-rw-r--r-- | TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp | 405 |
1 files changed, 405 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp b/TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp new file mode 100644 index 00000000000..a542bc97ecf --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/MCast/MCast.cpp @@ -0,0 +1,405 @@ +// $Id$ + +#include "Consumer.h" +#include "Supplier.h" +#include "AddrServer.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/EC_Gateway_UDP.h" +#include "ace/Get_Opt.h" + +ACE_RCSID(EC_Examples, MCast, "$Id$") + +const char *udp_mcast_address = + ACE_DEFAULT_MULTICAST_ADDR ":10001"; + +int parse_args (int argc, char *argv[]); + +int +main (int argc, char* argv[]) +{ + // Register the default factory in the Service Configurator. + // If your platform supports static constructors then you can + // simply using the ACE_STATIC_SVC_DEFINE() macro, unfortunately TAO + // must run on platforms where static constructors do not work well, + // so we have to explicitly invoke this function. + TAO_EC_Default_Factory::init_svcs (); + + // The exception macros are described in: + // + // $ACE_ROOT/docs/exceptions.html + // + // and defined in: + // + // $ACE_ROOT/ace/CORBA_macros.h + // + // If your platform supports native exceptions, and TAO was compiled + // with native exception support then you can simply use try/catch + // and avoid the ACE_TRY_ENV argument. + // Unfortunately many embedded systems cannot use exceptions due to + // the space and time overhead. + // + // This would be OK except for the fact that event channel library + // is compiled with support for platforms without exceptions, so + // some of the classes (the consumers and suppliers), must receive + // the extra CORBA::Environment argument. + // In platforms with native C++ exception support the argument must + // be present, but it can be ignored, and the standard C++ exception + // mechanisms can be used instead. + // Still we are workin on some way to avoid the extra argument + // altogether, at least for platforms that do not require it. + // + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // **************** HERE STARTS THE ORB SETUP + + // Create the ORB, pass the argv list for parsing. + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Parse the arguments, you usually want to do this after + // invoking ORB_init() because ORB_init() will remove all the + // -ORB options from the command line. + if (parse_args (argc, argv) == -1) + { + ACE_ERROR ((LM_ERROR, + "Usage: Service [-m udp_mcast_addr]\n")); + return 1; + } + + // This is the standard code to get access to the POA and + // activate it. + // The POA starts in the holding state, if it is not activated + // it will not process any requests. + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA", ACE_TRY_ENV); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_TRY_ENV); + ACE_TRY_CHECK; + poa_manager->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************** THAT COMPLETS THE ORB SETUP + + // **************** HERE START THE LOCAL EVENT CHANNEL SETUP + + // This structure is used to define the startup time event + // channel configuration. + // This structure is described in + // + // $TAO_ROOT/docs/ec_options.html + // + TAO_EC_Event_Channel_Attributes attributes (poa.in (), + poa.in ()); + + // Create the Event Channel implementation class + TAO_EC_Event_Channel ec_impl (attributes); + + // Activate the Event Channel, depending on the configuration + // that may involve creating some threads. + // But it should always be invoked because several internal data + // structures are initialized at that point. + ec_impl.activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // The event channel is activated as any other CORBA servant. + // In this case we use the simple implicit activation with the + // RootPOA + RtecEventChannelAdmin::EventChannel_var event_channel = + ec_impl._this (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE LOCAL EVENT CHANNEL SETUP + + // **************** HERE STARTS THE FEDERATION SETUP + + // The next step is to setup the multicast gateways. + // There are two gateways involved, one sends the locally + // generated events to the federated peers, the second gateway + // receives multicast traffic and turns it into local events. + + // The sender requires a helper object to select what + // multicast group will carry what traffic, this is the + // so-called 'Address Server'. + // The intention is that advanced applications can use different + // multicast groups for different events, this can exploit + // network interfaces that filter unwanted multicast traffic. + // The helper object is accessed through an IDL interface, so it + // can reside remotely. + // In this example, and in many application, using a fixed + // multicast group is enough, and a local address server is the + // right approach. + + // First we convert the string into an INET address, then we + // convert that into the right IDL structure: + ACE_INET_Addr udp_addr (udp_mcast_address); + ACE_DEBUG ((LM_DEBUG, + "Multicast address is: %s\n", + udp_mcast_address)); + RtecUDPAdmin::UDP_Addr addr; + addr.ipaddr = udp_addr.get_ip_address (); + addr.port = udp_addr.get_port_number (); + + // Now we create and activate the servant + AddrServer as_impl (addr); + RtecUDPAdmin::AddrServer_var address_server = + as_impl._this (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // We need a local socket to send the data, open it and check + // that everything is OK: + TAO_ECG_UDP_Out_Endpoint endpoint; + if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "Cannot open send endpoint\n"), + 1); + } + + // Now we setup the sender: + TAO_ECG_UDP_Sender sender; + sender.init (event_channel.in (), + address_server.in (), + &endpoint, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Now we connect the sender as a consumer of events, it will + // receive any event from any source and send it to the "right" + // multicast group, as defined by the address server set above: + RtecEventChannelAdmin::ConsumerQOS sub; + sub.is_gateway = 1; + + sub.dependencies.length (2); + sub.dependencies[0].event.header.type = + ACE_ES_EVENT_ANY; // first free event type + sub.dependencies[0].event.header.type = + ACE_ES_EVENT_SOURCE_ANY; // Any source is OK + + sender.open (sub, ACE_TRY_ENV); + ACE_TRY_CHECK; + + // To receive events we need to setup an event handler: + TAO_ECG_UDP_Receiver receiver; + TAO_ECG_Mcast_EH mcast_eh (&receiver); + + // The event handler uses the ORB reactor to wait for multicast + // traffic: + mcast_eh.reactor (orb->orb_core ()->reactor ()); + + // The multicast Event Handler needs to know to what multicast + // groups it should listen to. To do so it becomes an observer + // with the event channel, to determine the list of events + // required by all the local consumer. + // Then it register for the multicast groups that carry those + // events: + int r = mcast_eh.open (event_channel.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + if (r == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "Cannot open EH %p\n"), 1); + } + + // Big events have to be fragmented when sent using multicast. + // The receiver periodically checks the status of the fragments, + // using the following timer. + // If after <max_expiration_count> iterations the complete + // message has not been received it is dropped: + ACE_Time_Value expire (0, 50000); + const int max_expiration_count = 5; + + // Again the receiver connects to the event channel as a + // supplier of events, using the Observer features to detect + // local consumers and their interests: + receiver.init (event_channel.in (), + &endpoint, + address_server.in (), + orb->orb_core ()->reactor (), + expire, + max_expiration_count, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // The Receiver is also a supplier of events. The exact type of + // events is only known to the application, because it depends + // on the traffic carried by all the multicast groups that the + // different event handlers subscribe to. + // In this example we choose to simply describe our publications + // using wilcards, any event from any source. More advanced + // application could use the Observer features in the event + // channel to update this information (and reduce the number of + // multicast groups that each receive subscribes to). + // In a future version the event channel could perform some of + // those tasks automatically + RtecEventChannelAdmin::SupplierQOS pub; + pub.publications.length (1); + pub.publications[0].event.header.type = ACE_ES_EVENT_ANY; + pub.publications[0].event.header.source = ACE_ES_EVENT_SOURCE_ANY; + pub.is_gateway = 1; + + receiver.open (pub, ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE FEDERATION SETUP + + // **************** HERE STARTS THE CLIENT SETUP + + // First let us create a consumer and connect it to the event + // channel + Consumer consumer; + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + event_channel->for_consumers (ACE_TRY_ENV); + ACE_TRY_CHECK; + consumer.connect (consumer_admin.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // And now create a supplier + Supplier supplier; + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + event_channel->for_suppliers (ACE_TRY_ENV); + ACE_TRY_CHECK; + supplier.connect (supplier_admin.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE CLIENT SETUP + + // **************** HERE STARTS THE EVENT LOOP + + // Wait for events, including incoming multicast data. + // We could also use orb->run(), but that will not let us + // terminate the application in a nice way. + for (int i = 0; i != 1000; ++i) + { + CORBA::Boolean there_is_work = + orb->work_pending (ACE_TRY_ENV); + ACE_TRY_CHECK; + if (there_is_work) + { + // We use a TAO extension. The CORBA mechanism does not + // provide any decent way to control the duration of + // perform_work() or work_pending(), so just calling + // them results in a spin loop. + ACE_Time_Value tv (0, 50000); + orb->perform_work (tv, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_Time_Value tv (0, 100000); + ACE_OS::sleep (tv); + supplier.perform_push (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + + // **************** THAT COMPLETES THE EVENT LOOP + + // **************** HERE STARTS THE CLEANUP CODE + + // First the easy ones + supplier.disconnect (ACE_TRY_ENV); + ACE_TRY_CHECK; + consumer.disconnect (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Now let us close the Receiver + receiver.close (ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (mcast_eh.close () == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Closing MCast event handler\n"), 1); + } + + // And also close the sender of events + sender.close (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // The event channel must be destroyed, so it can release its + // resources, and inform all the clients that are still + // connected that it is going away. + event_channel->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Deactivating the event channel implementation is not strictly + // required, the POA will do it for us, but it is good manners: + { + // Using _this() activates with the default POA, we must gain + // access to that POA to deactivate the object. + // Notice that we 'know' that the default POA for this servant + // is the root POA, but the code is more robust if we don't + // rely on that. + PortableServer::POA_var poa = + ec_impl._default_POA (ACE_TRY_ENV); + ACE_TRY_CHECK; + // Get the Object Id used for the servant.. + PortableServer::ObjectId_var oid = + poa->servant_to_id (&ec_impl, ACE_TRY_ENV); + ACE_TRY_CHECK; + // Deactivate the object + poa->deactivate_object (oid.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + } + + // Now we can destroy the POA, the flags mean that we want to + // wait until the POA is really destroyed + poa->destroy (1, 1, ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Finally destroy the ORB + orb->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************** THAT COMPLETES THE CLEANUP CODE + + ACE_DEBUG ((LM_DEBUG, + "MCast example terminated\n")); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +// **************************************************************** + +int parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "m:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'm': + udp_mcast_address = get_opts.optarg; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "[-m udp_mcast_address]" + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +// **************************************************************** + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |