// $Id$

// ============================================================================
//
// = LIBRARY
//    tests
//
// = DESCRIPTION
//    This program tests ACE_SOCK_Dgram_Mcast class.
//    It specifically tests subscribing to multiple groups on the same socket
//    on one or more physical interfaces (if available).
//
//    The test can be run as a producer, consumer, or both producer/consumer
//    (default).  The test requires at least two (2) multicast groups which can
//    be configured as command line options.  The consumer subscribes to a
//    single group per instance and an additional instance tries to subscribe
//    to all groups on a single socket (if the ACE_SOCK_Dgram_Mcast instance
//    bind()'s the first address to the socket, additional joins will fail).
//    The producer iterates through the list of group addresses and sends a
//    single message containing the destination address and port to each one.
//    It also sends messages to five (5) additional groups and a message to an
//    additional port for each group in order to produce a bit of "noise" in
//    order to help validate how well the multicast filtering works on a
//    particular platform.
//
//    The list of destination groups starts at 239.255.0.1 (default) and
//    increments by 1 for up to 5 (default) groups.  Both of these values, as
//    well as others, can be overridden via command-line options.  Use the -?
//    option to display the usage message...
//
// = AUTHOR
//    Don Hinton <dhinton@dresystems.com>
//
// ============================================================================

#include "tests/test_config.h"
#include "ace/Get_Opt.h"
#include "ace/Vector_T.h"
#include "ace/SOCK_Dgram_Mcast.h"
#include "ace/ACE.h"
#include "ace/Reactor.h"
#include "ace/OS_NS_stdio.h"
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_strings.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/Task.h"
#include "ace/Atomic_Op.h"
#include "ace/SString.h"
#include "ace/Signal.h"
#include "ace/Min_Max.h"

ACE_RCSID(tests, Multicast_Test, "$Id$")

#if defined (ACE_HAS_IP_MULTICAST) && defined (ACE_HAS_THREADS)

/*
 *  The 'finished' flag is used to break out of an infinite loop in the
 *  MCT_Task::svc () method and out of the producer loops.  The 'handler'
 *  sets the flag in response to SIGINT (CTRL-C).
 *
 *  Because the flag is written from a signal handler it is declared
 *  'volatile sig_atomic_t', which is the only object type a signal handler
 *  may portably store to; without 'volatile' the compiler is free to cache
 *  the value in a register and never observe the handler's store.
 */
static volatile sig_atomic_t finished = 0;

// SIGINT handler: only raises the 'finished' flag; the signal number is
// ignored.
extern "C" void handler (int)
{
  finished = 1;
}

// Default number of producer iterations (initial value of
// MCT_Config::iterations_).
static const int MCT_ITERATIONS = 10;
// Default number of multicast groups used by the test.
static const int MCT_GROUPS = 5;
// Lower bound applied to a user-supplied group count when the platform
// reports a non-zero IP_MAX_MEMBERSHIPS.
static const int MCT_MIN_GROUPS = 2;

// Default first multicast group address and port.
static const char MCT_START_GROUP[] = "239.255.0.1";
static const int  MCT_START_PORT = 16000;

// Size of the scratch buffers used to format and receive the
// "address/port" strings exchanged by the test.
static const size_t MAX_STRING_SIZE = 200;

// Forward declaration; the definition appears later in this file.
int advance_addr (ACE_INET_Addr &addr);

// Keep track of errors so we can report them on exit.
static sig_atomic_t error = 0;

/*
 * MCT_Config holds the configuration data for this test: the first
 * multicast group, how many groups to use, the role(s) to play, the
 * ACE_SOCK_Dgram_Mcast options, and the producer/consumer tunables.
 * Defaults are established by the constructor; open () overrides them
 * from the command line.
 */
class MCT_Config
{
public:

  // Roles this process may play.  BOTH is the union of the two bits.
  enum
    {
      PRODUCER = 1,
      CONSUMER = 2,
      BOTH = PRODUCER | CONSUMER
    };

  // Establish the defaults.  The group count starts at MCT_GROUPS and is
  // capped at IP_MAX_MEMBERSHIPS when the platform reports a non-zero limit.
  MCT_Config (void)
    : group_start_ (MCT_START_PORT, MCT_START_GROUP),
      groups_ (MCT_GROUPS),
      debug_ (0),
      role_ (BOTH),
      sdm_opts_ (ACE_SOCK_Dgram_Mcast::DEFOPTS),
      iterations_ (MCT_ITERATIONS),
      ttl_ (1),
      wait_ (2)
    {
      if (IP_MAX_MEMBERSHIPS != 0)
        this->groups_ = ACE_MIN (IP_MAX_MEMBERSHIPS, MCT_GROUPS);
    }

