summaryrefslogtreecommitdiff
path: root/examples/ASX
diff options
context:
space:
mode:
Diffstat (limited to 'examples/ASX')
-rw-r--r--examples/ASX/CCM_App/CCM_App.cpp123
-rw-r--r--examples/ASX/CCM_App/Makefile176
-rw-r--r--examples/ASX/CCM_App/SC_Client.cpp9
-rw-r--r--examples/ASX/CCM_App/SC_Server.cpp63
-rw-r--r--examples/ASX/CCM_App/svc.conf21
-rw-r--r--examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp132
-rw-r--r--examples/ASX/Event_Server/Event_Server/Consumer_Router.h46
-rw-r--r--examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp68
-rw-r--r--examples/ASX/Event_Server/Event_Server/Event_Analyzer.h34
-rw-r--r--examples/ASX/Event_Server/Event_Server/Makefile433
-rw-r--r--examples/ASX/Event_Server/Event_Server/Options.cpp186
-rw-r--r--examples/ASX/Event_Server/Event_Server/Options.h75
-rw-r--r--examples/ASX/Event_Server/Event_Server/Options.i137
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.cpp279
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.h121
-rw-r--r--examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp134
-rw-r--r--examples/ASX/Event_Server/Event_Server/Supplier_Router.h51
-rw-r--r--examples/ASX/Event_Server/Event_Server/event_server.cpp125
-rw-r--r--examples/ASX/Event_Server/Makefile23
-rw-r--r--examples/ASX/Event_Server/README38
-rw-r--r--examples/ASX/Event_Server/Transceiver/Makefile135
-rw-r--r--examples/ASX/Event_Server/Transceiver/transceiver.cpp187
-rw-r--r--examples/ASX/Makefile25
-rw-r--r--examples/ASX/Message_Queue/Makefile218
-rw-r--r--examples/ASX/Message_Queue/bounded_buffer.cpp130
-rw-r--r--examples/ASX/Message_Queue/buffer_stream.cpp215
-rw-r--r--examples/ASX/Message_Queue/priority_buffer.cpp139
-rw-r--r--examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp126
-rw-r--r--examples/ASX/UPIPE_Event_Server/Consumer_Router.h48
-rw-r--r--examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp68
-rw-r--r--examples/ASX/UPIPE_Event_Server/Event_Analyzer.h34
-rw-r--r--examples/ASX/UPIPE_Event_Server/Makefile455
-rw-r--r--examples/ASX/UPIPE_Event_Server/Options.cpp191
-rw-r--r--examples/ASX/UPIPE_Event_Server/Options.h83
-rw-r--r--examples/ASX/UPIPE_Event_Server/Options.i161
-rw-r--r--examples/ASX/UPIPE_Event_Server/Peer_Router.cpp273
-rw-r--r--examples/ASX/UPIPE_Event_Server/Peer_Router.h116
-rw-r--r--examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp126
-rw-r--r--examples/ASX/UPIPE_Event_Server/Supplier_Router.h52
-rw-r--r--examples/ASX/UPIPE_Event_Server/event_server.cpp252
40 files changed, 5308 insertions, 0 deletions
diff --git a/examples/ASX/CCM_App/CCM_App.cpp b/examples/ASX/CCM_App/CCM_App.cpp
new file mode 100644
index 00000000000..218b3037be2
--- /dev/null
+++ b/examples/ASX/CCM_App/CCM_App.cpp
@@ -0,0 +1,123 @@
+#define ACE_BUILD_SVC_DLL
+// @(#)CCM_App.cpp 1.1 10/18/96
+
+#include "ace/Stream.h"
+#include "ace/Task.h"
+#include "ace/Module.h"
+
+typedef ACE_Task<ACE_SYNCH> MT_Task;
+typedef ACE_Stream<ACE_SYNCH> MT_Stream;
+typedef ACE_Module<ACE_SYNCH> MT_Module;
+
+class ACE_Svc_Export Test_Task : public MT_Task
+{
+public:
+ virtual int open (void *);
+ virtual int close (u_long);
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
+ virtual int svc (void);
+ virtual int info (char **, size_t) const;
+ virtual int init (int, char *[]);
+ virtual int fini (void);
+ virtual int suspend (void);
+ virtual int resume (void);
+};
+
+int
+Test_Task::open (void *)
+{
+ ACE_DEBUG ((LM_DEBUG, "opening %s\n", this->name () ? this->name () : "task"));
+ return 0;
+}
+
+int
+Test_Task::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG, "closing %s\n", this->name () ? this->name () : "task"));
+ return 0;
+}
+
+int
+Test_Task::suspend (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "suspending in %s\n", this->name () ? this->name () : "task"));
+ return 0;
+}
+
+int
+Test_Task::resume (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "resuming in %s\n", this->name () ? this->name () : "task"));
+ return 0;
+}
+
+int
+Test_Task::put (ACE_Message_Block *, ACE_Time_Value *)
+{
+ return 0;
+}
+
+int
+Test_Task::svc (void)
+{
+ return 0;
+}
+
+int
+Test_Task::info (char **, size_t) const
+{
+ return 0;
+}
+
+int
+Test_Task::init (int, char *[])
+{
+ ACE_DEBUG ((LM_DEBUG, "initializing %s\n", this->name () ? this->name () : "task"));
+
+ return 0;
+}
+
+int
+Test_Task::fini (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "finalizing %s\n", this->name () ? this->name () : "task"));
+ return 0;
+}
+
+// Dynamically linked functions used to control configuration.
+
+extern "C" ACE_Svc_Export MT_Stream *make_stream (void);
+extern "C" ACE_Svc_Export MT_Module *make_da (void);
+extern "C" ACE_Svc_Export MT_Module *make_ea (void);
+extern "C" ACE_Svc_Export MT_Module *make_mr (void);
+extern "C" ACE_Svc_Export ACE_Service_Object *make_task (void);
+
+ACE_Service_Object *
+make_task (void)
+{
+ return new Test_Task;
+}
+
+MT_Stream *
+make_stream (void)
+{
+ return new MT_Stream;
+}
+
+MT_Module *
+make_da (void)
+{
+ return new MT_Module ("Device_Adapter", new Test_Task, new Test_Task);
+}
+
+MT_Module *
+make_ea (void)
+{
+ return new MT_Module ("Event_Analyzer", new Test_Task, new Test_Task);
+}
+
+MT_Module *
+make_mr (void)
+{
+ return new MT_Module ("Multicast_Router", new Test_Task, new Test_Task);
+}
diff --git a/examples/ASX/CCM_App/Makefile b/examples/ASX/CCM_App/Makefile
new file mode 100644
index 00000000000..3902aef1235
--- /dev/null
+++ b/examples/ASX/CCM_App/Makefile
@@ -0,0 +1,176 @@
+#----------------------------------------------------------------------------
+# @(#)Makefile 1.1 10/18/96
+#
+# Makefile for CCM tests
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+BIN = SC_Client \
+ SC_Server
+
+LSRC = $(addsuffix .cpp,$(BIN)) \
+ CCM_App.cpp
+
+VLDLIBS = $(LDLIBS:%=%$(VAR))
+
+BUILD = $(VSHLIB) $(SHLIBA) $(VBIN)
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+
+.obj/SC_Client.o .shobj/SC_Client.so: SC_Client.cpp
+.obj/SC_Server.o .shobj/SC_Server.so: SC_Server.cpp \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
+.obj/CCM_App.o .shobj/CCM_App.so: CCM_App.cpp \
+ $(WRAPPER_ROOT)/ace/Stream.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Stream.cpp \
+ $(WRAPPER_ROOT)/ace/Stream.i
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/examples/ASX/CCM_App/SC_Client.cpp b/examples/ASX/CCM_App/SC_Client.cpp
new file mode 100644
index 00000000000..498ff89ba9b
--- /dev/null
+++ b/examples/ASX/CCM_App/SC_Client.cpp
@@ -0,0 +1,9 @@
+// Pretty simple, eh? ;-)
+// @(#)SC_Client.cpp 1.1 10/18/96
+
+
+int
+main (int, char *[])
+{
+ return 0;
+}
diff --git a/examples/ASX/CCM_App/SC_Server.cpp b/examples/ASX/CCM_App/SC_Server.cpp
new file mode 100644
index 00000000000..49a11179dcd
--- /dev/null
+++ b/examples/ASX/CCM_App/SC_Server.cpp
@@ -0,0 +1,63 @@
+// Simple driver program for the server.
+// @(#)SC_Server.cpp 1.1 10/18/96
+
+
+#include "ace/Service_Config.h"
+#include "ace/Synch.h"
+#include "ace/Signal.h"
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ virtual int handle_input (ACE_HANDLE handle);
+ virtual int handle_close (ACE_HANDLE,
+ ACE_Reactor_Mask);
+};
+
+int
+Event_Handler::handle_input (ACE_HANDLE handle)
+{
+ char buf[BUFSIZ];
+
+ ssize_t n = ACE_OS::read (handle, buf, sizeof buf);
+
+ if (n == -1)
+ return -1;
+ else if (ACE_OS::write (ACE_STDOUT, buf, n) != n)
+ return -1;
+ else
+ return 0;
+}
+
+int
+Event_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "closing Event_Handler\n"));
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ ACE_Service_Config loggerd;
+ Event_Handler handler;
+ ACE_Sig_Adapter shutdown_handler ((ACE_Sig_Handler_Ex) ACE_Service_Config::end_reactor_event_loop);
+
+ if (ACE::register_stdin_handler (&handler,
+ ACE_Service_Config::reactor (),
+ ACE_Service_Config::thr_mgr ()) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
+
+ if (loggerd.open (argc, argv) == -1 && errno != ENOENT)
+ ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1));
+
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (SIGINT, &shutdown_handler) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n%a", "register_handler", 1));
+
+ // Perform logging service until we receive SIGINT.
+
+ loggerd.run_reactor_event_loop ();
+
+ return 0;
+}
diff --git a/examples/ASX/CCM_App/svc.conf b/examples/ASX/CCM_App/svc.conf
new file mode 100644
index 00000000000..4737cc6312e
--- /dev/null
+++ b/examples/ASX/CCM_App/svc.conf
@@ -0,0 +1,21 @@
+static ACE_Service_Manager "-d -p 3911"
+
+dynamic My_Task Service_Object *.shobj/CCM_App.so:make_task() "-p 3000"
+
+stream dynamic CCM_App STREAM *.shobj/CCM_App.so:make_stream() active
+{
+ dynamic Device_Adapter Module *.shobj/CCM_App.so:make_da()
+ dynamic Event_Analyzer Module *.shobj/CCM_App.so:make_ea()
+ dynamic Multicast_Router Module *.shobj/CCM_App.so:make_mr() "-p 3001"
+}
+
+stream CCM_App
+{
+ remove Device_Adapter
+ remove Event_Analyzer
+ remove Multicast_Router
+}
+
+remove CCM_App
+remove My_Task
+
diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
new file mode 100644
index 00000000000..aad0adf313e
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
@@ -0,0 +1,132 @@
+#include "ace/Log_Msg.h"
+// @(#)Consumer_Router.cpp 1.1 10/18/96
+
+#include "Consumer_Router.h"
+#include "Options.h"
+
+#if defined (ACE_HAS_THREADS)
+
+typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY;
+
+int
+Consumer_Handler::open (void *a)
+{
+ CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a;
+ this->router_task_ = af->router ();
+ return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a);
+}
+
+Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm)
+ : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm)
+{
+}
+
+// Create a new handler that will interact with a consumer and point
+// its ROUTER_TASK_ data member to the CONSUMER_ROUTER.
+
+Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm)
+ : CONSUMER_ROUTER (tm)
+{
+}
+
+// Initialize the Router.
+
+int
+Consumer_Router::open (void *)
+{
+ assert (this->is_reader ());
+
+ char *argv[4];
+
+ argv[0] = (char *) this->name ();
+ argv[1] = "-p";
+ argv[2] = options.consumer_port ();
+ argv[3] = 0;
+
+ if (this->init (2, &argv[1]) == -1)
+ return -1;
+
+ // Make this an active object.
+ return this->activate (options.t_flags ());
+}
+
+int
+Consumer_Router::close (u_long)
+{
+ assert (this->is_reader ());
+ ACE_DEBUG ((LM_DEBUG, "(%t) closing Consumer_Router\n"));
+ this->peer_map_.close ();
+
+ // Inform the thread to shut down.
+ this->msg_queue ()->deactivate ();
+ return 0;
+}
+
+// Handle incoming messages in a separate thread.
+
+int
+Consumer_Router::svc (void)
+{
+ ACE_Thread_Control tc (this->thr_mgr ());
+ ACE_Message_Block *mb = 0;
+
+ assert (this->is_reader ());
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Consumer_Router\n"));
+
+ while (this->getq (mb) >= 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Consumer_Router is routing via send_peers\n"));
+ if (this->send_peers (mb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) send_peers failed in Consumer_Router\n"),
+ -1);
+ }
+ ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Consumer_Router\n"));
+ return 0;
+ // Note the implicit ACE_OS::thr_exit() via destructor.
+}
+
+// Send a MESSAGE_BLOCK to the supplier(s).
+
+int
+Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ assert (this->is_reader ());
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ {
+ this->control (mb);
+ return this->put_next (mb);
+ }
+ else
+ // Queue up the message, which will be processed by
+ // Consumer_Router::svc().
+ return this->putq (mb);
+}
+
+// Return information about the Client_Router ACE_Module.
+
+int
+Consumer_Router::info (char **strp, size_t length) const
+{
+ char buf[BUFSIZ];
+ ACE_INET_Addr addr;
+ const char *mod_name = this->name ();
+ ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor ();
+
+ if (sa.get_local_addr (addr) == -1)
+ return -1;
+
+ ACE_OS::sprintf (buf, "%s\t %d/%s %s",
+ mod_name, addr.get_port_number (), "tcp",
+ "# consumer router\n");
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
new file mode 100644
index 00000000000..efc5acf9e3d
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
@@ -0,0 +1,46 @@
+/* -*- C++ -*- */
+// @(#)Consumer_Router.h 1.1 10/18/96
+
+/* The interface between one or more consumers and an Event Server ACE_Stream */
+
+#if !defined (_CONSUMER_ROUTER_H)
+#define _CONSUMER_ROUTER_H
+
+#include "ace/Thread_Manager.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/UPIPE_Acceptor.h"
+#include "ace/Svc_Handler.h"
+#include "Peer_Router.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Consumer_Handler; /* Forward declaration... */
+
+typedef ACE_HANDLE CONSUMER_KEY;
+
+typedef Peer_Router<Consumer_Handler, CONSUMER_KEY> CONSUMER_ROUTER;
+
+class Consumer_Handler : public Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>
+{
+public:
+ Consumer_Handler (ACE_Thread_Manager *tm = 0);
+ virtual int open (void *);
+};
+
+class Consumer_Router : public CONSUMER_ROUTER
+{
+public:
+ Consumer_Router (ACE_Thread_Manager *thr_manager);
+
+protected:
+ /* ACE_Task hooks. */
+ virtual int open (void *a = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ virtual int svc (void);
+
+ /* Dynamic linking hooks */
+ virtual int info (char **info_string, size_t length) const;
+};
+#endif /* ACE_HAS_THREADS */
+#endif /* _CONSUMER_ROUTER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
new file mode 100644
index 00000000000..977e5c4af9d
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
@@ -0,0 +1,68 @@
+#include "Event_Analyzer.h"
+// @(#)Event_Analyzer.cpp 1.1 10/18/96
+
+
+#if defined (ACE_HAS_THREADS)
+
+int
+Event_Analyzer::open (void *)
+{
+ return 0;
+}
+
+int
+Event_Analyzer::close (u_long)
+{
+ return 0;
+}
+
+int
+Event_Analyzer::control (ACE_Message_Block *mb)
+{
+ ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
+ ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd;
+
+ switch (cmd = ioc->cmd ())
+ {
+ case ACE_IO_Cntl_Msg::SET_LWM:
+ case ACE_IO_Cntl_Msg::SET_HWM:
+ this->water_marks (cmd, *(size_t *) mb->cont ()->rd_ptr ());
+ break;
+ }
+ return 0;
+}
+
+int
+Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ this->control (mb);
+
+ return this->put_next (mb);
+}
+
+int
+Event_Analyzer::init (int, char *[])
+{
+ return 0;
+}
+
+int
+Event_Analyzer::fini (void)
+{
+ return 0;
+}
+
+int
+Event_Analyzer::info (char **strp, size_t length) const
+{
+ const char *mod_name = this->name ();
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
new file mode 100644
index 00000000000..9d38611e7aa
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
@@ -0,0 +1,34 @@
+/* -*- C++ -*- */
+// @(#)Event_Analyzer.h 1.1 10/18/96
+
+/* Signal router */
+
+#if !defined (_EVENT_ANALYZER_H)
+#define _EVENT_ANALYZER_H
+
+#include "ace/Stream.h"
+#include "ace/Module.h"
+#include "ace/Task.h"
+#include "ace/Synch.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Event_Analyzer : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ virtual int open (void *a = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ virtual int svc (void) { return 0; }
+
+ /* Dynamic linking hooks */
+ virtual int init (int argc, char *argv[]);
+ virtual int fini (void);
+ virtual int info (char **info_string, size_t length) const;
+
+private:
+ virtual int control (ACE_Message_Block *);
+};
+
+#endif /* ACE_HAS_THREADS */
+#endif /* _EVENT_ANALYZER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Makefile b/examples/ASX/Event_Server/Event_Server/Makefile
new file mode 100644
index 00000000000..947f50c5d7a
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Makefile
@@ -0,0 +1,433 @@
+#----------------------------------------------------------------------------
+# @(#)Makefile 1.1 10/18/96
+#
+# Makefile for the Event Server test
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+BIN = event_server
+
+FILES = Options \
+ Supplier_Router \
+ Event_Analyzer \
+ Consumer_Router \
+ Peer_Router
+
+LSRC = $(addsuffix .cpp,$(FILES))
+LOBJ = $(addsuffix .o,$(FILES))
+SHOBJ = $(addsuffix .so,$(FILES))
+
+LDLIBS = $(addprefix .shobj/,$(SHOBJ))
+
+VLDLIBS = $(LDLIBS:%=%$(VAR))
+
+BUILD = $(VBIN)
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+
+.obj/Options.o .shobj/Options.so: Options.cpp \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ Options.h \
+ $(WRAPPER_ROOT)/ace/Profile_Timer.h \
+ Options.i
+.obj/Supplier_Router.o .shobj/Supplier_Router.so: Supplier_Router.cpp Supplier_Router.h \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Map_Manager.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.cpp \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.i \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.h \
+ $(WRAPPER_ROOT)/ace/Synch_Options.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \
+ $(WRAPPER_ROOT)/ace/Dynamic.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.i \
+ Peer_Router.h \
+ $(WRAPPER_ROOT)/ace/Acceptor.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies.cpp \
+ $(WRAPPER_ROOT)/ace/Acceptor.i \
+ $(WRAPPER_ROOT)/ace/Acceptor.cpp \
+ Peer_Router.cpp \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h \
+ Options.h \
+ $(WRAPPER_ROOT)/ace/Profile_Timer.h \
+ Options.i
+.obj/Event_Analyzer.o .shobj/Event_Analyzer.so: Event_Analyzer.cpp Event_Analyzer.h \
+ $(WRAPPER_ROOT)/ace/Stream.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Stream.cpp \
+ $(WRAPPER_ROOT)/ace/Stream.i
+.obj/Consumer_Router.o .shobj/Consumer_Router.so: Consumer_Router.cpp \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ Consumer_Router.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/UPIPE_Stream.h \
+ $(WRAPPER_ROOT)/ace/Stream.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Stream.cpp \
+ $(WRAPPER_ROOT)/ace/Stream.i \
+ $(WRAPPER_ROOT)/ace/SPIPE.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Addr.h \
+ $(WRAPPER_ROOT)/ace/SPIPE.i \
+ $(WRAPPER_ROOT)/ace/UPIPE_Addr.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Stream.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Stream.i \
+ $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.h \
+ $(WRAPPER_ROOT)/ace/Synch_Options.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \
+ $(WRAPPER_ROOT)/ace/Dynamic.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.i \
+ Peer_Router.h \
+ $(WRAPPER_ROOT)/ace/Acceptor.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies.cpp \
+ $(WRAPPER_ROOT)/ace/Acceptor.i \
+ $(WRAPPER_ROOT)/ace/Acceptor.cpp \
+ $(WRAPPER_ROOT)/ace/Map_Manager.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.cpp \
+ $(WRAPPER_ROOT)/ace/Map_Manager.i \
+ Peer_Router.cpp \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h \
+ Options.h \
+ $(WRAPPER_ROOT)/ace/Profile_Timer.h \
+ Options.i
+.obj/Peer_Router.o .shobj/Peer_Router.so: Peer_Router.cpp \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h \
+ Options.h \
+ $(WRAPPER_ROOT)/ace/Profile_Timer.h \
+ Options.i Peer_Router.h \
+ $(WRAPPER_ROOT)/ace/Acceptor.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.h \
+ $(WRAPPER_ROOT)/ace/Synch_Options.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \
+ $(WRAPPER_ROOT)/ace/Dynamic.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.i \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies.cpp \
+ $(WRAPPER_ROOT)/ace/Acceptor.i \
+ $(WRAPPER_ROOT)/ace/Acceptor.cpp \
+ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.cpp \
+ $(WRAPPER_ROOT)/ace/Map_Manager.i \
+ Peer_Router.cpp
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/examples/ASX/Event_Server/Event_Server/Options.cpp b/examples/ASX/Event_Server/Event_Server/Options.cpp
new file mode 100644
index 00000000000..41658639a77
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Options.cpp
@@ -0,0 +1,186 @@
+#include "ace/Get_Opt.h"
+// @(#)Options.cpp 1.1 10/18/96
+
+#include "ace/Synch.h"
+#include "ace/Thread.h"
+#include "ace/Log_Msg.h"
+#include "Options.h"
+
+#if defined (ACE_HAS_THREADS)
+
+Options::Options (void)
+ : debugging_ (0),
+ verbosity_ (0),
+ low_water_mark_ (1024),
+ high_water_mark_ (8 * 1024),
+ message_size_ (128),
+ thr_count_ (4),
+ initial_queue_length_ (0),
+ iterations_ (100000),
+ consumer_port_ ("-p " ACE_ITOA (10000)),
+ supplier_port_ ("-p " ACE_ITOA (10001)),
+ t_flags_ (THR_DETACHED)
+{
+}
+
+Options::~Options (void)
+{
+}
+
+void Options::print_results (void)
+{
+#if !defined (ACE_WIN32)
+ ACE_Profile_Timer::ACE_Elapsed_Time et;
+ ACE_Profile_Timer::Rusage rusage;
+
+ this->itimer_.elapsed_time (et);
+ this->itimer_.get_rusage (rusage);
+
+ if (options.verbose ())
+ {
+#if defined (ACE_HAS_PRUSAGE_T)
+ ACE_OS::printf ("final concurrency hint = %d\n", ACE_Thread::getconcurrency ());
+ ACE_OS::printf ("%8d = lwpid\n"
+ "%8d = lwp count\n"
+ "%8d = minor page faults\n"
+ "%8d = major page faults\n"
+ "%8d = input blocks\n"
+ "%8d = output blocks\n"
+ "%8d = messages sent\n"
+ "%8d = messages received\n"
+ "%8d = signals received\n"
+ "%8ds, %dms = wait-cpu (latency) time\n"
+ "%8ds, %dms = user lock wait sleep time\n"
+ "%8ds, %dms = all other sleep time\n"
+ "%8d = voluntary context switches\n"
+ "%8d = involuntary context switches\n"
+ "%8d = system calls\n"
+ "%8d = chars read/written\n",
+ rusage.pr_lwpid,
+ rusage.pr_count,
+ rusage.pr_minf,
+ rusage.pr_majf,
+ rusage.pr_inblk,
+ rusage.pr_oublk,
+ rusage.pr_msnd,
+ rusage.pr_mrcv,
+ rusage.pr_sigs,
+ rusage.pr_wtime.tv_sec, rusage.pr_wtime.tv_nsec / 1000000,
+ rusage.pr_ltime.tv_sec, rusage.pr_ltime.tv_nsec / 1000000,
+ rusage.pr_slptime.tv_sec, rusage.pr_slptime.tv_nsec / 1000000,
+ rusage.pr_vctx,
+ rusage.pr_ictx,
+ rusage.pr_sysc,
+ rusage.pr_ioch);
+#else
+ /* Someone needs to write the corresponding dump for rusage... */
+#endif /* ACE_HAS_PRUSAGE_T */
+ }
+
+ ACE_OS::printf ("---------------------\n"
+ "real time = %.3f\n"
+ "user time = %.3f\n"
+ "system time = %.3f\n"
+ "---------------------\n",
+ et.real_time, et.user_time, et.system_time);
+#endif /* ACE_WIN32 */
+}
+
+/* Manages the options */
+Options options;
+
+void
+Options::parse_args (int argc, char *argv[])
+{
+ ACE_LOG_MSG->open (argv[0]);
+
+ ACE_Get_Opt get_opt (argc, argv, "c:bdH:i:L:l:M:ns:t:T:v");
+ int c;
+
+ while ((c = get_opt ()) != -1)
+ switch (c)
+ {
+ case 'b':
+ this->t_flags (THR_BOUND);
+ break;
+ case 'c':
+ this->consumer_port (get_opt.optarg);
+ break;
+ case 'd':
+ this->debugging_ = 1;
+ break;
+ case 'H':
+ this->high_water_mark (ACE_OS::atoi (get_opt.optarg));
+ break;
+ case 'i':
+ this->iterations (ACE_OS::atoi (get_opt.optarg));
+ break;
+ case 'L':
+ this->low_water_mark (ACE_OS::atoi (get_opt.optarg));
+ break;
+ case 'l':
+ this->initial_queue_length (ACE_OS::atoi (get_opt.optarg));
+ break;
+ case 'M':
+ this->message_size (ACE_OS::atoi (get_opt.optarg));
+ break;
+ case 'n':
+ this->t_flags (THR_NEW_LWP);
+ break;
+ case 's':
+ this->supplier_port (get_opt.optarg);
+ break;
+ case 'T':
+ if (ACE_OS::strcasecmp (get_opt.optarg, "ON") == 0)
+ ACE_Trace::start_tracing ();
+ else if (ACE_OS::strcasecmp (get_opt.optarg, "OFF") == 0)
+ ACE_Trace::stop_tracing ();
+ break;
+ case 't':
+ this->thr_count (ACE_OS::atoi (get_opt.optarg));
+ break;
+ case 'v':
+ this->verbosity_ = 1;
+ break;
+ default:
+ ::fprintf (stderr, "%s\n"
+ "\t[-b] (THR_BOUND)\n"
+ "\t[-c consumer port]\n"
+ "\t[-d] (enable debugging)\n"
+ "\t[-H high water mark]\n"
+ "\t[-i number of test iterations]\n"
+ "\t[-L low water mark]\n"
+ "\t[-M] message size \n"
+ "\t[-n] (THR_NEW_LWP)\n"
+ "\t[-q max queue size]\n"
+ "\t[-s supplier port]\n"
+ "\t[-t number of threads]\n"
+ "\t[-v] (verbose) \n",
+ argv[0]);
+ ::exit (1);
+ /* NOTREACHED */
+ break;
+ }
+
+ if (this->verbose ())
+ ACE_OS::printf ("%8d = initial concurrency hint\n"
+ "%8d = total iterations\n"
+ "%8d = thread count\n"
+ "%8d = low water mark\n"
+ "%8d = high water mark\n"
+ "%8d = message_size\n"
+ "%8d = initial queue length\n"
+ "%8d = THR_BOUND\n"
+ "%8d = THR_NEW_LWP\n",
+ ACE_Thread::getconcurrency (),
+ this->iterations (),
+ this->thr_count (),
+ this->low_water_mark (),
+ this->high_water_mark (),
+ this->message_size (),
+ this->initial_queue_length (),
+ (this->t_flags () & THR_BOUND) != 0,
+ (this->t_flags () & THR_NEW_LWP) != 0);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Options.h b/examples/ASX/Event_Server/Event_Server/Options.h
new file mode 100644
index 00000000000..054f0d834f6
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Options.h
@@ -0,0 +1,75 @@
+/* -*- C++ -*- */
+// @(#)Options.h 1.1 10/18/96
+
+/* Option manager for Event Server */
+
+#if !defined (DEVICE_OPTIONS_H)
+#define DEVICE_OPTIONS_H
+
+#include "ace/OS.h"
+#include "ace/Profile_Timer.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Options
+{
+public:
+ Options (void);
+ ~Options (void);
+ void parse_args (int argc, char *argv[]);
+
+ void stop_timer (void);
+ void start_timer (void);
+
+ void thr_count (size_t count);
+ size_t thr_count (void);
+
+ void initial_queue_length (size_t length);
+ size_t initial_queue_length (void);
+
+ void high_water_mark (size_t size);
+ size_t high_water_mark (void);
+
+ void low_water_mark (size_t size);
+ size_t low_water_mark (void);
+
+ void message_size (size_t size);
+ size_t message_size (void);
+
+ void iterations (size_t n);
+ size_t iterations (void);
+
+ void t_flags (long flag);
+ long t_flags (void);
+
+ void supplier_port (char *port);
+ char *supplier_port (void);
+
+ void consumer_port (char *port);
+ char *consumer_port (void);
+
+ int debug (void);
+ int verbose (void);
+
+ void print_results (void);
+
+private:
+ ACE_Profile_Timer itimer_; /* Time the process */
+ size_t thr_count_; /* Number of threads to spawn */
+ long t_flags_; /* Flags to thr_create() */
+ size_t high_water_mark_; /* ACE_Task high water mark */
+ size_t low_water_mark_; /* ACE_Task low water mark */
+ size_t message_size_; /* Size of a message */
+ size_t initial_queue_length_; /* Initial number of items in the queue */
+ size_t iterations_; /* Number of iterations to run the test program */
+ int debugging_; /* Extra debugging info */
+ int verbosity_; /* Extra verbose messages */
+ char *consumer_port_; /* Port that the Consumer_Router is using */
+ char *supplier_port_; /* Port that the Supplier_Router is using */
+};
+
+extern Options options;
+
+#include "Options.i"
+#endif /* ACE_HAS_THREADS */
+#endif /* DEVICE_OPTIONS_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Options.i b/examples/ASX/Event_Server/Event_Server/Options.i
new file mode 100644
index 00000000000..c538b94b46a
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Options.i
@@ -0,0 +1,137 @@
+/* -*- C++ -*- */
+// @(#)Options.i 1.1 10/18/96
+
+/* Option manager for ustreams */
+
+inline void
+Options::supplier_port (char *port)
+{
+ this->supplier_port_ = port;
+}
+
+inline char *
+Options::supplier_port (void)
+{
+ return this->supplier_port_;
+}
+
+inline void
+Options::consumer_port (char *port)
+{
+ this->consumer_port_ = port;
+}
+
+inline char *
+Options::consumer_port (void)
+{
+ return this->consumer_port_;
+}
+
+inline void
+Options::start_timer (void)
+{
+ this->itimer_.start ();
+}
+
+inline void
+Options::stop_timer (void)
+{
+ this->itimer_.stop ();
+}
+
+inline void
+Options::thr_count (size_t count)
+{
+ this->thr_count_ = count;
+}
+
+inline size_t
+Options::thr_count (void)
+{
+ return this->thr_count_;
+}
+
+inline void
+Options::initial_queue_length (size_t length)
+{
+ this->initial_queue_length_ = length;
+}
+
+inline size_t
+Options::initial_queue_length (void)
+{
+ return this->initial_queue_length_;
+}
+
+inline void
+Options::high_water_mark (size_t size)
+{
+ this->high_water_mark_ = size;
+}
+
+inline size_t
+Options::high_water_mark (void)
+{
+ return this->high_water_mark_;
+}
+
+inline void
+Options::low_water_mark (size_t size)
+{
+ this->low_water_mark_ = size;
+}
+
+inline size_t
+Options::low_water_mark (void)
+{
+ return this->low_water_mark_;
+}
+
+inline void
+Options::message_size (size_t size)
+{
+ this->message_size_ = size;
+}
+
+inline size_t
+Options::message_size (void)
+{
+ return this->message_size_;
+}
+
+inline void
+Options::iterations (size_t n)
+{
+ this->iterations_ = n;
+}
+
+inline size_t
+Options::iterations (void)
+{
+ return this->iterations_;
+}
+
+inline void
+Options::t_flags (long flag)
+{
+ this->t_flags_ |= flag;
+}
+
+inline long
+Options::t_flags (void)
+{
+ return this->t_flags_;
+}
+
+inline int
+Options::debug (void)
+{
+ return this->debugging_;
+}
+
+inline int
+Options::verbose (void)
+{
+ return this->verbosity_;
+}
+
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
new file mode 100644
index 00000000000..f0a77ed7103
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
@@ -0,0 +1,279 @@
+#if !defined (_PEER_ROUTER_C)
+// @(#)Peer_Router.cpp 1.1 10/18/96
+
+#define _PEER_ROUTER_C
+
+#include "ace/Service_Config.h"
+#include "ace/Get_Opt.h"
+#include "ace/Log_Msg.h"
+#include "Options.h"
+#include "Peer_Router.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Define some short-hand macros to deal with verbose templates
+// names...
+
+#define PH PEER_HANDLER
+#define PA PEER_ACCEPTOR
+#define PAD PEER_ADDR
+#define PK PEER_KEY
+#define PM PEER_MAP
+
+template <class PH, class PK> int
+Acceptor_Factory<PH, PK>::init (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, "dp:", 0);
+ ACE_INET_Addr addr;
+
+ for (int c; (c = get_opt ()) != -1; )
+ switch (c)
+ {
+ case 'p':
+ addr.set (ACE_OS::atoi (get_opt.optarg));
+ break;
+ case 'd':
+ break;
+ default:
+ break;
+ }
+
+ if (this->open (addr, ACE_Service_Config::reactor ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
+ return 0;
+}
+
+template <class PH, class PK>
+Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr)
+ : pr_ (pr)
+{
+}
+
+template <class PH, class PK> Peer_Router<PH, PK> *
+Acceptor_Factory<PH, PK>::router (void)
+{
+ return this->pr_;
+}
+
+template <class ROUTER, class KEY>
+Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm)
+ : inherited (tm)
+{
+}
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::svc (void)
+{
+#if 0
+ ACE_Thread_Control thread_control (tm);
+ // Just a try !! we're just reading from our Message_Queue
+ ACE_Message_Block *db, *hb;
+ int n;
+
+ // Do an endless loop
+ for (;;)
+ {
+ db = new Message_Block (BUFSIZ);
+ hb = new Message_Block (sizeof (KEY), Message_Block::MB_PROTO, db);
+
+ if ((n = this->peer_.recv (db->rd_ptr (), db->size ())) == -1)
+ LM_ERROR_RETURN ((LOG_ERROR, "%p", "recv failed"), -1);
+ else if (n == 0) // Client has closed down the connection.
+ {
+ if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
+ LM_ERROR_RETURN ((LOG_ERROR, "%p", "unbind failed"), -1);
+ LM_DEBUG ((LOG_DEBUG, "(%t) shutting down \n"));
+ return -1; // We do not need to be deregistered by reactor
+ // as we were not registered at all
+ }
+ else
+ // Transform incoming buffer into a Message and pass
+ // downstream.
+ {
+ db->wr_ptr (n);
+ *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment.
+ hb->wr_ptr (sizeof (long));
+
+ if (this->router_task_->reply (hb) == -1)
+ {
+ cout << "Peer_Handler.svc : router_task->reply failed" << endl ;
+ return -1;
+ }
+ }
+ }
+ return 0;
+#else
+ return -1;
+#endif
+}
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ return this->peer ().send_n (mb->rd_ptr (), mb->length ());
+}
+
+// Create a new handler and point its ROUTER_TASK_ data member to the
+// corresponding router. Note that this router is extracted out of
+// the Acceptor_Factory * that is passed in via the
+// ACE_Acceptor::handle_input() method.
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::open (void *a)
+{
+ char buf[BUFSIZ], *p = buf;
+
+ if (this->router_task_->info (&p, sizeof buf) != -1)
+ ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, fd = %d, this = %d\n",
+ buf, this->get_handle (), a));
+ else
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1);
+#if 0
+ if (this->activate (options.t_flags ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1);
+#endif
+ ACE_DEBUG ((LM_DEBUG,
+ "Peer_Handler::open registering with Reactor for handle_input\n"));
+
+ if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1);
+ else if (this->router_task_->bind_peer (this->get_handle (), this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1);
+ else if (this->peer ().disable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "disable non-blocking I/O"), -1);
+ return 0;
+}
+
+// Receive a message from a supplier.
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h));
+
+ ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ);
+ ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY),
+ ACE_Message_Block::MB_PROTO, db);
+ int n;
+
+ if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1);
+ else if (n == 0) // Client has closed down the connection.
+ {
+ if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h));
+ return -1; // Instruct the ACE_Reactor to deregister us by returning -1.
+ }
+ else // Transform incoming buffer into a Message and pass downstream.
+ {
+ db->wr_ptr (n);
+ *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); // structure assignment.
+ hb->wr_ptr (sizeof (ACE_HANDLE));
+ return this->router_task_->reply (hb) == -1 ? -1 : 0;
+ }
+}
+
+template <class PH, class PK>
+Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm)
+ : ACE_Task<ACE_MT_SYNCH> (tm)
+{
+}
+
+template <class PH, class PK> int
+Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb)
+{
+ PEER_ITERATOR map_iter = this->peer_map_;
+ int bytes = 0;
+ int iterations = 0;
+ ACE_Message_Block *data_block = mb->cont ();
+
+ for (ACE_Map_Entry<PK, PH *> *ss = 0;
+ map_iter.next (ss) != 0;
+ map_iter.advance ())
+ {
+ if (options.debug ())
+ ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via sd %d\n", ss->ext_id_));
+
+ iterations++;
+ bytes += ss->int_id_->put (data_block);
+ }
+
+ delete mb;
+ return bytes == 0 ? 0 : bytes / iterations;
+}
+
+template <class PH, class PK>
+Peer_Router<PH, PK>::~Peer_Router (void)
+{
+}
+
+template <class PH, class PK> ACE_INLINE int
+Peer_Router<PH, PK>::fini (void)
+{
+ delete this->acceptor_;
+ return 0;
+}
+
+template <class PH, class PK> ACE_INLINE int
+Peer_Router<PH, PK>::control (ACE_Message_Block *mb)
+{
+ ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
+ ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command;
+
+ switch (command = ioc->cmd ())
+ {
+ case ACE_IO_Cntl_Msg::SET_LWM:
+ case ACE_IO_Cntl_Msg::SET_HWM:
+ this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ());
+ break;
+ default:
+ return -1;
+ }
+ return 0;
+}
+
+template <class PH, class PK> ACE_INLINE int
+Peer_Router<PH, PK>::unbind_peer (PK key)
+{
+ return this->peer_map_.unbind (key);
+}
+
+template <class PH, class PK> ACE_INLINE int
+Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph)
+{
+ PH *peer_handler = (PH *) ph;
+ return this->peer_map_.bind (key, peer_handler);
+}
+
+template <class PH, class PK> ACE_INLINE int
+Peer_Router<PH, PK>::init (int argc, char *argv[])
+{
+ this->acceptor_ = new ACCEPTOR (this);
+
+ if (this->acceptor_->init (argc, argv) == -1
+ || this->peer_map_.open () == -1)
+ return -1;
+ else
+ {
+ ACE_INET_Addr addr;
+ ACE_SOCK_Acceptor &acceptor = this->acceptor_->acceptor();
+
+ if (acceptor.get_local_addr (addr) != -1)
+ ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s, port = %d, fd = %d, this = %u\n",
+ this->name (), addr.get_port_number (),
+ acceptor.get_handle (), this));
+ else
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), -1);
+ }
+ return 0;
+}
+
+#undef PH
+#undef PA
+#undef PAD
+#undef PK
+#undef PM
+#endif /* ACE_HAS_THREADS */
+#endif /* _PEER_ROUTER_C */
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/examples/ASX/Event_Server/Event_Server/Peer_Router.h
new file mode 100644
index 00000000000..5df444a985a
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.h
@@ -0,0 +1,121 @@
+/* -*- C++ -*- */
+// @(#)Peer_Router.h 1.1 10/18/96
+
+
+#if !defined (_PEER_ROUTER_H)
+#define _PEER_ROUTER_H
+
+#include "ace/Acceptor.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Map_Manager.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Forward declaration.
+template <class PEER_HANDLER, class KEY>
+class Peer_Router;
+
+template <class PEER_HANDLER, class KEY>
+class Acceptor_Factory : public ACE_Acceptor<PEER_HANDLER, ACE_SOCK_ACCEPTOR>
+ // = TITLE
+ // Creates <PEER_HANDLERs>, which route events between peers.
+{
+public:
+ Acceptor_Factory (Peer_Router<PEER_HANDLER, KEY> *pr);
+ Peer_Router<PEER_HANDLER, KEY> *router (void);
+
+ int init (int argc, char *argv[]);
+ // Initialize the acceptor when it's linked dynamically.
+
+private:
+ Peer_Router<PEER_HANDLER, KEY> *pr_;
+};
+
+template <class ROUTER, class KEY>
+class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
+ // = TITLE
+ // Receive input from a Peer.
+{
+public:
+ Peer_Handler (ACE_Thread_Manager * = 0);
+
+ virtual int open (void * = 0);
+ // Called by the ACE_Acceptor::handle_input() to activate this object.
+
+ virtual int handle_input (ACE_HANDLE);
+ // Receive input from the peer.
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
+ // Send output to a peer.
+
+protected:
+ typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> inherited;
+
+ ROUTER *router_task_;
+ // Pointer to write task.
+
+private:
+ virtual int svc (void);
+ // Don't need this method here...
+};
+
+template <class PEER_HANDLER, class PEER_KEY>
+class Peer_Router : public ACE_Task<ACE_MT_SYNCH>
+ // = TITLE
+ // This abstract base class provides mechanisms for routing
+ // messages to/from a ACE_Stream from/to one or more peers (which
+ // are typically running on remote hosts).
+ //
+ // = DESCRIPTION
+ // A subclass of Peer_Router overrides the open(), close(), and
+ // put() methods in order to specialize the behavior of the router
+ // to meet application-specific requirements.
+{
+public:
+ // = Initialization and termination methods.
+ Peer_Router (ACE_Thread_Manager * = 0);
+ ~Peer_Router (void);
+
+ typedef Peer_Handler<Peer_Router<PEER_HANDLER, PEER_KEY>, PEER_KEY> HANDLER;
+
+ virtual int unbind_peer (PEER_KEY);
+ // Remove a PEER_HANDLER from the PEER_MAP.
+
+ virtual int bind_peer (PEER_KEY, HANDLER *);
+ // Add a PEER_HANDLER to the PEER_MAP
+
+ int send_peers (ACE_Message_Block *mb);
+ // Send the message block to the peer(s).
+
+protected:
+ virtual int control (ACE_Message_Block *);
+ // Handle control messages arriving from adjacent Modules.
+
+ // = Useful typedefs
+ typedef ACE_Map_Manager <PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> PEER_MAP;
+ typedef ACE_Map_Iterator<PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> PEER_ITERATOR;
+
+ PEER_MAP peer_map_;
+ // Map used to keep track of active peers.
+
+ // = Dynamic linking initialization hooks inherited from ACE_Task
+ virtual int init (int argc, char *argv[]);
+ virtual int fini (void);
+
+ typedef Acceptor_Factory<PEER_HANDLER, PEER_KEY> ACCEPTOR;
+
+ ACCEPTOR *acceptor_;
+ // Factory for accepting new PEER_HANDLERs.
+
+private:
+ // = Prevent copies and pass-by-value.
+ Peer_Router (const Peer_Router<PEER_HANDLER, PEER_KEY> &) {}
+ void operator= (const Peer_Router<PEER_HANDLER, PEER_KEY> &) {}
+};
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "Peer_Router.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+#endif /* ACE_HAS_THREADS */
+#endif /* _PEER_ROUTER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
new file mode 100644
index 00000000000..6d18dce66fa
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
@@ -0,0 +1,134 @@
+#include "Supplier_Router.h"
+// @(#)Supplier_Router.cpp 1.1 10/18/96
+
+#include "Options.h"
+
+#if defined (ACE_HAS_THREADS)
+
+typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY;
+
+int
+Supplier_Handler::open (void *a)
+{
+ SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a;
+ this->router_task_ = af->router ();
+ return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a);
+}
+
+Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm)
+ : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm)
+{
+}
+
+// Create a new router and associate it with the REACTOR parameter.
+
+Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm)
+ : SUPPLIER_ROUTER (tm)
+{
+}
+
+// Handle incoming messages in a separate thread.
+
+int
+Supplier_Router::svc (void)
+{
+ assert (this->is_writer ());
+
+ ACE_Thread_Control tc (this->thr_mgr ());
+ ACE_Message_Block *mb = 0;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n"));
+
+ while (this->getq (mb) >= 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Supplier_Router is routing via send_peers\n"));
+ if (this->send_peers (mb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) send_peers failed in Supplier_Router\n"),
+ -1);
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Supplier_Router\n"));
+ return 0;
+ // Note the implicit ACE_OS::thr_exit() via ACE_Thread_Control's
+ // destructor.
+}
+
+// Initialize the Router.
+
+int
+Supplier_Router::open (void *)
+{
+ assert (this->is_writer ());
+
+ char *argv[4];
+
+ argv[0] = (char *) this->name ();
+ argv[1] = "-p";
+ argv[2] = options.supplier_port ();
+ argv[3] = 0;
+
+ if (this->init (2, &argv[1]) == -1)
+ return -1;
+
+ // Make this an active object.
+ return this->activate (options.t_flags ());
+}
+
+// Close down the router.
+
+int
+Supplier_Router::close (u_long)
+{
+ assert (this->is_writer ());
+ ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router\n"));
+ this->peer_map_.close ();
+
+ // Inform the thread to shut down.
+ this->msg_queue ()->deactivate ();
+ return 0;
+}
+
+// Send a MESSAGE_BLOCK to the supplier(s).
+
+int
+Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ assert (this->is_writer ());
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ {
+ this->control (mb);
+ return this->put_next (mb);
+ }
+ else
+ // Queue up the message, which will be processed by
+ // Supplier_Router::svc().
+ return this->putq (mb);
+}
+
+// Return information about the Supplier_Router ACE_Module.
+
+int
+Supplier_Router::info (char **strp, size_t length) const
+{
+ char buf[BUFSIZ];
+ ACE_INET_Addr addr;
+ const char *mod_name = this->name ();
+ ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor ();
+
+ if (sa.get_local_addr (addr) == -1)
+ return -1;
+
+ ACE_OS::sprintf (buf, "%s\t %d/%s %s",
+ mod_name, addr.get_port_number (), "tcp",
+ "# supplier router\n");
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
new file mode 100644
index 00000000000..766e08c01e3
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
@@ -0,0 +1,51 @@
+/* -*- C++ -*- */
+// @(#)Supplier_Router.h 1.1 10/18/96
+
+/* The interface between a supplier and an Event Service ACE_Stream */
+
+#if !defined (_SUPPLIER_ROUTER_H)
+#define _SUPPLIER_ROUTER_H
+
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/Map_Manager.h"
+#include "ace/Svc_Handler.h"
+#include "Peer_Router.h"
+
+#if defined (ACE_HAS_THREADS)
+
+/* Forward declaration */
+class Supplier_Handler;
+
+/* Type of search key for SUPPLIER_MAP */
+typedef ACE_HANDLE SUPPLIER_KEY;
+
+/* Instantiated type for routing messages to suppliers */
+
+typedef Peer_Router<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_ROUTER;
+
+class Supplier_Handler : public Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>
+{
+public:
+ Supplier_Handler (ACE_Thread_Manager *tm = 0);
+ virtual int open (void *);
+};
+
+class Supplier_Router : public SUPPLIER_ROUTER
+{
+public:
+ Supplier_Router (ACE_Thread_Manager *);
+
+protected:
+ /* ACE_Task hooks. */
+ virtual int open (void *a = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ virtual int svc (void);
+
+ /* Dynamic linking hooks inherited from Peer_Router */
+ virtual int info (char **info_string, size_t length) const;
+};
+
+#endif /* ACE_HAS_THREADS */
+#endif /* _SUPPLIER_ROUTER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/event_server.cpp b/examples/ASX/Event_Server/Event_Server/event_server.cpp
new file mode 100644
index 00000000000..e54bb845d84
--- /dev/null
+++ b/examples/ASX/Event_Server/Event_Server/event_server.cpp
@@ -0,0 +1,125 @@
+// Test the event server.
+// @(#)event_server.cpp 1.1 10/18/96
+
+#include "ace/Log_Msg.h"
+#include "ace/Stream.h"
+#include "ace/Service_Config.h"
+#include "Options.h"
+#include "Consumer_Router.h"
+#include "Event_Analyzer.h"
+#include "Supplier_Router.h"
+
+#if defined (ACE_HAS_THREADS)
+
+typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
+typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
+
+// Handle SIGINT and terminate the entire application.
+
+class Quit_Handler : public ACE_Sig_Adapter
+{
+public:
+ Quit_Handler (void);
+ virtual int handle_input (ACE_HANDLE fd);
+};
+
+Quit_Handler::Quit_Handler (void)
+ : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Service_Config::end_reactor_event_loop))
+{
+ // Register to trap input from the user.
+ if (ACE::register_stdin_handler (this,
+ ACE_Service_Config::reactor (),
+ ACE_Service_Config::thr_mgr ()) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
+ // Register to trap the SIGINT signal.
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (SIGINT, this) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "register_handler"));
+}
+
+int
+Quit_Handler::handle_input (ACE_HANDLE)
+{
+ options.stop_timer ();
+ ACE_DEBUG ((LM_INFO, "(%t) closing down the test\n"));
+ options.print_results ();
+
+ ACE_Service_Config::end_reactor_event_loop ();
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ ACE_Service_Config daemon;
+
+ options.parse_args (argc, argv);
+
+ {
+ // Primary ACE_Stream for EVENT_SERVER application.
+ MT_Stream event_server;
+
+ // Enable graceful shutdowns...
+ Quit_Handler quit_handler;
+
+ // Create the Supplier Router module.
+
+ MT_Module *sr = new MT_Module ("Supplier_Router",
+ new Supplier_Router (ACE_Service_Config::thr_mgr ()));
+
+ // Create the Event Analyzer module.
+
+ MT_Module *ea = new MT_Module ("Event_Analyzer",
+ new Event_Analyzer,
+ new Event_Analyzer);
+
+ // Create the Consumer Router module.
+
+ MT_Module *cr = new MT_Module ("Consumer_Router",
+ 0, // 0 triggers the creation of a ACE_Thru_Task...
+ new Consumer_Router (ACE_Service_Config::thr_mgr ()));
+
+ // Push the Modules onto the event_server stream.
+
+ if (event_server.push (sr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1);
+
+ if (event_server.push (ea) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1);
+
+ if (event_server.push (cr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1);
+
+ // Set the high and low water marks appropriately.
+
+ int wm = options.low_water_mark ();
+
+ if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "push (setting low watermark)"), -1);
+
+ wm = options.high_water_mark ();
+ if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "push (setting high watermark)"), -1);
+
+ options.start_timer ();
+
+ // Perform the main event loop waiting for the user to type ^C or to
+ // enter a line on the ACE_STDIN.
+
+ daemon.run_reactor_event_loop ();
+ // The destructor of event_server will close down the stream and
+ // call the close() hooks on all the ACE_Tasks.
+ }
+
+ // Wait for the threads to exit.
+ ACE_Service_Config::thr_mgr ()->wait ();
+ ACE_DEBUG ((LM_DEBUG, "exiting main\n"));
+ return 0;
+}
+#else
+int
+main (void)
+{
+ ACE_ERROR_RETURN ((LM_ERROR, "test not defined for this platform\n"), -1);
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Makefile b/examples/ASX/Event_Server/Makefile
new file mode 100644
index 00000000000..6c8d3f443f5
--- /dev/null
+++ b/examples/ASX/Event_Server/Makefile
@@ -0,0 +1,23 @@
+#----------------------------------------------------------------------------
+# @(#)Makefile 1.1 10/18/96
+#
+# Makefile for the Event Server tests
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+DIRS = Event_Server \
+ Transceiver
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.nested.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.nolocal.GNU
+
diff --git a/examples/ASX/Event_Server/README b/examples/ASX/Event_Server/README
new file mode 100644
index 00000000000..f97e767cdd8
--- /dev/null
+++ b/examples/ASX/Event_Server/README
@@ -0,0 +1,38 @@
+The subdirectory illustrates a number of the ACE ASX framework
+features using an ACE_Stream application called the Event Server. The
+Event Server works as follows:
+
+1. When the ./Event_Server/event_server executable is run it
+ creates two SOCK_Acceptors, which listen for and accept
+ incoming connections from consumers and suppliers.
+
+2. The ./Event_Server/Transceiver/transceiver application may be
+ started multiple times. Each call should be either:
+
+ % transceiver -p XYZ -h hostname
+
+ or
+
+ % transceiver -p ABC -h hostname
+
+ where XYZ and ABC are the consumer port and supplier port,
+ respectively, on the event server and "hostname" is the name of the
+ machine the event_server is running. I typically open up multiple
+ windows.
+
+3. Once the consumer(s) and supplier(s) are connected, you can type
+ data from any supplier windows. This data will be routed
+ through the Modules/Tasks in an event_server's Stream and
+ be forwarded to the consumer(s).
+
+4. When you want to shut down the tranceivers or event server
+ just type ^C (which generates a SIGINT).
+
+What makes this example particularly interesting is that
+once you've got the hang of this basic architecture, you can
+"push" new filtering Modules onto the event_server Stream
+ and modify the application's behavior.
+
+For more information on the design and use of the ACE ASX framework
+please see http://www.cs.wustl.edu/~schmidt/C++-USENIX-94.ps.gz and
+http://www.cs.wustl.edu/~schmidt/DSEJ-94.ps.gz
diff --git a/examples/ASX/Event_Server/Transceiver/Makefile b/examples/ASX/Event_Server/Transceiver/Makefile
new file mode 100644
index 00000000000..7fb95dc617d
--- /dev/null
+++ b/examples/ASX/Event_Server/Transceiver/Makefile
@@ -0,0 +1,135 @@
+#----------------------------------------------------------------------------
+# @(#)Makefile 1.1 10/18/96
+#
+# Makefile for the transceiver portion of the Event Server test
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+BIN = transceiver
+
+LSRC = $(addsuffix .cpp,$(BIN))
+
+LDLIBS =
+
+VLDLIBS = $(LDLIBS:%=%$(VAR))
+
+BUILD = $(VBIN)
+
+INSTALL =
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+
+.obj/transceiver.o .shobj/transceiver.so: transceiver.cpp \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Connector.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.cpp \
+ $(WRAPPER_ROOT)/ace/Map_Manager.i \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.h \
+ $(WRAPPER_ROOT)/ace/Synch_Options.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \
+ $(WRAPPER_ROOT)/ace/Dynamic.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.i \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies.cpp \
+ $(WRAPPER_ROOT)/ace/Connector.i \
+ $(WRAPPER_ROOT)/ace/Connector.cpp \
+ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/examples/ASX/Event_Server/Transceiver/transceiver.cpp b/examples/ASX/Event_Server/Transceiver/transceiver.cpp
new file mode 100644
index 00000000000..7321e02dd9c
--- /dev/null
+++ b/examples/ASX/Event_Server/Transceiver/transceiver.cpp
@@ -0,0 +1,187 @@
+// Test program for the event transceiver. This program can play the
+// @(#)transceiver.cpp 1.1 10/18/96
+
+// role of either Consumer or Supplier. You can terminate this
+// program by typing ^C....
+
+#include "ace/Log_Msg.h"
+#include "ace/Service_Config.h"
+#include "ace/Connector.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/Get_Opt.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Event_Transceiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
+ // = TITLE
+ // Generate and receives messages from the event server.
+ //
+ // = DESCRIPTION
+ // This class is both a consumer and supplier of events, i.e.,
+ // it is a ``transceiver.''
+{
+public:
+ Event_Transceiver (void);
+
+ // = Svc_Handler hook called by the <ACE_Connector>.
+ virtual int open (void *);
+ // Initialize the transceiver when we are connected.
+
+ // = Demultplexing hooks from the <ACE_Reactor>.
+ virtual int handle_input (ACE_HANDLE);
+ // Receive data from STDIN or socket.
+
+ virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
+ // Close down via SIGINT.
+
+private:
+ int receiver (void);
+ // Reads data from socket and writes to ACE_STDOUT.
+
+ int forwarder (void);
+ // Writes data from ACE_STDIN to socket.
+};
+
+// Close down via SIGINT.
+
+int
+Event_Transceiver::handle_signal (int signum,
+ siginfo_t *,
+ ucontext_t *)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) received signal %S\n", signum));
+
+ ACE_Service_Config::end_reactor_event_loop ();
+ return 0;
+}
+
+Event_Transceiver::Event_Transceiver (void)
+{
+ ACE_Sig_Set sig_set;
+
+ sig_set.sig_add (SIGINT);
+ sig_set.sig_add (SIGQUIT);
+
+ if (ACE_Service_Config::reactor ()->register_handler
+ (sig_set, this) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "register_handler"));
+}
+
+int
+Event_Transceiver::open (void *)
+{
+ if (ACE_Service_Config::reactor ()->register_handler
+ (this->peer ().get_handle (),
+ this,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1);
+ else if (ACE::register_stdin_handler (this,
+ ACE_Service_Config::reactor (),
+ ACE_Service_Config::thr_mgr ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_stdin_handler"), -1);
+ return 0;
+}
+
+int
+Event_Transceiver::handle_input (ACE_HANDLE handle)
+{
+ if (handle == ACE_STDIN)
+ return this->forwarder ();
+ else
+ return this->receiver ();
+}
+
+
+int
+Event_Transceiver::forwarder (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering transceiver forwarder\n"));
+
+ char buf[BUFSIZ];
+ ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf);
+ int result = 0;
+
+ if (n <= 0 || this->peer ().send_n (buf, n) != n)
+ result = -1;
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving transceiver forwarder\n"));
+ return result;
+}
+
+int
+Event_Transceiver::receiver (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering transceiver receiver\n"));
+
+ char buf[BUFSIZ];
+
+ ssize_t n = this->peer ().recv (buf, sizeof buf);
+ int result = 0;
+
+ if (n <= 0 || ACE_OS::write (ACE_STDOUT, buf, n) != n)
+ result = -1;
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving transceiver receiver\n"));
+ return result;
+}
+
+// Port number of event server.
+static u_short port_number;
+
+// Name of event server.
+static char *host_name;
+
+// Handle the command-line arguments.
+
+static void
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, "h:p:");
+
+ port_number = ACE_DEFAULT_SERVER_PORT;
+ host_name = ACE_DEFAULT_SERVER_HOST;
+
+ for (int c; (c = get_opt ()) != -1; )
+ switch (c)
+ {
+ case 'h':
+ host_name = get_opt.optarg;
+ break;
+ case 'p':
+ port_number = ACE_OS::atoi (get_opt.optarg);
+ break;
+ default:
+ ACE_ERROR ((LM_ERROR,
+ "usage: %n [-p portnum] [-h host_name]\n%a", 1));
+ /* NOTREACHED */
+ break;
+ }
+}
+
+int
+main (int argc, char *argv[])
+{
+ ACE_Service_Config daemon (argv[0]);
+
+ parse_args (argc, argv);
+
+ // Establish the connection.
+ ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector;
+ Event_Transceiver transceiver;
+
+ if (connector.connect (&transceiver, ACE_INET_Addr (port_number, host_name)) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", host_name), 1);
+
+ // Run event loop until either the event server shuts down or we get
+ // a SIGINT.
+ ACE_Service_Config::run_reactor_event_loop ();
+ return 0;
+}
+#else
+int
+main (void)
+{
+ ACE_ERROR ((LM_ERROR, "test not defined for this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Makefile b/examples/ASX/Makefile
new file mode 100644
index 00000000000..38b94b51626
--- /dev/null
+++ b/examples/ASX/Makefile
@@ -0,0 +1,25 @@
+#----------------------------------------------------------------------------
+# @(#)Makefile 1.1 10/18/96
+#
+# Makefile for the ASX test directory
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+DIRS = CCM_App \
+ Event_Server \
+ Message_Queue \
+ UPIPE_Event_Server
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.nested.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.nolocal.GNU
+
diff --git a/examples/ASX/Message_Queue/Makefile b/examples/ASX/Message_Queue/Makefile
new file mode 100644
index 00000000000..77e32813526
--- /dev/null
+++ b/examples/ASX/Message_Queue/Makefile
@@ -0,0 +1,218 @@
+#----------------------------------------------------------------------------
+# @(#)Makefile 1.1 10/18/96
+#
+# Makefile for Message_Queue tests
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+BIN = buffer_stream \
+ bounded_buffer \
+ priority_buffer
+
+LSRC = $(addsuffix .cpp,$(BIN))
+
+VLDLIBS = $(LDLIBS:%=%$(VAR))
+
+BUILD = $(VBIN)
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+
+.obj/buffer_stream.o .shobj/buffer_stream.so: buffer_stream.cpp \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Stream.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Stream.cpp \
+ $(WRAPPER_ROOT)/ace/Stream.i
+.obj/bounded_buffer.o .shobj/bounded_buffer.so: bounded_buffer.cpp \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h
+.obj/priority_buffer.o .shobj/priority_buffer.so: priority_buffer.cpp \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Read_Buffer.h \
+ $(WRAPPER_ROOT)/ace/Read_Buffer.i \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/examples/ASX/Message_Queue/bounded_buffer.cpp b/examples/ASX/Message_Queue/bounded_buffer.cpp
new file mode 100644
index 00000000000..ec5abd1d7e1
--- /dev/null
+++ b/examples/ASX/Message_Queue/bounded_buffer.cpp
@@ -0,0 +1,130 @@
+// This short program copies stdin to stdout via the use of an ASX
+// @(#)bounded_buffer.cpp 1.1 10/18/96
+
+// Message_Queue. It illustrates an implementation of the classic
+// "bounded buffer" program.
+
+#include "ace/Log_Msg.h"
+#include "ace/Message_Queue.h"
+#include "ace/Thread_Manager.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Global thread manager.
+static ACE_Thread_Manager thr_mgr;
+
+// Message list.
+static ACE_Message_Queue<ACE_MT_SYNCH> msg_queue;
+
+// The producer reads data from the stdin stream, creates a message,
+// and then queues the message in the message list, where it is
+// removed by the consumer thread. A 0-sized message is enqueued when
+// there is no more data to read. The consumer uses this as a flag to
+// know when to exit.
+
+static void *
+producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ // Insert thread into thr_mgr.
+ ACE_Thread_Control thread_control (&thr_mgr);
+
+ // Keep reading stdin, until we reach EOF.
+
+ for (int n; ; )
+ {
+ // Allocate a new message.
+ ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ);
+
+ n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ());
+
+ if (n <= 0)
+ {
+ // Send a shutdown message to the other thread and exit.
+ mb->length (0);
+ if (msg_queue->enqueue_tail (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ break;
+ }
+
+ // Send the message to the other thread.
+ else
+ {
+ mb->msg_priority (n);
+ mb->wr_ptr (n);
+ if (msg_queue->enqueue_tail (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ }
+ }
+
+ // The destructor of ACE_Thread_Control removes the exiting thread
+ // from the thr_mgr automatically.
+ return 0;
+}
+
+// The consumer dequeues a message from the ACE_Message_Queue, writes
+// the message to the stderr stream, and deletes the message. The
+// producer sends a 0-sized message to inform the consumer to stop
+// reading and exit.
+
+static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ // Insert thread into thr_mgr.
+ ACE_Thread_Control thread_control (&thr_mgr);
+ int result = 0;
+
+ // Keep looping, reading a message out of the queue, until we timeout
+ // or get a message with a length == 0, which signals us to quit.
+
+ for (;;)
+ {
+ ACE_Message_Block *mb;
+
+ ACE_Time_Value timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds
+
+ result = msg_queue->dequeue_head (mb, &timeout);
+
+ if (result == -1)
+ break;
+
+ int length = mb->length ();
+
+ if (length > 0)
+ ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);
+
+ delete mb;
+
+ if (length == 0)
+ break;
+ }
+
+ if (result == -1 && errno == EWOULDBLOCK)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n%a", "timed out waiting for message", 1));
+
+ // The destructor of ACE_Thread_Control removes the exiting thread
+ // from the thr_mgr automatically.
+ return 0;
+}
+
+/* Spawn off two threads that copy stdin to stdout. */
+
+int main (int, char *[])
+{
+ if (thr_mgr.spawn (ACE_THR_FUNC (producer), (void *) &msg_queue,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
+ else if (thr_mgr.spawn (ACE_THR_FUNC (consumer), (void *) &msg_queue,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
+
+ // Wait for producer and consumer threads to exit.
+ thr_mgr.wait ();
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Message_Queue/buffer_stream.cpp b/examples/ASX/Message_Queue/buffer_stream.cpp
new file mode 100644
index 00000000000..0cc96cd16c5
--- /dev/null
+++ b/examples/ASX/Message_Queue/buffer_stream.cpp
@@ -0,0 +1,215 @@
+// This short program copies stdin to stdout via the use of an ASX
+// @(#)buffer_stream.cpp 1.1 10/18/96
+
+// STREAM. It illustrates an implementation of the classic "bounded
+// buffer" program using an ASX STREAM containing two Modules. Each
+// ACE_Module contains two Tasks. Each ACE_Task contains a
+// ACE_Message_Queue and a pointer to a ACE_Thread_Manager. Note how
+// the use of these reusable components reduces the reliance on global
+// variables, as compared with the bounded_buffer.C example.
+
+#include "ace/Log_Msg.h"
+#include "ace/Synch.h"
+#include "ace/Service_Config.h"
+#include "ace/Stream.h"
+#include "ace/Module.h"
+#include "ace/Task.h"
+
+#if defined (ACE_HAS_THREADS)
+
+typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
+typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
+typedef ACE_Task<ACE_MT_SYNCH> MT_Task;
+
+class Common_Task : public MT_Task
+ // = TITLE
+ // Methods that are common to the producer and consumer.
+{
+public:
+ Common_Task (void) {}
+ // ACE_Task hooks
+ virtual int open (void * = 0);
+ virtual int close (u_long = 0);
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0) { return 0; }
+
+ // ACE_Service_Object hooks
+ virtual int init (int, char **) { return 0; }
+ virtual int fini (void) { return 0; }
+ virtual int info (char **, size_t) const { return 0; }
+};
+
+// Define the Producer interface.
+
+class Producer : public Common_Task
+{
+public:
+ Producer (void) {}
+
+ // Read data from stdin and pass to consumer.
+ virtual int svc (void);
+};
+
+class Consumer : public Common_Task
+ // = TITLE
+ // Define the Consumer interface.
+{
+public:
+ Consumer (void) {}
+
+ // Enqueue the message on the ACE_Message_Queue for subsequent
+ // handling in the svc() method.
+ virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
+
+ // Receive message from producer and print to stdout.
+ virtual int svc (void);
+
+private:
+
+ ACE_Time_Value timeout_;
+};
+
+// Spawn off a new thread.
+
+int
+Common_Task::open (void *)
+{
+ if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1);
+ return 0;
+}
+
+int
+Common_Task::close (u_long exit_status)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting with status %d in module %s\n",
+ exit_status, this->name ()));
+
+ // Can do anything here that is required when a thread exits, e.g.,
+ // storing thread-specific information in some other storage
+ // location, etc.
+ return 0;
+}
+
+// The Consumer reads data from the stdin stream, creates a message,
+// and then queues the message in the message list, where it is
+// removed by the consumer thread. A 0-sized message is enqueued when
+// there is no more data to read. The consumer uses this as a flag to
+// know when to exit.
+
+int
+Producer::svc (void)
+{
+ // Keep reading stdin, until we reach EOF.
+
+ for (int n; ; )
+ {
+ // Allocate a new message.
+ ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ);
+
+ n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ());
+
+ if (n <= 0)
+ {
+ // Send a shutdown message to the other thread and exit.
+ mb->length (0);
+
+ if (this->put_next (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ break;
+ }
+
+ // Send the message to the other thread.
+ else
+ {
+ mb->wr_ptr (n);
+
+ if (this->put_next (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ }
+ }
+
+ return 0;
+}
+
+// Simply enqueue the Message_Block into the end of the queue.
+
+int
+Consumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
+{
+ return this->putq (mb, tv);
+}
+
+// The consumer dequeues a message from the ACE_Message_Queue, writes
+// the message to the stderr stream, and deletes the message. The
+// Consumer sends a 0-sized message to inform the consumer to stop
+// reading and exit.
+
+int
+Consumer::svc (void)
+{
+ int result = 0;
+
+ // Keep looping, reading a message out of the queue, until we
+ // timeout or get a message with a length == 0, which signals us to
+ // quit.
+
+ for (;;)
+ {
+ ACE_Message_Block *mb;
+
+ this->timeout_.sec (ACE_OS::time (0) + 4); // Wait for upto 4 seconds
+
+ result = this->getq (mb, &this->timeout_);
+
+ if (result == -1)
+ break;
+
+ int length = mb->length ();
+
+ if (length > 0)
+ ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);
+
+ delete mb;
+
+ if (length == 0)
+ break;
+ }
+
+ if (result == -1 && errno == EWOULDBLOCK)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n%a", "timed out waiting for message", 1));
+ return 0;
+}
+
+// Main driver function.
+
+int
+main (int argc, char *argv[])
+{
+ ACE_Service_Config daemon (argv[0]);
+
+ // Control hierachically-related active objects
+ MT_Stream stream;
+ MT_Module *pm = new MT_Module ("Consumer", new Consumer);
+ MT_Module *cm = new MT_Module ("Producer", new Producer);
+
+ // Create Producer and Consumer Modules and push them onto the
+ // STREAM. All processing is performed in the STREAM.
+
+ if (stream.push (pm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), 1);
+ else if (stream.push (cm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), 1);
+
+ // Barrier synchronization: wait for the threads to exit, then exit
+ // ourselves.
+ ACE_Service_Config::thr_mgr ()->wait ();
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Message_Queue/priority_buffer.cpp b/examples/ASX/Message_Queue/priority_buffer.cpp
new file mode 100644
index 00000000000..ef59d76b355
--- /dev/null
+++ b/examples/ASX/Message_Queue/priority_buffer.cpp
@@ -0,0 +1,139 @@
+// This short program prints the contents of stdin to stdout sorted by
+// @(#)priority_buffer.cpp 1.1 10/18/96
+
+// the length of each line via the use of an ASX Message_Queue. It
+// illustrates how priorities can be used for ACE Message_Queues.
+
+#include "ace/Log_Msg.h"
+#include "ace/Message_Queue.h"
+#include "ace/Read_Buffer.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Service_Config.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Global thread manager.
+static ACE_Thread_Manager thr_mgr;
+
+// Make the queue be capable of being *very* large.
+static const long max_queue = LONG_MAX;
+
+// Message queue.
+static ACE_Message_Queue<ACE_MT_SYNCH> msg_queue (max_queue);
+
+// The consumer dequeues a message from the ACE_Message_Queue, writes
+// the message to the stderr stream, and deletes the message. The
+// producer sends a 0-sized message to inform the consumer to stop
+// reading and exit.
+
+static void *
+consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ // Keep looping, reading a message out of the queue, until we
+ // timeout or get a message with a length == 0, which signals us to
+ // quit.
+
+ for (;;)
+ {
+ ACE_Message_Block *mb;
+
+ if (msg_queue->dequeue_head (mb) == -1)
+ break;
+
+ int length = mb->length ();
+
+ if (length > 0)
+ ACE_OS::puts (mb->rd_ptr ());
+
+ // Free up the buffer memory and the Message_Block.
+ ACE_Service_Config::allocator ()->free (mb->rd_ptr ());
+ delete mb;
+
+ if (length == 0)
+ break;
+ }
+
+ return 0;
+}
+
+// The producer reads data from the stdin stream, creates a message,
+// and then queues the message in the message list, where it is
+// removed by the consumer thread. A 0-sized message is enqueued when
+// there is no more data to read. The consumer uses this as a flag to
+// know when to exit.
+
+static void *
+producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ // Insert thread into thr_mgr.
+ ACE_Thread_Control thread_control (&thr_mgr);
+
+ ACE_Read_Buffer rb (ACE_STDIN);
+
+ // Keep reading stdin, until we reach EOF.
+
+ for (int n; ; )
+ {
+ // Allocate a new buffer.
+ char *buffer = rb.read ('\n');
+
+ if (buffer == 0)
+ {
+ // Send a 0-sized shutdown message to the other thread and
+ // exit.
+ if (msg_queue->enqueue_tail (new ACE_Message_Block ((size_t) 0)) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ break;
+ }
+
+ // Enqueue the message in priority order.
+ else
+ {
+ // Allocate a new message, but have it "borrow" its memory
+ // from the buffer.
+ ACE_Message_Block *mb = new ACE_Message_Block (rb.size (),
+ ACE_Message_Block::MB_DATA,
+ 0,
+ buffer);
+ mb->msg_priority (rb.size ());
+ mb->wr_ptr (rb.size ());
+
+ ACE_DEBUG ((LM_DEBUG, "enqueueing message of size %d\n",
+ mb->msg_priority ()));
+ // Enqueue in priority order.
+ if (msg_queue->enqueue (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ }
+ }
+
+ // Now read all the items out in priority order (i.e., ordered by
+ // the size of the lines!).
+ consumer (msg_queue);
+
+ // The destructor of ACE_Thread_Control removes the exiting thread
+ // from the thr_mgr automatically.
+ return 0;
+}
+
+// Spawn off one thread that copies stdin to stdout in order of the
+// size of each line.
+
+int
+main (int, char *[])
+{
+ if (thr_mgr.spawn (ACE_THR_FUNC (producer), (void *) &msg_queue,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
+
+ // Wait for producer and consumer threads to exit.
+ thr_mgr.wait ();
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp b/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp
new file mode 100644
index 00000000000..e679de5390e
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp
@@ -0,0 +1,126 @@
+#include "Consumer_Router.h"
+// @(#)Consumer_Router.cpp 1.1 10/18/96
+
+#include "Options.h"
+
+#if defined (ACE_HAS_THREADS)
+
+typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY;
+
+int
+Consumer_Handler::open (void *a)
+{
+ CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a;
+ this->router_task_ = af->router ();
+ return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a);
+}
+
+Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm)
+ : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm)
+{
+}
+
+// Create a new handler that will interact with a consumer and point
+// its ROUTER_TASK_ data member to the CONSUMER_ROUTER.
+
+Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm)
+ : CONSUMER_ROUTER (tm)
+{
+}
+
+// Initialize the Router..
+
+int
+Consumer_Router::open (void *)
+{
+ ACE_ASSERT (this->is_reader ());
+ char *argv[3];
+
+ argv[0] = (char *) this->name ();
+ argv[1] = options.consumer_file ();
+ argv[2] = 0;
+
+ if (this->init (1, &argv[1]) == -1)
+ return -1;
+
+ // Make this an active object.
+// return this->activate (options.t_flags ());
+}
+
+int
+Consumer_Router::close (u_long)
+{
+ ACE_ASSERT (this->is_reader ());
+ this->peer_map_.close ();
+ this->msg_queue ()->deactivate();
+ return 0;
+}
+
+
+// Handle incoming messages in a separate thread..
+
+int
+Consumer_Router::svc (void)
+{
+ ACE_Thread_Control tc (this->thr_mgr ());
+ ACE_Message_Block *mb = 0;
+
+ ACE_ASSERT (this->is_reader ());
+
+ if (options.debug ())
+ ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in %s\n", this->name ()));
+
+ while (this->getq (mb) > 0)
+ {
+ if (this->put_next (mb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) put_next failed in %s\n", this->name ()), -1);
+
+ }
+ return 0;
+ // Note the implicit ACE_OS::thr_exit() via destructor.
+}
+
+// Send a MESSAGE_BLOCK to the supplier(s)..
+
+int
+Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ ACE_ASSERT (this->is_reader ());
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ {
+ this->control (mb);
+ return this->put_next (mb);
+ }
+ else
+{
+//printf("consumer-Router is routing : send_peers\n");
+ return this->send_peers (mb);
+}
+}
+
+// Return information about the Client_Router ACE_Module..
+
+int
+Consumer_Router::info (char **strp, size_t length) const
+{
+ char buf[BUFSIZ];
+ ACE_UPIPE_Addr addr;
+ const char *mod_name = this->name ();
+ ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_;
+
+ if (sa.get_local_addr (addr) == -1)
+ return -1;
+
+ ACE_OS::sprintf (buf, "%s\t /%s %s",
+ mod_name, "upipe",
+ "# consumer router\n");
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/UPIPE_Event_Server/Consumer_Router.h b/examples/ASX/UPIPE_Event_Server/Consumer_Router.h
new file mode 100644
index 00000000000..66d3f28ba07
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Consumer_Router.h
@@ -0,0 +1,48 @@
+/* -*- C++ -*- */
+// @(#)Consumer_Router.h 1.1 10/18/96
+
+// The interface between one or more consumers and an Event Server
+// ACE_Stream.
+
+#if !defined (_CONSUMER_ROUTER_H)
+#define _CONSUMER_ROUTER_H
+
+#include "ace/Thread_Manager.h"
+#include "ace/UPIPE_Acceptor.h"
+#include "ace/UPIPE_Addr.h"
+#include "ace/Svc_Handler.h"
+#include "Peer_Router.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Consumer_Handler; // Forward declaration....
+
+typedef long CONSUMER_KEY;
+
+typedef Peer_Router<Consumer_Handler, CONSUMER_KEY> CONSUMER_ROUTER;
+
+class Consumer_Handler
+ : public Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>
+{
+public:
+ Consumer_Handler (ACE_Thread_Manager *tm = 0);
+ virtual int open (void *);
+};
+
+class Consumer_Router : public CONSUMER_ROUTER
+{
+public:
+ Consumer_Router (ACE_Thread_Manager *thr_manager);
+
+protected:
+ // ACE_Task hooks..
+ virtual int open (void *a = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ virtual int svc (void);
+
+ // Dynamic linking hooks.
+ virtual int info (char **info_string, size_t length) const;
+};
+#endif /* ACE_HAS_THREADS */
+#endif /* _CONSUMER_ROUTER_H */
diff --git a/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp
new file mode 100644
index 00000000000..977e5c4af9d
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp
@@ -0,0 +1,68 @@
+#include "Event_Analyzer.h"
+// @(#)Event_Analyzer.cpp 1.1 10/18/96
+
+
+#if defined (ACE_HAS_THREADS)
+
+int
+Event_Analyzer::open (void *)
+{
+ return 0;
+}
+
+int
+Event_Analyzer::close (u_long)
+{
+ return 0;
+}
+
+int
+Event_Analyzer::control (ACE_Message_Block *mb)
+{
+ ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
+ ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd;
+
+ switch (cmd = ioc->cmd ())
+ {
+ case ACE_IO_Cntl_Msg::SET_LWM:
+ case ACE_IO_Cntl_Msg::SET_HWM:
+ this->water_marks (cmd, *(size_t *) mb->cont ()->rd_ptr ());
+ break;
+ }
+ return 0;
+}
+
+int
+Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ this->control (mb);
+
+ return this->put_next (mb);
+}
+
+int
+Event_Analyzer::init (int, char *[])
+{
+ return 0;
+}
+
+int
+Event_Analyzer::fini (void)
+{
+ return 0;
+}
+
+int
+Event_Analyzer::info (char **strp, size_t length) const
+{
+ const char *mod_name = this->name ();
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h
new file mode 100644
index 00000000000..30053d5f45d
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h
@@ -0,0 +1,34 @@
+/* -*- C++ -*- */
+// @(#)Event_Analyzer.h 1.1 10/18/96
+
+// Signal router.
+
+#if !defined (_EVENT_ANALYZER_H)
+#define _EVENT_ANALYZER_H
+
+#include "ace/Stream.h"
+#include "ace/Module.h"
+#include "ace/Task.h"
+#include "ace/Synch.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Event_Analyzer : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ virtual int open (void *a = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ virtual int svc (void) { return 0; }
+
+ // Dynamic linking hooks.
+ virtual int init (int argc, char *argv[]);
+ virtual int fini (void);
+ virtual int info (char **info_string, size_t length) const;
+
+private:
+ virtual int control (ACE_Message_Block *);
+};
+
+#endif /* ACE_HAS_THREADS */
+#endif /* _EVENT_ANALYZER_H */
diff --git a/examples/ASX/UPIPE_Event_Server/Makefile b/examples/ASX/UPIPE_Event_Server/Makefile
new file mode 100644
index 00000000000..bb0cdc00ed4
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Makefile
@@ -0,0 +1,455 @@
+#----------------------------------------------------------------------------
+# @(#)Makefile 1.1 10/18/96
+#
+# Makefile for the Event Server test
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+BIN = event_server
+
+FILES = Options \
+ Supplier_Router \
+ Event_Analyzer \
+ Consumer_Router \
+ Peer_Router
+
+LSRC = $(addsuffix .cpp,$(FILES))
+LOBJ = $(addsuffix .o,$(FILES))
+SHOBJ = $(addsuffix .so,$(FILES))
+
+LDLIBS = $(addprefix .shobj/,$(SHOBJ))
+
+VLDLIBS = $(LDLIBS:%=%$(VAR))
+
+BUILD = $(VBIN)
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU
+include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+
+.obj/Options.o .shobj/Options.so: Options.cpp \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ Options.h \
+ $(WRAPPER_ROOT)/ace/Profile_Timer.h \
+ Options.i
+.obj/Supplier_Router.o .shobj/Supplier_Router.so: Supplier_Router.cpp Supplier_Router.h \
+ $(WRAPPER_ROOT)/ace/UPIPE_Addr.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Addr.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/UPIPE_Stream.h \
+ $(WRAPPER_ROOT)/ace/Stream.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Stream.cpp \
+ $(WRAPPER_ROOT)/ace/Stream.i \
+ $(WRAPPER_ROOT)/ace/SPIPE.h \
+ $(WRAPPER_ROOT)/ace/SPIPE.i \
+ $(WRAPPER_ROOT)/ace/SPIPE_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Stream.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Stream.i \
+ $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \
+ $(WRAPPER_ROOT)/ace/Map_Manager.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.cpp \
+ $(WRAPPER_ROOT)/ace/Map_Manager.i \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.h \
+ $(WRAPPER_ROOT)/ace/Synch_Options.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \
+ $(WRAPPER_ROOT)/ace/Dynamic.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.i \
+ Peer_Router.h \
+ $(WRAPPER_ROOT)/ace/Acceptor.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies.cpp \
+ $(WRAPPER_ROOT)/ace/Acceptor.i \
+ $(WRAPPER_ROOT)/ace/Acceptor.cpp \
+ Peer_Router.cpp \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h \
+ Options.h \
+ $(WRAPPER_ROOT)/ace/Profile_Timer.h \
+ Options.i
+.obj/Event_Analyzer.o .shobj/Event_Analyzer.so: Event_Analyzer.cpp Event_Analyzer.h \
+ $(WRAPPER_ROOT)/ace/Stream.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Stream.cpp \
+ $(WRAPPER_ROOT)/ace/Stream.i
+.obj/Consumer_Router.o .shobj/Consumer_Router.so: Consumer_Router.cpp Consumer_Router.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/UPIPE_Stream.h \
+ $(WRAPPER_ROOT)/ace/Stream.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Stream.cpp \
+ $(WRAPPER_ROOT)/ace/Stream.i \
+ $(WRAPPER_ROOT)/ace/SPIPE.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Addr.h \
+ $(WRAPPER_ROOT)/ace/SPIPE.i \
+ $(WRAPPER_ROOT)/ace/UPIPE_Addr.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Stream.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Stream.i \
+ $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.h \
+ $(WRAPPER_ROOT)/ace/Synch_Options.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \
+ $(WRAPPER_ROOT)/ace/Dynamic.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.i \
+ Peer_Router.h \
+ $(WRAPPER_ROOT)/ace/Acceptor.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies.cpp \
+ $(WRAPPER_ROOT)/ace/Acceptor.i \
+ $(WRAPPER_ROOT)/ace/Acceptor.cpp \
+ $(WRAPPER_ROOT)/ace/Map_Manager.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.cpp \
+ $(WRAPPER_ROOT)/ace/Map_Manager.i \
+ Peer_Router.cpp \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h \
+ Options.h \
+ $(WRAPPER_ROOT)/ace/Profile_Timer.h \
+ Options.i
+.obj/Peer_Router.o .shobj/Peer_Router.so: Peer_Router.cpp \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Synch_T.cpp \
+ $(WRAPPER_ROOT)/ace/Synch_T.i \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Set.cpp \
+ $(WRAPPER_ROOT)/ace/Set.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.cpp \
+ $(WRAPPER_ROOT)/ace/Malloc_T.i \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.i \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ Peer_Router.h \
+ $(WRAPPER_ROOT)/ace/Acceptor.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.h \
+ $(WRAPPER_ROOT)/ace/Synch_Options.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.cpp \
+ $(WRAPPER_ROOT)/ace/Message_Queue.i \
+ $(WRAPPER_ROOT)/ace/Task.cpp \
+ $(WRAPPER_ROOT)/ace/Module.h \
+ $(WRAPPER_ROOT)/ace/Module.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.h \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \
+ $(WRAPPER_ROOT)/ace/Stream_Modules.i \
+ $(WRAPPER_ROOT)/ace/Module.i \
+ $(WRAPPER_ROOT)/ace/Task.i \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \
+ $(WRAPPER_ROOT)/ace/Dynamic.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.i \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies.cpp \
+ $(WRAPPER_ROOT)/ace/Acceptor.i \
+ $(WRAPPER_ROOT)/ace/Acceptor.cpp \
+ $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/UPIPE_Stream.h \
+ $(WRAPPER_ROOT)/ace/Stream.h \
+ $(WRAPPER_ROOT)/ace/Stream.cpp \
+ $(WRAPPER_ROOT)/ace/Stream.i \
+ $(WRAPPER_ROOT)/ace/SPIPE.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Addr.h \
+ $(WRAPPER_ROOT)/ace/SPIPE.i \
+ $(WRAPPER_ROOT)/ace/UPIPE_Addr.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Stream.h \
+ $(WRAPPER_ROOT)/ace/SPIPE_Stream.i \
+ $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \
+ $(WRAPPER_ROOT)/ace/Map_Manager.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.cpp \
+ $(WRAPPER_ROOT)/ace/Map_Manager.i \
+ Peer_Router.cpp Options.h \
+ $(WRAPPER_ROOT)/ace/Profile_Timer.h \
+ Options.i
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/examples/ASX/UPIPE_Event_Server/Options.cpp b/examples/ASX/UPIPE_Event_Server/Options.cpp
new file mode 100644
index 00000000000..2a334d7ff2a
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Options.cpp
@@ -0,0 +1,191 @@
+#include "ace/Get_Opt.h"
+// @(#)Options.cpp 1.1 10/18/96
+
+#include "ace/Synch.h"
+#include "ace/Log_Msg.h"
+#include "Options.h"
+
+#if defined (ACE_HAS_THREADS)
+
+Options::Options (void)
+ : debugging_ (0),
+ verbosity_ (0),
+ low_water_mark_ (1024),
+ high_water_mark_ (8 * 1024),
+ message_size_ (128),
+ thr_count_ (4),
+ initial_queue_length_ (0),
+ iterations_ (100000),
+ consumer_port_ ("-p 10000"),
+ supplier_port_ ("-p 10001"),
+ consumer_file_ ("-f/tmp/conupipe"),
+ supplier_file_ ("-f/tmp/supupipe"),
+ t_flags_ (THR_DETACHED)
+{
+}
+
+Options::~Options (void)
+{
+}
+
+void Options::print_results (void)
+{
+ ACE_Profile_Timer::ACE_Elapsed_Time et;
+ this->itimer_.elapsed_time (et);
+
+#if defined (ACE_HAS_PRUSAGE_T)
+ prusage_t rusage;
+ this->itimer_.get_rusage (rusage);
+
+ if (options.verbose ())
+ {
+ ACE_OS::printf ("final concurrency hint = %d\n", ACE_OS::thr_getconcurrency ());
+ ACE_OS::printf ("%8d = lwpid\n"
+ "%8d = lwp count\n"
+ "%8d = minor page faults\n"
+ "%8d = major page faults\n"
+ "%8d = input blocks\n"
+ "%8d = output blocks\n"
+ "%8d = messages sent\n"
+ "%8d = messages received\n"
+ "%8d = signals received\n"
+ "%8ds, %dms = wait-cpu (latency) time\n"
+ "%8ds, %dms = user lock wait sleep time\n"
+ "%8ds, %dms = all other sleep time\n"
+ "%8d = voluntary context switches\n"
+ "%8d = involuntary context switches\n"
+ "%8d = system calls\n"
+ "%8d = chars read/written\n",
+ rusage.pr_lwpid,
+ rusage.pr_count,
+ rusage.pr_minf,
+ rusage.pr_majf,
+ rusage.pr_inblk,
+ rusage.pr_oublk,
+ rusage.pr_msnd,
+ rusage.pr_mrcv,
+ rusage.pr_sigs,
+ rusage.pr_wtime.tv_sec, rusage.pr_wtime.tv_nsec / 1000000,
+ rusage.pr_ltime.tv_sec, rusage.pr_ltime.tv_nsec / 1000000,
+ rusage.pr_slptime.tv_sec, rusage.pr_slptime.tv_nsec / 1000000,
+ rusage.pr_vctx,
+ rusage.pr_ictx,
+ rusage.pr_sysc,
+ rusage.pr_ioch);
+ }
+#endif /* ACE_HAS_PRUSAGE_T */
+
+ ACE_OS::printf ("---------------------\n"
+ "real time = %.3f\n"
+ "user time = %.3f\n"
+ "system time = %.3f\n"
+ "---------------------\n",
+ et.real_time, et.user_time, et.system_time);
+}
+
+// Manages the options.
+Options options;
+
+void
+Options::parse_args (int argc, char *argv[])
+{
+ ACE_LOG_MSG->open (argv[0]);
+
+ ACE_Get_Opt getopt (argc, argv, "C:c:bdH:i:L:l:M:nS:s:t:T:v");
+ int c;
+
+ while ((c = getopt ()) != -1)
+ switch (c)
+ {
+ case 'b':
+ this->t_flags (THR_BOUND);
+ break;
+ case 'C':
+ this->consumer_file (getopt.optarg);
+ break;
+ case 'c':
+ this->consumer_port (getopt.optarg);
+ break;
+ case 'd':
+ this->debugging_ = 1;
+ break;
+ case 'H':
+ this->high_water_mark (ACE_OS::atoi (getopt.optarg));
+ break;
+ case 'i':
+ this->iterations (ACE_OS::atoi (getopt.optarg));
+ break;
+ case 'L':
+ this->low_water_mark (ACE_OS::atoi (getopt.optarg));
+ break;
+ case 'l':
+ this->initial_queue_length (ACE_OS::atoi (getopt.optarg));
+ break;
+ case 'M':
+ this->message_size (ACE_OS::atoi (getopt.optarg));
+ break;
+ case 'n':
+ this->t_flags (THR_NEW_LWP);
+ break;
+ case 'S':
+ this->supplier_file (getopt.optarg);
+ break;
+ case 's':
+ this->supplier_port (getopt.optarg);
+ break;
+ case 'T':
+ if (ACE_OS::strcasecmp (getopt.optarg, "ON") == 0)
+ ACE_Trace::start_tracing ();
+ else if (ACE_OS::strcasecmp (getopt.optarg, "OFF") == 0)
+ ACE_Trace::stop_tracing ();
+ break;
+ case 't':
+ this->thr_count (ACE_OS::atoi (getopt.optarg));
+ break;
+ case 'v':
+ this->verbosity_ = 1;
+ break;
+ default:
+ ::fprintf (stderr, "%s\n"
+ "\t[-b] (THR_BOUND)\n"
+ "\t[-C consumer file]\n"
+ "\t[-c consumer port]\n"
+ "\t[-d] (enable debugging)\n"
+ "\t[-H high water mark]\n"
+ "\t[-i number of test iterations]\n"
+ "\t[-L low water mark]\n"
+ "\t[-M] message size \n"
+ "\t[-n] (THR_NEW_LWP)\n"
+ "\t[-q max queue size]\n"
+ "\t[-S supplier file]\n"
+ "\t[-s supplier port]\n"
+ "\t[-t number of threads]\n"
+ "\t[-v] (verbose) \n",
+ argv[0]);
+ ::exit (1);
+ /* NOTREACHED */
+ break;
+ }
+
+ if (this->verbose ())
+ ACE_OS::printf ("%8d = initial concurrency hint\n"
+ "%8d = total iterations\n"
+ "%8d = thread count\n"
+ "%8d = low water mark\n"
+ "%8d = high water mark\n"
+ "%8d = message_size\n"
+ "%8d = initial queue length\n"
+ "%8d = THR_BOUND\n"
+ "%8d = THR_NEW_LWP\n",
+ ACE_OS::thr_getconcurrency (),
+ this->iterations (),
+ this->thr_count (),
+ this->low_water_mark (),
+ this->high_water_mark (),
+ this->message_size (),
+ this->initial_queue_length (),
+ (this->t_flags () & THR_BOUND) != 0,
+ (this->t_flags () & THR_NEW_LWP) != 0);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/UPIPE_Event_Server/Options.h b/examples/ASX/UPIPE_Event_Server/Options.h
new file mode 100644
index 00000000000..bd73eae2bae
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Options.h
@@ -0,0 +1,83 @@
+/* -*- C++ -*- */
+// @(#)Options.h 1.1 10/18/96
+
+// Option manager for Event Server.
+
+#if !defined (DEVICE_OPTIONS_H)
+#define DEVICE_OPTIONS_H
+
+#include "ace/OS.h"
+#include "ace/Profile_Timer.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Options
+{
+public:
+ Options (void);
+ ~Options (void);
+ void parse_args (int argc, char *argv[]);
+
+ void stop_timer (void);
+ void start_timer (void);
+
+ void thr_count (size_t count);
+ size_t thr_count (void);
+
+ void initial_queue_length (size_t length);
+ size_t initial_queue_length (void);
+
+ void high_water_mark (size_t size);
+ size_t high_water_mark (void);
+
+ void low_water_mark (size_t size);
+ size_t low_water_mark (void);
+
+ void message_size (size_t size);
+ size_t message_size (void);
+
+ void iterations (size_t n);
+ size_t iterations (void);
+
+ void t_flags (long flag);
+ long t_flags (void);
+
+ void supplier_port (char *port);
+ char *supplier_port (void);
+
+ void consumer_port (char *port);
+ char *consumer_port (void);
+
+ void supplier_file (char *file);
+ char *supplier_file (void);
+
+ void consumer_file (char *file);
+ char *consumer_file (void);
+
+ int debug (void);
+ int verbose (void);
+
+ void print_results (void);
+
+private:
+ ACE_Profile_Timer itimer_; // Time the process.
+ size_t thr_count_; // Number of threads to spawn.
+ long t_flags_; // Flags to thr_create().
+ size_t high_water_mark_; // ACE_Task high water mark.
+ size_t low_water_mark_; // ACE_Task low water mark.
+ size_t message_size_; // Size of a message.
+ size_t initial_queue_length_; // Initial number of items in the queue.
+ size_t iterations_; // Number of iterations to run the test program.
+ int debugging_; // Extra debugging info.
+ int verbosity_; // Extra verbose messages.
+ char *consumer_port_; // Port that the Consumer_Router is using.
+ char *supplier_port_; // Port that the Supplier_Router is using.
+ char *consumer_file_; // file that the Consumer_Router is using.
+ char *supplier_file_; // file that the Supplier_Router is using.
+};
+
+extern Options options;
+
+#include "Options.i"
+#endif /* ACE_HAS_THREADS */
+#endif /* DEVICE_OPTIONS_H */
diff --git a/examples/ASX/UPIPE_Event_Server/Options.i b/examples/ASX/UPIPE_Event_Server/Options.i
new file mode 100644
index 00000000000..226e46b1548
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Options.i
@@ -0,0 +1,161 @@
+/* -*- C++ -*- */
+// @(#)Options.i 1.1 10/18/96
+
+// Option manager for ustreams.
+
+inline void
+Options::supplier_port (char *port)
+{
+ this->supplier_port_ = port;
+}
+
+inline char *
+Options::supplier_port (void)
+{
+ return this->supplier_port_;
+}
+
+inline void
+Options::supplier_file (char *file)
+{
+ this->supplier_file_ = file;
+}
+
+inline char *
+Options::supplier_file (void)
+{
+ return this->supplier_file_;
+}
+
+inline void
+Options::consumer_file (char *file)
+{
+ this->consumer_file_ = file;
+}
+
+inline char *
+Options::consumer_file (void)
+{
+ return this->consumer_file_;
+}
+
+inline void
+Options::consumer_port (char *port)
+{
+ this->consumer_port_ = port;
+}
+
+inline char *
+Options::consumer_port (void)
+{
+ return this->consumer_port_;
+}
+
+inline void
+Options::start_timer (void)
+{
+ this->itimer_.start ();
+}
+
+inline void
+Options::stop_timer (void)
+{
+ this->itimer_.stop ();
+}
+
+inline void
+Options::thr_count (size_t count)
+{
+ this->thr_count_ = count;
+}
+
+inline size_t
+Options::thr_count (void)
+{
+ return this->thr_count_;
+}
+
+inline void
+Options::initial_queue_length (size_t length)
+{
+ this->initial_queue_length_ = length;
+}
+
+inline size_t
+Options::initial_queue_length (void)
+{
+ return this->initial_queue_length_;
+}
+
+inline void
+Options::high_water_mark (size_t size)
+{
+ this->high_water_mark_ = size;
+}
+
+inline size_t
+Options::high_water_mark (void)
+{
+ return this->high_water_mark_;
+}
+
+inline void
+Options::low_water_mark (size_t size)
+{
+ this->low_water_mark_ = size;
+}
+
+inline size_t
+Options::low_water_mark (void)
+{
+ return this->low_water_mark_;
+}
+
+inline void
+Options::message_size (size_t size)
+{
+ this->message_size_ = size;
+}
+
+inline size_t
+Options::message_size (void)
+{
+ return this->message_size_;
+}
+
+inline void
+Options::iterations (size_t n)
+{
+ this->iterations_ = n;
+}
+
+inline size_t
+Options::iterations (void)
+{
+ return this->iterations_;
+}
+
+inline void
+Options::t_flags (long flag)
+{
+ this->t_flags_ |= flag;
+}
+
+inline long
+Options::t_flags (void)
+{
+ return this->t_flags_;
+}
+
+inline int
+Options::debug (void)
+{
+ return this->debugging_;
+}
+
+inline int
+Options::verbose (void)
+{
+ return this->verbosity_;
+}
+
diff --git a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
new file mode 100644
index 00000000000..6aba899f4ea
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
@@ -0,0 +1,273 @@
+#if !defined (_PEER_ROUTER_C)
+// @(#)Peer_Router.cpp 1.1 10/18/96
+
+#define _PEER_ROUTER_C
+
+#include "ace/Get_Opt.h"
+#include "ace/Service_Config.h"
+#include "ace/Log_Msg.h"
+#include "Peer_Router.h"
+#include "Options.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Define some short-hand macros to deal with long templates
+// names...
+
+#define PH PEER_HANDLER
+#define PA PEER_ACCEPTOR
+#define PAD PEER_ADDR
+#define PK PEER_KEY
+#define PM PEER_MAP
+
+template <class PH, class PK> int
+Acceptor_Factory<PH, PK>::init (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, "df:", 0);
+ ACE_UPIPE_Addr addr;
+
+ for (int c; (c = get_opt ()) != -1; )
+ switch (c)
+ {
+ case 'f':
+ addr.set (get_opt.optarg);
+ break;
+ case 'd':
+ break;
+ default:
+ break;
+ }
+
+ if (this->open (addr, ACE_Service_Config::reactor ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
+ return 0;
+}
+
+template <class PH, class PK>
+Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr)
+ : pr_ (pr)
+{
+}
+
+template <class PH, class PK> Peer_Router<PH, PK> *
+Acceptor_Factory<PH, PK>::router (void)
+{
+ return this->pr_;
+}
+
+template <class ROUTER, class KEY>
+Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm)
+ : ACE_Svc_Handler<ACE_UPIPE_Stream, ACE_UPIPE_Addr, ACE_MT_SYNCH> (tm)
+{
+}
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::svc (void)
+{
+ ACE_Thread_Control thread_control (tm);
+ // just a try !!
+ // we're just reading from our ACE_Message_Queue
+ ACE_Message_Block *db, *hb;
+ int n;
+ // do an endless loop
+ for (;;)
+ {
+ db = new ACE_Message_Block (BUFSIZ);
+ hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db);
+
+ if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1);
+ else if (n == 0) // Client has closed down the connection.
+ {
+
+ if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) shutting down \n"));
+ return -1; // We do not need to be deregistered by reactor
+ // as we were not registered at all
+ }
+ else // Transform incoming buffer into a Message and pass downstream.
+ {
+ db->wr_ptr (n);
+ *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment.
+ hb->wr_ptr (sizeof (long));
+ if (this->router_task_->reply (hb) == -1)
+ {
+ cout << "Peer_Handler.svc : router_task->reply failed" << endl ;
+ return -1;
+ }
+
+ // return this->router_task_->reply (hb) == -1 ? -1 : 0;
+ }
+ }
+ return 0;
+}
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ return this->peer ().send_n (mb->rd_ptr (), mb->length ());
+}
+
+// Create a new handler and point its ROUTER_TASK_ data member to the
+// corresponding router. Note that this router is extracted out of
+// the Acceptor_Factory * that is passed in via the
+// ACE_Acceptor::handle_input() method.
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::open (void *a)
+{
+ char buf[BUFSIZ], *p = buf;
+
+ if (this->router_task_->info (&p, sizeof buf) != -1)
+ ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, fd = %d, this = %d\n",
+ buf, this->get_handle (), a));
+ else
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1);
+
+ if ( this->activate (options.t_flags ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1);
+ else if (this->router_task_->bind_peer (this->get_handle (), this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1);
+ return 0;
+}
+
+// Receive a message from a supplier..
+
+template <class ROUTER, class KEY> int
+Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h)
+{
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h));
+// ACE_Service_Config::reactor ()->remove_handler(h,
+// ACE_Event_Handler::RWE_MASK
+// |ACE_Event_Handler::DONT_CALL);
+// this method should be called only if the peer shuts down
+// so we deactivate our ACE_Message_Queue to awake our svc thread
+
+ return 0;
+
+#if 0
+ ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ);
+ ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db);
+ int n;
+
+ if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1);
+ else if (n == 0) // Client has closed down the connection.
+ {
+ if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h));
+ return -1; // Instruct the ACE_Reactor to deregister us by returning -1.
+ }
+ else // Transform incoming buffer into a Message and pass downstream.
+ {
+ db->wr_ptr (n);
+ *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment.
+ hb->wr_ptr (sizeof (long));
+ return this->router_task_->reply (hb) == -1 ? -1 : 0;
+ }
+#endif
+}
+
+template <class PH, class PK>
+Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm)
+ : ACE_Task<ACE_MT_SYNCH> (tm)
+{
+}
+
+template <class PH, class PK> int
+Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb)
+{
+ ACE_Map_Iterator<PK, PH *, ACE_RW_Mutex> map_iter = this->peer_map_;
+ int bytes = 0;
+ int iterations = 0;
+ ACE_Message_Block *data_block = mb->cont ();
+ for (ACE_Map_Entry<PK, PH *> *ss = 0;
+ map_iter.next (ss) != 0;
+ map_iter.advance ())
+ {
+ if (options.debug ())
+ ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via sd %d\n", ss->ext_id_));
+
+ iterations++;
+ bytes += ss->int_id_->put (data_block);
+ }
+
+ delete mb;
+ return bytes == 0 ? 0 : bytes / iterations;
+}
+
+template <class PH, class PK>
+Peer_Router<PH, PK>::~Peer_Router (void)
+{
+}
+
+template <class PH, class PK> ACE_INLINE int
+Peer_Router<PH, PK>::fini (void)
+{
+ delete this->acceptor_;
+ return 0;
+}
+
+template <class PH, class PK> ACE_INLINE int
+Peer_Router<PH, PK>::control (ACE_Message_Block *mb)
+{
+ ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
+ ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command;
+
+ switch (command = ioc->cmd ())
+ {
+ case ACE_IO_Cntl_Msg::SET_LWM:
+ case ACE_IO_Cntl_Msg::SET_HWM:
+ this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ());
+ break;
+ default:
+ return -1;
+ }
+ return 0;
+}
+
+template <class PH, class PK> ACE_INLINE int
+Peer_Router<PH, PK>::unbind_peer (PK key)
+{
+ return this->peer_map_.unbind (key);
+}
+
+template <class PH, class PK> ACE_INLINE int
+Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph)
+{
+ PH *peer_handler = (PH *) ph;
+ return this->peer_map_.bind (key, peer_handler);
+}
+
+template <class PH, class PK> ACE_INLINE int
+Peer_Router<PH, PK>::init (int argc, char *argv[])
+{
+ this->acceptor_ = new Acceptor_Factory <PH, PK> (this);
+
+ if (this->acceptor_->init (argc, argv) == -1
+ || this->peer_map_.open () == -1)
+ return -1;
+ else
+ {
+ ACE_UPIPE_Addr addr;
+ ACE_UPIPE_Acceptor &pa = this->acceptor_->acceptor ();
+
+ if (pa.get_local_addr (addr) != -1)
+ ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s, file = %s, fd = %d, this = %u\n",
+ this->name (), addr.get_path_name (), pa.get_handle (), this));
+ else
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), -1);
+ }
+ return 0;
+}
+
+#undef PH
+#undef PA
+#undef PAD
+#undef PK
+#undef PM
+#endif /* ACE_HAS_THREADS */
+#endif /* _PEER_ROUTER_C */
diff --git a/examples/ASX/UPIPE_Event_Server/Peer_Router.h b/examples/ASX/UPIPE_Event_Server/Peer_Router.h
new file mode 100644
index 00000000000..b344497d4b1
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Peer_Router.h
@@ -0,0 +1,116 @@
+/* -*- C++ -*- */
+// @(#)Peer_Router.h 1.1 10/18/96
+
+// The interface between one or more peers and a stream. A peer
+// typically runs remotely on another machine.
+
+#if !defined (_PEER_ROUTER_H)
+#define _PEER_ROUTER_H
+
+#include "ace/Acceptor.h"
+#include "ace/Svc_Handler.h"
+#include "ace/UPIPE_Acceptor.h"
+#include "ace/UPIPE_Addr.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Map_Manager.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Forward declaration.
+template <class PEER_HANDLER, class KEY>
+class Peer_Router;
+
+template <class PEER_HANDLER, class KEY>
+class Acceptor_Factory : public ACE_Acceptor<PEER_HANDLER, ACE_UPIPE_ACCEPTOR>
+{
+public:
+ Acceptor_Factory (Peer_Router<PEER_HANDLER, KEY> *pr);
+ Peer_Router<PEER_HANDLER, KEY> *router (void);
+
+ int init (int argc, char *argv[]);
+ // Initialize the acceptor when it's linked dynamically.
+
+private:
+ Peer_Router<PEER_HANDLER, KEY> *pr_;
+};
+
+// Receive input from a Peer..
+template <class ROUTER, class KEY>
+class Peer_Handler : public ACE_Svc_Handler<ACE_UPIPE_STREAM, ACE_MT_SYNCH>
+{
+public:
+ Peer_Handler (ACE_Thread_Manager * = 0);
+
+ virtual int open (void * = 0);
+ // Called by the ACE_Acceptor::handle_input() to activate this object.
+
+ virtual int handle_input (ACE_HANDLE);
+ // Receive input from the peer..
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
+ // Send output to a peer.
+
+protected:
+ ROUTER *router_task_;
+ // Pointer to write task..
+
+private:
+ // Don't need this method here...
+ virtual int svc (void);
+};
+
+// This abstract base class provides mechanisms for routing messages
+// to/from a ACE_Stream from/to one or more peers (which are typically
+// running on remote hosts). A subclass of Peer_Router overrides the
+// open(), close(), and put() methods in order to specialize the
+// behavior of the router to meet application-specific requirements.
+
+template <class PEER_HANDLER, class PEER_KEY>
+class Peer_Router : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Peer_Router (ACE_Thread_Manager * = 0);
+ ~Peer_Router (void);
+
+ typedef Peer_Handler<Peer_Router<PEER_HANDLER, PEER_KEY>, PEER_KEY> HANDLER;
+
+ // Remove a PEER_HANDLER from the PEER_MAP.
+ virtual int unbind_peer (PEER_KEY);
+
+ // Add a PEER_HANDLER to the PEER_MAP.
+ virtual int bind_peer (PEER_KEY, HANDLER *);
+
+ // Send the message block to the peer(s)..
+ int send_peers (ACE_Message_Block *mb);
+
+protected:
+// Handle control messages arriving from adjacent Modules.
+ virtual int control (ACE_Message_Block *);
+
+ // Map used to keep track of active peers.
+ ACE_Map_Manager <PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> peer_map_;
+
+ // Dynamic linking initialization hooks inherited from ACE_Task.
+ virtual int init (int argc, char *argv[]);
+ virtual int fini (void);
+
+ // Factory for accepting new PEER_HANDLERs.
+ Acceptor_Factory<PEER_HANDLER, PEER_KEY> *acceptor_;
+
+private:
+// Prevent copies and pass-by-value.
+ Peer_Router (const Peer_Router<PEER_HANDLER, PEER_KEY> &);
+ void operator= (const Peer_Router<PEER_HANDLER, PEER_KEY> &);
+};
+
+#if defined (__ACE_INLINE__)
+#define ACE_INLINE inline
+#else
+#define ACE_INLINE
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "Peer_Router.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+#endif /* ACE_HAS_THREADS */
+#endif /* _PEER_ROUTER_H */
diff --git a/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp b/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp
new file mode 100644
index 00000000000..414fc5c9ccf
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp
@@ -0,0 +1,126 @@
+#include "Supplier_Router.h"
+// @(#)Supplier_Router.cpp 1.1 10/18/96
+
+#include "Options.h"
+
+#if defined (ACE_HAS_THREADS)
+
+typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY;
+
+int
+Supplier_Handler::open (void *a)
+{
+ SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a;
+ this->router_task_ = af->router ();
+ return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a);
+}
+
+Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm)
+ : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm)
+{
+}
+
+// Create a new router and associate it with the REACTOR parameter..
+
+Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm)
+ : SUPPLIER_ROUTER (tm)
+{
+}
+
+// Handle incoming messages in a separate thread..
+
+int
+Supplier_Router::svc (void)
+{
+ ACE_ASSERT (this->is_writer ());
+
+ ACE_Thread_Control tc (this->thr_mgr ());
+ ACE_Message_Block *message_block = 0;
+
+ if (options.debug ())
+ ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in %s\n", this->name ()));
+
+ while (this->getq (message_block) > 0)
+ {
+ if (this->put_next (message_block) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) put_next failed in %s\n", this->name ()), -1);
+ }
+
+ return 0;
+ // Note the implicit ACE_OS::thr_exit() via ACE_Thread_Control's destructor.
+}
+
+// Initialize the Router..
+
+int
+Supplier_Router::open (void *)
+{
+ ACE_ASSERT (this->is_writer ());
+
+ char *argv[3];
+
+ argv[0] = (char *) this->name ();
+ argv[1] = options.supplier_file ();
+ argv[2] = 0;
+
+ if (this->init (1, &argv[1]) == -1)
+ return -1;
+ // Make this an active object. Return this->activate
+ // (options.t_flags ());
+}
+
+// Close down the router..
+
+int
+Supplier_Router::close (u_long)
+{
+ ACE_ASSERT (this->is_writer ());
+ this->peer_map_.close ();
+ this->msg_queue ()->deactivate();
+ return 0;
+}
+
+// Send a MESSAGE_BLOCK to the supplier(s)..
+
+int
+Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ ACE_ASSERT (this->is_writer ());
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ {
+ this->control (mb);
+ return this->put_next (mb);
+ }
+ else
+ {
+//printf("supplier-Router is routing: send_peers\n");
+ return this->send_peers (mb);
+ }
+}
+
+// Return information about the Supplier_Router ACE_Module..
+
+int
+Supplier_Router::info (char **strp, size_t length) const
+{
+ char buf[BUFSIZ];
+ ACE_UPIPE_Addr addr;
+ const char *mod_name = this->name ();
+ ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_;
+
+ if (sa.get_local_addr (addr) == -1)
+ return -1;
+
+ ACE_OS::sprintf (buf, "%s\t %s/ %s",
+ mod_name, "upipe",
+ "# supplier router\n");
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/UPIPE_Event_Server/Supplier_Router.h b/examples/ASX/UPIPE_Event_Server/Supplier_Router.h
new file mode 100644
index 00000000000..bb304042586
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/Supplier_Router.h
@@ -0,0 +1,52 @@
+/* -*- C++ -*- */
+// @(#)Supplier_Router.h 1.1 10/18/96
+
+// The interface between a supplier and an Event Service ACE_Stream.
+
+#if !defined (_SUPPLIER_ROUTER_H)
+#define _SUPPLIER_ROUTER_H
+
+#include "ace/UPIPE_Addr.h"
+#include "ace/UPIPE_Acceptor.h"
+#include "ace/Map_Manager.h"
+#include "ace/Svc_Handler.h"
+#include "Peer_Router.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Forward declaration.
+class Supplier_Handler;
+
+// Type of search key for SUPPLIER_MAP.
+typedef long SUPPLIER_KEY;
+
+// Instantiated type for routing messages to suppliers.
+
+typedef Peer_Router<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_ROUTER;
+
+class Supplier_Handler
+ : public Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>
+{
+public:
+ Supplier_Handler (ACE_Thread_Manager *tm = 0);
+ virtual int open (void *);
+};
+
+class Supplier_Router : public SUPPLIER_ROUTER
+{
+public:
+ Supplier_Router (ACE_Thread_Manager *);
+
+protected:
+ // ACE_Task hooks..
+ virtual int open (void *a = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ virtual int svc (void);
+
+ // Dynamic linking hooks inherited from Peer_Router.
+ virtual int info (char **info_string, size_t length) const;
+};
+
+#endif /* ACE_HAS_THREADS */
+#endif /* _SUPPLIER_ROUTER_H */
diff --git a/examples/ASX/UPIPE_Event_Server/event_server.cpp b/examples/ASX/UPIPE_Event_Server/event_server.cpp
new file mode 100644
index 00000000000..bdafdb23de7
--- /dev/null
+++ b/examples/ASX/UPIPE_Event_Server/event_server.cpp
@@ -0,0 +1,252 @@
+// Test the event server.
+// @(#)event_server.cpp 1.1 10/18/96
+
+#include "ace/Log_Msg.h"
+#include "ace/Stream.h"
+#include "ace/Service_Config.h"
+#include "Options.h"
+#include "Consumer_Router.h"
+#include "Event_Analyzer.h"
+#include "Supplier_Router.h"
+#include "ace/UPIPE_Acceptor.h"
+#include "ace/UPIPE_Connector.h"
+
+#if defined (ACE_HAS_THREADS)
+
+typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
+typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
+
+// Handle SIGINT and terminate the entire application.
+
+class Quit_Handler : public ACE_Sig_Adapter
+{
+public:
+ Quit_Handler (void);
+ virtual int handle_input (ACE_HANDLE fd);
+};
+
+Quit_Handler::Quit_Handler (void)
+ : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Service_Config::end_reactor_event_loop))
+{
+ // Register to trap input from the user.
+ if (ACE::register_stdin_handler (this,
+ ACE_Service_Config::reactor (),
+ ACE_Service_Config::thr_mgr ()) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
+ // Register to trap the SIGINT signal.
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (SIGINT, this) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "register_handler"));
+}
+
+int
+Quit_Handler::handle_input (ACE_HANDLE)
+{
+ options.stop_timer ();
+ ACE_DEBUG ((LM_INFO, " (%t) closing down the test\n"));
+ options.print_results ();
+
+ ACE_Service_Config::end_reactor_event_loop ();
+ return 0;
+}
+
+static void *
+consumer (void *)
+{
+ ACE_UPIPE_Stream c_stream;
+ ACE_UPIPE_Addr c_addr ("/tmp/conupipe");
+
+ int iter = options.iterations ();
+ int verb = options.verbose ();
+ int msiz = options.message_size ();
+ int secs, par1, par2, i;
+ time_t currsec;
+
+ if (verb)
+ cout << "consumer starting connect" << endl;
+
+ ACE_UPIPE_Connector con;
+
+ if (con.connect (c_stream, c_addr) == -1)
+ ACE_DEBUG ((LM_INFO, " (%t) connect failed\n"));
+ else
+ cout << "consumer :we're connected" << endl;
+
+ char buf[BUFSIZ];
+ int n;
+ ACE_Message_Block *mb_p;
+
+ int done = 0;
+ int cnt = 0;
+ ACE_OS::time (&currsec);
+
+ par1= (time_t) currsec;
+
+ while (done == 0
+ && ((n = c_stream.recv (mb_p)) != -1))
+ if (mb_p->length () > 1)
+ {
+ cnt++;
+ if (verb)
+ cout << " consumer received message !!!!!! "
+ << mb_p->rd_ptr () << endl;
+ }
+ else
+ {
+ if (verb)
+ cout << "consumer got last mb"
+ << (char) * (mb_p->rd_ptr ()) << endl;
+ c_stream.close ();
+ done = 1;
+ }
+
+ ACE_OS::time (&currsec);
+ par2 = (time_t) currsec;
+
+ secs = par2 - par1;
+
+ if (secs <= 0)
+ secs=1;
+
+ cout << "consumer got " << cnt << " messages of size " << msiz
+ << "within " << secs << " seconds" << endl;
+
+ ACE_OS::sleep (2);
+ cout << "consumer terminating " << endl;
+ return 0;
+}
+
+static void *
+supplier (void *dummy)
+{
+ ACE_UPIPE_Stream s_stream;
+ ACE_UPIPE_Addr serv_addr ("/tmp/supupipe");
+ ACE_UPIPE_Connector con;
+
+ int iter = options.iterations ();
+ int verb = options.verbose ();
+ int msiz = options.message_size ();
+ cout << "supplier starting connect" << endl;
+
+ if (con.connect (s_stream, serv_addr) == -1)
+ ACE_DEBUG ((LM_INFO, " (%t) connect failed\n"));
+
+ cout << "supplier : we're connected" << endl;
+ int n;
+ n = 0;
+ ACE_Message_Block * mb_p;
+
+ while (n < iter)
+ {
+ mb_p = new ACE_Message_Block (msiz);
+ strcpy (mb_p->rd_ptr (), (char *) dummy);
+ mb_p->length (msiz);
+ if (verb)
+ cout << "supplier sending 1 message_block" << endl;
+ if (s_stream.send (mb_p) == -1)
+ {
+ cout << "supplier send failed" << endl;
+ return (void *) -1;
+ }
+ n++;
+ }
+
+ mb_p = new ACE_Message_Block (10);
+ mb_p->length (1);
+ *mb_p->rd_ptr () = 'g';
+
+ cout << "supplier sending last message_block" << endl;
+
+ if (s_stream.send (mb_p) == -1)
+ {
+ cout << "supplier send last mb failed" << endl;
+ return (void *) -1;
+ }
+ mb_p = new ACE_Message_Block (10);
+ mb_p->length (0);
+
+ if (verb)
+ cout << "supplier sending very last message_block" << endl;
+
+ if (s_stream.send (mb_p) == -1)
+ {
+ cout << "supplier send very last mb failed" << endl;
+ return (void *) -1;
+ }
+
+ ACE_OS::sleep (2);
+ cout << "supplier terminating" << endl;
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ ACE_Service_Config daemon;
+
+ options.parse_args (argc, argv);
+ options.start_timer ();
+
+ // Primary ACE_Stream for EVENT_SERVER application.
+ MT_Stream event_server;
+
+ // Enable graceful shutdowns....
+ Quit_Handler quit_handler;
+
+ // Create the modules..
+
+ MT_Module *sr = new MT_Module ("Supplier_Router",
+ new Supplier_Router (ACE_Service_Config::thr_mgr ()));
+ MT_Module *ea = new MT_Module ("Event_Analyzer",
+ new Event_Analyzer,
+ new Event_Analyzer);
+ MT_Module *cr = new MT_Module ("Consumer_Router",
+ 0, // 0 triggers the creation of a ACE_Thru_Task...
+ new Consumer_Router (ACE_Service_Config::thr_mgr ()));
+
+ // Push the modules onto the event_server stream.
+
+ if (event_server.push (sr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1);
+
+ if (event_server.push (ea) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1);
+
+ if (event_server.push (cr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1);
+
+ // Set the high and low water marks appropriately.
+
+ int wm = options.low_water_mark ();
+
+ if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "push (setting low watermark)"), -1);
+
+ wm = options.high_water_mark ();
+ if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "push (setting high watermark)"), -1);
+
+ // spawn the two threads.
+
+ if (ACE_Service_Config::thr_mgr ()->spawn (ACE_THR_FUNC (consumer), (void *) 0,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
+
+ else if (ACE_Service_Config::thr_mgr ()->spawn (ACE_THR_FUNC (supplier), (void *) "hello",
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
+
+ // Perform the main event loop waiting for the user to type ^C or to
+ // enter a line on the ACE_STDIN.
+
+ daemon.run_reactor_event_loop ();
+
+ ACE_DEBUG ((LM_DEBUG, "main exiting\n"));
+}
+#else
+int
+main (void)
+{
+ ACE_ERROR_RETURN ((LM_ERROR, "test not defined for this platform\n"), -1);
+}
+#endif /* ACE_HAS_THREADS */