summaryrefslogtreecommitdiff
path: root/ACE/examples/Reactor/Multicast
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/Reactor/Multicast')
-rw-r--r--ACE/examples/Reactor/Multicast/.cvsignore4
-rw-r--r--ACE/examples/Reactor/Multicast/Log_Wrapper.cpp81
-rw-r--r--ACE/examples/Reactor/Multicast/Log_Wrapper.h68
-rw-r--r--ACE/examples/Reactor/Multicast/Makefile.am50
-rw-r--r--ACE/examples/Reactor/Multicast/README15
-rw-r--r--ACE/examples/Reactor/Multicast/Reactor_Multicast.mpc17
-rw-r--r--ACE/examples/Reactor/Multicast/client.cpp126
-rw-r--r--ACE/examples/Reactor/Multicast/server.cpp247
8 files changed, 608 insertions, 0 deletions
diff --git a/ACE/examples/Reactor/Multicast/.cvsignore b/ACE/examples/Reactor/Multicast/.cvsignore
new file mode 100644
index 00000000000..955ffdc75d5
--- /dev/null
+++ b/ACE/examples/Reactor/Multicast/.cvsignore
@@ -0,0 +1,4 @@
+client
+client
+server
+server
diff --git a/ACE/examples/Reactor/Multicast/Log_Wrapper.cpp b/ACE/examples/Reactor/Multicast/Log_Wrapper.cpp
new file mode 100644
index 00000000000..055b57b9975
--- /dev/null
+++ b/ACE/examples/Reactor/Multicast/Log_Wrapper.cpp
@@ -0,0 +1,81 @@
+// $Id$
+
+// client.C
+
+#include "Log_Wrapper.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_NS_sys_utsname.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_netdb.h"
+
+ACE_RCSID(Multicast, Log_Wrapper, "$Id$")
+
+Log_Wrapper::Log_Wrapper (void)
+{
+ sequence_number_ = 0;
+ this->log_msg_.app_id = ACE_OS::getpid ();
+}
+
+Log_Wrapper::~Log_Wrapper (void)
+{
+}
+
+// Set the log_msg_ host address.
+
+int
+Log_Wrapper::open (const int port, const char *mcast_addr)
+{
+ struct hostent *host_info;
+ ACE_utsname host_data;
+
+ if (ACE_OS::uname (&host_data) < 0)
+ return -1;
+
+#if defined (ACE_LACKS_UTSNAME_T)
+ if ((host_info = ACE_OS::gethostbyname
+ (ACE_TEXT_ALWAYS_CHAR(host_data.nodename))) == NULL)
+#else
+ if ((host_info = ACE_OS::gethostbyname (host_data.nodename)) == NULL)
+#endif
+ return -1;
+ else
+ ACE_OS::memcpy ((char *) &this->log_msg_.host,
+ (char *) host_info->h_addr,
+ host_info->h_length);
+
+ // This starts out initialized to all zeros!
+ server_ = ACE_INET_Addr (port, mcast_addr);
+
+ if (logger_.subscribe (server_) == -1)
+ perror("can't subscribe to multicast group"), exit(1);
+
+ // success.
+ return 0;
+}
+
+// Send the message to a logger object.
+// This wrapper fills in all the log_record info for you.
+// uses iovector stuff to make contiguous header and message.
+
+int
+Log_Wrapper::log_message (Log_Priority type, char *message)
+{
+ sequence_number_++;
+
+ this->log_msg_.type = type;
+ this->log_msg_.time = time (0);
+ this->log_msg_.msg_length = ACE_OS::strlen(message)+1;
+ this->log_msg_.sequence_number = htonl(sequence_number_);
+
+ iovec iovp[2];
+ iovp[0].iov_base = reinterpret_cast<char*> (&log_msg_);
+ iovp[0].iov_len = sizeof (log_msg_);
+ iovp[1].iov_base = message;
+ iovp[1].iov_len = log_msg_.msg_length;
+
+ logger_.send (iovp, 2);
+
+ // success.
+ return 0;
+}
+
diff --git a/ACE/examples/Reactor/Multicast/Log_Wrapper.h b/ACE/examples/Reactor/Multicast/Log_Wrapper.h
new file mode 100644
index 00000000000..10458f706bc
--- /dev/null
+++ b/ACE/examples/Reactor/Multicast/Log_Wrapper.h
@@ -0,0 +1,68 @@
+/* -*- C++ -*- */
+// $Id$
+
+// log_wrapper.h
+
+#include "ace/Profile_Timer.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Dgram_Mcast.h"
+
+#ifndef _LOG_WRAPPER_H
+#define _LOG_WRAPPER_H
+
+class Log_Wrapper
+ // = TITLE
+ // Provide a wrapper around sending log messages via IP
+ // multicast.
+{
+public:
+ Log_Wrapper (void);
+ ~Log_Wrapper (void);
+
+ // = Types of logging messages.
+ enum Log_Priority
+ {
+ LM_MESSAGE,
+ LM_DEBUG,
+ LM_WARNING,
+ LM_ERROR,
+ LM_EMERG
+ };
+
+ int open (const int port, const char* mcast_addr);
+ // Subscribe to a given UDP multicast group
+
+ int log_message (Log_Priority type, char *message);
+ // send a string to the logger
+
+ // = Format of the logging record.
+ struct Log_Record
+ {
+ u_long sequence_number;
+ Log_Priority type;
+ long host;
+ long time;
+ long app_id;
+ long msg_length;
+ };
+
+private:
+ ACE_INET_Addr server_;
+ // Server address where records are logged.
+
+ u_long sequence_number_;
+ // Keep track of the sequence.
+
+ Log_Record log_msg_;
+ // One record used for many log messages.
+
+ ACE_SOCK_Dgram_Mcast logger_;
+ // A logger object.
+};
+
+#endif /* _LOG_WRAPPER_H */
diff --git a/ACE/examples/Reactor/Multicast/Makefile.am b/ACE/examples/Reactor/Multicast/Makefile.am
new file mode 100644
index 00000000000..60d65e7ebe8
--- /dev/null
+++ b/ACE/examples/Reactor/Multicast/Makefile.am
@@ -0,0 +1,50 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+## Makefile.Reactor_Multicast_Client.am
+noinst_PROGRAMS = client
+
+client_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+client_SOURCES = \
+ Log_Wrapper.cpp \
+ client.cpp \
+ Log_Wrapper.h
+
+client_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+## Makefile.Reactor_Multicast_Server.am
+noinst_PROGRAMS += server
+
+server_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+server_SOURCES = \
+ Log_Wrapper.cpp \
+ server.cpp \
+ Log_Wrapper.h
+
+server_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/examples/Reactor/Multicast/README b/ACE/examples/Reactor/Multicast/README
new file mode 100644
index 00000000000..85f64cc8120
--- /dev/null
+++ b/ACE/examples/Reactor/Multicast/README
@@ -0,0 +1,15 @@
+The following test illustrates the SOCK Mcast multicast wrappers in
+conjunction with the Reactor. This test was written by Tim Harrison
+(harrison@cs.wustl.edu).
+
+To run the server type:
+
+% server &
+
+It will wait for the first message sent to it and then read for 5 seconds.
+
+To run the client type any of these:
+
+% client -m max_message_size -i iterations
+% client < <filename>
+% client
diff --git a/ACE/examples/Reactor/Multicast/Reactor_Multicast.mpc b/ACE/examples/Reactor/Multicast/Reactor_Multicast.mpc
new file mode 100644
index 00000000000..a15c53340e4
--- /dev/null
+++ b/ACE/examples/Reactor/Multicast/Reactor_Multicast.mpc
@@ -0,0 +1,17 @@
+// -*- MPC -*-
+// $Id$
+
+project(*client) : aceexe {
+ exename = client
+ Source_Files {
+ client.cpp
+ Log_Wrapper.cpp
+ }
+}
+project(*server) : aceexe {
+ exename = server
+ Source_Files {
+ server.cpp
+ Log_Wrapper.cpp
+ }
+}
diff --git a/ACE/examples/Reactor/Multicast/client.cpp b/ACE/examples/Reactor/Multicast/client.cpp
new file mode 100644
index 00000000000..25b18c2ae6c
--- /dev/null
+++ b/ACE/examples/Reactor/Multicast/client.cpp
@@ -0,0 +1,126 @@
+// $Id$
+
+// This program reads in messages from stdin and sends them to a
+// Log_Wrapper.
+
+#include "ace/OS_main.h"
+#include "ace/OS_NS_stdlib.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_Memory.h"
+#include "ace/Get_Opt.h"
+#include "ace/Log_Msg.h"
+#include "Log_Wrapper.h"
+
+ACE_RCSID(Multicast, client, "$Id$")
+
+// Multi-cast address.
+static const char *MCAST_ADDR = ACE_DEFAULT_MULTICAST_ADDR;
+
+// UDP port.
+static const int UDP_PORT = ACE_DEFAULT_MULTICAST_PORT;
+
+// Maximum message size.
+static int max_message_size = BUFSIZ;
+
+// Number of times to send message of max_message_size.
+static int iterations = 0;
+
+static void
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_LOG_MSG->open (argv[0]);
+
+ // Start at argv[1]
+ ACE_Get_Opt getopt (argc, argv, ACE_TEXT("m:ui:"), 1);
+
+ for (int c; (c = getopt ()) != -1; )
+ switch (c)
+ {
+ case 'm':
+ max_message_size = ACE_OS::atoi (getopt.opt_arg ()) * BUFSIZ;
+ break;
+ case 'i':
+ iterations = ACE_OS::atoi (getopt.opt_arg ());
+ break;
+ case 'u':
+ // usage fallthrough
+ default:
+ ACE_ERROR ((LM_ERROR,
+ "%n: -m max_message_size (in k) -i iterations\n%a",
+ 1));
+ /* NOTREACHED */
+ }
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR **argv)
+{
+ int user_prompt;
+
+ parse_args (argc,argv);
+
+ ACE_DEBUG ((LM_DEBUG, "max buffer size = %d\n", max_message_size));
+
+ // Instantiate a log wrapper for logging
+ Log_Wrapper log;
+
+ // Make a connection to a logger.
+ if (log.open (UDP_PORT, MCAST_ADDR) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n" "open"), -1);
+
+ char *buf;
+ ACE_NEW_RETURN (buf, char[max_message_size], -1);
+
+ // If -i has been specified, send max_message_size messages
+ // iterations number of times.
+ if (iterations)
+ {
+ ACE_OS::memset (buf, 1, max_message_size);
+
+ while (iterations--)
+ if (log.log_message (Log_Wrapper::LM_DEBUG, buf) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n" "log"), -1);
+ }
+
+ // otherwise, a file has been redirected, or give prompts
+ else
+ {
+ // If a file has been redirected, don't activate user prompts.
+ if (ACE_OS::isatty (0))
+ user_prompt = 1;
+ else
+ user_prompt = 0;
+
+ // Continually read messages from stdin and log them.
+
+ for (int count = 1;;)
+ {
+ if (user_prompt)
+ ACE_DEBUG ((LM_DEBUG, "\nEnter message ('Q':quit):\n"));
+
+ ssize_t nbytes = ACE_OS::read (ACE_STDIN, buf, max_message_size);
+
+ if (nbytes <= 0)
+ break; // End of file or error.
+ buf[nbytes - 1] = '\0';
+
+ // Quitting?
+ if (user_prompt)
+ {
+ if (buf[0] == 'Q' || buf[0] == 'q')
+ break;
+ }
+ else // Keep from overrunning the receiver.
+ ACE_OS::sleep (1);
+
+ // Send the message to the logger.
+ if (log.log_message (Log_Wrapper::LM_DEBUG, buf) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n" "log_message"), -1);
+ ACE_DEBUG ((LM_DEBUG, "finished sending message %d\n", count++));
+ }
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "Client done.\n"));
+ return 0;
+}
diff --git a/ACE/examples/Reactor/Multicast/server.cpp b/ACE/examples/Reactor/Multicast/server.cpp
new file mode 100644
index 00000000000..65e39b97d1a
--- /dev/null
+++ b/ACE/examples/Reactor/Multicast/server.cpp
@@ -0,0 +1,247 @@
+// $Id$
+
+// server.cpp (written by Tim Harrison)
+
+// Listens to multicast address for client log messages. Prints
+// Mbits/sec received from client.
+
+#include "ace/OS_main.h"
+#include "ace/SOCK_Dgram.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Dgram_Mcast.h"
+#include "ace/Reactor.h"
+#include "ace/Log_Msg.h"
+#include "Log_Wrapper.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/os_include/os_netdb.h"
+
+ACE_RCSID(Multicast, server, "$Id$")
+
+#if defined (ACE_HAS_IP_MULTICAST)
+class Server_Events : public ACE_Event_Handler
+{
+public:
+ Server_Events (u_short port,
+ const char *mcast_addr,
+ long time_interval = 0);
+ ~Server_Events (void);
+
+ virtual int handle_input (ACE_HANDLE fd);
+ virtual int handle_timeout (const ACE_Time_Value &tv,
+ const void *arg);
+
+ virtual ACE_HANDLE get_handle (void) const;
+
+ ACE_Time_Value *wait_time (void);
+
+private:
+ char *message_;
+ Log_Wrapper::Log_Record *log_record_;
+ char buf_[4 * BUFSIZ];
+ char hostname_[MAXHOSTNAMELEN];
+
+ int initialized_;
+ int count_;
+ long interval_;
+ // time interval to log messages
+
+ ACE_Time_Value *how_long_;
+ ACE_Reactor *reactor_;
+ ACE_SOCK_Dgram_Mcast mcast_dgram_;
+ ACE_INET_Addr remote_addr_;
+ ACE_INET_Addr mcast_addr_;
+
+ // = statistics on messages received
+ double total_bytes_received_;
+ int total_messages_received_;
+ int last_sequence_number_;
+};
+
+static const char MCAST_ADDR[] = ACE_DEFAULT_MULTICAST_ADDR;
+static const int UDP_PORT = ACE_DEFAULT_MULTICAST_PORT;
+static const int DURATION = 5;
+
+ACE_HANDLE
+Server_Events::get_handle (void) const
+{
+ return this->mcast_dgram_.get_handle ();
+}
+
+ACE_Time_Value *
+Server_Events::wait_time (void)
+{
+ return this->how_long_;
+}
+
+Server_Events::Server_Events (u_short port,
+ const char *mcast_addr,
+ long time_interval)
+ : initialized_ (0),
+ count_ (1),
+ interval_ (time_interval),
+ mcast_addr_ (port, mcast_addr),
+ total_bytes_received_ (0)
+{
+ // Use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group.
+
+ if (ACE_OS::hostname (this->hostname_,
+ MAXHOSTNAMELEN) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "hostname"));
+
+ else if (this->mcast_dgram_.subscribe (this->mcast_addr_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "subscribe"));
+ else
+ {
+ // Point to NULL so that we block in the beginning.
+ this->how_long_ = 0;
+
+ this->log_record_ = (Log_Wrapper::Log_Record *) &buf_;
+ this->message_ = &buf_[sizeof (Log_Wrapper::Log_Record)];
+ }
+}
+
+// A destructor that emacs refuses to color blue ;-)
+
+Server_Events::~Server_Events (void)
+{
+ this->mcast_dgram_.unsubscribe ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "total bytes received = %d after %d second\n",
+ this->total_bytes_received_,
+ this->interval_));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Mbits/sec = %.2f\n",
+ (float) (total_bytes_received_ * 8 / (float) (1024*1024*interval_))));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "last sequence number = %d\ntotal messages received = %d\ndiff = %d\n",
+ this->last_sequence_number_,
+ this->total_messages_received_,
+ this->last_sequence_number_ - total_messages_received_));
+}
+
+int
+Server_Events::handle_timeout (const ACE_Time_Value &,
+ const void *arg)
+{
+ ACE_DEBUG ((LM_DEBUG, "\t%d timeout%s occurred for %s.\n",
+ this->count_,
+ this->count_ == 1 ? "" : "s",
+ (char *) arg));
+
+ // Don't let the timeouts continue if there's no activity since
+ // otherwise we use up a lot of CPU time unnecessarily.
+ if (this->count_ == 5)
+ {
+ reactor ()->cancel_timer (this);
+ this->initialized_ = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "\tcancelled timeout for %s to avoid busy waiting.\n",
+ (char *) arg));
+ }
+
+ this->count_++;
+ return 0;
+}
+
+int
+Server_Events::handle_input (ACE_HANDLE)
+{
+ // Receive message from multicast group.
+ iovec iovp[2];
+ iovp[0].iov_base = buf_;
+ iovp[0].iov_len = sizeof (log_record_);
+ iovp[1].iov_base = &buf_[sizeof (log_record_)];
+ iovp[1].iov_len = 4 * BUFSIZ - sizeof (log_record_);
+
+ ssize_t retcode =
+ this->mcast_dgram_.recv (iovp,
+ 2,
+ this->remote_addr_);
+ if (retcode != -1)
+ {
+ total_messages_received_++;
+ total_bytes_received_ += retcode;
+ last_sequence_number_ =
+ ntohl (log_record_->sequence_number);
+
+ for (char *message_end = this->message_ + ACE_OS::strlen (this->message_) - 1;
+ ACE_OS::strchr ("\r\n \t", *message_end) != 0;
+ )
+ {
+ *message_end-- = '\0';
+ if (message_end == this->message_)
+ break;
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "sequence number = %d\n",
+ last_sequence_number_));
+ ACE_DEBUG ((LM_DEBUG,
+ "message = '%s'\n",
+ this->message_));
+
+ if (this->initialized_ == 0)
+ {
+ // Restart the timer since we've received events again.
+ if (reactor()->schedule_timer (this,
+ (void *) this->hostname_,
+ ACE_Time_Value::zero,
+ ACE_Time_Value (DURATION)) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "schedule_timer"),
+ -1);
+ this->initialized_ = 1;
+ }
+
+ this->count_ = 1;
+ return 0;
+ }
+ else
+ return -1;
+}
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ // Instantiate a server which will receive messages for DURATION
+ // seconds.
+ Server_Events server_events (UDP_PORT,
+ MCAST_ADDR,
+ DURATION);
+ // Instance of the ACE_Reactor.
+ ACE_Reactor reactor;
+
+ if (reactor.register_handler (&server_events,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n%a",
+ "register_handler",
+ 1));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "starting up server\n"));
+
+ for (;;)
+ reactor.handle_events (server_events.wait_time ());
+
+ ACE_NOTREACHED (return 0;)
+}
+#else
+int
+main (int, char *argv[])
+{
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "error: %s must be run on a platform that support IP multicast\n",
+ argv[0]), -1);
+}
+#endif /* ACE_HAS_IP_MULTICAST */