  ~MCT_Config (void)
    {
    }

  // Parse the command line; returns 0 on success, non-zero otherwise.
  int open (int argc, ACE_TCHAR *argv[]);

  // Write the current settings to the log.
  void dump (void) const;

  // = Read-only accessors for the individual settings.
  int debug (void) const
  {
    return this->debug_;
  }

  int groups (void) const
  {
    return this->groups_;
  }

  const ACE_INET_Addr group_start (void) const
  {
    return this->group_start_;
  }

  u_long role (void) const
  {
    return this->role_;
  }

  int iterations (void) const
  {
    return this->iterations_;
  }

  int ttl (void) const
  {
    return this->ttl_;
  }

  int wait (void) const
  {
    return this->wait_;
  }

  // The stored option bits, converted back to the enum type expected by
  // the ACE_SOCK_Dgram_Mcast constructor.
  ACE_SOCK_Dgram_Mcast::options options (void) const
  {
    return static_cast<ACE_SOCK_Dgram_Mcast::options> (this->sdm_opts_);
  }

private:
  // First group address/port (IPv4 only for now).
  ACE_INET_Addr group_start_;

  // How many groups the test will try to use.
  int groups_;

  // Non-zero when the configuration should be dumped to the log.
  int debug_;

  // PRODUCER, CONSUMER or BOTH (the default).
  u_long role_;

  // Option bits handed to the ACE_SOCK_Dgram_Mcast constructor.
  u_long sdm_opts_;

  // Number of passes the producer makes over the group list.
  int iterations_;

  // Multicast time-to-live, for use across routers.
  int ttl_;

  // Seconds to wait for the CONSUMER threads before giving up.
  int wait_;
};

