summaryrefslogtreecommitdiff
path: root/orbsvcs/tests/EC_MT_Mcast
diff options
context:
space:
mode:
authorparsons <parsons@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2010-08-13 15:47:20 +0000
committerparsons <parsons@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2010-08-13 15:47:20 +0000
commitd86939f8e188671a6226b24e0957d984050c0ed6 (patch)
treee58de7d2e0d317204ee79bd0578e51c93bf58ea8 /orbsvcs/tests/EC_MT_Mcast
parente55d9214407ea472e418f9852cb58dddab0c83f5 (diff)
downloadATCD-d86939f8e188671a6226b24e0957d984050c0ed6.tar.gz
Diffstat (limited to 'orbsvcs/tests/EC_MT_Mcast')
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/AddrServer.cpp46
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/AddrServer.h46
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/Consumer.cpp92
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/Consumer.h47
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/EC_MT_Mcast.mpc6
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/MCast.cpp379
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/Makefile.am67
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/README13
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/Supplier.cpp82
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/Supplier.h47
-rwxr-xr-xorbsvcs/tests/EC_MT_Mcast/run_test.pl64
-rw-r--r--orbsvcs/tests/EC_MT_Mcast/svc.conf5
12 files changed, 894 insertions, 0 deletions
diff --git a/orbsvcs/tests/EC_MT_Mcast/AddrServer.cpp b/orbsvcs/tests/EC_MT_Mcast/AddrServer.cpp
new file mode 100644
index 00000000000..723910ec736
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/AddrServer.cpp
@@ -0,0 +1,46 @@
+// $Id$
+// Reused from: $TAO_ROOT/orbsvcs/examples/RtEC/MCast
+
+#include "AddrServer.h"
+#include "ace/INET_Addr.h"
+
+ACE_RCSID(EC_MT_Mcast,
+ AddrServer,
+ "$Id$")
+
+AddrServer::AddrServer (const ACE_INET_Addr& addr)
+{
+#if defined (ACE_HAS_IPV6)
+ if (addr.get_type() == PF_INET6)
+ {
+ RtecUDPAdmin::UDP_Addr_v6 v6;
+ sockaddr_in6 *in6 =
+ reinterpret_cast<sockaddr_in6 *>(addr.get_addr());
+ ACE_OS::memcpy (v6.ipaddr,&in6->sin6_addr,16);
+ v6.port = addr.get_port_number();
+ this->addr_.v6_addr (v6);
+ return;
+ }
+#endif /* ACE_HAS_IPV6 */
+ RtecUDPAdmin::UDP_Addr v4;
+ v4.ipaddr = addr.get_ip_address ();
+ v4.port = addr.get_port_number ();
+ this->addr_.v4_addr (v4);
+}
+
+void
+AddrServer::get_addr (const RtecEventComm::EventHeader&,
+ RtecUDPAdmin::UDP_Addr& addr)
+{
+ if (this->addr_._d() == RtecUDPAdmin::Rtec_inet6)
+ throw CORBA::DATA_CONVERSION(0, CORBA::COMPLETED_YES);
+ addr = this->addr_.v4_addr();
+}
+
+
+void
+AddrServer::get_address (const RtecEventComm::EventHeader&,
+ RtecUDPAdmin::UDP_Address_out addr)
+{
+ addr = this->addr_;
+}
diff --git a/orbsvcs/tests/EC_MT_Mcast/AddrServer.h b/orbsvcs/tests/EC_MT_Mcast/AddrServer.h
new file mode 100644
index 00000000000..df931b22a8b
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/AddrServer.h
@@ -0,0 +1,46 @@
+// -*- C++ -*-
+// $Id$
+// Reused from: $TAO_ROOT/orbsvcs/examples/RtEC/MCast
+
+#ifndef ADDRSERVER_H
+#define ADDRSERVER_H
+#include /**/ "ace/pre.h"
+
+#include "orbsvcs/RtecUDPAdminS.h"
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+class ACE_INET_Addr;
+ACE_END_VERSIONED_NAMESPACE_DECL
+
+class AddrServer : public POA_RtecUDPAdmin::AddrServer
+{
+ // = TITLE
+ // A simple AddrServer
+ //
+ // = DESCRIPTION
+ // The EC is able to use multiple multicast groups to transmit its
+ // data, the is given control over the mapping between the Event
+ // (type,source) pair and the (ipaddr,port) pair using a
+ // AddrServer.
+ // This class implements a very simple server that simply maps the
+ // <type> component to the <ipaddr> and uses a fixed <port>,
+ // provided at initialization time.
+ //
+public:
+ AddrServer (const ACE_INET_Addr &addr);
+ // Constructor
+
+ // = The RtecUDPAdmin::AddrServer methods
+ virtual void get_addr (const RtecEventComm::EventHeader& header,
+ RtecUDPAdmin::UDP_Addr_out addr);
+
+ virtual void get_address (const RtecEventComm::EventHeader& header,
+ RtecUDPAdmin::UDP_Address_out addr);
+
+private:
+ RtecUDPAdmin::UDP_Address addr_;
+ // The address
+};
+
+#include /**/ "ace/post.h"
+#endif /* ADDRSERVER_H */
diff --git a/orbsvcs/tests/EC_MT_Mcast/Consumer.cpp b/orbsvcs/tests/EC_MT_Mcast/Consumer.cpp
new file mode 100644
index 00000000000..2d48ae0266f
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/Consumer.cpp
@@ -0,0 +1,92 @@
+// $Id$
+// Reused from: $TAO_ROOT/orbsvcs/examples/RtEC/MCast
+
+#include "Consumer.h"
+#include "orbsvcs/RtecEventChannelAdminS.h"
+#include "orbsvcs/Event_Service_Constants.h"
+
+ACE_RCSID(EC_MT_Mcast,
+ Consumer,
+ "$Id$")
+
+Consumer::Consumer (void)
+ : event_count_ (0)
+{
+}
+
+void
+Consumer::connect (RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin)
+{
+ this->proxy_ =
+ consumer_admin->obtain_push_supplier ();
+
+ RtecEventComm::PushConsumer_var me =
+ this->_this ();
+
+ // Simple subscription, but usually the helper classes in
+ // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this.
+ RtecEventChannelAdmin::ConsumerQOS qos;
+ qos.is_gateway = 0;
+
+ qos.dependencies.length (2);
+ RtecEventComm::EventHeader& h0 =
+ qos.dependencies[0].event.header;
+ h0.type = ACE_ES_DISJUNCTION_DESIGNATOR;
+ h0.source = 1; // The disjunction has one element
+
+ RtecEventComm::EventHeader& h1 =
+ qos.dependencies[1].event.header;
+ h1.type = ACE_ES_EVENT_UNDEFINED; // first free event type
+ h1.source = ACE_ES_EVENT_SOURCE_ANY; // Any source is OK
+
+ this->proxy_->connect_push_consumer (me.in (), qos);
+}
+
+void
+Consumer::disconnect (void)
+{
+ try
+ {
+ // Disconnect from the proxy
+ this->proxy_->disconnect_push_supplier ();
+ }
+ catch (const CORBA::Exception&)
+ {
+ // Ignore exceptions
+ }
+ this->proxy_ = RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
+
+ // Deactivate this object
+ PortableServer::POA_var poa =
+ this->_default_POA ();
+ // Get the Object Id used for the servant..
+ PortableServer::ObjectId_var oid =
+ poa->servant_to_id (this);
+ // Deactivate the object
+ poa->deactivate_object (oid.in ());
+}
+
+void
+Consumer::push (const RtecEventComm::EventSet& events)
+{
+ if (events.length () == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Consumer (%P|%t) no events\n"));
+ return;
+ }
+
+ this->event_count_ += events.length ();
+ if (this->event_count_ % 10000 == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Consumer (%P|%t): %d events received\n",
+ this->event_count_));
+ }
+}
+
+void
+Consumer::disconnect_push_consumer (void)
+{
+}
+
diff --git a/orbsvcs/tests/EC_MT_Mcast/Consumer.h b/orbsvcs/tests/EC_MT_Mcast/Consumer.h
new file mode 100644
index 00000000000..00001f37d6c
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/Consumer.h
@@ -0,0 +1,47 @@
+// $Id$
+// Reused from: $TAO_ROOT/orbsvcs/examples/RtEC/MCast
+
+#ifndef CONSUMER_H
+#define CONSUMER_H
+
+#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class Consumer : public POA_RtecEventComm::PushConsumer
+{
+ // = TITLE
+ // Simple consumer object
+ //
+ // = DESCRIPTION
+ // This class is a consumer of events.
+ // It simply subscribes to one event type.
+ //
+public:
+ Consumer (void);
+ // Constructor
+
+ void connect (RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin);
+ // Connect to the Event Channel
+
+ void disconnect (void);
+ // Disconnect from the event channel
+
+ // = The RtecEventComm::PushConsumer methods
+
+ virtual void push (const RtecEventComm::EventSet& events);
+ virtual void disconnect_push_consumer (void);
+ // The skeleton methods.
+
+private:
+ CORBA::ULong event_count_;
+ // Keep track of the number of events received.
+
+ RtecEventChannelAdmin::ProxyPushSupplier_var proxy_;
+ // The proxy
+};
+
+#endif /* CONSUMER_H */
diff --git a/orbsvcs/tests/EC_MT_Mcast/EC_MT_Mcast.mpc b/orbsvcs/tests/EC_MT_Mcast/EC_MT_Mcast.mpc
new file mode 100644
index 00000000000..73788961680
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/EC_MT_Mcast.mpc
@@ -0,0 +1,6 @@
+// -*- MPC -*-
+// $Id$
+
+project(EC_MT_Mcast): rteventexe, rtevent_serv, messaging, strategies {
+ exename = MCast
+}
diff --git a/orbsvcs/tests/EC_MT_Mcast/MCast.cpp b/orbsvcs/tests/EC_MT_Mcast/MCast.cpp
new file mode 100644
index 00000000000..dda31a733bb
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/MCast.cpp
@@ -0,0 +1,379 @@
+// $Id$
+// Adapted from: $TAO_ROOT/orbsvcs/examples/RtEC/MCast
+
+#include "Consumer.h"
+#include "Supplier.h"
+#include "AddrServer.h"
+#include "orbsvcs/Event_Service_Constants.h"
+#include "orbsvcs/Event/EC_Event_Channel.h"
+#include "orbsvcs/Event/EC_Default_Factory.h"
+#include "orbsvcs/Event/ECG_Mcast_EH.h"
+#include "orbsvcs/Event/ECG_UDP_Sender.h"
+#include "orbsvcs/Event/ECG_UDP_Receiver.h"
+#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
+#include "tao/Strategies/advanced_resource.h"
+#include "tao/ORB_Core.h"
+#include "ace/Get_Opt.h"
+#include "ace/OS_NS_unistd.h"
+
+ACE_RCSID (EC_MT_Mcast,
+ MCast,
+ "$Id$")
+
+const ACE_TCHAR *udp_mcast_address =
+ ACE_TEXT (ACE_DEFAULT_MULTICAST_ADDR) ACE_TEXT(":10001");
+
+static CORBA::ORB_var orb = CORBA::ORB::_nil ();
+static bool terminate_threads = false;
+static const unsigned pool_size = 2;
+static const int data_items = 60000;
+
+void *
+run_orb_within_thread (void *)
+{
+
+ while (! terminate_threads)
+ {
+ try
+ {
+ CORBA::Boolean there_is_work =
+ orb->work_pending ();
+ if (there_is_work)
+ {
+ // We use a TAO extension. The CORBA mechanism does not
+ // provide any decent way to control the duration of
+ // perform_work() or work_pending(), so just calling
+ // them results in a spin loop.
+ ACE_Time_Value tv (0, 50000);
+ orb->perform_work (tv);
+ }
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("perform work");
+
+ return 0;
+ }
+ }
+
+ return 0;
+}
+
+int parse_args (int argc, ACE_TCHAR *argv[]);
+
+int
+ACE_TMAIN(int argc, ACE_TCHAR *argv[])
+{
+ // Register the default factory in the Service Configurator.
+ // If your platform supports static constructors then you can
+ // simply using the ACE_STATIC_SVC_DEFINE() macro, unfortunately TAO
+ // must run on platforms where static constructors do not work well,
+ // so we have to explicitly invoke this function.
+ TAO_EC_Default_Factory::init_svcs ();
+
+ try
+ {
+ // **************** HERE IS THE ORB SETUP
+
+ // Create the ORB, pass the argv list for parsing.
+ orb = CORBA::ORB_init (argc, argv);
+
+ // Parse the arguments, you usually want to do this after
+ // invoking ORB_init() because ORB_init() will remove all the
+ // -ORB options from the command line.
+ if (parse_args (argc, argv) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Usage: Service [-m udp_mcast_addr]\n"));
+ return 1;
+ }
+
+ // This is the standard code to get access to the POA and
+ // activate it.
+ // The POA starts in the holding state, if it is not activated
+ // it will not process any requests.
+ CORBA::Object_var object =
+ orb->resolve_initial_references ("RootPOA");
+ PortableServer::POA_var poa =
+ PortableServer::POA::_narrow (object.in ());
+ PortableServer::POAManager_var poa_manager =
+ poa->the_POAManager ();
+ poa_manager->activate ();
+
+ // **************** THAT COMPLETES THE ORB SETUP
+
+ // **************** HERE IS THE LOCAL EVENT CHANNEL SETUP
+
+ // This structure is used to define the startup time event
+ // channel configuration.
+ // This structure is described in
+ //
+ // $TAO_ROOT/docs/ec_options.html
+ //
+ TAO_EC_Event_Channel_Attributes attributes (poa.in (),
+ poa.in ());
+
+ // Create the Event Channel implementation class
+ TAO_EC_Event_Channel ec_impl (attributes);
+
+ // Activate the Event Channel, depending on the configuration
+ // that may involve creating some threads.
+ // But it should always be invoked because several internal data
+ // structures are initialized at that point.
+ ec_impl.activate ();
+
+ // The event channel is activated as any other CORBA servant.
+ // In this case we use the simple implicit activation with the
+ // RootPOA
+ RtecEventChannelAdmin::EventChannel_var event_channel =
+ ec_impl._this ();
+
+ // **************** THAT COMPLETES THE LOCAL EVENT CHANNEL SETUP
+
+ // **************** HERE IS THE FEDERATION SETUP
+
+ // The next step is to setup the multicast gateways.
+ // There are two gateways involved, one sends the locally
+ // generated events to the federated peers, the second gateway
+ // receives multicast traffic and turns it into local events.
+
+ // The sender requires a helper object to select what
+ // multicast group will carry what traffic, this is the
+ // so-called 'Address Server'.
+ // The intention is that advanced applications can use different
+ // multicast groups for different events, this can exploit
+ // network interfaces that filter unwanted multicast traffic.
+ // The helper object is accessed through an IDL interface, so it
+ // can reside remotely.
+ // In this example, and in many application, using a fixed
+ // multicast group is enough, and a local address server is the
+ // right approach.
+
+ // First we convert the string into an INET address, then we
+ // convert that into the right IDL structure:
+ ACE_INET_Addr udp_addr (ACE_TEXT_ALWAYS_CHAR(udp_mcast_address));
+ ACE_DEBUG ((LM_DEBUG,
+ "Multicast address is: %s\n",
+ udp_mcast_address));
+
+ // Now we create and activate the servant
+ AddrServer as_impl (udp_addr);
+ RtecUDPAdmin::AddrServer_var address_server =
+ as_impl._this ();
+
+ // We need a local socket to send the data, open it and check
+ // that everything is OK:
+ TAO_ECG_UDP_Out_Endpoint* endpointptr = 0;
+
+ ACE_NEW_RETURN (endpointptr, TAO_ECG_UDP_Out_Endpoint, 0);
+
+ TAO_ECG_Refcounted_Endpoint endpoint (endpointptr);
+ if (endpoint->dgram ().open (ACE_Addr::sap_any,
+ udp_addr.get_type()) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "Cannot open send endpoint\n"),
+ 1);
+ }
+
+ // Now we setup the sender:
+ TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender;
+ sender = TAO_ECG_UDP_Sender::create();
+
+ sender->init (event_channel.in (),
+ address_server.in (),
+ endpoint);
+
+ // Now we connect the sender as a consumer of events, it will
+ // receive any event from any source and send it to the "right"
+ // multicast group, as defined by the address server set above:
+ RtecEventChannelAdmin::ConsumerQOS sub;
+ sub.is_gateway = 1;
+
+ sub.dependencies.length (1);
+ sub.dependencies[0].event.header.type =
+ ACE_ES_EVENT_ANY; // first free event type
+ sub.dependencies[0].event.header.source =
+ ACE_ES_EVENT_SOURCE_ANY; // Any source is OK
+
+ sender->connect (sub);
+
+ // To receive events we need to setup an event handler:
+ TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver;
+ receiver = TAO_ECG_UDP_Receiver::create();
+
+ TAO_ECG_Mcast_EH mcast_eh (&*receiver);
+
+ // The event handler uses the ORB reactor to wait for multicast
+ // traffic:
+ mcast_eh.reactor (orb->orb_core ()->reactor ());
+
+ // The multicast Event Handler needs to know to what multicast
+ // groups it should listen to. To do so it becomes an observer
+ // with the event channel, to determine the list of events
+ // required by all the local consumer.
+ // Then it register for the multicast groups that carry those
+ // events:
+ mcast_eh.open (event_channel.in ());
+
+ // Again the receiver connects to the event channel as a
+ // supplier of events, using the Observer features to detect
+ // local consumers and their interests:
+ receiver->init (event_channel.in (),
+ endpoint,
+ address_server.in ());
+
+ // The Receiver is also a supplier of events. The exact type of
+ // events is only known to the application, because it depends
+ // on the traffic carried by all the multicast groups that the
+ // different event handlers subscribe to.
+ // In this example we choose to simply describe our publications
+ // using wilcards, any event from any source. More advanced
+ // application could use the Observer features in the event
+ // channel to update this information (and reduce the number of
+ // multicast groups that each receive subscribes to).
+ // In a future version the event channel could perform some of
+ // those tasks automatically
+ RtecEventChannelAdmin::SupplierQOS pub;
+ pub.publications.length (1);
+ pub.publications[0].event.header.type = ACE_ES_EVENT_ANY;
+ pub.publications[0].event.header.source = ACE_ES_EVENT_SOURCE_ANY;
+ pub.is_gateway = 1;
+
+ receiver->connect (pub);
+
+ // **************** THAT COMPLETES THE FEDERATION SETUP
+
+ // **************** HERE IS THE CLIENT SETUP
+
+ // First let us create consumers and connect them to the event
+ // channel
+ Consumer consumer1;
+ Consumer consumer2;
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ event_channel->for_consumers ();
+ consumer1.connect (consumer_admin.in ());
+ consumer2.connect (consumer_admin.in ());
+
+ // And now create a supplier
+ Supplier supplier;
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
+ event_channel->for_suppliers ();
+ supplier.connect (supplier_admin.in ());
+
+ // **************** THAT COMPLETES THE CLIENT SETUP
+
+ // **************** HERE IS THE EVENT LOOP
+
+ // creating thread pool
+ ACE_Thread_Manager the_ace_manager;
+ the_ace_manager.open ();
+ int thread_pool_id = the_ace_manager.spawn_n (
+ pool_size, ACE_THR_FUNC (run_orb_within_thread), 0, THR_DETACHED | THR_NEW_LWP);
+ if (thread_pool_id == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "Cannot spawn thread pool\n"), 1);
+ }
+ ACE_OS::sleep (1); // simple solution ensures ready thread pool
+
+ for (int i = 0; i < data_items; i++)
+ {
+ supplier.perform_push ();
+ }
+
+ ACE_OS::sleep (2); // simple solution ensures ready receivers
+ terminate_threads = true; // terminate thread pool
+
+ the_ace_manager.wait(); // wait until all threads in the pool are stopped
+
+ the_ace_manager.close ();
+
+ // **************** THAT COMPLETES THE EVENT LOOP
+
+ // **************** HERE IS THE CLEANUP CODE
+
+ // First the easy ones
+ supplier.disconnect ();
+ consumer1.disconnect ();
+ consumer2.disconnect ();
+
+ // Now let us disconnect the Receiver
+ receiver->shutdown ();
+
+ int r = mcast_eh.shutdown ();
+
+ if (r == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Closing MCast event handler\n"), 1);
+ }
+
+ // And also disconnect the sender of events
+ sender->shutdown ();
+
+ // The event channel must be destroyed, so it can release its
+ // resources, and inform all the clients that are still
+ // connected that it is going away.
+ event_channel->destroy ();
+
+ // Deactivating the event channel implementation is not strictly
+ // required, the POA will do it for us, but it is good manners:
+ {
+ // Using _this() activates with the default POA, we must gain
+ // access to that POA to deactivate the object.
+ // Notice that we 'know' that the default POA for this servant
+ // is the root POA, but the code is more robust if we don't
+ // rely on that.
+ PortableServer::POA_var poa =
+ ec_impl._default_POA ();
+ // Get the Object Id used for the servant..
+ PortableServer::ObjectId_var oid =
+ poa->servant_to_id (&ec_impl);
+ // Deactivate the object
+ poa->deactivate_object (oid.in ());
+ }
+
+ // Now we can destroy the POA, the flags mean that we want to
+ // wait until the POA is really destroyed
+ poa->destroy (1, 1);
+
+ // Finally destroy the ORB
+ orb->destroy ();
+
+ // **************** THAT COMPLETES THE CLEANUP CODE
+
+ ACE_DEBUG ((LM_DEBUG,
+ "MCast example finished\n"));
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Service");
+ return 1;
+ }
+ return 0;
+}
+
+// ****************************************************************
+
+int parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("m:"));
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'm':
+ udp_mcast_address = get_opts.opt_arg ();
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "[-m udp_mcast_address]"
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
diff --git a/orbsvcs/tests/EC_MT_Mcast/Makefile.am b/orbsvcs/tests/EC_MT_Mcast/Makefile.am
new file mode 100644
index 00000000000..2d4e76e9d6c
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/Makefile.am
@@ -0,0 +1,67 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ../bin/mwc.pl -type automake -noreldefs TAO.mwc
+
+ACE_BUILDDIR = $(top_builddir)/..
+ACE_ROOT = $(top_srcdir)/..
+TAO_BUILDDIR = $(top_builddir)
+TAO_ROOT = $(top_srcdir)
+
+
+## Makefile.EC_MT_Mcast.am
+
+if BUILD_CORBA_MESSAGING
+if BUILD_EXCEPTIONS
+if !BUILD_ACE_FOR_TAO
+
+noinst_PROGRAMS = MCast
+
+MCast_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -I$(TAO_ROOT) \
+ -I$(TAO_BUILDDIR) \
+ -I$(TAO_ROOT)/orbsvcs \
+ -I$(TAO_BUILDDIR)/orbsvcs
+
+MCast_SOURCES = \
+ AddrServer.cpp \
+ Consumer.cpp \
+ MCast.cpp \
+ Supplier.cpp \
+ AddrServer.h \
+ Consumer.h \
+ Supplier.h
+
+MCast_LDADD = \
+ $(TAO_BUILDDIR)/tao/libTAO_Strategies.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Serv.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent_Skel.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_RTEvent.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \
+ $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \
+ $(TAO_BUILDDIR)/tao/libTAO_PI.la \
+ $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \
+ $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \
+ $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \
+ $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \
+ $(TAO_BUILDDIR)/tao/libTAO.la \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+endif BUILD_EXCEPTIONS
+endif BUILD_CORBA_MESSAGING
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/orbsvcs/tests/EC_MT_Mcast/README b/orbsvcs/tests/EC_MT_Mcast/README
new file mode 100644
index 00000000000..561d0e39cae
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/README
@@ -0,0 +1,13 @@
+# $Id$
+
+ This directory contains a regression test for a thread safety problem
+ in the TAO_ECG_UDP_Receiver class.
+ The code is a copy of
+ $TAO_ROOT/orbsvcs/examples/RtEC/MCast
+ with only minor modifications.
+ For further details, see the README there.
+
+ The crash will only show up when running multiple MCast processes.
+ Therefore, it is easiest use the run_test.pl script to invoke this
+ test.
+
diff --git a/orbsvcs/tests/EC_MT_Mcast/Supplier.cpp b/orbsvcs/tests/EC_MT_Mcast/Supplier.cpp
new file mode 100644
index 00000000000..ff590115fb7
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/Supplier.cpp
@@ -0,0 +1,82 @@
+// $Id$
+// Reused from: $TAO_ROOT/orbsvcs/examples/RtEC/MCast
+
+#include "Supplier.h"
+#include "orbsvcs/RtecEventChannelAdminS.h"
+#include "orbsvcs/Event_Service_Constants.h"
+
+ACE_RCSID(EC_MT_Mcast,
+ Supplier,
+ "$Id$")
+
+Supplier::Supplier (void)
+{
+}
+
+void
+Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin)
+{
+ this->proxy_ =
+ supplier_admin->obtain_push_consumer ();
+
+ RtecEventComm::PushSupplier_var me =
+ this->_this ();
+
+ // Simple publication, but usually the helper classes in
+ // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this.
+ RtecEventChannelAdmin::SupplierQOS qos;
+ qos.is_gateway = 0;
+
+ qos.publications.length (1);
+ RtecEventComm::EventHeader& h0 =
+ qos.publications[0].event.header;
+ h0.type = ACE_ES_EVENT_UNDEFINED; // first free event type
+ h0.source = 1; // first free event source
+
+ this->proxy_->connect_push_supplier (me.in (), qos);
+}
+
+void
+Supplier::disconnect (void)
+{
+ // Disconnect from the EC
+ try
+ {
+ this->proxy_->disconnect_push_consumer ();
+ }
+ catch (const CORBA::Exception&)
+ {
+ }
+
+ PortableServer::POA_var poa =
+ this->_default_POA ();
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (this);
+ poa->deactivate_object (id.in ());
+}
+
+void
+Supplier::perform_push (void)
+{
+ try
+ {
+ // The event type and source must match our publications
+ RtecEventComm::EventSet event (1);
+ event.length (1);
+ event[0].header.type = ACE_ES_EVENT_UNDEFINED;
+ event[0].header.source = 1;
+ // Avoid loops throught the event channel federations
+ event[0].header.ttl = 1;
+
+ this->proxy_->push (event);
+ }
+ catch (const CORBA::Exception&)
+ {
+ }
+}
+
+void
+Supplier::disconnect_push_supplier (void)
+{
+}
+
diff --git a/orbsvcs/tests/EC_MT_Mcast/Supplier.h b/orbsvcs/tests/EC_MT_Mcast/Supplier.h
new file mode 100644
index 00000000000..dca5f9b3d8b
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/Supplier.h
@@ -0,0 +1,47 @@
+// $Id$
+// Reused from: $TAO_ROOT/orbsvcs/examples/RtEC/MCast
+
+#ifndef SUPPLIER_H
+#define SUPPLIER_H
+
+#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class Supplier : public POA_RtecEventComm::PushSupplier
+{
+ // = TITLE
+ // Simple supplier object
+ //
+ // = DESCRIPTION
+ // This class is a supplier of events.
+ // It simply publishes one event type, when the perform_push()
+ // method is invoked it pushes the event through the event service
+ //
+public:
+ Supplier (void);
+ // Constructor
+
+ void connect (RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin);
+ // Connect to the event channel
+
+ void disconnect (void);
+ // Disconnect from the event channel
+
+ void perform_push (void);
+ // Push a single event
+
+ // = The RtecEventComm::PushSupplier methods
+
+ virtual void disconnect_push_supplier (void);
+ // The skeleton methods.
+
+private:
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy_;
+ // The proxy
+};
+
+#endif /* SUPPLIER_H */
diff --git a/orbsvcs/tests/EC_MT_Mcast/run_test.pl b/orbsvcs/tests/EC_MT_Mcast/run_test.pl
new file mode 100755
index 00000000000..d0a2cf0ad34
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/run_test.pl
@@ -0,0 +1,64 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id$
+# -*- perl -*-
+
+use lib "$ENV{ACE_ROOT}/bin";
+use PerlACE::TestTarget;
+
+$status = 0;
+$debug_level = '0';
+
+foreach $i (@ARGV) {
+ if ($i eq '-debug') {
+ $debug_level = '10';
+ }
+}
+
+my $test1 = PerlACE::TestTarget::create_target (1) || die "Create target 1 failed\n";
+my $test2 = PerlACE::TestTarget::create_target (2) || die "Create target 2 failed\n";
+
+my $svc_conf = "svc$PerlACE::svcconf_ext";
+my $test1_svc_conf = $test1->LocalFile ($svc_conf);
+my $test2_svc_conf = $test2->LocalFile ($svc_conf);
+
+my $mcast_address = (int(rand(16)) + 224) . '.' . int(rand(256)) . '.' .
+ int(rand(256)) . '.' . int(rand(256)) . ':' .
+ (10001 + $test1->RandomPort());
+
+# Run two copies of the same test...
+$T1 = $test1->CreateProcess ("MCast", "-m $mcast_address -ORBSvcConf $test1_svc_conf");
+$T2 = $test2->CreateProcess ("MCast", "-m $mcast_address -ORBSvcConf $test2_svc_conf");
+
+$test_status = $T1->Spawn ();
+
+if ($test_status != 0) {
+ print STDERR "ERROR: could not spawn MCast 1, returned $test_status\n";
+ exit 1;
+}
+
+$test_status = $T2->Spawn ();
+
+if ($test_status != 0) {
+ print STDERR "ERROR: could not spawn MCast 2, returned $test_status\n";
+ $T1->Kill ();
+ exit 1;
+}
+
+$test_status = $T1->WaitKill ($test1->ProcessStopWaitInterval() + 285);
+
+if ($test_status != 0) {
+ print STDERR "ERROR: test 1 returned $test_status\n";
+ $status = 1;
+}
+
+$test_status = $T2->WaitKill ($test2->ProcessStopWaitInterval() + 285);
+
+if ($test_status != 0) {
+ print STDERR "ERROR: test 2 returned $test_status\n";
+ $status = 1;
+}
+
+exit $status;
diff --git a/orbsvcs/tests/EC_MT_Mcast/svc.conf b/orbsvcs/tests/EC_MT_Mcast/svc.conf
new file mode 100644
index 00000000000..c6fce32e1bc
--- /dev/null
+++ b/orbsvcs/tests/EC_MT_Mcast/svc.conf
@@ -0,0 +1,5 @@
+# $Id$
+static EC_Factory "-ECObserver basic -ECProxyPushConsumerCollection mt:copy_on_write:list -ECProxyPushSupplierCollection mt:copy_on_write:list -ECDispatching reactive -ECScheduling null -ECFiltering prefix -ECSupplierFiltering per-supplier"
+static Client_Strategy_Factory "-ORBProfileLock thread -ORBClientConnectionHandler MT"
+static Server_Strategy_Factory "-ORBConcurrency reactive -ORBPOALock thread"
+static Advanced_Resource_Factory "-ORBReactorType tp -ORBInputCDRAllocator thread -ORBConnectionCacheLock thread"