summaryrefslogtreecommitdiff
path: root/ACE/examples/ASX
diff options
context:
space:
mode:
authorWilliam R. Otte <wotte@dre.vanderbilt.edu>2008-03-04 14:51:23 +0000
committerWilliam R. Otte <wotte@dre.vanderbilt.edu>2008-03-04 14:51:23 +0000
commit99aa8c60282c7b8072eb35eb9ac815702f5bf586 (patch)
treebda96bf8c3a4c2875a083d7b16720533c8ffeaf4 /ACE/examples/ASX
parentc4078c377d74290ebe4e66da0b4975da91732376 (diff)
downloadATCD-99aa8c60282c7b8072eb35eb9ac815702f5bf586.tar.gz
undoing accidental deletion
Diffstat (limited to 'ACE/examples/ASX')
-rw-r--r--ACE/examples/ASX/CCM_App/ASX_CCM_App.mpc26
-rw-r--r--ACE/examples/ASX/CCM_App/CCM_App.cpp122
-rw-r--r--ACE/examples/ASX/CCM_App/Makefile.am59
-rw-r--r--ACE/examples/ASX/CCM_App/SC_Client.cpp13
-rw-r--r--ACE/examples/ASX/CCM_App/SC_Server.cpp86
-rw-r--r--ACE/examples/ASX/CCM_App/svc.conf21
-rw-r--r--ACE/examples/ASX/CCM_App/svc.conf.xml33
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp159
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h71
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Event.mpc15
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp80
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h44
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Makefile.am50
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Options.cpp208
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Options.h122
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Options.inl141
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp435
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h158
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp165
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h72
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp258
-rw-r--r--ACE/examples/ASX/Event_Server/Makefile.am14
-rw-r--r--ACE/examples/ASX/Event_Server/README79
-rw-r--r--ACE/examples/ASX/Event_Server/Transceiver/Makefile.am35
-rw-r--r--ACE/examples/ASX/Event_Server/Transceiver/Transceiver.mpc9
-rw-r--r--ACE/examples/ASX/Event_Server/Transceiver/transceiver.cpp238
-rw-r--r--ACE/examples/ASX/Event_Server/Transceiver/transceiver.h60
-rw-r--r--ACE/examples/ASX/Makefile.am16
-rw-r--r--ACE/examples/ASX/Message_Queue/ASX_Message_Queue.mpc25
-rw-r--r--ACE/examples/ASX/Message_Queue/Makefile.am67
-rw-r--r--ACE/examples/ASX/Message_Queue/bounded_buffer.cpp140
-rw-r--r--ACE/examples/ASX/Message_Queue/buffer_stream.cpp314
-rw-r--r--ACE/examples/ASX/Message_Queue/priority_buffer.cpp145
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp138
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.h53
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp73
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h37
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Makefile.am50
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Options.cpp209
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Options.h88
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Options.inl166
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp283
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.h127
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp137
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.h57
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/UPIPE_Event.mpc15
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/event_server.cpp271
47 files changed, 5184 insertions, 0 deletions
diff --git a/ACE/examples/ASX/CCM_App/ASX_CCM_App.mpc b/ACE/examples/ASX/CCM_App/ASX_CCM_App.mpc
new file mode 100644
index 00000000000..b5c40beb84c
--- /dev/null
+++ b/ACE/examples/ASX/CCM_App/ASX_CCM_App.mpc
@@ -0,0 +1,26 @@
+// -*- MPC -*-
+// $Id$
+
+project(*Lib) : acelib {
+ sharedname = ccm_app
+ Source_Files {
+ CCM_App.cpp
+ }
+}
+
+project(*Server) : aceexe {
+ exename = server
+ after += ASX_CCM_App_Lib
+ Source_Files {
+ SC_Server.cpp
+ }
+}
+
+project(*Client) : aceexe {
+ exename = client
+ after += ASX_CCM_App_Server
+ Source_Files {
+ SC_Client.cpp
+ }
+}
+
diff --git a/ACE/examples/ASX/CCM_App/CCM_App.cpp b/ACE/examples/ASX/CCM_App/CCM_App.cpp
new file mode 100644
index 00000000000..540dd545d90
--- /dev/null
+++ b/ACE/examples/ASX/CCM_App/CCM_App.cpp
@@ -0,0 +1,122 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "ace/Stream.h"
+#include "ace/Task.h"
+#include "ace/Module.h"
+#include "ace/svc_export.h"
+
+ACE_RCSID(CCM_App, CCM_App, "$Id$")
+
+typedef ACE_Task<ACE_SYNCH> MT_Task;
+typedef ACE_Stream<ACE_SYNCH> MT_Stream;
+typedef ACE_Module<ACE_SYNCH> MT_Module;
+
+class ACE_Svc_Export Test_Task : public MT_Task
+{
+public:
+ //FUZZ: disable check_for_lack_ACE_OS
+ virtual int open (void *);
+ virtual int close (u_long);
+ //FUZZ: enable check_for_lack_ACE_OS
+
+ virtual int init (int, ACE_TCHAR *[]);
+ virtual int fini (void);
+ virtual int suspend (void);
+ virtual int resume (void);
+};
+
+int
+Test_Task::open (void *)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("opening %s\n"),
+ this->name () ? this->name () : ACE_TEXT ("task")));
+ return 0;
+}
+
+int
+Test_Task::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("closing %s\n"),
+ this->name () ? this->name () : ACE_TEXT ("task")));
+ return 0;
+}
+
+int
+Test_Task::suspend (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("suspending in %s\n"),
+ this->name () ? this->name () : ACE_TEXT ("task")));
+ return 0;
+}
+
+int
+Test_Task::resume (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("resuming in %s\n"),
+ this->name () ? this->name () : ACE_TEXT ("task")));
+ return 0;
+}
+
+int
+Test_Task::init (int, ACE_TCHAR *[])
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("initializing %s\n"),
+ this->name () ? this->name () : ACE_TEXT ("task")));
+
+ return 0;
+}
+
+int
+Test_Task::fini (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("finalizing %s\n"),
+ this->name () ? this->name () : ACE_TEXT ("task")));
+ return 0;
+}
+
+// Factories used to control configuration.
+
+ACE_SVC_FACTORY_DECLARE (Test_Task)
+ACE_SVC_FACTORY_DEFINE (Test_Task)
+
+// Dynamically linked functions used to control configuration.
+
+extern "C" ACE_Svc_Export MT_Stream *make_stream (void);
+extern "C" ACE_Svc_Export MT_Module *make_da (void);
+extern "C" ACE_Svc_Export MT_Module *make_ea (void);
+extern "C" ACE_Svc_Export MT_Module *make_mr (void);
+
+MT_Stream *
+make_stream (void)
+{
+ return new MT_Stream;
+}
+
+MT_Module *
+make_da (void)
+{
+ return new MT_Module (ACE_TEXT ("Device_Adapter"),
+ new Test_Task, new Test_Task);
+}
+
+MT_Module *
+make_ea (void)
+{
+ return new MT_Module (ACE_TEXT ("Event_Analyzer"),
+ new Test_Task, new Test_Task);
+}
+
+MT_Module *
+make_mr (void)
+{
+ return new MT_Module (ACE_TEXT ("Multicast_Router"),
+ new Test_Task, new Test_Task);
+}
diff --git a/ACE/examples/ASX/CCM_App/Makefile.am b/ACE/examples/ASX/CCM_App/Makefile.am
new file mode 100644
index 00000000000..78ded02a046
--- /dev/null
+++ b/ACE/examples/ASX/CCM_App/Makefile.am
@@ -0,0 +1,59 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ./bin/mwc.pl -type automake -noreldefs ACE.mwc
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+## Makefile.ASX_CCM_App_Lib.am
+
+noinst_LTLIBRARIES = libccm_app.la
+
+libccm_app_la_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+libccm_app_la_SOURCES = \
+ CCM_App.cpp
+
+## Makefile.ASX_CCM_App_Server.am
+
+noinst_PROGRAMS = server
+
+server_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+server_SOURCES = \
+ SC_Server.cpp
+
+server_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+## Makefile.ASX_CCM_App_Client.am
+
+noinst_PROGRAMS += client
+
+client_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+client_SOURCES = \
+ SC_Client.cpp
+
+client_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/examples/ASX/CCM_App/SC_Client.cpp b/ACE/examples/ASX/CCM_App/SC_Client.cpp
new file mode 100644
index 00000000000..fbd4439784a
--- /dev/null
+++ b/ACE/examples/ASX/CCM_App/SC_Client.cpp
@@ -0,0 +1,13 @@
+// $Id$
+
+#include "ace/ACE.h"
+
+ACE_RCSID(CCM_App, SC_Client, "$Id$")
+
+// Pretty simple, eh? ;-)
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ return 0;
+}
diff --git a/ACE/examples/ASX/CCM_App/SC_Server.cpp b/ACE/examples/ASX/CCM_App/SC_Server.cpp
new file mode 100644
index 00000000000..b93e49005bf
--- /dev/null
+++ b/ACE/examples/ASX/CCM_App/SC_Server.cpp
@@ -0,0 +1,86 @@
+// $Id$
+
+// Simple driver program for the server. This driver dynamically
+// links in all the services in the <svc.conf> file.
+
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_main.h"
+#include "ace/Service_Config.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Reactor.h"
+#include "ace/Sig_Adapter.h"
+
+ACE_RCSID(CCM_App, SC_Server, "$Id$")
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ virtual int handle_input (ACE_HANDLE handle);
+ virtual int handle_close (ACE_HANDLE,
+ ACE_Reactor_Mask);
+};
+
+int
+Event_Handler::handle_input (ACE_HANDLE handle)
+{
+ char buf[BUFSIZ];
+ ssize_t n = ACE_OS::read (handle, buf, sizeof buf);
+
+ if (n == -1)
+ return -1;
+ else if (n == 0)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ ACE_TEXT ("shutting down on EOF\n")),
+ -1);
+ else if (ACE_OS::write (ACE_STDOUT, buf, n) != n)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ ACE_TEXT ("%p\n"), ACE_TEXT ("write failed")),
+ -1);
+ else
+ return 0;
+}
+
+int
+Event_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("closing Event_Handler\n")));
+ ACE_Reactor::instance ()->end_reactor_event_loop ();
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Service_Config loggerd;
+ Event_Handler handler;
+ ACE_Sig_Adapter shutdown_handler ((ACE_Sig_Handler_Ex) ACE_Reactor::end_event_loop);
+
+ if (ACE_Event_Handler::register_stdin_handler (&handler,
+ ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ()) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("register_stdin_handler")));
+
+ if (loggerd.open (argc,
+ argv,
+ ACE_DEFAULT_LOGGER_KEY,
+ // Don't ignore static services!
+ 0) == -1 && errno != ENOENT)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n%a"),
+ ACE_TEXT ("open"),
+ 1));
+ else if (ACE_Reactor::instance ()->register_handler
+ (SIGINT, &shutdown_handler) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n%a"),
+ ACE_TEXT ("register_handler"),
+ 1));
+
+ // Perform logging service until we receive SIGINT.
+
+ ACE_Reactor::instance ()->run_reactor_event_loop ();
+ return 0;
+}
diff --git a/ACE/examples/ASX/CCM_App/svc.conf b/ACE/examples/ASX/CCM_App/svc.conf
new file mode 100644
index 00000000000..b894e6e69e5
--- /dev/null
+++ b/ACE/examples/ASX/CCM_App/svc.conf
@@ -0,0 +1,21 @@
+static ACE_Service_Manager "-d -p 4911"
+
+dynamic Test_Task Service_Object *CCM_App:_make_Test_Task() "-p 3000"
+
+stream dynamic CCM_App STREAM *CCM_App:make_stream() active
+{
+ dynamic Device_Adapter Module *CCM_App:make_da()
+ dynamic Event_Analyzer Module *CCM_App:make_ea()
+ dynamic Multicast_Router Module *CCM_App:make_mr() "-p 3001"
+}
+
+stream CCM_App
+{
+ remove Device_Adapter
+# remove Event_Analyzer
+# remove Multicast_Router
+}
+
+# remove CCM_App
+remove Test_Task
+
diff --git a/ACE/examples/ASX/CCM_App/svc.conf.xml b/ACE/examples/ASX/CCM_App/svc.conf.xml
new file mode 100644
index 00000000000..e743d4ed986
--- /dev/null
+++ b/ACE/examples/ASX/CCM_App/svc.conf.xml
@@ -0,0 +1,33 @@
+<?xml version='1.0'?>
+<!-- Converted from svc.conf by svcconf-convert.pl -->
+<ACE_Svc_Conf>
+ <static id="ACE_Service_Manager" params="-d -p 4911"/>
+ <dynamic id="Test_Task" type="Service_Object">
+ <initializer path="CCM_App" init="_make_Test_Task" params="-p 3000"/>
+ </dynamic>
+ <streamdef>
+ <dynamic id="CCM_App" type="STREAM">
+ <initializer path="CCM_App" init="make_stream"/>
+ </dynamic>
+ <module>
+ <dynamic id="Device_Adapter" type="Module">
+ <initializer path="CCM_App" init="make_da"/>
+ </dynamic>
+ <dynamic id="Event_Analyzer" type="Module">
+ <initializer path="CCM_App" init="make_ea"/>
+ </dynamic>
+ <dynamic id="Multicast_Router" type="Module">
+ <initializer path="CCM_App" init="make_mr" params="-p 3001"/>
+ </dynamic>
+ </module>
+ </streamdef>
+ <stream id="CCM_App">
+ <module>
+ <remove id="Device_Adapter"/>
+ <!-- remove Event_Analyzer -->
+ <!-- remove Multicast_Router -->
+ </module>
+ </stream>
+ <!-- remove CCM_App -->
+ <remove id="Test_Task"/>
+</ACE_Svc_Conf>
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
new file mode 100644
index 00000000000..d5215ffcd26
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
@@ -0,0 +1,159 @@
+// $Id$
+
+#include "ace/os_include/os_assert.h"
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "Consumer_Router.h"
+#include "Options.h"
+
+ACE_RCSID(Event_Server, Consumer_Router, "$Id$")
+
+Consumer_Router::Consumer_Router (Peer_Router_Context *prc)
+ : Peer_Router (prc)
+{
+ this->context ()->duplicate ();
+}
+
+// Initialize the Router.
+
+int
+Consumer_Router::open (void *)
+{
+ if (this->is_writer ())
+ {
+ // Set the <Peer_Router_Context> to point back to us so that if
+ // any Consumer's "accidentally" send us data we'll be able to
+ // handle it by passing it down the stream.
+ this->context ()->peer_router (this);
+
+ // Increment the reference count.
+ this->context ()->duplicate ();
+
+ // Make this an active object to handle the error cases in a
+ // separate thread. This is mostly just for illustration, i.e.,
+ // it's probably overkill to use a thread for this!
+ return this->activate (Options::instance ()->t_flags ());
+ }
+ else // if (this->is_reader ())
+
+ // Nothing to do since this side is primarily used to transmit to
+ // Consumers, rather than receive.
+ return 0;
+}
+
+int
+Consumer_Router::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) closing Consumer_Router %s\n"),
+ this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer")));
+
+ if (this->is_writer ())
+ // Inform the thread to shut down.
+ this->msg_queue ()->deactivate ();
+
+ // Both writer and reader call <release>, so the context knows when
+ // to clean itself up.
+ this->context ()->release ();
+ return 0;
+}
+
+// Handle incoming messages in a separate thread.
+
+int
+Consumer_Router::svc (void)
+{
+ assert (this->is_writer ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) starting svc in Consumer_Router\n")));
+
+ for (ACE_Message_Block *mb = 0;
+ this->getq (mb) >= 0;
+ )
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) warning: Consumer_Router is ")
+ ACE_TEXT ("forwarding a message to Supplier_Router\n")));
+
+ // Pass this message down to the next Module's writer Task.
+ if (this->put_next (mb) == -1)
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ ACE_TEXT ("(%t) send_peers failed in Consumer_Router\n")),
+ -1);
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) stopping svc in Consumer_Router\n")));
+ return 0;
+ // Note the implicit ACE_OS::thr_exit() via destructor.
+}
+
+// Send a <Message_Block> to the supplier(s).
+
+int
+Consumer_Router::put (ACE_Message_Block *mb,
+ ACE_Time_Value *)
+{
+ // Perform the necessary control operations before passing the
+ // message down the stream.
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ {
+ this->control (mb);
+ return this->put_next (mb);
+ }
+
+ // If we're the reader then we're responsible for broadcasting
+ // messages to Consumers.
+
+ else if (this->is_reader ())
+ {
+ if (this->context ()->send_peers (mb) == -1)
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ ACE_TEXT ("(%t) send_peers failed in Consumer_Router\n")),
+ -1);
+ else
+ return 0;
+ }
+ else // if (this->is_writer ())
+
+ // Queue up the message to processed by <Consumer_Router::svc>
+ // Since we don't expect to be getting many of these messages, we
+ // queue them up and run them in a separate thread to avoid taxing
+ // the main thread.
+ return this->putq (mb);
+}
+
+// Return information about the <Consumer_Router>.
+#if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR)
+# define FMTSTR ACE_TEXT ("%s\t %d/%s %s (%s)\n")
+#else
+# define FMTSTR ACE_TEXT ("%ls\t %d/%ls %ls (%ls)\n")
+#endif /* ACE_WIN32 || !ACE_USES_WCHAR */
+
+int
+Consumer_Router::info (ACE_TCHAR **strp, size_t length) const
+{
+ ACE_TCHAR buf[BUFSIZ];
+ ACE_INET_Addr addr;
+ const ACE_TCHAR *mod_name = this->name ();
+
+ if (this->context ()->acceptor ().get_local_addr (addr) == -1)
+ return -1;
+
+ ACE_OS::sprintf (buf,
+ FMTSTR,
+ mod_name,
+ addr.get_port_number (),
+ ACE_TEXT ("tcp"),
+ ACE_TEXT ("# consumer router"),
+ this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer"));
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+
+ return ACE_OS::strlen (mod_name);
+}
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
new file mode 100644
index 00000000000..062a07116ea
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
@@ -0,0 +1,71 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef _CONSUMER_ROUTER_H
+#define _CONSUMER_ROUTER_H
+
+#include "ace/SOCK_Acceptor.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/UPIPE_Acceptor.h"
+#include "ace/Svc_Handler.h"
+#include "ace/RW_Thread_Mutex.h"
+#include "Peer_Router.h"
+
+class Consumer_Router : public Peer_Router
+{
+ // = TITLE
+ // Provides the interface between one or more Consumers and the
+ // Event Server <ACE_Stream>.
+ //
+ // = DESCRIPTION
+ // This class normally sits on "top" of the Stream and routes
+ // messages coming from "downstream" to all the Consumers
+ // connected to it via its "read" <Task>. Normally, the messages
+ // flow up the stream from <Supplier_Router>s. However, if
+ // Consumers transmit data to the <Consumer_Router>, we dutifully
+ // push it out to the Suppliers via the <Supplier_Router>.
+ //
+ // When used on the "reader" side of a Stream, the
+ // <Consumer_Router> simply forwards all messages up the stream.
+ // When used on the "writer" side, the <Consumer_Router> queues
+ // up outgoing messages to suppliers and sends them down to the
+ // <Supplier_Router> in a separate thread. The reason for this
+ // is that it's really an "error" for a <Consumer_Router> to
+ // send messages to Suppliers, so we don't expect this to happen
+ // very much. When it does we use a separate thread to avoid
+ // taxing the main thread, which processes "normal" messages.
+ //
+ // All of the methods in this class except the constructor are
+ // called via base class pointers by the <ACE_Stream>.
+ // Therefore, we can put them in the protected section.
+public:
+ Consumer_Router (Peer_Router_Context *prc);
+ // Initialization method.
+
+protected:
+ // = ACE_Task hooks.
+ virtual int open (void *a = 0);
+ // Called by the Stream to initialize the router.
+
+ virtual int close (u_long flags = 0);
+ // Called by the Stream to shutdown the router.
+
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ // Called by the <Peer_Handler> to pass a message to the
+ // <Consumer_Router>. The <Consumer_Router> queues up this message,
+ // which is then processed in the <svc> method in a separate thread.
+
+ virtual int svc (void);
+ // Runs in a separate thread to dequeue messages and pass them up
+ // the stream.
+
+ // = Dynamic linking hooks.
+ virtual int info (ACE_TCHAR **info_string, size_t length) const;
+ // Returns information about this service.
+};
+
+#endif /* _CONSUMER_ROUTER_H */
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc b/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc
new file mode 100644
index 00000000000..f99e912ce04
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc
@@ -0,0 +1,15 @@
+// -*- MPC -*-
+// $Id$
+
+project(*Server) : aceexe {
+ avoids += ace_for_tao
+ exename = Event_Server
+ Source_Files {
+ Consumer_Router.cpp
+ Event_Analyzer.cpp
+ Options.cpp
+ Peer_Router.cpp
+ Supplier_Router.cpp
+ event_server.cpp
+ }
+}
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
new file mode 100644
index 00000000000..a064da6459a
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
@@ -0,0 +1,80 @@
+// $Id$
+
+#include "ace/OS_NS_string.h"
+#include "Options.h"
+#include "Event_Analyzer.h"
+
+ACE_RCSID(Event_Server, Event_Analyzer, "$Id$")
+
+int
+Event_Analyzer::open (void *)
+{
+ // No-op for now...
+ return 0;
+}
+
+int
+Event_Analyzer::close (u_long)
+{
+ // No-op for now...
+ return 0;
+}
+
+int
+Event_Analyzer::control (ACE_Message_Block *mb)
+{
+ ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
+ ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd;
+
+ switch (cmd = ioc->cmd ())
+ {
+ case ACE_IO_Cntl_Msg::SET_LWM:
+ case ACE_IO_Cntl_Msg::SET_HWM:
+ this->water_marks (cmd, *(size_t *) mb->cont ()->rd_ptr ());
+ break;
+ default:
+ break;
+ }
+ return 0;
+}
+
+int
+Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ if (Options::instance ()->debug ())
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) passing through Event_Analyser::put() (%s)\n"),
+ this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer")));
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ this->control (mb);
+
+ // Just pass the message along to the next Module in the stream...
+ return this->put_next (mb);
+}
+
+int
+Event_Analyzer::init (int, ACE_TCHAR *[])
+{
+ // No-op for now.
+ return 0;
+}
+
+int
+Event_Analyzer::fini (void)
+{
+ // No-op for now.
+ return 0;
+}
+
+int
+Event_Analyzer::info (ACE_TCHAR **strp, size_t length) const
+{
+ const ACE_TCHAR *mod_name = this->name ();
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
new file mode 100644
index 00000000000..d4f88c8b68d
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
@@ -0,0 +1,44 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef _EVENT_ANALYZER_H
+#define _EVENT_ANALYZER_H
+
+#include "ace/Stream.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Module.h"
+#include "ace/Task.h"
+
+class Event_Analyzer : public ACE_Task<ACE_SYNCH>
+{
+ // = TITLE
+ // This class forwards all the <ACE_Message_Block>s it receives
+ // onto its neighboring Module in the Stream.
+ //
+ // = DESCRIPTION
+ // In a "real" event service, application-specific processing
+ // would be done in the <put> (or <svc>) method in this class.
+public:
+ // = Initialization hooks called by <ACE_Stream> (not used).
+ virtual int open (void *a = 0);
+ virtual int close (u_long flags = 0);
+
+ virtual int put (ACE_Message_Block *msg,
+ ACE_Time_Value * = 0);
+ // Entry point into this task.
+
+ // Dynamic linking hooks (not used).
+ virtual int init (int argc, ACE_TCHAR *argv[]);
+ virtual int fini (void);
+ virtual int info (ACE_TCHAR **info_string,
+ size_t length) const;
+private:
+ virtual int control (ACE_Message_Block *);
+ // Implements the watermark control processing.
+};
+
+#endif /* _EVENT_ANALYZER_H */
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am b/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am
new file mode 100644
index 00000000000..aefe1873d9d
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am
@@ -0,0 +1,50 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ./bin/mwc.pl -type automake -noreldefs ACE.mwc
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+
+## Makefile.Event_Server.am
+
+if !BUILD_ACE_FOR_TAO
+
+noinst_PROGRAMS = Event_Server
+
+Event_Server_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+Event_Server_SOURCES = \
+ Consumer_Router.cpp \
+ Event_Analyzer.cpp \
+ Options.cpp \
+ Peer_Router.cpp \
+ Supplier_Router.cpp \
+ event_server.cpp \
+ Consumer_Router.h \
+ Event_Analyzer.h \
+ Options.h \
+ Options.inl \
+ Peer_Router.h \
+ Supplier_Router.h
+
+Event_Server_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp
new file mode 100644
index 00000000000..8683f48d153
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp
@@ -0,0 +1,208 @@
+// $Id$
+
+#include "ace/Get_Opt.h"
+#include "ace/Thread.h"
+#include "ace/Log_Msg.h"
+#include "ace/OS_NS_stdio.h"
+#if defined (ACE_HAS_TRACE)
+# include "ace/OS_NS_strings.h"
+#endif /* ACE_HAS_TRACE */
+
+#include "Options.h"
+
+ACE_RCSID(Event_Server, Options, "$Id$")
+
+/* static */
+Options *Options::instance_ = 0;
+
+Options *
+Options::instance (void)
+{
+ if (Options::instance_ == 0)
+ Options::instance_ = new Options;
+
+ return Options::instance_;
+}
+
+Options::Options (void)
+ : thr_count_ (4),
+ t_flags_ (0),
+ high_water_mark_ (8 * 1024),
+ low_water_mark_ (1024),
+ message_size_ (128),
+ initial_queue_length_ (0),
+ iterations_ (100000),
+ debugging_ (0),
+ verbosity_ (0),
+ consumer_port_ (ACE_DEFAULT_SERVER_PORT),
+ supplier_port_ (ACE_DEFAULT_SERVER_PORT + 1)
+{
+}
+
+Options::~Options (void)
+{
+}
+
+void Options::print_results (void)
+{
+#if !defined (ACE_WIN32)
+ ACE_Profile_Timer::ACE_Elapsed_Time et;
+
+ this->itimer_.elapsed_time (et);
+
+ if (this->verbose ())
+ {
+#if defined (ACE_HAS_PRUSAGE_T)
+ ACE_Profile_Timer::Rusage rusage;
+ this->itimer_.get_rusage (rusage);
+
+ ACE_OS::printf ("final concurrency hint = %d\n", ACE_Thread::getconcurrency ());
+ ACE_OS::printf ("%8d = lwpid\n"
+ "%8d = lwp count\n"
+ "%8d = minor page faults\n"
+ "%8d = major page faults\n"
+ "%8d = input blocks\n"
+ "%8d = output blocks\n"
+ "%8d = messages sent\n"
+ "%8d = messages received\n"
+ "%8d = signals received\n"
+ "%8ds, %dms = wait-cpu (latency) time\n"
+ "%8ds, %dms = user lock wait sleep time\n"
+ "%8ds, %dms = all other sleep time\n"
+ "%8d = voluntary context switches\n"
+ "%8d = involuntary context switches\n"
+ "%8d = system calls\n"
+ "%8d = chars read/written\n",
+ (int) rusage.pr_lwpid,
+ (int) rusage.pr_count,
+ (int) rusage.pr_minf,
+ (int) rusage.pr_majf,
+ (int) rusage.pr_inblk,
+ (int) rusage.pr_oublk,
+ (int) rusage.pr_msnd,
+ (int) rusage.pr_mrcv,
+ (int) rusage.pr_sigs,
+ (int) rusage.pr_wtime.tv_sec, (int) rusage.pr_wtime.tv_nsec / 1000000,
+ (int) rusage.pr_ltime.tv_sec, (int) rusage.pr_ltime.tv_nsec / 1000000,
+ (int) rusage.pr_slptime.tv_sec, (int) rusage.pr_slptime.tv_nsec / 1000000,
+ (int) rusage.pr_vctx,
+ (int) rusage.pr_ictx,
+ (int) rusage.pr_sysc,
+ (int) rusage.pr_ioch);
+#else
+ /* Someone needs to write the corresponding dump for rusage... */
+#endif /* ACE_HAS_PRUSAGE_T */
+ }
+
+ ACE_OS::printf ("---------------------\n"
+ "real time = %.3f\n"
+ "user time = %.3f\n"
+ "system time = %.3f\n"
+ "---------------------\n",
+ et.real_time, et.user_time, et.system_time);
+#endif /* ACE_WIN32 */
+}
+
+void
+Options::parse_args (int argc, ACE_TCHAR *argv[])
+{
+ //FUZZ: disable check_for_lack_ACE_OS
+ ACE_LOG_MSG->open (argv[0]);
+
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("c:bdH:i:L:l:M:ns:t:T:v"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ //FUZZ: enable check_for_lack_ACE_OS
+ switch (c)
+ {
+ case 'b':
+ this->t_flags (THR_BOUND);
+ break;
+ case 'c':
+ this->consumer_port (ACE_OS::atoi (get_opt.opt_arg ()));
+ break;
+ case 'd':
+ this->debugging_ = 1;
+ break;
+ case 'H':
+ this->high_water_mark (ACE_OS::atoi (get_opt.opt_arg ()));
+ break;
+ case 'i':
+ this->iterations (ACE_OS::atoi (get_opt.opt_arg ()));
+ break;
+ case 'L':
+ this->low_water_mark (ACE_OS::atoi (get_opt.opt_arg ()));
+ break;
+ case 'l':
+ this->initial_queue_length (ACE_OS::atoi (get_opt.opt_arg ()));
+ break;
+ case 'M':
+ this->message_size (ACE_OS::atoi (get_opt.opt_arg ()));
+ break;
+ case 'n':
+ this->t_flags (THR_NEW_LWP);
+ break;
+ case 's':
+ this->supplier_port (ACE_OS::atoi (get_opt.opt_arg ()));
+ break;
+ case 'T':
+ #if defined (ACE_HAS_TRACE)
+ if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT ("ON")) == 0)
+ ACE_Trace::start_tracing ();
+ else if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT ("OFF")) == 0)
+ ACE_Trace::stop_tracing ();
+ #endif /* ACE_HAS_TRACE */
+ break;
+ case 't':
+ this->thr_count (ACE_OS::atoi (get_opt.opt_arg ()));
+ break;
+ case 'v':
+ this->verbosity_ = 1;
+ break;
+ default:
+ ACE_OS::fprintf (stderr, "%s\n"
+ "\t[-b] (THR_BOUND)\n"
+ "\t[-c consumer port]\n"
+ "\t[-d] (enable debugging)\n"
+ "\t[-H high water mark]\n"
+ "\t[-i number of test iterations]\n"
+ "\t[-L low water mark]\n"
+ "\t[-M] message size \n"
+ "\t[-n] (THR_NEW_LWP)\n"
+ "\t[-q max queue size]\n"
+ "\t[-s supplier port]\n"
+ "\t[-t number of threads]\n"
+ "\t[-v] (verbose) \n",
+ ACE_TEXT_ALWAYS_CHAR (argv[0]));
+ ACE_OS::exit (1);
+ /* NOTREACHED */
+ break;
+ }
+
+ // This is a major hack to get the size_t format spec to be a narrow
+ // char, same as the other strings for printf() here. It only works
+ // because this is the end of the source file. It makes the
+ // ACE_SIZE_T_FORMAT_SPECIFIER not use ACE_TEXT, effectively.
+#undef ACE_TEXT
+#define ACE_TEXT(A) A
+ if (this->verbose ())
+ ACE_OS::printf ("%8d = initial concurrency hint\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = total iterations\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = thread count\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = low water mark\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = high water mark\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = message_size\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = initial queue length\n"
+ "%8d = THR_BOUND\n"
+ "%8d = THR_NEW_LWP\n",
+ ACE_Thread::getconcurrency (),
+ this->iterations (),
+ this->thr_count (),
+ this->low_water_mark (),
+ this->high_water_mark (),
+ this->message_size (),
+ this->initial_queue_length (),
+ (this->t_flags () & THR_BOUND) != 0,
+ (this->t_flags () & THR_NEW_LWP) != 0);
+}
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.h b/ACE/examples/ASX/Event_Server/Event_Server/Options.h
new file mode 100644
index 00000000000..4c935434d26
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Options.h
@@ -0,0 +1,122 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef OPTIONS_H
+#define OPTIONS_H
+
+#include "ace/config-all.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Profile_Timer.h"
+
+class Options
+{
+ // = TITLE
+ // Option Singleton for Event Server.
+public:
+ static Options *instance (void);
+ // Singleton access point.
+
+ void parse_args (int argc, ACE_TCHAR *argv[]);
+ // Parse the command-line arguments and set the options.
+
+ // = Timer management.
+ void stop_timer (void);
+ void start_timer (void);
+
+ // = Set/get the number of threads.
+ void thr_count (size_t count);
+ size_t thr_count (void);
+
+ // = Set/get the size of the queue.
+ void initial_queue_length (size_t length);
+ size_t initial_queue_length (void);
+
+ // = Set/get the high water mark.
+ void high_water_mark (size_t size);
+ size_t high_water_mark (void);
+
+ // = Set/get the high water mark.
+ void low_water_mark (size_t size);
+ size_t low_water_mark (void);
+
+ // = Set/get the size of a message.
+ void message_size (size_t size);
+ size_t message_size (void);
+
+ // = Set/get the number of iterations.
+ void iterations (size_t n);
+ size_t iterations (void);
+
+ // Set/get threading flags.
+ void t_flags (long flag);
+ long t_flags (void);
+
+ // Set/get supplier port number.
+ void supplier_port (u_short port);
+ u_short supplier_port (void);
+
+ // Set/get consumer port number.
+ void consumer_port (u_short port);
+ u_short consumer_port (void);
+
+ // Enabled if we're in debugging mode.
+ int debug (void);
+
+ // Enabled if we're in verbose mode.
+ int verbose (void);
+
+ // Print the results to the STDERR.
+ void print_results (void);
+
+private:
+ // = Ensure we're a Singleton.
+ Options (void);
+ ~Options (void);
+
+ ACE_Profile_Timer itimer_;
+ // Time the process.
+
+ size_t thr_count_;
+ // Number of threads to spawn.
+
+ long t_flags_;
+ // Flags to <thr_create>.
+
+ size_t high_water_mark_;
+ // ACE_Task high water mark.
+
+ size_t low_water_mark_;
+ // ACE_Task low water mark.
+
+ size_t message_size_;
+ // Size of a message.
+
+ size_t initial_queue_length_;
+ // Initial number of items in the queue.
+
+ size_t iterations_;
+ // Number of iterations to run the test program.
+
+ int debugging_;
+ // Extra debugging info.
+
+ int verbosity_;
+ // Extra verbose messages.
+
+ u_short consumer_port_;
+ // Port that the Consumer_Router is using.
+
+ u_short supplier_port_;
+ // Port that the Supplier_Router is using.
+
+ static Options *instance_;
+ // Static Singleton.
+
+};
+
+#include "Options.inl"
+#endif /* OPTIONS_H */
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.inl b/ACE/examples/ASX/Event_Server/Event_Server/Options.inl
new file mode 100644
index 00000000000..87ff395c503
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Options.inl
@@ -0,0 +1,141 @@
+/* -*- C++ -*- */
+// $Id$
+
+/* Option manager for ustreams */
+
+// Since this is only included in Options.h these should stay
+// inline, not ACE_INLINE.
+// FUZZ: disable check_for_inline
+
+inline void
+Options::supplier_port (u_short port)
+{
+ this->supplier_port_ = port;
+}
+
+inline u_short
+Options::supplier_port (void)
+{
+ return this->supplier_port_;
+}
+
+inline void
+Options::consumer_port (u_short port)
+{
+ this->consumer_port_ = port;
+}
+
+inline u_short
+Options::consumer_port (void)
+{
+ return this->consumer_port_;
+}
+
+inline void
+Options::start_timer (void)
+{
+ this->itimer_.start ();
+}
+
+inline void
+Options::stop_timer (void)
+{
+ this->itimer_.stop ();
+}
+
+inline void
+Options::thr_count (size_t count)
+{
+ this->thr_count_ = count;
+}
+
+inline size_t
+Options::thr_count (void)
+{
+ return this->thr_count_;
+}
+
+inline void
+Options::initial_queue_length (size_t length)
+{
+ this->initial_queue_length_ = length;
+}
+
+inline size_t
+Options::initial_queue_length (void)
+{
+ return this->initial_queue_length_;
+}
+
+inline void
+Options::high_water_mark (size_t size)
+{
+ this->high_water_mark_ = size;
+}
+
+inline size_t
+Options::high_water_mark (void)
+{
+ return this->high_water_mark_;
+}
+
+inline void
+Options::low_water_mark (size_t size)
+{
+ this->low_water_mark_ = size;
+}
+
+inline size_t
+Options::low_water_mark (void)
+{
+ return this->low_water_mark_;
+}
+
+inline void
+Options::message_size (size_t size)
+{
+ this->message_size_ = size;
+}
+
+inline size_t
+Options::message_size (void)
+{
+ return this->message_size_;
+}
+
+inline void
+Options::iterations (size_t n)
+{
+ this->iterations_ = n;
+}
+
+inline size_t
+Options::iterations (void)
+{
+ return this->iterations_;
+}
+
+inline void
+Options::t_flags (long flag)
+{
+ this->t_flags_ |= flag;
+}
+
+inline long
+Options::t_flags (void)
+{
+ return this->t_flags_;
+}
+
+inline int
+Options::debug (void)
+{
+ return this->debugging_;
+}
+
+inline int
+Options::verbose (void)
+{
+ return this->verbosity_;
+}
+
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
new file mode 100644
index 00000000000..cb82eec16df
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
@@ -0,0 +1,435 @@
+// $Id$
+
+#if !defined (_PEER_ROUTER_C)
+#define _PEER_ROUTER_C
+
+#include "ace/Service_Config.h"
+#include "ace/Get_Opt.h"
+#include "Options.h"
+#include "Peer_Router.h"
+
+ACE_RCSID(Event_Server, Peer_Router, "$Id$")
+
+// Send the <ACE_Message_Block> to all the peers. Note that in a
+// "real" application this logic would most likely be more selective,
+// i.e., it would actually do "routing" based on addressing
+// information passed in the <ACE_Message_Block>.
+
+int
+Peer_Router_Context::send_peers (ACE_Message_Block *mb)
+{
+ PEER_ITERATOR map_iter = this->peer_map_;
+ int bytes = 0;
+ int iterations = 0;
+
+ // Skip past the header and get the message to send.
+ ACE_Message_Block *data_block = mb->cont ();
+
+ // Use an iterator to "multicast" the data to *all* the registered
+ // peers. Note that this doesn't really multicast, it just makes a
+ // "logical" copy of the <ACE_Message_Block> and enqueues it in the
+ // appropriate <Peer_Handler> corresponding to each peer. Note that
+ // a "real" application would probably "route" the data to a subset
+ // of connected peers here, rather than send it to all the peers.
+
+ for (PEER_ENTRY *ss = 0;
+ map_iter.next (ss) != 0;
+ map_iter.advance ())
+ {
+ if (Options::instance ()->debug ())
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) sending to peer via handle %d\n"),
+ ss->ext_id_));
+
+ iterations++;
+
+ // Increment reference count before sending since the
+ // <Peer_Handler> might be running in its own thread of control.
+ bytes += ss->int_id_->put (data_block->duplicate ());
+ }
+
+ mb->release ();
+ return bytes == 0 ? 0 : bytes / iterations;
+}
+
+// Remove the <Peer_Handler> from the peer connection map.
+
+int
+Peer_Router_Context::unbind_peer (ROUTING_KEY key)
+{
+ return this->peer_map_.unbind (key);
+}
+
+// Add the <Peer_Handler> to the peer connection map.
+
+int
+Peer_Router_Context::bind_peer (ROUTING_KEY key,
+ Peer_Handler *peer_handler)
+{
+ return this->peer_map_.bind (key, peer_handler);
+}
+
+void
+Peer_Router_Context::duplicate (void)
+{
+ this->reference_count_++;
+}
+
+void
+Peer_Router_Context::release (void)
+{
+ ACE_ASSERT (this->reference_count_ > 0);
+ this->reference_count_--;
+
+ if (this->reference_count_ == 0)
+ delete this;
+}
+
+Peer_Router_Context::Peer_Router_Context (u_short port)
+ : reference_count_ (0)
+{
+ // Initialize the Acceptor's "listen-mode" socket.
+ ACE_INET_Addr endpoint (port);
+ if (this->open (endpoint) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Acceptor::open")));
+
+ // Initialize the connection map.
+ else if (this->peer_map_.open () == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Map_Manager::open")));
+ else
+ {
+ ACE_INET_Addr addr;
+
+ if (this->acceptor ().get_local_addr (addr) != -1)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) initializing %C on port = %d, handle = %d, this = %u\n"),
+ addr.get_port_number () == Options::instance ()->supplier_port ()
+ ? "Supplier_Handler" : "Consumer_Handler",
+ addr.get_port_number (),
+ this->acceptor().get_handle (),
+ this));
+ else
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("get_local_addr")));
+ }
+}
+
+Peer_Router_Context::~Peer_Router_Context (void)
+{
+ // Free up the handle and close down the listening socket.
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) closing down Peer_Router_Context\n")));
+
+ // Close down the Acceptor and take ourselves out of the Reactor.
+ this->handle_close ();
+
+ PEER_ITERATOR map_iter = this->peer_map_;
+
+ // Make sure to take all the handles out of the map to avoid
+ // "resource leaks."
+
+ for (PEER_ENTRY *ss = 0;
+ map_iter.next (ss) != 0;
+ map_iter.advance ())
+ {
+ if (Options::instance ()->debug ())
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) closing down peer on handle %d\n"),
+ ss->ext_id_));
+
+ if (ACE_Reactor::instance ()->remove_handler
+ (ss->ext_id_,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) p\n"),
+ ACE_TEXT ("remove_handle")));
+ }
+
+ // Close down the map.
+ this->peer_map_.close ();
+}
+
+Peer_Router *
+Peer_Router_Context::peer_router (void)
+{
+ return this->peer_router_;
+}
+
+void
+Peer_Router_Context::peer_router (Peer_Router *pr)
+{
+ this->peer_router_ = pr;
+}
+
+// Factory Method that creates a new <Peer_Handler> for each
+// connection.
+
+int
+Peer_Router_Context::make_svc_handler (Peer_Handler *&sh)
+{
+ ACE_NEW_RETURN (sh,
+ Peer_Handler (this),
+ -1);
+ return 0;
+}
+
+Peer_Handler::Peer_Handler (Peer_Router_Context *prc)
+ : peer_router_context_ (prc)
+{
+}
+
+// Send output to a peer. Note that this implementation "blocks" if
+// flow control occurs. This is undesirable for "real" applications.
+// The best way around this is to make the <Peer_Handler> an Active
+// Object, e.g., as done in the $ACE_ROOT/apps/Gateway/Gateway
+// application.
+
+int
+Peer_Handler::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
+{
+#if 0
+ // If we're running as Active Objects just enqueue the message here.
+ return this->putq (mb, tv);
+#else
+ ACE_UNUSED_ARG (tv);
+
+ int result = this->peer ().send_n (mb->rd_ptr (),
+ mb->length ());
+ // Release the memory.
+ mb->release ();
+
+ return result;
+#endif /* 0 */
+}
+
+// Initialize a newly connected handler.
+
+int
+Peer_Handler::open (void *)
+{
+ ACE_TCHAR buf[BUFSIZ], *p = buf;
+
+ if (this->peer_router_context_->peer_router ()->info (&p,
+ sizeof buf) != -1)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) creating handler for %s, handle = %d\n"),
+ buf,
+ this->get_handle ()));
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("info")),
+ -1);
+#if 0
+ // If we're running as an Active Object activate the Peer_Handler
+ // here.
+ if (this->activate (Options::instance ()->t_flags ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("activation of thread failed")),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Peer_Handler::open registering with Reactor for handle_input\n")));
+#else
+
+ // Register with the Reactor to receive messages from our Peer.
+ if (ACE_Reactor::instance ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("register_handler")),
+ -1);
+#endif /* 0 */
+
+ // Insert outselves into the routing map.
+ else if (this->peer_router_context_->bind_peer (this->get_handle (),
+ this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("bind_peer")),
+ -1);
+ else
+ return 0;
+}
+
+// Receive a message from a Peer.
+
+int
+Peer_Handler::handle_input (ACE_HANDLE h)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) input arrived on handle %d\n"),
+ h));
+
+ ACE_Message_Block *db;
+
+ ACE_NEW_RETURN (db, ACE_Message_Block (BUFSIZ), -1);
+
+ ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY),
+ ACE_Message_Block::MB_PROTO, db);
+ // Check for memory failures.
+ if (hb == 0)
+ {
+ db->release ();
+ errno = ENOMEM;
+ return -1;
+ }
+
+ ssize_t n = this->peer ().recv (db->rd_ptr (),
+ db->size ());
+
+ if (n == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p"),
+ ACE_TEXT ("recv failed")),
+ -1);
+ else if (n == 0) // Client has closed down the connection.
+ {
+ if (this->peer_router_context_->unbind_peer (this->get_handle ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p"),
+ ACE_TEXT ("unbind failed")),
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) shutting down handle %d\n"), h));
+ // Instruct the <ACE_Reactor> to deregister us by returning -1.
+ return -1;
+ }
+ else
+ {
+ // Transform incoming buffer into an <ACE_Message_Block>.
+
+ // First, increment the write pointer to the end of the newly
+ // read data block.
+ db->wr_ptr (n);
+
+ // Second, copy the "address" into the header block. Note that
+ // for this implementation the HANDLE we receive the message on
+ // is considered the "address." A "real" application would want
+ // to do something more sophisticated.
+ *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle ();
+
+ // Third, update the write pointer in the header block.
+ hb->wr_ptr (sizeof (ACE_HANDLE));
+
+ // Finally, pass the message through the stream. Note that we
+ // use <Task::put> here because this gives the method at *our*
+ // level in the stream a chance to do something with the message
+ // before it is sent up the other side. For instance, if we
+ // receive messages in the <Supplier_Router>, it will just call
+ // <put_next> and send them up the stream to the
+ // <Consumer_Router> (which broadcasts them to consumers).
+ // However, if we receive messages in the <Consumer_Router>, it
+ // could reply to the Consumer with an error since it's not
+ // correct for Consumers to send messages (we don't do this in
+ // the current implementation, but it could be done in a "real"
+ // application).
+
+ if (this->peer_router_context_->peer_router ()->put (hb) == -1)
+ return -1;
+ else
+ return 0;
+ }
+}
+
+Peer_Router::Peer_Router (Peer_Router_Context *prc)
+ : peer_router_context_ (prc)
+{
+}
+
+Peer_Router_Context *
+Peer_Router::context (void) const
+{
+ return this->peer_router_context_;
+}
+
+int
+Peer_Router::control (ACE_Message_Block *mb)
+{
+ ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
+ ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command;
+
+ switch (command = ioc->cmd ())
+ {
+ case ACE_IO_Cntl_Msg::SET_LWM:
+ case ACE_IO_Cntl_Msg::SET_HWM:
+ this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ());
+ break;
+ default:
+ return -1;
+ }
+ return 0;
+}
+
+#if 0
+
+// Right now, Peer_Handlers are purely Reactive, i.e., they all run in
+// a single thread of control. It would be easy to make them Active
+// Objects by calling activate() in Peer_Handler::open(), making
+// Peer_Handler::put() enqueue each message on the message queue, and
+// (3) then running the following svc() routine to route each message
+// to its final destination within a separate thread. Note that we'd
+// want to move the svc() call up to the Consumer_Router and
+// Supplier_Router level in order to get the right level of control
+// for input and output.
+
+Peer_Handler::svc (void)
+{
+ ACE_Message_Block *db, *hb;
+
+ // Do an endless loop
+ for (;;)
+ {
+ db = new Message_Block (BUFSIZ);
+ hb = new Message_Block (sizeof (ROUTING_KEY),
+ Message_Block::MB_PROTO,
+ db);
+
+ ssize_t n = this->peer_.recv (db->rd_ptr (), db->size ());
+
+ if (n == -1)
+ LM_ERROR_RETURN ((LOG_ERROR,
+ ACE_TEXT ("%p"),
+ ACE_TEXT ("recv failed")),
+ -1);
+ else if (n == 0) // Client has closed down the connection.
+ {
+ if (this->peer_router_context_->peer_router ()->unbind_peer (this->get_handle ()) == -1)
+ LM_ERROR_RETURN ((LOG_ERROR,
+ ACE_TEXT ("%p"),
+ ACE_TEXT ("unbind failed")),
+ -1);
+ LM_DEBUG ((LOG_DEBUG,
+ ACE_TEXT ("(%t) shutting down \n")));
+
+ // We do not need to be deregistered by reactor
+ // as we were not registered at all.
+ return -1;
+ }
+ else
+ {
+ // Transform incoming buffer into a Message.
+ db->wr_ptr (n);
+ *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment.
+ hb->wr_ptr (sizeof (long));
+
+ // Pass the message to the stream.
+ if (this->peer_router_context_->peer_router ()->reply (hb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("Peer_Handler.svc : peer_router->reply failed")),
+ -1);
+ }
+ }
+ return 0;
+}
+#endif /* 0 */
+#endif /* _PEER_ROUTER_C */
+
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h
new file mode 100644
index 00000000000..044ef07ea07
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h
@@ -0,0 +1,158 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef _PEER_ROUTER_H
+#define _PEER_ROUTER_H
+
+#include "ace/Acceptor.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/SOCK_Acceptor.h"
+#include "ace/Svc_Handler.h"
+#include "ace/Map_Manager.h"
+#include "ace/RW_Thread_Mutex.h"
+
+// Type of search key for CONSUMER_MAP
+typedef ACE_HANDLE ROUTING_KEY;
+
+// Forward declarations.
+class Peer_Router;
+class Peer_Router_Context;
+
+class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>
+{
+ // = TITLE
+ // Receive input from a Peer and forward to the appropriate
+ // <Peer_Router> (i.e., <Consumer_Router> or <Supplier_Router>).
+public:
+ Peer_Handler (Peer_Router_Context * = 0);
+ // Initialization method.
+
+ virtual int open (void * = 0);
+ // Called by the ACE_Acceptor::handle_input() to activate this
+ // object.
+
+ virtual int handle_input (ACE_HANDLE);
+ // Receive input from a peer.
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
+ // Send output to a peer. Note that this implementation "blocks" if
+ // flow control occurs. This is undesirable for "real"
+ // applications. The best way around this is to make the
+ // <Peer_Handler> an Active Object, e.g., as done in the
+ // $ACE_ROOT/apps/Gateway/Gateway application.
+
+protected:
+ Peer_Router_Context *peer_router_context_;
+ // Pointer to router context. This maintains the state that is
+ // shared by both Tasks in a <Peer_Router> Module.
+};
+
+class Peer_Router_Context : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
+{
+ // = TITLE
+ // Defines state and behavior shared between both Tasks in a
+ // <Peer_Router> Module.
+ //
+ // = DESCRIPTION
+ // This class also serves as an <ACE_Acceptor>, which creates
+ // <Peer_Handlers> when Peers connect.
+public:
+ // = Initialization and termination methods.
+ Peer_Router_Context (u_short port);
+ // Constructor.
+
+ virtual int unbind_peer (ROUTING_KEY);
+ // Remove the <Peer_Handler *> from the <PEER_MAP> that corresponds
+ // to the <ROUTING_KEY>.
+
+ virtual int bind_peer (ROUTING_KEY, Peer_Handler *);
+ // Add a <Peer_Handler> to the <PEER_MAP> that's associated with the
+ // <ROUTING_KEY>.
+
+ int send_peers (ACE_Message_Block *mb);
+ // Send the <ACE_Message_Block> to all the peers. Note that in a
+ // "real" application this logic would most likely be more
+ // selective, i.e., it would actually do "routing" based on
+ // addressing information passed in the <ACE_Message_Block>.
+
+ int make_svc_handler (Peer_Handler *&sh);
+ // Factory Method that creates a new <Peer_Handler> for each
+ // connection. This method overrides the default behavior in
+ // <ACE_Acceptor>.
+
+ // = Set/Get Router Task.
+ Peer_Router *peer_router (void);
+ void peer_router (Peer_Router *);
+
+ void release (void);
+ // Decrement the reference count and delete <this> when count == 0;
+
+ void duplicate (void);
+ // Increment the reference count.
+
+private:
+ Peer_Router *peer_router_;
+ // Pointer to the <Peer_Router> that we are accepting for.
+
+ // = Useful typedefs.
+ typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
+ PEER_MAP;
+ typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
+ PEER_ITERATOR;
+ typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *>
+ PEER_ENTRY;
+
+ PEER_MAP peer_map_;
+ // Map used to keep track of active peers.
+
+ int reference_count_;
+ // Keep track of when we can delete ourselves.
+
+ ~Peer_Router_Context (void);
+ // Private to ensure dynamic allocation.
+
+ friend class Friend_Of_Peer_Router_Context;
+ // Declare a friend class to avoid compiler warnings because the
+ // destructor is private.
+};
+
+class Peer_Router : public ACE_Task<ACE_SYNCH>
+{
+ // = TITLE
+ // This abstract base class provides mechanisms for routing
+ // messages to/from a <ACE_Stream> from/to one or more peers (which
+ // are typically running on remote hosts).
+ //
+ // = DESCRIPTION
+ // Subclasses of <Peer_Router> (such as <Consumer_Router> or
+ // <Supplier_Router>) override the <open>, <close>, and
+ // <put> methods to specialize the behavior of the router to
+ // meet application-specific requirements.
+protected:
+ Peer_Router (Peer_Router_Context *prc);
+ // Initialization method.
+
+ virtual int control (ACE_Message_Block *);
+ // Handle control messages arriving from adjacent Modules.
+
+ Peer_Router_Context *context (void) const;
+ // Returns the routing context.
+
+ typedef ACE_Task<ACE_SYNCH> inherited;
+ // Helpful typedef.
+
+private:
+ Peer_Router_Context *peer_router_context_;
+ // Reference to the context shared by the writer and reader Tasks,
+ // e.g., in the <Consumer_Router> and <Supplier_Router> Modules.
+
+ // = Prevent copies and pass-by-value.
+ Peer_Router (const Peer_Router &);
+ void operator= (const Peer_Router &);
+};
+
+#endif /* _PEER_ROUTER_H */
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
new file mode 100644
index 00000000000..72c43ee6312
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
@@ -0,0 +1,165 @@
+// $Id$
+
+#include "ace/os_include/os_assert.h"
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "Supplier_Router.h"
+#include "Options.h"
+
+ACE_RCSID(Event_Server, Supplier_Router, "$Id$")
+
+// Handle outgoing messages in a separate thread.
+
+int
+Supplier_Router::svc (void)
+{
+ assert (this->is_writer ());
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n"));
+
+ for (ACE_Message_Block *mb = 0;
+ this->getq (mb) >= 0;
+ )
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) warning: Supplier_Router is "
+ "forwarding a message via send_peers\n"));
+
+ // Broadcast the message to the Suppliers, even though this is
+ // "incorrect" (assuming a oneway flow of events from Suppliers
+ // to Consumers)!
+
+ if (this->context ()->send_peers (mb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) send_peers failed in Supplier_Router\n"),
+ -1);
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) stopping svc in Supplier_Router\n"));
+ return 0;
+}
+
+Supplier_Router::Supplier_Router (Peer_Router_Context *prc)
+ : Peer_Router (prc)
+{
+ // Increment the reference count.
+ this->context ()->duplicate ();
+}
+
+// Initialize the Supplier Router.
+
+int
+Supplier_Router::open (void *)
+{
+ if (this->is_reader ())
+ {
+ // Set the <Peer_Router_Context> to point back to us so that all
+ // the Peer_Handler's <put> their incoming <Message_Blocks> to
+ // our reader Task.
+ this->context ()->peer_router (this);
+ return 0;
+ }
+
+ else // if (this->is_writer ()
+ {
+ // Increment the reference count.
+ this->context ()->duplicate ();
+
+ // Make this an active object to handle the error cases in a
+ // separate thread.
+ return this->activate (Options::instance ()->t_flags ());
+ }
+}
+
+// Close down the router.
+
+int
+Supplier_Router::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) closing Supplier_Router %s\n",
+ this->is_reader () ? "reader" : "writer"));
+
+ if (this->is_writer ())
+ // Inform the thread to shut down.
+ this->msg_queue ()->deactivate ();
+
+ // Both writer and reader call release(), so the context knows when
+ // to clean itself up.
+ this->context ()->release ();
+ return 0;
+}
+
+// Send an <ACE_Message_Block> to the supplier(s).
+
+int
+Supplier_Router::put (ACE_Message_Block *mb,
+ ACE_Time_Value *)
+{
+ // Perform the necessary control operations before passing
+ // the message up the stream.
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ {
+ this->control (mb);
+ return this->put_next (mb);
+ }
+
+ // If we're the reader then we are responsible for pass messages up
+ // to the next Module's reader Task. Note that in a "real"
+ // application this is likely where we'd take a look a the actual
+ // information that was in the message, e.g., in order to figure out
+ // what operation it was and what it's "parameters" where, etc.
+ else if (this->is_reader ())
+ return this->put_next (mb);
+
+ else // if (this->is_writer ())
+ {
+ // Someone is trying to write to the Supplier. In this
+ // implementation this is considered an "error." However, we'll
+ // just go ahead and forward the message to the Supplier (who
+ // hopefully is prepared to receive it).
+ ACE_DEBUG ((LM_WARNING,
+ "(%t) warning: sending to a Supplier\n"));
+
+ // Queue up the message to processed by <Supplier_Router::svc>.
+ // Since we don't expect to be getting many of these messages,
+ // we queue them up and run them in a separate thread to avoid
+ // taxing the main thread.
+ return this->putq (mb);
+ }
+}
+
+// Return information about the <Supplier_Router>.
+#if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR)
+# define FMTSTR ACE_TEXT ("%s\t %d/%s %s (%s)\n")
+#else
+# define FMTSTR ACE_TEXT ("%ls\t %d/%ls %ls (%ls)\n")
+#endif /* ACE_WIN32 || !ACE_USES_WCHAR */
+
+int
+Supplier_Router::info (ACE_TCHAR **strp, size_t length) const
+{
+ ACE_TCHAR buf[BUFSIZ];
+ ACE_INET_Addr addr;
+ const ACE_TCHAR *mod_name = this->name ();
+
+ if (this->context ()->acceptor ().get_local_addr (addr) == -1)
+ return -1;
+
+ ACE_OS::sprintf (buf,
+ FMTSTR,
+ mod_name,
+ addr.get_port_number (),
+ ACE_TEXT ("tcp"),
+ ACE_TEXT ("# supplier router"),
+ this->is_reader () ?
+ ACE_TEXT ("reader") : ACE_TEXT ("writer"));
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+
+ return ACE_OS::strlen (mod_name);
+}
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
new file mode 100644
index 00000000000..8a42943c147
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
@@ -0,0 +1,72 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef _SUPPLIER_ROUTER_H
+#define _SUPPLIER_ROUTER_H
+
+#include "ace/INET_Addr.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/SOCK_Acceptor.h"
+#include "ace/Map_Manager.h"
+#include "ace/Svc_Handler.h"
+#include "Peer_Router.h"
+
+class Supplier_Router : public Peer_Router
+{
+ // = TITLE
+ // Provides the interface between one or more Suppliers and the
+ // Event Server ACE_Stream.
+ //
+ // = DESCRIPTION
+ // This class normally sits on "bottom" of the Stream and sends
+ // all messages coming from Suppliers via its "write" <Task>
+ // "upstream" to all the Consumers connected to the
+ // <Consumer_Router>. Normally, the messages flow up the
+ // stream to <Consumer_Router>s. However, if Consumers
+ // transmit data to the <Consumer_Router>, we dutifully push it
+ // out to the Suppliers via the <Supplier_Router>.
+ //
+ // When used on the "reader" side of a Stream, the
+ // <Supplier_Router> simply forwards all messages up the stream.
+ // When used on the "writer" side, the <Supplier_Router> queues
+ // up outgoing messages to suppliers and sends them in a
+ // separate thread. The reason for this is that it's really an
+ // "error" for a <Supplier_Router> to send messages to
+ // Suppliers, so we don't expect this to happen very much. When
+ // it does we use a separate thread to avoid taxing the main
+ // thread, which processes "normal" messages.
+ //
+ // All of these methods are called via base class pointers by
+ // the <ACE_Stream> apparatus. Therefore, we can put them in
+ // the protected section.
+public:
+ Supplier_Router (Peer_Router_Context *prc);
+ // Initialization method.
+
+protected:
+ // = ACE_Task hooks.
+
+ virtual int open (void *a = 0);
+ // Called by the Stream to initialize the router.
+
+ virtual int close (u_long flags = 0);
+ // Called by the Stream to shutdown the router.
+
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ // Called by the <SUPPLIER_HANDLER> to pass a message to the Router.
+ // The Router queues up this message, which is then processed in the
+ // <svc> method in a separate thread.
+
+ virtual int svc (void);
+ // Runs in a separate thread to dequeue messages and pass them up
+ // the stream.
+
+ virtual int info (ACE_TCHAR **info_string, size_t length) const;
+ // Dynamic linking hook.
+};
+
+#endif /* _SUPPLIER_ROUTER_H */
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp b/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp
new file mode 100644
index 00000000000..3858bad1fc2
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp
@@ -0,0 +1,258 @@
+// $Id$
+
+// Main driver program for the event server example.
+
+#include "ace/OS_main.h"
+#include "ace/Service_Config.h"
+#include "ace/OS_NS_unistd.h"
+#include "Options.h"
+#include "Consumer_Router.h"
+#include "Event_Analyzer.h"
+#include "Supplier_Router.h"
+#include "ace/Sig_Adapter.h"
+#include "ace/Stream.h"
+
+ACE_RCSID (Event_Server,
+ event_server,
+ "$Id$")
+
+// Typedef these components to handle multi-threading correctly.
+typedef ACE_Stream<ACE_SYNCH> MT_Stream;
+typedef ACE_Module<ACE_SYNCH> MT_Module;
+
+
+class Event_Server : public ACE_Sig_Adapter
+{
+ // = TITLE
+ // Run the logic for the <Event_Server>.
+
+ //
+ // = DESCRIPTION
+ // In addition to packaging the <Event_Server> components, this
+ // class also handles SIGINT and terminate the entire
+ // application process. There are several ways to terminate
+ // this application process:
+ //
+ // 1. Send a SIGINT signal (e.g., via ^C)
+ // 2. Type any character on the STDIN.
+ //
+ // Note that by inheriting from the <ACE_Sig_Adapter> we can
+ // shutdown the <ACE_Reactor> cleanly when a SIGINT is
+ // generated.
+public:
+ Event_Server (void);
+ // Constructor.
+
+ int svc (void);
+ // Run the event-loop for the event server.
+
+private:
+ virtual int handle_input (ACE_HANDLE handle);
+ // Hook method called back when a user types something into the
+ // STDIN in order to shut down the program.
+
+ int configure_stream (void);
+ // Setup the plumbing in the stream.
+
+ int set_watermarks (void);
+ // Set the high and low queue watermarks.
+
+ int run_event_loop (void);
+ // Run the event-loop for the <Event_Server>.
+
+ MT_Stream event_server_;
+ // The <ACE_Stream> that contains the <Event_Server> application
+ // <Modules>.
+};
+
+Event_Server::Event_Server (void)
+ : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop))
+ // Shutdown the <ACE_Reactor>'s event loop when a SIGINT is
+ // received.
+{
+ // Register to trap STDIN from the user.
+ if (ACE_Event_Handler::register_stdin_handler (this,
+ ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ()) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("register_stdin_handler")));
+ // Register to trap the SIGINT signal.
+ else if (ACE_Reactor::instance ()->register_handler
+ (SIGINT, this) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("register_handler")));
+}
+
+int
+Event_Server::handle_input (ACE_HANDLE)
+{
+ // This code here will make sure we actually wait for the user to
+ // type something. On platforms like Win32, <handle_input> is called
+ // prematurely (even when there is no data).
+ char temp_buffer [BUFSIZ];
+
+ ssize_t n = ACE_OS::read (ACE_STDIN,
+ temp_buffer,
+ sizeof (temp_buffer));
+ // This ought to be > 0, otherwise something very strange has
+ // happened!!
+ ACE_ASSERT (n > 0);
+ ACE_UNUSED_ARG (n); // To avoid compile warning with ACE_NDEBUG.
+
+ Options::instance ()->stop_timer ();
+
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("(%t) closing down the test\n")));
+ Options::instance ()->print_results ();
+
+ ACE_Reactor::instance ()->end_reactor_event_loop ();
+ return -1;
+}
+
+int
+Event_Server::configure_stream (void)
+{
+ Peer_Router_Context *src;
+ // Create the <Supplier_Router>'s routing context. This contains a
+ // context shared by both the write-side and read-side of the
+ // <Supplier_Router> Module.
+ ACE_NEW_RETURN (src,
+ Peer_Router_Context (Options::instance ()->supplier_port ()),
+ -1);
+
+ MT_Module *srm = 0;
+ // Create the <Supplier_Router> module.
+ ACE_NEW_RETURN (srm,
+ MT_Module
+ (ACE_TEXT ("Supplier_Router"),
+ new Supplier_Router (src),
+ new Supplier_Router (src)),
+ -1);
+
+ MT_Module *eam = 0;
+ // Create the <Event_Analyzer> module.
+ ACE_NEW_RETURN (eam,
+ MT_Module
+ (ACE_TEXT ("Event_Analyzer"),
+ new Event_Analyzer,
+ new Event_Analyzer),
+ -1);
+
+ Peer_Router_Context *crc;
+ // Create the <Consumer_Router>'s routing context. This contains a
+ // context shared by both the write-side and read-side of the
+ // <Consumer_Router> Module.
+ ACE_NEW_RETURN (crc,
+ Peer_Router_Context (Options::instance ()->consumer_port ()),
+ -1);
+
+ MT_Module *crm = 0;
+ // Create the <Consumer_Router> module.
+ ACE_NEW_RETURN (crm,
+ MT_Module
+ (ACE_TEXT ("Consumer_Router"),
+ new Consumer_Router (crc),
+ new Consumer_Router (crc)),
+ -1);
+
+ // Push the Modules onto the event_server stream.
+
+ if (this->event_server_.push (srm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push (Supplier_Router)")),
+ -1);
+ else if (this->event_server_.push (eam) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push (Event_Analyzer)")),
+ -1);
+ else if (this->event_server_.push (crm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push (Consumer_Router)")),
+ -1);
+ return 0;
+}
+
+int
+Event_Server::set_watermarks (void)
+{
+ // Set the high and low water marks appropriately. The water marks
+ // control how much data can be buffered before the queues are
+ // considered "full."
+ size_t wm = Options::instance ()->low_water_mark ();
+
+ if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_LWM,
+ &wm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push (setting low watermark)")),
+ -1);
+
+ wm = Options::instance ()->high_water_mark ();
+ if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_HWM,
+ &wm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push (setting high watermark)")),
+ -1);
+ return 0;
+}
+
+int
+Event_Server::run_event_loop (void)
+{
+ // Begin the timer.
+ Options::instance ()->start_timer ();
+
+ // Perform the main event loop waiting for the user to type ^C or to
+ // enter a line on the ACE_STDIN.
+
+ ACE_Reactor::instance ()->run_reactor_event_loop ();
+
+ // Close down the stream and call the <close> hooks on all the
+ // <ACE_Task>s in the various Modules in the Stream.
+ this->event_server_.close ();
+
+ // Wait for the threads in the <Consumer_Router> and
+ // <Supplier_Router> to exit.
+ return ACE_Thread_Manager::instance ()->wait ();
+}
+
+int
+Event_Server::svc (void)
+{
+ if (this->configure_stream () == -1)
+ return -1;
+ else if (this->set_watermarks () == -1)
+ return -1;
+ else if (this->run_event_loop () == -1)
+ return -1;
+ else
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+#if defined (ACE_HAS_THREADS)
+ Options::instance ()->parse_args (argc, argv);
+
+ // Initialize the <Event_Server>.
+ Event_Server event_server;
+
+ // Run the event server's event-loop.
+ int result = event_server.svc ();
+
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("exiting main\n")));
+
+ return result;
+#else
+ ACE_UNUSED_ARG (argc);
+ ACE_UNUSED_ARG (argv);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("threads not supported on this platform\n")),
+ 1);
+#endif /* ACE_HAS_THREADS */
+}
diff --git a/ACE/examples/ASX/Event_Server/Makefile.am b/ACE/examples/ASX/Event_Server/Makefile.am
new file mode 100644
index 00000000000..f00f4c3dd09
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Makefile.am
@@ -0,0 +1,14 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ./bin/mwc.pl -type automake -noreldefs ACE.mwc
+
+SUBDIRS = \
+ Event_Server \
+ Transceiver
+
diff --git a/ACE/examples/ASX/Event_Server/README b/ACE/examples/ASX/Event_Server/README
new file mode 100644
index 00000000000..262b7ee9633
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/README
@@ -0,0 +1,79 @@
+This subdirectory illustrates a number of the ACE ASX framework
+features using an ACE_Stream application called the Event Server. For
+more information on the design and use of the ACE ASX framework please
+see http://www.cs.wustl.edu/~schmidt/C++-USENIX-94.ps.gz and
+http://www.cs.wustl.edu/~schmidt/ACE-concurrency.ps.gz. For more
+information on the Event Server, please see
+http://www.cs.wustl.edu/~schmidt/DSEJ-94.ps.gz.
+
+The Event Server example works as follows:
+
+1. When the ./Event_Server/event_server executable is run it
+ creates two SOCK_Acceptors, which listen for and accept incoming
+ connections from Consumers and Suppliers.
+
+2. The ./Event_Server/Transceiver/transceiver application plays
+ the role of either a Consumer or a Supplier (with the current
+ implementation it can only play one role at a time). The
+ transceiver process can be started multiple times. Each call
+ should be either:
+
+ # Consumer
+ % transceiver -p 10002 -h hostname -C
+
+ or
+
+ # Supplier
+ % transceiver -p 10003 -h hostname -S
+
+ where 10002 and 10003 are the default Consumer listening port and
+ the Supplier listening port, respectively, on the event server,
+ "hostname" is the name of the machine the event_server is running,
+ and -C and -S indicate that the transceiver plays the role of a
+ Consumer or Supplier, respectively. I typically run the
+ Consumer(s) and Supplier(s) in different windows to make it easier
+ to understand the output.
+
+3. Once the Consumer(s) and Supplier(s) are connected, you can
+ type data from any Supplier window. This data will be routed
+ through the Modules/Tasks in the event_server's Stream and be
+ forwarded to the Consumer(s).
+
+ Since the transceivers are full-duplex you can also send messages
+ from the Consumer(s) to Supplier(s). However, the Event Server will
+ warn you about this since it's not really kosher to have Consumers
+ sending to Suppliers...
+
+4. When you want to shut down the tranceivers or event server
+ just type ^C (which generates a SIGINT) or type any input in the
+ window running the Event Server application.
+
+What makes this example particularly interesting is that once you've
+got the hang of the ASX Streams architecture, you can "push" new
+filtering Modules onto the event_server Stream and modify the
+application's behavior transparently to the other components.
+
+There are a bunch of features that aren't implemented in this
+prototype that you'd probably want to do for a "real" application.
+Some of the more interesting things to add would be:
+
+0. Complete "full-duplex" support, i.e., Peers could play the
+ role of Suppliers and Consumers simultaneously.
+
+1. Support for "commands", which would change the behavior
+ of the Event_Server based on messages it got from Suppliers
+ (or Consumers).
+
+3. Support for "pull" operations, as well as "push" operations.
+ This would basically involve adding a "MIB Module" to get/set
+ the "values" associated with "names" passed in by Peers. This
+ could probably replace the Event_Analysis Module.
+
+4. Filtering and correlation (this should probably be done
+ via a separate Module that handles filtering and correlation).
+
+5. More flexible concurrency model(s), e.g., "Active Object per-Consumer".
+ This would enable the Event Server process to handle flow control
+ more gracefully than it does not (it currently "hangs," which isn't
+ desirable).
+
diff --git a/ACE/examples/ASX/Event_Server/Transceiver/Makefile.am b/ACE/examples/ASX/Event_Server/Transceiver/Makefile.am
new file mode 100644
index 00000000000..f4ba3b5bf56
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Transceiver/Makefile.am
@@ -0,0 +1,35 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ./bin/mwc.pl -type automake -noreldefs ACE.mwc
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+## Makefile.Transceiver.am
+
+noinst_PROGRAMS = Transceiver
+
+Transceiver_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+Transceiver_SOURCES = \
+ transceiver.cpp \
+ transceiver.h
+
+Transceiver_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/examples/ASX/Event_Server/Transceiver/Transceiver.mpc b/ACE/examples/ASX/Event_Server/Transceiver/Transceiver.mpc
new file mode 100644
index 00000000000..a6a3309727c
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Transceiver/Transceiver.mpc
@@ -0,0 +1,9 @@
+// -*- MPC -*-
+// $Id$
+
+project(*) : aceexe {
+ exename = Transceiver
+ Source_Files {
+ transceiver.cpp
+ }
+}
diff --git a/ACE/examples/ASX/Event_Server/Transceiver/transceiver.cpp b/ACE/examples/ASX/Event_Server/Transceiver/transceiver.cpp
new file mode 100644
index 00000000000..37bbaad7d3d
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Transceiver/transceiver.cpp
@@ -0,0 +1,238 @@
+// $Id$
+
+// Test program for the event transceiver. This program can play the
+// role of either Consumer or Supplier. You can terminate this
+// program by typing ^C....
+
+#include "ace/OS_main.h"
+#include "ace/OS_NS_string.h"
+#include "ace/Service_Config.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/Connector.h"
+#include "ace/Get_Opt.h"
+#include "ace/Signal.h"
+#include "ace/OS_NS_unistd.h"
+
+#include "transceiver.h"
+
+ACE_RCSID (Transceiver,
+ transceiver,
+ "$Id$")
+
+// Handle the command-line arguments.
+
+int
+Event_Transceiver::parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("Ch:p:S"));
+
+ this->port_number_ = ACE_DEFAULT_SERVER_PORT;
+ this->host_name_ = ACE_DEFAULT_SERVER_HOST;
+ this->role_ = ACE_TEXT ("Supplier");
+
+ for (int c; (c = get_opt ()) != -1; )
+ switch (c)
+ {
+ case 'C':
+ this->role_ = ACE_TEXT ("Consumer");
+ break;
+ case 'h':
+ this->host_name_ = get_opt.opt_arg ();
+ break;
+ case 'p':
+ this->port_number_ = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'S':
+ this->role_ = ACE_TEXT ("Supplier");
+ break;
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("usage: %n [-CS] [-h host_name] [-p portnum] \n")),
+ -1);
+ /* NOTREACHED */
+ break;
+ }
+
+ // Increment by 1 if we're the supplier to mirror the default
+ // behavior of the Event_Server (which sets the Consumer port to
+ // ACE_DEFAULT_SERVER_PORT and the Supplier port to
+ // ACE_DEFAULT_SERVER_PORT + 1). Note that this is kind of a
+ // hack...
+ if (ACE_OS::strcmp (this->role_, ACE_TEXT ("Supplier")) == 0
+ && this->port_number_ == ACE_DEFAULT_SERVER_PORT)
+ this->port_number_++;
+ return 0;
+}
+
+int
+Event_Transceiver::handle_close (ACE_HANDLE,
+ ACE_Reactor_Mask)
+{
+ ACE_Reactor::instance ()->end_reactor_event_loop ();
+ return 0;
+}
+
+// Close down via SIGINT or SIGQUIT.
+
+int
+Event_Transceiver::handle_signal (int,
+ siginfo_t *,
+ ucontext_t *)
+{
+ ACE_Reactor::instance ()->end_reactor_event_loop ();
+ return 0;
+}
+
+Event_Transceiver::Event_Transceiver (void)
+{
+}
+
+Event_Transceiver::Event_Transceiver (int argc, ACE_TCHAR *argv[])
+{
+ if (this->parse_args (argc, argv) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("parse_args")));
+ else
+ {
+ ACE_Sig_Set sig_set;
+
+ sig_set.sig_add (SIGINT);
+ sig_set.sig_add (SIGQUIT);
+
+ // Register to handle the SIGINT and SIGQUIT signals.
+ if (ACE_Reactor::instance ()->register_handler
+ (sig_set,
+ this) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("register_handler")));
+
+ // We need to register <this> here before we're connected since
+ // otherwise <get_handle> will return the connection socket
+ // handle for the peer.
+ else if (ACE_Event_Handler::register_stdin_handler (this,
+ ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ()) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("register_stdin_handler")));
+
+ // Address of the server.
+ ACE_INET_Addr server_addr (this->port_number_,
+ this->host_name_);
+
+ ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector;
+
+ // We need a pointer here because connect takes a reference to a
+ // pointer!
+ Event_Transceiver *etp = this;
+
+ // Establish the connection to the Event Server.
+ if (connector.connect (etp,
+ server_addr) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ this->host_name_));
+ ACE_Reactor::instance()->remove_handler (sig_set);
+ ACE_Event_Handler::remove_stdin_handler (ACE_Reactor::instance(),
+ ACE_Thread_Manager::instance());
+ }
+ }
+}
+
+int
+Event_Transceiver::open (void *)
+{
+ // Register ourselves to be notified when there's data to read on
+ // the socket.
+ if (ACE_Reactor::instance ()->register_handler
+ (this,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("register_handler")),
+ -1);
+ return 0;
+}
+
+int
+Event_Transceiver::handle_input (ACE_HANDLE handle)
+{
+ // Determine whether we play the role of a consumer or a supplier.
+ if (handle == ACE_STDIN)
+ return this->transmitter ();
+ else
+ return this->receiver ();
+}
+
+int
+Event_Transceiver::transmitter (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) entering %s transmitter\n"),
+ this->role_));
+
+ char buf[BUFSIZ];
+ ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf);
+ int result = 0;
+
+ if (n <= 0 || this->peer ().send_n (buf, n) != n)
+ result = -1;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) leaving %s transmitter\n"),
+ this->role_));
+ return result;
+}
+
+int
+Event_Transceiver::receiver (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) entering %s receiver\n"),
+ this->role_));
+
+ char buf[BUFSIZ];
+
+ ssize_t n = this->peer ().recv (buf, sizeof buf);
+ int result = 0;
+
+ if (n <= 0
+ || ACE_OS::write (ACE_STDOUT, buf, n) != n)
+ result = -1;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) leaving %s receiver\n"),
+ this->role_));
+ return result;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ if (ACE_Service_Config::open (argv[0]) == -1
+ && errno != ENOENT)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("open")),
+ -1);
+
+ // Create and initialize the transceiver.
+ Event_Transceiver transceiver (argc, argv);
+
+ // Demonstrate how we can check if a constructor failed...
+ if (ACE_LOG_MSG->op_status () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Event_Transceiver constructor failed")),
+ -1);
+
+
+ // Run event loop until either the event server shuts down or we get
+ // a SIGINT.
+ ACE_Reactor::instance ()->run_reactor_event_loop ();
+ return 0;
+}
+
diff --git a/ACE/examples/ASX/Event_Server/Transceiver/transceiver.h b/ACE/examples/ASX/Event_Server/Transceiver/transceiver.h
new file mode 100644
index 00000000000..864b88a0b48
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Transceiver/transceiver.h
@@ -0,0 +1,60 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef ACE_TRANSCEIVER_H
+#define ACE_TRANSCEIVER_H
+
+#include "ace/SOCK_Stream.h"
+#include "ace/Svc_Handler.h"
+
+class Event_Transceiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
+{
+ // = TITLE
+ // Generate and receives messages from the event server.
+ //
+ // = DESCRIPTION
+ // This class is both a consumer and supplier of events, i.e.,
+ // it's a ``transceiver.''
+public:
+ // = Initialization method.
+ Event_Transceiver (int argc, ACE_TCHAR *argv[]);
+ // Performs the actual initialization.
+
+ Event_Transceiver (void);
+ // No-op constructor (required by the <ACE_Connector>).
+
+ // = Svc_Handler hook called by the <ACE_Connector>.
+ virtual int open (void *);
+ // Initialize the transceiver when we are connected.
+
+ // = Demultplexing hooks from the <ACE_Reactor>.
+ virtual int handle_input (ACE_HANDLE);
+ // Receive data from STDIN or socket.
+
+ virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
+ // Close down via SIGINT.
+
+ virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
+ // Close down the event loop.
+
+private:
+ int receiver (void);
+ // Reads data from socket and writes to ACE_STDOUT.
+
+ int transmitter (void);
+ // Writes data from ACE_STDIN to socket.
+
+ int parse_args (int argc, ACE_TCHAR *argv[]);
+ // Parse the command-line arguments.
+
+ u_short port_number_;
+ // Port number of event server.
+
+ const ACE_TCHAR *host_name_;
+ // Name of event server.
+
+ const ACE_TCHAR *role_;
+ // Are we playing the Consumer or Supplier role?
+};
+
+#endif /* ACE_TRANSCEIVER_H */
diff --git a/ACE/examples/ASX/Makefile.am b/ACE/examples/ASX/Makefile.am
new file mode 100644
index 00000000000..22df464dcfc
--- /dev/null
+++ b/ACE/examples/ASX/Makefile.am
@@ -0,0 +1,16 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ./bin/mwc.pl -type automake -noreldefs ACE.mwc
+
+SUBDIRS = \
+ CCM_App \
+ Event_Server \
+ Message_Queue \
+ UPIPE_Event_Server
+
diff --git a/ACE/examples/ASX/Message_Queue/ASX_Message_Queue.mpc b/ACE/examples/ASX/Message_Queue/ASX_Message_Queue.mpc
new file mode 100644
index 00000000000..8c1853bcee7
--- /dev/null
+++ b/ACE/examples/ASX/Message_Queue/ASX_Message_Queue.mpc
@@ -0,0 +1,25 @@
+// -*- MPC -*-
+// $Id$
+
+project(*Bounded_Buffer) : aceexe {
+ exename = bounded_buffer
+ Source_Files {
+ bounded_buffer.cpp
+ }
+}
+
+project(*Buffer_Stream) : aceexe {
+ avoids += uses_wchar
+ exename = buffer_stream
+ Source_Files {
+ buffer_stream.cpp
+ }
+}
+
+project(*Priority_Buffer) : aceexe {
+ exename = priority_buffer
+ Source_Files {
+ priority_buffer.cpp
+ }
+}
+
diff --git a/ACE/examples/ASX/Message_Queue/Makefile.am b/ACE/examples/ASX/Message_Queue/Makefile.am
new file mode 100644
index 00000000000..3305b17bf1f
--- /dev/null
+++ b/ACE/examples/ASX/Message_Queue/Makefile.am
@@ -0,0 +1,67 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ./bin/mwc.pl -type automake -noreldefs ACE.mwc
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+
+## Makefile.ASX_Message_Queue_Bounded_Buffer.am
+
+noinst_PROGRAMS = bounded_buffer
+
+bounded_buffer_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+bounded_buffer_SOURCES = \
+ bounded_buffer.cpp
+
+bounded_buffer_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+## Makefile.ASX_Message_Queue_Buffer_Stream.am
+
+if !BUILD_USES_WCHAR
+
+noinst_PROGRAMS += buffer_stream
+
+buffer_stream_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+buffer_stream_SOURCES = \
+ buffer_stream.cpp
+
+buffer_stream_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_USES_WCHAR
+
+## Makefile.ASX_Message_Queue_Priority_Buffer.am
+
+noinst_PROGRAMS += priority_buffer
+
+priority_buffer_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+priority_buffer_SOURCES = \
+ priority_buffer.cpp
+
+priority_buffer_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/examples/ASX/Message_Queue/bounded_buffer.cpp b/ACE/examples/ASX/Message_Queue/bounded_buffer.cpp
new file mode 100644
index 00000000000..2cdc50f2116
--- /dev/null
+++ b/ACE/examples/ASX/Message_Queue/bounded_buffer.cpp
@@ -0,0 +1,140 @@
+// $Id$
+
+// This short program copies stdin to stdout via the use of an ASX
+// Message_Queue. It illustrates an implementation of the classic
+// "bounded buffer" program.
+
+#include "ace/Message_Queue.h"
+#include "ace/Thread_Manager.h"
+#include "ace/OS_NS_time.h"
+#include "ace/OS_NS_unistd.h"
+
+ACE_RCSID(Message_Queue, bounded_buffer, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+// The producer reads data from the stdin stream, creates a message,
+// and then queues the message in the message list, where it is
+// removed by the consumer thread. A 0-sized message is enqueued when
+// there is no more data to read. The consumer uses this as a flag to
+// know when to exit.
+
+static void *
+producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ // Keep reading stdin, until we reach EOF.
+
+ for (int n; ; )
+ {
+ // Allocate a new message.
+ ACE_Message_Block *mb;
+
+ ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ), 0);
+
+ n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), mb->size ());
+
+ if (n <= 0)
+ {
+ // Send a shutdown message to the other thread and exit.
+ mb->length (0);
+ if (msg_queue->enqueue_tail (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "put_next"));
+ break;
+ }
+
+ // Send the message to the other thread.
+ else
+ {
+ mb->msg_priority (n);
+ mb->wr_ptr (n);
+ if (msg_queue->enqueue_tail (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "put_next"));
+ }
+ }
+
+ return 0;
+}
+
+// The consumer dequeues a message from the ACE_Message_Queue, writes
+// the message to the stderr stream, and deletes the message. The
+// producer sends a 0-sized message to inform the consumer to stop
+// reading and exit.
+
+static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ int result = 0;
+
+ // Keep looping, reading a message out of the queue, until we timeout
+ // or get a message with a length == 0, which signals us to quit.
+
+ for (;;)
+ {
+ ACE_Message_Block *mb;
+
+ ACE_Time_Value timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds
+
+ result = msg_queue->dequeue_head (mb, &timeout);
+
+ if (result == -1)
+ break;
+
+ int length = mb->length ();
+
+ if (length > 0)
+ ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);
+
+ mb->release ();
+
+ if (length == 0)
+ break;
+ }
+
+ if (result == -1 && errno == EWOULDBLOCK)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n%a",
+ "timed out waiting for message",
+ 1));
+ return 0;
+}
+
+// Spawn off two threads that copy stdin to stdout.
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ // Message list.
+ ACE_Message_Queue<ACE_MT_SYNCH> msg_queue;
+
+ if (ACE_Thread_Manager::instance ()->spawn
+ (ACE_THR_FUNC (producer),
+ (void *) &msg_queue,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "spawn"),
+ 1);
+ else if (ACE_Thread_Manager::instance ()->spawn
+ (ACE_THR_FUNC (consumer),
+ (void *) &msg_queue,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "spawn"),
+ 1);
+
+ // Wait for producer and consumer threads to exit.
+ ACE_Thread_Manager::instance ()->wait ();
+ return 0;
+}
+#else
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/ACE/examples/ASX/Message_Queue/buffer_stream.cpp b/ACE/examples/ASX/Message_Queue/buffer_stream.cpp
new file mode 100644
index 00000000000..12f2a0303a1
--- /dev/null
+++ b/ACE/examples/ASX/Message_Queue/buffer_stream.cpp
@@ -0,0 +1,314 @@
+// $Id$
+
+// This short program copies stdin to stdout via the use of an ASX
+// Stream. It illustrates an implementation of the classic "bounded
+// buffer" program using an ASX Stream containing two Modules. Each
+// ACE_Module contains two Tasks. Each ACE_Task contains a
+// ACE_Message_Queue and a pointer to a ACE_Thread_Manager. Note how
+// the use of these reusable components reduces the reliance on global
+// variables, as compared with the bounded_buffer.C example.
+
+#include "ace/OS_main.h"
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_time.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/Service_Config.h"
+#include "ace/Stream.h"
+#include "ace/Module.h"
+#include "ace/Task.h"
+
+ACE_RCSID(Message_Queue, buffer_stream, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
+typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
+typedef ACE_Task<ACE_MT_SYNCH> MT_Task;
+
+class Common_Task : public MT_Task
+ // = TITLE
+ // Methods that are common to the producer and consumer.
+{
+public:
+ Common_Task (void) {}
+
+ //FUZZ: disable check_for_lack_ACE_OS
+ // ACE_Task hooks
+ virtual int open (void * = 0);
+ virtual int close (u_long = 0);
+ //FUZZ: enable check_for_lack_ACE_OS
+};
+
+// Define the Producer interface.
+
+class Producer : public Common_Task
+{
+public:
+ Producer (void) {}
+
+ // Read data from stdin and pass to consumer.
+ virtual int svc (void);
+};
+
+class Consumer : public Common_Task
+ // = TITLE
+ // Define the Consumer interface.
+{
+public:
+ Consumer (void) {}
+
+ virtual int put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv = 0);
+ // Enqueue the message on the ACE_Message_Queue for subsequent
+ // handling in the svc() method.
+
+ virtual int svc (void);
+ // Receive message from producer and print to stdout.
+
+private:
+
+ ACE_Time_Value timeout_;
+};
+
+class Filter : public MT_Task
+ // = TITLE
+ // Defines a Filter that prepends a line number in front of each
+ // line.
+{
+public:
+ Filter (void): count_ (1) {}
+
+ virtual int put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv = 0);
+ // Change the size of the message before passing it downstream.
+
+private:
+ size_t count_;
+ // Count the number of lines passing through the filter.
+};
+
+// Spawn off a new thread.
+
+int
+Common_Task::open (void *)
+{
+ if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("spawn")),
+ -1);
+ return 0;
+}
+
+int
+Common_Task::close (u_long exit_status)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) thread is exiting with status %d in module %s\n"),
+ exit_status,
+ this->name ()));
+
+ // Can do anything here that is required when a thread exits, e.g.,
+ // storing thread-specific information in some other storage
+ // location, etc.
+ return 0;
+}
+
+// The Consumer reads data from the stdin stream, creates a message,
+// and then queues the message in the message list, where it is
+// removed by the consumer thread. A 0-sized message is enqueued when
+// there is no more data to read. The consumer uses this as a flag to
+// know when to exit.
+
+int
+Producer::svc (void)
+{
+ // Keep reading stdin, until we reach EOF.
+
+ for (int n; ; )
+ {
+ // Allocate a new message (add one to avoid nasty boundary
+ // conditions).
+
+ ACE_Message_Block *mb = 0;
+
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ + 1),
+ -1);
+
+ n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), BUFSIZ);
+
+ if (n <= 0)
+ {
+ // Send a shutdown message to the other thread and exit.
+ mb->length (0);
+
+ if (this->put_next (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("put_next")));
+ break;
+ }
+
+ // Send the message to the other thread.
+ else
+ {
+ mb->wr_ptr (n);
+ // NUL-terminate the string (since we use strlen() on it
+ // later).
+ mb->rd_ptr ()[n] = '\0';
+
+ if (this->put_next (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("put_next")));
+ }
+ }
+
+ return 0;
+}
+
+// Simply enqueue the Message_Block into the end of the queue.
+
+int
+Consumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
+{
+ return this->putq (mb, tv);
+}
+
+// The consumer dequeues a message from the ACE_Message_Queue, writes
+// the message to the stderr stream, and deletes the message. The
+// Consumer sends a 0-sized message to inform the consumer to stop
+// reading and exit.
+
+int
+Consumer::svc (void)
+{
+ int result = 0;
+
+ // Keep looping, reading a message out of the queue, until we
+ // timeout or get a message with a length == 0, which signals us to
+ // quit.
+
+ for (;;)
+ {
+ ACE_Message_Block *mb = 0;
+
+ // Wait for upto 4 seconds.
+ this->timeout_.sec (ACE_OS::time (0) + 4);
+
+ result = this->getq (mb, &this->timeout_);
+
+ if (result == -1)
+ break;
+
+ int length = mb->length ();
+
+ if (length > 0)
+ ACE_OS::write (ACE_STDOUT,
+ mb->rd_ptr (),
+ ACE_OS::strlen (mb->rd_ptr ()));
+
+ mb->release ();
+
+ if (length == 0)
+ break;
+ }
+
+ if (result == -1 && errno == EWOULDBLOCK)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n%a"),
+ ACE_TEXT ("timed out waiting for message"),
+ 1));
+ return 0;
+}
+
+int
+Filter::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
+{
+ if (mb->length () == 0)
+ return this->put_next (mb, tv);
+ else
+ {
+ char buf[BUFSIZ];
+
+ // Stash a copy of the buffer away.
+ ACE_OS::strncpy (buf, mb->rd_ptr (), sizeof buf);
+
+ // Increase the size of the buffer large enough that it will be
+ // reallocated (in order to test the reallocation mechanisms).
+
+ mb->size (mb->length () + BUFSIZ);
+ mb->length (mb->size ());
+
+ // Prepend the line count in front of the buffer.
+ ACE_OS::sprintf (mb->rd_ptr (),
+ ACE_SIZE_T_FORMAT_SPECIFIER
+ ": %s",
+ this->count_++,
+ buf);
+ return this->put_next (mb, tv);
+ }
+}
+
+// Main driver function.
+
+int
+ACE_TMAIN (int, ACE_TCHAR *argv[])
+{
+ ACE_Service_Config daemon (argv[0]);
+
+ // This Stream controls hierachically-related active objects.
+ MT_Stream stream;
+
+ MT_Module *pm = 0;
+ MT_Module *fm = 0;
+ MT_Module *cm = 0;
+
+ ACE_NEW_RETURN (cm,
+ MT_Module (ACE_TEXT ("Consumer"),
+ new Consumer),
+ -1);
+ ACE_NEW_RETURN (fm,
+ MT_Module (ACE_TEXT ("Filter"),
+ new Filter),
+ -1);
+ ACE_NEW_RETURN (pm,
+ MT_Module (ACE_TEXT ("Producer"),
+ new Producer),
+ -1);
+
+ // Create Consumer, Filter, and Producer Modules and push them onto
+ // the Stream. All processing is performed in the Stream.
+
+ if (stream.push (cm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push")),
+ 1);
+ else if (stream.push (fm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push")),
+ 1);
+ else if (stream.push (pm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push")),
+ 1);
+ // Barrier synchronization: wait for the threads to exit, then exit
+ // ourselves.
+ ACE_Thread_Manager::instance ()->wait ();
+ return 0;
+}
+#else
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("threads not supported on this platform\n")));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/ACE/examples/ASX/Message_Queue/priority_buffer.cpp b/ACE/examples/ASX/Message_Queue/priority_buffer.cpp
new file mode 100644
index 00000000000..db60a33bcae
--- /dev/null
+++ b/ACE/examples/ASX/Message_Queue/priority_buffer.cpp
@@ -0,0 +1,145 @@
+// $Id$
+
+// This short program prints the contents of stdin to stdout sorted by
+// the length of each line via the use of an ASX Message_Queue. It
+// illustrates how priorities can be used for ACE Message_Queues.
+
+#include "ace/OS_NS_stdio.h"
+#include "ace/Malloc_Base.h" // To get ACE_Allocator
+#include "ace/Message_Queue.h"
+#include "ace/Read_Buffer.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Service_Config.h"
+
+ACE_RCSID(Message_Queue, priority_buffer, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+// Global thread manager.
+static ACE_Thread_Manager thr_mgr;
+
+// Make the queue be capable of being *very* large.
+static const long max_queue = LONG_MAX;
+
+// The consumer dequeues a message from the ACE_Message_Queue, writes
+// the message to the stderr stream, and deletes the message. The
+// producer sends a 0-sized message to inform the consumer to stop
+// reading and exit.
+
+static void *
+consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ // Keep looping, reading a message out of the queue, until we
+ // timeout or get a message with a length == 0, which signals us to
+ // quit.
+
+ for (;;)
+ {
+ ACE_Message_Block *mb;
+
+ if (msg_queue->dequeue_head (mb) == -1)
+ break;
+
+ int length = mb->length ();
+
+ if (length > 0)
+ ACE_OS::puts (mb->rd_ptr ());
+
+ // Free up the buffer memory and the Message_Block.
+ ACE_Allocator::instance ()->free (mb->rd_ptr ());
+ mb->release ();
+
+ if (length == 0)
+ break;
+ }
+
+ return 0;
+}
+
+// The producer reads data from the stdin stream, creates a message,
+// and then queues the message in the message list, where it is
+// removed by the consumer thread. A 0-sized message is enqueued when
+// there is no more data to read. The consumer uses this as a flag to
+// know when to exit.
+
+static void *
+producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ ACE_Read_Buffer rb (ACE_STDIN);
+
+ // Keep reading stdin, until we reach EOF.
+
+ for (;;)
+ {
+ // Allocate a new buffer.
+ char *buffer = rb.read ('\n');
+
+ ACE_Message_Block *mb;
+
+ if (buffer == 0)
+ {
+ // Send a 0-sized shutdown message to the other thread and
+ // exit.
+
+ ACE_NEW_RETURN (mb, ACE_Message_Block ((size_t) 0), 0);
+
+ if (msg_queue->enqueue_tail (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ break;
+ }
+
+ // Enqueue the message in priority order.
+ else
+ {
+ // Allocate a new message, but have it "borrow" its memory
+ // from the buffer.
+ ACE_NEW_RETURN (mb, ACE_Message_Block (rb.size (),
+ ACE_Message_Block::MB_DATA,
+ 0,
+ buffer),
+ 0);
+ mb->msg_priority (rb.size ());
+ mb->wr_ptr (rb.size ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ "enqueueing message of size %d\n",
+ mb->msg_priority ()));
+
+ // Enqueue in priority order.
+ if (msg_queue->enqueue_prio (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ }
+ }
+
+ // Now read all the items out in priority order (i.e., ordered by
+ // the size of the lines!).
+ consumer (msg_queue);
+
+ return 0;
+}
+
+// Spawn off one thread that copies stdin to stdout in order of the
+// size of each line.
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ // Message queue.
+ ACE_Message_Queue<ACE_MT_SYNCH> msg_queue (max_queue);
+
+ if (thr_mgr.spawn (ACE_THR_FUNC (producer), (void *) &msg_queue,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
+
+ // Wait for producer and consumer threads to exit.
+ thr_mgr.wait ();
+ return 0;
+}
+#else
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp
new file mode 100644
index 00000000000..b9c9c0cf2bd
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp
@@ -0,0 +1,138 @@
+// $Id$
+
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "Consumer_Router.h"
+#include "Options.h"
+
+ACE_RCSID(UPIPE_Event_Server, Consumer_Router, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY;
+
+int
+Consumer_Handler::open (void *a)
+{
+ CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a;
+ this->router_task_ = af->router ();
+ return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a);
+}
+
+Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm)
+ : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm)
+{
+}
+
+// Create a new handler that will interact with a consumer and point
+// its ROUTER_TASK_ data member to the CONSUMER_ROUTER.
+
+Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm)
+ : CONSUMER_ROUTER (tm)
+{
+}
+
+// Initialize the Router..
+
+int
+Consumer_Router::open (void *)
+{
+ ACE_ASSERT (this->is_reader ());
+ ACE_TCHAR *argv[3];
+
+ argv[0] = (ACE_TCHAR *) this->name ();
+ argv[1] = (ACE_TCHAR *) options.consumer_file ();
+ argv[2] = 0;
+
+ if (this->init (1, &argv[1]) == -1)
+ return -1;
+
+ // Make this an active object.
+ // return this->activate (options.t_flags ());
+
+ // Until that's done, return 1 to indicate that the object wasn't activated.
+ return 1;
+}
+
+int
+Consumer_Router::close (u_long)
+{
+ ACE_ASSERT (this->is_reader ());
+ this->peer_map_.close ();
+ this->msg_queue ()->deactivate();
+ return 0;
+}
+
+
+// Handle incoming messages in a separate thread..
+
+int
+Consumer_Router::svc (void)
+{
+ ACE_Message_Block *mb = 0;
+
+ ACE_ASSERT (this->is_reader ());
+
+ if (options.debug ())
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting svc in %s\n"),
+ this->name ()));
+
+ while (this->getq (mb) > 0)
+ if (this->put_next (mb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) put_next failed in %s\n"),
+ this->name ()), -1);
+
+ return 0;
+ // Note the implicit ACE_OS::thr_exit() via destructor.
+}
+
+// Send a MESSAGE_BLOCK to the supplier(s)..
+
+int
+Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ ACE_ASSERT (this->is_reader ());
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ {
+ this->control (mb);
+ return this->put_next (mb);
+ }
+ else
+{
+//printf("consumer-Router is routing : send_peers\n");
+ return this->send_peers (mb);
+}
+}
+
+// Return information about the Client_Router ACE_Module..
+
+int
+Consumer_Router::info (ACE_TCHAR **strp, size_t length) const
+{
+ ACE_TCHAR buf[BUFSIZ];
+ ACE_UPIPE_Addr addr;
+ const ACE_TCHAR *mod_name = this->name ();
+ ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_;
+
+ if (sa.get_local_addr (addr) == -1)
+ return -1;
+
+#if !defined (ACE_WIN32) && defined (ACE_USES_WCHAR)
+# define FMTSTR ACE_TEXT ("%ls\t %ls/ %ls")
+#else
+# define FMTSTR ACE_TEXT ("%s\t %s/ %s")
+#endif
+
+ ACE_OS::sprintf (buf, FMTSTR,
+ mod_name, ACE_TEXT ("upipe"),
+ ACE_TEXT ("# consumer router\n"));
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.h b/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.h
new file mode 100644
index 00000000000..93a1220dc11
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.h
@@ -0,0 +1,53 @@
+/* -*- C++ -*- */
+// $Id$
+
+// The interface between one or more consumers and an Event Server
+// ACE_Stream.
+
+#ifndef _CONSUMER_ROUTER_H
+#define _CONSUMER_ROUTER_H
+
+#include "ace/Thread_Manager.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/UPIPE_Acceptor.h"
+#include "ace/UPIPE_Addr.h"
+#include "ace/Svc_Handler.h"
+#include "Peer_Router.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Consumer_Handler; // Forward declaration....
+
+typedef ACE_HANDLE CONSUMER_KEY;
+
+typedef Peer_Router<Consumer_Handler, CONSUMER_KEY> CONSUMER_ROUTER;
+
+class Consumer_Handler
+ : public Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>
+{
+public:
+ Consumer_Handler (ACE_Thread_Manager *tm = 0);
+ virtual int open (void *);
+};
+
+class Consumer_Router : public CONSUMER_ROUTER
+{
+public:
+ Consumer_Router (ACE_Thread_Manager *thr_manager);
+
+protected:
+ // ACE_Task hooks..
+ virtual int open (void *a = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ virtual int svc (void);
+
+ // Dynamic linking hooks.
+ virtual int info (ACE_TCHAR **info_string, size_t length) const;
+};
+#endif /* ACE_HAS_THREADS */
+#endif /* _CONSUMER_ROUTER_H */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp
new file mode 100644
index 00000000000..689a9280766
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp
@@ -0,0 +1,73 @@
+// $Id$
+
+#include "ace/OS_NS_string.h"
+#include "Event_Analyzer.h"
+
+ACE_RCSID(UPIPE_Event_Server, Event_Analyzer, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+int
+Event_Analyzer::open (void *)
+{
+ return 0;
+}
+
+int
+Event_Analyzer::close (u_long)
+{
+ return 0;
+}
+
+int
+Event_Analyzer::control (ACE_Message_Block *mb)
+{
+ ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
+ ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd;
+
+ switch (cmd = ioc->cmd ())
+ {
+ case ACE_IO_Cntl_Msg::SET_LWM:
+ case ACE_IO_Cntl_Msg::SET_HWM:
+ this->water_marks (cmd, *(size_t *) mb->cont ()->rd_ptr ());
+ break;
+ default:
+ break;
+ }
+ return 0;
+}
+
+int
+Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ this->control (mb);
+
+ return this->put_next (mb);
+}
+
+int
+Event_Analyzer::init (int, ACE_TCHAR *[])
+{
+ return 0;
+}
+
+int
+Event_Analyzer::fini (void)
+{
+ return 0;
+}
+
+int
+Event_Analyzer::info (ACE_TCHAR **strp, size_t length) const
+{
+ const ACE_TCHAR *mod_name = this->name ();
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h b/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h
new file mode 100644
index 00000000000..01bc3028964
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h
@@ -0,0 +1,37 @@
+/* -*- C++ -*- */
+// $Id$
+
+// Signal router.
+
+#ifndef _EVENT_ANALYZER_H
+#define _EVENT_ANALYZER_H
+
+#include "ace/Stream.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Module.h"
+#include "ace/Task.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Event_Analyzer : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ virtual int open (void *a = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+
+ // Dynamic linking hooks.
+ virtual int init (int argc, ACE_TCHAR *argv[]);
+ virtual int fini (void);
+ virtual int info (ACE_TCHAR **info_string, size_t length) const;
+
+private:
+ virtual int control (ACE_Message_Block *);
+};
+
+#endif /* ACE_HAS_THREADS */
+#endif /* _EVENT_ANALYZER_H */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Makefile.am b/ACE/examples/ASX/UPIPE_Event_Server/Makefile.am
new file mode 100644
index 00000000000..f8ac89a7479
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Makefile.am
@@ -0,0 +1,50 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ./bin/mwc.pl -type automake -noreldefs ACE.mwc
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+
+## Makefile.UPIPE_Event_Server.am
+
+if !BUILD_ACE_FOR_TAO
+
+noinst_PROGRAMS = UPIPE_Event_Server
+
+UPIPE_Event_Server_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+UPIPE_Event_Server_SOURCES = \
+ Consumer_Router.cpp \
+ Event_Analyzer.cpp \
+ Options.cpp \
+ Peer_Router.cpp \
+ Supplier_Router.cpp \
+ event_server.cpp \
+ Consumer_Router.h \
+ Event_Analyzer.h \
+ Options.h \
+ Options.inl \
+ Peer_Router.h \
+ Supplier_Router.h
+
+UPIPE_Event_Server_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Options.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Options.cpp
new file mode 100644
index 00000000000..033132610aa
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Options.cpp
@@ -0,0 +1,209 @@
+// $Id$
+
+#include "ace/Get_Opt.h"
+#include "ace/Log_Msg.h"
+#include "ace/OS_NS_Thread.h"
+#include "ace/OS_NS_stdio.h"
+#if defined (ACE_HAS_TRACE)
+# include "ace/OS_NS_strings.h"
+#endif /* ACE_HAS_TRACE */
+
+#include "Options.h"
+
+ACE_RCSID(UPIPE_Event_Server, Options, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+Options::Options (void)
+ : thr_count_ (4),
+ t_flags_ (THR_DETACHED),
+ high_water_mark_ (8 * 1024),
+ low_water_mark_ (1024),
+ message_size_ (128),
+ initial_queue_length_ (0),
+ iterations_ (100000),
+ debugging_ (0),
+ verbosity_ (0),
+ consumer_port_ (ACE_TEXT ("-p 10000")),
+ supplier_port_ (ACE_TEXT ("-p 10001")),
+ consumer_file_ (ACE_TEXT ("-f/tmp/conupipe")),
+ supplier_file_ (ACE_TEXT ("-f/tmp/supupipe"))
+{
+}
+
+Options::~Options (void)
+{
+}
+
+void Options::print_results (void)
+{
+ ACE_Profile_Timer::ACE_Elapsed_Time et;
+ this->itimer_.elapsed_time (et);
+
+#if defined (ACE_HAS_PRUSAGE_T)
+ prusage_t rusage;
+ this->itimer_.get_rusage (rusage);
+
+ if (options.verbose ())
+ {
+ ACE_OS::printf ("final concurrency hint = %d\n", ACE_OS::thr_getconcurrency ());
+ ACE_OS::printf ("%8d = lwpid\n"
+ "%8d = lwp count\n"
+ "%8d = minor page faults\n"
+ "%8d = major page faults\n"
+ "%8d = input blocks\n"
+ "%8d = output blocks\n"
+ "%8d = messages sent\n"
+ "%8d = messages received\n"
+ "%8d = signals received\n"
+ "%8ds, %dms = wait-cpu (latency) time\n"
+ "%8ds, %dms = user lock wait sleep time\n"
+ "%8ds, %dms = all other sleep time\n"
+ "%8d = voluntary context switches\n"
+ "%8d = involuntary context switches\n"
+ "%8d = system calls\n"
+ "%8d = chars read/written\n",
+ (int) rusage.pr_lwpid,
+ (int) rusage.pr_count,
+ (int) rusage.pr_minf,
+ (int) rusage.pr_majf,
+ (int) rusage.pr_inblk,
+ (int) rusage.pr_oublk,
+ (int) rusage.pr_msnd,
+ (int) rusage.pr_mrcv,
+ (int) rusage.pr_sigs,
+ (int) rusage.pr_wtime.tv_sec, (int) rusage.pr_wtime.tv_nsec / 1000000,
+ (int) rusage.pr_ltime.tv_sec, (int) rusage.pr_ltime.tv_nsec / 1000000,
+ (int) rusage.pr_slptime.tv_sec, (int) rusage.pr_slptime.tv_nsec / 1000000,
+ (int) rusage.pr_vctx,
+ (int) rusage.pr_ictx,
+ (int) rusage.pr_sysc,
+ (int) rusage.pr_ioch);
+ }
+#endif /* ACE_HAS_PRUSAGE_T */
+
+ ACE_OS::printf ("---------------------\n"
+ "real time = %.3f\n"
+ "user time = %.3f\n"
+ "system time = %.3f\n"
+ "---------------------\n",
+ et.real_time, et.user_time, et.system_time);
+}
+
+// Manages the options.
+Options options;
+
+void
+Options::parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_LOG_MSG->open (argv[0]);
+
+ //FUZZ: disable check_for_lack_ACE_OS
+ ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("C:c:bdH:i:L:l:M:nS:s:t:T:v"));
+ int c;
+
+ while ((c = getopt ()) != -1)
+ //FUZZ: enable check_for_lack_ACE_OS
+ switch (c)
+ {
+ case 'b':
+ this->t_flags (THR_BOUND);
+ break;
+ case 'C':
+ this->consumer_file (getopt.opt_arg ());
+ break;
+ case 'c':
+ this->consumer_port (getopt.opt_arg ());
+ break;
+ case 'd':
+ this->debugging_ = 1;
+ break;
+ case 'H':
+ this->high_water_mark (ACE_OS::atoi (getopt.opt_arg ()));
+ break;
+ case 'i':
+ this->iterations (ACE_OS::atoi (getopt.opt_arg ()));
+ break;
+ case 'L':
+ this->low_water_mark (ACE_OS::atoi (getopt.opt_arg ()));
+ break;
+ case 'l':
+ this->initial_queue_length (ACE_OS::atoi (getopt.opt_arg ()));
+ break;
+ case 'M':
+ this->message_size (ACE_OS::atoi (getopt.opt_arg ()));
+ break;
+ case 'n':
+ this->t_flags (THR_NEW_LWP);
+ break;
+ case 'S':
+ this->supplier_file (getopt.opt_arg ());
+ break;
+ case 's':
+ this->supplier_port (getopt.opt_arg ());
+ break;
+ case 'T':
+ #if defined (ACE_HAS_TRACE)
+ if (ACE_OS::strcasecmp (getopt.opt_arg (), ACE_TEXT ("ON")) == 0)
+ ACE_Trace::start_tracing ();
+ else if (ACE_OS::strcasecmp (getopt.opt_arg (), ACE_TEXT ("OFF")) == 0)
+ ACE_Trace::stop_tracing ();
+ #endif /* ACE_HAS_TRACE */
+ break;
+ case 't':
+ this->thr_count (ACE_OS::atoi (getopt.opt_arg ()));
+ break;
+ case 'v':
+ this->verbosity_ = 1;
+ break;
+ default:
+ ACE_OS::fprintf (stderr, "%s\n"
+ "\t[-b] (THR_BOUND)\n"
+ "\t[-C consumer file]\n"
+ "\t[-c consumer port]\n"
+ "\t[-d] (enable debugging)\n"
+ "\t[-H high water mark]\n"
+ "\t[-i number of test iterations]\n"
+ "\t[-L low water mark]\n"
+ "\t[-M] message size \n"
+ "\t[-n] (THR_NEW_LWP)\n"
+ "\t[-q max queue size]\n"
+ "\t[-S supplier file]\n"
+ "\t[-s supplier port]\n"
+ "\t[-t number of threads]\n"
+ "\t[-v] (verbose) \n",
+ ACE_TEXT_ALWAYS_CHAR (argv[0]));
+ ACE_OS::exit (1);
+ /* NOTREACHED */
+ break;
+ }
+
+ // HACK! This needs to be done to avoid the mismatch from ACE_TEXT
+ // in ACE_SIZE_T_FORMAT_SPECIFIER to narrow-char on wide-char builds.
+ // It only works because it's at the end of the file.
+# if defined (ACE_TEXT)
+# undef ACE_TEXT
+# endif
+# define ACE_TEXT(X) X
+ if (this->verbose ())
+ ACE_OS::printf ("%8d = initial concurrency hint\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = total iterations\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = thread count\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = low water mark\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = high water mark\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = message_size\n"
+ ACE_SIZE_T_FORMAT_SPECIFIER " = initial queue length\n"
+ "%8d = THR_BOUND\n"
+ "%8d = THR_NEW_LWP\n",
+ ACE_OS::thr_getconcurrency (),
+ this->iterations (),
+ this->thr_count (),
+ this->low_water_mark (),
+ this->high_water_mark (),
+ this->message_size (),
+ this->initial_queue_length (),
+ (this->t_flags () & THR_BOUND) != 0,
+ (this->t_flags () & THR_NEW_LWP) != 0);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Options.h b/ACE/examples/ASX/UPIPE_Event_Server/Options.h
new file mode 100644
index 00000000000..d775a49a928
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Options.h
@@ -0,0 +1,88 @@
+/* -*- C++ -*- */
+// $Id$
+
+// Option manager for Event Server.
+
+#ifndef DEVICE_OPTIONS_H
+#define DEVICE_OPTIONS_H
+
+#include "ace/config-all.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Profile_Timer.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Options
+{
+public:
+ Options (void);
+ ~Options (void);
+ void parse_args (int argc, ACE_TCHAR *argv[]);
+
+ void stop_timer (void);
+ void start_timer (void);
+
+ void thr_count (size_t count);
+ size_t thr_count (void);
+
+ void initial_queue_length (size_t length);
+ size_t initial_queue_length (void);
+
+ void high_water_mark (size_t size);
+ size_t high_water_mark (void);
+
+ void low_water_mark (size_t size);
+ size_t low_water_mark (void);
+
+ void message_size (size_t size);
+ size_t message_size (void);
+
+ void iterations (size_t n);
+ size_t iterations (void);
+
+ void t_flags (long flag);
+ long t_flags (void);
+
+ void supplier_port (const ACE_TCHAR *port);
+ const ACE_TCHAR *supplier_port (void);
+
+ void consumer_port (const ACE_TCHAR *port);
+ const ACE_TCHAR *consumer_port (void);
+
+ void supplier_file (const ACE_TCHAR *file);
+ const ACE_TCHAR *supplier_file (void);
+
+ void consumer_file (const ACE_TCHAR *file);
+ const ACE_TCHAR *consumer_file (void);
+
+ int debug (void);
+ int verbose (void);
+
+ void print_results (void);
+
+private:
+ ACE_Profile_Timer itimer_; // Time the process.
+ size_t thr_count_; // Number of threads to spawn.
+ long t_flags_; // Flags to thr_create().
+ size_t high_water_mark_; // ACE_Task high water mark.
+ size_t low_water_mark_; // ACE_Task low water mark.
+ size_t message_size_; // Size of a message.
+ size_t initial_queue_length_; // Initial number of items in the queue.
+ size_t iterations_; // Number of iterations to run the test program.
+ int debugging_; // Extra debugging info.
+ int verbosity_; // Extra verbose messages.
+ const ACE_TCHAR *consumer_port_; // Port that the Consumer_Router is using.
+ const ACE_TCHAR *supplier_port_; // Port that the Supplier_Router is using.
+ const ACE_TCHAR *consumer_file_; // file that the Consumer_Router is using.
+ const ACE_TCHAR *supplier_file_; // file that the Supplier_Router is using.
+};
+
+extern Options options;
+
+#include "Options.inl"
+#endif /* ACE_HAS_THREADS */
+#endif /* DEVICE_OPTIONS_H */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Options.inl b/ACE/examples/ASX/UPIPE_Event_Server/Options.inl
new file mode 100644
index 00000000000..af04f73eb26
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Options.inl
@@ -0,0 +1,166 @@
+/* -*- C++ -*- */
+// $Id$
+
+// Option manager for ustreams.
+
+// Since this is only included in Options.h these should stay
+// inline, not ACE_INLINE.
+// FUZZ: disable check_for_inline
+
+
+inline void
+Options::supplier_port (const ACE_TCHAR *port)
+{
+ this->supplier_port_ = port;
+}
+
+inline const ACE_TCHAR *
+Options::supplier_port (void)
+{
+ return this->supplier_port_;
+}
+
+inline void
+Options::supplier_file (const ACE_TCHAR *file)
+{
+ this->supplier_file_ = file;
+}
+
+inline const ACE_TCHAR *
+Options::supplier_file (void)
+{
+ return this->supplier_file_;
+}
+
+inline void
+Options::consumer_file (const ACE_TCHAR *file)
+{
+ this->consumer_file_ = file;
+}
+
+inline const ACE_TCHAR *
+Options::consumer_file (void)
+{
+ return this->consumer_file_;
+}
+
+inline void
+Options::consumer_port (const ACE_TCHAR *port)
+{
+ this->consumer_port_ = port;
+}
+
+inline const ACE_TCHAR *
+Options::consumer_port (void)
+{
+ return this->consumer_port_;
+}
+
+inline void
+Options::start_timer (void)
+{
+ this->itimer_.start ();
+}
+
+inline void
+Options::stop_timer (void)
+{
+ this->itimer_.stop ();
+}
+
+inline void
+Options::thr_count (size_t count)
+{
+ this->thr_count_ = count;
+}
+
+inline size_t
+Options::thr_count (void)
+{
+ return this->thr_count_;
+}
+
+inline void
+Options::initial_queue_length (size_t length)
+{
+ this->initial_queue_length_ = length;
+}
+
+inline size_t
+Options::initial_queue_length (void)
+{
+ return this->initial_queue_length_;
+}
+
+inline void
+Options::high_water_mark (size_t size)
+{
+ this->high_water_mark_ = size;
+}
+
+inline size_t
+Options::high_water_mark (void)
+{
+ return this->high_water_mark_;
+}
+
+inline void
+Options::low_water_mark (size_t size)
+{
+ this->low_water_mark_ = size;
+}
+
+inline size_t
+Options::low_water_mark (void)
+{
+ return this->low_water_mark_;
+}
+
+inline void
+Options::message_size (size_t size)
+{
+ this->message_size_ = size;
+}
+
+inline size_t
+Options::message_size (void)
+{
+ return this->message_size_;
+}
+
+inline void
+Options::iterations (size_t n)
+{
+ this->iterations_ = n;
+}
+
+inline size_t
+Options::iterations (void)
+{
+ return this->iterations_;
+}
+
+inline void
+Options::t_flags (long flag)
+{
+ this->t_flags_ |= flag;
+}
+
+inline long
+Options::t_flags (void)
+{
+ return this->t_flags_;
+}
+
+inline int
+Options::debug (void)
+{
+ return this->debugging_;
+}
+
+inline int
+Options::verbose (void)
+{
+ return this->verbosity_;
+}
+
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
new file mode 100644
index 00000000000..757eecedc33
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
@@ -0,0 +1,283 @@
+// $Id$
+
+#if !defined (_PEER_ROUTER_C)
+
+#define _PEER_ROUTER_C
+
+#include "ace/Get_Opt.h"
+#include "ace/Service_Config.h"
+
+#include "Peer_Router.h"
+#include "Options.h"
+
+ACE_RCSID(UPIPE_Event_Server, Peer_Router, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+// Define some short-hand macros to deal with long templates
+// names...
+
+#define PH PEER_HANDLER
+#define PA PEER_ACCEPTOR
+#define PAD PEER_ADDR
+#define PK PEER_KEY
+#define PM PEER_MAP
+
+template <class PH, class PK> int
+Acceptor_Factory<PH, PK>::init (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("df:"), 0);
+ ACE_UPIPE_Addr addr;
+
+ for (int c; (c = get_opt ()) != -1; )
+ switch (c)
+ {
+ case 'f':
+ addr.set (get_opt.opt_arg ());
+ break;
+ case 'd':
+ break;
+ default:
+ break;
+ }
+
+ if (this->open (addr, ACE_Reactor::instance ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), -1);
+ return 0;
+}
+
+template <class PH, class PK>
+Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr)
+ : pr_ (pr)
+{
+}
+
+template <class PH, class PK> Peer_Router<PH, PK> *
+Acceptor_Factory<PH, PK>::router (void)
+{
+ return this->pr_;
+}
+
+template <class ROUTER, class KEY>
+Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm)
+ : ACE_Svc_Handler<ACE_UPIPE_STREAM, ACE_MT_SYNCH> (tm)
+{
+}
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::svc (void)
+{
+ // Just a try !! we're just reading from our ACE_Message_Queue.
+ ACE_Message_Block *db, *hb;
+ int n;
+ // do an endless loop
+ for (;;)
+ {
+ db = new ACE_Message_Block (BUFSIZ);
+ hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db);
+
+ if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("recv failed")), -1);
+ else if (n == 0) // Client has closed down the connection.
+ {
+
+ if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("unbind failed")), -1);
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) shutting down \n")));
+ return -1; // We do not need to be deregistered by reactor
+ // as we were not registered at all
+ }
+ else // Transform incoming buffer into a Message and pass downstream.
+ {
+ db->wr_ptr (n);
+ *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); // structure assignment.
+ hb->wr_ptr (sizeof (long));
+ if (this->router_task_->reply (hb) == -1)
+ {
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Peer_Handler.svc : router_task->reply failed\n")));
+ return -1;
+ }
+
+ // return this->router_task_->reply (hb) == -1 ? -1 : 0;
+ }
+ }
+ ACE_NOTREACHED(return 0);
+}
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ return this->peer ().send_n (mb->rd_ptr (), mb->length ());
+}
+
+// Create a new handler and point its ROUTER_TASK_ data member to the
+// corresponding router. Note that this router is extracted out of
+// the Acceptor_Factory * that is passed in via the
+// ACE_Acceptor::handle_input() method.
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::open (void *a)
+{
+ ACE_TCHAR buf[BUFSIZ], *p = buf;
+
+ if (this->router_task_->info (&p, sizeof buf) != -1)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) creating handler for %s, fd = %d, this = %@\n"),
+ buf, this->get_handle (), a));
+ else
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("info")), -1);
+
+ if ( this->activate (options.t_flags ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("activation of thread failed")), -1);
+ else if (this->router_task_->bind_peer (this->get_handle (), this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("bind_peer")), -1);
+ return 0;
+}
+
+// Receive a message from a supplier..
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h)
+{
+
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) input arrived on sd %d\n"), h));
+// ACE_Reactor::instance ()->remove_handler(h,
+// ACE_Event_Handler::ALL_EVENTS_MASK
+// |ACE_Event_Handler::DONT_CALL);
+// this method should be called only if the peer shuts down
+// so we deactivate our ACE_Message_Queue to awake our svc thread
+
+ return 0;
+
+#if 0
+ ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ);
+ ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db);
+ int n;
+
+ if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("recv failed")), -1);
+ else if (n == 0) // Client has closed down the connection.
+ {
+ if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("unbind failed")), -1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h));
+ return -1; // Instruct the ACE_Reactor to deregister us by returning -1.
+ }
+ else // Transform incoming buffer into a Message and pass downstream.
+ {
+ db->wr_ptr (n);
+ *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment.
+ hb->wr_ptr (sizeof (long));
+ return this->router_task_->reply (hb) == -1 ? -1 : 0;
+ }
+#endif
+}
+
+template <class PH, class PK>
+Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm)
+ : ACE_Task<ACE_MT_SYNCH> (tm)
+{
+}
+
+template <class PH, class PK> int
+Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb)
+{
+ ACE_Map_Iterator<PK, PH *, ACE_RW_Mutex> map_iter = this->peer_map_;
+ int bytes = 0;
+ int iterations = 0;
+ ACE_Message_Block *data_block = mb->cont ();
+ for (ACE_Map_Entry<PK, PH *> *ss = 0;
+ map_iter.next (ss) != 0;
+ map_iter.advance ())
+ {
+ if (options.debug ())
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) sending to peer via sd %d\n"),
+ ss->ext_id_));
+
+ iterations++;
+ bytes += ss->int_id_->put (data_block);
+ }
+
+ mb->release ();
+ return bytes == 0 ? 0 : bytes / iterations;
+}
+
+template <class PH, class PK>
+Peer_Router<PH, PK>::~Peer_Router (void)
+{
+}
+
+template <class PH, class PK> int
+Peer_Router<PH, PK>::fini (void)
+{
+ delete this->acceptor_;
+ return 0;
+}
+
+template <class PH, class PK> int
+Peer_Router<PH, PK>::control (ACE_Message_Block *mb)
+{
+ ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
+ ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command;
+
+ switch (command = ioc->cmd ())
+ {
+ case ACE_IO_Cntl_Msg::SET_LWM:
+ case ACE_IO_Cntl_Msg::SET_HWM:
+ this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ());
+ break;
+ default:
+ return -1;
+ }
+ return 0;
+}
+
+template <class PH, class PK> int
+Peer_Router<PH, PK>::unbind_peer (PK key)
+{
+ return this->peer_map_.unbind (key);
+}
+
+template <class PH, class PK> int
+Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph)
+{
+ PH *peer_handler = (PH *) ph;
+ return this->peer_map_.bind (key, peer_handler);
+}
+
+template <class PH, class PK> int
+Peer_Router<PH, PK>::init (int argc, ACE_TCHAR *argv[])
+{
+ this->acceptor_ = new Acceptor_Factory <PH, PK> (this);
+
+ if (this->acceptor_->init (argc, argv) == -1
+ || this->peer_map_.open () == -1)
+ return -1;
+ else
+ {
+ ACE_UPIPE_Addr addr;
+ ACE_UPIPE_Acceptor &pa = this->acceptor_->acceptor ();
+
+ if (pa.get_local_addr (addr) != -1)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) initializing %s, file = %s, fd = %d, this = %@\n"),
+ this->name (), addr.get_path_name (), pa.get_handle (), this));
+ else
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("get_local_addr")), -1);
+ }
+ return 0;
+}
+
+#undef PH
+#undef PA
+#undef PAD
+#undef PK
+#undef PM
+#endif /* ACE_HAS_THREADS */
+#endif /* _PEER_ROUTER_C */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.h b/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.h
new file mode 100644
index 00000000000..3962d371ae0
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.h
@@ -0,0 +1,127 @@
+/* -*- C++ -*- */
+// $Id$
+
+// The interface between one or more peers and a stream. A peer
+// typically runs remotely on another machine.
+
+#ifndef _PEER_ROUTER_H
+#define _PEER_ROUTER_H
+
+#include "ace/Acceptor.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Svc_Handler.h"
+#include "ace/UPIPE_Acceptor.h"
+#include "ace/UPIPE_Addr.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Map_Manager.h"
+
+#if defined (ACE_HAS_THREADS)
+#include "ace/RW_Mutex.h"
+
+// Forward declaration.
+template <class PEER_HANDLER, class KEY>
+class Peer_Router;
+
+template <class PEER_HANDLER, class KEY>
+class Acceptor_Factory : public ACE_Acceptor<PEER_HANDLER, ACE_UPIPE_ACCEPTOR>
+{
+public:
+ Acceptor_Factory (Peer_Router<PEER_HANDLER, KEY> *pr);
+ Peer_Router<PEER_HANDLER, KEY> *router (void);
+
+ int init (int argc, ACE_TCHAR *argv[]);
+ // Initialize the acceptor when it's linked dynamically.
+
+private:
+ Peer_Router<PEER_HANDLER, KEY> *pr_;
+};
+
+// Receive input from a Peer..
+template <class ROUTER, class KEY>
+class Peer_Handler : public ACE_Svc_Handler<ACE_UPIPE_STREAM, ACE_MT_SYNCH>
+{
+public:
+ Peer_Handler (ACE_Thread_Manager * = 0);
+
+ virtual int open (void * = 0);
+ // Called by the ACE_Acceptor::handle_input() to activate this object.
+
+ virtual int handle_input (ACE_HANDLE);
+ // Receive input from the peer..
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
+ // Send output to a peer.
+
+protected:
+ ROUTER *router_task_;
+ // Pointer to write task..
+
+private:
+ // Don't need this method here...
+ virtual int svc (void);
+};
+
+// This abstract base class provides mechanisms for routing messages
+// to/from a ACE_Stream from/to one or more peers (which are typically
+// running on remote hosts). A subclass of Peer_Router overrides the
+// open(), close(), and put() methods in order to specialize the
+// behavior of the router to meet application-specific requirements.
+
+template <class PEER_HANDLER, class PEER_KEY>
+class Peer_Router : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Peer_Router (ACE_Thread_Manager * = 0);
+ ~Peer_Router (void);
+
+ typedef Peer_Handler<Peer_Router<PEER_HANDLER, PEER_KEY>, PEER_KEY> HANDLER;
+
+ // Remove a PEER_HANDLER from the PEER_MAP.
+ virtual int unbind_peer (PEER_KEY);
+
+ // Add a PEER_HANDLER to the PEER_MAP.
+ virtual int bind_peer (PEER_KEY, HANDLER *);
+
+ // Send the message block to the peer(s)..
+ int send_peers (ACE_Message_Block *mb);
+
+protected:
+// Handle control messages arriving from adjacent Modules.
+ virtual int control (ACE_Message_Block *);
+
+ // Map used to keep track of active peers.
+ ACE_Map_Manager <PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> peer_map_;
+
+ // Dynamic linking initialization hooks inherited from ACE_Task.
+ virtual int init (int argc, ACE_TCHAR *argv[]);
+ virtual int fini (void);
+
+ // Factory for accepting new PEER_HANDLERs.
+ Acceptor_Factory<PEER_HANDLER, PEER_KEY> *acceptor_;
+
+private:
+// Prevent copies and pass-by-value.
+ ACE_UNIMPLEMENTED_FUNC (Peer_Router (const Peer_Router<PEER_HANDLER, PEER_KEY> &))
+ ACE_UNIMPLEMENTED_FUNC (void operator= (const Peer_Router<PEER_HANDLER, PEER_KEY> &))
+};
+
+#if defined (__ACE_INLINE__)
+#define ACE_INLINE inline
+#else
+#define ACE_INLINE
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "Peer_Router.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("Peer_Router.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* ACE_HAS_THREADS */
+#endif /* _PEER_ROUTER_H */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp
new file mode 100644
index 00000000000..8b4a53d6331
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp
@@ -0,0 +1,137 @@
+// $Id$
+
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "Options.h"
+#include "Supplier_Router.h"
+
+ACE_RCSID(UPIPE_Event_Server, Supplier_Router, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY;
+
+int
+Supplier_Handler::open (void *a)
+{
+ SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a;
+ this->router_task_ = af->router ();
+ return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a);
+}
+
+Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm)
+ : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm)
+{
+}
+
+// Create a new router and associate it with the REACTOR parameter..
+
+Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm)
+ : SUPPLIER_ROUTER (tm)
+{
+}
+
+// Handle incoming messages in a separate thread..
+
+int
+Supplier_Router::svc (void)
+{
+ ACE_ASSERT (this->is_writer ());
+
+ ACE_Message_Block *message_block = 0;
+
+ if (options.debug ())
+ ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in %s\n", this->name ()));
+
+ while (this->getq (message_block) > 0)
+ {
+ if (this->put_next (message_block) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) put_next failed in %s\n", this->name ()), -1);
+ }
+
+ return 0;
+}
+
+// Initialize the Router..
+
+int
+Supplier_Router::open (void *)
+{
+ ACE_ASSERT (this->is_writer ());
+ ACE_TCHAR *argv[3];
+
+ argv[0] = (ACE_TCHAR *)this->name ();
+ argv[1] = (ACE_TCHAR *)options.supplier_file ();
+ argv[2] = 0;
+
+ if (this->init (1, &argv[1]) == -1)
+ return -1;
+
+ // Make this an active object.
+ // return this->activate (options.t_flags ());
+
+ // Until that's done, return 1 to indicate that the object wasn't activated.
+ return 1;
+}
+
+// Close down the router..
+
+int
+Supplier_Router::close (u_long)
+{
+ ACE_ASSERT (this->is_writer ());
+ this->peer_map_.close ();
+ this->msg_queue ()->deactivate();
+ return 0;
+}
+
+// Send a MESSAGE_BLOCK to the supplier(s)..
+
+int
+Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ ACE_ASSERT (this->is_writer ());
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ {
+ this->control (mb);
+ return this->put_next (mb);
+ }
+ else
+ {
+//printf("supplier-Router is routing: send_peers\n");
+ return this->send_peers (mb);
+ }
+}
+
+// Return information about the Supplier_Router ACE_Module..
+
+int
+Supplier_Router::info (ACE_TCHAR **strp, size_t length) const
+{
+ ACE_TCHAR buf[BUFSIZ];
+ ACE_UPIPE_Addr addr;
+ const ACE_TCHAR *mod_name = this->name ();
+ ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_;
+
+ if (sa.get_local_addr (addr) == -1)
+ return -1;
+
+#if !defined (ACE_WIN32) && defined (ACE_USES_WCHAR)
+# define FMTSTR ACE_TEXT ("%ls\t %ls/ %ls")
+#else
+# define FMTSTR ACE_TEXT ("%s\t %s/ %s")
+#endif
+
+ ACE_OS::sprintf (buf, FMTSTR,
+ mod_name, ACE_TEXT ("upipe"),
+ ACE_TEXT ("# supplier router\n"));
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.h b/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.h
new file mode 100644
index 00000000000..4d5d440e018
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.h
@@ -0,0 +1,57 @@
+/* -*- C++ -*- */
+// $Id$
+
+// The interface between a supplier and an Event Service ACE_Stream.
+
+#ifndef _SUPPLIER_ROUTER_H
+#define _SUPPLIER_ROUTER_H
+
+#include "ace/UPIPE_Addr.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/UPIPE_Acceptor.h"
+#include "ace/Map_Manager.h"
+#include "ace/Svc_Handler.h"
+#include "Peer_Router.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Forward declaration.
+class Supplier_Handler;
+
+// Type of search key for SUPPLIER_MAP.
+typedef ACE_HANDLE SUPPLIER_KEY;
+
+// Instantiated type for routing messages to suppliers.
+
+typedef Peer_Router<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_ROUTER;
+
+class Supplier_Handler
+ : public Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>
+{
+public:
+ Supplier_Handler (ACE_Thread_Manager *tm = 0);
+ virtual int open (void *);
+};
+
+class Supplier_Router : public SUPPLIER_ROUTER
+{
+public:
+ Supplier_Router (ACE_Thread_Manager *);
+
+protected:
+ // ACE_Task hooks..
+ virtual int open (void *a = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ virtual int svc (void);
+
+ // Dynamic linking hooks inherited from Peer_Router.
+ virtual int info (ACE_TCHAR **info_string, size_t length) const;
+};
+
+#endif /* ACE_HAS_THREADS */
+#endif /* _SUPPLIER_ROUTER_H */
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/UPIPE_Event.mpc b/ACE/examples/ASX/UPIPE_Event_Server/UPIPE_Event.mpc
new file mode 100644
index 00000000000..a4c93dc1fdb
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/UPIPE_Event.mpc
@@ -0,0 +1,15 @@
+// -*- MPC -*-
+// $Id$
+
+project(*Server) : aceexe {
+ avoids += ace_for_tao
+ exename = UPIPE_Event_Server
+ Source_Files {
+ Consumer_Router.cpp
+ Event_Analyzer.cpp
+ Options.cpp
+ Peer_Router.cpp
+ Supplier_Router.cpp
+ event_server.cpp
+ }
+}
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/event_server.cpp b/ACE/examples/ASX/UPIPE_Event_Server/event_server.cpp
new file mode 100644
index 00000000000..91ae382fac1
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/event_server.cpp
@@ -0,0 +1,271 @@
+// $Id$
+
+// Test the event server.
+
+#include "ace/OS_main.h"
+#include "ace/Stream.h"
+#include "ace/Service_Config.h"
+#include "ace/UPIPE_Acceptor.h"
+#include "ace/UPIPE_Connector.h"
+
+// FUZZ: disable check_for_streams_include
+#include "ace/streams.h"
+
+#include "Options.h"
+#include "Consumer_Router.h"
+#include "Event_Analyzer.h"
+#include "Supplier_Router.h"
+#include "ace/Sig_Adapter.h"
+#include "ace/OS_NS_unistd.h"
+
+ACE_RCSID (UPIPE_Event_Server,
+ event_server,
+ "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
+typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
+
+// Handle SIGINT and terminate the entire application.
+
+class Quit_Handler : public ACE_Sig_Adapter
+{
+public:
+ Quit_Handler (void);
+ virtual int handle_input (ACE_HANDLE fd);
+};
+
+Quit_Handler::Quit_Handler (void)
+ : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop))
+{
+ // Register to trap input from the user.
+ if (ACE_Event_Handler::register_stdin_handler (this,
+ ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ()) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
+ // Register to trap the SIGINT signal.
+ else if (ACE_Reactor::instance ()->register_handler
+ (SIGINT, this) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "register_handler"));
+}
+
+int
+Quit_Handler::handle_input (ACE_HANDLE)
+{
+ options.stop_timer ();
+ ACE_DEBUG ((LM_INFO, " (%t) closing down the test\n"));
+ options.print_results ();
+
+ ACE_Reactor::end_event_loop();
+ return 0;
+}
+
+static void *
+consumer (void *)
+{
+ ACE_UPIPE_Stream c_stream;
+ ACE_UPIPE_Addr c_addr (ACE_TEXT ("/tmp/conupipe"));
+
+ int verb = options.verbose ();
+ int msiz = options.message_size ();
+ time_t secs, par1, par2;
+ time_t currsec;
+
+ if (verb)
+ cout << "consumer starting connect" << endl;
+
+ ACE_UPIPE_Connector con;
+
+ if (con.connect (c_stream, c_addr) == -1)
+ ACE_DEBUG ((LM_INFO, " (%t) connect failed\n"));
+ else
+ cout << "consumer :we're connected" << endl;
+
+ ACE_Message_Block *mb_p;
+
+ int done = 0;
+ int cnt = 0;
+ ACE_OS::time (&currsec);
+
+ par1 = currsec;
+
+ while (done == 0
+ && (c_stream.recv (mb_p) != -1))
+ if (mb_p->length () > 1)
+ {
+ cnt++;
+ if (verb)
+ cout << " consumer received message !!!!!! "
+ << mb_p->rd_ptr () << endl;
+ }
+ else
+ {
+ if (verb)
+ cout << "consumer got last mb"
+ << (char) * (mb_p->rd_ptr ()) << endl;
+ c_stream.close ();
+ done = 1;
+ }
+
+ ACE_OS::time (&currsec);
+ par2 = currsec;
+
+ secs = par2 - par1;
+
+ if (secs <= 0)
+ secs=1;
+
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("consumer got %d messages of size %d ")
+ ACE_TEXT ("within %: seconds\n"),
+ cnt, msiz, secs));
+
+ ACE_OS::sleep (2);
+ cout << "consumer terminating " << endl;
+ return 0;
+}
+
+static void *
+supplier (void *dummy)
+{
+ ACE_UPIPE_Stream s_stream;
+ ACE_UPIPE_Addr serv_addr (ACE_TEXT ("/tmp/supupipe"));
+ ACE_UPIPE_Connector con;
+
+ int iter = options.iterations ();
+ int verb = options.verbose ();
+ int msiz = options.message_size ();
+ cout << "supplier starting connect" << endl;
+
+ if (con.connect (s_stream, serv_addr) == -1)
+ ACE_DEBUG ((LM_INFO, " (%t) connect failed\n"));
+
+ cout << "supplier : we're connected" << endl;
+ int n;
+ n = 0;
+ ACE_Message_Block * mb_p;
+
+ while (n < iter)
+ {
+ mb_p = new ACE_Message_Block (msiz);
+ ACE_OS::strcpy (mb_p->rd_ptr (), (char *) dummy);
+ mb_p->length (msiz);
+ if (verb)
+ cout << "supplier sending 1 message_block" << endl;
+ if (s_stream.send (mb_p) == -1)
+ {
+ cout << "supplier send failed" << endl;
+ return (void *) -1;
+ }
+ n++;
+ }
+
+ mb_p = new ACE_Message_Block (10);
+ mb_p->length (1);
+ *mb_p->rd_ptr () = 'g';
+
+ cout << "supplier sending last message_block" << endl;
+
+ if (s_stream.send (mb_p) == -1)
+ {
+ cout << "supplier send last mb failed" << endl;
+ return (void *) -1;
+ }
+ mb_p = new ACE_Message_Block (10);
+ mb_p->length (0);
+
+ if (verb)
+ cout << "supplier sending very last message_block" << endl;
+
+ if (s_stream.send (mb_p) == -1)
+ {
+ cout << "supplier send very last mb failed" << endl;
+ return (void *) -1;
+ }
+
+ ACE_OS::sleep (2);
+ cout << "supplier terminating" << endl;
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Service_Config daemon;
+
+ options.parse_args (argc, argv);
+ options.start_timer ();
+
+ // Primary ACE_Stream for EVENT_SERVER application.
+ MT_Stream event_server;
+
+ // Enable graceful shutdowns....
+ Quit_Handler quit_handler;
+
+ // Create the modules..
+
+ MT_Module *sr = new MT_Module (ACE_TEXT ("Supplier_Router"),
+ new Supplier_Router (ACE_Thread_Manager::instance ()));
+ MT_Module *ea = new MT_Module (ACE_TEXT ("Event_Analyzer"),
+ new Event_Analyzer,
+ new Event_Analyzer);
+ MT_Module *cr = new MT_Module (ACE_TEXT ("Consumer_Router"),
+ 0, // 0 triggers the creation of a ACE_Thru_Task...
+ new Consumer_Router (ACE_Thread_Manager::instance ()));
+
+ // Push the modules onto the event_server stream.
+
+ if (event_server.push (sr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push (Supplier_Router)")), -1);
+
+ if (event_server.push (ea) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push (Event_Analyzer)")), -1);
+
+ if (event_server.push (cr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push (Consumer_Router)")), -1);
+
+ // Set the high and low water marks appropriately.
+
+ int wm = options.low_water_mark ();
+
+ if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push (setting low watermark)")), -1);
+
+ wm = options.high_water_mark ();
+ if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("push (setting high watermark)")), -1);
+
+ // spawn the two threads.
+
+ if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer), (void *) 0,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), 1);
+
+ else if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier), (void *) "hello",
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), 1);
+
+ // Perform the main event loop waiting for the user to type ^C or to
+ // enter a line on the ACE_STDIN.
+
+ ACE_Reactor::instance ()->run_reactor_event_loop ();
+
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("main exiting\n")));
+
+ return 0;
+}
+#else
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("test not defined for this platform\n")),
+ -1);
+}
+#endif /* ACE_HAS_THREADS */