int
MCT_Config::open (int argc, ACE_TCHAR *argv[])
{
  int retval = 0;
  int help = 0;

  ACE_Get_Opt getopt (argc, argv, ACE_TEXT (":?"), 1, 1);

  if (getopt.long_option (ACE_TEXT ("GroupStart"),
                          'g',
                          ACE_Get_Opt::ARG_REQUIRED) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" Unable to add GroupStart option.\n")),
                      1);

  if (getopt.long_option (ACE_TEXT ("Groups"),
                          'n',
                          ACE_Get_Opt::ARG_REQUIRED) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" Unable to add Groups option.\n")), 1);

  if (getopt.long_option (ACE_TEXT ("Debug"),
                          'd',
                          ACE_Get_Opt::NO_ARG) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" Unable to add Debug option.\n")), 1);

  if (getopt.long_option (ACE_TEXT ("Role"),
                          'r',
                          ACE_Get_Opt::ARG_REQUIRED) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" Unable to add Role option.\n")), 1);

  if (getopt.long_option (ACE_TEXT ("SDM_options"),
                          'm',
                          ACE_Get_Opt::ARG_REQUIRED) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" Unable to add Multicast_Options option.\n")),
                      1);

  if (getopt.long_option (ACE_TEXT ("Iterations"),
                          'i',
                          ACE_Get_Opt::ARG_REQUIRED) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" Unable to add iterations option.\n")),
                      1);

  if (getopt.long_option (ACE_TEXT ("TTL"),
                          't',
                          ACE_Get_Opt::ARG_REQUIRED) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" Unable to add TTL option.\n")),
                      1);

  if (getopt.long_option (ACE_TEXT ("Wait"),
                          'w',
                          ACE_Get_Opt::ARG_REQUIRED) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" Unable to add wait option.\n")),
                      1);

  if (getopt.long_option (ACE_TEXT ("help"),
                          'h',
                          ACE_Get_Opt::NO_ARG) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" Unable to add help option.\n")),
                      1);

  // Now, let's parse it...
  int c = 0;
  while ((c = getopt ()) != EOF)
    {
      switch (c)
        {
        case 0:
          // Long Option. This should never happen.
          retval = -1;
           break;
        case 'g':
          {
            // @todo validate all these, i.e., must be within range
            // 224.255.0.0 to 238.255.255.255, but we only allow the
            // administrative "site local" range, 239.255.0.0 to
            // 239.255.255.255.
            ACE_TCHAR *group = getopt.opt_arg ();
            if (this->group_start_.set (group) != 0)
              {
                ACE_ERROR ((LM_ERROR, ACE_TEXT ("Bad group address:%s\n"),
                            group));
              }
          }
          break;
        case 'i':
          this->iterations_ = ACE_OS::atoi (getopt.opt_arg ());
          break;
        case 'n':
          {
            int n = ACE_OS::atoi (getopt.opt_arg ());
            // I'm assuming 0 means unlimited, so just use whatever the
            // user provides.  Seems to work okay on Solaris 5.8.
            if (IP_MAX_MEMBERSHIPS == 0)
              this->groups_ = n;
            else
              this->groups_ = ACE_MIN (ACE_MAX (n, MCT_MIN_GROUPS),
                                       IP_MAX_MEMBERSHIPS);
            break;
          }
        case 'd':
          this->debug_ = 1;
          break;
        case 'r':
          {
            ACE_TCHAR *c = getopt.opt_arg ();
            if (ACE_OS::strcasecmp (c, ACE_TEXT ("CONSUMER")) == 0)
              this->role_ = CONSUMER;
            else if (ACE_OS::strcasecmp (c, ACE_TEXT ("PRODUCER")) == 0)
              this->role_ = PRODUCER;
            else
              {
                help = 1;
                retval = -1;
              }
          }
          break;
        case 'm':
          {
            //@todo add back OPT_BINDADDR_NO...
            ACE_TCHAR *c = getopt.opt_arg ();
            if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_YES")) == 0)
              ACE_SET_BITS (this->sdm_opts_,
                            ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
            else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_NO")) == 0)
              ACE_CLR_BITS (this->sdm_opts_,
                            ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
            else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_BINDADDR")) == 0)
              {
                ACE_CLR_BITS (this->sdm_opts_,
                              ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
                ACE_SET_BITS (this->sdm_opts_,
                              ACE_SOCK_Dgram_Mcast::DEFOPT_BINDADDR);
              }
            else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ALL")) == 0)
              ACE_SET_BITS (this->sdm_opts_,
                            ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
            else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ONE")) == 0)
              ACE_CLR_BITS (this->sdm_opts_,
                            ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
            else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_NULLIFACE")) == 0)
              {
                ACE_CLR_BITS (this->sdm_opts_,
                              ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
                ACE_SET_BITS (this->sdm_opts_,
                              ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
              }
            else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPTS")) == 0)
              this->sdm_opts_ = ACE_SOCK_Dgram_Mcast::DEFOPTS;
            else
              {
                help = 1;
                retval = -1;
              }
          }
          break;
        case 't':
          this->ttl_ = ACE_OS::atoi (getopt.opt_arg ());
          break;
        case 'w':
          this->wait_ = ACE_OS::atoi (getopt.opt_arg ());
          break;
        case ':':
          // This means an option requiring an argument didn't have one.
          ACE_ERROR ((LM_ERROR,
                      ACE_TEXT (" Option '%c' requires an argument but ")
                      ACE_TEXT ("none was supplied\n"),
                      getopt.opt_opt ()));
          help = 1;
          retval = -1;
          break;
        case '?':
        case 'h':
        default:
          if (ACE_OS::strcmp (argv[getopt.opt_ind () - 1], ACE_TEXT ("-?")) != 0
              && getopt.opt_opt () != 'h')
            // Don't allow unknown options.
            ACE_ERROR ((LM_ERROR,
                        ACE_TEXT (" Found an unknown option (%c) ")
                        ACE_TEXT ("we couldn't handle.\n"),
                        getopt.opt_opt ()));
                        // getopt.last_option ())); //readd with "%s" when
                        // last_option() is available.
          help = 1;
          retval = -1;
          break;
        }
    }

  if (retval == -1)
    {
      if (help)
        // print usage here
        ACE_ERROR ((LM_ERROR,
                    ACE_TEXT ("usage: %s [options]\n")
                    ACE_TEXT ("Options:\n")
                    ACE_TEXT ("  -g {STRING}  --GroupStart={STRING}  ")
                    ACE_TEXT ("starting multicast group address\n")
                    ACE_TEXT ("                                      ")
                    ACE_TEXT ("(default=239.255.0.1:16000)\n")
                    ACE_TEXT ("  -n {#}       --Groups={#}           ")
                    ACE_TEXT ("number of groups (default=5)\n")
                    ACE_TEXT ("  -d           --Debug                ")
                    ACE_TEXT ("debug flag (default=off)\n")
                    ACE_TEXT ("  -r {STRING}  --Role={STRING}        ")
                    ACE_TEXT ("role {PRODUCER|CONSUMER|BOTH}\n")
                    ACE_TEXT ("                                      ")
                    ACE_TEXT ("(default=BOTH)\n")
                    ACE_TEXT ("  -m {STRING}  --SDM_options={STRING} ")
                    ACE_TEXT ("ACE_SOCK_Dgram_Mcast ctor options\n")
                    ACE_TEXT ("                                      ")
                    ACE_TEXT ("(default=DEFOPTS)\n")
                    ACE_TEXT ("  -i {#}       --Iterations={#}       ")
                    ACE_TEXT ("number of iterations (default=100)\n")
                    ACE_TEXT ("  -t {#}       --TTL={#}              ")
                    ACE_TEXT ("time to live (default=1)\n")
                    ACE_TEXT ("  -w {#}       --Wait={#}             ")
                    ACE_TEXT ("number of seconds to wait on CONSUMER\n")
                    ACE_TEXT ("                                      ")
                    ACE_TEXT ("(default=2)\n")
                    ACE_TEXT ("  -h/?         --help                 ")
                    ACE_TEXT ("show this message\n"),
                    argv[0]));

      return -1;
    }

  return 0;
}

/*
 * Write every configuration value to the log at LM_DEBUG priority,
 * bracketed by the standard ACE dump markers.
 */
void
MCT_Config::dump (void) const
{
  ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" Dumping MCT_Config\n")));
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("\tIP_MAX_MEMBERSHIPS = %d\n"),
              IP_MAX_MEMBERSHIPS));
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("\tgroups_ = %d\n"),
              this->groups_));
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("\trole_ = %s\n"),
              (ACE_BIT_ENABLED (this->role_, PRODUCER)
               && ACE_BIT_ENABLED (this->role_, CONSUMER))
                 ? ACE_TEXT ("PRODUCER/CONSUMER")
                 : ACE_BIT_ENABLED (this->role_, PRODUCER)
                     ? ACE_TEXT ("PRODUCER")
                     : ACE_TEXT ("CONSUMER")));
  // sdm_opts_ is a u_long; "%d" consumes an int, so convert explicitly
  // rather than passing a mismatched variadic argument.
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("\tsdm_options_ = %d\n"),
              static_cast<int> (this->sdm_opts_)));
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("\titerations_ = %d\n"),
              this->iterations_));
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("\tttl_ = %d\n"),
              this->ttl_));
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("\twait_ = %d\n"),
              this->wait_));
  // Note that this call to get_host_addr is the non-reentrant
  // version, but it's okay for us.  It yields a narrow (char) string, so
  // it is printed with "%C"; "%s" would expect ACE_TCHAR, which is wide
  // in wide-character builds.
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("\tgroups_start_ = %C:%d\n"),
              this->group_start_.get_host_addr (),
              this->group_start_.get_port_number ()));

  ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}

