summaryrefslogtreecommitdiff
path: root/ACE/examples/ASX/Event_Server/Event_Server
diff options
context:
space:
mode:
authorWilliam R. Otte <wotte@dre.vanderbilt.edu>2008-03-04 13:56:48 +0000
committerWilliam R. Otte <wotte@dre.vanderbilt.edu>2008-03-04 13:56:48 +0000
commitc4078c377d74290ebe4e66da0b4975da91732376 (patch)
tree1816ef391e42a07929304908ac0e21f4c2f6cb7b /ACE/examples/ASX/Event_Server/Event_Server
parent700d1c1a6be348c6c70a2085e559baeb8f4a62ea (diff)
downloadATCD-c4078c377d74290ebe4e66da0b4975da91732376.tar.gz
swap in externals for ACE and TAO
Diffstat (limited to 'ACE/examples/ASX/Event_Server/Event_Server')
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp159
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h71
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Event.mpc15
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp80
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h44
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Makefile.am50
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Options.cpp208
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Options.h122
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Options.inl141
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp435
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h158
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp165
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h72
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp258
14 files changed, 0 insertions, 1978 deletions
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
deleted file mode 100644
index d5215ffcd26..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
+++ /dev/null
@@ -1,159 +0,0 @@
-// $Id$
-
-#include "ace/os_include/os_assert.h"
-#include "ace/OS_NS_stdio.h"
-#include "ace/OS_NS_string.h"
-#include "Consumer_Router.h"
-#include "Options.h"
-
-ACE_RCSID(Event_Server, Consumer_Router, "$Id$")
-
-Consumer_Router::Consumer_Router (Peer_Router_Context *prc)
- : Peer_Router (prc)
-{
- this->context ()->duplicate ();
-}
-
-// Initialize the Router.
-
-int
-Consumer_Router::open (void *)
-{
- if (this->is_writer ())
- {
- // Set the <Peer_Router_Context> to point back to us so that if
- // any Consumer's "accidentally" send us data we'll be able to
- // handle it by passing it down the stream.
- this->context ()->peer_router (this);
-
- // Increment the reference count.
- this->context ()->duplicate ();
-
- // Make this an active object to handle the error cases in a
- // separate thread. This is mostly just for illustration, i.e.,
- // it's probably overkill to use a thread for this!
- return this->activate (Options::instance ()->t_flags ());
- }
- else // if (this->is_reader ())
-
- // Nothing to do since this side is primarily used to transmit to
- // Consumers, rather than receive.
- return 0;
-}
-
-int
-Consumer_Router::close (u_long)
-{
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) closing Consumer_Router %s\n"),
- this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer")));
-
- if (this->is_writer ())
- // Inform the thread to shut down.
- this->msg_queue ()->deactivate ();
-
- // Both writer and reader call <release>, so the context knows when
- // to clean itself up.
- this->context ()->release ();
- return 0;
-}
-
-// Handle incoming messages in a separate thread.
-
-int
-Consumer_Router::svc (void)
-{
- assert (this->is_writer ());
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) starting svc in Consumer_Router\n")));
-
- for (ACE_Message_Block *mb = 0;
- this->getq (mb) >= 0;
- )
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) warning: Consumer_Router is ")
- ACE_TEXT ("forwarding a message to Supplier_Router\n")));
-
- // Pass this message down to the next Module's writer Task.
- if (this->put_next (mb) == -1)
- ACE_ERROR_RETURN
- ((LM_ERROR,
- ACE_TEXT ("(%t) send_peers failed in Consumer_Router\n")),
- -1);
- }
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) stopping svc in Consumer_Router\n")));
- return 0;
- // Note the implicit ACE_OS::thr_exit() via destructor.
-}
-
-// Send a <Message_Block> to the supplier(s).
-
-int
-Consumer_Router::put (ACE_Message_Block *mb,
- ACE_Time_Value *)
-{
- // Perform the necessary control operations before passing the
- // message down the stream.
-
- if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
- {
- this->control (mb);
- return this->put_next (mb);
- }
-
- // If we're the reader then we're responsible for broadcasting
- // messages to Consumers.
-
- else if (this->is_reader ())
- {
- if (this->context ()->send_peers (mb) == -1)
- ACE_ERROR_RETURN
- ((LM_ERROR,
- ACE_TEXT ("(%t) send_peers failed in Consumer_Router\n")),
- -1);
- else
- return 0;
- }
- else // if (this->is_writer ())
-
- // Queue up the message to processed by <Consumer_Router::svc>
- // Since we don't expect to be getting many of these messages, we
- // queue them up and run them in a separate thread to avoid taxing
- // the main thread.
- return this->putq (mb);
-}
-
-// Return information about the <Consumer_Router>.
-#if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR)
-# define FMTSTR ACE_TEXT ("%s\t %d/%s %s (%s)\n")
-#else
-# define FMTSTR ACE_TEXT ("%ls\t %d/%ls %ls (%ls)\n")
-#endif /* ACE_WIN32 || !ACE_USES_WCHAR */
-
-int
-Consumer_Router::info (ACE_TCHAR **strp, size_t length) const
-{
- ACE_TCHAR buf[BUFSIZ];
- ACE_INET_Addr addr;
- const ACE_TCHAR *mod_name = this->name ();
-
- if (this->context ()->acceptor ().get_local_addr (addr) == -1)
- return -1;
-
- ACE_OS::sprintf (buf,
- FMTSTR,
- mod_name,
- addr.get_port_number (),
- ACE_TEXT ("tcp"),
- ACE_TEXT ("# consumer router"),
- this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer"));
- if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
- return -1;
- else
- ACE_OS::strncpy (*strp, mod_name, length);
-
- return ACE_OS::strlen (mod_name);
-}
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
deleted file mode 100644
index 062a07116ea..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-#ifndef _CONSUMER_ROUTER_H
-#define _CONSUMER_ROUTER_H
-
-#include "ace/SOCK_Acceptor.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-#include "ace/UPIPE_Acceptor.h"
-#include "ace/Svc_Handler.h"
-#include "ace/RW_Thread_Mutex.h"
-#include "Peer_Router.h"
-
-class Consumer_Router : public Peer_Router
-{
- // = TITLE
- // Provides the interface between one or more Consumers and the
- // Event Server <ACE_Stream>.
- //
- // = DESCRIPTION
- // This class normally sits on "top" of the Stream and routes
- // messages coming from "downstream" to all the Consumers
- // connected to it via its "read" <Task>. Normally, the messages
- // flow up the stream from <Supplier_Router>s. However, if
- // Consumers transmit data to the <Consumer_Router>, we dutifully
- // push it out to the Suppliers via the <Supplier_Router>.
- //
- // When used on the "reader" side of a Stream, the
- // <Consumer_Router> simply forwards all messages up the stream.
- // When used on the "writer" side, the <Consumer_Router> queues
- // up outgoing messages to suppliers and sends them down to the
- // <Supplier_Router> in a separate thread. The reason for this
- // is that it's really an "error" for a <Consumer_Router> to
- // send messages to Suppliers, so we don't expect this to happen
- // very much. When it does we use a separate thread to avoid
- // taxing the main thread, which processes "normal" messages.
- //
- // All of the methods in this class except the constructor are
- // called via base class pointers by the <ACE_Stream>.
- // Therefore, we can put them in the protected section.
-public:
- Consumer_Router (Peer_Router_Context *prc);
- // Initialization method.
-
-protected:
- // = ACE_Task hooks.
- virtual int open (void *a = 0);
- // Called by the Stream to initialize the router.
-
- virtual int close (u_long flags = 0);
- // Called by the Stream to shutdown the router.
-
- virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
- // Called by the <Peer_Handler> to pass a message to the
- // <Consumer_Router>. The <Consumer_Router> queues up this message,
- // which is then processed in the <svc> method in a separate thread.
-
- virtual int svc (void);
- // Runs in a separate thread to dequeue messages and pass them up
- // the stream.
-
- // = Dynamic linking hooks.
- virtual int info (ACE_TCHAR **info_string, size_t length) const;
- // Returns information about this service.
-};
-
-#endif /* _CONSUMER_ROUTER_H */
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc b/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc
deleted file mode 100644
index f99e912ce04..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc
+++ /dev/null
@@ -1,15 +0,0 @@
-// -*- MPC -*-
-// $Id$
-
-project(*Server) : aceexe {
- avoids += ace_for_tao
- exename = Event_Server
- Source_Files {
- Consumer_Router.cpp
- Event_Analyzer.cpp
- Options.cpp
- Peer_Router.cpp
- Supplier_Router.cpp
- event_server.cpp
- }
-}
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
deleted file mode 100644
index a064da6459a..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-// $Id$
-
-#include "ace/OS_NS_string.h"
-#include "Options.h"
-#include "Event_Analyzer.h"
-
-ACE_RCSID(Event_Server, Event_Analyzer, "$Id$")
-
-int
-Event_Analyzer::open (void *)
-{
- // No-op for now...
- return 0;
-}
-
-int
-Event_Analyzer::close (u_long)
-{
- // No-op for now...
- return 0;
-}
-
-int
-Event_Analyzer::control (ACE_Message_Block *mb)
-{
- ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
- ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd;
-
- switch (cmd = ioc->cmd ())
- {
- case ACE_IO_Cntl_Msg::SET_LWM:
- case ACE_IO_Cntl_Msg::SET_HWM:
- this->water_marks (cmd, *(size_t *) mb->cont ()->rd_ptr ());
- break;
- default:
- break;
- }
- return 0;
-}
-
-int
-Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- if (Options::instance ()->debug ())
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) passing through Event_Analyser::put() (%s)\n"),
- this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer")));
-
- if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
- this->control (mb);
-
- // Just pass the message along to the next Module in the stream...
- return this->put_next (mb);
-}
-
-int
-Event_Analyzer::init (int, ACE_TCHAR *[])
-{
- // No-op for now.
- return 0;
-}
-
-int
-Event_Analyzer::fini (void)
-{
- // No-op for now.
- return 0;
-}
-
-int
-Event_Analyzer::info (ACE_TCHAR **strp, size_t length) const
-{
- const ACE_TCHAR *mod_name = this->name ();
-
- if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
- return -1;
- else
- ACE_OS::strncpy (*strp, mod_name, length);
- return ACE_OS::strlen (mod_name);
-}
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
deleted file mode 100644
index d4f88c8b68d..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-#ifndef _EVENT_ANALYZER_H
-#define _EVENT_ANALYZER_H
-
-#include "ace/Stream.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-#include "ace/Module.h"
-#include "ace/Task.h"
-
-class Event_Analyzer : public ACE_Task<ACE_SYNCH>
-{
- // = TITLE
- // This class forwards all the <ACE_Message_Block>s it receives
- // onto its neighboring Module in the Stream.
- //
- // = DESCRIPTION
- // In a "real" event service, application-specific processing
- // would be done in the <put> (or <svc>) method in this class.
-public:
- // = Initialization hooks called by <ACE_Stream> (not used).
- virtual int open (void *a = 0);
- virtual int close (u_long flags = 0);
-
- virtual int put (ACE_Message_Block *msg,
- ACE_Time_Value * = 0);
- // Entry point into this task.
-
- // Dynamic linking hooks (not used).
- virtual int init (int argc, ACE_TCHAR *argv[]);
- virtual int fini (void);
- virtual int info (ACE_TCHAR **info_string,
- size_t length) const;
-private:
- virtual int control (ACE_Message_Block *);
- // Implements the watermark control processing.
-};
-
-#endif /* _EVENT_ANALYZER_H */
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am b/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am
deleted file mode 100644
index aefe1873d9d..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am
+++ /dev/null
@@ -1,50 +0,0 @@
-## Process this file with automake to create Makefile.in
-##
-## $Id$
-##
-## This file was generated by MPC. Any changes made directly to
-## this file will be lost the next time it is generated.
-##
-## MPC Command:
-## ./bin/mwc.pl -type automake -noreldefs ACE.mwc
-
-ACE_BUILDDIR = $(top_builddir)
-ACE_ROOT = $(top_srcdir)
-
-
-## Makefile.Event_Server.am
-
-if !BUILD_ACE_FOR_TAO
-
-noinst_PROGRAMS = Event_Server
-
-Event_Server_CPPFLAGS = \
- -I$(ACE_ROOT) \
- -I$(ACE_BUILDDIR)
-
-Event_Server_SOURCES = \
- Consumer_Router.cpp \
- Event_Analyzer.cpp \
- Options.cpp \
- Peer_Router.cpp \
- Supplier_Router.cpp \
- event_server.cpp \
- Consumer_Router.h \
- Event_Analyzer.h \
- Options.h \
- Options.inl \
- Peer_Router.h \
- Supplier_Router.h
-
-Event_Server_LDADD = \
- $(ACE_BUILDDIR)/ace/libACE.la
-
-endif !BUILD_ACE_FOR_TAO
-
-## Clean up template repositories, etc.
-clean-local:
- -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
- -rm -f gcctemp.c gcctemp so_locations *.ics
- -rm -rf cxx_repository ptrepository ti_files
- -rm -rf templateregistry ir.out
- -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp
deleted file mode 100644
index 8683f48d153..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-// $Id$
-
-#include "ace/Get_Opt.h"
-#include "ace/Thread.h"
-#include "ace/Log_Msg.h"
-#include "ace/OS_NS_stdio.h"
-#if defined (ACE_HAS_TRACE)
-# include "ace/OS_NS_strings.h"
-#endif /* ACE_HAS_TRACE */
-
-#include "Options.h"
-
-ACE_RCSID(Event_Server, Options, "$Id$")
-
-/* static */
-Options *Options::instance_ = 0;
-
-Options *
-Options::instance (void)
-{
- if (Options::instance_ == 0)
- Options::instance_ = new Options;
-
- return Options::instance_;
-}
-
-Options::Options (void)
- : thr_count_ (4),
- t_flags_ (0),
- high_water_mark_ (8 * 1024),
- low_water_mark_ (1024),
- message_size_ (128),
- initial_queue_length_ (0),
- iterations_ (100000),
- debugging_ (0),
- verbosity_ (0),
- consumer_port_ (ACE_DEFAULT_SERVER_PORT),
- supplier_port_ (ACE_DEFAULT_SERVER_PORT + 1)
-{
-}
-
-Options::~Options (void)
-{
-}
-
-void Options::print_results (void)
-{
-#if !defined (ACE_WIN32)
- ACE_Profile_Timer::ACE_Elapsed_Time et;
-
- this->itimer_.elapsed_time (et);
-
- if (this->verbose ())
- {
-#if defined (ACE_HAS_PRUSAGE_T)
- ACE_Profile_Timer::Rusage rusage;
- this->itimer_.get_rusage (rusage);
-
- ACE_OS::printf ("final concurrency hint = %d\n", ACE_Thread::getconcurrency ());
- ACE_OS::printf ("%8d = lwpid\n"
- "%8d = lwp count\n"
- "%8d = minor page faults\n"
- "%8d = major page faults\n"
- "%8d = input blocks\n"
- "%8d = output blocks\n"
- "%8d = messages sent\n"
- "%8d = messages received\n"
- "%8d = signals received\n"
- "%8ds, %dms = wait-cpu (latency) time\n"
- "%8ds, %dms = user lock wait sleep time\n"
- "%8ds, %dms = all other sleep time\n"
- "%8d = voluntary context switches\n"
- "%8d = involuntary context switches\n"
- "%8d = system calls\n"
- "%8d = chars read/written\n",
- (int) rusage.pr_lwpid,
- (int) rusage.pr_count,
- (int) rusage.pr_minf,
- (int) rusage.pr_majf,
- (int) rusage.pr_inblk,
- (int) rusage.pr_oublk,
- (int) rusage.pr_msnd,
- (int) rusage.pr_mrcv,
- (int) rusage.pr_sigs,
- (int) rusage.pr_wtime.tv_sec, (int) rusage.pr_wtime.tv_nsec / 1000000,
- (int) rusage.pr_ltime.tv_sec, (int) rusage.pr_ltime.tv_nsec / 1000000,
- (int) rusage.pr_slptime.tv_sec, (int) rusage.pr_slptime.tv_nsec / 1000000,
- (int) rusage.pr_vctx,
- (int) rusage.pr_ictx,
- (int) rusage.pr_sysc,
- (int) rusage.pr_ioch);
-#else
- /* Someone needs to write the corresponding dump for rusage... */
-#endif /* ACE_HAS_PRUSAGE_T */
- }
-
- ACE_OS::printf ("---------------------\n"
- "real time = %.3f\n"
- "user time = %.3f\n"
- "system time = %.3f\n"
- "---------------------\n",
- et.real_time, et.user_time, et.system_time);
-#endif /* ACE_WIN32 */
-}
-
-void
-Options::parse_args (int argc, ACE_TCHAR *argv[])
-{
- //FUZZ: disable check_for_lack_ACE_OS
- ACE_LOG_MSG->open (argv[0]);
-
- ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("c:bdH:i:L:l:M:ns:t:T:v"));
- int c;
-
- while ((c = get_opt ()) != EOF)
- //FUZZ: enable check_for_lack_ACE_OS
- switch (c)
- {
- case 'b':
- this->t_flags (THR_BOUND);
- break;
- case 'c':
- this->consumer_port (ACE_OS::atoi (get_opt.opt_arg ()));
- break;
- case 'd':
- this->debugging_ = 1;
- break;
- case 'H':
- this->high_water_mark (ACE_OS::atoi (get_opt.opt_arg ()));
- break;
- case 'i':
- this->iterations (ACE_OS::atoi (get_opt.opt_arg ()));
- break;
- case 'L':
- this->low_water_mark (ACE_OS::atoi (get_opt.opt_arg ()));
- break;
- case 'l':
- this->initial_queue_length (ACE_OS::atoi (get_opt.opt_arg ()));
- break;
- case 'M':
- this->message_size (ACE_OS::atoi (get_opt.opt_arg ()));
- break;
- case 'n':
- this->t_flags (THR_NEW_LWP);
- break;
- case 's':
- this->supplier_port (ACE_OS::atoi (get_opt.opt_arg ()));
- break;
- case 'T':
- #if defined (ACE_HAS_TRACE)
- if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT ("ON")) == 0)
- ACE_Trace::start_tracing ();
- else if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT ("OFF")) == 0)
- ACE_Trace::stop_tracing ();
- #endif /* ACE_HAS_TRACE */
- break;
- case 't':
- this->thr_count (ACE_OS::atoi (get_opt.opt_arg ()));
- break;
- case 'v':
- this->verbosity_ = 1;
- break;
- default:
- ACE_OS::fprintf (stderr, "%s\n"
- "\t[-b] (THR_BOUND)\n"
- "\t[-c consumer port]\n"
- "\t[-d] (enable debugging)\n"
- "\t[-H high water mark]\n"
- "\t[-i number of test iterations]\n"
- "\t[-L low water mark]\n"
- "\t[-M] message size \n"
- "\t[-n] (THR_NEW_LWP)\n"
- "\t[-q max queue size]\n"
- "\t[-s supplier port]\n"
- "\t[-t number of threads]\n"
- "\t[-v] (verbose) \n",
- ACE_TEXT_ALWAYS_CHAR (argv[0]));
- ACE_OS::exit (1);
- /* NOTREACHED */
- break;
- }
-
- // This is a major hack to get the size_t format spec to be a narrow
- // char, same as the other strings for printf() here. It only works
- // because this is the end of the source file. It makes the
- // ACE_SIZE_T_FORMAT_SPECIFIER not use ACE_TEXT, effectively.
-#undef ACE_TEXT
-#define ACE_TEXT(A) A
- if (this->verbose ())
- ACE_OS::printf ("%8d = initial concurrency hint\n"
- ACE_SIZE_T_FORMAT_SPECIFIER " = total iterations\n"
- ACE_SIZE_T_FORMAT_SPECIFIER " = thread count\n"
- ACE_SIZE_T_FORMAT_SPECIFIER " = low water mark\n"
- ACE_SIZE_T_FORMAT_SPECIFIER " = high water mark\n"
- ACE_SIZE_T_FORMAT_SPECIFIER " = message_size\n"
- ACE_SIZE_T_FORMAT_SPECIFIER " = initial queue length\n"
- "%8d = THR_BOUND\n"
- "%8d = THR_NEW_LWP\n",
- ACE_Thread::getconcurrency (),
- this->iterations (),
- this->thr_count (),
- this->low_water_mark (),
- this->high_water_mark (),
- this->message_size (),
- this->initial_queue_length (),
- (this->t_flags () & THR_BOUND) != 0,
- (this->t_flags () & THR_NEW_LWP) != 0);
-}
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.h b/ACE/examples/ASX/Event_Server/Event_Server/Options.h
deleted file mode 100644
index 4c935434d26..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Options.h
+++ /dev/null
@@ -1,122 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-#ifndef OPTIONS_H
-#define OPTIONS_H
-
-#include "ace/config-all.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-#include "ace/Profile_Timer.h"
-
-class Options
-{
- // = TITLE
- // Option Singleton for Event Server.
-public:
- static Options *instance (void);
- // Singleton access point.
-
- void parse_args (int argc, ACE_TCHAR *argv[]);
- // Parse the command-line arguments and set the options.
-
- // = Timer management.
- void stop_timer (void);
- void start_timer (void);
-
- // = Set/get the number of threads.
- void thr_count (size_t count);
- size_t thr_count (void);
-
- // = Set/get the size of the queue.
- void initial_queue_length (size_t length);
- size_t initial_queue_length (void);
-
- // = Set/get the high water mark.
- void high_water_mark (size_t size);
- size_t high_water_mark (void);
-
- // = Set/get the high water mark.
- void low_water_mark (size_t size);
- size_t low_water_mark (void);
-
- // = Set/get the size of a message.
- void message_size (size_t size);
- size_t message_size (void);
-
- // = Set/get the number of iterations.
- void iterations (size_t n);
- size_t iterations (void);
-
- // Set/get threading flags.
- void t_flags (long flag);
- long t_flags (void);
-
- // Set/get supplier port number.
- void supplier_port (u_short port);
- u_short supplier_port (void);
-
- // Set/get consumer port number.
- void consumer_port (u_short port);
- u_short consumer_port (void);
-
- // Enabled if we're in debugging mode.
- int debug (void);
-
- // Enabled if we're in verbose mode.
- int verbose (void);
-
- // Print the results to the STDERR.
- void print_results (void);
-
-private:
- // = Ensure we're a Singleton.
- Options (void);
- ~Options (void);
-
- ACE_Profile_Timer itimer_;
- // Time the process.
-
- size_t thr_count_;
- // Number of threads to spawn.
-
- long t_flags_;
- // Flags to <thr_create>.
-
- size_t high_water_mark_;
- // ACE_Task high water mark.
-
- size_t low_water_mark_;
- // ACE_Task low water mark.
-
- size_t message_size_;
- // Size of a message.
-
- size_t initial_queue_length_;
- // Initial number of items in the queue.
-
- size_t iterations_;
- // Number of iterations to run the test program.
-
- int debugging_;
- // Extra debugging info.
-
- int verbosity_;
- // Extra verbose messages.
-
- u_short consumer_port_;
- // Port that the Consumer_Router is using.
-
- u_short supplier_port_;
- // Port that the Supplier_Router is using.
-
- static Options *instance_;
- // Static Singleton.
-
-};
-
-#include "Options.inl"
-#endif /* OPTIONS_H */
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.inl b/ACE/examples/ASX/Event_Server/Event_Server/Options.inl
deleted file mode 100644
index 87ff395c503..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Options.inl
+++ /dev/null
@@ -1,141 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-/* Option manager for ustreams */
-
-// Since this is only included in Options.h these should stay
-// inline, not ACE_INLINE.
-// FUZZ: disable check_for_inline
-
-inline void
-Options::supplier_port (u_short port)
-{
- this->supplier_port_ = port;
-}
-
-inline u_short
-Options::supplier_port (void)
-{
- return this->supplier_port_;
-}
-
-inline void
-Options::consumer_port (u_short port)
-{
- this->consumer_port_ = port;
-}
-
-inline u_short
-Options::consumer_port (void)
-{
- return this->consumer_port_;
-}
-
-inline void
-Options::start_timer (void)
-{
- this->itimer_.start ();
-}
-
-inline void
-Options::stop_timer (void)
-{
- this->itimer_.stop ();
-}
-
-inline void
-Options::thr_count (size_t count)
-{
- this->thr_count_ = count;
-}
-
-inline size_t
-Options::thr_count (void)
-{
- return this->thr_count_;
-}
-
-inline void
-Options::initial_queue_length (size_t length)
-{
- this->initial_queue_length_ = length;
-}
-
-inline size_t
-Options::initial_queue_length (void)
-{
- return this->initial_queue_length_;
-}
-
-inline void
-Options::high_water_mark (size_t size)
-{
- this->high_water_mark_ = size;
-}
-
-inline size_t
-Options::high_water_mark (void)
-{
- return this->high_water_mark_;
-}
-
-inline void
-Options::low_water_mark (size_t size)
-{
- this->low_water_mark_ = size;
-}
-
-inline size_t
-Options::low_water_mark (void)
-{
- return this->low_water_mark_;
-}
-
-inline void
-Options::message_size (size_t size)
-{
- this->message_size_ = size;
-}
-
-inline size_t
-Options::message_size (void)
-{
- return this->message_size_;
-}
-
-inline void
-Options::iterations (size_t n)
-{
- this->iterations_ = n;
-}
-
-inline size_t
-Options::iterations (void)
-{
- return this->iterations_;
-}
-
-inline void
-Options::t_flags (long flag)
-{
- this->t_flags_ |= flag;
-}
-
-inline long
-Options::t_flags (void)
-{
- return this->t_flags_;
-}
-
-inline int
-Options::debug (void)
-{
- return this->debugging_;
-}
-
-inline int
-Options::verbose (void)
-{
- return this->verbosity_;
-}
-
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
deleted file mode 100644
index cb82eec16df..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
+++ /dev/null
@@ -1,435 +0,0 @@
-// $Id$
-
-#if !defined (_PEER_ROUTER_C)
-#define _PEER_ROUTER_C
-
-#include "ace/Service_Config.h"
-#include "ace/Get_Opt.h"
-#include "Options.h"
-#include "Peer_Router.h"
-
-ACE_RCSID(Event_Server, Peer_Router, "$Id$")
-
-// Send the <ACE_Message_Block> to all the peers. Note that in a
-// "real" application this logic would most likely be more selective,
-// i.e., it would actually do "routing" based on addressing
-// information passed in the <ACE_Message_Block>.
-
-int
-Peer_Router_Context::send_peers (ACE_Message_Block *mb)
-{
- PEER_ITERATOR map_iter = this->peer_map_;
- int bytes = 0;
- int iterations = 0;
-
- // Skip past the header and get the message to send.
- ACE_Message_Block *data_block = mb->cont ();
-
- // Use an iterator to "multicast" the data to *all* the registered
- // peers. Note that this doesn't really multicast, it just makes a
- // "logical" copy of the <ACE_Message_Block> and enqueues it in the
- // appropriate <Peer_Handler> corresponding to each peer. Note that
- // a "real" application would probably "route" the data to a subset
- // of connected peers here, rather than send it to all the peers.
-
- for (PEER_ENTRY *ss = 0;
- map_iter.next (ss) != 0;
- map_iter.advance ())
- {
- if (Options::instance ()->debug ())
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) sending to peer via handle %d\n"),
- ss->ext_id_));
-
- iterations++;
-
- // Increment reference count before sending since the
- // <Peer_Handler> might be running in its own thread of control.
- bytes += ss->int_id_->put (data_block->duplicate ());
- }
-
- mb->release ();
- return bytes == 0 ? 0 : bytes / iterations;
-}
-
-// Remove the <Peer_Handler> from the peer connection map.
-
-int
-Peer_Router_Context::unbind_peer (ROUTING_KEY key)
-{
- return this->peer_map_.unbind (key);
-}
-
-// Add the <Peer_Handler> to the peer connection map.
-
-int
-Peer_Router_Context::bind_peer (ROUTING_KEY key,
- Peer_Handler *peer_handler)
-{
- return this->peer_map_.bind (key, peer_handler);
-}
-
-void
-Peer_Router_Context::duplicate (void)
-{
- this->reference_count_++;
-}
-
-void
-Peer_Router_Context::release (void)
-{
- ACE_ASSERT (this->reference_count_ > 0);
- this->reference_count_--;
-
- if (this->reference_count_ == 0)
- delete this;
-}
-
-Peer_Router_Context::Peer_Router_Context (u_short port)
- : reference_count_ (0)
-{
- // Initialize the Acceptor's "listen-mode" socket.
- ACE_INET_Addr endpoint (port);
- if (this->open (endpoint) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("Acceptor::open")));
-
- // Initialize the connection map.
- else if (this->peer_map_.open () == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("Map_Manager::open")));
- else
- {
- ACE_INET_Addr addr;
-
- if (this->acceptor ().get_local_addr (addr) != -1)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) initializing %C on port = %d, handle = %d, this = %u\n"),
- addr.get_port_number () == Options::instance ()->supplier_port ()
- ? "Supplier_Handler" : "Consumer_Handler",
- addr.get_port_number (),
- this->acceptor().get_handle (),
- this));
- else
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("get_local_addr")));
- }
-}
-
-Peer_Router_Context::~Peer_Router_Context (void)
-{
- // Free up the handle and close down the listening socket.
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) closing down Peer_Router_Context\n")));
-
- // Close down the Acceptor and take ourselves out of the Reactor.
- this->handle_close ();
-
- PEER_ITERATOR map_iter = this->peer_map_;
-
- // Make sure to take all the handles out of the map to avoid
- // "resource leaks."
-
- for (PEER_ENTRY *ss = 0;
- map_iter.next (ss) != 0;
- map_iter.advance ())
- {
- if (Options::instance ()->debug ())
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) closing down peer on handle %d\n"),
- ss->ext_id_));
-
- if (ACE_Reactor::instance ()->remove_handler
- (ss->ext_id_,
- ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%t) p\n"),
- ACE_TEXT ("remove_handle")));
- }
-
- // Close down the map.
- this->peer_map_.close ();
-}
-
-Peer_Router *
-Peer_Router_Context::peer_router (void)
-{
- return this->peer_router_;
-}
-
-void
-Peer_Router_Context::peer_router (Peer_Router *pr)
-{
- this->peer_router_ = pr;
-}
-
-// Factory Method that creates a new <Peer_Handler> for each
-// connection.
-
-int
-Peer_Router_Context::make_svc_handler (Peer_Handler *&sh)
-{
- ACE_NEW_RETURN (sh,
- Peer_Handler (this),
- -1);
- return 0;
-}
-
-Peer_Handler::Peer_Handler (Peer_Router_Context *prc)
- : peer_router_context_ (prc)
-{
-}
-
-// Send output to a peer. Note that this implementation "blocks" if
-// flow control occurs. This is undesirable for "real" applications.
-// The best way around this is to make the <Peer_Handler> an Active
-// Object, e.g., as done in the $ACE_ROOT/apps/Gateway/Gateway
-// application.
-
-int
-Peer_Handler::put (ACE_Message_Block *mb,
- ACE_Time_Value *tv)
-{
-#if 0
- // If we're running as Active Objects just enqueue the message here.
- return this->putq (mb, tv);
-#else
- ACE_UNUSED_ARG (tv);
-
- int result = this->peer ().send_n (mb->rd_ptr (),
- mb->length ());
- // Release the memory.
- mb->release ();
-
- return result;
-#endif /* 0 */
-}
-
-// Initialize a newly connected handler.
-
-int
-Peer_Handler::open (void *)
-{
- ACE_TCHAR buf[BUFSIZ], *p = buf;
-
- if (this->peer_router_context_->peer_router ()->info (&p,
- sizeof buf) != -1)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) creating handler for %s, handle = %d\n"),
- buf,
- this->get_handle ()));
- else
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("info")),
- -1);
-#if 0
- // If we're running as an Active Object activate the Peer_Handler
- // here.
- if (this->activate (Options::instance ()->t_flags ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("activation of thread failed")),
- -1);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Peer_Handler::open registering with Reactor for handle_input\n")));
-#else
-
- // Register with the Reactor to receive messages from our Peer.
- if (ACE_Reactor::instance ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("register_handler")),
- -1);
-#endif /* 0 */
-
- // Insert outselves into the routing map.
- else if (this->peer_router_context_->bind_peer (this->get_handle (),
- this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("bind_peer")),
- -1);
- else
- return 0;
-}
-
-// Receive a message from a Peer.
-
-int
-Peer_Handler::handle_input (ACE_HANDLE h)
-{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) input arrived on handle %d\n"),
- h));
-
- ACE_Message_Block *db;
-
- ACE_NEW_RETURN (db, ACE_Message_Block (BUFSIZ), -1);
-
- ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY),
- ACE_Message_Block::MB_PROTO, db);
- // Check for memory failures.
- if (hb == 0)
- {
- db->release ();
- errno = ENOMEM;
- return -1;
- }
-
- ssize_t n = this->peer ().recv (db->rd_ptr (),
- db->size ());
-
- if (n == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p"),
- ACE_TEXT ("recv failed")),
- -1);
- else if (n == 0) // Client has closed down the connection.
- {
- if (this->peer_router_context_->unbind_peer (this->get_handle ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p"),
- ACE_TEXT ("unbind failed")),
- -1);
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) shutting down handle %d\n"), h));
- // Instruct the <ACE_Reactor> to deregister us by returning -1.
- return -1;
- }
- else
- {
- // Transform incoming buffer into an <ACE_Message_Block>.
-
- // First, increment the write pointer to the end of the newly
- // read data block.
- db->wr_ptr (n);
-
- // Second, copy the "address" into the header block. Note that
- // for this implementation the HANDLE we receive the message on
- // is considered the "address." A "real" application would want
- // to do something more sophisticated.
- *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle ();
-
- // Third, update the write pointer in the header block.
- hb->wr_ptr (sizeof (ACE_HANDLE));
-
- // Finally, pass the message through the stream. Note that we
- // use <Task::put> here because this gives the method at *our*
- // level in the stream a chance to do something with the message
- // before it is sent up the other side. For instance, if we
- // receive messages in the <Supplier_Router>, it will just call
- // <put_next> and send them up the stream to the
- // <Consumer_Router> (which broadcasts them to consumers).
- // However, if we receive messages in the <Consumer_Router>, it
- // could reply to the Consumer with an error since it's not
- // correct for Consumers to send messages (we don't do this in
- // the current implementation, but it could be done in a "real"
- // application).
-
- if (this->peer_router_context_->peer_router ()->put (hb) == -1)
- return -1;
- else
- return 0;
- }
-}
-
-Peer_Router::Peer_Router (Peer_Router_Context *prc)
- : peer_router_context_ (prc)
-{
-}
-
-Peer_Router_Context *
-Peer_Router::context (void) const
-{
- return this->peer_router_context_;
-}
-
-int
-Peer_Router::control (ACE_Message_Block *mb)
-{
- ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
- ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command;
-
- switch (command = ioc->cmd ())
- {
- case ACE_IO_Cntl_Msg::SET_LWM:
- case ACE_IO_Cntl_Msg::SET_HWM:
- this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ());
- break;
- default:
- return -1;
- }
- return 0;
-}
-
-#if 0
-
-// Right now, Peer_Handlers are purely Reactive, i.e., they all run in
-// a single thread of control. It would be easy to make them Active
-// Objects by calling activate() in Peer_Handler::open(), making
-// Peer_Handler::put() enqueue each message on the message queue, and
-// (3) then running the following svc() routine to route each message
-// to its final destination within a separate thread. Note that we'd
-// want to move the svc() call up to the Consumer_Router and
-// Supplier_Router level in order to get the right level of control
-// for input and output.
-
-Peer_Handler::svc (void)
-{
- ACE_Message_Block *db, *hb;
-
- // Do an endless loop
- for (;;)
- {
- db = new Message_Block (BUFSIZ);
- hb = new Message_Block (sizeof (ROUTING_KEY),
- Message_Block::MB_PROTO,
- db);
-
- ssize_t n = this->peer_.recv (db->rd_ptr (), db->size ());
-
- if (n == -1)
- LM_ERROR_RETURN ((LOG_ERROR,
- ACE_TEXT ("%p"),
- ACE_TEXT ("recv failed")),
- -1);
- else if (n == 0) // Client has closed down the connection.
- {
- if (this->peer_router_context_->peer_router ()->unbind_peer (this->get_handle ()) == -1)
- LM_ERROR_RETURN ((LOG_ERROR,
- ACE_TEXT ("%p"),
- ACE_TEXT ("unbind failed")),
- -1);
- LM_DEBUG ((LOG_DEBUG,
- ACE_TEXT ("(%t) shutting down \n")));
-
- // We do not need to be deregistered by reactor
- // as we were not registered at all.
- return -1;
- }
- else
- {
- // Transform incoming buffer into a Message.
- db->wr_ptr (n);
- *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment.
- hb->wr_ptr (sizeof (long));
-
- // Pass the message to the stream.
- if (this->peer_router_context_->peer_router ()->reply (hb) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("Peer_Handler.svc : peer_router->reply failed")),
- -1);
- }
- }
- return 0;
-}
-#endif /* 0 */
-#endif /* _PEER_ROUTER_C */
-
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h
deleted file mode 100644
index 044ef07ea07..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h
+++ /dev/null
@@ -1,158 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-#ifndef _PEER_ROUTER_H
-#define _PEER_ROUTER_H
-
-#include "ace/Acceptor.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-#include "ace/SOCK_Acceptor.h"
-#include "ace/Svc_Handler.h"
-#include "ace/Map_Manager.h"
-#include "ace/RW_Thread_Mutex.h"
-
-// Type of search key for CONSUMER_MAP
-typedef ACE_HANDLE ROUTING_KEY;
-
-// Forward declarations.
-class Peer_Router;
-class Peer_Router_Context;
-
-class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>
-{
- // = TITLE
- // Receive input from a Peer and forward to the appropriate
- // <Peer_Router> (i.e., <Consumer_Router> or <Supplier_Router>).
-public:
- Peer_Handler (Peer_Router_Context * = 0);
- // Initialization method.
-
- virtual int open (void * = 0);
- // Called by the ACE_Acceptor::handle_input() to activate this
- // object.
-
- virtual int handle_input (ACE_HANDLE);
- // Receive input from a peer.
-
- virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
- // Send output to a peer. Note that this implementation "blocks" if
- // flow control occurs. This is undesirable for "real"
- // applications. The best way around this is to make the
- // <Peer_Handler> an Active Object, e.g., as done in the
- // $ACE_ROOT/apps/Gateway/Gateway application.
-
-protected:
- Peer_Router_Context *peer_router_context_;
- // Pointer to router context. This maintains the state that is
- // shared by both Tasks in a <Peer_Router> Module.
-};
-
-class Peer_Router_Context : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
-{
- // = TITLE
- // Defines state and behavior shared between both Tasks in a
- // <Peer_Router> Module.
- //
- // = DESCRIPTION
- // This class also serves as an <ACE_Acceptor>, which creates
- // <Peer_Handlers> when Peers connect.
-public:
- // = Initialization and termination methods.
- Peer_Router_Context (u_short port);
- // Constructor.
-
- virtual int unbind_peer (ROUTING_KEY);
- // Remove the <Peer_Handler *> from the <PEER_MAP> that corresponds
- // to the <ROUTING_KEY>.
-
- virtual int bind_peer (ROUTING_KEY, Peer_Handler *);
- // Add a <Peer_Handler> to the <PEER_MAP> that's associated with the
- // <ROUTING_KEY>.
-
- int send_peers (ACE_Message_Block *mb);
- // Send the <ACE_Message_Block> to all the peers. Note that in a
- // "real" application this logic would most likely be more
- // selective, i.e., it would actually do "routing" based on
- // addressing information passed in the <ACE_Message_Block>.
-
- int make_svc_handler (Peer_Handler *&sh);
- // Factory Method that creates a new <Peer_Handler> for each
- // connection. This method overrides the default behavior in
- // <ACE_Acceptor>.
-
- // = Set/Get Router Task.
- Peer_Router *peer_router (void);
- void peer_router (Peer_Router *);
-
- void release (void);
- // Decrement the reference count and delete <this> when count == 0;
-
- void duplicate (void);
- // Increment the reference count.
-
-private:
- Peer_Router *peer_router_;
- // Pointer to the <Peer_Router> that we are accepting for.
-
- // = Useful typedefs.
- typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
- PEER_MAP;
- typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
- PEER_ITERATOR;
- typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *>
- PEER_ENTRY;
-
- PEER_MAP peer_map_;
- // Map used to keep track of active peers.
-
- int reference_count_;
- // Keep track of when we can delete ourselves.
-
- ~Peer_Router_Context (void);
- // Private to ensure dynamic allocation.
-
- friend class Friend_Of_Peer_Router_Context;
- // Declare a friend class to avoid compiler warnings because the
- // destructor is private.
-};
-
-class Peer_Router : public ACE_Task<ACE_SYNCH>
-{
- // = TITLE
- // This abstract base class provides mechanisms for routing
- // messages to/from a <ACE_Stream> from/to one or more peers (which
- // are typically running on remote hosts).
- //
- // = DESCRIPTION
- // Subclasses of <Peer_Router> (such as <Consumer_Router> or
- // <Supplier_Router>) override the <open>, <close>, and
- // <put> methods to specialize the behavior of the router to
- // meet application-specific requirements.
-protected:
- Peer_Router (Peer_Router_Context *prc);
- // Initialization method.
-
- virtual int control (ACE_Message_Block *);
- // Handle control messages arriving from adjacent Modules.
-
- Peer_Router_Context *context (void) const;
- // Returns the routing context.
-
- typedef ACE_Task<ACE_SYNCH> inherited;
- // Helpful typedef.
-
-private:
- Peer_Router_Context *peer_router_context_;
- // Reference to the context shared by the writer and reader Tasks,
- // e.g., in the <Consumer_Router> and <Supplier_Router> Modules.
-
- // = Prevent copies and pass-by-value.
- Peer_Router (const Peer_Router &);
- void operator= (const Peer_Router &);
-};
-
-#endif /* _PEER_ROUTER_H */
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
deleted file mode 100644
index 72c43ee6312..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
+++ /dev/null
@@ -1,165 +0,0 @@
-// $Id$
-
-#include "ace/os_include/os_assert.h"
-#include "ace/OS_NS_stdio.h"
-#include "ace/OS_NS_string.h"
-#include "Supplier_Router.h"
-#include "Options.h"
-
-ACE_RCSID(Event_Server, Supplier_Router, "$Id$")
-
-// Handle outgoing messages in a separate thread.
-
-int
-Supplier_Router::svc (void)
-{
- assert (this->is_writer ());
-
- ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n"));
-
- for (ACE_Message_Block *mb = 0;
- this->getq (mb) >= 0;
- )
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) warning: Supplier_Router is "
- "forwarding a message via send_peers\n"));
-
- // Broadcast the message to the Suppliers, even though this is
- // "incorrect" (assuming a oneway flow of events from Suppliers
- // to Consumers)!
-
- if (this->context ()->send_peers (mb) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) send_peers failed in Supplier_Router\n"),
- -1);
- }
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) stopping svc in Supplier_Router\n"));
- return 0;
-}
-
-Supplier_Router::Supplier_Router (Peer_Router_Context *prc)
- : Peer_Router (prc)
-{
- // Increment the reference count.
- this->context ()->duplicate ();
-}
-
-// Initialize the Supplier Router.
-
-int
-Supplier_Router::open (void *)
-{
- if (this->is_reader ())
- {
- // Set the <Peer_Router_Context> to point back to us so that all
- // the Peer_Handler's <put> their incoming <Message_Blocks> to
- // our reader Task.
- this->context ()->peer_router (this);
- return 0;
- }
-
- else // if (this->is_writer ()
- {
- // Increment the reference count.
- this->context ()->duplicate ();
-
- // Make this an active object to handle the error cases in a
- // separate thread.
- return this->activate (Options::instance ()->t_flags ());
- }
-}
-
-// Close down the router.
-
-int
-Supplier_Router::close (u_long)
-{
- ACE_DEBUG ((LM_DEBUG,
- "(%t) closing Supplier_Router %s\n",
- this->is_reader () ? "reader" : "writer"));
-
- if (this->is_writer ())
- // Inform the thread to shut down.
- this->msg_queue ()->deactivate ();
-
- // Both writer and reader call release(), so the context knows when
- // to clean itself up.
- this->context ()->release ();
- return 0;
-}
-
-// Send an <ACE_Message_Block> to the supplier(s).
-
-int
-Supplier_Router::put (ACE_Message_Block *mb,
- ACE_Time_Value *)
-{
- // Perform the necessary control operations before passing
- // the message up the stream.
-
- if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
- {
- this->control (mb);
- return this->put_next (mb);
- }
-
- // If we're the reader then we are responsible for pass messages up
- // to the next Module's reader Task. Note that in a "real"
- // application this is likely where we'd take a look a the actual
- // information that was in the message, e.g., in order to figure out
- // what operation it was and what it's "parameters" where, etc.
- else if (this->is_reader ())
- return this->put_next (mb);
-
- else // if (this->is_writer ())
- {
- // Someone is trying to write to the Supplier. In this
- // implementation this is considered an "error." However, we'll
- // just go ahead and forward the message to the Supplier (who
- // hopefully is prepared to receive it).
- ACE_DEBUG ((LM_WARNING,
- "(%t) warning: sending to a Supplier\n"));
-
- // Queue up the message to processed by <Supplier_Router::svc>.
- // Since we don't expect to be getting many of these messages,
- // we queue them up and run them in a separate thread to avoid
- // taxing the main thread.
- return this->putq (mb);
- }
-}
-
-// Return information about the <Supplier_Router>.
-#if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR)
-# define FMTSTR ACE_TEXT ("%s\t %d/%s %s (%s)\n")
-#else
-# define FMTSTR ACE_TEXT ("%ls\t %d/%ls %ls (%ls)\n")
-#endif /* ACE_WIN32 || !ACE_USES_WCHAR */
-
-int
-Supplier_Router::info (ACE_TCHAR **strp, size_t length) const
-{
- ACE_TCHAR buf[BUFSIZ];
- ACE_INET_Addr addr;
- const ACE_TCHAR *mod_name = this->name ();
-
- if (this->context ()->acceptor ().get_local_addr (addr) == -1)
- return -1;
-
- ACE_OS::sprintf (buf,
- FMTSTR,
- mod_name,
- addr.get_port_number (),
- ACE_TEXT ("tcp"),
- ACE_TEXT ("# supplier router"),
- this->is_reader () ?
- ACE_TEXT ("reader") : ACE_TEXT ("writer"));
- if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
- return -1;
- else
- ACE_OS::strncpy (*strp, mod_name, length);
-
- return ACE_OS::strlen (mod_name);
-}
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
deleted file mode 100644
index 8a42943c147..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-#ifndef _SUPPLIER_ROUTER_H
-#define _SUPPLIER_ROUTER_H
-
-#include "ace/INET_Addr.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-#include "ace/SOCK_Acceptor.h"
-#include "ace/Map_Manager.h"
-#include "ace/Svc_Handler.h"
-#include "Peer_Router.h"
-
-class Supplier_Router : public Peer_Router
-{
- // = TITLE
- // Provides the interface between one or more Suppliers and the
- // Event Server ACE_Stream.
- //
- // = DESCRIPTION
- // This class normally sits on "bottom" of the Stream and sends
- // all messages coming from Suppliers via its "write" <Task>
- // "upstream" to all the Consumers connected to the
- // <Consumer_Router>. Normally, the messages flow up the
- // stream to <Consumer_Router>s. However, if Consumers
- // transmit data to the <Consumer_Router>, we dutifully push it
- // out to the Suppliers via the <Supplier_Router>.
- //
- // When used on the "reader" side of a Stream, the
- // <Supplier_Router> simply forwards all messages up the stream.
- // When used on the "writer" side, the <Supplier_Router> queues
- // up outgoing messages to suppliers and sends them in a
- // separate thread. The reason for this is that it's really an
- // "error" for a <Supplier_Router> to send messages to
- // Suppliers, so we don't expect this to happen very much. When
- // it does we use a separate thread to avoid taxing the main
- // thread, which processes "normal" messages.
- //
- // All of these methods are called via base class pointers by
- // the <ACE_Stream> apparatus. Therefore, we can put them in
- // the protected section.
-public:
- Supplier_Router (Peer_Router_Context *prc);
- // Initialization method.
-
-protected:
- // = ACE_Task hooks.
-
- virtual int open (void *a = 0);
- // Called by the Stream to initialize the router.
-
- virtual int close (u_long flags = 0);
- // Called by the Stream to shutdown the router.
-
- virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
- // Called by the <SUPPLIER_HANDLER> to pass a message to the Router.
- // The Router queues up this message, which is then processed in the
- // <svc> method in a separate thread.
-
- virtual int svc (void);
- // Runs in a separate thread to dequeue messages and pass them up
- // the stream.
-
- virtual int info (ACE_TCHAR **info_string, size_t length) const;
- // Dynamic linking hook.
-};
-
-#endif /* _SUPPLIER_ROUTER_H */
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp b/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp
deleted file mode 100644
index 3858bad1fc2..00000000000
--- a/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp
+++ /dev/null
@@ -1,258 +0,0 @@
-// $Id$
-
-// Main driver program for the event server example.
-
-#include "ace/OS_main.h"
-#include "ace/Service_Config.h"
-#include "ace/OS_NS_unistd.h"
-#include "Options.h"
-#include "Consumer_Router.h"
-#include "Event_Analyzer.h"
-#include "Supplier_Router.h"
-#include "ace/Sig_Adapter.h"
-#include "ace/Stream.h"
-
-ACE_RCSID (Event_Server,
- event_server,
- "$Id$")
-
-// Typedef these components to handle multi-threading correctly.
-typedef ACE_Stream<ACE_SYNCH> MT_Stream;
-typedef ACE_Module<ACE_SYNCH> MT_Module;
-
-
-class Event_Server : public ACE_Sig_Adapter
-{
- // = TITLE
- // Run the logic for the <Event_Server>.
-
- //
- // = DESCRIPTION
- // In addition to packaging the <Event_Server> components, this
- // class also handles SIGINT and terminate the entire
- // application process. There are several ways to terminate
- // this application process:
- //
- // 1. Send a SIGINT signal (e.g., via ^C)
- // 2. Type any character on the STDIN.
- //
- // Note that by inheriting from the <ACE_Sig_Adapter> we can
- // shutdown the <ACE_Reactor> cleanly when a SIGINT is
- // generated.
-public:
- Event_Server (void);
- // Constructor.
-
- int svc (void);
- // Run the event-loop for the event server.
-
-private:
- virtual int handle_input (ACE_HANDLE handle);
- // Hook method called back when a user types something into the
- // STDIN in order to shut down the program.
-
- int configure_stream (void);
- // Setup the plumbing in the stream.
-
- int set_watermarks (void);
- // Set the high and low queue watermarks.
-
- int run_event_loop (void);
- // Run the event-loop for the <Event_Server>.
-
- MT_Stream event_server_;
- // The <ACE_Stream> that contains the <Event_Server> application
- // <Modules>.
-};
-
-Event_Server::Event_Server (void)
- : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop))
- // Shutdown the <ACE_Reactor>'s event loop when a SIGINT is
- // received.
-{
- // Register to trap STDIN from the user.
- if (ACE_Event_Handler::register_stdin_handler (this,
- ACE_Reactor::instance (),
- ACE_Thread_Manager::instance ()) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("register_stdin_handler")));
- // Register to trap the SIGINT signal.
- else if (ACE_Reactor::instance ()->register_handler
- (SIGINT, this) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("register_handler")));
-}
-
-int
-Event_Server::handle_input (ACE_HANDLE)
-{
- // This code here will make sure we actually wait for the user to
- // type something. On platforms like Win32, <handle_input> is called
- // prematurely (even when there is no data).
- char temp_buffer [BUFSIZ];
-
- ssize_t n = ACE_OS::read (ACE_STDIN,
- temp_buffer,
- sizeof (temp_buffer));
- // This ought to be > 0, otherwise something very strange has
- // happened!!
- ACE_ASSERT (n > 0);
- ACE_UNUSED_ARG (n); // To avoid compile warning with ACE_NDEBUG.
-
- Options::instance ()->stop_timer ();
-
- ACE_DEBUG ((LM_INFO,
- ACE_TEXT ("(%t) closing down the test\n")));
- Options::instance ()->print_results ();
-
- ACE_Reactor::instance ()->end_reactor_event_loop ();
- return -1;
-}
-
-int
-Event_Server::configure_stream (void)
-{
- Peer_Router_Context *src;
- // Create the <Supplier_Router>'s routing context. This contains a
- // context shared by both the write-side and read-side of the
- // <Supplier_Router> Module.
- ACE_NEW_RETURN (src,
- Peer_Router_Context (Options::instance ()->supplier_port ()),
- -1);
-
- MT_Module *srm = 0;
- // Create the <Supplier_Router> module.
- ACE_NEW_RETURN (srm,
- MT_Module
- (ACE_TEXT ("Supplier_Router"),
- new Supplier_Router (src),
- new Supplier_Router (src)),
- -1);
-
- MT_Module *eam = 0;
- // Create the <Event_Analyzer> module.
- ACE_NEW_RETURN (eam,
- MT_Module
- (ACE_TEXT ("Event_Analyzer"),
- new Event_Analyzer,
- new Event_Analyzer),
- -1);
-
- Peer_Router_Context *crc;
- // Create the <Consumer_Router>'s routing context. This contains a
- // context shared by both the write-side and read-side of the
- // <Consumer_Router> Module.
- ACE_NEW_RETURN (crc,
- Peer_Router_Context (Options::instance ()->consumer_port ()),
- -1);
-
- MT_Module *crm = 0;
- // Create the <Consumer_Router> module.
- ACE_NEW_RETURN (crm,
- MT_Module
- (ACE_TEXT ("Consumer_Router"),
- new Consumer_Router (crc),
- new Consumer_Router (crc)),
- -1);
-
- // Push the Modules onto the event_server stream.
-
- if (this->event_server_.push (srm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("push (Supplier_Router)")),
- -1);
- else if (this->event_server_.push (eam) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("push (Event_Analyzer)")),
- -1);
- else if (this->event_server_.push (crm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("push (Consumer_Router)")),
- -1);
- return 0;
-}
-
-int
-Event_Server::set_watermarks (void)
-{
- // Set the high and low water marks appropriately. The water marks
- // control how much data can be buffered before the queues are
- // considered "full."
- size_t wm = Options::instance ()->low_water_mark ();
-
- if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_LWM,
- &wm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
- ACE_TEXT ("push (setting low watermark)")),
- -1);
-
- wm = Options::instance ()->high_water_mark ();
- if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_HWM,
- &wm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
- ACE_TEXT ("push (setting high watermark)")),
- -1);
- return 0;
-}
-
-int
-Event_Server::run_event_loop (void)
-{
- // Begin the timer.
- Options::instance ()->start_timer ();
-
- // Perform the main event loop waiting for the user to type ^C or to
- // enter a line on the ACE_STDIN.
-
- ACE_Reactor::instance ()->run_reactor_event_loop ();
-
- // Close down the stream and call the <close> hooks on all the
- // <ACE_Task>s in the various Modules in the Stream.
- this->event_server_.close ();
-
- // Wait for the threads in the <Consumer_Router> and
- // <Supplier_Router> to exit.
- return ACE_Thread_Manager::instance ()->wait ();
-}
-
-int
-Event_Server::svc (void)
-{
- if (this->configure_stream () == -1)
- return -1;
- else if (this->set_watermarks () == -1)
- return -1;
- else if (this->run_event_loop () == -1)
- return -1;
- else
- return 0;
-}
-
-int
-ACE_TMAIN (int argc, ACE_TCHAR *argv[])
-{
-#if defined (ACE_HAS_THREADS)
- Options::instance ()->parse_args (argc, argv);
-
- // Initialize the <Event_Server>.
- Event_Server event_server;
-
- // Run the event server's event-loop.
- int result = event_server.svc ();
-
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("exiting main\n")));
-
- return result;
-#else
- ACE_UNUSED_ARG (argc);
- ACE_UNUSED_ARG (argv);
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("threads not supported on this platform\n")),
- 1);
-#endif /* ACE_HAS_THREADS */
-}