diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2008-03-04 14:51:23 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2008-03-04 14:51:23 +0000 |
commit | 99aa8c60282c7b8072eb35eb9ac815702f5bf586 (patch) | |
tree | bda96bf8c3a4c2875a083d7b16720533c8ffeaf4 /ACE/examples/ASX | |
parent | c4078c377d74290ebe4e66da0b4975da91732376 (diff) | |
download | ATCD-99aa8c60282c7b8072eb35eb9ac815702f5bf586.tar.gz |
undoing accidental deletion
Diffstat (limited to 'ACE/examples/ASX')
47 files changed, 5184 insertions, 0 deletions
diff --git a/ACE/examples/ASX/CCM_App/ASX_CCM_App.mpc b/ACE/examples/ASX/CCM_App/ASX_CCM_App.mpc new file mode 100644 index 00000000000..b5c40beb84c --- /dev/null +++ b/ACE/examples/ASX/CCM_App/ASX_CCM_App.mpc @@ -0,0 +1,26 @@ +// -*- MPC -*- +// $Id$ + +project(*Lib) : acelib { + sharedname = ccm_app + Source_Files { + CCM_App.cpp + } +} + +project(*Server) : aceexe { + exename = server + after += ASX_CCM_App_Lib + Source_Files { + SC_Server.cpp + } +} + +project(*Client) : aceexe { + exename = client + after += ASX_CCM_App_Server + Source_Files { + SC_Client.cpp + } +} + diff --git a/ACE/examples/ASX/CCM_App/CCM_App.cpp b/ACE/examples/ASX/CCM_App/CCM_App.cpp new file mode 100644 index 00000000000..540dd545d90 --- /dev/null +++ b/ACE/examples/ASX/CCM_App/CCM_App.cpp @@ -0,0 +1,122 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "ace/Stream.h" +#include "ace/Task.h" +#include "ace/Module.h" +#include "ace/svc_export.h" + +ACE_RCSID(CCM_App, CCM_App, "$Id$") + +typedef ACE_Task<ACE_SYNCH> MT_Task; +typedef ACE_Stream<ACE_SYNCH> MT_Stream; +typedef ACE_Module<ACE_SYNCH> MT_Module; + +class ACE_Svc_Export Test_Task : public MT_Task +{ +public: + //FUZZ: disable check_for_lack_ACE_OS + virtual int open (void *); + virtual int close (u_long); + //FUZZ: enable check_for_lack_ACE_OS + + virtual int init (int, ACE_TCHAR *[]); + virtual int fini (void); + virtual int suspend (void); + virtual int resume (void); +}; + +int +Test_Task::open (void *) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("opening %s\n"), + this->name () ? this->name () : ACE_TEXT ("task"))); + return 0; +} + +int +Test_Task::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("closing %s\n"), + this->name () ? this->name () : ACE_TEXT ("task"))); + return 0; +} + +int +Test_Task::suspend (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("suspending in %s\n"), + this->name () ? this->name () : ACE_TEXT ("task"))); + return 0; +} + +int +Test_Task::resume (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("resuming in %s\n"), + this->name () ? this->name () : ACE_TEXT ("task"))); + return 0; +} + +int +Test_Task::init (int, ACE_TCHAR *[]) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("initializing %s\n"), + this->name () ? this->name () : ACE_TEXT ("task"))); + + return 0; +} + +int +Test_Task::fini (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("finalizing %s\n"), + this->name () ? this->name () : ACE_TEXT ("task"))); + return 0; +} + +// Factories used to control configuration. + +ACE_SVC_FACTORY_DECLARE (Test_Task) +ACE_SVC_FACTORY_DEFINE (Test_Task) + +// Dynamically linked functions used to control configuration. + +extern "C" ACE_Svc_Export MT_Stream *make_stream (void); +extern "C" ACE_Svc_Export MT_Module *make_da (void); +extern "C" ACE_Svc_Export MT_Module *make_ea (void); +extern "C" ACE_Svc_Export MT_Module *make_mr (void); + +MT_Stream * +make_stream (void) +{ + return new MT_Stream; +} + +MT_Module * +make_da (void) +{ + return new MT_Module (ACE_TEXT ("Device_Adapter"), + new Test_Task, new Test_Task); +} + +MT_Module * +make_ea (void) +{ + return new MT_Module (ACE_TEXT ("Event_Analyzer"), + new Test_Task, new Test_Task); +} + +MT_Module * +make_mr (void) +{ + return new MT_Module (ACE_TEXT ("Multicast_Router"), + new Test_Task, new Test_Task); +} diff --git a/ACE/examples/ASX/CCM_App/Makefile.am b/ACE/examples/ASX/CCM_App/Makefile.am new file mode 100644 index 00000000000..78ded02a046 --- /dev/null +++ b/ACE/examples/ASX/CCM_App/Makefile.am @@ -0,0 +1,59 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +## Makefile.ASX_CCM_App_Lib.am + +noinst_LTLIBRARIES = libccm_app.la + +libccm_app_la_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +libccm_app_la_SOURCES = \ + CCM_App.cpp + +## Makefile.ASX_CCM_App_Server.am + +noinst_PROGRAMS = server + +server_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +server_SOURCES = \ + SC_Server.cpp + +server_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Makefile.ASX_CCM_App_Client.am + +noinst_PROGRAMS += client + +client_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +client_SOURCES = \ + SC_Client.cpp + +client_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/ASX/CCM_App/SC_Client.cpp b/ACE/examples/ASX/CCM_App/SC_Client.cpp new file mode 100644 index 00000000000..fbd4439784a --- /dev/null +++ b/ACE/examples/ASX/CCM_App/SC_Client.cpp @@ -0,0 +1,13 @@ +// $Id$ + +#include "ace/ACE.h" + +ACE_RCSID(CCM_App, SC_Client, "$Id$") + +// Pretty simple, eh? ;-) + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + return 0; +} diff --git a/ACE/examples/ASX/CCM_App/SC_Server.cpp b/ACE/examples/ASX/CCM_App/SC_Server.cpp new file mode 100644 index 00000000000..b93e49005bf --- /dev/null +++ b/ACE/examples/ASX/CCM_App/SC_Server.cpp @@ -0,0 +1,86 @@ +// $Id$ + +// Simple driver program for the server. This driver dynamically +// links in all the services in the <svc.conf> file. + +#include "ace/OS_NS_unistd.h" +#include "ace/OS_main.h" +#include "ace/Service_Config.h" +#include "ace/Thread_Manager.h" +#include "ace/Reactor.h" +#include "ace/Sig_Adapter.h" + +ACE_RCSID(CCM_App, SC_Server, "$Id$") + +class Event_Handler : public ACE_Event_Handler +{ +public: + virtual int handle_input (ACE_HANDLE handle); + virtual int handle_close (ACE_HANDLE, + ACE_Reactor_Mask); +}; + +int +Event_Handler::handle_input (ACE_HANDLE handle) +{ + char buf[BUFSIZ]; + ssize_t n = ACE_OS::read (handle, buf, sizeof buf); + + if (n == -1) + return -1; + else if (n == 0) + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_TEXT ("shutting down on EOF\n")), + -1); + else if (ACE_OS::write (ACE_STDOUT, buf, n) != n) + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_TEXT ("%p\n"), ACE_TEXT ("write failed")), + -1); + else + return 0; +} + +int +Event_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("closing Event_Handler\n"))); + ACE_Reactor::instance ()->end_reactor_event_loop (); + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + ACE_Service_Config loggerd; + Event_Handler handler; + ACE_Sig_Adapter shutdown_handler ((ACE_Sig_Handler_Ex) ACE_Reactor::end_event_loop); + + if (ACE_Event_Handler::register_stdin_handler (&handler, + ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("register_stdin_handler"))); + + if (loggerd.open (argc, + argv, + ACE_DEFAULT_LOGGER_KEY, + // Don't ignore static services! + 0) == -1 && errno != ENOENT) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n%a"), + ACE_TEXT ("open"), + 1)); + else if (ACE_Reactor::instance ()->register_handler + (SIGINT, &shutdown_handler) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n%a"), + ACE_TEXT ("register_handler"), + 1)); + + // Perform logging service until we receive SIGINT. + + ACE_Reactor::instance ()->run_reactor_event_loop (); + return 0; +} diff --git a/ACE/examples/ASX/CCM_App/svc.conf b/ACE/examples/ASX/CCM_App/svc.conf new file mode 100644 index 00000000000..b894e6e69e5 --- /dev/null +++ b/ACE/examples/ASX/CCM_App/svc.conf @@ -0,0 +1,21 @@ +static ACE_Service_Manager "-d -p 4911" + +dynamic Test_Task Service_Object *CCM_App:_make_Test_Task() "-p 3000" + +stream dynamic CCM_App STREAM *CCM_App:make_stream() active +{ + dynamic Device_Adapter Module *CCM_App:make_da() + dynamic Event_Analyzer Module *CCM_App:make_ea() + dynamic Multicast_Router Module *CCM_App:make_mr() "-p 3001" +} + +stream CCM_App +{ + remove Device_Adapter +# remove Event_Analyzer +# remove Multicast_Router +} + +# remove CCM_App +remove Test_Task + diff --git a/ACE/examples/ASX/CCM_App/svc.conf.xml b/ACE/examples/ASX/CCM_App/svc.conf.xml new file mode 100644 index 00000000000..e743d4ed986 --- /dev/null +++ b/ACE/examples/ASX/CCM_App/svc.conf.xml @@ -0,0 +1,33 @@ +<?xml version='1.0'?> +<!-- Converted from svc.conf by svcconf-convert.pl --> +<ACE_Svc_Conf> + <static id="ACE_Service_Manager" params="-d -p 4911"/> + <dynamic id="Test_Task" type="Service_Object"> + <initializer path="CCM_App" init="_make_Test_Task" params="-p 3000"/> + </dynamic> + <streamdef> + <dynamic id="CCM_App" type="STREAM"> + <initializer path="CCM_App" init="make_stream"/> + </dynamic> + <module> + <dynamic id="Device_Adapter" type="Module"> + <initializer path="CCM_App" init="make_da"/> + </dynamic> + <dynamic id="Event_Analyzer" type="Module"> + <initializer path="CCM_App" init="make_ea"/> + </dynamic> + <dynamic id="Multicast_Router" type="Module"> + <initializer path="CCM_App" init="make_mr" params="-p 3001"/> + </dynamic> + </module> + </streamdef> + <stream id="CCM_App"> + <module> + <remove id="Device_Adapter"/> + <!-- remove Event_Analyzer --> + <!-- remove Multicast_Router --> + </module> + </stream> + <!-- remove CCM_App --> + <remove id="Test_Task"/> +</ACE_Svc_Conf> diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp new file mode 100644 index 00000000000..d5215ffcd26 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp @@ -0,0 +1,159 @@ +// $Id$ + +#include "ace/os_include/os_assert.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "Consumer_Router.h" +#include "Options.h" + +ACE_RCSID(Event_Server, Consumer_Router, "$Id$") + +Consumer_Router::Consumer_Router (Peer_Router_Context *prc) + : Peer_Router (prc) +{ + this->context ()->duplicate (); +} + +// Initialize the Router. + +int +Consumer_Router::open (void *) +{ + if (this->is_writer ()) + { + // Set the <Peer_Router_Context> to point back to us so that if + // any Consumer's "accidentally" send us data we'll be able to + // handle it by passing it down the stream. + this->context ()->peer_router (this); + + // Increment the reference count. + this->context ()->duplicate (); + + // Make this an active object to handle the error cases in a + // separate thread. This is mostly just for illustration, i.e., + // it's probably overkill to use a thread for this! + return this->activate (Options::instance ()->t_flags ()); + } + else // if (this->is_reader ()) + + // Nothing to do since this side is primarily used to transmit to + // Consumers, rather than receive. + return 0; +} + +int +Consumer_Router::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) closing Consumer_Router %s\n"), + this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer"))); + + if (this->is_writer ()) + // Inform the thread to shut down. + this->msg_queue ()->deactivate (); + + // Both writer and reader call <release>, so the context knows when + // to clean itself up. + this->context ()->release (); + return 0; +} + +// Handle incoming messages in a separate thread. + +int +Consumer_Router::svc (void) +{ + assert (this->is_writer ()); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) starting svc in Consumer_Router\n"))); + + for (ACE_Message_Block *mb = 0; + this->getq (mb) >= 0; + ) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) warning: Consumer_Router is ") + ACE_TEXT ("forwarding a message to Supplier_Router\n"))); + + // Pass this message down to the next Module's writer Task. + if (this->put_next (mb) == -1) + ACE_ERROR_RETURN + ((LM_ERROR, + ACE_TEXT ("(%t) send_peers failed in Consumer_Router\n")), + -1); + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) stopping svc in Consumer_Router\n"))); + return 0; + // Note the implicit ACE_OS::thr_exit() via destructor. +} + +// Send a <Message_Block> to the supplier(s). + +int +Consumer_Router::put (ACE_Message_Block *mb, + ACE_Time_Value *) +{ + // Perform the necessary control operations before passing the + // message down the stream. + + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + { + this->control (mb); + return this->put_next (mb); + } + + // If we're the reader then we're responsible for broadcasting + // messages to Consumers. + + else if (this->is_reader ()) + { + if (this->context ()->send_peers (mb) == -1) + ACE_ERROR_RETURN + ((LM_ERROR, + ACE_TEXT ("(%t) send_peers failed in Consumer_Router\n")), + -1); + else + return 0; + } + else // if (this->is_writer ()) + + // Queue up the message to processed by <Consumer_Router::svc> + // Since we don't expect to be getting many of these messages, we + // queue them up and run them in a separate thread to avoid taxing + // the main thread. + return this->putq (mb); +} + +// Return information about the <Consumer_Router>. +#if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR) +# define FMTSTR ACE_TEXT ("%s\t %d/%s %s (%s)\n") +#else +# define FMTSTR ACE_TEXT ("%ls\t %d/%ls %ls (%ls)\n") +#endif /* ACE_WIN32 || !ACE_USES_WCHAR */ + +int +Consumer_Router::info (ACE_TCHAR **strp, size_t length) const +{ + ACE_TCHAR buf[BUFSIZ]; + ACE_INET_Addr addr; + const ACE_TCHAR *mod_name = this->name (); + + if (this->context ()->acceptor ().get_local_addr (addr) == -1) + return -1; + + ACE_OS::sprintf (buf, + FMTSTR, + mod_name, + addr.get_port_number (), + ACE_TEXT ("tcp"), + ACE_TEXT ("# consumer router"), + this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer")); + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + + return ACE_OS::strlen (mod_name); +} diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h new file mode 100644 index 00000000000..062a07116ea --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.h @@ -0,0 +1,71 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef _CONSUMER_ROUTER_H +#define _CONSUMER_ROUTER_H + +#include "ace/SOCK_Acceptor.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/UPIPE_Acceptor.h" +#include "ace/Svc_Handler.h" +#include "ace/RW_Thread_Mutex.h" +#include "Peer_Router.h" + +class Consumer_Router : public Peer_Router +{ + // = TITLE + // Provides the interface between one or more Consumers and the + // Event Server <ACE_Stream>. + // + // = DESCRIPTION + // This class normally sits on "top" of the Stream and routes + // messages coming from "downstream" to all the Consumers + // connected to it via its "read" <Task>. Normally, the messages + // flow up the stream from <Supplier_Router>s. However, if + // Consumers transmit data to the <Consumer_Router>, we dutifully + // push it out to the Suppliers via the <Supplier_Router>. + // + // When used on the "reader" side of a Stream, the + // <Consumer_Router> simply forwards all messages up the stream. + // When used on the "writer" side, the <Consumer_Router> queues + // up outgoing messages to suppliers and sends them down to the + // <Supplier_Router> in a separate thread. The reason for this + // is that it's really an "error" for a <Consumer_Router> to + // send messages to Suppliers, so we don't expect this to happen + // very much. When it does we use a separate thread to avoid + // taxing the main thread, which processes "normal" messages. + // + // All of the methods in this class except the constructor are + // called via base class pointers by the <ACE_Stream>. + // Therefore, we can put them in the protected section. +public: + Consumer_Router (Peer_Router_Context *prc); + // Initialization method. + +protected: + // = ACE_Task hooks. + virtual int open (void *a = 0); + // Called by the Stream to initialize the router. + + virtual int close (u_long flags = 0); + // Called by the Stream to shutdown the router. + + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + // Called by the <Peer_Handler> to pass a message to the + // <Consumer_Router>. The <Consumer_Router> queues up this message, + // which is then processed in the <svc> method in a separate thread. + + virtual int svc (void); + // Runs in a separate thread to dequeue messages and pass them up + // the stream. + + // = Dynamic linking hooks. + virtual int info (ACE_TCHAR **info_string, size_t length) const; + // Returns information about this service. +}; + +#endif /* _CONSUMER_ROUTER_H */ diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc b/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc new file mode 100644 index 00000000000..f99e912ce04 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Event.mpc @@ -0,0 +1,15 @@ +// -*- MPC -*- +// $Id$ + +project(*Server) : aceexe { + avoids += ace_for_tao + exename = Event_Server + Source_Files { + Consumer_Router.cpp + Event_Analyzer.cpp + Options.cpp + Peer_Router.cpp + Supplier_Router.cpp + event_server.cpp + } +} diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp new file mode 100644 index 00000000000..a064da6459a --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp @@ -0,0 +1,80 @@ +// $Id$ + +#include "ace/OS_NS_string.h" +#include "Options.h" +#include "Event_Analyzer.h" + +ACE_RCSID(Event_Server, Event_Analyzer, "$Id$") + +int +Event_Analyzer::open (void *) +{ + // No-op for now... + return 0; +} + +int +Event_Analyzer::close (u_long) +{ + // No-op for now... + return 0; +} + +int +Event_Analyzer::control (ACE_Message_Block *mb) +{ + ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); + ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd; + + switch (cmd = ioc->cmd ()) + { + case ACE_IO_Cntl_Msg::SET_LWM: + case ACE_IO_Cntl_Msg::SET_HWM: + this->water_marks (cmd, *(size_t *) mb->cont ()->rd_ptr ()); + break; + default: + break; + } + return 0; +} + +int +Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + if (Options::instance ()->debug ()) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) passing through Event_Analyser::put() (%s)\n"), + this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer"))); + + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + this->control (mb); + + // Just pass the message along to the next Module in the stream... + return this->put_next (mb); +} + +int +Event_Analyzer::init (int, ACE_TCHAR *[]) +{ + // No-op for now. + return 0; +} + +int +Event_Analyzer::fini (void) +{ + // No-op for now. + return 0; +} + +int +Event_Analyzer::info (ACE_TCHAR **strp, size_t length) const +{ + const ACE_TCHAR *mod_name = this->name (); + + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); +} diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h new file mode 100644 index 00000000000..d4f88c8b68d --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h @@ -0,0 +1,44 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef _EVENT_ANALYZER_H +#define _EVENT_ANALYZER_H + +#include "ace/Stream.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Module.h" +#include "ace/Task.h" + +class Event_Analyzer : public ACE_Task<ACE_SYNCH> +{ + // = TITLE + // This class forwards all the <ACE_Message_Block>s it receives + // onto its neighboring Module in the Stream. + // + // = DESCRIPTION + // In a "real" event service, application-specific processing + // would be done in the <put> (or <svc>) method in this class. +public: + // = Initialization hooks called by <ACE_Stream> (not used). + virtual int open (void *a = 0); + virtual int close (u_long flags = 0); + + virtual int put (ACE_Message_Block *msg, + ACE_Time_Value * = 0); + // Entry point into this task. + + // Dynamic linking hooks (not used). + virtual int init (int argc, ACE_TCHAR *argv[]); + virtual int fini (void); + virtual int info (ACE_TCHAR **info_string, + size_t length) const; +private: + virtual int control (ACE_Message_Block *); + // Implements the watermark control processing. +}; + +#endif /* _EVENT_ANALYZER_H */ diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am b/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am new file mode 100644 index 00000000000..aefe1873d9d --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Makefile.am @@ -0,0 +1,50 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + + +## Makefile.Event_Server.am + +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS = Event_Server + +Event_Server_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +Event_Server_SOURCES = \ + Consumer_Router.cpp \ + Event_Analyzer.cpp \ + Options.cpp \ + Peer_Router.cpp \ + Supplier_Router.cpp \ + event_server.cpp \ + Consumer_Router.h \ + Event_Analyzer.h \ + Options.h \ + Options.inl \ + Peer_Router.h \ + Supplier_Router.h + +Event_Server_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp new file mode 100644 index 00000000000..8683f48d153 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Options.cpp @@ -0,0 +1,208 @@ +// $Id$ + +#include "ace/Get_Opt.h" +#include "ace/Thread.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_stdio.h" +#if defined (ACE_HAS_TRACE) +# include "ace/OS_NS_strings.h" +#endif /* ACE_HAS_TRACE */ + +#include "Options.h" + +ACE_RCSID(Event_Server, Options, "$Id$") + +/* static */ +Options *Options::instance_ = 0; + +Options * +Options::instance (void) +{ + if (Options::instance_ == 0) + Options::instance_ = new Options; + + return Options::instance_; +} + +Options::Options (void) + : thr_count_ (4), + t_flags_ (0), + high_water_mark_ (8 * 1024), + low_water_mark_ (1024), + message_size_ (128), + initial_queue_length_ (0), + iterations_ (100000), + debugging_ (0), + verbosity_ (0), + consumer_port_ (ACE_DEFAULT_SERVER_PORT), + supplier_port_ (ACE_DEFAULT_SERVER_PORT + 1) +{ +} + +Options::~Options (void) +{ +} + +void Options::print_results (void) +{ +#if !defined (ACE_WIN32) + ACE_Profile_Timer::ACE_Elapsed_Time et; + + this->itimer_.elapsed_time (et); + + if (this->verbose ()) + { +#if defined (ACE_HAS_PRUSAGE_T) + ACE_Profile_Timer::Rusage rusage; + this->itimer_.get_rusage (rusage); + + ACE_OS::printf ("final concurrency hint = %d\n", ACE_Thread::getconcurrency ()); + ACE_OS::printf ("%8d = lwpid\n" + "%8d = lwp count\n" + "%8d = minor page faults\n" + "%8d = major page faults\n" + "%8d = input blocks\n" + "%8d = output blocks\n" + "%8d = messages sent\n" + "%8d = messages received\n" + "%8d = signals received\n" + "%8ds, %dms = wait-cpu (latency) time\n" + "%8ds, %dms = user lock wait sleep time\n" + "%8ds, %dms = all other sleep time\n" + "%8d = voluntary context switches\n" + "%8d = involuntary context switches\n" + "%8d = system calls\n" + "%8d = chars read/written\n", + (int) rusage.pr_lwpid, + (int) rusage.pr_count, + (int) rusage.pr_minf, + (int) rusage.pr_majf, + (int) rusage.pr_inblk, + (int) rusage.pr_oublk, + (int) rusage.pr_msnd, + (int) rusage.pr_mrcv, + (int) rusage.pr_sigs, + (int) rusage.pr_wtime.tv_sec, (int) rusage.pr_wtime.tv_nsec / 1000000, + (int) rusage.pr_ltime.tv_sec, (int) rusage.pr_ltime.tv_nsec / 1000000, + (int) rusage.pr_slptime.tv_sec, (int) rusage.pr_slptime.tv_nsec / 1000000, + (int) rusage.pr_vctx, + (int) rusage.pr_ictx, + (int) rusage.pr_sysc, + (int) rusage.pr_ioch); +#else + /* Someone needs to write the corresponding dump for rusage... */ +#endif /* ACE_HAS_PRUSAGE_T */ + } + + ACE_OS::printf ("---------------------\n" + "real time = %.3f\n" + "user time = %.3f\n" + "system time = %.3f\n" + "---------------------\n", + et.real_time, et.user_time, et.system_time); +#endif /* ACE_WIN32 */ +} + +void +Options::parse_args (int argc, ACE_TCHAR *argv[]) +{ + //FUZZ: disable check_for_lack_ACE_OS + ACE_LOG_MSG->open (argv[0]); + + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("c:bdH:i:L:l:M:ns:t:T:v")); + int c; + + while ((c = get_opt ()) != EOF) + //FUZZ: enable check_for_lack_ACE_OS + switch (c) + { + case 'b': + this->t_flags (THR_BOUND); + break; + case 'c': + this->consumer_port (ACE_OS::atoi (get_opt.opt_arg ())); + break; + case 'd': + this->debugging_ = 1; + break; + case 'H': + this->high_water_mark (ACE_OS::atoi (get_opt.opt_arg ())); + break; + case 'i': + this->iterations (ACE_OS::atoi (get_opt.opt_arg ())); + break; + case 'L': + this->low_water_mark (ACE_OS::atoi (get_opt.opt_arg ())); + break; + case 'l': + this->initial_queue_length (ACE_OS::atoi (get_opt.opt_arg ())); + break; + case 'M': + this->message_size (ACE_OS::atoi (get_opt.opt_arg ())); + break; + case 'n': + this->t_flags (THR_NEW_LWP); + break; + case 's': + this->supplier_port (ACE_OS::atoi (get_opt.opt_arg ())); + break; + case 'T': + #if defined (ACE_HAS_TRACE) + if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT ("ON")) == 0) + ACE_Trace::start_tracing (); + else if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT ("OFF")) == 0) + ACE_Trace::stop_tracing (); + #endif /* ACE_HAS_TRACE */ + break; + case 't': + this->thr_count (ACE_OS::atoi (get_opt.opt_arg ())); + break; + case 'v': + this->verbosity_ = 1; + break; + default: + ACE_OS::fprintf (stderr, "%s\n" + "\t[-b] (THR_BOUND)\n" + "\t[-c consumer port]\n" + "\t[-d] (enable debugging)\n" + "\t[-H high water mark]\n" + "\t[-i number of test iterations]\n" + "\t[-L low water mark]\n" + "\t[-M] message size \n" + "\t[-n] (THR_NEW_LWP)\n" + "\t[-q max queue size]\n" + "\t[-s supplier port]\n" + "\t[-t number of threads]\n" + "\t[-v] (verbose) \n", + ACE_TEXT_ALWAYS_CHAR (argv[0])); + ACE_OS::exit (1); + /* NOTREACHED */ + break; + } + + // This is a major hack to get the size_t format spec to be a narrow + // char, same as the other strings for printf() here. It only works + // because this is the end of the source file. It makes the + // ACE_SIZE_T_FORMAT_SPECIFIER not use ACE_TEXT, effectively. +#undef ACE_TEXT +#define ACE_TEXT(A) A + if (this->verbose ()) + ACE_OS::printf ("%8d = initial concurrency hint\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = total iterations\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = thread count\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = low water mark\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = high water mark\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = message_size\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = initial queue length\n" + "%8d = THR_BOUND\n" + "%8d = THR_NEW_LWP\n", + ACE_Thread::getconcurrency (), + this->iterations (), + this->thr_count (), + this->low_water_mark (), + this->high_water_mark (), + this->message_size (), + this->initial_queue_length (), + (this->t_flags () & THR_BOUND) != 0, + (this->t_flags () & THR_NEW_LWP) != 0); +} diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.h b/ACE/examples/ASX/Event_Server/Event_Server/Options.h new file mode 100644 index 00000000000..4c935434d26 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Options.h @@ -0,0 +1,122 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef OPTIONS_H +#define OPTIONS_H + +#include "ace/config-all.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Profile_Timer.h" + +class Options +{ + // = TITLE + // Option Singleton for Event Server. +public: + static Options *instance (void); + // Singleton access point. + + void parse_args (int argc, ACE_TCHAR *argv[]); + // Parse the command-line arguments and set the options. + + // = Timer management. + void stop_timer (void); + void start_timer (void); + + // = Set/get the number of threads. + void thr_count (size_t count); + size_t thr_count (void); + + // = Set/get the size of the queue. + void initial_queue_length (size_t length); + size_t initial_queue_length (void); + + // = Set/get the high water mark. + void high_water_mark (size_t size); + size_t high_water_mark (void); + + // = Set/get the high water mark. + void low_water_mark (size_t size); + size_t low_water_mark (void); + + // = Set/get the size of a message. + void message_size (size_t size); + size_t message_size (void); + + // = Set/get the number of iterations. + void iterations (size_t n); + size_t iterations (void); + + // Set/get threading flags. + void t_flags (long flag); + long t_flags (void); + + // Set/get supplier port number. + void supplier_port (u_short port); + u_short supplier_port (void); + + // Set/get consumer port number. + void consumer_port (u_short port); + u_short consumer_port (void); + + // Enabled if we're in debugging mode. + int debug (void); + + // Enabled if we're in verbose mode. + int verbose (void); + + // Print the results to the STDERR. + void print_results (void); + +private: + // = Ensure we're a Singleton. + Options (void); + ~Options (void); + + ACE_Profile_Timer itimer_; + // Time the process. + + size_t thr_count_; + // Number of threads to spawn. + + long t_flags_; + // Flags to <thr_create>. + + size_t high_water_mark_; + // ACE_Task high water mark. + + size_t low_water_mark_; + // ACE_Task low water mark. + + size_t message_size_; + // Size of a message. + + size_t initial_queue_length_; + // Initial number of items in the queue. + + size_t iterations_; + // Number of iterations to run the test program. + + int debugging_; + // Extra debugging info. + + int verbosity_; + // Extra verbose messages. + + u_short consumer_port_; + // Port that the Consumer_Router is using. + + u_short supplier_port_; + // Port that the Supplier_Router is using. + + static Options *instance_; + // Static Singleton. + +}; + +#include "Options.inl" +#endif /* OPTIONS_H */ diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Options.inl b/ACE/examples/ASX/Event_Server/Event_Server/Options.inl new file mode 100644 index 00000000000..87ff395c503 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Options.inl @@ -0,0 +1,141 @@ +/* -*- C++ -*- */ +// $Id$ + +/* Option manager for ustreams */ + +// Since this is only included in Options.h these should stay +// inline, not ACE_INLINE. +// FUZZ: disable check_for_inline + +inline void +Options::supplier_port (u_short port) +{ + this->supplier_port_ = port; +} + +inline u_short +Options::supplier_port (void) +{ + return this->supplier_port_; +} + +inline void +Options::consumer_port (u_short port) +{ + this->consumer_port_ = port; +} + +inline u_short +Options::consumer_port (void) +{ + return this->consumer_port_; +} + +inline void +Options::start_timer (void) +{ + this->itimer_.start (); +} + +inline void +Options::stop_timer (void) +{ + this->itimer_.stop (); +} + +inline void +Options::thr_count (size_t count) +{ + this->thr_count_ = count; +} + +inline size_t +Options::thr_count (void) +{ + return this->thr_count_; +} + +inline void +Options::initial_queue_length (size_t length) +{ + this->initial_queue_length_ = length; +} + +inline size_t +Options::initial_queue_length (void) +{ + return this->initial_queue_length_; +} + +inline void +Options::high_water_mark (size_t size) +{ + this->high_water_mark_ = size; +} + +inline size_t +Options::high_water_mark (void) +{ + return this->high_water_mark_; +} + +inline void +Options::low_water_mark (size_t size) +{ + this->low_water_mark_ = size; +} + +inline size_t +Options::low_water_mark (void) +{ + return this->low_water_mark_; +} + +inline void +Options::message_size (size_t size) +{ + this->message_size_ = size; +} + +inline size_t +Options::message_size (void) +{ + return this->message_size_; +} + +inline void +Options::iterations (size_t n) +{ + this->iterations_ = n; +} + +inline size_t +Options::iterations (void) +{ + return this->iterations_; +} + +inline void +Options::t_flags (long flag) +{ + this->t_flags_ |= flag; +} + +inline long +Options::t_flags (void) +{ + return this->t_flags_; +} + +inline int +Options::debug (void) +{ + return this->debugging_; +} + +inline int +Options::verbose (void) +{ + return this->verbosity_; +} + diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp new file mode 100644 index 00000000000..cb82eec16df --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp @@ -0,0 +1,435 @@ +// $Id$ + +#if !defined (_PEER_ROUTER_C) +#define _PEER_ROUTER_C + +#include "ace/Service_Config.h" +#include "ace/Get_Opt.h" +#include "Options.h" +#include "Peer_Router.h" + +ACE_RCSID(Event_Server, Peer_Router, "$Id$") + +// Send the <ACE_Message_Block> to all the peers. Note that in a +// "real" application this logic would most likely be more selective, +// i.e., it would actually do "routing" based on addressing +// information passed in the <ACE_Message_Block>. + +int +Peer_Router_Context::send_peers (ACE_Message_Block *mb) +{ + PEER_ITERATOR map_iter = this->peer_map_; + int bytes = 0; + int iterations = 0; + + // Skip past the header and get the message to send. + ACE_Message_Block *data_block = mb->cont (); + + // Use an iterator to "multicast" the data to *all* the registered + // peers. Note that this doesn't really multicast, it just makes a + // "logical" copy of the <ACE_Message_Block> and enqueues it in the + // appropriate <Peer_Handler> corresponding to each peer. Note that + // a "real" application would probably "route" the data to a subset + // of connected peers here, rather than send it to all the peers. + + for (PEER_ENTRY *ss = 0; + map_iter.next (ss) != 0; + map_iter.advance ()) + { + if (Options::instance ()->debug ()) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) sending to peer via handle %d\n"), + ss->ext_id_)); + + iterations++; + + // Increment reference count before sending since the + // <Peer_Handler> might be running in its own thread of control. + bytes += ss->int_id_->put (data_block->duplicate ()); + } + + mb->release (); + return bytes == 0 ? 0 : bytes / iterations; +} + +// Remove the <Peer_Handler> from the peer connection map. + +int +Peer_Router_Context::unbind_peer (ROUTING_KEY key) +{ + return this->peer_map_.unbind (key); +} + +// Add the <Peer_Handler> to the peer connection map. + +int +Peer_Router_Context::bind_peer (ROUTING_KEY key, + Peer_Handler *peer_handler) +{ + return this->peer_map_.bind (key, peer_handler); +} + +void +Peer_Router_Context::duplicate (void) +{ + this->reference_count_++; +} + +void +Peer_Router_Context::release (void) +{ + ACE_ASSERT (this->reference_count_ > 0); + this->reference_count_--; + + if (this->reference_count_ == 0) + delete this; +} + +Peer_Router_Context::Peer_Router_Context (u_short port) + : reference_count_ (0) +{ + // Initialize the Acceptor's "listen-mode" socket. + ACE_INET_Addr endpoint (port); + if (this->open (endpoint) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Acceptor::open"))); + + // Initialize the connection map. + else if (this->peer_map_.open () == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Map_Manager::open"))); + else + { + ACE_INET_Addr addr; + + if (this->acceptor ().get_local_addr (addr) != -1) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) initializing %C on port = %d, handle = %d, this = %u\n"), + addr.get_port_number () == Options::instance ()->supplier_port () + ? "Supplier_Handler" : "Consumer_Handler", + addr.get_port_number (), + this->acceptor().get_handle (), + this)); + else + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("get_local_addr"))); + } +} + +Peer_Router_Context::~Peer_Router_Context (void) +{ + // Free up the handle and close down the listening socket. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) closing down Peer_Router_Context\n"))); + + // Close down the Acceptor and take ourselves out of the Reactor. + this->handle_close (); + + PEER_ITERATOR map_iter = this->peer_map_; + + // Make sure to take all the handles out of the map to avoid + // "resource leaks." + + for (PEER_ENTRY *ss = 0; + map_iter.next (ss) != 0; + map_iter.advance ()) + { + if (Options::instance ()->debug ()) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) closing down peer on handle %d\n"), + ss->ext_id_)); + + if (ACE_Reactor::instance ()->remove_handler + (ss->ext_id_, + ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) p\n"), + ACE_TEXT ("remove_handle"))); + } + + // Close down the map. + this->peer_map_.close (); +} + +Peer_Router * +Peer_Router_Context::peer_router (void) +{ + return this->peer_router_; +} + +void +Peer_Router_Context::peer_router (Peer_Router *pr) +{ + this->peer_router_ = pr; +} + +// Factory Method that creates a new <Peer_Handler> for each +// connection. + +int +Peer_Router_Context::make_svc_handler (Peer_Handler *&sh) +{ + ACE_NEW_RETURN (sh, + Peer_Handler (this), + -1); + return 0; +} + +Peer_Handler::Peer_Handler (Peer_Router_Context *prc) + : peer_router_context_ (prc) +{ +} + +// Send output to a peer. Note that this implementation "blocks" if +// flow control occurs. This is undesirable for "real" applications. +// The best way around this is to make the <Peer_Handler> an Active +// Object, e.g., as done in the $ACE_ROOT/apps/Gateway/Gateway +// application. + +int +Peer_Handler::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) +{ +#if 0 + // If we're running as Active Objects just enqueue the message here. + return this->putq (mb, tv); +#else + ACE_UNUSED_ARG (tv); + + int result = this->peer ().send_n (mb->rd_ptr (), + mb->length ()); + // Release the memory. + mb->release (); + + return result; +#endif /* 0 */ +} + +// Initialize a newly connected handler. + +int +Peer_Handler::open (void *) +{ + ACE_TCHAR buf[BUFSIZ], *p = buf; + + if (this->peer_router_context_->peer_router ()->info (&p, + sizeof buf) != -1) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) creating handler for %s, handle = %d\n"), + buf, + this->get_handle ())); + else + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("info")), + -1); +#if 0 + // If we're running as an Active Object activate the Peer_Handler + // here. + if (this->activate (Options::instance ()->t_flags ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("activation of thread failed")), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Peer_Handler::open registering with Reactor for handle_input\n"))); +#else + + // Register with the Reactor to receive messages from our Peer. + if (ACE_Reactor::instance ()->register_handler + (this, ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("register_handler")), + -1); +#endif /* 0 */ + + // Insert outselves into the routing map. + else if (this->peer_router_context_->bind_peer (this->get_handle (), + this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("bind_peer")), + -1); + else + return 0; +} + +// Receive a message from a Peer. + +int +Peer_Handler::handle_input (ACE_HANDLE h) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) input arrived on handle %d\n"), + h)); + + ACE_Message_Block *db; + + ACE_NEW_RETURN (db, ACE_Message_Block (BUFSIZ), -1); + + ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY), + ACE_Message_Block::MB_PROTO, db); + // Check for memory failures. + if (hb == 0) + { + db->release (); + errno = ENOMEM; + return -1; + } + + ssize_t n = this->peer ().recv (db->rd_ptr (), + db->size ()); + + if (n == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p"), + ACE_TEXT ("recv failed")), + -1); + else if (n == 0) // Client has closed down the connection. + { + if (this->peer_router_context_->unbind_peer (this->get_handle ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p"), + ACE_TEXT ("unbind failed")), + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) shutting down handle %d\n"), h)); + // Instruct the <ACE_Reactor> to deregister us by returning -1. + return -1; + } + else + { + // Transform incoming buffer into an <ACE_Message_Block>. + + // First, increment the write pointer to the end of the newly + // read data block. + db->wr_ptr (n); + + // Second, copy the "address" into the header block. Note that + // for this implementation the HANDLE we receive the message on + // is considered the "address." A "real" application would want + // to do something more sophisticated. + *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); + + // Third, update the write pointer in the header block. + hb->wr_ptr (sizeof (ACE_HANDLE)); + + // Finally, pass the message through the stream. Note that we + // use <Task::put> here because this gives the method at *our* + // level in the stream a chance to do something with the message + // before it is sent up the other side. For instance, if we + // receive messages in the <Supplier_Router>, it will just call + // <put_next> and send them up the stream to the + // <Consumer_Router> (which broadcasts them to consumers). + // However, if we receive messages in the <Consumer_Router>, it + // could reply to the Consumer with an error since it's not + // correct for Consumers to send messages (we don't do this in + // the current implementation, but it could be done in a "real" + // application). + + if (this->peer_router_context_->peer_router ()->put (hb) == -1) + return -1; + else + return 0; + } +} + +Peer_Router::Peer_Router (Peer_Router_Context *prc) + : peer_router_context_ (prc) +{ +} + +Peer_Router_Context * +Peer_Router::context (void) const +{ + return this->peer_router_context_; +} + +int +Peer_Router::control (ACE_Message_Block *mb) +{ + ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); + ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command; + + switch (command = ioc->cmd ()) + { + case ACE_IO_Cntl_Msg::SET_LWM: + case ACE_IO_Cntl_Msg::SET_HWM: + this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ()); + break; + default: + return -1; + } + return 0; +} + +#if 0 + +// Right now, Peer_Handlers are purely Reactive, i.e., they all run in +// a single thread of control. It would be easy to make them Active +// Objects by calling activate() in Peer_Handler::open(), making +// Peer_Handler::put() enqueue each message on the message queue, and +// (3) then running the following svc() routine to route each message +// to its final destination within a separate thread. Note that we'd +// want to move the svc() call up to the Consumer_Router and +// Supplier_Router level in order to get the right level of control +// for input and output. + +Peer_Handler::svc (void) +{ + ACE_Message_Block *db, *hb; + + // Do an endless loop + for (;;) + { + db = new Message_Block (BUFSIZ); + hb = new Message_Block (sizeof (ROUTING_KEY), + Message_Block::MB_PROTO, + db); + + ssize_t n = this->peer_.recv (db->rd_ptr (), db->size ()); + + if (n == -1) + LM_ERROR_RETURN ((LOG_ERROR, + ACE_TEXT ("%p"), + ACE_TEXT ("recv failed")), + -1); + else if (n == 0) // Client has closed down the connection. + { + if (this->peer_router_context_->peer_router ()->unbind_peer (this->get_handle ()) == -1) + LM_ERROR_RETURN ((LOG_ERROR, + ACE_TEXT ("%p"), + ACE_TEXT ("unbind failed")), + -1); + LM_DEBUG ((LOG_DEBUG, + ACE_TEXT ("(%t) shutting down \n"))); + + // We do not need to be deregistered by reactor + // as we were not registered at all. + return -1; + } + else + { + // Transform incoming buffer into a Message. + db->wr_ptr (n); + *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment. + hb->wr_ptr (sizeof (long)); + + // Pass the message to the stream. + if (this->peer_router_context_->peer_router ()->reply (hb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Peer_Handler.svc : peer_router->reply failed")), + -1); + } + } + return 0; +} +#endif /* 0 */ +#endif /* _PEER_ROUTER_C */ + diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h new file mode 100644 index 00000000000..044ef07ea07 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Peer_Router.h @@ -0,0 +1,158 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef _PEER_ROUTER_H +#define _PEER_ROUTER_H + +#include "ace/Acceptor.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/SOCK_Acceptor.h" +#include "ace/Svc_Handler.h" +#include "ace/Map_Manager.h" +#include "ace/RW_Thread_Mutex.h" + +// Type of search key for CONSUMER_MAP +typedef ACE_HANDLE ROUTING_KEY; + +// Forward declarations. +class Peer_Router; +class Peer_Router_Context; + +class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH> +{ + // = TITLE + // Receive input from a Peer and forward to the appropriate + // <Peer_Router> (i.e., <Consumer_Router> or <Supplier_Router>). +public: + Peer_Handler (Peer_Router_Context * = 0); + // Initialization method. + + virtual int open (void * = 0); + // Called by the ACE_Acceptor::handle_input() to activate this + // object. + + virtual int handle_input (ACE_HANDLE); + // Receive input from a peer. + + virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); + // Send output to a peer. Note that this implementation "blocks" if + // flow control occurs. This is undesirable for "real" + // applications. The best way around this is to make the + // <Peer_Handler> an Active Object, e.g., as done in the + // $ACE_ROOT/apps/Gateway/Gateway application. + +protected: + Peer_Router_Context *peer_router_context_; + // Pointer to router context. This maintains the state that is + // shared by both Tasks in a <Peer_Router> Module. +}; + +class Peer_Router_Context : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> +{ + // = TITLE + // Defines state and behavior shared between both Tasks in a + // <Peer_Router> Module. + // + // = DESCRIPTION + // This class also serves as an <ACE_Acceptor>, which creates + // <Peer_Handlers> when Peers connect. +public: + // = Initialization and termination methods. + Peer_Router_Context (u_short port); + // Constructor. + + virtual int unbind_peer (ROUTING_KEY); + // Remove the <Peer_Handler *> from the <PEER_MAP> that corresponds + // to the <ROUTING_KEY>. + + virtual int bind_peer (ROUTING_KEY, Peer_Handler *); + // Add a <Peer_Handler> to the <PEER_MAP> that's associated with the + // <ROUTING_KEY>. + + int send_peers (ACE_Message_Block *mb); + // Send the <ACE_Message_Block> to all the peers. Note that in a + // "real" application this logic would most likely be more + // selective, i.e., it would actually do "routing" based on + // addressing information passed in the <ACE_Message_Block>. + + int make_svc_handler (Peer_Handler *&sh); + // Factory Method that creates a new <Peer_Handler> for each + // connection. This method overrides the default behavior in + // <ACE_Acceptor>. + + // = Set/Get Router Task. + Peer_Router *peer_router (void); + void peer_router (Peer_Router *); + + void release (void); + // Decrement the reference count and delete <this> when count == 0; + + void duplicate (void); + // Increment the reference count. + +private: + Peer_Router *peer_router_; + // Pointer to the <Peer_Router> that we are accepting for. + + // = Useful typedefs. + typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX> + PEER_MAP; + typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX> + PEER_ITERATOR; + typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *> + PEER_ENTRY; + + PEER_MAP peer_map_; + // Map used to keep track of active peers. + + int reference_count_; + // Keep track of when we can delete ourselves. + + ~Peer_Router_Context (void); + // Private to ensure dynamic allocation. + + friend class Friend_Of_Peer_Router_Context; + // Declare a friend class to avoid compiler warnings because the + // destructor is private. +}; + +class Peer_Router : public ACE_Task<ACE_SYNCH> +{ + // = TITLE + // This abstract base class provides mechanisms for routing + // messages to/from a <ACE_Stream> from/to one or more peers (which + // are typically running on remote hosts). + // + // = DESCRIPTION + // Subclasses of <Peer_Router> (such as <Consumer_Router> or + // <Supplier_Router>) override the <open>, <close>, and + // <put> methods to specialize the behavior of the router to + // meet application-specific requirements. +protected: + Peer_Router (Peer_Router_Context *prc); + // Initialization method. + + virtual int control (ACE_Message_Block *); + // Handle control messages arriving from adjacent Modules. + + Peer_Router_Context *context (void) const; + // Returns the routing context. + + typedef ACE_Task<ACE_SYNCH> inherited; + // Helpful typedef. + +private: + Peer_Router_Context *peer_router_context_; + // Reference to the context shared by the writer and reader Tasks, + // e.g., in the <Consumer_Router> and <Supplier_Router> Modules. + + // = Prevent copies and pass-by-value. + Peer_Router (const Peer_Router &); + void operator= (const Peer_Router &); +}; + +#endif /* _PEER_ROUTER_H */ diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp new file mode 100644 index 00000000000..72c43ee6312 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp @@ -0,0 +1,165 @@ +// $Id$ + +#include "ace/os_include/os_assert.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "Supplier_Router.h" +#include "Options.h" + +ACE_RCSID(Event_Server, Supplier_Router, "$Id$") + +// Handle outgoing messages in a separate thread. + +int +Supplier_Router::svc (void) +{ + assert (this->is_writer ()); + + ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n")); + + for (ACE_Message_Block *mb = 0; + this->getq (mb) >= 0; + ) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) warning: Supplier_Router is " + "forwarding a message via send_peers\n")); + + // Broadcast the message to the Suppliers, even though this is + // "incorrect" (assuming a oneway flow of events from Suppliers + // to Consumers)! + + if (this->context ()->send_peers (mb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) send_peers failed in Supplier_Router\n"), + -1); + } + + ACE_DEBUG ((LM_DEBUG, + "(%t) stopping svc in Supplier_Router\n")); + return 0; +} + +Supplier_Router::Supplier_Router (Peer_Router_Context *prc) + : Peer_Router (prc) +{ + // Increment the reference count. + this->context ()->duplicate (); +} + +// Initialize the Supplier Router. + +int +Supplier_Router::open (void *) +{ + if (this->is_reader ()) + { + // Set the <Peer_Router_Context> to point back to us so that all + // the Peer_Handler's <put> their incoming <Message_Blocks> to + // our reader Task. + this->context ()->peer_router (this); + return 0; + } + + else // if (this->is_writer () + { + // Increment the reference count. + this->context ()->duplicate (); + + // Make this an active object to handle the error cases in a + // separate thread. + return this->activate (Options::instance ()->t_flags ()); + } +} + +// Close down the router. + +int +Supplier_Router::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) closing Supplier_Router %s\n", + this->is_reader () ? "reader" : "writer")); + + if (this->is_writer ()) + // Inform the thread to shut down. + this->msg_queue ()->deactivate (); + + // Both writer and reader call release(), so the context knows when + // to clean itself up. + this->context ()->release (); + return 0; +} + +// Send an <ACE_Message_Block> to the supplier(s). + +int +Supplier_Router::put (ACE_Message_Block *mb, + ACE_Time_Value *) +{ + // Perform the necessary control operations before passing + // the message up the stream. + + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + { + this->control (mb); + return this->put_next (mb); + } + + // If we're the reader then we are responsible for pass messages up + // to the next Module's reader Task. Note that in a "real" + // application this is likely where we'd take a look a the actual + // information that was in the message, e.g., in order to figure out + // what operation it was and what it's "parameters" where, etc. + else if (this->is_reader ()) + return this->put_next (mb); + + else // if (this->is_writer ()) + { + // Someone is trying to write to the Supplier. In this + // implementation this is considered an "error." However, we'll + // just go ahead and forward the message to the Supplier (who + // hopefully is prepared to receive it). + ACE_DEBUG ((LM_WARNING, + "(%t) warning: sending to a Supplier\n")); + + // Queue up the message to processed by <Supplier_Router::svc>. + // Since we don't expect to be getting many of these messages, + // we queue them up and run them in a separate thread to avoid + // taxing the main thread. + return this->putq (mb); + } +} + +// Return information about the <Supplier_Router>. +#if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR) +# define FMTSTR ACE_TEXT ("%s\t %d/%s %s (%s)\n") +#else +# define FMTSTR ACE_TEXT ("%ls\t %d/%ls %ls (%ls)\n") +#endif /* ACE_WIN32 || !ACE_USES_WCHAR */ + +int +Supplier_Router::info (ACE_TCHAR **strp, size_t length) const +{ + ACE_TCHAR buf[BUFSIZ]; + ACE_INET_Addr addr; + const ACE_TCHAR *mod_name = this->name (); + + if (this->context ()->acceptor ().get_local_addr (addr) == -1) + return -1; + + ACE_OS::sprintf (buf, + FMTSTR, + mod_name, + addr.get_port_number (), + ACE_TEXT ("tcp"), + ACE_TEXT ("# supplier router"), + this->is_reader () ? + ACE_TEXT ("reader") : ACE_TEXT ("writer")); + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + + return ACE_OS::strlen (mod_name); +} diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h new file mode 100644 index 00000000000..8a42943c147 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.h @@ -0,0 +1,72 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef _SUPPLIER_ROUTER_H +#define _SUPPLIER_ROUTER_H + +#include "ace/INET_Addr.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/SOCK_Acceptor.h" +#include "ace/Map_Manager.h" +#include "ace/Svc_Handler.h" +#include "Peer_Router.h" + +class Supplier_Router : public Peer_Router +{ + // = TITLE + // Provides the interface between one or more Suppliers and the + // Event Server ACE_Stream. + // + // = DESCRIPTION + // This class normally sits on "bottom" of the Stream and sends + // all messages coming from Suppliers via its "write" <Task> + // "upstream" to all the Consumers connected to the + // <Consumer_Router>. Normally, the messages flow up the + // stream to <Consumer_Router>s. However, if Consumers + // transmit data to the <Consumer_Router>, we dutifully push it + // out to the Suppliers via the <Supplier_Router>. + // + // When used on the "reader" side of a Stream, the + // <Supplier_Router> simply forwards all messages up the stream. + // When used on the "writer" side, the <Supplier_Router> queues + // up outgoing messages to suppliers and sends them in a + // separate thread. The reason for this is that it's really an + // "error" for a <Supplier_Router> to send messages to + // Suppliers, so we don't expect this to happen very much. When + // it does we use a separate thread to avoid taxing the main + // thread, which processes "normal" messages. + // + // All of these methods are called via base class pointers by + // the <ACE_Stream> apparatus. Therefore, we can put them in + // the protected section. +public: + Supplier_Router (Peer_Router_Context *prc); + // Initialization method. + +protected: + // = ACE_Task hooks. + + virtual int open (void *a = 0); + // Called by the Stream to initialize the router. + + virtual int close (u_long flags = 0); + // Called by the Stream to shutdown the router. + + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + // Called by the <SUPPLIER_HANDLER> to pass a message to the Router. + // The Router queues up this message, which is then processed in the + // <svc> method in a separate thread. + + virtual int svc (void); + // Runs in a separate thread to dequeue messages and pass them up + // the stream. + + virtual int info (ACE_TCHAR **info_string, size_t length) const; + // Dynamic linking hook. +}; + +#endif /* _SUPPLIER_ROUTER_H */ diff --git a/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp b/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp new file mode 100644 index 00000000000..3858bad1fc2 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Event_Server/event_server.cpp @@ -0,0 +1,258 @@ +// $Id$ + +// Main driver program for the event server example. + +#include "ace/OS_main.h" +#include "ace/Service_Config.h" +#include "ace/OS_NS_unistd.h" +#include "Options.h" +#include "Consumer_Router.h" +#include "Event_Analyzer.h" +#include "Supplier_Router.h" +#include "ace/Sig_Adapter.h" +#include "ace/Stream.h" + +ACE_RCSID (Event_Server, + event_server, + "$Id$") + +// Typedef these components to handle multi-threading correctly. +typedef ACE_Stream<ACE_SYNCH> MT_Stream; +typedef ACE_Module<ACE_SYNCH> MT_Module; + + +class Event_Server : public ACE_Sig_Adapter +{ + // = TITLE + // Run the logic for the <Event_Server>. + + // + // = DESCRIPTION + // In addition to packaging the <Event_Server> components, this + // class also handles SIGINT and terminate the entire + // application process. There are several ways to terminate + // this application process: + // + // 1. Send a SIGINT signal (e.g., via ^C) + // 2. Type any character on the STDIN. + // + // Note that by inheriting from the <ACE_Sig_Adapter> we can + // shutdown the <ACE_Reactor> cleanly when a SIGINT is + // generated. +public: + Event_Server (void); + // Constructor. + + int svc (void); + // Run the event-loop for the event server. + +private: + virtual int handle_input (ACE_HANDLE handle); + // Hook method called back when a user types something into the + // STDIN in order to shut down the program. + + int configure_stream (void); + // Setup the plumbing in the stream. + + int set_watermarks (void); + // Set the high and low queue watermarks. + + int run_event_loop (void); + // Run the event-loop for the <Event_Server>. + + MT_Stream event_server_; + // The <ACE_Stream> that contains the <Event_Server> application + // <Modules>. +}; + +Event_Server::Event_Server (void) + : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop)) + // Shutdown the <ACE_Reactor>'s event loop when a SIGINT is + // received. +{ + // Register to trap STDIN from the user. + if (ACE_Event_Handler::register_stdin_handler (this, + ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("register_stdin_handler"))); + // Register to trap the SIGINT signal. + else if (ACE_Reactor::instance ()->register_handler + (SIGINT, this) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("register_handler"))); +} + +int +Event_Server::handle_input (ACE_HANDLE) +{ + // This code here will make sure we actually wait for the user to + // type something. On platforms like Win32, <handle_input> is called + // prematurely (even when there is no data). + char temp_buffer [BUFSIZ]; + + ssize_t n = ACE_OS::read (ACE_STDIN, + temp_buffer, + sizeof (temp_buffer)); + // This ought to be > 0, otherwise something very strange has + // happened!! + ACE_ASSERT (n > 0); + ACE_UNUSED_ARG (n); // To avoid compile warning with ACE_NDEBUG. + + Options::instance ()->stop_timer (); + + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("(%t) closing down the test\n"))); + Options::instance ()->print_results (); + + ACE_Reactor::instance ()->end_reactor_event_loop (); + return -1; +} + +int +Event_Server::configure_stream (void) +{ + Peer_Router_Context *src; + // Create the <Supplier_Router>'s routing context. This contains a + // context shared by both the write-side and read-side of the + // <Supplier_Router> Module. + ACE_NEW_RETURN (src, + Peer_Router_Context (Options::instance ()->supplier_port ()), + -1); + + MT_Module *srm = 0; + // Create the <Supplier_Router> module. + ACE_NEW_RETURN (srm, + MT_Module + (ACE_TEXT ("Supplier_Router"), + new Supplier_Router (src), + new Supplier_Router (src)), + -1); + + MT_Module *eam = 0; + // Create the <Event_Analyzer> module. + ACE_NEW_RETURN (eam, + MT_Module + (ACE_TEXT ("Event_Analyzer"), + new Event_Analyzer, + new Event_Analyzer), + -1); + + Peer_Router_Context *crc; + // Create the <Consumer_Router>'s routing context. This contains a + // context shared by both the write-side and read-side of the + // <Consumer_Router> Module. + ACE_NEW_RETURN (crc, + Peer_Router_Context (Options::instance ()->consumer_port ()), + -1); + + MT_Module *crm = 0; + // Create the <Consumer_Router> module. + ACE_NEW_RETURN (crm, + MT_Module + (ACE_TEXT ("Consumer_Router"), + new Consumer_Router (crc), + new Consumer_Router (crc)), + -1); + + // Push the Modules onto the event_server stream. + + if (this->event_server_.push (srm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("push (Supplier_Router)")), + -1); + else if (this->event_server_.push (eam) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("push (Event_Analyzer)")), + -1); + else if (this->event_server_.push (crm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("push (Consumer_Router)")), + -1); + return 0; +} + +int +Event_Server::set_watermarks (void) +{ + // Set the high and low water marks appropriately. The water marks + // control how much data can be buffered before the queues are + // considered "full." + size_t wm = Options::instance ()->low_water_mark (); + + if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_LWM, + &wm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("push (setting low watermark)")), + -1); + + wm = Options::instance ()->high_water_mark (); + if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_HWM, + &wm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("push (setting high watermark)")), + -1); + return 0; +} + +int +Event_Server::run_event_loop (void) +{ + // Begin the timer. + Options::instance ()->start_timer (); + + // Perform the main event loop waiting for the user to type ^C or to + // enter a line on the ACE_STDIN. + + ACE_Reactor::instance ()->run_reactor_event_loop (); + + // Close down the stream and call the <close> hooks on all the + // <ACE_Task>s in the various Modules in the Stream. + this->event_server_.close (); + + // Wait for the threads in the <Consumer_Router> and + // <Supplier_Router> to exit. + return ACE_Thread_Manager::instance ()->wait (); +} + +int +Event_Server::svc (void) +{ + if (this->configure_stream () == -1) + return -1; + else if (this->set_watermarks () == -1) + return -1; + else if (this->run_event_loop () == -1) + return -1; + else + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ +#if defined (ACE_HAS_THREADS) + Options::instance ()->parse_args (argc, argv); + + // Initialize the <Event_Server>. + Event_Server event_server; + + // Run the event server's event-loop. + int result = event_server.svc (); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("exiting main\n"))); + + return result; +#else + ACE_UNUSED_ARG (argc); + ACE_UNUSED_ARG (argv); + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("threads not supported on this platform\n")), + 1); +#endif /* ACE_HAS_THREADS */ +} diff --git a/ACE/examples/ASX/Event_Server/Makefile.am b/ACE/examples/ASX/Event_Server/Makefile.am new file mode 100644 index 00000000000..f00f4c3dd09 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Makefile.am @@ -0,0 +1,14 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +SUBDIRS = \ + Event_Server \ + Transceiver + diff --git a/ACE/examples/ASX/Event_Server/README b/ACE/examples/ASX/Event_Server/README new file mode 100644 index 00000000000..262b7ee9633 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/README @@ -0,0 +1,79 @@ +This subdirectory illustrates a number of the ACE ASX framework +features using an ACE_Stream application called the Event Server. For +more information on the design and use of the ACE ASX framework please +see http://www.cs.wustl.edu/~schmidt/C++-USENIX-94.ps.gz and +http://www.cs.wustl.edu/~schmidt/ACE-concurrency.ps.gz. For more +information on the Event Server, please see +http://www.cs.wustl.edu/~schmidt/DSEJ-94.ps.gz. + +The Event Server example works as follows: + +1. When the ./Event_Server/event_server executable is run it + creates two SOCK_Acceptors, which listen for and accept incoming + connections from Consumers and Suppliers. + +2. The ./Event_Server/Transceiver/transceiver application plays + the role of either a Consumer or a Supplier (with the current + implementation it can only play one role at a time). The + transceiver process can be started multiple times. Each call + should be either: + + # Consumer + % transceiver -p 10002 -h hostname -C + + or + + # Supplier + % transceiver -p 10003 -h hostname -S + + where 10002 and 10003 are the default Consumer listening port and + the Supplier listening port, respectively, on the event server, + "hostname" is the name of the machine the event_server is running, + and -C and -S indicate that the transceiver plays the role of a + Consumer or Supplier, respectively. I typically run the + Consumer(s) and Supplier(s) in different windows to make it easier + to understand the output. + +3. Once the Consumer(s) and Supplier(s) are connected, you can + type data from any Supplier window. This data will be routed + through the Modules/Tasks in the event_server's Stream and be + forwarded to the Consumer(s). + + Since the transceivers are full-duplex you can also send messages + from the Consumer(s) to Supplier(s). However, the Event Server will + warn you about this since it's not really kosher to have Consumers + sending to Suppliers... + +4. When you want to shut down the tranceivers or event server + just type ^C (which generates a SIGINT) or type any input in the + window running the Event Server application. + +What makes this example particularly interesting is that once you've +got the hang of the ASX Streams architecture, you can "push" new +filtering Modules onto the event_server Stream and modify the +application's behavior transparently to the other components. + +There are a bunch of features that aren't implemented in this +prototype that you'd probably want to do for a "real" application. +Some of the more interesting things to add would be: + +0. Complete "full-duplex" support, i.e., Peers could play the + role of Suppliers and Consumers simultaneously. + +1. Support for "commands", which would change the behavior + of the Event_Server based on messages it got from Suppliers + (or Consumers). + +3. Support for "pull" operations, as well as "push" operations. + This would basically involve adding a "MIB Module" to get/set + the "values" associated with "names" passed in by Peers. This + could probably replace the Event_Analysis Module. + +4. Filtering and correlation (this should probably be done + via a separate Module that handles filtering and correlation). + +5. More flexible concurrency model(s), e.g., "Active Object per-Consumer". + This would enable the Event Server process to handle flow control + more gracefully than it does not (it currently "hangs," which isn't + desirable). + diff --git a/ACE/examples/ASX/Event_Server/Transceiver/Makefile.am b/ACE/examples/ASX/Event_Server/Transceiver/Makefile.am new file mode 100644 index 00000000000..f4ba3b5bf56 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Transceiver/Makefile.am @@ -0,0 +1,35 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +## Makefile.Transceiver.am + +noinst_PROGRAMS = Transceiver + +Transceiver_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +Transceiver_SOURCES = \ + transceiver.cpp \ + transceiver.h + +Transceiver_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/ASX/Event_Server/Transceiver/Transceiver.mpc b/ACE/examples/ASX/Event_Server/Transceiver/Transceiver.mpc new file mode 100644 index 00000000000..a6a3309727c --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Transceiver/Transceiver.mpc @@ -0,0 +1,9 @@ +// -*- MPC -*- +// $Id$ + +project(*) : aceexe { + exename = Transceiver + Source_Files { + transceiver.cpp + } +} diff --git a/ACE/examples/ASX/Event_Server/Transceiver/transceiver.cpp b/ACE/examples/ASX/Event_Server/Transceiver/transceiver.cpp new file mode 100644 index 00000000000..37bbaad7d3d --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Transceiver/transceiver.cpp @@ -0,0 +1,238 @@ +// $Id$ + +// Test program for the event transceiver. This program can play the +// role of either Consumer or Supplier. You can terminate this +// program by typing ^C.... + +#include "ace/OS_main.h" +#include "ace/OS_NS_string.h" +#include "ace/Service_Config.h" +#include "ace/SOCK_Connector.h" +#include "ace/Connector.h" +#include "ace/Get_Opt.h" +#include "ace/Signal.h" +#include "ace/OS_NS_unistd.h" + +#include "transceiver.h" + +ACE_RCSID (Transceiver, + transceiver, + "$Id$") + +// Handle the command-line arguments. + +int +Event_Transceiver::parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("Ch:p:S")); + + this->port_number_ = ACE_DEFAULT_SERVER_PORT; + this->host_name_ = ACE_DEFAULT_SERVER_HOST; + this->role_ = ACE_TEXT ("Supplier"); + + for (int c; (c = get_opt ()) != -1; ) + switch (c) + { + case 'C': + this->role_ = ACE_TEXT ("Consumer"); + break; + case 'h': + this->host_name_ = get_opt.opt_arg (); + break; + case 'p': + this->port_number_ = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'S': + this->role_ = ACE_TEXT ("Supplier"); + break; + default: + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("usage: %n [-CS] [-h host_name] [-p portnum] \n")), + -1); + /* NOTREACHED */ + break; + } + + // Increment by 1 if we're the supplier to mirror the default + // behavior of the Event_Server (which sets the Consumer port to + // ACE_DEFAULT_SERVER_PORT and the Supplier port to + // ACE_DEFAULT_SERVER_PORT + 1). Note that this is kind of a + // hack... + if (ACE_OS::strcmp (this->role_, ACE_TEXT ("Supplier")) == 0 + && this->port_number_ == ACE_DEFAULT_SERVER_PORT) + this->port_number_++; + return 0; +} + +int +Event_Transceiver::handle_close (ACE_HANDLE, + ACE_Reactor_Mask) +{ + ACE_Reactor::instance ()->end_reactor_event_loop (); + return 0; +} + +// Close down via SIGINT or SIGQUIT. + +int +Event_Transceiver::handle_signal (int, + siginfo_t *, + ucontext_t *) +{ + ACE_Reactor::instance ()->end_reactor_event_loop (); + return 0; +} + +Event_Transceiver::Event_Transceiver (void) +{ +} + +Event_Transceiver::Event_Transceiver (int argc, ACE_TCHAR *argv[]) +{ + if (this->parse_args (argc, argv) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("parse_args"))); + else + { + ACE_Sig_Set sig_set; + + sig_set.sig_add (SIGINT); + sig_set.sig_add (SIGQUIT); + + // Register to handle the SIGINT and SIGQUIT signals. + if (ACE_Reactor::instance ()->register_handler + (sig_set, + this) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("register_handler"))); + + // We need to register <this> here before we're connected since + // otherwise <get_handle> will return the connection socket + // handle for the peer. + else if (ACE_Event_Handler::register_stdin_handler (this, + ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("register_stdin_handler"))); + + // Address of the server. + ACE_INET_Addr server_addr (this->port_number_, + this->host_name_); + + ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector; + + // We need a pointer here because connect takes a reference to a + // pointer! + Event_Transceiver *etp = this; + + // Establish the connection to the Event Server. + if (connector.connect (etp, + server_addr) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + this->host_name_)); + ACE_Reactor::instance()->remove_handler (sig_set); + ACE_Event_Handler::remove_stdin_handler (ACE_Reactor::instance(), + ACE_Thread_Manager::instance()); + } + } +} + +int +Event_Transceiver::open (void *) +{ + // Register ourselves to be notified when there's data to read on + // the socket. + if (ACE_Reactor::instance ()->register_handler + (this, + ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("register_handler")), + -1); + return 0; +} + +int +Event_Transceiver::handle_input (ACE_HANDLE handle) +{ + // Determine whether we play the role of a consumer or a supplier. + if (handle == ACE_STDIN) + return this->transmitter (); + else + return this->receiver (); +} + +int +Event_Transceiver::transmitter (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) entering %s transmitter\n"), + this->role_)); + + char buf[BUFSIZ]; + ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf); + int result = 0; + + if (n <= 0 || this->peer ().send_n (buf, n) != n) + result = -1; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) leaving %s transmitter\n"), + this->role_)); + return result; +} + +int +Event_Transceiver::receiver (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) entering %s receiver\n"), + this->role_)); + + char buf[BUFSIZ]; + + ssize_t n = this->peer ().recv (buf, sizeof buf); + int result = 0; + + if (n <= 0 + || ACE_OS::write (ACE_STDOUT, buf, n) != n) + result = -1; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) leaving %s receiver\n"), + this->role_)); + return result; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + if (ACE_Service_Config::open (argv[0]) == -1 + && errno != ENOENT) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("open")), + -1); + + // Create and initialize the transceiver. + Event_Transceiver transceiver (argc, argv); + + // Demonstrate how we can check if a constructor failed... + if (ACE_LOG_MSG->op_status () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Event_Transceiver constructor failed")), + -1); + + + // Run event loop until either the event server shuts down or we get + // a SIGINT. + ACE_Reactor::instance ()->run_reactor_event_loop (); + return 0; +} + diff --git a/ACE/examples/ASX/Event_Server/Transceiver/transceiver.h b/ACE/examples/ASX/Event_Server/Transceiver/transceiver.h new file mode 100644 index 00000000000..864b88a0b48 --- /dev/null +++ b/ACE/examples/ASX/Event_Server/Transceiver/transceiver.h @@ -0,0 +1,60 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef ACE_TRANSCEIVER_H +#define ACE_TRANSCEIVER_H + +#include "ace/SOCK_Stream.h" +#include "ace/Svc_Handler.h" + +class Event_Transceiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +{ + // = TITLE + // Generate and receives messages from the event server. + // + // = DESCRIPTION + // This class is both a consumer and supplier of events, i.e., + // it's a ``transceiver.'' +public: + // = Initialization method. + Event_Transceiver (int argc, ACE_TCHAR *argv[]); + // Performs the actual initialization. + + Event_Transceiver (void); + // No-op constructor (required by the <ACE_Connector>). + + // = Svc_Handler hook called by the <ACE_Connector>. + virtual int open (void *); + // Initialize the transceiver when we are connected. + + // = Demultplexing hooks from the <ACE_Reactor>. + virtual int handle_input (ACE_HANDLE); + // Receive data from STDIN or socket. + + virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); + // Close down via SIGINT. + + virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); + // Close down the event loop. + +private: + int receiver (void); + // Reads data from socket and writes to ACE_STDOUT. + + int transmitter (void); + // Writes data from ACE_STDIN to socket. + + int parse_args (int argc, ACE_TCHAR *argv[]); + // Parse the command-line arguments. + + u_short port_number_; + // Port number of event server. + + const ACE_TCHAR *host_name_; + // Name of event server. + + const ACE_TCHAR *role_; + // Are we playing the Consumer or Supplier role? +}; + +#endif /* ACE_TRANSCEIVER_H */ diff --git a/ACE/examples/ASX/Makefile.am b/ACE/examples/ASX/Makefile.am new file mode 100644 index 00000000000..22df464dcfc --- /dev/null +++ b/ACE/examples/ASX/Makefile.am @@ -0,0 +1,16 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +SUBDIRS = \ + CCM_App \ + Event_Server \ + Message_Queue \ + UPIPE_Event_Server + diff --git a/ACE/examples/ASX/Message_Queue/ASX_Message_Queue.mpc b/ACE/examples/ASX/Message_Queue/ASX_Message_Queue.mpc new file mode 100644 index 00000000000..8c1853bcee7 --- /dev/null +++ b/ACE/examples/ASX/Message_Queue/ASX_Message_Queue.mpc @@ -0,0 +1,25 @@ +// -*- MPC -*- +// $Id$ + +project(*Bounded_Buffer) : aceexe { + exename = bounded_buffer + Source_Files { + bounded_buffer.cpp + } +} + +project(*Buffer_Stream) : aceexe { + avoids += uses_wchar + exename = buffer_stream + Source_Files { + buffer_stream.cpp + } +} + +project(*Priority_Buffer) : aceexe { + exename = priority_buffer + Source_Files { + priority_buffer.cpp + } +} + diff --git a/ACE/examples/ASX/Message_Queue/Makefile.am b/ACE/examples/ASX/Message_Queue/Makefile.am new file mode 100644 index 00000000000..3305b17bf1f --- /dev/null +++ b/ACE/examples/ASX/Message_Queue/Makefile.am @@ -0,0 +1,67 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + + +## Makefile.ASX_Message_Queue_Bounded_Buffer.am + +noinst_PROGRAMS = bounded_buffer + +bounded_buffer_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +bounded_buffer_SOURCES = \ + bounded_buffer.cpp + +bounded_buffer_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Makefile.ASX_Message_Queue_Buffer_Stream.am + +if !BUILD_USES_WCHAR + +noinst_PROGRAMS += buffer_stream + +buffer_stream_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +buffer_stream_SOURCES = \ + buffer_stream.cpp + +buffer_stream_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_USES_WCHAR + +## Makefile.ASX_Message_Queue_Priority_Buffer.am + +noinst_PROGRAMS += priority_buffer + +priority_buffer_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +priority_buffer_SOURCES = \ + priority_buffer.cpp + +priority_buffer_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/ASX/Message_Queue/bounded_buffer.cpp b/ACE/examples/ASX/Message_Queue/bounded_buffer.cpp new file mode 100644 index 00000000000..2cdc50f2116 --- /dev/null +++ b/ACE/examples/ASX/Message_Queue/bounded_buffer.cpp @@ -0,0 +1,140 @@ +// $Id$ + +// This short program copies stdin to stdout via the use of an ASX +// Message_Queue. It illustrates an implementation of the classic +// "bounded buffer" program. + +#include "ace/Message_Queue.h" +#include "ace/Thread_Manager.h" +#include "ace/OS_NS_time.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID(Message_Queue, bounded_buffer, "$Id$") + +#if defined (ACE_HAS_THREADS) + +// The producer reads data from the stdin stream, creates a message, +// and then queues the message in the message list, where it is +// removed by the consumer thread. A 0-sized message is enqueued when +// there is no more data to read. The consumer uses this as a flag to +// know when to exit. + +static void * +producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) +{ + // Keep reading stdin, until we reach EOF. + + for (int n; ; ) + { + // Allocate a new message. + ACE_Message_Block *mb; + + ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ), 0); + + n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), mb->size ()); + + if (n <= 0) + { + // Send a shutdown message to the other thread and exit. + mb->length (0); + if (msg_queue->enqueue_tail (mb) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "put_next")); + break; + } + + // Send the message to the other thread. + else + { + mb->msg_priority (n); + mb->wr_ptr (n); + if (msg_queue->enqueue_tail (mb) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "put_next")); + } + } + + return 0; +} + +// The consumer dequeues a message from the ACE_Message_Queue, writes +// the message to the stderr stream, and deletes the message. The +// producer sends a 0-sized message to inform the consumer to stop +// reading and exit. + +static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) +{ + int result = 0; + + // Keep looping, reading a message out of the queue, until we timeout + // or get a message with a length == 0, which signals us to quit. + + for (;;) + { + ACE_Message_Block *mb; + + ACE_Time_Value timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds + + result = msg_queue->dequeue_head (mb, &timeout); + + if (result == -1) + break; + + int length = mb->length (); + + if (length > 0) + ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length); + + mb->release (); + + if (length == 0) + break; + } + + if (result == -1 && errno == EWOULDBLOCK) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n%a", + "timed out waiting for message", + 1)); + return 0; +} + +// Spawn off two threads that copy stdin to stdout. + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + // Message list. + ACE_Message_Queue<ACE_MT_SYNCH> msg_queue; + + if (ACE_Thread_Manager::instance ()->spawn + (ACE_THR_FUNC (producer), + (void *) &msg_queue, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "spawn"), + 1); + else if (ACE_Thread_Manager::instance ()->spawn + (ACE_THR_FUNC (consumer), + (void *) &msg_queue, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "spawn"), + 1); + + // Wait for producer and consumer threads to exit. + ACE_Thread_Manager::instance ()->wait (); + return 0; +} +#else +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/ACE/examples/ASX/Message_Queue/buffer_stream.cpp b/ACE/examples/ASX/Message_Queue/buffer_stream.cpp new file mode 100644 index 00000000000..12f2a0303a1 --- /dev/null +++ b/ACE/examples/ASX/Message_Queue/buffer_stream.cpp @@ -0,0 +1,314 @@ +// $Id$ + +// This short program copies stdin to stdout via the use of an ASX +// Stream. It illustrates an implementation of the classic "bounded +// buffer" program using an ASX Stream containing two Modules. Each +// ACE_Module contains two Tasks. Each ACE_Task contains a +// ACE_Message_Queue and a pointer to a ACE_Thread_Manager. Note how +// the use of these reusable components reduces the reliance on global +// variables, as compared with the bounded_buffer.C example. + +#include "ace/OS_main.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_time.h" +#include "ace/OS_NS_unistd.h" +#include "ace/Service_Config.h" +#include "ace/Stream.h" +#include "ace/Module.h" +#include "ace/Task.h" + +ACE_RCSID(Message_Queue, buffer_stream, "$Id$") + +#if defined (ACE_HAS_THREADS) + +typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream; +typedef ACE_Module<ACE_MT_SYNCH> MT_Module; +typedef ACE_Task<ACE_MT_SYNCH> MT_Task; + +class Common_Task : public MT_Task + // = TITLE + // Methods that are common to the producer and consumer. +{ +public: + Common_Task (void) {} + + //FUZZ: disable check_for_lack_ACE_OS + // ACE_Task hooks + virtual int open (void * = 0); + virtual int close (u_long = 0); + //FUZZ: enable check_for_lack_ACE_OS +}; + +// Define the Producer interface. + +class Producer : public Common_Task +{ +public: + Producer (void) {} + + // Read data from stdin and pass to consumer. + virtual int svc (void); +}; + +class Consumer : public Common_Task + // = TITLE + // Define the Consumer interface. +{ +public: + Consumer (void) {} + + virtual int put (ACE_Message_Block *mb, + ACE_Time_Value *tv = 0); + // Enqueue the message on the ACE_Message_Queue for subsequent + // handling in the svc() method. + + virtual int svc (void); + // Receive message from producer and print to stdout. + +private: + + ACE_Time_Value timeout_; +}; + +class Filter : public MT_Task + // = TITLE + // Defines a Filter that prepends a line number in front of each + // line. +{ +public: + Filter (void): count_ (1) {} + + virtual int put (ACE_Message_Block *mb, + ACE_Time_Value *tv = 0); + // Change the size of the message before passing it downstream. + +private: + size_t count_; + // Count the number of lines passing through the filter. +}; + +// Spawn off a new thread. + +int +Common_Task::open (void *) +{ + if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("spawn")), + -1); + return 0; +} + +int +Common_Task::close (u_long exit_status) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) thread is exiting with status %d in module %s\n"), + exit_status, + this->name ())); + + // Can do anything here that is required when a thread exits, e.g., + // storing thread-specific information in some other storage + // location, etc. + return 0; +} + +// The Consumer reads data from the stdin stream, creates a message, +// and then queues the message in the message list, where it is +// removed by the consumer thread. A 0-sized message is enqueued when +// there is no more data to read. The consumer uses this as a flag to +// know when to exit. + +int +Producer::svc (void) +{ + // Keep reading stdin, until we reach EOF. + + for (int n; ; ) + { + // Allocate a new message (add one to avoid nasty boundary + // conditions). + + ACE_Message_Block *mb = 0; + + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ + 1), + -1); + + n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), BUFSIZ); + + if (n <= 0) + { + // Send a shutdown message to the other thread and exit. + mb->length (0); + + if (this->put_next (mb) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("put_next"))); + break; + } + + // Send the message to the other thread. + else + { + mb->wr_ptr (n); + // NUL-terminate the string (since we use strlen() on it + // later). + mb->rd_ptr ()[n] = '\0'; + + if (this->put_next (mb) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("put_next"))); + } + } + + return 0; +} + +// Simply enqueue the Message_Block into the end of the queue. + +int +Consumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +{ + return this->putq (mb, tv); +} + +// The consumer dequeues a message from the ACE_Message_Queue, writes +// the message to the stderr stream, and deletes the message. The +// Consumer sends a 0-sized message to inform the consumer to stop +// reading and exit. + +int +Consumer::svc (void) +{ + int result = 0; + + // Keep looping, reading a message out of the queue, until we + // timeout or get a message with a length == 0, which signals us to + // quit. + + for (;;) + { + ACE_Message_Block *mb = 0; + + // Wait for upto 4 seconds. + this->timeout_.sec (ACE_OS::time (0) + 4); + + result = this->getq (mb, &this->timeout_); + + if (result == -1) + break; + + int length = mb->length (); + + if (length > 0) + ACE_OS::write (ACE_STDOUT, + mb->rd_ptr (), + ACE_OS::strlen (mb->rd_ptr ())); + + mb->release (); + + if (length == 0) + break; + } + + if (result == -1 && errno == EWOULDBLOCK) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n%a"), + ACE_TEXT ("timed out waiting for message"), + 1)); + return 0; +} + +int +Filter::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) +{ + if (mb->length () == 0) + return this->put_next (mb, tv); + else + { + char buf[BUFSIZ]; + + // Stash a copy of the buffer away. + ACE_OS::strncpy (buf, mb->rd_ptr (), sizeof buf); + + // Increase the size of the buffer large enough that it will be + // reallocated (in order to test the reallocation mechanisms). + + mb->size (mb->length () + BUFSIZ); + mb->length (mb->size ()); + + // Prepend the line count in front of the buffer. + ACE_OS::sprintf (mb->rd_ptr (), + ACE_SIZE_T_FORMAT_SPECIFIER + ": %s", + this->count_++, + buf); + return this->put_next (mb, tv); + } +} + +// Main driver function. + +int +ACE_TMAIN (int, ACE_TCHAR *argv[]) +{ + ACE_Service_Config daemon (argv[0]); + + // This Stream controls hierachically-related active objects. + MT_Stream stream; + + MT_Module *pm = 0; + MT_Module *fm = 0; + MT_Module *cm = 0; + + ACE_NEW_RETURN (cm, + MT_Module (ACE_TEXT ("Consumer"), + new Consumer), + -1); + ACE_NEW_RETURN (fm, + MT_Module (ACE_TEXT ("Filter"), + new Filter), + -1); + ACE_NEW_RETURN (pm, + MT_Module (ACE_TEXT ("Producer"), + new Producer), + -1); + + // Create Consumer, Filter, and Producer Modules and push them onto + // the Stream. All processing is performed in the Stream. + + if (stream.push (cm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("push")), + 1); + else if (stream.push (fm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("push")), + 1); + else if (stream.push (pm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("push")), + 1); + // Barrier synchronization: wait for the threads to exit, then exit + // ourselves. + ACE_Thread_Manager::instance ()->wait (); + return 0; +} +#else +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("threads not supported on this platform\n"))); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/ACE/examples/ASX/Message_Queue/priority_buffer.cpp b/ACE/examples/ASX/Message_Queue/priority_buffer.cpp new file mode 100644 index 00000000000..db60a33bcae --- /dev/null +++ b/ACE/examples/ASX/Message_Queue/priority_buffer.cpp @@ -0,0 +1,145 @@ +// $Id$ + +// This short program prints the contents of stdin to stdout sorted by +// the length of each line via the use of an ASX Message_Queue. It +// illustrates how priorities can be used for ACE Message_Queues. + +#include "ace/OS_NS_stdio.h" +#include "ace/Malloc_Base.h" // To get ACE_Allocator +#include "ace/Message_Queue.h" +#include "ace/Read_Buffer.h" +#include "ace/Thread_Manager.h" +#include "ace/Service_Config.h" + +ACE_RCSID(Message_Queue, priority_buffer, "$Id$") + +#if defined (ACE_HAS_THREADS) + +// Global thread manager. +static ACE_Thread_Manager thr_mgr; + +// Make the queue be capable of being *very* large. +static const long max_queue = LONG_MAX; + +// The consumer dequeues a message from the ACE_Message_Queue, writes +// the message to the stderr stream, and deletes the message. The +// producer sends a 0-sized message to inform the consumer to stop +// reading and exit. + +static void * +consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) +{ + // Keep looping, reading a message out of the queue, until we + // timeout or get a message with a length == 0, which signals us to + // quit. + + for (;;) + { + ACE_Message_Block *mb; + + if (msg_queue->dequeue_head (mb) == -1) + break; + + int length = mb->length (); + + if (length > 0) + ACE_OS::puts (mb->rd_ptr ()); + + // Free up the buffer memory and the Message_Block. + ACE_Allocator::instance ()->free (mb->rd_ptr ()); + mb->release (); + + if (length == 0) + break; + } + + return 0; +} + +// The producer reads data from the stdin stream, creates a message, +// and then queues the message in the message list, where it is +// removed by the consumer thread. A 0-sized message is enqueued when +// there is no more data to read. The consumer uses this as a flag to +// know when to exit. + +static void * +producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) +{ + ACE_Read_Buffer rb (ACE_STDIN); + + // Keep reading stdin, until we reach EOF. + + for (;;) + { + // Allocate a new buffer. + char *buffer = rb.read ('\n'); + + ACE_Message_Block *mb; + + if (buffer == 0) + { + // Send a 0-sized shutdown message to the other thread and + // exit. + + ACE_NEW_RETURN (mb, ACE_Message_Block ((size_t) 0), 0); + + if (msg_queue->enqueue_tail (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + break; + } + + // Enqueue the message in priority order. + else + { + // Allocate a new message, but have it "borrow" its memory + // from the buffer. + ACE_NEW_RETURN (mb, ACE_Message_Block (rb.size (), + ACE_Message_Block::MB_DATA, + 0, + buffer), + 0); + mb->msg_priority (rb.size ()); + mb->wr_ptr (rb.size ()); + + ACE_DEBUG ((LM_DEBUG, + "enqueueing message of size %d\n", + mb->msg_priority ())); + + // Enqueue in priority order. + if (msg_queue->enqueue_prio (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + } + } + + // Now read all the items out in priority order (i.e., ordered by + // the size of the lines!). + consumer (msg_queue); + + return 0; +} + +// Spawn off one thread that copies stdin to stdout in order of the +// size of each line. + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + // Message queue. + ACE_Message_Queue<ACE_MT_SYNCH> msg_queue (max_queue); + + if (thr_mgr.spawn (ACE_THR_FUNC (producer), (void *) &msg_queue, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); + + // Wait for producer and consumer threads to exit. + thr_mgr.wait (); + return 0; +} +#else +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp new file mode 100644 index 00000000000..b9c9c0cf2bd --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp @@ -0,0 +1,138 @@ +// $Id$ + +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "Consumer_Router.h" +#include "Options.h" + +ACE_RCSID(UPIPE_Event_Server, Consumer_Router, "$Id$") + +#if defined (ACE_HAS_THREADS) + +typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY; + +int +Consumer_Handler::open (void *a) +{ + CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a; + this->router_task_ = af->router (); + return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a); +} + +Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm) + : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm) +{ +} + +// Create a new handler that will interact with a consumer and point +// its ROUTER_TASK_ data member to the CONSUMER_ROUTER. + +Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm) + : CONSUMER_ROUTER (tm) +{ +} + +// Initialize the Router.. + +int +Consumer_Router::open (void *) +{ + ACE_ASSERT (this->is_reader ()); + ACE_TCHAR *argv[3]; + + argv[0] = (ACE_TCHAR *) this->name (); + argv[1] = (ACE_TCHAR *) options.consumer_file (); + argv[2] = 0; + + if (this->init (1, &argv[1]) == -1) + return -1; + + // Make this an active object. + // return this->activate (options.t_flags ()); + + // Until that's done, return 1 to indicate that the object wasn't activated. + return 1; +} + +int +Consumer_Router::close (u_long) +{ + ACE_ASSERT (this->is_reader ()); + this->peer_map_.close (); + this->msg_queue ()->deactivate(); + return 0; +} + + +// Handle incoming messages in a separate thread.. + +int +Consumer_Router::svc (void) +{ + ACE_Message_Block *mb = 0; + + ACE_ASSERT (this->is_reader ()); + + if (options.debug ()) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting svc in %s\n"), + this->name ())); + + while (this->getq (mb) > 0) + if (this->put_next (mb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) put_next failed in %s\n"), + this->name ()), -1); + + return 0; + // Note the implicit ACE_OS::thr_exit() via destructor. +} + +// Send a MESSAGE_BLOCK to the supplier(s).. + +int +Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + ACE_ASSERT (this->is_reader ()); + + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + { + this->control (mb); + return this->put_next (mb); + } + else +{ +//printf("consumer-Router is routing : send_peers\n"); + return this->send_peers (mb); +} +} + +// Return information about the Client_Router ACE_Module.. + +int +Consumer_Router::info (ACE_TCHAR **strp, size_t length) const +{ + ACE_TCHAR buf[BUFSIZ]; + ACE_UPIPE_Addr addr; + const ACE_TCHAR *mod_name = this->name (); + ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_; + + if (sa.get_local_addr (addr) == -1) + return -1; + +#if !defined (ACE_WIN32) && defined (ACE_USES_WCHAR) +# define FMTSTR ACE_TEXT ("%ls\t %ls/ %ls") +#else +# define FMTSTR ACE_TEXT ("%s\t %s/ %s") +#endif + + ACE_OS::sprintf (buf, FMTSTR, + mod_name, ACE_TEXT ("upipe"), + ACE_TEXT ("# consumer router\n")); + + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.h b/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.h new file mode 100644 index 00000000000..93a1220dc11 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.h @@ -0,0 +1,53 @@ +/* -*- C++ -*- */ +// $Id$ + +// The interface between one or more consumers and an Event Server +// ACE_Stream. + +#ifndef _CONSUMER_ROUTER_H +#define _CONSUMER_ROUTER_H + +#include "ace/Thread_Manager.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/UPIPE_Acceptor.h" +#include "ace/UPIPE_Addr.h" +#include "ace/Svc_Handler.h" +#include "Peer_Router.h" + +#if defined (ACE_HAS_THREADS) + +class Consumer_Handler; // Forward declaration.... + +typedef ACE_HANDLE CONSUMER_KEY; + +typedef Peer_Router<Consumer_Handler, CONSUMER_KEY> CONSUMER_ROUTER; + +class Consumer_Handler + : public Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> +{ +public: + Consumer_Handler (ACE_Thread_Manager *tm = 0); + virtual int open (void *); +}; + +class Consumer_Router : public CONSUMER_ROUTER +{ +public: + Consumer_Router (ACE_Thread_Manager *thr_manager); + +protected: + // ACE_Task hooks.. + virtual int open (void *a = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + virtual int svc (void); + + // Dynamic linking hooks. + virtual int info (ACE_TCHAR **info_string, size_t length) const; +}; +#endif /* ACE_HAS_THREADS */ +#endif /* _CONSUMER_ROUTER_H */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp new file mode 100644 index 00000000000..689a9280766 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp @@ -0,0 +1,73 @@ +// $Id$ + +#include "ace/OS_NS_string.h" +#include "Event_Analyzer.h" + +ACE_RCSID(UPIPE_Event_Server, Event_Analyzer, "$Id$") + +#if defined (ACE_HAS_THREADS) + +int +Event_Analyzer::open (void *) +{ + return 0; +} + +int +Event_Analyzer::close (u_long) +{ + return 0; +} + +int +Event_Analyzer::control (ACE_Message_Block *mb) +{ + ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); + ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd; + + switch (cmd = ioc->cmd ()) + { + case ACE_IO_Cntl_Msg::SET_LWM: + case ACE_IO_Cntl_Msg::SET_HWM: + this->water_marks (cmd, *(size_t *) mb->cont ()->rd_ptr ()); + break; + default: + break; + } + return 0; +} + +int +Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + this->control (mb); + + return this->put_next (mb); +} + +int +Event_Analyzer::init (int, ACE_TCHAR *[]) +{ + return 0; +} + +int +Event_Analyzer::fini (void) +{ + return 0; +} + +int +Event_Analyzer::info (ACE_TCHAR **strp, size_t length) const +{ + const ACE_TCHAR *mod_name = this->name (); + + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h b/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h new file mode 100644 index 00000000000..01bc3028964 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h @@ -0,0 +1,37 @@ +/* -*- C++ -*- */ +// $Id$ + +// Signal router. + +#ifndef _EVENT_ANALYZER_H +#define _EVENT_ANALYZER_H + +#include "ace/Stream.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Module.h" +#include "ace/Task.h" + +#if defined (ACE_HAS_THREADS) + +class Event_Analyzer : public ACE_Task<ACE_MT_SYNCH> +{ +public: + virtual int open (void *a = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + + // Dynamic linking hooks. + virtual int init (int argc, ACE_TCHAR *argv[]); + virtual int fini (void); + virtual int info (ACE_TCHAR **info_string, size_t length) const; + +private: + virtual int control (ACE_Message_Block *); +}; + +#endif /* ACE_HAS_THREADS */ +#endif /* _EVENT_ANALYZER_H */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Makefile.am b/ACE/examples/ASX/UPIPE_Event_Server/Makefile.am new file mode 100644 index 00000000000..f8ac89a7479 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Makefile.am @@ -0,0 +1,50 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + + +## Makefile.UPIPE_Event_Server.am + +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS = UPIPE_Event_Server + +UPIPE_Event_Server_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +UPIPE_Event_Server_SOURCES = \ + Consumer_Router.cpp \ + Event_Analyzer.cpp \ + Options.cpp \ + Peer_Router.cpp \ + Supplier_Router.cpp \ + event_server.cpp \ + Consumer_Router.h \ + Event_Analyzer.h \ + Options.h \ + Options.inl \ + Peer_Router.h \ + Supplier_Router.h + +UPIPE_Event_Server_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Options.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Options.cpp new file mode 100644 index 00000000000..033132610aa --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Options.cpp @@ -0,0 +1,209 @@ +// $Id$ + +#include "ace/Get_Opt.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_Thread.h" +#include "ace/OS_NS_stdio.h" +#if defined (ACE_HAS_TRACE) +# include "ace/OS_NS_strings.h" +#endif /* ACE_HAS_TRACE */ + +#include "Options.h" + +ACE_RCSID(UPIPE_Event_Server, Options, "$Id$") + +#if defined (ACE_HAS_THREADS) + +Options::Options (void) + : thr_count_ (4), + t_flags_ (THR_DETACHED), + high_water_mark_ (8 * 1024), + low_water_mark_ (1024), + message_size_ (128), + initial_queue_length_ (0), + iterations_ (100000), + debugging_ (0), + verbosity_ (0), + consumer_port_ (ACE_TEXT ("-p 10000")), + supplier_port_ (ACE_TEXT ("-p 10001")), + consumer_file_ (ACE_TEXT ("-f/tmp/conupipe")), + supplier_file_ (ACE_TEXT ("-f/tmp/supupipe")) +{ +} + +Options::~Options (void) +{ +} + +void Options::print_results (void) +{ + ACE_Profile_Timer::ACE_Elapsed_Time et; + this->itimer_.elapsed_time (et); + +#if defined (ACE_HAS_PRUSAGE_T) + prusage_t rusage; + this->itimer_.get_rusage (rusage); + + if (options.verbose ()) + { + ACE_OS::printf ("final concurrency hint = %d\n", ACE_OS::thr_getconcurrency ()); + ACE_OS::printf ("%8d = lwpid\n" + "%8d = lwp count\n" + "%8d = minor page faults\n" + "%8d = major page faults\n" + "%8d = input blocks\n" + "%8d = output blocks\n" + "%8d = messages sent\n" + "%8d = messages received\n" + "%8d = signals received\n" + "%8ds, %dms = wait-cpu (latency) time\n" + "%8ds, %dms = user lock wait sleep time\n" + "%8ds, %dms = all other sleep time\n" + "%8d = voluntary context switches\n" + "%8d = involuntary context switches\n" + "%8d = system calls\n" + "%8d = chars read/written\n", + (int) rusage.pr_lwpid, + (int) rusage.pr_count, + (int) rusage.pr_minf, + (int) rusage.pr_majf, + (int) rusage.pr_inblk, + (int) rusage.pr_oublk, + (int) rusage.pr_msnd, + (int) rusage.pr_mrcv, + (int) rusage.pr_sigs, + (int) rusage.pr_wtime.tv_sec, (int) rusage.pr_wtime.tv_nsec / 1000000, + (int) rusage.pr_ltime.tv_sec, (int) rusage.pr_ltime.tv_nsec / 1000000, + (int) rusage.pr_slptime.tv_sec, (int) rusage.pr_slptime.tv_nsec / 1000000, + (int) rusage.pr_vctx, + (int) rusage.pr_ictx, + (int) rusage.pr_sysc, + (int) rusage.pr_ioch); + } +#endif /* ACE_HAS_PRUSAGE_T */ + + ACE_OS::printf ("---------------------\n" + "real time = %.3f\n" + "user time = %.3f\n" + "system time = %.3f\n" + "---------------------\n", + et.real_time, et.user_time, et.system_time); +} + +// Manages the options. +Options options; + +void +Options::parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_LOG_MSG->open (argv[0]); + + //FUZZ: disable check_for_lack_ACE_OS + ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("C:c:bdH:i:L:l:M:nS:s:t:T:v")); + int c; + + while ((c = getopt ()) != -1) + //FUZZ: enable check_for_lack_ACE_OS + switch (c) + { + case 'b': + this->t_flags (THR_BOUND); + break; + case 'C': + this->consumer_file (getopt.opt_arg ()); + break; + case 'c': + this->consumer_port (getopt.opt_arg ()); + break; + case 'd': + this->debugging_ = 1; + break; + case 'H': + this->high_water_mark (ACE_OS::atoi (getopt.opt_arg ())); + break; + case 'i': + this->iterations (ACE_OS::atoi (getopt.opt_arg ())); + break; + case 'L': + this->low_water_mark (ACE_OS::atoi (getopt.opt_arg ())); + break; + case 'l': + this->initial_queue_length (ACE_OS::atoi (getopt.opt_arg ())); + break; + case 'M': + this->message_size (ACE_OS::atoi (getopt.opt_arg ())); + break; + case 'n': + this->t_flags (THR_NEW_LWP); + break; + case 'S': + this->supplier_file (getopt.opt_arg ()); + break; + case 's': + this->supplier_port (getopt.opt_arg ()); + break; + case 'T': + #if defined (ACE_HAS_TRACE) + if (ACE_OS::strcasecmp (getopt.opt_arg (), ACE_TEXT ("ON")) == 0) + ACE_Trace::start_tracing (); + else if (ACE_OS::strcasecmp (getopt.opt_arg (), ACE_TEXT ("OFF")) == 0) + ACE_Trace::stop_tracing (); + #endif /* ACE_HAS_TRACE */ + break; + case 't': + this->thr_count (ACE_OS::atoi (getopt.opt_arg ())); + break; + case 'v': + this->verbosity_ = 1; + break; + default: + ACE_OS::fprintf (stderr, "%s\n" + "\t[-b] (THR_BOUND)\n" + "\t[-C consumer file]\n" + "\t[-c consumer port]\n" + "\t[-d] (enable debugging)\n" + "\t[-H high water mark]\n" + "\t[-i number of test iterations]\n" + "\t[-L low water mark]\n" + "\t[-M] message size \n" + "\t[-n] (THR_NEW_LWP)\n" + "\t[-q max queue size]\n" + "\t[-S supplier file]\n" + "\t[-s supplier port]\n" + "\t[-t number of threads]\n" + "\t[-v] (verbose) \n", + ACE_TEXT_ALWAYS_CHAR (argv[0])); + ACE_OS::exit (1); + /* NOTREACHED */ + break; + } + + // HACK! This needs to be done to avoid the mismatch from ACE_TEXT + // in ACE_SIZE_T_FORMAT_SPECIFIER to narrow-char on wide-char builds. + // It only works because it's at the end of the file. +# if defined (ACE_TEXT) +# undef ACE_TEXT +# endif +# define ACE_TEXT(X) X + if (this->verbose ()) + ACE_OS::printf ("%8d = initial concurrency hint\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = total iterations\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = thread count\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = low water mark\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = high water mark\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = message_size\n" + ACE_SIZE_T_FORMAT_SPECIFIER " = initial queue length\n" + "%8d = THR_BOUND\n" + "%8d = THR_NEW_LWP\n", + ACE_OS::thr_getconcurrency (), + this->iterations (), + this->thr_count (), + this->low_water_mark (), + this->high_water_mark (), + this->message_size (), + this->initial_queue_length (), + (this->t_flags () & THR_BOUND) != 0, + (this->t_flags () & THR_NEW_LWP) != 0); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Options.h b/ACE/examples/ASX/UPIPE_Event_Server/Options.h new file mode 100644 index 00000000000..d775a49a928 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Options.h @@ -0,0 +1,88 @@ +/* -*- C++ -*- */ +// $Id$ + +// Option manager for Event Server. + +#ifndef DEVICE_OPTIONS_H +#define DEVICE_OPTIONS_H + +#include "ace/config-all.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Profile_Timer.h" + +#if defined (ACE_HAS_THREADS) + +class Options +{ +public: + Options (void); + ~Options (void); + void parse_args (int argc, ACE_TCHAR *argv[]); + + void stop_timer (void); + void start_timer (void); + + void thr_count (size_t count); + size_t thr_count (void); + + void initial_queue_length (size_t length); + size_t initial_queue_length (void); + + void high_water_mark (size_t size); + size_t high_water_mark (void); + + void low_water_mark (size_t size); + size_t low_water_mark (void); + + void message_size (size_t size); + size_t message_size (void); + + void iterations (size_t n); + size_t iterations (void); + + void t_flags (long flag); + long t_flags (void); + + void supplier_port (const ACE_TCHAR *port); + const ACE_TCHAR *supplier_port (void); + + void consumer_port (const ACE_TCHAR *port); + const ACE_TCHAR *consumer_port (void); + + void supplier_file (const ACE_TCHAR *file); + const ACE_TCHAR *supplier_file (void); + + void consumer_file (const ACE_TCHAR *file); + const ACE_TCHAR *consumer_file (void); + + int debug (void); + int verbose (void); + + void print_results (void); + +private: + ACE_Profile_Timer itimer_; // Time the process. + size_t thr_count_; // Number of threads to spawn. + long t_flags_; // Flags to thr_create(). + size_t high_water_mark_; // ACE_Task high water mark. + size_t low_water_mark_; // ACE_Task low water mark. + size_t message_size_; // Size of a message. + size_t initial_queue_length_; // Initial number of items in the queue. + size_t iterations_; // Number of iterations to run the test program. + int debugging_; // Extra debugging info. + int verbosity_; // Extra verbose messages. + const ACE_TCHAR *consumer_port_; // Port that the Consumer_Router is using. + const ACE_TCHAR *supplier_port_; // Port that the Supplier_Router is using. + const ACE_TCHAR *consumer_file_; // file that the Consumer_Router is using. + const ACE_TCHAR *supplier_file_; // file that the Supplier_Router is using. +}; + +extern Options options; + +#include "Options.inl" +#endif /* ACE_HAS_THREADS */ +#endif /* DEVICE_OPTIONS_H */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Options.inl b/ACE/examples/ASX/UPIPE_Event_Server/Options.inl new file mode 100644 index 00000000000..af04f73eb26 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Options.inl @@ -0,0 +1,166 @@ +/* -*- C++ -*- */ +// $Id$ + +// Option manager for ustreams. + +// Since this is only included in Options.h these should stay +// inline, not ACE_INLINE. +// FUZZ: disable check_for_inline + + +inline void +Options::supplier_port (const ACE_TCHAR *port) +{ + this->supplier_port_ = port; +} + +inline const ACE_TCHAR * +Options::supplier_port (void) +{ + return this->supplier_port_; +} + +inline void +Options::supplier_file (const ACE_TCHAR *file) +{ + this->supplier_file_ = file; +} + +inline const ACE_TCHAR * +Options::supplier_file (void) +{ + return this->supplier_file_; +} + +inline void +Options::consumer_file (const ACE_TCHAR *file) +{ + this->consumer_file_ = file; +} + +inline const ACE_TCHAR * +Options::consumer_file (void) +{ + return this->consumer_file_; +} + +inline void +Options::consumer_port (const ACE_TCHAR *port) +{ + this->consumer_port_ = port; +} + +inline const ACE_TCHAR * +Options::consumer_port (void) +{ + return this->consumer_port_; +} + +inline void +Options::start_timer (void) +{ + this->itimer_.start (); +} + +inline void +Options::stop_timer (void) +{ + this->itimer_.stop (); +} + +inline void +Options::thr_count (size_t count) +{ + this->thr_count_ = count; +} + +inline size_t +Options::thr_count (void) +{ + return this->thr_count_; +} + +inline void +Options::initial_queue_length (size_t length) +{ + this->initial_queue_length_ = length; +} + +inline size_t +Options::initial_queue_length (void) +{ + return this->initial_queue_length_; +} + +inline void +Options::high_water_mark (size_t size) +{ + this->high_water_mark_ = size; +} + +inline size_t +Options::high_water_mark (void) +{ + return this->high_water_mark_; +} + +inline void +Options::low_water_mark (size_t size) +{ + this->low_water_mark_ = size; +} + +inline size_t +Options::low_water_mark (void) +{ + return this->low_water_mark_; +} + +inline void +Options::message_size (size_t size) +{ + this->message_size_ = size; +} + +inline size_t +Options::message_size (void) +{ + return this->message_size_; +} + +inline void +Options::iterations (size_t n) +{ + this->iterations_ = n; +} + +inline size_t +Options::iterations (void) +{ + return this->iterations_; +} + +inline void +Options::t_flags (long flag) +{ + this->t_flags_ |= flag; +} + +inline long +Options::t_flags (void) +{ + return this->t_flags_; +} + +inline int +Options::debug (void) +{ + return this->debugging_; +} + +inline int +Options::verbose (void) +{ + return this->verbosity_; +} + diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp new file mode 100644 index 00000000000..757eecedc33 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp @@ -0,0 +1,283 @@ +// $Id$ + +#if !defined (_PEER_ROUTER_C) + +#define _PEER_ROUTER_C + +#include "ace/Get_Opt.h" +#include "ace/Service_Config.h" + +#include "Peer_Router.h" +#include "Options.h" + +ACE_RCSID(UPIPE_Event_Server, Peer_Router, "$Id$") + +#if defined (ACE_HAS_THREADS) + +// Define some short-hand macros to deal with long templates +// names... + +#define PH PEER_HANDLER +#define PA PEER_ACCEPTOR +#define PAD PEER_ADDR +#define PK PEER_KEY +#define PM PEER_MAP + +template <class PH, class PK> int +Acceptor_Factory<PH, PK>::init (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("df:"), 0); + ACE_UPIPE_Addr addr; + + for (int c; (c = get_opt ()) != -1; ) + switch (c) + { + case 'f': + addr.set (get_opt.opt_arg ()); + break; + case 'd': + break; + default: + break; + } + + if (this->open (addr, ACE_Reactor::instance ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), -1); + return 0; +} + +template <class PH, class PK> +Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr) + : pr_ (pr) +{ +} + +template <class PH, class PK> Peer_Router<PH, PK> * +Acceptor_Factory<PH, PK>::router (void) +{ + return this->pr_; +} + +template <class ROUTER, class KEY> +Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm) + : ACE_Svc_Handler<ACE_UPIPE_STREAM, ACE_MT_SYNCH> (tm) +{ +} + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::svc (void) +{ + // Just a try !! we're just reading from our ACE_Message_Queue. + ACE_Message_Block *db, *hb; + int n; + // do an endless loop + for (;;) + { + db = new ACE_Message_Block (BUFSIZ); + hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db); + + if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("recv failed")), -1); + else if (n == 0) // Client has closed down the connection. + { + + if (this->router_task_->unbind_peer (this->get_handle ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("unbind failed")), -1); + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) shutting down \n"))); + return -1; // We do not need to be deregistered by reactor + // as we were not registered at all + } + else // Transform incoming buffer into a Message and pass downstream. + { + db->wr_ptr (n); + *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); // structure assignment. + hb->wr_ptr (sizeof (long)); + if (this->router_task_->reply (hb) == -1) + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Peer_Handler.svc : router_task->reply failed\n"))); + return -1; + } + + // return this->router_task_->reply (hb) == -1 ? -1 : 0; + } + } + ACE_NOTREACHED(return 0); +} + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + return this->peer ().send_n (mb->rd_ptr (), mb->length ()); +} + +// Create a new handler and point its ROUTER_TASK_ data member to the +// corresponding router. Note that this router is extracted out of +// the Acceptor_Factory * that is passed in via the +// ACE_Acceptor::handle_input() method. + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::open (void *a) +{ + ACE_TCHAR buf[BUFSIZ], *p = buf; + + if (this->router_task_->info (&p, sizeof buf) != -1) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) creating handler for %s, fd = %d, this = %@\n"), + buf, this->get_handle (), a)); + else + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("info")), -1); + + if ( this->activate (options.t_flags ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("activation of thread failed")), -1); + else if (this->router_task_->bind_peer (this->get_handle (), this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("bind_peer")), -1); + return 0; +} + +// Receive a message from a supplier.. + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h) +{ + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) input arrived on sd %d\n"), h)); +// ACE_Reactor::instance ()->remove_handler(h, +// ACE_Event_Handler::ALL_EVENTS_MASK +// |ACE_Event_Handler::DONT_CALL); +// this method should be called only if the peer shuts down +// so we deactivate our ACE_Message_Queue to awake our svc thread + + return 0; + +#if 0 + ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ); + ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db); + int n; + + if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("recv failed")), -1); + else if (n == 0) // Client has closed down the connection. + { + if (this->router_task_->unbind_peer (this->get_handle ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("unbind failed")), -1); + ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h)); + return -1; // Instruct the ACE_Reactor to deregister us by returning -1. + } + else // Transform incoming buffer into a Message and pass downstream. + { + db->wr_ptr (n); + *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment. + hb->wr_ptr (sizeof (long)); + return this->router_task_->reply (hb) == -1 ? -1 : 0; + } +#endif +} + +template <class PH, class PK> +Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm) + : ACE_Task<ACE_MT_SYNCH> (tm) +{ +} + +template <class PH, class PK> int +Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb) +{ + ACE_Map_Iterator<PK, PH *, ACE_RW_Mutex> map_iter = this->peer_map_; + int bytes = 0; + int iterations = 0; + ACE_Message_Block *data_block = mb->cont (); + for (ACE_Map_Entry<PK, PH *> *ss = 0; + map_iter.next (ss) != 0; + map_iter.advance ()) + { + if (options.debug ()) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) sending to peer via sd %d\n"), + ss->ext_id_)); + + iterations++; + bytes += ss->int_id_->put (data_block); + } + + mb->release (); + return bytes == 0 ? 0 : bytes / iterations; +} + +template <class PH, class PK> +Peer_Router<PH, PK>::~Peer_Router (void) +{ +} + +template <class PH, class PK> int +Peer_Router<PH, PK>::fini (void) +{ + delete this->acceptor_; + return 0; +} + +template <class PH, class PK> int +Peer_Router<PH, PK>::control (ACE_Message_Block *mb) +{ + ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); + ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command; + + switch (command = ioc->cmd ()) + { + case ACE_IO_Cntl_Msg::SET_LWM: + case ACE_IO_Cntl_Msg::SET_HWM: + this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ()); + break; + default: + return -1; + } + return 0; +} + +template <class PH, class PK> int +Peer_Router<PH, PK>::unbind_peer (PK key) +{ + return this->peer_map_.unbind (key); +} + +template <class PH, class PK> int +Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph) +{ + PH *peer_handler = (PH *) ph; + return this->peer_map_.bind (key, peer_handler); +} + +template <class PH, class PK> int +Peer_Router<PH, PK>::init (int argc, ACE_TCHAR *argv[]) +{ + this->acceptor_ = new Acceptor_Factory <PH, PK> (this); + + if (this->acceptor_->init (argc, argv) == -1 + || this->peer_map_.open () == -1) + return -1; + else + { + ACE_UPIPE_Addr addr; + ACE_UPIPE_Acceptor &pa = this->acceptor_->acceptor (); + + if (pa.get_local_addr (addr) != -1) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) initializing %s, file = %s, fd = %d, this = %@\n"), + this->name (), addr.get_path_name (), pa.get_handle (), this)); + else + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("get_local_addr")), -1); + } + return 0; +} + +#undef PH +#undef PA +#undef PAD +#undef PK +#undef PM +#endif /* ACE_HAS_THREADS */ +#endif /* _PEER_ROUTER_C */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.h b/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.h new file mode 100644 index 00000000000..3962d371ae0 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Peer_Router.h @@ -0,0 +1,127 @@ +/* -*- C++ -*- */ +// $Id$ + +// The interface between one or more peers and a stream. A peer +// typically runs remotely on another machine. + +#ifndef _PEER_ROUTER_H +#define _PEER_ROUTER_H + +#include "ace/Acceptor.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Svc_Handler.h" +#include "ace/UPIPE_Acceptor.h" +#include "ace/UPIPE_Addr.h" +#include "ace/Thread_Manager.h" +#include "ace/Map_Manager.h" + +#if defined (ACE_HAS_THREADS) +#include "ace/RW_Mutex.h" + +// Forward declaration. +template <class PEER_HANDLER, class KEY> +class Peer_Router; + +template <class PEER_HANDLER, class KEY> +class Acceptor_Factory : public ACE_Acceptor<PEER_HANDLER, ACE_UPIPE_ACCEPTOR> +{ +public: + Acceptor_Factory (Peer_Router<PEER_HANDLER, KEY> *pr); + Peer_Router<PEER_HANDLER, KEY> *router (void); + + int init (int argc, ACE_TCHAR *argv[]); + // Initialize the acceptor when it's linked dynamically. + +private: + Peer_Router<PEER_HANDLER, KEY> *pr_; +}; + +// Receive input from a Peer.. +template <class ROUTER, class KEY> +class Peer_Handler : public ACE_Svc_Handler<ACE_UPIPE_STREAM, ACE_MT_SYNCH> +{ +public: + Peer_Handler (ACE_Thread_Manager * = 0); + + virtual int open (void * = 0); + // Called by the ACE_Acceptor::handle_input() to activate this object. + + virtual int handle_input (ACE_HANDLE); + // Receive input from the peer.. + + virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); + // Send output to a peer. + +protected: + ROUTER *router_task_; + // Pointer to write task.. + +private: + // Don't need this method here... + virtual int svc (void); +}; + +// This abstract base class provides mechanisms for routing messages +// to/from a ACE_Stream from/to one or more peers (which are typically +// running on remote hosts). A subclass of Peer_Router overrides the +// open(), close(), and put() methods in order to specialize the +// behavior of the router to meet application-specific requirements. + +template <class PEER_HANDLER, class PEER_KEY> +class Peer_Router : public ACE_Task<ACE_MT_SYNCH> +{ +public: + Peer_Router (ACE_Thread_Manager * = 0); + ~Peer_Router (void); + + typedef Peer_Handler<Peer_Router<PEER_HANDLER, PEER_KEY>, PEER_KEY> HANDLER; + + // Remove a PEER_HANDLER from the PEER_MAP. + virtual int unbind_peer (PEER_KEY); + + // Add a PEER_HANDLER to the PEER_MAP. + virtual int bind_peer (PEER_KEY, HANDLER *); + + // Send the message block to the peer(s).. + int send_peers (ACE_Message_Block *mb); + +protected: +// Handle control messages arriving from adjacent Modules. + virtual int control (ACE_Message_Block *); + + // Map used to keep track of active peers. + ACE_Map_Manager <PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> peer_map_; + + // Dynamic linking initialization hooks inherited from ACE_Task. + virtual int init (int argc, ACE_TCHAR *argv[]); + virtual int fini (void); + + // Factory for accepting new PEER_HANDLERs. + Acceptor_Factory<PEER_HANDLER, PEER_KEY> *acceptor_; + +private: +// Prevent copies and pass-by-value. + ACE_UNIMPLEMENTED_FUNC (Peer_Router (const Peer_Router<PEER_HANDLER, PEER_KEY> &)) + ACE_UNIMPLEMENTED_FUNC (void operator= (const Peer_Router<PEER_HANDLER, PEER_KEY> &)) +}; + +#if defined (__ACE_INLINE__) +#define ACE_INLINE inline +#else +#define ACE_INLINE +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "Peer_Router.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Peer_Router.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_HAS_THREADS */ +#endif /* _PEER_ROUTER_H */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp new file mode 100644 index 00000000000..8b4a53d6331 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp @@ -0,0 +1,137 @@ +// $Id$ + +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "Options.h" +#include "Supplier_Router.h" + +ACE_RCSID(UPIPE_Event_Server, Supplier_Router, "$Id$") + +#if defined (ACE_HAS_THREADS) + +typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY; + +int +Supplier_Handler::open (void *a) +{ + SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a; + this->router_task_ = af->router (); + return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a); +} + +Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm) + : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm) +{ +} + +// Create a new router and associate it with the REACTOR parameter.. + +Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm) + : SUPPLIER_ROUTER (tm) +{ +} + +// Handle incoming messages in a separate thread.. + +int +Supplier_Router::svc (void) +{ + ACE_ASSERT (this->is_writer ()); + + ACE_Message_Block *message_block = 0; + + if (options.debug ()) + ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in %s\n", this->name ())); + + while (this->getq (message_block) > 0) + { + if (this->put_next (message_block) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) put_next failed in %s\n", this->name ()), -1); + } + + return 0; +} + +// Initialize the Router.. + +int +Supplier_Router::open (void *) +{ + ACE_ASSERT (this->is_writer ()); + ACE_TCHAR *argv[3]; + + argv[0] = (ACE_TCHAR *)this->name (); + argv[1] = (ACE_TCHAR *)options.supplier_file (); + argv[2] = 0; + + if (this->init (1, &argv[1]) == -1) + return -1; + + // Make this an active object. + // return this->activate (options.t_flags ()); + + // Until that's done, return 1 to indicate that the object wasn't activated. + return 1; +} + +// Close down the router.. + +int +Supplier_Router::close (u_long) +{ + ACE_ASSERT (this->is_writer ()); + this->peer_map_.close (); + this->msg_queue ()->deactivate(); + return 0; +} + +// Send a MESSAGE_BLOCK to the supplier(s).. + +int +Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + ACE_ASSERT (this->is_writer ()); + + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + { + this->control (mb); + return this->put_next (mb); + } + else + { +//printf("supplier-Router is routing: send_peers\n"); + return this->send_peers (mb); + } +} + +// Return information about the Supplier_Router ACE_Module.. + +int +Supplier_Router::info (ACE_TCHAR **strp, size_t length) const +{ + ACE_TCHAR buf[BUFSIZ]; + ACE_UPIPE_Addr addr; + const ACE_TCHAR *mod_name = this->name (); + ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_; + + if (sa.get_local_addr (addr) == -1) + return -1; + +#if !defined (ACE_WIN32) && defined (ACE_USES_WCHAR) +# define FMTSTR ACE_TEXT ("%ls\t %ls/ %ls") +#else +# define FMTSTR ACE_TEXT ("%s\t %s/ %s") +#endif + + ACE_OS::sprintf (buf, FMTSTR, + mod_name, ACE_TEXT ("upipe"), + ACE_TEXT ("# supplier router\n")); + + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.h b/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.h new file mode 100644 index 00000000000..4d5d440e018 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/Supplier_Router.h @@ -0,0 +1,57 @@ +/* -*- C++ -*- */ +// $Id$ + +// The interface between a supplier and an Event Service ACE_Stream. + +#ifndef _SUPPLIER_ROUTER_H +#define _SUPPLIER_ROUTER_H + +#include "ace/UPIPE_Addr.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/UPIPE_Acceptor.h" +#include "ace/Map_Manager.h" +#include "ace/Svc_Handler.h" +#include "Peer_Router.h" + +#if defined (ACE_HAS_THREADS) + +// Forward declaration. +class Supplier_Handler; + +// Type of search key for SUPPLIER_MAP. +typedef ACE_HANDLE SUPPLIER_KEY; + +// Instantiated type for routing messages to suppliers. + +typedef Peer_Router<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_ROUTER; + +class Supplier_Handler + : public Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> +{ +public: + Supplier_Handler (ACE_Thread_Manager *tm = 0); + virtual int open (void *); +}; + +class Supplier_Router : public SUPPLIER_ROUTER +{ +public: + Supplier_Router (ACE_Thread_Manager *); + +protected: + // ACE_Task hooks.. + virtual int open (void *a = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + virtual int svc (void); + + // Dynamic linking hooks inherited from Peer_Router. + virtual int info (ACE_TCHAR **info_string, size_t length) const; +}; + +#endif /* ACE_HAS_THREADS */ +#endif /* _SUPPLIER_ROUTER_H */ diff --git a/ACE/examples/ASX/UPIPE_Event_Server/UPIPE_Event.mpc b/ACE/examples/ASX/UPIPE_Event_Server/UPIPE_Event.mpc new file mode 100644 index 00000000000..a4c93dc1fdb --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/UPIPE_Event.mpc @@ -0,0 +1,15 @@ +// -*- MPC -*- +// $Id$ + +project(*Server) : aceexe { + avoids += ace_for_tao + exename = UPIPE_Event_Server + Source_Files { + Consumer_Router.cpp + Event_Analyzer.cpp + Options.cpp + Peer_Router.cpp + Supplier_Router.cpp + event_server.cpp + } +} diff --git a/ACE/examples/ASX/UPIPE_Event_Server/event_server.cpp b/ACE/examples/ASX/UPIPE_Event_Server/event_server.cpp new file mode 100644 index 00000000000..91ae382fac1 --- /dev/null +++ b/ACE/examples/ASX/UPIPE_Event_Server/event_server.cpp @@ -0,0 +1,271 @@ +// $Id$ + +// Test the event server. + +#include "ace/OS_main.h" +#include "ace/Stream.h" +#include "ace/Service_Config.h" +#include "ace/UPIPE_Acceptor.h" +#include "ace/UPIPE_Connector.h" + +// FUZZ: disable check_for_streams_include +#include "ace/streams.h" + +#include "Options.h" +#include "Consumer_Router.h" +#include "Event_Analyzer.h" +#include "Supplier_Router.h" +#include "ace/Sig_Adapter.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID (UPIPE_Event_Server, + event_server, + "$Id$") + +#if defined (ACE_HAS_THREADS) + +typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream; +typedef ACE_Module<ACE_MT_SYNCH> MT_Module; + +// Handle SIGINT and terminate the entire application. + +class Quit_Handler : public ACE_Sig_Adapter +{ +public: + Quit_Handler (void); + virtual int handle_input (ACE_HANDLE fd); +}; + +Quit_Handler::Quit_Handler (void) + : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop)) +{ + // Register to trap input from the user. + if (ACE_Event_Handler::register_stdin_handler (this, + ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler")); + // Register to trap the SIGINT signal. + else if (ACE_Reactor::instance ()->register_handler + (SIGINT, this) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "register_handler")); +} + +int +Quit_Handler::handle_input (ACE_HANDLE) +{ + options.stop_timer (); + ACE_DEBUG ((LM_INFO, " (%t) closing down the test\n")); + options.print_results (); + + ACE_Reactor::end_event_loop(); + return 0; +} + +static void * +consumer (void *) +{ + ACE_UPIPE_Stream c_stream; + ACE_UPIPE_Addr c_addr (ACE_TEXT ("/tmp/conupipe")); + + int verb = options.verbose (); + int msiz = options.message_size (); + time_t secs, par1, par2; + time_t currsec; + + if (verb) + cout << "consumer starting connect" << endl; + + ACE_UPIPE_Connector con; + + if (con.connect (c_stream, c_addr) == -1) + ACE_DEBUG ((LM_INFO, " (%t) connect failed\n")); + else + cout << "consumer :we're connected" << endl; + + ACE_Message_Block *mb_p; + + int done = 0; + int cnt = 0; + ACE_OS::time (&currsec); + + par1 = currsec; + + while (done == 0 + && (c_stream.recv (mb_p) != -1)) + if (mb_p->length () > 1) + { + cnt++; + if (verb) + cout << " consumer received message !!!!!! " + << mb_p->rd_ptr () << endl; + } + else + { + if (verb) + cout << "consumer got last mb" + << (char) * (mb_p->rd_ptr ()) << endl; + c_stream.close (); + done = 1; + } + + ACE_OS::time (&currsec); + par2 = currsec; + + secs = par2 - par1; + + if (secs <= 0) + secs=1; + + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("consumer got %d messages of size %d ") + ACE_TEXT ("within %: seconds\n"), + cnt, msiz, secs)); + + ACE_OS::sleep (2); + cout << "consumer terminating " << endl; + return 0; +} + +static void * +supplier (void *dummy) +{ + ACE_UPIPE_Stream s_stream; + ACE_UPIPE_Addr serv_addr (ACE_TEXT ("/tmp/supupipe")); + ACE_UPIPE_Connector con; + + int iter = options.iterations (); + int verb = options.verbose (); + int msiz = options.message_size (); + cout << "supplier starting connect" << endl; + + if (con.connect (s_stream, serv_addr) == -1) + ACE_DEBUG ((LM_INFO, " (%t) connect failed\n")); + + cout << "supplier : we're connected" << endl; + int n; + n = 0; + ACE_Message_Block * mb_p; + + while (n < iter) + { + mb_p = new ACE_Message_Block (msiz); + ACE_OS::strcpy (mb_p->rd_ptr (), (char *) dummy); + mb_p->length (msiz); + if (verb) + cout << "supplier sending 1 message_block" << endl; + if (s_stream.send (mb_p) == -1) + { + cout << "supplier send failed" << endl; + return (void *) -1; + } + n++; + } + + mb_p = new ACE_Message_Block (10); + mb_p->length (1); + *mb_p->rd_ptr () = 'g'; + + cout << "supplier sending last message_block" << endl; + + if (s_stream.send (mb_p) == -1) + { + cout << "supplier send last mb failed" << endl; + return (void *) -1; + } + mb_p = new ACE_Message_Block (10); + mb_p->length (0); + + if (verb) + cout << "supplier sending very last message_block" << endl; + + if (s_stream.send (mb_p) == -1) + { + cout << "supplier send very last mb failed" << endl; + return (void *) -1; + } + + ACE_OS::sleep (2); + cout << "supplier terminating" << endl; + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + ACE_Service_Config daemon; + + options.parse_args (argc, argv); + options.start_timer (); + + // Primary ACE_Stream for EVENT_SERVER application. + MT_Stream event_server; + + // Enable graceful shutdowns.... + Quit_Handler quit_handler; + + // Create the modules.. + + MT_Module *sr = new MT_Module (ACE_TEXT ("Supplier_Router"), + new Supplier_Router (ACE_Thread_Manager::instance ())); + MT_Module *ea = new MT_Module (ACE_TEXT ("Event_Analyzer"), + new Event_Analyzer, + new Event_Analyzer); + MT_Module *cr = new MT_Module (ACE_TEXT ("Consumer_Router"), + 0, // 0 triggers the creation of a ACE_Thru_Task... + new Consumer_Router (ACE_Thread_Manager::instance ())); + + // Push the modules onto the event_server stream. + + if (event_server.push (sr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("push (Supplier_Router)")), -1); + + if (event_server.push (ea) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("push (Event_Analyzer)")), -1); + + if (event_server.push (cr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("push (Consumer_Router)")), -1); + + // Set the high and low water marks appropriately. + + int wm = options.low_water_mark (); + + if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("push (setting low watermark)")), -1); + + wm = options.high_water_mark (); + if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("push (setting high watermark)")), -1); + + // spawn the two threads. + + if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer), (void *) 0, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), 1); + + else if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier), (void *) "hello", + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), 1); + + // Perform the main event loop waiting for the user to type ^C or to + // enter a line on the ACE_STDIN. + + ACE_Reactor::instance ()->run_reactor_event_loop (); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("main exiting\n"))); + + return 0; +} +#else +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("test not defined for this platform\n")), + -1); +} +#endif /* ACE_HAS_THREADS */ |