/******************************************************************************/

/*
 * Reactor event handler that owns one ACE_SOCK_Dgram_Mcast socket.  It
 * remembers, as "address/port" strings, every group it has joined and
 * checks each received datagram's payload against that list.
 */
class MCT_Event_Handler : public ACE_Event_Handler
{
public:
  // <options> is forwarded to the ACE_SOCK_Dgram_Mcast constructor.
  MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
                      = ACE_SOCK_Dgram_Mcast::DEFOPTS);
  virtual ~MCT_Event_Handler (void);

  // Join <mcast_addr> on the socket and record it in the list of joined
  // groups.  Returns 0 on success, -1 on failure.
  int join (const ACE_INET_Addr &mcast_addr,
            int reuse_addr = 1,
            const ACE_TCHAR *net_if = 0);
  // Leave <mcast_addr> and blank out its entry in the list of joined
  // groups.  Returns 0 on success, -1 on failure.
  int leave (const ACE_INET_Addr &mcast_addr,
             const ACE_TCHAR *net_if = 0);

  // = Event Handler hooks.
  // Receive one datagram and verify it was addressed to a joined group.
  virtual int handle_input (ACE_HANDLE handle);
  // Unregister from the reactor and delete this object.
  virtual int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask close_mask);

  // Handle of the underlying multicast socket.
  virtual ACE_HANDLE get_handle (void) const;

protected:
  // Access to the underlying multicast socket.
  ACE_SOCK_Dgram_Mcast *mcast (void);
  // Returns 0 if <buf> matches (case-insensitively) a joined
  // "address/port" string, -1 otherwise.
  int find (const char *buf);

private:
  ACE_SOCK_Dgram_Mcast mcast_;

  // List of groups we've joined
  ACE_Vector<ACE_CString*> address_vec_;

  // Flag used to set the 'finished' flag when the last event handler
  // gets removed from the reactor.
  static ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> active_handlers_;
};

// Count of live handlers: incremented by the constructor and decremented
// in handle_close ().
ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> MCT_Event_Handler::active_handlers_ = 0;

// Construct the multicast socket with <options> and count this handler as
// active.
MCT_Event_Handler::MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options)
  : mcast_ (options)
{
  // One more handler that will eventually be handed to the reactor.  This
  // bookkeeping is not strictly correct (nothing has been registered yet),
  // but it is good enough for this simple test.
  ++active_handlers_;
}

