summaryrefslogtreecommitdiff
path: root/tests/Multicast_Test.cpp
diff options
context:
space:
mode:
authorWilliam R. Otte <wotte@dre.vanderbilt.edu>2006-07-24 15:50:11 +0000
committerWilliam R. Otte <wotte@dre.vanderbilt.edu>2006-07-24 15:50:11 +0000
commit6b846cf03c0bcbd8c276cb0af61a181e5f98eaae (patch)
treeda50d054f9c761c3f6a5923f6979e93306c56d68 /tests/Multicast_Test.cpp
parent0e555b9150d38e3b3473ba325b56db2642e6352b (diff)
downloadATCD-6b846cf03c0bcbd8c276cb0af61a181e5f98eaae.tar.gz
Repo restructuring
Diffstat (limited to 'tests/Multicast_Test.cpp')
-rw-r--r--tests/Multicast_Test.cpp944
1 files changed, 0 insertions, 944 deletions
diff --git a/tests/Multicast_Test.cpp b/tests/Multicast_Test.cpp
deleted file mode 100644
index 8613c1a498b..00000000000
--- a/tests/Multicast_Test.cpp
+++ /dev/null
@@ -1,944 +0,0 @@
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// tests
-//
-// = DESCRIPTION
-// This program tests ACE_SOCK_Dgram_Mcast class.
-// It specifically tests subscribing to multiple groups on the same socket
-// on one or more physical interfaces (if available).
-//
-// The test can be run as a producer, consumer, or both producer/consumer
-// (default). The test requires at least two (2) multicast groups which can
-// be configured as command line options. The consumer subscribes to a
-// single group per instance and an additional instance tries to subscribe
-// to all groups on a single socket (if the ACE_SOCK_Dgram_Mcast instance
-// bind()'s the first address to the socket, additional joins will fail).
-// The producer iterates through the list of group addresses and sends a
-// single message containing the destination address and port to each one.
-// It also sends messages to five (5) additional groups and a message to an
-// additional port for each group in order to produce a bit of "noise" in
-// order to help validate how well the multicast filtering works on a
-// particular platform.
-//
-// The list of destination groups start at 239.255.0.1 (default) and
-// increment by 1 up to 5 (default) groups. Both of these values, as well
-// as others, can be overridden via command-line options. Use the -?
-// option to display the usage message...
-//
-// = AUTHOR
-// Don Hinton <dhinton@dresystems.com>
-//
-// ============================================================================
-
-#include "tests/test_config.h"
-#include "ace/Get_Opt.h"
-#include "ace/Vector_T.h"
-#include "ace/SOCK_Dgram_Mcast.h"
-#include "ace/ACE.h"
-#include "ace/Reactor.h"
-#include "ace/OS_NS_stdio.h"
-#include "ace/OS_NS_string.h"
-#include "ace/OS_NS_strings.h"
-#include "ace/OS_NS_sys_time.h"
-#include "ace/Task.h"
-#include "ace/Atomic_Op.h"
-#include "ace/SString.h"
-#include "ace/Signal.h"
-#include "ace/Min_Max.h"
-
-ACE_RCSID(tests, Multicast_Test, "$Id$")
-
-#if defined (ACE_HAS_IP_MULTICAST) && defined (ACE_HAS_THREADS)
-
-/*
- * The 'finished' flag is used to break out of an infninite loop in the
- * task::svc () method. The 'handler' will set the flag in respose to
- * SIGINT (CTRL-C).
- */
-static sig_atomic_t finished = 0;
-extern "C" void handler (int)
-{
- finished = 1;
-}
-
-static const int MCT_ITERATIONS = 10;
-static const int MCT_GROUPS = 5;
-static const int MCT_MIN_GROUPS = 2;
-
-static const char MCT_START_GROUP[] = "239.255.0.1";
-static const int MCT_START_PORT = 16000;
-
-static const size_t MAX_STRING_SIZE = 200;
-
-int advance_addr (ACE_INET_Addr &addr);
-
-// Keep track of errors so we can report them on exit.
-static sig_atomic_t error = 0;
-
-/*
- * MCast_Config holds configuration data for this test.
- */
-class MCT_Config
-{
-public:
-
- enum
- {
- PRODUCER = 1,
- CONSUMER = 2,
- BOTH = PRODUCER | CONSUMER
- };
-
- MCT_Config (void)
- : group_start_ (MCT_START_PORT, MCT_START_GROUP),
- groups_ (0),
- debug_ (0),
- role_ (BOTH),
- sdm_opts_ (ACE_SOCK_Dgram_Mcast::DEFOPTS),
- iterations_ (MCT_ITERATIONS),
- ttl_ (1),
- wait_ (2)
- {
- if (IP_MAX_MEMBERSHIPS == 0)
- this->groups_ = MCT_GROUPS;
- else
- this->groups_ = ACE_MIN (IP_MAX_MEMBERSHIPS, MCT_GROUPS);
- }
- ~MCT_Config (void)
- {}
-
- int open (int argc, ACE_TCHAR *argv[]);
- int debug (void) const { return this->debug_;}
- void dump (void) const;
- int groups (void) const { return this->groups_;}
- const ACE_INET_Addr group_start (void) const { return this->group_start_;}
- u_long role (void) const { return this->role_;}
- int iterations (void) const { return this->iterations_;}
- int ttl (void) const { return this->ttl_;}
- int wait (void) const { return this->wait_;}
- ACE_SOCK_Dgram_Mcast::options options (void) const
- {
- return static_cast<ACE_SOCK_Dgram_Mcast::options> (this->sdm_opts_);
- }
-
-private:
- // Starting group address. (only IPv4 capable right now...)
- ACE_INET_Addr group_start_;
-
- // Number of groups we will try to use in the test.
- int groups_;
-
- // Debug flag.
- int debug_;
-
- // Role, i.e., PRODUCER, CONSUMER, BOTH: defaults to BOTH
- u_long role_;
-
- // ACE_SOCK_Dgram_Mcast ctor options
- u_long sdm_opts_;
-
- // Producer iterations
- int iterations_;
-
- // TTL, time to live, for use over routers.
- int ttl_;
-
- // Time to wait on CONSUMER threads to end before killing test.
- int wait_;
-};
-
-int
-MCT_Config::open (int argc, ACE_TCHAR *argv[])
-{
- int retval = 0;
- int help = 0;
-
- ACE_Get_Opt getopt (argc, argv, ACE_TEXT (":?"), 1, 1);
-
- if (getopt.long_option (ACE_TEXT ("GroupStart"),
- 'g',
- ACE_Get_Opt::ARG_REQUIRED) != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT (" Unable to add GroupStart option.\n")),
- 1);
-
- if (getopt.long_option (ACE_TEXT ("Groups"),
- 'n',
- ACE_Get_Opt::ARG_REQUIRED) != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT (" Unable to add Groups option.\n")), 1);
-
- if (getopt.long_option (ACE_TEXT ("Debug"),
- 'd',
- ACE_Get_Opt::NO_ARG) != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT (" Unable to add Debug option.\n")), 1);
-
- if (getopt.long_option (ACE_TEXT ("Role"),
- 'r',
- ACE_Get_Opt::ARG_REQUIRED) != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT (" Unable to add Role option.\n")), 1);
-
- if (getopt.long_option (ACE_TEXT ("SDM_options"),
- 'm',
- ACE_Get_Opt::ARG_REQUIRED) != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT (" Unable to add Multicast_Options option.\n")),
- 1);
-
- if (getopt.long_option (ACE_TEXT ("Iterations"),
- 'i',
- ACE_Get_Opt::ARG_REQUIRED) != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT (" Unable to add iterations option.\n")),
- 1);
-
- if (getopt.long_option (ACE_TEXT ("TTL"),
- 't',
- ACE_Get_Opt::ARG_REQUIRED) != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT (" Unable to add TTL option.\n")),
- 1);
-
- if (getopt.long_option (ACE_TEXT ("Wait"),
- 'w',
- ACE_Get_Opt::ARG_REQUIRED) != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT (" Unable to add wait option.\n")),
- 1);
-
- if (getopt.long_option (ACE_TEXT ("help"),
- 'h',
- ACE_Get_Opt::NO_ARG) != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT (" Unable to add help option.\n")),
- 1);
-
- // Now, let's parse it...
- int c = 0;
- while ((c = getopt ()) != EOF)
- {
- switch (c)
- {
- case 0:
- // Long Option. This should never happen.
- retval = -1;
- break;
- case 'g':
- {
- // @todo validate all these, i.e., must be within range
- // 224.255.0.0 to 238.255.255.255, but we only allow the
- // administrative "site local" range, 239.255.0.0 to
- // 239.255.255.255.
- ACE_TCHAR *group = getopt.opt_arg ();
- if (this->group_start_.set (group) != 0)
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Bad group address:%s\n"),
- group));
- }
- }
- break;
- case 'i':
- this->iterations_ = ACE_OS::atoi (getopt.opt_arg ());
- break;
- case 'n':
- {
- int n = ACE_OS::atoi (getopt.opt_arg ());
- // I'm assuming 0 means unlimited, so just use whatever the
- // user provides. Seems to work okay on Solaris 5.8.
- if (IP_MAX_MEMBERSHIPS == 0)
- this->groups_ = n;
- else
- this->groups_ = ACE_MIN (ACE_MAX (n, MCT_MIN_GROUPS),
- IP_MAX_MEMBERSHIPS);
- break;
- }
- case 'd':
- this->debug_ = 1;
- break;
- case 'r':
- {
- ACE_TCHAR *c = getopt.opt_arg ();
- if (ACE_OS::strcasecmp (c, ACE_TEXT ("CONSUMER")) == 0)
- this->role_ = CONSUMER;
- else if (ACE_OS::strcasecmp (c, ACE_TEXT ("PRODUCER")) == 0)
- this->role_ = PRODUCER;
- else
- {
- help = 1;
- retval = -1;
- }
- }
- break;
- case 'm':
- {
- //@todo add back OPT_BINDADDR_NO...
- ACE_TCHAR *c = getopt.opt_arg ();
- if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_YES")) == 0)
- ACE_SET_BITS (this->sdm_opts_,
- ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
- else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_NO")) == 0)
- ACE_CLR_BITS (this->sdm_opts_,
- ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
- else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_BINDADDR")) == 0)
- {
- ACE_CLR_BITS (this->sdm_opts_,
- ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
- ACE_SET_BITS (this->sdm_opts_,
- ACE_SOCK_Dgram_Mcast::DEFOPT_BINDADDR);
- }
- else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ALL")) == 0)
- ACE_SET_BITS (this->sdm_opts_,
- ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
- else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ONE")) == 0)
- ACE_CLR_BITS (this->sdm_opts_,
- ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
- else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_NULLIFACE")) == 0)
- {
- ACE_CLR_BITS (this->sdm_opts_,
- ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
- ACE_SET_BITS (this->sdm_opts_,
- ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
- }
- else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPTS")) == 0)
- this->sdm_opts_ = ACE_SOCK_Dgram_Mcast::DEFOPTS;
- else
- {
- help = 1;
- retval = -1;
- }
- }
- break;
- case 't':
- this->ttl_ = ACE_OS::atoi (getopt.opt_arg ());
- break;
- case 'w':
- this->wait_ = ACE_OS::atoi (getopt.opt_arg ());
- break;
- case ':':
- // This means an option requiring an argument didn't have one.
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT (" Option '%c' requires an argument but ")
- ACE_TEXT ("none was supplied\n"),
- getopt.opt_opt ()));
- help = 1;
- retval = -1;
- break;
- case '?':
- case 'h':
- default:
- if (ACE_OS::strcmp (argv[getopt.opt_ind () - 1], ACE_TEXT ("-?")) != 0
- && getopt.opt_opt () != 'h')
- // Don't allow unknown options.
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT (" Found an unknown option (%c) ")
- ACE_TEXT ("we couldn't handle.\n"),
- getopt.opt_opt ()));
- // getopt.last_option ())); //readd with "%s" when
- // last_option() is available.
- help = 1;
- retval = -1;
- break;
- }
- }
-
- if (retval == -1)
- {
- if (help)
- // print usage here
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("usage: %s [options]\n")
- ACE_TEXT ("Options:\n")
- ACE_TEXT (" -g {STRING} --GroupStart={STRING} ")
- ACE_TEXT ("starting multicast group address\n")
- ACE_TEXT (" ")
- ACE_TEXT ("(default=239.255.0.1:16000)\n")
- ACE_TEXT (" -n {#} --Groups={#} ")
- ACE_TEXT ("number of groups (default=5)\n")
- ACE_TEXT (" -d --Debug ")
- ACE_TEXT ("debug flag (default=off)\n")
- ACE_TEXT (" -r {STRING} --Role={STRING} ")
- ACE_TEXT ("role {PRODUCER|CONSUMER|BOTH}\n")
- ACE_TEXT (" ")
- ACE_TEXT ("(default=BOTH)\n")
- ACE_TEXT (" -m {STRING} --SDM_options={STRING} ")
- ACE_TEXT ("ACE_SOCK_Dgram_Mcast ctor options\n")
- ACE_TEXT (" ")
- ACE_TEXT ("(default=DEFOPTS)\n")
- ACE_TEXT (" -i {#} --Iterations={#} ")
- ACE_TEXT ("number of iterations (default=100)\n")
- ACE_TEXT (" -t {#} --TTL={#} ")
- ACE_TEXT ("time to live (default=1)\n")
- ACE_TEXT (" -w {#} --Wait={#} ")
- ACE_TEXT ("number of seconds to wait on CONSUMER\n")
- ACE_TEXT (" ")
- ACE_TEXT ("(default=2)\n")
- ACE_TEXT (" -h/? --help ")
- ACE_TEXT ("show this message\n"),
- argv[0]));
-
- return -1;
- }
-
- return 0;
-}
-
-void
-MCT_Config::dump (void) const
-{
- ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" Dumping MCT_Config\n")));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("\tIP_MAX_MEMBERSHIPS = %d\n"),
- IP_MAX_MEMBERSHIPS));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("\tgroups_ = %d\n"),
- this->groups_));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("\trole_ = %s\n"),
- (ACE_BIT_ENABLED (this->role_, PRODUCER)
- && ACE_BIT_ENABLED (this->role_, CONSUMER))
- ? ACE_TEXT ("PRODUCER/CONSUMER")
- : ACE_BIT_ENABLED (this->role_, PRODUCER)
- ? ACE_TEXT ("PRODUCER")
- : ACE_TEXT ("CONSUMER")));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("\tsdm_options_ = %d\n"),
- this->sdm_opts_));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("\titerations_ = %d\n"),
- this->iterations_));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("\tttl_ = %d\n"),
- this->ttl_));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("\twait_ = %d\n"),
- this->wait_));
- // Note that this call to get_host_addr is the non-reentrant
- // version, but it's okay for us.
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("\tgroups_start_ = %s:%d\n"),
- this->group_start_.get_host_addr (),
- this->group_start_.get_port_number ()));
-
- ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
-}
-
-/******************************************************************************/
-
-class MCT_Event_Handler : public ACE_Event_Handler
-{
-public:
- MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
- = ACE_SOCK_Dgram_Mcast::DEFOPTS);
- virtual ~MCT_Event_Handler (void);
-
- int join (const ACE_INET_Addr &mcast_addr,
- int reuse_addr = 1,
- const ACE_TCHAR *net_if = 0);
- int leave (const ACE_INET_Addr &mcast_addr,
- const ACE_TCHAR *net_if = 0);
-
- // = Event Handler hooks.
- virtual int handle_input (ACE_HANDLE handle);
- virtual int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask close_mask);
-
- virtual ACE_HANDLE get_handle (void) const;
-
-protected:
- ACE_SOCK_Dgram_Mcast *mcast (void);
- int find (const char *buf);
-
-private:
- ACE_SOCK_Dgram_Mcast mcast_;
-
- // List of groups we've joined
- ACE_Vector<ACE_CString*> address_vec_;
-
- // Flag used to set the 'finished' flag when the last event handler
- // gets removed from the reactor.
- static ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> active_handlers_;
-};
-
-ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> MCT_Event_Handler::active_handlers_ = 0;
-
-MCT_Event_Handler::MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options)
- : mcast_ (options)
-{
- // Increment the number of active handlers in the reactor. Note this isn't
- // really correct, but it should work for our simple example.
- ++MCT_Event_Handler::active_handlers_;
-}
-
-MCT_Event_Handler::~MCT_Event_Handler (void)
-{
- size_t size = this->address_vec_.size ();
- for (size_t i = 0; i < size; ++i)
- {
- delete this->address_vec_[i];
- this->address_vec_[i] = 0;
- }
- mcast_.close ();
-}
-
-
-ACE_SOCK_Dgram_Mcast *
-MCT_Event_Handler::mcast (void)
-{
- return &this->mcast_;
-}
-
-int
-MCT_Event_Handler::find (const char *buf)
-{
- size_t size = this->address_vec_.size ();
- size_t i;
- for (i = 0; i < size; ++i)
- {
- if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
- return 0;
- }
-
- // Not found, so output message we received along with a list of groups
- // we've joined for debugging.
- ACE_CString local;
- for (i = 0; i < size; ++i)
- {
- local += "\t";
- local += this->address_vec_[i]->c_str ();
- local += "\n";
- }
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%C not in:\n%C"),
- buf, local.c_str ()));
-
- return -1;
-}
-
-
-int
-MCT_Event_Handler::join (const ACE_INET_Addr &mcast_addr,
- int reuse_addr,
- const ACE_TCHAR *net_if)
-{
- if (this->mcast_.join (mcast_addr, reuse_addr, net_if) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("MCT_Event_Handler::join - %p\n"),
- ACE_TEXT ("Could not join group")),
- -1);
-
- char buf[MAX_STRING_SIZE];
- ACE_OS::sprintf (buf, "%s/%d",
- mcast_addr.get_host_addr (),
- mcast_addr.get_port_number ());
- ACE_CString *str;
- ACE_NEW_RETURN (str, ACE_CString (buf), -1);
- this->address_vec_.push_back (str);
- return 0;
-}
-
-int
-MCT_Event_Handler::leave (const ACE_INET_Addr &mcast_addr,
- const ACE_TCHAR *net_if)
-{
- if (this->mcast_.leave (mcast_addr, net_if) == 0)
- {
- char buf[MAX_STRING_SIZE];
- size_t size = this->address_vec_.size ();
- for (size_t i = 0; i < size; ++i)
- {
- ACE_OS::sprintf (buf, "%s/%d",
- mcast_addr.get_host_addr (),
- mcast_addr.get_port_number ());
- if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
- {
- this->address_vec_[i]->set ("");
- break;
- }
- }
- return 0;
- }
- return -1;
-}
-
-int
-MCT_Event_Handler::handle_input (ACE_HANDLE /*handle*/)
-{
- char buf[MAX_STRING_SIZE];
- ACE_OS::memset (buf, 0, sizeof buf);
- ACE_INET_Addr addr;
-
- if (this->mcast ()->recv (buf, sizeof buf, addr) == -1)
- {
- ++error;
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("MCT_Event_Handler::handle_input - ")
- ACE_TEXT ("calling recv\n")), -1);
- }
-
- // Zero length buffer means we are done.
- if (ACE_OS::strlen (buf) == 0)
- return -1;
- else if (this->find (buf) == -1)
- {
- ++error;
- ACE_DEBUG ((LM_ERROR,
- ACE_TEXT ("MCT_Event_Handler::handle_input - ")
- ACE_TEXT ("Received dgram for a group we didn't join ")
- ACE_TEXT ("(%s) \n"),
- buf));
- }
- return 0;
-}
-
-int
-MCT_Event_Handler::handle_close (ACE_HANDLE /*fd*/,
- ACE_Reactor_Mask /*close_mask*/)
-{
- // If this is the last handler, use the finished flag to signal
- // the task to exit.
- if (--MCT_Event_Handler::active_handlers_ == 0)
- finished = 1;
-
- // The DONT_CALL flag keeps the reactor from calling handle_close ()
- // again, since we commit suicide below.
- this->reactor ()->remove_handler (this,
- ACE_Event_Handler::ALL_EVENTS_MASK |
- ACE_Event_Handler::DONT_CALL);
- this->reactor (0);
- delete this;
- return 0;
-}
-
-ACE_HANDLE
-MCT_Event_Handler::get_handle (void) const
-{
- return this->mcast_.get_handle ();
-}
-
-/******************************************************************************/
-
-/*
- * Our MCT_Task object will be an Active Object if we are running the Consumer
- * side of the test. open() calls active() which creates a thread and calls
- * the svc() method that calls runs the reactor event loop.
- */
-class MCT_Task : public ACE_Task<ACE_NULL_SYNCH>
-{
-public:
- MCT_Task (const MCT_Config &config,
- ACE_Reactor *reactor = ACE_Reactor::instance ());
- ~MCT_Task (void);
-
- // = Task hooks.
- virtual int open (void *args = 0);
- virtual int svc (void);
-
-private:
- const MCT_Config &config_;
- int iterations_;
-};
-
-MCT_Task::MCT_Task (const MCT_Config &config,
- ACE_Reactor *reactor)
- : config_ (config)
-{
- this->reactor (reactor);
-}
-
-MCT_Task::~MCT_Task (void)
-{}
-
-int
-MCT_Task::open (void *)
-{
- MCT_Event_Handler *handler;
-
- ACE_INET_Addr addr = this->config_.group_start ();
- int groups = this->config_.groups ();
- for (int i = 0; i < groups; ++i)
- {
- ACE_NEW_RETURN (handler,
- MCT_Event_Handler (this->config_.options ()), -1);
- // We subscribe to all groups for the first one and one each for
- // all the others.
- if (i == 0)
- {
- // go ahead and hide the other one since we want our own.
- ACE_INET_Addr addr = this->config_.group_start ();
- for (int j = 0; j < groups; ++j)
- {
- // If OPT_BINDADDR_YES is set, this will fail after the first
- // join, so just break and keep on going, otherwise it's a
- // real error.
- if (j > 0
- && ACE_BIT_ENABLED (ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES,
- this->config_.options ()))
- break;
-
- if (handler->join (addr) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("MCT_Task::open - join error\n")),
- -1);
- advance_addr (addr);
- }
- }
- else
- {
- if (handler->join (addr) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("MCT_Task::open - join error\n")),
- -1);
- }
-
- advance_addr (addr);
-
- if (this->reactor ()->register_handler (handler, READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("MCT_Task::open - cannot register ")
- ACE_TEXT ("handler\n")),
- -1);
- }
-
- if (this->activate (THR_NEW_LWP) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("MCT_TASK:open - activate failed")),
- -1);
- return 0;
-}
-
-int
-MCT_Task::svc (void)
-{
- // make sure this thread owns the reactor or handle_events () won't do
- // anything.
- this->reactor ()->owner (ACE_Thread::self ());
-
- // loop and call handle_events...
- while (!finished)
- this->reactor ()->handle_events ();
-
- return 0;
-}
-
-/******************************************************************************/
-
-int send_dgram (ACE_SOCK_Dgram &socket, ACE_INET_Addr addr, int done = 0)
-{
-
- // Send each message twice, once to the right port, and once to the "wrong"
- // port. This helps generate noise and lets us see if port filtering is
- // working properly.
- const char *address = addr.get_host_addr ();
- int port = addr.get_port_number ();
-
- for (int i = 0; i < 2; ++i)
- {
- char buf[MAX_STRING_SIZE];
- if (done)
- buf[0] = 0;
- else
- ACE_OS::sprintf (buf, "%s/%d", address, port);
- //ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("sending (%s)\n"), buf));
- if (socket.send (buf, ACE_OS::strlen (buf),addr) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
- ACE_TEXT ("send_dgram - error calling send on ")
- ACE_TEXT ("ACE_SOCK_Dgram.")), -1);
- addr.set_port_number (++port);
- }
- return 0;
-}
-
-int producer (MCT_Config &config)
-{
- int retval = 0;
-
- ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting producer...\n")));
- ACE_SOCK_Dgram socket (ACE_sap_any_cast (ACE_INET_Addr &), PF_INET);
-
- // Note that is is IPv4 specific and needs to be changed once
- //
- if (config.ttl () > 1)
- {
- int ttl = config.ttl ();
- if (socket.set_option (IPPROTO_IP,
- IP_MULTICAST_TTL,
- (void*) &ttl,
- sizeof ttl) != 0)
- ACE_DEBUG ((LM_ERROR,
- ACE_TEXT ("could net set socket option IP_MULTICAST_TTL ")
- ACE_TEXT ("= %d\n"),
- ttl));
- else
- ACE_DEBUG ((LM_INFO, ACE_TEXT ("set IP_MULTICAST_TTL = %d\n"), ttl));
- }
-
- int iterations = config.iterations ();
- // we add an extra 5 groups for noise.
- int groups = config.groups () + 5;
- for (int i = 0; (i < iterations || iterations == 0) && !finished; ++i)
- {
- ACE_INET_Addr addr = config.group_start ();
- for (int j = 0; j < groups && !finished; ++j)
- {
- if ((retval += send_dgram (socket, addr,
- ((i + 1) == iterations))) == -1)
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Calling send_dgram.\n")));
- if ((retval += advance_addr (addr)) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("Calling advance_addr.\n")));
- }
- // Give the task thread a chance to run.
- ACE_Thread::yield ();
- }
- socket.close ();
- return retval;
-}
-
-/*
- * Advance the address by 1, e.g., 239.255.0.1 => 239.255.0.2
- * Note that the algorithm is somewhat simplistic, but sufficient for our
- * purpose.
- */
-int advance_addr (ACE_INET_Addr &addr)
-{
- int a, b, c, d;
- ::sscanf (addr.get_host_addr (), "%d.%d.%d.%d", &a, &b, &c, &d);
- if (d < 255)
- ++d;
- else if (c < 255)
- {
- d = 1;
- ++c;
- }
- else if (b < 255)
- {
- d = 1;
- c = 0;
- ++b;
- }
- else if (a < 239)
- {
- d = 1;
- c = 0;
- b = 0;
- ++a;
- }
- else
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("advance_addr - Cannot advance multicast ")
- ACE_TEXT ("group address past %s\n"),
- addr.get_host_addr ()),
- -1);
-
- ACE_TCHAR buf[MAX_STRING_SIZE];
- ACE_OS::sprintf (buf, ACE_TEXT ("%d.%d.%d.%d:%d"),
- a, b, c, d, addr.get_port_number ());
- addr.set (buf);
- return 0;
-}
-
-int
-run_main (int argc, ACE_TCHAR *argv[])
-{
- int retval = 0;
- MCT_Config config;
- retval = config.open (argc, argv);
- if (retval != 0)
- return 1;
-
- const ACE_TCHAR *temp = ACE_TEXT ("Multicast_Test");
- ACE_TString test = temp;
-
- u_long role = config.role ();
- if (ACE_BIT_DISABLED (role, MCT_Config::PRODUCER)
- || ACE_BIT_DISABLED (role, MCT_Config::CONSUMER))
- {
- if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
- test += ACE_TEXT ("-PRODUCER");
- else
- test += ACE_TEXT ("-CONSUMER");
- }
-
- // Start test only if options are valid.
- ACE_START_TEST (test.c_str ());
-
- // Register a signal handler to close down application gracefully.
- ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT);
-
- // Dump the configuration info to the log if caller passed debug option.
- if (config.debug ())
- config.dump ();
-
- ACE_Reactor *reactor = ACE_Reactor::instance ();
-
- MCT_Task *task = new MCT_Task (config, reactor);
-
- if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
- {
- ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting consumer...\n")));
- // Open makes it an active object.
- retval += task->open ();
- }
-
- // now produce the datagrams...
- if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
- retval += producer (config);
-
- if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
- {
- // and wait for everything to finish
- ACE_DEBUG ((LM_INFO,
- ACE_TEXT ("start waiting for consumer to finish...\n")));
- // Wait for the threads to exit.
- // But, wait for a limited time since we could hang if the last udp
- // message isn't received.
- ACE_Time_Value max_wait ( config.wait ()/* seconds */);
- ACE_Time_Value wait_time (ACE_OS::gettimeofday () + max_wait);
- ACE_Time_Value *ptime = ACE_BIT_ENABLED (role, MCT_Config::PRODUCER)
- ? &wait_time : 0;
- if (ACE_Thread_Manager::instance ()->wait (ptime) == -1)
- {
- // We will no longer wait for this thread, so we must
- // force it to exit otherwise the thread will be referencing
- // deleted memory.
- finished = 1;
- reactor->end_reactor_event_loop ();
-
- if (errno == ETIME)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
- max_wait.msec ()));
- else
- ACE_OS::perror (ACE_TEXT ("wait"));
-
- ++error;
-
- // This should exit now that we ended the reactor loop.
- task->wait ();
- }
- }
-
- delete task;
- ACE_END_TEST;
- return (retval == 0 && error == 0) ? 0 : 1;
-}
-
-#else
-int
-run_main (int, ACE_TCHAR *[])
-{
- ACE_START_TEST (ACE_TEXT ("Multicast_Test"));
-
- ACE_ERROR ((LM_INFO,
- ACE_TEXT ("This test must be run on a platform ")
- ACE_TEXT ("that support IP multicast.\n")));
-
- ACE_END_TEST;
- return 1;
-}
-#endif /* ACE_HAS_IP_MULTICAST && ACE_HAS_THREADS */