diff options
Diffstat (limited to 'ACE/examples/Reactor/Multicast')
-rw-r--r-- | ACE/examples/Reactor/Multicast/.cvsignore | 4 | ||||
-rw-r--r-- | ACE/examples/Reactor/Multicast/Log_Wrapper.cpp | 81 | ||||
-rw-r--r-- | ACE/examples/Reactor/Multicast/Log_Wrapper.h | 68 | ||||
-rw-r--r-- | ACE/examples/Reactor/Multicast/Makefile.am | 50 | ||||
-rw-r--r-- | ACE/examples/Reactor/Multicast/README | 15 | ||||
-rw-r--r-- | ACE/examples/Reactor/Multicast/Reactor_Multicast.mpc | 17 | ||||
-rw-r--r-- | ACE/examples/Reactor/Multicast/client.cpp | 126 | ||||
-rw-r--r-- | ACE/examples/Reactor/Multicast/server.cpp | 247 |
8 files changed, 608 insertions, 0 deletions
diff --git a/ACE/examples/Reactor/Multicast/.cvsignore b/ACE/examples/Reactor/Multicast/.cvsignore new file mode 100644 index 00000000000..955ffdc75d5 --- /dev/null +++ b/ACE/examples/Reactor/Multicast/.cvsignore @@ -0,0 +1,4 @@ +client +client +server +server diff --git a/ACE/examples/Reactor/Multicast/Log_Wrapper.cpp b/ACE/examples/Reactor/Multicast/Log_Wrapper.cpp new file mode 100644 index 00000000000..055b57b9975 --- /dev/null +++ b/ACE/examples/Reactor/Multicast/Log_Wrapper.cpp @@ -0,0 +1,81 @@ +// $Id$ + +// client.C + +#include "Log_Wrapper.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_sys_utsname.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_netdb.h" + +ACE_RCSID(Multicast, Log_Wrapper, "$Id$") + +Log_Wrapper::Log_Wrapper (void) +{ + sequence_number_ = 0; + this->log_msg_.app_id = ACE_OS::getpid (); +} + +Log_Wrapper::~Log_Wrapper (void) +{ +} + +// Set the log_msg_ host address. + +int +Log_Wrapper::open (const int port, const char *mcast_addr) +{ + struct hostent *host_info; + ACE_utsname host_data; + + if (ACE_OS::uname (&host_data) < 0) + return -1; + +#if defined (ACE_LACKS_UTSNAME_T) + if ((host_info = ACE_OS::gethostbyname + (ACE_TEXT_ALWAYS_CHAR(host_data.nodename))) == NULL) +#else + if ((host_info = ACE_OS::gethostbyname (host_data.nodename)) == NULL) +#endif + return -1; + else + ACE_OS::memcpy ((char *) &this->log_msg_.host, + (char *) host_info->h_addr, + host_info->h_length); + + // This starts out initialized to all zeros! + server_ = ACE_INET_Addr (port, mcast_addr); + + if (logger_.subscribe (server_) == -1) + perror("can't subscribe to multicast group"), exit(1); + + // success. + return 0; +} + +// Send the message to a logger object. +// This wrapper fills in all the log_record info for you. +// uses iovector stuff to make contiguous header and message. + +int +Log_Wrapper::log_message (Log_Priority type, char *message) +{ + sequence_number_++; + + this->log_msg_.type = type; + this->log_msg_.time = time (0); + this->log_msg_.msg_length = ACE_OS::strlen(message)+1; + this->log_msg_.sequence_number = htonl(sequence_number_); + + iovec iovp[2]; + iovp[0].iov_base = reinterpret_cast<char*> (&log_msg_); + iovp[0].iov_len = sizeof (log_msg_); + iovp[1].iov_base = message; + iovp[1].iov_len = log_msg_.msg_length; + + logger_.send (iovp, 2); + + // success. + return 0; +} + diff --git a/ACE/examples/Reactor/Multicast/Log_Wrapper.h b/ACE/examples/Reactor/Multicast/Log_Wrapper.h new file mode 100644 index 00000000000..10458f706bc --- /dev/null +++ b/ACE/examples/Reactor/Multicast/Log_Wrapper.h @@ -0,0 +1,68 @@ +/* -*- C++ -*- */ +// $Id$ + +// log_wrapper.h + +#include "ace/Profile_Timer.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/INET_Addr.h" +#include "ace/SOCK_Dgram_Mcast.h" + +#ifndef _LOG_WRAPPER_H +#define _LOG_WRAPPER_H + +class Log_Wrapper + // = TITLE + // Provide a wrapper around sending log messages via IP + // multicast. +{ +public: + Log_Wrapper (void); + ~Log_Wrapper (void); + + // = Types of logging messages. + enum Log_Priority + { + LM_MESSAGE, + LM_DEBUG, + LM_WARNING, + LM_ERROR, + LM_EMERG + }; + + int open (const int port, const char* mcast_addr); + // Subscribe to a given UDP multicast group + + int log_message (Log_Priority type, char *message); + // send a string to the logger + + // = Format of the logging record. + struct Log_Record + { + u_long sequence_number; + Log_Priority type; + long host; + long time; + long app_id; + long msg_length; + }; + +private: + ACE_INET_Addr server_; + // Server address where records are logged. + + u_long sequence_number_; + // Keep track of the sequence. + + Log_Record log_msg_; + // One record used for many log messages. + + ACE_SOCK_Dgram_Mcast logger_; + // A logger object. +}; + +#endif /* _LOG_WRAPPER_H */ diff --git a/ACE/examples/Reactor/Multicast/Makefile.am b/ACE/examples/Reactor/Multicast/Makefile.am new file mode 100644 index 00000000000..60d65e7ebe8 --- /dev/null +++ b/ACE/examples/Reactor/Multicast/Makefile.am @@ -0,0 +1,50 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +## Makefile.Reactor_Multicast_Client.am +noinst_PROGRAMS = client + +client_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +client_SOURCES = \ + Log_Wrapper.cpp \ + client.cpp \ + Log_Wrapper.h + +client_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Makefile.Reactor_Multicast_Server.am +noinst_PROGRAMS += server + +server_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +server_SOURCES = \ + Log_Wrapper.cpp \ + server.cpp \ + Log_Wrapper.h + +server_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/Reactor/Multicast/README b/ACE/examples/Reactor/Multicast/README new file mode 100644 index 00000000000..85f64cc8120 --- /dev/null +++ b/ACE/examples/Reactor/Multicast/README @@ -0,0 +1,15 @@ +The following test illustrates the SOCK Mcast multicast wrappers in +conjunction with the Reactor. This test was written by Tim Harrison +(harrison@cs.wustl.edu). + +To run the server type: + +% server & + +It will wait for the first message sent to it and then read for 5 seconds. + +To run the client type any of these: + +% client -m max_message_size -i iterations +% client < <filename> +% client diff --git a/ACE/examples/Reactor/Multicast/Reactor_Multicast.mpc b/ACE/examples/Reactor/Multicast/Reactor_Multicast.mpc new file mode 100644 index 00000000000..a15c53340e4 --- /dev/null +++ b/ACE/examples/Reactor/Multicast/Reactor_Multicast.mpc @@ -0,0 +1,17 @@ +// -*- MPC -*- +// $Id$ + +project(*client) : aceexe { + exename = client + Source_Files { + client.cpp + Log_Wrapper.cpp + } +} +project(*server) : aceexe { + exename = server + Source_Files { + server.cpp + Log_Wrapper.cpp + } +} diff --git a/ACE/examples/Reactor/Multicast/client.cpp b/ACE/examples/Reactor/Multicast/client.cpp new file mode 100644 index 00000000000..25b18c2ae6c --- /dev/null +++ b/ACE/examples/Reactor/Multicast/client.cpp @@ -0,0 +1,126 @@ +// $Id$ + +// This program reads in messages from stdin and sends them to a +// Log_Wrapper. + +#include "ace/OS_main.h" +#include "ace/OS_NS_stdlib.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_Memory.h" +#include "ace/Get_Opt.h" +#include "ace/Log_Msg.h" +#include "Log_Wrapper.h" + +ACE_RCSID(Multicast, client, "$Id$") + +// Multi-cast address. +static const char *MCAST_ADDR = ACE_DEFAULT_MULTICAST_ADDR; + +// UDP port. +static const int UDP_PORT = ACE_DEFAULT_MULTICAST_PORT; + +// Maximum message size. +static int max_message_size = BUFSIZ; + +// Number of times to send message of max_message_size. +static int iterations = 0; + +static void +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_LOG_MSG->open (argv[0]); + + // Start at argv[1] + ACE_Get_Opt getopt (argc, argv, ACE_TEXT("m:ui:"), 1); + + for (int c; (c = getopt ()) != -1; ) + switch (c) + { + case 'm': + max_message_size = ACE_OS::atoi (getopt.opt_arg ()) * BUFSIZ; + break; + case 'i': + iterations = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'u': + // usage fallthrough + default: + ACE_ERROR ((LM_ERROR, + "%n: -m max_message_size (in k) -i iterations\n%a", + 1)); + /* NOTREACHED */ + } +} + +int +ACE_TMAIN (int argc, ACE_TCHAR **argv) +{ + int user_prompt; + + parse_args (argc,argv); + + ACE_DEBUG ((LM_DEBUG, "max buffer size = %d\n", max_message_size)); + + // Instantiate a log wrapper for logging + Log_Wrapper log; + + // Make a connection to a logger. + if (log.open (UDP_PORT, MCAST_ADDR) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n" "open"), -1); + + char *buf; + ACE_NEW_RETURN (buf, char[max_message_size], -1); + + // If -i has been specified, send max_message_size messages + // iterations number of times. + if (iterations) + { + ACE_OS::memset (buf, 1, max_message_size); + + while (iterations--) + if (log.log_message (Log_Wrapper::LM_DEBUG, buf) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n" "log"), -1); + } + + // otherwise, a file has been redirected, or give prompts + else + { + // If a file has been redirected, don't activate user prompts. + if (ACE_OS::isatty (0)) + user_prompt = 1; + else + user_prompt = 0; + + // Continually read messages from stdin and log them. + + for (int count = 1;;) + { + if (user_prompt) + ACE_DEBUG ((LM_DEBUG, "\nEnter message ('Q':quit):\n")); + + ssize_t nbytes = ACE_OS::read (ACE_STDIN, buf, max_message_size); + + if (nbytes <= 0) + break; // End of file or error. + buf[nbytes - 1] = '\0'; + + // Quitting? + if (user_prompt) + { + if (buf[0] == 'Q' || buf[0] == 'q') + break; + } + else // Keep from overrunning the receiver. + ACE_OS::sleep (1); + + // Send the message to the logger. + if (log.log_message (Log_Wrapper::LM_DEBUG, buf) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n" "log_message"), -1); + ACE_DEBUG ((LM_DEBUG, "finished sending message %d\n", count++)); + } + } + + ACE_DEBUG ((LM_DEBUG, "Client done.\n")); + return 0; +} diff --git a/ACE/examples/Reactor/Multicast/server.cpp b/ACE/examples/Reactor/Multicast/server.cpp new file mode 100644 index 00000000000..65e39b97d1a --- /dev/null +++ b/ACE/examples/Reactor/Multicast/server.cpp @@ -0,0 +1,247 @@ +// $Id$ + +// server.cpp (written by Tim Harrison) + +// Listens to multicast address for client log messages. Prints +// Mbits/sec received from client. + +#include "ace/OS_main.h" +#include "ace/SOCK_Dgram.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Dgram_Mcast.h" +#include "ace/Reactor.h" +#include "ace/Log_Msg.h" +#include "Log_Wrapper.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" +#include "ace/os_include/os_netdb.h" + +ACE_RCSID(Multicast, server, "$Id$") + +#if defined (ACE_HAS_IP_MULTICAST) +class Server_Events : public ACE_Event_Handler +{ +public: + Server_Events (u_short port, + const char *mcast_addr, + long time_interval = 0); + ~Server_Events (void); + + virtual int handle_input (ACE_HANDLE fd); + virtual int handle_timeout (const ACE_Time_Value &tv, + const void *arg); + + virtual ACE_HANDLE get_handle (void) const; + + ACE_Time_Value *wait_time (void); + +private: + char *message_; + Log_Wrapper::Log_Record *log_record_; + char buf_[4 * BUFSIZ]; + char hostname_[MAXHOSTNAMELEN]; + + int initialized_; + int count_; + long interval_; + // time interval to log messages + + ACE_Time_Value *how_long_; + ACE_Reactor *reactor_; + ACE_SOCK_Dgram_Mcast mcast_dgram_; + ACE_INET_Addr remote_addr_; + ACE_INET_Addr mcast_addr_; + + // = statistics on messages received + double total_bytes_received_; + int total_messages_received_; + int last_sequence_number_; +}; + +static const char MCAST_ADDR[] = ACE_DEFAULT_MULTICAST_ADDR; +static const int UDP_PORT = ACE_DEFAULT_MULTICAST_PORT; +static const int DURATION = 5; + +ACE_HANDLE +Server_Events::get_handle (void) const +{ + return this->mcast_dgram_.get_handle (); +} + +ACE_Time_Value * +Server_Events::wait_time (void) +{ + return this->how_long_; +} + +Server_Events::Server_Events (u_short port, + const char *mcast_addr, + long time_interval) + : initialized_ (0), + count_ (1), + interval_ (time_interval), + mcast_addr_ (port, mcast_addr), + total_bytes_received_ (0) +{ + // Use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group. + + if (ACE_OS::hostname (this->hostname_, + MAXHOSTNAMELEN) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "hostname")); + + else if (this->mcast_dgram_.subscribe (this->mcast_addr_) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "subscribe")); + else + { + // Point to NULL so that we block in the beginning. + this->how_long_ = 0; + + this->log_record_ = (Log_Wrapper::Log_Record *) &buf_; + this->message_ = &buf_[sizeof (Log_Wrapper::Log_Record)]; + } +} + +// A destructor that emacs refuses to color blue ;-) + +Server_Events::~Server_Events (void) +{ + this->mcast_dgram_.unsubscribe (); + + ACE_DEBUG ((LM_DEBUG, + "total bytes received = %d after %d second\n", + this->total_bytes_received_, + this->interval_)); + + ACE_DEBUG ((LM_DEBUG, + "Mbits/sec = %.2f\n", + (float) (total_bytes_received_ * 8 / (float) (1024*1024*interval_)))); + + ACE_DEBUG ((LM_DEBUG, + "last sequence number = %d\ntotal messages received = %d\ndiff = %d\n", + this->last_sequence_number_, + this->total_messages_received_, + this->last_sequence_number_ - total_messages_received_)); +} + +int +Server_Events::handle_timeout (const ACE_Time_Value &, + const void *arg) +{ + ACE_DEBUG ((LM_DEBUG, "\t%d timeout%s occurred for %s.\n", + this->count_, + this->count_ == 1 ? "" : "s", + (char *) arg)); + + // Don't let the timeouts continue if there's no activity since + // otherwise we use up a lot of CPU time unnecessarily. + if (this->count_ == 5) + { + reactor ()->cancel_timer (this); + this->initialized_ = 0; + + ACE_DEBUG ((LM_DEBUG, + "\tcancelled timeout for %s to avoid busy waiting.\n", + (char *) arg)); + } + + this->count_++; + return 0; +} + +int +Server_Events::handle_input (ACE_HANDLE) +{ + // Receive message from multicast group. + iovec iovp[2]; + iovp[0].iov_base = buf_; + iovp[0].iov_len = sizeof (log_record_); + iovp[1].iov_base = &buf_[sizeof (log_record_)]; + iovp[1].iov_len = 4 * BUFSIZ - sizeof (log_record_); + + ssize_t retcode = + this->mcast_dgram_.recv (iovp, + 2, + this->remote_addr_); + if (retcode != -1) + { + total_messages_received_++; + total_bytes_received_ += retcode; + last_sequence_number_ = + ntohl (log_record_->sequence_number); + + for (char *message_end = this->message_ + ACE_OS::strlen (this->message_) - 1; + ACE_OS::strchr ("\r\n \t", *message_end) != 0; + ) + { + *message_end-- = '\0'; + if (message_end == this->message_) + break; + } + + ACE_DEBUG ((LM_DEBUG, + "sequence number = %d\n", + last_sequence_number_)); + ACE_DEBUG ((LM_DEBUG, + "message = '%s'\n", + this->message_)); + + if (this->initialized_ == 0) + { + // Restart the timer since we've received events again. + if (reactor()->schedule_timer (this, + (void *) this->hostname_, + ACE_Time_Value::zero, + ACE_Time_Value (DURATION)) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "schedule_timer"), + -1); + this->initialized_ = 1; + } + + this->count_ = 1; + return 0; + } + else + return -1; +} + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + // Instantiate a server which will receive messages for DURATION + // seconds. + Server_Events server_events (UDP_PORT, + MCAST_ADDR, + DURATION); + // Instance of the ACE_Reactor. + ACE_Reactor reactor; + + if (reactor.register_handler (&server_events, + ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n%a", + "register_handler", + 1)); + + ACE_DEBUG ((LM_DEBUG, + "starting up server\n")); + + for (;;) + reactor.handle_events (server_events.wait_time ()); + + ACE_NOTREACHED (return 0;) +} +#else +int +main (int, char *argv[]) +{ + ACE_ERROR_RETURN ((LM_ERROR, + "error: %s must be run on a platform that support IP multicast\n", + argv[0]), -1); +} +#endif /* ACE_HAS_IP_MULTICAST */ |