// Free every recorded group string, then close the multicast socket.
MCT_Event_Handler::~MCT_Event_Handler (void)
{
  const size_t joined = this->address_vec_.size ();
  size_t idx = 0;
  while (idx < joined)
    {
      // The vector owns the heap-allocated strings pushed by join ().
      ACE_CString *&entry = this->address_vec_[idx];
      delete entry;
      entry = 0;
      ++idx;
    }

  this->mcast_.close ();
}


// Give derived classes and the hooks access to the owned socket.
ACE_SOCK_Dgram_Mcast *
MCT_Event_Handler::mcast (void)
{
  ACE_SOCK_Dgram_Mcast &sock = this->mcast_;
  return &sock;
}

int
MCT_Event_Handler::find (const char *buf)
{
  size_t size = this->address_vec_.size ();
  size_t i;
  for (i = 0; i < size; ++i)
    {
      if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
        return 0;
    }

  // Not found, so output message we received along with a list of groups
  // we've joined for debugging.
  ACE_CString local;
  for (i = 0; i < size; ++i)
    {
      local += "\t";
      local += this->address_vec_[i]->c_str ();
      local += "\n";
    }
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%C not in:\n%C"),
              buf, local.c_str ()));

  return -1;
}


int
MCT_Event_Handler::join (const ACE_INET_Addr &mcast_addr,
                         int reuse_addr,
                         const ACE_TCHAR *net_if)
{
  if (this->mcast_.join (mcast_addr, reuse_addr, net_if) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("MCT_Event_Handler::join - %p\n"),
                       ACE_TEXT ("Could not join group")),
                      -1);

  char buf[MAX_STRING_SIZE];
  ACE_OS::sprintf (buf, "%s/%d",
                   mcast_addr.get_host_addr (),
                   mcast_addr.get_port_number ());
  ACE_CString *str;
  ACE_NEW_RETURN (str, ACE_CString (buf), -1);
  this->address_vec_.push_back (str);
  return 0;
}

int
MCT_Event_Handler::leave (const ACE_INET_Addr &mcast_addr,
                          const ACE_TCHAR *net_if)
{
  if (this->mcast_.leave (mcast_addr, net_if) == 0)
    {
      char buf[MAX_STRING_SIZE];
      size_t size = this->address_vec_.size ();
      for (size_t i = 0; i < size; ++i)
        {
          ACE_OS::sprintf (buf, "%s/%d",
                           mcast_addr.get_host_addr (),
                           mcast_addr.get_port_number ());
          if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
            {
              this->address_vec_[i]->set ("");
              break;
            }
        }
      return 0;
    }
  return -1;
}

// Reactor callback: read one datagram and check that its payload names a
// group this handler joined.
//
// Returns -1 (which makes the reactor call handle_close ()) when recv
// fails or when an empty payload, the producer's "done" marker, arrives.
// Returns 0 otherwise; a payload for a group we did not join is counted in
// the global error tally and logged, but the handler stays registered.
int
MCT_Event_Handler::handle_input (ACE_HANDLE /*handle*/)
{
  char buf[MAX_STRING_SIZE];
  ACE_OS::memset (buf, 0, sizeof buf);
  ACE_INET_Addr addr;

  // Receive at most sizeof buf - 1 bytes so the zero-filled buffer always
  // keeps a terminating NUL for the string functions below.
  if (this->mcast ()->recv (buf, sizeof buf - 1, addr) == -1)
    {
      ++error;
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("MCT_Event_Handler::handle_input - ")
                         ACE_TEXT ("calling recv\n")), -1);
    }

  // Zero length buffer means we are done.
  if (ACE_OS::strlen (buf) == 0)
    return -1;
  else if (this->find (buf) == -1)
    {
      ++error;
      // buf is a narrow string, hence "%C" rather than "%s".
      ACE_DEBUG ((LM_ERROR,
                  ACE_TEXT ("MCT_Event_Handler::handle_input - ")
                  ACE_TEXT ("Received dgram for a group we didn't join ")
                  ACE_TEXT ("(%C) \n"),
                  buf));
    }
  return 0;
}

// Reactor callback invoked when this handler is being shut down (for
// example after handle_input () returned -1).  Decrements the active
// handler count, raises 'finished' when it reaches zero, unregisters from
// the reactor and finally deletes this object.  Both arguments are
// ignored.  Always returns 0.
int
MCT_Event_Handler::handle_close (ACE_HANDLE /*fd*/,
                                 ACE_Reactor_Mask /*close_mask*/)
{
  // If this is the last handler, use the finished flag to signal
  // the task to exit.
  if (--MCT_Event_Handler::active_handlers_ == 0)
    finished = 1;

  // The DONT_CALL flag keeps the reactor from calling handle_close ()
  // again, since we commit suicide below.
  this->reactor ()->remove_handler (this,
                                    ACE_Event_Handler::ALL_EVENTS_MASK |
                                    ACE_Event_Handler::DONT_CALL);
  // Drop the reactor pointer before self-destructing; no member may be
  // touched after 'delete this'.
  this->reactor (0);
  delete this;
  return 0;
}

