diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2008-03-04 13:56:48 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2008-03-04 13:56:48 +0000 |
commit | c4078c377d74290ebe4e66da0b4975da91732376 (patch) | |
tree | 1816ef391e42a07929304908ac0e21f4c2f6cb7b /ACE/examples/ASX/Event_Server/Event_Server | |
parent | 700d1c1a6be348c6c70a2085e559baeb8f4a62ea (diff) | |
download | ATCD-c4078c377d74290ebe4e66da0b4975da91732376.tar.gz |
swap in externals for ACE and TAO
Diffstat (limited to 'ACE/examples/ASX/Event_Server/Event_Server')
14 files changed, 0 insertions, 1978 deletions
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp deleted file mode 100644 index d5215ffcd26..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp +++ /dev/null @@ -1,159 +0,0 @@ -// $Id$ - -#include "ace/os_include/os_assert.h" -#include "ace/OS_NS_stdio.h" -#include "ace/OS_NS_string.h" -#include "Consumer_Router.h" -#include "Options.h" - -ACE_RCSID(Event_Server, Consumer_Router, "$Id$") - -Consumer_Router::Consumer_Router (Peer_Router_Context *prc) - : Peer_Router (prc) -{ - this->context ()->duplicate (); -} - -// Initialize the Router. - -int -Consumer_Router::open (void *) -{ - if (this->is_writer ()) - { - // Set the <Peer_Router_Context> to point back to us so that if - // any Consumer's "accidentally" send us data we'll be able to - // handle it by passing it down the stream. - this->context ()->peer_router (this); - - // Increment the reference count. - this->context ()->duplicate (); - - // Make this an active object to handle the error cases in a - // separate thread. This is mostly just for illustration, i.e., - // it's probably overkill to use a thread for this! - return this->activate (Options::instance ()->t_flags ()); - } - else // if (this->is_reader ()) - - // Nothing to do since this side is primarily used to transmit to - // Consumers, rather than receive. - return 0; -} - -int -Consumer_Router::close (u_long) -{ - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) closing Consumer_Router %s\n"), - this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer"))); - - if (this->is_writer ()) - // Inform the thread to shut down. - this->msg_queue ()->deactivate (); - - // Both writer and reader call <release>, so the context knows when - // to clean itself up. - this->context ()->release (); - return 0; -} - -// Handle incoming messages in a separate thread. - -int -Consumer_Router::svc (void) -{ - assert (this->is_writer ()); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) starting svc in Consumer_Router\n"))); - - for (ACE_Message_Block *mb = 0; - this->getq (mb) >= 0; - ) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) warning: Consumer_Router is ") - ACE_TEXT ("forwarding a message to Supplier_Router\n"))); - - // Pass this message down to the next Module's writer Task. - if (this->put_next (mb) == -1) - ACE_ERROR_RETURN - ((LM_ERROR, - ACE_TEXT ("(%t) send_peers failed in Consumer_Router\n")), - -1); - } - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) stopping svc in Consumer_Router\n"))); - return 0; - // Note the implicit ACE_OS::thr_exit() via destructor. -} - -// Send a <Message_Block> to the supplier(s). - -int -Consumer_Router::put (ACE_Message_Block *mb, - ACE_Time_Value *) -{ - // Perform the necessary control operations before passing the - // message down the stream. - - if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) - { - this->control (mb); - return this->put_next (mb); - } - - // If we're the reader then we're responsible for broadcasting - // messages to Consumers. - - else if (this->is_reader ()) - { - if (this->context ()->send_peers (mb) == -1) - ACE_ERROR_RETURN - ((LM_ERROR, - ACE_TEXT ("(%t) send_peers failed in Consumer_Router\n")), - -1); - else - return 0; - } - else // if (this->is_writer ()) - - // Queue up the message to processed by <Consumer_Router::svc> - // Since we don't expect to be getting many of these messages, we - // queue them up and run them in a separate thread to avoid taxing - // the main thread. - return this->putq (mb); -} - -// Return information about the <Consumer_Router>. -#if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR) -# define FMTSTR ACE_TEXT ("%s\t %d/%s %s (%s)\n") -#else -# define FMTSTR ACE_TEXT ("%ls\t %d/%ls %ls (%ls)\n") -#endif /* ACE_WIN32 || !ACE_USES_WCHAR */ - -int -Consumer_Router::info (ACE_TCHAR **strp, size_t length) const -{ - ACE_TCHAR buf[BUFSIZ]; - ACE_INET_Addr addr; - const ACE_TCHAR *mod_name = this->name (); - - if (this->context ()->acceptor ().get_local_addr (addr) == -1) - return -1; - - ACE_OS::sprintf (buf, - FMTSTR, - mod_name, - addr.get_port_number (), - ACE_TEXT ("tcp"), - ACE_TEXT ("# consumer router"), - this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer")); - if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) - return -1; - else - ACE_OS::strncpy (*strp, mod_name, length); - - return ACE_OS::strlen (mod_name); -} diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h deleted file mode 100644 index 062a07116ea..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h +++ /dev/null @@ -1,71 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -#ifndef _CONSUMER_ROUTER_H -#define _CONSUMER_ROUTER_H - -#include "ace/SOCK_Acceptor.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -#include "ace/UPIPE_Acceptor.h" -#include "ace/Svc_Handler.h" -#include "ace/RW_Thread_Mutex.h" -#include "Peer_Router.h" - -class Consumer_Router : public Peer_Router -{ - // = TITLE - // Provides the interface between one or more Consumers and the - // Event Server <ACE_Stream>. - // - // = DESCRIPTION - // This class normally sits on "top" of the Stream and routes - // messages coming from "downstream" to all the Consumers - // connected to it via its "read" <Task>. Normally, the messages - // flow up the stream from <Supplier_Router>s. However, if - // Consumers transmit data to the <Consumer_Router>, we dutifully - // push it out to the Suppliers via the <Supplier_Router>. - // - // When used on the "reader" side of a Stream, the - // <Consumer_Router> simply forwards all messages up the stream. - // When used on the "writer" side, the <Consumer_Router> queues - // up outgoing messages to suppliers and sends them down to the - // <Supplier_Router> in a separate thread. The reason for this - // is that it's really an "error" for a <Consumer_Router> to - // send messages to Suppliers, so we don't expect this to happen - // very much. When it does we use a separate thread to avoid - // taxing the main thread, which processes "normal" messages. - // - // All of the methods in this class except the constructor are - // called via base class pointers by the <ACE_Stream>. - // Therefore, we can put them in the protected section. -public: - Consumer_Router (Peer_Router_Context *prc); - // Initialization method. - -protected: - // = ACE_Task hooks. - virtual int open (void *a = 0); - // Called by the Stream to initialize the router. - - virtual int close (u_long flags = 0); - // Called by the Stream to shutdown the router. - - virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); - // Called by the <Peer_Handler> to pass a message to the - // <Consumer_Router>. The <Consumer_Router> queues up this message, - // which is then processed in the <svc> method in a separate thread. - - virtual int svc (void); - // Runs in a separate thread to dequeue messages and pass them up - // the stream. - - // = Dynamic linking hooks. - virtual int info (ACE_TCHAR **info_string, size_t length) const; - // Returns information about this service. -}; - -#endif /* _CONSUMER_ROUTER_H */ diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc b/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc deleted file mode 100644 index f99e912ce04..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc +++ /dev/null @@ -1,15 +0,0 @@ -// -*- MPC -*- -// $Id$ - -project(*Server) : aceexe { - avoids += ace_for_tao - exename = Event_Server - Source_Files { - Consumer_Router.cpp - Event_Analyzer.cpp - Options.cpp - Peer_Router.cpp - Supplier_Router.cpp - event_server.cpp - } -} diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp deleted file mode 100644 index a064da6459a..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp +++ /dev/null @@ -1,80 +0,0 @@ -// $Id$ - -#include "ace/OS_NS_string.h" -#include "Options.h" -#include "Event_Analyzer.h" - -ACE_RCSID(Event_Server, Event_Analyzer, "$Id$") - -int -Event_Analyzer::open (void *) -{ - // No-op for now... - return 0; -} - -int -Event_Analyzer::close (u_long) -{ - // No-op for now... - return 0; -} - -int -Event_Analyzer::control (ACE_Message_Block *mb) -{ - ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); - ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd; - - switch (cmd = ioc->cmd ()) - { - case ACE_IO_Cntl_Msg::SET_LWM: - case ACE_IO_Cntl_Msg::SET_HWM: - this->water_marks (cmd, *(size_t *) mb->cont ()->rd_ptr ()); - break; - default: - break; - } - return 0; -} - -int -Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *) -{ - if (Options::instance ()->debug ()) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) passing through Event_Analyser::put() (%s)\n"), - this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer"))); - - if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) - this->control (mb); - - // Just pass the message along to the next Module in the stream... - return this->put_next (mb); -} - -int -Event_Analyzer::init (int, ACE_TCHAR *[]) -{ - // No-op for now. - return 0; -} - -int -Event_Analyzer::fini (void) -{ - // No-op for now. - return 0; -} - -int -Event_Analyzer::info (ACE_TCHAR **strp, size_t length) const -{ - const ACE_TCHAR *mod_name = this->name (); - - if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) - return -1; - else - ACE_OS::strncpy (*strp, mod_name, length); - return ACE_OS::strlen (mod_name); -} diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h deleted file mode 100644 index d4f88c8b68d..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h +++ /dev/null @@ -1,44 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -#ifndef _EVENT_ANALYZER_H -#define _EVENT_ANALYZER_H - -#include "ace/Stream.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -#include "ace/Module.h" -#include "ace/Task.h" - -class Event_Analyzer : public ACE_Task<ACE_SYNCH> -{ - // = TITLE - // This class forwards all the <ACE_Message_Block>s it receives - // onto its neighboring Module in the Stream. - // - // = DESCRIPTION - // In a "real" event service, application-specific processing - // would be done in the <put> (or <svc>) method in this class. -public: - // = Initialization hooks called by <ACE_Stream> (not used). - virtual int open (void *a = 0); - virtual int close (u_long flags = 0); - - virtual int put (ACE_Message_Block *msg, - ACE_Time_Value * = 0); - // Entry point into this task. - - // Dynamic linking hooks (not used). - virtual int init (int argc, ACE_TCHAR *argv[]); - virtual int fini (void); - virtual int info (ACE_TCHAR **info_string, - size_t length) const; -private: - virtual int control (ACE_Message_Block *); - // Implements the watermark control processing. -}; - -#endif /* _EVENT_ANALYZER_H */ diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am b/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am deleted file mode 100644 index aefe1873d9d..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am +++ /dev/null @@ -1,50 +0,0 @@ -## Process this file with automake to create Makefile.in -## -## $Id$ -## -## This file was generated by MPC. Any changes made directly to -## this file will be lost the next time it is generated. -## -## MPC Command: -## ./bin/mwc.pl -type automake -noreldefs ACE.mwc - -ACE_BUILDDIR = $(top_builddir) -ACE_ROOT = $(top_srcdir) - - -## Makefile.Event_Server.am - -if !BUILD_ACE_FOR_TAO - -noinst_PROGRAMS = Event_Server - -Event_Server_CPPFLAGS = \ - -I$(ACE_ROOT) \ - -I$(ACE_BUILDDIR) - -Event_Server_SOURCES = \ - Consumer_Router.cpp \ - Event_Analyzer.cpp \ - Options.cpp \ - Peer_Router.cpp \ - Supplier_Router.cpp \ - event_server.cpp \ - Consumer_Router.h \ - Event_Analyzer.h \ - Options.h \ - Options.inl \ - Peer_Router.h \ - Supplier_Router.h - -Event_Server_LDADD = \ - $(ACE_BUILDDIR)/ace/libACE.la - -endif !BUILD_ACE_FOR_TAO - -## Clean up template repositories, etc. -clean-local: - -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* - -rm -f gcctemp.c gcctemp so_locations *.ics - -rm -rf cxx_repository ptrepository ti_files - -rm -rf templateregistry ir.out - -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp deleted file mode 100644 index 8683f48d153..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp +++ /dev/null @@ -1,208 +0,0 @@ -// $Id$ - -#include "ace/Get_Opt.h" -#include "ace/Thread.h" -#include "ace/Log_Msg.h" -#include "ace/OS_NS_stdio.h" -#if defined (ACE_HAS_TRACE) -# include "ace/OS_NS_strings.h" -#endif /* ACE_HAS_TRACE */ - -#include "Options.h" - -ACE_RCSID(Event_Server, Options, "$Id$") - -/* static */ -Options *Options::instance_ = 0; - -Options * -Options::instance (void) -{ - if (Options::instance_ == 0) - Options::instance_ = new Options; - - return Options::instance_; -} - -Options::Options (void) - : thr_count_ (4), - t_flags_ (0), - high_water_mark_ (8 * 1024), - low_water_mark_ (1024), - message_size_ (128), - initial_queue_length_ (0), - iterations_ (100000), - debugging_ (0), - verbosity_ (0), - consumer_port_ (ACE_DEFAULT_SERVER_PORT), - supplier_port_ (ACE_DEFAULT_SERVER_PORT + 1) -{ -} - -Options::~Options (void) -{ -} - -void Options::print_results (void) -{ -#if !defined (ACE_WIN32) - ACE_Profile_Timer::ACE_Elapsed_Time et; - - this->itimer_.elapsed_time (et); - - if (this->verbose ()) - { -#if defined (ACE_HAS_PRUSAGE_T) - ACE_Profile_Timer::Rusage rusage; - this->itimer_.get_rusage (rusage); - - ACE_OS::printf ("final concurrency hint = %d\n", ACE_Thread::getconcurrency ()); - ACE_OS::printf ("%8d = lwpid\n" - "%8d = lwp count\n" - "%8d = minor page faults\n" - "%8d = major page faults\n" - "%8d = input blocks\n" - "%8d = output blocks\n" - "%8d = messages sent\n" - "%8d = messages received\n" - "%8d = signals received\n" - "%8ds, %dms = wait-cpu (latency) time\n" - "%8ds, %dms = user lock wait sleep time\n" - "%8ds, %dms = all other sleep time\n" - "%8d = voluntary context switches\n" - "%8d = involuntary context switches\n" - "%8d = system calls\n" - "%8d = chars read/written\n", - (int) rusage.pr_lwpid, - (int) rusage.pr_count, - (int) rusage.pr_minf, - (int) rusage.pr_majf, - (int) rusage.pr_inblk, - (int) rusage.pr_oublk, - (int) rusage.pr_msnd, - (int) rusage.pr_mrcv, - (int) rusage.pr_sigs, - (int) rusage.pr_wtime.tv_sec, (int) rusage.pr_wtime.tv_nsec / 1000000, - (int) rusage.pr_ltime.tv_sec, (int) rusage.pr_ltime.tv_nsec / 1000000, - (int) rusage.pr_slptime.tv_sec, (int) rusage.pr_slptime.tv_nsec / 1000000, - (int) rusage.pr_vctx, - (int) rusage.pr_ictx, - (int) rusage.pr_sysc, - (int) rusage.pr_ioch); -#else - /* Someone needs to write the corresponding dump for rusage... */ -#endif /* ACE_HAS_PRUSAGE_T */ - } - - ACE_OS::printf ("---------------------\n" - "real time = %.3f\n" - "user time = %.3f\n" - "system time = %.3f\n" - "---------------------\n", - et.real_time, et.user_time, et.system_time); -#endif /* ACE_WIN32 */ -} - -void -Options::parse_args (int argc, ACE_TCHAR *argv[]) -{ - //FUZZ: disable check_for_lack_ACE_OS - ACE_LOG_MSG->open (argv[0]); - - ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("c:bdH:i:L:l:M:ns:t:T:v")); - int c; - - while ((c = get_opt ()) != EOF) - //FUZZ: enable check_for_lack_ACE_OS - switch (c) - { - case 'b': - this->t_flags (THR_BOUND); - break; - case 'c': - this->consumer_port (ACE_OS::atoi (get_opt.opt_arg ())); - break; - case 'd': - this->debugging_ = 1; - break; - case 'H': - this->high_water_mark (ACE_OS::atoi (get_opt.opt_arg ())); - break; - case 'i': - this->iterations (ACE_OS::atoi (get_opt.opt_arg ())); - break; - case 'L': - this->low_water_mark (ACE_OS::atoi (get_opt.opt_arg ())); - break; - case 'l': - this->initial_queue_length (ACE_OS::atoi (get_opt.opt_arg ())); - break; - case 'M': - this->message_size (ACE_OS::atoi (get_opt.opt_arg ())); - break; - case 'n': - this->t_flags (THR_NEW_LWP); - break; - case 's': - this->supplier_port (ACE_OS::atoi (get_opt.opt_arg ())); - break; - case 'T': - #if defined (ACE_HAS_TRACE) - if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT ("ON")) == 0) - ACE_Trace::start_tracing (); - else if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT ("OFF")) == 0) - ACE_Trace::stop_tracing (); - #endif /* ACE_HAS_TRACE */ - break; - case 't': - this->thr_count (ACE_OS::atoi (get_opt.opt_arg ())); - break; - case 'v': - this->verbosity_ = 1; - break; - default: - ACE_OS::fprintf (stderr, "%s\n" - "\t[-b] (THR_BOUND)\n" - "\t[-c consumer port]\n" - "\t[-d] (enable debugging)\n" - "\t[-H high water mark]\n" - "\t[-i number of test iterations]\n" - "\t[-L low water mark]\n" - "\t[-M] message size \n" - "\t[-n] (THR_NEW_LWP)\n" - "\t[-q max queue size]\n" - "\t[-s supplier port]\n" - "\t[-t number of threads]\n" - "\t[-v] (verbose) \n", - ACE_TEXT_ALWAYS_CHAR (argv[0])); - ACE_OS::exit (1); - /* NOTREACHED */ - break; - } - - // This is a major hack to get the size_t format spec to be a narrow - // char, same as the other strings for printf() here. It only works - // because this is the end of the source file. It makes the - // ACE_SIZE_T_FORMAT_SPECIFIER not use ACE_TEXT, effectively. -#undef ACE_TEXT -#define ACE_TEXT(A) A - if (this->verbose ()) - ACE_OS::printf ("%8d = initial concurrency hint\n" - ACE_SIZE_T_FORMAT_SPECIFIER " = total iterations\n" - ACE_SIZE_T_FORMAT_SPECIFIER " = thread count\n" - ACE_SIZE_T_FORMAT_SPECIFIER " = low water mark\n" - ACE_SIZE_T_FORMAT_SPECIFIER " = high water mark\n" - ACE_SIZE_T_FORMAT_SPECIFIER " = message_size\n" - ACE_SIZE_T_FORMAT_SPECIFIER " = initial queue length\n" - "%8d = THR_BOUND\n" - "%8d = THR_NEW_LWP\n", - ACE_Thread::getconcurrency (), - this->iterations (), - this->thr_count (), - this->low_water_mark (), - this->high_water_mark (), - this->message_size (), - this->initial_queue_length (), - (this->t_flags () & THR_BOUND) != 0, - (this->t_flags () & THR_NEW_LWP) != 0); -} diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.h b/ACE/examples/ASX/Event_Server/Event_Server/Options.h deleted file mode 100644 index 4c935434d26..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Options.h +++ /dev/null @@ -1,122 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -#ifndef OPTIONS_H -#define OPTIONS_H - -#include "ace/config-all.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -#include "ace/Profile_Timer.h" - -class Options -{ - // = TITLE - // Option Singleton for Event Server. -public: - static Options *instance (void); - // Singleton access point. - - void parse_args (int argc, ACE_TCHAR *argv[]); - // Parse the command-line arguments and set the options. - - // = Timer management. - void stop_timer (void); - void start_timer (void); - - // = Set/get the number of threads. - void thr_count (size_t count); - size_t thr_count (void); - - // = Set/get the size of the queue. - void initial_queue_length (size_t length); - size_t initial_queue_length (void); - - // = Set/get the high water mark. - void high_water_mark (size_t size); - size_t high_water_mark (void); - - // = Set/get the high water mark. - void low_water_mark (size_t size); - size_t low_water_mark (void); - - // = Set/get the size of a message. - void message_size (size_t size); - size_t message_size (void); - - // = Set/get the number of iterations. - void iterations (size_t n); - size_t iterations (void); - - // Set/get threading flags. - void t_flags (long flag); - long t_flags (void); - - // Set/get supplier port number. - void supplier_port (u_short port); - u_short supplier_port (void); - - // Set/get consumer port number. - void consumer_port (u_short port); - u_short consumer_port (void); - - // Enabled if we're in debugging mode. - int debug (void); - - // Enabled if we're in verbose mode. - int verbose (void); - - // Print the results to the STDERR. - void print_results (void); - -private: - // = Ensure we're a Singleton. - Options (void); - ~Options (void); - - ACE_Profile_Timer itimer_; - // Time the process. - - size_t thr_count_; - // Number of threads to spawn. - - long t_flags_; - // Flags to <thr_create>. - - size_t high_water_mark_; - // ACE_Task high water mark. - - size_t low_water_mark_; - // ACE_Task low water mark. - - size_t message_size_; - // Size of a message. - - size_t initial_queue_length_; - // Initial number of items in the queue. - - size_t iterations_; - // Number of iterations to run the test program. - - int debugging_; - // Extra debugging info. - - int verbosity_; - // Extra verbose messages. - - u_short consumer_port_; - // Port that the Consumer_Router is using. - - u_short supplier_port_; - // Port that the Supplier_Router is using. - - static Options *instance_; - // Static Singleton. - -}; - -#include "Options.inl" -#endif /* OPTIONS_H */ diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.inl b/ACE/examples/ASX/Event_Server/Event_Server/Options.inl deleted file mode 100644 index 87ff395c503..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Options.inl +++ /dev/null @@ -1,141 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -/* Option manager for ustreams */ - -// Since this is only included in Options.h these should stay -// inline, not ACE_INLINE. -// FUZZ: disable check_for_inline - -inline void -Options::supplier_port (u_short port) -{ - this->supplier_port_ = port; -} - -inline u_short -Options::supplier_port (void) -{ - return this->supplier_port_; -} - -inline void -Options::consumer_port (u_short port) -{ - this->consumer_port_ = port; -} - -inline u_short -Options::consumer_port (void) -{ - return this->consumer_port_; -} - -inline void -Options::start_timer (void) -{ - this->itimer_.start (); -} - -inline void -Options::stop_timer (void) -{ - this->itimer_.stop (); -} - -inline void -Options::thr_count (size_t count) -{ - this->thr_count_ = count; -} - -inline size_t -Options::thr_count (void) -{ - return this->thr_count_; -} - -inline void -Options::initial_queue_length (size_t length) -{ - this->initial_queue_length_ = length; -} - -inline size_t -Options::initial_queue_length (void) -{ - return this->initial_queue_length_; -} - -inline void -Options::high_water_mark (size_t size) -{ - this->high_water_mark_ = size; -} - -inline size_t -Options::high_water_mark (void) -{ - return this->high_water_mark_; -} - -inline void -Options::low_water_mark (size_t size) -{ - this->low_water_mark_ = size; -} - -inline size_t -Options::low_water_mark (void) -{ - return this->low_water_mark_; -} - -inline void -Options::message_size (size_t size) -{ - this->message_size_ = size; -} - -inline size_t -Options::message_size (void) -{ - return this->message_size_; -} - -inline void -Options::iterations (size_t n) -{ - this->iterations_ = n; -} - -inline size_t -Options::iterations (void) -{ - return this->iterations_; -} - -inline void -Options::t_flags (long flag) -{ - this->t_flags_ |= flag; -} - -inline long -Options::t_flags (void) -{ - return this->t_flags_; -} - -inline int -Options::debug (void) -{ - return this->debugging_; -} - -inline int -Options::verbose (void) -{ - return this->verbosity_; -} - diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp deleted file mode 100644 index cb82eec16df..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp +++ /dev/null @@ -1,435 +0,0 @@ -// $Id$ - -#if !defined (_PEER_ROUTER_C) -#define _PEER_ROUTER_C - -#include "ace/Service_Config.h" -#include "ace/Get_Opt.h" -#include "Options.h" -#include "Peer_Router.h" - -ACE_RCSID(Event_Server, Peer_Router, "$Id$") - -// Send the <ACE_Message_Block> to all the peers. Note that in a -// "real" application this logic would most likely be more selective, -// i.e., it would actually do "routing" based on addressing -// information passed in the <ACE_Message_Block>. - -int -Peer_Router_Context::send_peers (ACE_Message_Block *mb) -{ - PEER_ITERATOR map_iter = this->peer_map_; - int bytes = 0; - int iterations = 0; - - // Skip past the header and get the message to send. - ACE_Message_Block *data_block = mb->cont (); - - // Use an iterator to "multicast" the data to *all* the registered - // peers. Note that this doesn't really multicast, it just makes a - // "logical" copy of the <ACE_Message_Block> and enqueues it in the - // appropriate <Peer_Handler> corresponding to each peer. Note that - // a "real" application would probably "route" the data to a subset - // of connected peers here, rather than send it to all the peers. - - for (PEER_ENTRY *ss = 0; - map_iter.next (ss) != 0; - map_iter.advance ()) - { - if (Options::instance ()->debug ()) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) sending to peer via handle %d\n"), - ss->ext_id_)); - - iterations++; - - // Increment reference count before sending since the - // <Peer_Handler> might be running in its own thread of control. - bytes += ss->int_id_->put (data_block->duplicate ()); - } - - mb->release (); - return bytes == 0 ? 0 : bytes / iterations; -} - -// Remove the <Peer_Handler> from the peer connection map. - -int -Peer_Router_Context::unbind_peer (ROUTING_KEY key) -{ - return this->peer_map_.unbind (key); -} - -// Add the <Peer_Handler> to the peer connection map. - -int -Peer_Router_Context::bind_peer (ROUTING_KEY key, - Peer_Handler *peer_handler) -{ - return this->peer_map_.bind (key, peer_handler); -} - -void -Peer_Router_Context::duplicate (void) -{ - this->reference_count_++; -} - -void -Peer_Router_Context::release (void) -{ - ACE_ASSERT (this->reference_count_ > 0); - this->reference_count_--; - - if (this->reference_count_ == 0) - delete this; -} - -Peer_Router_Context::Peer_Router_Context (u_short port) - : reference_count_ (0) -{ - // Initialize the Acceptor's "listen-mode" socket. - ACE_INET_Addr endpoint (port); - if (this->open (endpoint) == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("Acceptor::open"))); - - // Initialize the connection map. - else if (this->peer_map_.open () == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("Map_Manager::open"))); - else - { - ACE_INET_Addr addr; - - if (this->acceptor ().get_local_addr (addr) != -1) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) initializing %C on port = %d, handle = %d, this = %u\n"), - addr.get_port_number () == Options::instance ()->supplier_port () - ? "Supplier_Handler" : "Consumer_Handler", - addr.get_port_number (), - this->acceptor().get_handle (), - this)); - else - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("get_local_addr"))); - } -} - -Peer_Router_Context::~Peer_Router_Context (void) -{ - // Free up the handle and close down the listening socket. - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) closing down Peer_Router_Context\n"))); - - // Close down the Acceptor and take ourselves out of the Reactor. - this->handle_close (); - - PEER_ITERATOR map_iter = this->peer_map_; - - // Make sure to take all the handles out of the map to avoid - // "resource leaks." - - for (PEER_ENTRY *ss = 0; - map_iter.next (ss) != 0; - map_iter.advance ()) - { - if (Options::instance ()->debug ()) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) closing down peer on handle %d\n"), - ss->ext_id_)); - - if (ACE_Reactor::instance ()->remove_handler - (ss->ext_id_, - ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%t) p\n"), - ACE_TEXT ("remove_handle"))); - } - - // Close down the map. - this->peer_map_.close (); -} - -Peer_Router * -Peer_Router_Context::peer_router (void) -{ - return this->peer_router_; -} - -void -Peer_Router_Context::peer_router (Peer_Router *pr) -{ - this->peer_router_ = pr; -} - -// Factory Method that creates a new <Peer_Handler> for each -// connection. - -int -Peer_Router_Context::make_svc_handler (Peer_Handler *&sh) -{ - ACE_NEW_RETURN (sh, - Peer_Handler (this), - -1); - return 0; -} - -Peer_Handler::Peer_Handler (Peer_Router_Context *prc) - : peer_router_context_ (prc) -{ -} - -// Send output to a peer. Note that this implementation "blocks" if -// flow control occurs. This is undesirable for "real" applications. -// The best way around this is to make the <Peer_Handler> an Active -// Object, e.g., as done in the $ACE_ROOT/apps/Gateway/Gateway -// application. - -int -Peer_Handler::put (ACE_Message_Block *mb, - ACE_Time_Value *tv) -{ -#if 0 - // If we're running as Active Objects just enqueue the message here. - return this->putq (mb, tv); -#else - ACE_UNUSED_ARG (tv); - - int result = this->peer ().send_n (mb->rd_ptr (), - mb->length ()); - // Release the memory. - mb->release (); - - return result; -#endif /* 0 */ -} - -// Initialize a newly connected handler. - -int -Peer_Handler::open (void *) -{ - ACE_TCHAR buf[BUFSIZ], *p = buf; - - if (this->peer_router_context_->peer_router ()->info (&p, - sizeof buf) != -1) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) creating handler for %s, handle = %d\n"), - buf, - this->get_handle ())); - else - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("info")), - -1); -#if 0 - // If we're running as an Active Object activate the Peer_Handler - // here. - if (this->activate (Options::instance ()->t_flags ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("activation of thread failed")), - -1); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Peer_Handler::open registering with Reactor for handle_input\n"))); -#else - - // Register with the Reactor to receive messages from our Peer. - if (ACE_Reactor::instance ()->register_handler - (this, ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("register_handler")), - -1); -#endif /* 0 */ - - // Insert outselves into the routing map. - else if (this->peer_router_context_->bind_peer (this->get_handle (), - this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("bind_peer")), - -1); - else - return 0; -} - -// Receive a message from a Peer. - -int -Peer_Handler::handle_input (ACE_HANDLE h) -{ - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) input arrived on handle %d\n"), - h)); - - ACE_Message_Block *db; - - ACE_NEW_RETURN (db, ACE_Message_Block (BUFSIZ), -1); - - ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY), - ACE_Message_Block::MB_PROTO, db); - // Check for memory failures. - if (hb == 0) - { - db->release (); - errno = ENOMEM; - return -1; - } - - ssize_t n = this->peer ().recv (db->rd_ptr (), - db->size ()); - - if (n == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p"), - ACE_TEXT ("recv failed")), - -1); - else if (n == 0) // Client has closed down the connection. - { - if (this->peer_router_context_->unbind_peer (this->get_handle ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p"), - ACE_TEXT ("unbind failed")), - -1); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) shutting down handle %d\n"), h)); - // Instruct the <ACE_Reactor> to deregister us by returning -1. - return -1; - } - else - { - // Transform incoming buffer into an <ACE_Message_Block>. - - // First, increment the write pointer to the end of the newly - // read data block. - db->wr_ptr (n); - - // Second, copy the "address" into the header block. Note that - // for this implementation the HANDLE we receive the message on - // is considered the "address." A "real" application would want - // to do something more sophisticated. - *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); - - // Third, update the write pointer in the header block. - hb->wr_ptr (sizeof (ACE_HANDLE)); - - // Finally, pass the message through the stream. Note that we - // use <Task::put> here because this gives the method at *our* - // level in the stream a chance to do something with the message - // before it is sent up the other side. For instance, if we - // receive messages in the <Supplier_Router>, it will just call - // <put_next> and send them up the stream to the - // <Consumer_Router> (which broadcasts them to consumers). - // However, if we receive messages in the <Consumer_Router>, it - // could reply to the Consumer with an error since it's not - // correct for Consumers to send messages (we don't do this in - // the current implementation, but it could be done in a "real" - // application). - - if (this->peer_router_context_->peer_router ()->put (hb) == -1) - return -1; - else - return 0; - } -} - -Peer_Router::Peer_Router (Peer_Router_Context *prc) - : peer_router_context_ (prc) -{ -} - -Peer_Router_Context * -Peer_Router::context (void) const -{ - return this->peer_router_context_; -} - -int -Peer_Router::control (ACE_Message_Block *mb) -{ - ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); - ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command; - - switch (command = ioc->cmd ()) - { - case ACE_IO_Cntl_Msg::SET_LWM: - case ACE_IO_Cntl_Msg::SET_HWM: - this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ()); - break; - default: - return -1; - } - return 0; -} - -#if 0 - -// Right now, Peer_Handlers are purely Reactive, i.e., they all run in -// a single thread of control. It would be easy to make them Active -// Objects by calling activate() in Peer_Handler::open(), making -// Peer_Handler::put() enqueue each message on the message queue, and -// (3) then running the following svc() routine to route each message -// to its final destination within a separate thread. Note that we'd -// want to move the svc() call up to the Consumer_Router and -// Supplier_Router level in order to get the right level of control -// for input and output. - -Peer_Handler::svc (void) -{ - ACE_Message_Block *db, *hb; - - // Do an endless loop - for (;;) - { - db = new Message_Block (BUFSIZ); - hb = new Message_Block (sizeof (ROUTING_KEY), - Message_Block::MB_PROTO, - db); - - ssize_t n = this->peer_.recv (db->rd_ptr (), db->size ()); - - if (n == -1) - LM_ERROR_RETURN ((LOG_ERROR, - ACE_TEXT ("%p"), - ACE_TEXT ("recv failed")), - -1); - else if (n == 0) // Client has closed down the connection. - { - if (this->peer_router_context_->peer_router ()->unbind_peer (this->get_handle ()) == -1) - LM_ERROR_RETURN ((LOG_ERROR, - ACE_TEXT ("%p"), - ACE_TEXT ("unbind failed")), - -1); - LM_DEBUG ((LOG_DEBUG, - ACE_TEXT ("(%t) shutting down \n"))); - - // We do not need to be deregistered by reactor - // as we were not registered at all. - return -1; - } - else - { - // Transform incoming buffer into a Message. - db->wr_ptr (n); - *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment. - hb->wr_ptr (sizeof (long)); - - // Pass the message to the stream. - if (this->peer_router_context_->peer_router ()->reply (hb) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), - ACE_TEXT ("Peer_Handler.svc : peer_router->reply failed")), - -1); - } - } - return 0; -} -#endif /* 0 */ -#endif /* _PEER_ROUTER_C */ - diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h deleted file mode 100644 index 044ef07ea07..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h +++ /dev/null @@ -1,158 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -#ifndef _PEER_ROUTER_H -#define _PEER_ROUTER_H - -#include "ace/Acceptor.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -#include "ace/SOCK_Acceptor.h" -#include "ace/Svc_Handler.h" -#include "ace/Map_Manager.h" -#include "ace/RW_Thread_Mutex.h" - -// Type of search key for CONSUMER_MAP -typedef ACE_HANDLE ROUTING_KEY; - -// Forward declarations. -class Peer_Router; -class Peer_Router_Context; - -class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH> -{ - // = TITLE - // Receive input from a Peer and forward to the appropriate - // <Peer_Router> (i.e., <Consumer_Router> or <Supplier_Router>). -public: - Peer_Handler (Peer_Router_Context * = 0); - // Initialization method. - - virtual int open (void * = 0); - // Called by the ACE_Acceptor::handle_input() to activate this - // object. - - virtual int handle_input (ACE_HANDLE); - // Receive input from a peer. - - virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); - // Send output to a peer. Note that this implementation "blocks" if - // flow control occurs. This is undesirable for "real" - // applications. The best way around this is to make the - // <Peer_Handler> an Active Object, e.g., as done in the - // $ACE_ROOT/apps/Gateway/Gateway application. - -protected: - Peer_Router_Context *peer_router_context_; - // Pointer to router context. This maintains the state that is - // shared by both Tasks in a <Peer_Router> Module. -}; - -class Peer_Router_Context : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> -{ - // = TITLE - // Defines state and behavior shared between both Tasks in a - // <Peer_Router> Module. - // - // = DESCRIPTION - // This class also serves as an <ACE_Acceptor>, which creates - // <Peer_Handlers> when Peers connect. -public: - // = Initialization and termination methods. - Peer_Router_Context (u_short port); - // Constructor. - - virtual int unbind_peer (ROUTING_KEY); - // Remove the <Peer_Handler *> from the <PEER_MAP> that corresponds - // to the <ROUTING_KEY>. - - virtual int bind_peer (ROUTING_KEY, Peer_Handler *); - // Add a <Peer_Handler> to the <PEER_MAP> that's associated with the - // <ROUTING_KEY>. - - int send_peers (ACE_Message_Block *mb); - // Send the <ACE_Message_Block> to all the peers. Note that in a - // "real" application this logic would most likely be more - // selective, i.e., it would actually do "routing" based on - // addressing information passed in the <ACE_Message_Block>. - - int make_svc_handler (Peer_Handler *&sh); - // Factory Method that creates a new <Peer_Handler> for each - // connection. This method overrides the default behavior in - // <ACE_Acceptor>. - - // = Set/Get Router Task. - Peer_Router *peer_router (void); - void peer_router (Peer_Router *); - - void release (void); - // Decrement the reference count and delete <this> when count == 0; - - void duplicate (void); - // Increment the reference count. - -private: - Peer_Router *peer_router_; - // Pointer to the <Peer_Router> that we are accepting for. - - // = Useful typedefs. - typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX> - PEER_MAP; - typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX> - PEER_ITERATOR; - typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *> - PEER_ENTRY; - - PEER_MAP peer_map_; - // Map used to keep track of active peers. - - int reference_count_; - // Keep track of when we can delete ourselves. - - ~Peer_Router_Context (void); - // Private to ensure dynamic allocation. - - friend class Friend_Of_Peer_Router_Context; - // Declare a friend class to avoid compiler warnings because the - // destructor is private. -}; - -class Peer_Router : public ACE_Task<ACE_SYNCH> -{ - // = TITLE - // This abstract base class provides mechanisms for routing - // messages to/from a <ACE_Stream> from/to one or more peers (which - // are typically running on remote hosts). - // - // = DESCRIPTION - // Subclasses of <Peer_Router> (such as <Consumer_Router> or - // <Supplier_Router>) override the <open>, <close>, and - // <put> methods to specialize the behavior of the router to - // meet application-specific requirements. -protected: - Peer_Router (Peer_Router_Context *prc); - // Initialization method. - - virtual int control (ACE_Message_Block *); - // Handle control messages arriving from adjacent Modules. - - Peer_Router_Context *context (void) const; - // Returns the routing context. - - typedef ACE_Task<ACE_SYNCH> inherited; - // Helpful typedef. - -private: - Peer_Router_Context *peer_router_context_; - // Reference to the context shared by the writer and reader Tasks, - // e.g., in the <Consumer_Router> and <Supplier_Router> Modules. - - // = Prevent copies and pass-by-value. - Peer_Router (const Peer_Router &); - void operator= (const Peer_Router &); -}; - -#endif /* _PEER_ROUTER_H */ diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp deleted file mode 100644 index 72c43ee6312..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp +++ /dev/null @@ -1,165 +0,0 @@ -// $Id$ - -#include "ace/os_include/os_assert.h" -#include "ace/OS_NS_stdio.h" -#include "ace/OS_NS_string.h" -#include "Supplier_Router.h" -#include "Options.h" - -ACE_RCSID(Event_Server, Supplier_Router, "$Id$") - -// Handle outgoing messages in a separate thread. - -int -Supplier_Router::svc (void) -{ - assert (this->is_writer ()); - - ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n")); - - for (ACE_Message_Block *mb = 0; - this->getq (mb) >= 0; - ) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) warning: Supplier_Router is " - "forwarding a message via send_peers\n")); - - // Broadcast the message to the Suppliers, even though this is - // "incorrect" (assuming a oneway flow of events from Suppliers - // to Consumers)! - - if (this->context ()->send_peers (mb) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) send_peers failed in Supplier_Router\n"), - -1); - } - - ACE_DEBUG ((LM_DEBUG, - "(%t) stopping svc in Supplier_Router\n")); - return 0; -} - -Supplier_Router::Supplier_Router (Peer_Router_Context *prc) - : Peer_Router (prc) -{ - // Increment the reference count. - this->context ()->duplicate (); -} - -// Initialize the Supplier Router. - -int -Supplier_Router::open (void *) -{ - if (this->is_reader ()) - { - // Set the <Peer_Router_Context> to point back to us so that all - // the Peer_Handler's <put> their incoming <Message_Blocks> to - // our reader Task. - this->context ()->peer_router (this); - return 0; - } - - else // if (this->is_writer () - { - // Increment the reference count. - this->context ()->duplicate (); - - // Make this an active object to handle the error cases in a - // separate thread. - return this->activate (Options::instance ()->t_flags ()); - } -} - -// Close down the router. - -int -Supplier_Router::close (u_long) -{ - ACE_DEBUG ((LM_DEBUG, - "(%t) closing Supplier_Router %s\n", - this->is_reader () ? "reader" : "writer")); - - if (this->is_writer ()) - // Inform the thread to shut down. - this->msg_queue ()->deactivate (); - - // Both writer and reader call release(), so the context knows when - // to clean itself up. - this->context ()->release (); - return 0; -} - -// Send an <ACE_Message_Block> to the supplier(s). - -int -Supplier_Router::put (ACE_Message_Block *mb, - ACE_Time_Value *) -{ - // Perform the necessary control operations before passing - // the message up the stream. - - if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) - { - this->control (mb); - return this->put_next (mb); - } - - // If we're the reader then we are responsible for pass messages up - // to the next Module's reader Task. Note that in a "real" - // application this is likely where we'd take a look a the actual - // information that was in the message, e.g., in order to figure out - // what operation it was and what it's "parameters" where, etc. - else if (this->is_reader ()) - return this->put_next (mb); - - else // if (this->is_writer ()) - { - // Someone is trying to write to the Supplier. In this - // implementation this is considered an "error." However, we'll - // just go ahead and forward the message to the Supplier (who - // hopefully is prepared to receive it). - ACE_DEBUG ((LM_WARNING, - "(%t) warning: sending to a Supplier\n")); - - // Queue up the message to processed by <Supplier_Router::svc>. - // Since we don't expect to be getting many of these messages, - // we queue them up and run them in a separate thread to avoid - // taxing the main thread. - return this->putq (mb); - } -} - -// Return information about the <Supplier_Router>. -#if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR) -# define FMTSTR ACE_TEXT ("%s\t %d/%s %s (%s)\n") -#else -# define FMTSTR ACE_TEXT ("%ls\t %d/%ls %ls (%ls)\n") -#endif /* ACE_WIN32 || !ACE_USES_WCHAR */ - -int -Supplier_Router::info (ACE_TCHAR **strp, size_t length) const -{ - ACE_TCHAR buf[BUFSIZ]; - ACE_INET_Addr addr; - const ACE_TCHAR *mod_name = this->name (); - - if (this->context ()->acceptor ().get_local_addr (addr) == -1) - return -1; - - ACE_OS::sprintf (buf, - FMTSTR, - mod_name, - addr.get_port_number (), - ACE_TEXT ("tcp"), - ACE_TEXT ("# supplier router"), - this->is_reader () ? - ACE_TEXT ("reader") : ACE_TEXT ("writer")); - if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) - return -1; - else - ACE_OS::strncpy (*strp, mod_name, length); - - return ACE_OS::strlen (mod_name); -} diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h deleted file mode 100644 index 8a42943c147..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h +++ /dev/null @@ -1,72 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -#ifndef _SUPPLIER_ROUTER_H -#define _SUPPLIER_ROUTER_H - -#include "ace/INET_Addr.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -#include "ace/SOCK_Acceptor.h" -#include "ace/Map_Manager.h" -#include "ace/Svc_Handler.h" -#include "Peer_Router.h" - -class Supplier_Router : public Peer_Router -{ - // = TITLE - // Provides the interface between one or more Suppliers and the - // Event Server ACE_Stream. - // - // = DESCRIPTION - // This class normally sits on "bottom" of the Stream and sends - // all messages coming from Suppliers via its "write" <Task> - // "upstream" to all the Consumers connected to the - // <Consumer_Router>. Normally, the messages flow up the - // stream to <Consumer_Router>s. However, if Consumers - // transmit data to the <Consumer_Router>, we dutifully push it - // out to the Suppliers via the <Supplier_Router>. - // - // When used on the "reader" side of a Stream, the - // <Supplier_Router> simply forwards all messages up the stream. - // When used on the "writer" side, the <Supplier_Router> queues - // up outgoing messages to suppliers and sends them in a - // separate thread. The reason for this is that it's really an - // "error" for a <Supplier_Router> to send messages to - // Suppliers, so we don't expect this to happen very much. When - // it does we use a separate thread to avoid taxing the main - // thread, which processes "normal" messages. - // - // All of these methods are called via base class pointers by - // the <ACE_Stream> apparatus. Therefore, we can put them in - // the protected section. -public: - Supplier_Router (Peer_Router_Context *prc); - // Initialization method. - -protected: - // = ACE_Task hooks. - - virtual int open (void *a = 0); - // Called by the Stream to initialize the router. - - virtual int close (u_long flags = 0); - // Called by the Stream to shutdown the router. - - virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); - // Called by the <SUPPLIER_HANDLER> to pass a message to the Router. - // The Router queues up this message, which is then processed in the - // <svc> method in a separate thread. - - virtual int svc (void); - // Runs in a separate thread to dequeue messages and pass them up - // the stream. - - virtual int info (ACE_TCHAR **info_string, size_t length) const; - // Dynamic linking hook. -}; - -#endif /* _SUPPLIER_ROUTER_H */ diff --git a/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp b/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp deleted file mode 100644 index 3858bad1fc2..00000000000 --- a/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp +++ /dev/null @@ -1,258 +0,0 @@ -// $Id$ - -// Main driver program for the event server example. - -#include "ace/OS_main.h" -#include "ace/Service_Config.h" -#include "ace/OS_NS_unistd.h" -#include "Options.h" -#include "Consumer_Router.h" -#include "Event_Analyzer.h" -#include "Supplier_Router.h" -#include "ace/Sig_Adapter.h" -#include "ace/Stream.h" - -ACE_RCSID (Event_Server, - event_server, - "$Id$") - -// Typedef these components to handle multi-threading correctly. -typedef ACE_Stream<ACE_SYNCH> MT_Stream; -typedef ACE_Module<ACE_SYNCH> MT_Module; - - -class Event_Server : public ACE_Sig_Adapter -{ - // = TITLE - // Run the logic for the <Event_Server>. - - // - // = DESCRIPTION - // In addition to packaging the <Event_Server> components, this - // class also handles SIGINT and terminate the entire - // application process. There are several ways to terminate - // this application process: - // - // 1. Send a SIGINT signal (e.g., via ^C) - // 2. Type any character on the STDIN. - // - // Note that by inheriting from the <ACE_Sig_Adapter> we can - // shutdown the <ACE_Reactor> cleanly when a SIGINT is - // generated. -public: - Event_Server (void); - // Constructor. - - int svc (void); - // Run the event-loop for the event server. - -private: - virtual int handle_input (ACE_HANDLE handle); - // Hook method called back when a user types something into the - // STDIN in order to shut down the program. - - int configure_stream (void); - // Setup the plumbing in the stream. - - int set_watermarks (void); - // Set the high and low queue watermarks. - - int run_event_loop (void); - // Run the event-loop for the <Event_Server>. - - MT_Stream event_server_; - // The <ACE_Stream> that contains the <Event_Server> application - // <Modules>. -}; - -Event_Server::Event_Server (void) - : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop)) - // Shutdown the <ACE_Reactor>'s event loop when a SIGINT is - // received. -{ - // Register to trap STDIN from the user. - if (ACE_Event_Handler::register_stdin_handler (this, - ACE_Reactor::instance (), - ACE_Thread_Manager::instance ()) == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("register_stdin_handler"))); - // Register to trap the SIGINT signal. - else if (ACE_Reactor::instance ()->register_handler - (SIGINT, this) == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("register_handler"))); -} - -int -Event_Server::handle_input (ACE_HANDLE) -{ - // This code here will make sure we actually wait for the user to - // type something. On platforms like Win32, <handle_input> is called - // prematurely (even when there is no data). - char temp_buffer [BUFSIZ]; - - ssize_t n = ACE_OS::read (ACE_STDIN, - temp_buffer, - sizeof (temp_buffer)); - // This ought to be > 0, otherwise something very strange has - // happened!! - ACE_ASSERT (n > 0); - ACE_UNUSED_ARG (n); // To avoid compile warning with ACE_NDEBUG. - - Options::instance ()->stop_timer (); - - ACE_DEBUG ((LM_INFO, - ACE_TEXT ("(%t) closing down the test\n"))); - Options::instance ()->print_results (); - - ACE_Reactor::instance ()->end_reactor_event_loop (); - return -1; -} - -int -Event_Server::configure_stream (void) -{ - Peer_Router_Context *src; - // Create the <Supplier_Router>'s routing context. This contains a - // context shared by both the write-side and read-side of the - // <Supplier_Router> Module. - ACE_NEW_RETURN (src, - Peer_Router_Context (Options::instance ()->supplier_port ()), - -1); - - MT_Module *srm = 0; - // Create the <Supplier_Router> module. - ACE_NEW_RETURN (srm, - MT_Module - (ACE_TEXT ("Supplier_Router"), - new Supplier_Router (src), - new Supplier_Router (src)), - -1); - - MT_Module *eam = 0; - // Create the <Event_Analyzer> module. - ACE_NEW_RETURN (eam, - MT_Module - (ACE_TEXT ("Event_Analyzer"), - new Event_Analyzer, - new Event_Analyzer), - -1); - - Peer_Router_Context *crc; - // Create the <Consumer_Router>'s routing context. This contains a - // context shared by both the write-side and read-side of the - // <Consumer_Router> Module. - ACE_NEW_RETURN (crc, - Peer_Router_Context (Options::instance ()->consumer_port ()), - -1); - - MT_Module *crm = 0; - // Create the <Consumer_Router> module. - ACE_NEW_RETURN (crm, - MT_Module - (ACE_TEXT ("Consumer_Router"), - new Consumer_Router (crc), - new Consumer_Router (crc)), - -1); - - // Push the Modules onto the event_server stream. - - if (this->event_server_.push (srm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("push (Supplier_Router)")), - -1); - else if (this->event_server_.push (eam) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("push (Event_Analyzer)")), - -1); - else if (this->event_server_.push (crm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("push (Consumer_Router)")), - -1); - return 0; -} - -int -Event_Server::set_watermarks (void) -{ - // Set the high and low water marks appropriately. The water marks - // control how much data can be buffered before the queues are - // considered "full." - size_t wm = Options::instance ()->low_water_mark (); - - if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_LWM, - &wm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), - ACE_TEXT ("push (setting low watermark)")), - -1); - - wm = Options::instance ()->high_water_mark (); - if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_HWM, - &wm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), - ACE_TEXT ("push (setting high watermark)")), - -1); - return 0; -} - -int -Event_Server::run_event_loop (void) -{ - // Begin the timer. - Options::instance ()->start_timer (); - - // Perform the main event loop waiting for the user to type ^C or to - // enter a line on the ACE_STDIN. - - ACE_Reactor::instance ()->run_reactor_event_loop (); - - // Close down the stream and call the <close> hooks on all the - // <ACE_Task>s in the various Modules in the Stream. - this->event_server_.close (); - - // Wait for the threads in the <Consumer_Router> and - // <Supplier_Router> to exit. - return ACE_Thread_Manager::instance ()->wait (); -} - -int -Event_Server::svc (void) -{ - if (this->configure_stream () == -1) - return -1; - else if (this->set_watermarks () == -1) - return -1; - else if (this->run_event_loop () == -1) - return -1; - else - return 0; -} - -int -ACE_TMAIN (int argc, ACE_TCHAR *argv[]) -{ -#if defined (ACE_HAS_THREADS) - Options::instance ()->parse_args (argc, argv); - - // Initialize the <Event_Server>. - Event_Server event_server; - - // Run the event server's event-loop. - int result = event_server.svc (); - - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("exiting main\n"))); - - return result; -#else - ACE_UNUSED_ARG (argc); - ACE_UNUSED_ARG (argv); - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("threads not supported on this platform\n")), - 1); -#endif /* ACE_HAS_THREADS */ -} |