// Report the handle of the owned multicast socket.
ACE_HANDLE
MCT_Event_Handler::get_handle (void) const
{
  const ACE_HANDLE socket_handle = this->mcast_.get_handle ();
  return socket_handle;
}

/******************************************************************************/

/*
 * Our MCT_Task object will be an Active Object if we are running the Consumer
 * side of the test.  open() calls activate() which creates a thread that
 * runs the svc() method, which in turn runs the reactor event loop.
 */
class MCT_Task : public ACE_Task<ACE_NULL_SYNCH>
{
public:
  // <config> is held by reference and must outlive the task; <reactor>
  // defaults to the singleton reactor.
  MCT_Task (const MCT_Config &config,
            ACE_Reactor *reactor = ACE_Reactor::instance ());
  ~MCT_Task (void);

  // = Task hooks.
  // Create and register the event handlers, then activate the task.
  virtual int open (void *args = 0);
  // Thread entry point: runs the reactor until 'finished' is set.
  virtual int svc (void);

private:
  // Test configuration (not owned).
  const MCT_Config &config_;
  // NOTE(review): not read anywhere in this file.
  int iterations_;
};

// Bind the task to <config> and <reactor>.  iterations_ is seeded from the
// configuration so that no data member is left with an indeterminate
// value.
MCT_Task::MCT_Task (const MCT_Config &config,
                    ACE_Reactor *reactor)
  : config_ (config),
    iterations_ (config.iterations ())
{
  this->reactor (reactor);
}

// Nothing to release: the configuration is only referenced, and this class
// never deletes the reactor.
MCT_Task::~MCT_Task (void)
{
}

int
MCT_Task::open (void *)
{
  MCT_Event_Handler *handler;

  ACE_INET_Addr addr = this->config_.group_start ();
  int groups = this->config_.groups ();
  for (int i = 0; i < groups; ++i)
    {
      ACE_NEW_RETURN (handler,
                      MCT_Event_Handler (this->config_.options ()), -1);
      // We subscribe to all groups for the first one and one each for
      // all the others.
      if (i == 0)
        {
          // go ahead and hide the other one since we want our own.
          ACE_INET_Addr addr = this->config_.group_start ();
          for (int j = 0; j < groups; ++j)
            {
              // If OPT_BINDADDR_YES is set, this will fail after the first
              // join, so just break and keep on going, otherwise it's a
              // real error.
              if (j > 0
                  && ACE_BIT_ENABLED (ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES,
                                      this->config_.options ()))
                break;

              if (handler->join (addr) == -1)
                ACE_ERROR_RETURN ((LM_ERROR,
                                   ACE_TEXT ("MCT_Task::open - join error\n")),
                                  -1);
              advance_addr (addr);
            }
        }
      else
        {
          if (handler->join (addr) == -1)
            ACE_ERROR_RETURN ((LM_ERROR,
                               ACE_TEXT ("MCT_Task::open - join error\n")),
                              -1);
        }

      advance_addr (addr);

      if (this->reactor ()->register_handler (handler, READ_MASK) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           ACE_TEXT ("MCT_Task::open - cannot register ")
                           ACE_TEXT ("handler\n")),
                          -1);
    }

  if (this->activate (THR_NEW_LWP) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("MCT_TASK:open - activate failed")),
                      -1);
  return 0;
}

// Thread entry point: take ownership of the reactor and dispatch events
// until the global 'finished' flag is raised.  Always returns 0.
int
MCT_Task::svc (void)
{
  // handle_events () does nothing unless the calling thread owns the
  // reactor, so claim it first.
  this->reactor ()->owner (ACE_Thread::self ());

  // Dispatch one batch of events per pass, re-checking the flag before
  // each call.
  for (;;)
    {
      if (finished)
        break;

      this->reactor ()->handle_events ();
    }

  return 0;
}

/******************************************************************************/

// Send two datagrams through <socket>: one to <addr> and one to the same
// host on the next port number.  The second is "noise" that lets us see
// whether port filtering works.  Each payload is the "address/port" string
// of its destination, or an empty payload when <done> is non-zero.
// Returns 0 on success, -1 as soon as a send fails.
int send_dgram (ACE_SOCK_Dgram &socket, ACE_INET_Addr addr, int done = 0)
{
  const char *host = addr.get_host_addr ();
  int port = addr.get_port_number ();

  int attempt = 0;
  while (attempt < 2)
    {
      char payload[MAX_STRING_SIZE];
      payload[0] = 0;
      if (!done)
        ACE_OS::sprintf (payload, "%s/%d", host, port);

      const size_t length = ACE_OS::strlen (payload);
      if (socket.send (payload, length, addr) == -1)
        ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                           ACE_TEXT ("send_dgram - error calling send on ")
                           ACE_TEXT ("ACE_SOCK_Dgram.")), -1);

      // Aim the next datagram at the following port.
      ++port;
      addr.set_port_number (port);
      ++attempt;
    }

  return 0;
}

// Producer side of the test.  For each iteration, walk the configured
// groups plus five extra "noise" groups and send datagrams to each via
// send_dgram ().  On the last iteration the "done" (empty) payload is sent
// instead.  An iteration count of 0 means "run until 'finished' is set".
//
// Returns 0 if every send and address advance succeeded; otherwise the
// (negative) sum of the failures.
int producer (MCT_Config &config)
{
  int retval = 0;

  ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting producer...\n")));
  ACE_SOCK_Dgram socket (ACE_sap_any_cast (ACE_INET_Addr &), PF_INET);

  // Note that this is IPv4 specific (IPPROTO_IP / IP_MULTICAST_TTL) and
  // would need to change for any other address family.
  if (config.ttl () > 1)
    {
      int ttl = config.ttl ();
      if (socket.set_option (IPPROTO_IP,
                             IP_MULTICAST_TTL,
                             (void*) &ttl,
                             sizeof ttl) != 0)
        ACE_DEBUG ((LM_ERROR,
                    ACE_TEXT ("could not set socket option IP_MULTICAST_TTL ")
                    ACE_TEXT ("= %d\n"),
                    ttl));
      else
        ACE_DEBUG ((LM_INFO, ACE_TEXT ("set IP_MULTICAST_TTL = %d\n"), ttl));
    }

  int iterations = config.iterations ();
  // we add an extra 5 groups for noise.
  int groups = config.groups () + 5;
  for (int i = 0; (i < iterations || iterations == 0) && !finished; ++i)
    {
      // Non-zero only on the final pass.
      const int done = ((i + 1) == iterations);

      ACE_INET_Addr addr = config.group_start ();
      for (int j = 0; j < groups && !finished; ++j)
        {
          // Test each call's own result: comparing the running total with
          // -1 would report a bogus error after any earlier failure and
          // miss every failure after the first.
          const int sent = send_dgram (socket, addr, done);
          if (sent == -1)
            ACE_ERROR ((LM_ERROR, ACE_TEXT ("Calling send_dgram.\n")));
          retval += sent;

          const int advanced = advance_addr (addr);
          if (advanced == -1)
            ACE_ERROR ((LM_ERROR,
                        ACE_TEXT ("Calling advance_addr.\n")));
          retval += advanced;
        }
      // Give the task thread a chance to run.
      ACE_Thread::yield ();
    }
  socket.close ();
  return retval;
}

/*
 * Advance the address by 1, e.g., 239.255.0.1 => 239.255.0.2
 * Note that the algorithm is somewhat simplistic, but sufficient for our
 * purpose.
 */
// Step <addr> to the next IPv4 group address, keeping its port.
//
// The last octet is incremented; when it is already 255 it wraps to 1 and
// the next octet up is incremented instead (so the .0 address is skipped),
// and so on, up to a first octet of 239.
//
// Returns 0 on success.  Returns -1, leaving <addr> unchanged, when the
// current address cannot be parsed as a dotted quad or cannot be advanced
// any further, and -1 when the new address is rejected by <addr>.
int advance_addr (ACE_INET_Addr &addr)
{
  const char *host = addr.get_host_addr ();

  // Make sure all four octets were really parsed before using them.
  int a = 0, b = 0, c = 0, d = 0;
  if (host == 0
      || ::sscanf (host, "%d.%d.%d.%d", &a, &b, &c, &d) != 4)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("advance_addr - Cannot parse multicast ")
                       ACE_TEXT ("group address\n")),
                      -1);

  if (d < 255)
    ++d;
  else if (c < 255)
    {
      d = 1;
      ++c;
    }
  else if (b < 255)
    {
      d = 1;
      c = 0;
      ++b;
    }
  else if (a < 239)
    {
      d = 1;
      c = 0;
      b = 0;
      ++a;
    }
  else
    // <host> is a narrow string, hence "%C" rather than "%s".
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("advance_addr - Cannot advance multicast ")
                       ACE_TEXT ("group address past %C\n"),
                       host),
                      -1);

  ACE_TCHAR buf[MAX_STRING_SIZE];
  ACE_OS::sprintf (buf, ACE_TEXT ("%d.%d.%d.%d:%d"),
                   a, b, c, d, addr.get_port_number ());

  // Report a rejected address instead of silently ignoring it.
  return addr.set (buf) == 0 ? 0 : -1;
}

// Test entry point.
//
// Parses the command line, starts the consumer task and/or runs the
// producer according to the configured role, then waits for the consumer
// threads.  When this process is also the producer the wait is limited to
// the configured number of seconds, since the final "done" datagram may
// never arrive.  Returns 0 if nothing failed, 1 otherwise (including a
// rejected command line, in which case the test log is never started).
int
run_main (int argc, ACE_TCHAR *argv[])
{
  int retval = 0;
  MCT_Config config;
  retval = config.open (argc, argv);
  if (retval != 0)
    return 1;

  const ACE_TCHAR *temp = ACE_TEXT ("Multicast_Test");
  ACE_TString test = temp;

  // Single-role runs get their own log name.
  u_long role = config.role ();
  if (ACE_BIT_DISABLED (role, MCT_Config::PRODUCER)
      || ACE_BIT_DISABLED (role, MCT_Config::CONSUMER))
    {
      if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
        test += ACE_TEXT ("-PRODUCER");
      else
        test += ACE_TEXT ("-CONSUMER");
    }

  // Start test only if options are valid.
  ACE_START_TEST (test.c_str ());

  // Register a signal handler to close down application gracefully.
  ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT);

  // Dump the configuration info to the log if caller passed debug option.
  if (config.debug ())
    config.dump ();

  ACE_Reactor *reactor = ACE_Reactor::instance ();

  MCT_Task *task = new MCT_Task (config, reactor);

  if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
    {
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting consumer...\n")));
      // Open makes it an active object.
      retval += task->open ();
    }

  // now produce the datagrams...
  if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
    retval += producer (config);

  if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
    {
      // and wait for everything to finish
      ACE_DEBUG ((LM_INFO,
                  ACE_TEXT ("start waiting for consumer to finish...\n")));
      // Wait for the threads to exit.
      // But, wait for a limited time since we could hang if the last udp
      // message isn't received.
      ACE_Time_Value max_wait ( config.wait ()/* seconds */);
      ACE_Time_Value wait_time (ACE_OS::gettimeofday () + max_wait);
      ACE_Time_Value *ptime = ACE_BIT_ENABLED (role, MCT_Config::PRODUCER)
                                ? &wait_time : 0;
      if (ACE_Thread_Manager::instance ()->wait (ptime) == -1)
        {
          // Remember why the wait failed before any other call has a
          // chance to overwrite errno.
          const int wait_errno = errno;

          // We will no longer wait for this thread, so we must
          // force it to exit otherwise the thread will be referencing
          // deleted memory.
          finished = 1;
          reactor->end_reactor_event_loop ();

          if (wait_errno == ETIME)
            // msec () does not return an int, so convert for "%d".
            ACE_ERROR ((LM_ERROR,
                        ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
                        static_cast<int> (max_wait.msec ())));
          else
            {
              errno = wait_errno;
              ACE_OS::perror (ACE_TEXT ("wait"));
            }

          ++error;

          // Give the consumer thread a bounded chance to notice the
          // shutdown request and leave svc () before the task is deleted.
          ACE_Time_Value grace (ACE_OS::gettimeofday () + max_wait);
          ACE_Thread_Manager::instance ()->wait (&grace);
        }
    }

  delete task;
  ACE_END_TEST;
  return (retval == 0 && error == 0) ? 0 : 1;
}

// Explicit instantiations of the container templates behind
// MCT_Event_Handler::address_vec_, for compilers configured to require
// them (either by declaration or by pragma).
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Vector<ACE_CString *>;
template class ACE_Array_Base<ACE_String_Base<char> *>;
template class ACE_Array<ACE_String_Base<char> *>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Vector<ACE_CString *>
#pragma instantiate ACE_Array_Base<ACE_String_Base<char> *>
#pragma instantiate ACE_Array<ACE_String_Base<char> *>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

#else
// Fallback entry point compiled when ACE_HAS_IP_MULTICAST or
// ACE_HAS_THREADS is not defined: logs that the test cannot run here and
// returns 1.  Both arguments are ignored.
// NOTE(review): the message mentions only IP multicast, although the guard
// also requires thread support.
int
run_main (int, ACE_TCHAR *[])
{
  ACE_START_TEST (ACE_TEXT ("Multicast_Test"));

  ACE_ERROR ((LM_INFO,
              ACE_TEXT ("This test must be run on a platform ")
              ACE_TEXT ("that support IP multicast.\n")));

  ACE_END_TEST;
  return 1;
}
#endif /* ACE_HAS_IP_MULTICAST && ACE_HAS_THREADS */