diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-02 09:05:39 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-02 09:05:39 +0000 |
commit | 1c44106287219a05ddbff09df4574b90777040ae (patch) | |
tree | 1d371fe6828480e7cecdcb75a8887a2b4bb83f53 /examples | |
parent | fbcfcdb6ff9975a9e2152be0a5dc7e28a32635fc (diff) | |
download | ATCD-1c44106287219a05ddbff09df4574b90777040ae.tar.gz |
foo
Diffstat (limited to 'examples')
31 files changed, 872 insertions, 719 deletions
diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp index 41b5dcb6ea6..67c61bca9b4 100644 --- a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp +++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp @@ -1,31 +1,10 @@ - // $Id$ #include "Consumer_Router.h" #include "Options.h" -#if defined (ACE_HAS_THREADS) - -typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY; - -int -Consumer_Handler::open (void *a) -{ - CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a; - this->router_task_ = af->router (); - return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a); -} - -Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm) - : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm) -{ -} - -// Create a new handler that will interact with a consumer and point -// its ROUTER_TASK_ data member to the CONSUMER_ROUTER. - -Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm) - : CONSUMER_ROUTER (tm) +Consumer_Router::Consumer_Router (Peer_Router_Context *prc) + : Peer_Router (prc) { } @@ -34,93 +13,122 @@ Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm) int Consumer_Router::open (void *) { - assert (this->is_reader ()); - - char *argv[4]; - - argv[0] = (char *) this->name (); - argv[1] = "-p"; - argv[2] = options.consumer_port (); - argv[3] = 0; - - if (this->init (2, &argv[1]) == -1) - return -1; + if (this->is_writer ()) + { + // Set the Peer_Router_Context to point back to us so that if + // any Consumer's "accidentally" send us data we'll be able to + // handle it. + this->context ()->peer_router (this); + + // Make this an active object to handle the error cases in a + // separate thread. + return this->activate (Options::instance ()->t_flags ()); + } + else // if (this->is_reader ()) - // Make this an active object. - return this->activate (options.t_flags ()); + // Nothing to do since this side is primarily used to transmit to + // Consumers, rather than receive. + return 0; } int Consumer_Router::close (u_long) { - assert (this->is_reader ()); - ACE_DEBUG ((LM_DEBUG, "(%t) closing Consumer_Router\n")); - this->peer_map_.close (); + ACE_DEBUG ((LM_DEBUG, "(%t) closing Consumer_Router %s\n", + this->is_reader () ? "reader" : "writer")); - // Inform the thread to shut down. - this->msg_queue ()->deactivate (); + if (this->is_writer ()) + // Inform the thread to shut down. + this->msg_queue ()->deactivate (); + + // Both writer and reader call release(), so the context knows when + // to clean itself up. + this->context ()->release (); return 0; } -// Handle incoming messages in a separate thread. +// Handle incoming messages in a separate thread. int Consumer_Router::svc (void) { - ACE_Thread_Control tc (this->thr_mgr ()); - ACE_Message_Block *mb = 0; + assert (this->is_writer ()); - assert (this->is_reader ()); + ACE_Thread_Control tc (this->thr_mgr ()); - ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Consumer_Router\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) starting svc in Consumer_Router\n")); - while (this->getq (mb) >= 0) + for (ACE_Message_Block *mb = 0; + this->getq (mb) >= 0; + ) { - ACE_DEBUG ((LM_DEBUG, "Consumer_Router is routing via send_peers\n")); - if (this->send_peers (mb) == -1) + ACE_DEBUG ((LM_DEBUG, + "(%t) warning: Consumer_Router is forwarding a message to Supplier_Router\n")); + + // Pass this message down to the next Module's writer Task. + if (this->put_next (mb) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) send_peers failed in Consumer_Router\n"), -1); } - ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Consumer_Router\n")); + + ACE_DEBUG ((LM_DEBUG, + "(%t) stopping svc in Consumer_Router\n")); return 0; // Note the implicit ACE_OS::thr_exit() via destructor. } -// Send a MESSAGE_BLOCK to the supplier(s). +// Send a <Message_Block> to the supplier(s). int -Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) +Consumer_Router::put (ACE_Message_Block *mb, + ACE_Time_Value *) { - assert (this->is_reader ()); + // Perform the necessary control operations before passing + // the message down the stream. if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) { this->control (mb); return this->put_next (mb); } - else - // Queue up the message, which will be processed by - // Consumer_Router::svc(). + + // If we're the reader side then we're responsible for broadcasting + // messages to Consumers. + + else if (this->is_reader ()) + { + if (this->context ()->send_peers (mb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) send_peers failed in Consumer_Router\n"), + -1); + else + return 0; + } + else // if (this->is_writer ()) + + // Queue up the message to processed by Consumer_Router::svc(). + // Since we don't expect to be getting many of these messages, we + // queue them up and run them in a separate thread to avoid taxing + // the main thread. return this->putq (mb); } - -// Return information about the Client_Router ACE_Module. +// Return information about the Client_Router ACE_Module. int Consumer_Router::info (char **strp, size_t length) const { - char buf[BUFSIZ]; + char buf[BUFSIZ]; ACE_INET_Addr addr; const char *mod_name = this->name (); - ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor (); - if (sa.get_local_addr (addr) == -1) + if (this->context ()->acceptor ().get_local_addr (addr) == -1) return -1; - ACE_OS::sprintf (buf, "%s\t %d/%s %s", - mod_name, addr.get_port_number (), "tcp", - "# consumer router\n"); + ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n", + mod_name, addr.get_port_number (), "tcp", + "# consumer router", this->is_reader () ? "reader" : "writer"); if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) return -1; @@ -128,5 +136,3 @@ Consumer_Router::info (char **strp, size_t length) const ACE_OS::strncpy (*strp, mod_name, length); return ACE_OS::strlen (mod_name); } - -#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h index 1fb9f4e40f2..cdaf7090b97 100644 --- a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h +++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h @@ -1,46 +1,49 @@ /* -*- C++ -*- */ // $Id$ -/* The interface between one or more consumers and an Event Server ACE_Stream */ - #if !defined (_CONSUMER_ROUTER_H) #define _CONSUMER_ROUTER_H -#include "ace/Thread_Manager.h" #include "ace/SOCK_Acceptor.h" #include "ace/UPIPE_Acceptor.h" #include "ace/Svc_Handler.h" #include "Peer_Router.h" -#if defined (ACE_HAS_THREADS) - -class Consumer_Handler; /* Forward declaration... */ - -typedef ACE_HANDLE CONSUMER_KEY; - -typedef Peer_Router<Consumer_Handler, CONSUMER_KEY> CONSUMER_ROUTER; - -class Consumer_Handler : public Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> -{ -public: - Consumer_Handler (ACE_Thread_Manager *tm = 0); - virtual int open (void *); -}; - -class Consumer_Router : public CONSUMER_ROUTER +class Consumer_Router : public Peer_Router + // = TITLE + // Provides the interface between one or more Consumers and the + // Event Server ACE_Stream. { public: - Consumer_Router (ACE_Thread_Manager *thr_manager); + Consumer_Router (Peer_Router_Context *prc); + // Initialization method. protected: - /* ACE_Task hooks. */ + // = ACE_Task hooks. + + // All of these methods are called via base class pointers by the + // ACE Stream apparatus. Therefore, we can put them in the + // protected section. + virtual int open (void *a = 0); + // Called by the Stream to initialize the router. + virtual int close (u_long flags = 0); + // Called by the Stream to shutdown the router. + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + // Called by the Consumer_Handler to pass a message to the Router. + // The Router queues up this message, which is then processed in the + // <svc> method in a separate thread. + virtual int svc (void); + // Runs in a separate thread to dequeue messages and pass them up + // the stream. + + // = Dynamic linking hooks. - /* Dynamic linking hooks */ virtual int info (char **info_string, size_t length) const; + // Returns information about this service. }; -#endif /* ACE_HAS_THREADS */ + #endif /* _CONSUMER_ROUTER_H */ diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp index 5a63224eb51..e413c16acce 100644 --- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp +++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp @@ -1,8 +1,7 @@ -#include "Event_Analyzer.h" // $Id$ - -#if defined (ACE_HAS_THREADS) +#include "Options.h" +#include "Event_Analyzer.h" int Event_Analyzer::open (void *) @@ -35,6 +34,10 @@ Event_Analyzer::control (ACE_Message_Block *mb) int Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *) { + if (Options::instance ()->debug ()) + ACE_DEBUG ((LM_DEBUG, "(%t) passing through Event_Analyser::put() (%s)\n", + this->is_reader () ? "reader" : "writer")); + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) this->control (mb); @@ -64,5 +67,3 @@ Event_Analyzer::info (char **strp, size_t length) const ACE_OS::strncpy (*strp, mod_name, length); return ACE_OS::strlen (mod_name); } - -#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h index 498e91d476e..3eb012c6add 100644 --- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h +++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h @@ -1,8 +1,6 @@ /* -*- C++ -*- */ // $Id$ -/* Signal router */ - #if !defined (_EVENT_ANALYZER_H) #define _EVENT_ANALYZER_H @@ -11,16 +9,19 @@ #include "ace/Task.h" #include "ace/Synch.h" -#if defined (ACE_HAS_THREADS) - class Event_Analyzer : public ACE_Task<ACE_MT_SYNCH> + // = TITLE + // This is a "no-op" class that just forwards all its message + // blocks onto its neighboring Module in the Stream. In a real + // application these tasks would be where the Stream processing + // would go. { public: virtual int open (void *a = 0); virtual int close (u_long flags = 0); virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); - /* Dynamic linking hooks */ + // Dynamic linking hooks. virtual int init (int argc, char *argv[]); virtual int fini (void); virtual int info (char **info_string, size_t length) const; @@ -29,5 +30,4 @@ private: virtual int control (ACE_Message_Block *); }; -#endif /* ACE_HAS_THREADS */ #endif /* _EVENT_ANALYZER_H */ diff --git a/examples/ASX/Event_Server/Event_Server/Makefile b/examples/ASX/Event_Server/Event_Server/Makefile index c94314b19fd..15c448561a1 100644 --- a/examples/ASX/Event_Server/Event_Server/Makefile +++ b/examples/ASX/Event_Server/Event_Server/Makefile @@ -120,21 +120,21 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Mem_Map.h \ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ Peer_Router.h \ $(WRAPPER_ROOT)/ace/Acceptor.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Acceptor.i \ Options.h \ $(WRAPPER_ROOT)/ace/Profile_Timer.h \ @@ -174,10 +174,35 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Thread_Manager.h \ $(WRAPPER_ROOT)/ace/Thread.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h .obj/Consumer_Router.o .shobj/Consumer_Router.so: Consumer_Router.cpp Consumer_Router.h \ - $(WRAPPER_ROOT)/ace/Thread_Manager.h \ - $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -189,17 +214,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i \ - $(WRAPPER_ROOT)/ace/Synch.h \ - $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ - $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ - $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ - $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ - $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Event_Handler.h \ - $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ - $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ - $(WRAPPER_ROOT)/ace/SOCK_IO.h \ - $(WRAPPER_ROOT)/ace/SOCK.h \ $(WRAPPER_ROOT)/ace/Addr.h \ $(WRAPPER_ROOT)/ace/IPC_SAP.h \ $(WRAPPER_ROOT)/ace/IPC_SAP.i \ @@ -214,6 +228,13 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Message_Block.h \ $(WRAPPER_ROOT)/ace/Malloc.h \ $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ $(WRAPPER_ROOT)/ace/Set.h \ @@ -222,8 +243,23 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Thread.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/SPIPE.h \ $(WRAPPER_ROOT)/ace/SPIPE_Addr.h \ $(WRAPPER_ROOT)/ace/SPIPE.i \ @@ -234,21 +270,8 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \ $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ - $(WRAPPER_ROOT)/ace/Service_Config.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ - $(WRAPPER_ROOT)/ace/Reactor.h \ - $(WRAPPER_ROOT)/ace/Handle_Set.h \ - $(WRAPPER_ROOT)/ace/Pipe.h \ - $(WRAPPER_ROOT)/ace/Pipe.i \ - $(WRAPPER_ROOT)/ace/Reactor.i \ - $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ Peer_Router.h \ $(WRAPPER_ROOT)/ace/Acceptor.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Acceptor.i \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ Options.h \ @@ -278,20 +301,12 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -305,6 +320,17 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Get_Opt.h Options.h \ $(WRAPPER_ROOT)/ace/Profile_Timer.h \ @@ -314,9 +340,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Acceptor.i \ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h diff --git a/examples/ASX/Event_Server/Event_Server/Options.cpp b/examples/ASX/Event_Server/Event_Server/Options.cpp index 087d8ed57c5..1255bbecf6a 100644 --- a/examples/ASX/Event_Server/Event_Server/Options.cpp +++ b/examples/ASX/Event_Server/Event_Server/Options.cpp @@ -6,20 +6,30 @@ #include "Options.h" -#if defined (ACE_HAS_THREADS) +/* static */ +Options *Options::instance_ = 0; + +Options * +Options::instance (void) +{ + if (Options::instance_ == 0) + Options::instance_ = new Options; + + return Options::instance_; +} Options::Options (void) - : thr_count_ (4), - t_flags_ (THR_DETACHED), - high_water_mark_ (8 * 1024), - low_water_mark_ (1024), - message_size_ (128), - initial_queue_length_ (0), - iterations_ (100000), - debugging_ (0), - verbosity_ (0), - consumer_port_ ("-p " ACE_ITOA (10000)), - supplier_port_ ("-p " ACE_ITOA (10001)) + : thr_count_ (4), + t_flags_ (THR_DETACHED), + high_water_mark_ (8 * 1024), + low_water_mark_ (1024), + message_size_ (128), + initial_queue_length_ (0), + iterations_ (100000), + debugging_ (0), + verbosity_ (0), + consumer_port_ (ACE_DEFAULT_SERVER_PORT), + supplier_port_ (ACE_DEFAULT_SERVER_PORT + 1) { } @@ -36,7 +46,7 @@ void Options::print_results (void) this->itimer_.elapsed_time (et); this->itimer_.get_rusage (rusage); - if (options.verbose ()) + if (this->verbose ()) { #if defined (ACE_HAS_PRUSAGE_T) ACE_OS::printf ("final concurrency hint = %d\n", ACE_Thread::getconcurrency ()); @@ -86,9 +96,6 @@ void Options::print_results (void) #endif /* ACE_WIN32 */ } -/* Manages the options */ -Options options; - void Options::parse_args (int argc, char *argv[]) { @@ -104,7 +111,7 @@ Options::parse_args (int argc, char *argv[]) this->t_flags (THR_BOUND); break; case 'c': - this->consumer_port (get_opt.optarg); + this->consumer_port (ACE_OS::atoi (get_opt.optarg)); break; case 'd': this->debugging_ = 1; @@ -128,7 +135,7 @@ Options::parse_args (int argc, char *argv[]) this->t_flags (THR_NEW_LWP); break; case 's': - this->supplier_port (get_opt.optarg); + this->supplier_port (ACE_OS::atoi (get_opt.optarg)); break; case 'T': if (ACE_OS::strcasecmp (get_opt.optarg, "ON") == 0) @@ -182,5 +189,3 @@ Options::parse_args (int argc, char *argv[]) (this->t_flags () & THR_BOUND) != 0, (this->t_flags () & THR_NEW_LWP) != 0); } - -#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/Event_Server/Options.h b/examples/ASX/Event_Server/Event_Server/Options.h index 5a7a541b835..596b2b48cc3 100644 --- a/examples/ASX/Event_Server/Event_Server/Options.h +++ b/examples/ASX/Event_Server/Event_Server/Options.h @@ -14,61 +14,88 @@ class Options { public: - Options (void); - ~Options (void); + static Options *instance (void); + void parse_args (int argc, char *argv[]); - void stop_timer (void); - void start_timer (void); + void stop_timer (void); + void start_timer (void); - void thr_count (size_t count); + void thr_count (size_t count); size_t thr_count (void); - void initial_queue_length (size_t length); + void initial_queue_length (size_t length); size_t initial_queue_length (void); - void high_water_mark (size_t size); + void high_water_mark (size_t size); size_t high_water_mark (void); - void low_water_mark (size_t size); + void low_water_mark (size_t size); size_t low_water_mark (void); - void message_size (size_t size); + void message_size (size_t size); size_t message_size (void); - void iterations (size_t n); + void iterations (size_t n); size_t iterations (void); - void t_flags (long flag); - long t_flags (void); + void t_flags (long flag); + long t_flags (void); - void supplier_port (char *port); - char *supplier_port (void); + void supplier_port (u_short port); + u_short supplier_port (void); - void consumer_port (char *port); - char *consumer_port (void); + void consumer_port (u_short port); + u_short consumer_port (void); - int debug (void); - int verbose (void); + int debug (void); + int verbose (void); - void print_results (void); + void print_results (void); private: - ACE_Profile_Timer itimer_; /* Time the process */ - size_t thr_count_; /* Number of threads to spawn */ - long t_flags_; /* Flags to thr_create() */ - size_t high_water_mark_; /* ACE_Task high water mark */ - size_t low_water_mark_; /* ACE_Task low water mark */ - size_t message_size_; /* Size of a message */ - size_t initial_queue_length_; /* Initial number of items in the queue */ - size_t iterations_; /* Number of iterations to run the test program */ - int debugging_; /* Extra debugging info */ - int verbosity_; /* Extra verbose messages */ - char *consumer_port_; /* Port that the Consumer_Router is using */ - char *supplier_port_; /* Port that the Supplier_Router is using */ -}; + Options (void); + ~Options (void); + + ACE_Profile_Timer itimer_; + // Time the process. + + size_t thr_count_; + // Number of threads to spawn. + + long t_flags_; + // Flags to thr_create(). + + size_t high_water_mark_; + // ACE_Task high water mark. -extern Options options; + size_t low_water_mark_; + // ACE_Task low water mark. + + size_t message_size_; + // Size of a message. + + size_t initial_queue_length_; + // Initial number of items in the queue. + + size_t iterations_; + // Number of iterations to run the test program. + + int debugging_; + // Extra debugging info. + + int verbosity_; + // Extra verbose messages. + + u_short consumer_port_; + // Port that the Consumer_Router is using. + + u_short supplier_port_; + // Port that the Supplier_Router is using. + + static Options *instance_; + // Static Singleton. +}; #include "Options.i" #endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/Event_Server/Options.i b/examples/ASX/Event_Server/Event_Server/Options.i index 518a309f6cb..b3ce4e2e807 100644 --- a/examples/ASX/Event_Server/Event_Server/Options.i +++ b/examples/ASX/Event_Server/Event_Server/Options.i @@ -4,24 +4,24 @@ /* Option manager for ustreams */ inline void -Options::supplier_port (char *port) +Options::supplier_port (u_short port) { this->supplier_port_ = port; } -inline char * +inline u_short Options::supplier_port (void) { return this->supplier_port_; } inline void -Options::consumer_port (char *port) +Options::consumer_port (u_short port) { this->consumer_port_ = port; } -inline char * +inline u_short Options::consumer_port (void) { return this->consumer_port_; diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp index 38e2977140d..1cbaa77b83b 100644 --- a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp +++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp @@ -1,70 +1,148 @@ -#if !defined (_PEER_ROUTER_C) // $Id$ +#if !defined (_PEER_ROUTER_C) #define _PEER_ROUTER_C #include "ace/Service_Config.h" #include "ace/Get_Opt.h" - #include "Options.h" #include "Peer_Router.h" -#if defined (ACE_HAS_THREADS) +int +Peer_Router_Context::send_peers (ACE_Message_Block *mb) +{ + PEER_ITERATOR map_iter = this->peer_map_; + int bytes = 0; + int iterations = 0; + + // Skip past the header and get the message to send. + ACE_Message_Block *data_block = mb->cont (); + + // "Multicast" the data to *all* the registered peers. -// Define some short-hand macros to deal with verbose templates -// names... + for (PEER_ENTRY *ss = 0; + map_iter.next (ss) != 0; + map_iter.advance ()) + { + if (Options::instance ()->debug ()) + ACE_DEBUG ((LM_DEBUG, + "(%t) sending to peer via handle %d\n", + ss->ext_id_)); + iterations++; + // Increment reference count before sending since the + // Peer_Handler might be running in its own thread of control. + bytes += ss->int_id_->put (data_block->duplicate ()); + } -#define PH PEER_HANDLER -#define PA PEER_ACCEPTOR -#define PAD PEER_ADDR -#define PK PEER_KEY -#define PM PEER_MAP + mb->release (); + return bytes == 0 ? 0 : bytes / iterations; +} -template <class PH, class PK> int -Acceptor_Factory<PH, PK>::init (int argc, char *argv[]) +int +Peer_Router_Context::unbind_peer (ROUTING_KEY key) { - ACE_Get_Opt get_opt (argc, argv, "dp:", 0); - ACE_INET_Addr addr; - - for (int c; (c = get_opt ()) != -1; ) - switch (c) - { - case 'p': - addr.set (ACE_OS::atoi (get_opt.optarg)); - break; - case 'd': - break; - default: - break; - } - - if (this->open (addr, ACE_Service_Config::reactor ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); - return 0; + return this->peer_map_.unbind (key); } -template <class PH, class PK> -Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr) - : pr_ (pr) +void +Peer_Router_Context::release (void) { + this->reference_count_--; + if (this->reference_count_ == 0) + delete this; } -template <class PH, class PK> Peer_Router<PH, PK> * -Acceptor_Factory<PH, PK>::router (void) -{ - return this->pr_; +int +Peer_Router_Context::bind_peer (ROUTING_KEY key, + Peer_Handler *peer_handler) +{ + return this->peer_map_.bind (key, peer_handler); } -template <class ROUTER, class KEY> -Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm) - : inherited (tm) +Peer_Router_Context::Peer_Router_Context (u_short port) + : reference_count_ (2) // 1 Consumer + 1 Supplier { + // Perform initializations. + + if (this->open (ACE_INET_Addr (port)) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "Acceptor::open")); + + else if (this->peer_map_.open () == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "Map_Manager::open")); + + else + { + ACE_INET_Addr addr; + + if (this->acceptor().get_local_addr (addr) != -1) + ACE_DEBUG ((LM_DEBUG, + "(%t) initializing on port = %d, handle = %d, this = %u\n", + addr.get_port_number (), + this->acceptor().get_handle (), + this)); + else + ACE_ERROR ((LM_ERROR, + "%p\n", "get_local_addr")); + } +} + +Peer_Router_Context::~Peer_Router_Context (void) +{ + // Free up the handle and close down the listening socket. + ACE_DEBUG ((LM_DEBUG, + "(%t) closing down Peer_Router_Context")); + + // Close down the Acceptor and take ourselves out of the Reactor. + this->handle_close (); + + PEER_ITERATOR map_iter = this->peer_map_; + + // Make sure to take all the handles out of the map. + + for (PEER_ENTRY *ss = 0; + map_iter.next (ss) != 0; + map_iter.advance ()) + { + if (Options::instance ()->debug ()) + ACE_DEBUG ((LM_DEBUG, + "(%t) closing down peer on handle %d\n", + ss->ext_id_)); + + if (ACE_Service_Config::reactor ()->remove_handler + (ss->ext_id_, ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR ((LM_ERROR, "(%t) p\n", "remove_handle")); + } + + // Close down the map. + this->peer_map_.close (); } -template <class ROUTER, class KEY> int -Peer_Handler<ROUTER, KEY>::svc (void) +Peer_Router * +Peer_Router_Context::peer_router (void) { + return this->peer_router_; +} + +void +Peer_Router_Context::peer_router (Peer_Router *pr) +{ + this->peer_router_ = pr; +} + +Peer_Handler * +Peer_Router_Context::make_svc_handler (void) +{ + return new Peer_Handler (this); +} + +Peer_Handler::Peer_Handler (Peer_Router_Context *prc) + : prc_ (prc) +{ +} + #if 0 +Peer_Handler::svc (void) +{ ACE_Thread_Control thread_control (tm); ACE_Message_Block *db, *hb; @@ -74,13 +152,13 @@ Peer_Handler<ROUTER, KEY>::svc (void) for (;;) { db = new Message_Block (BUFSIZ); - hb = new Message_Block (sizeof (KEY), Message_Block::MB_PROTO, db); + hb = new Message_Block (sizeof (ROUTING_KEY), Message_Block::MB_PROTO, db); if ((n = this->peer_.recv (db->rd_ptr (), db->size ())) == -1) LM_ERROR_RETURN ((LOG_ERROR, "%p", "recv failed"), -1); else if (n == 0) // Client has closed down the connection. { - if (this->router_task_->unbind_peer (this->get_handle ()) == -1) + if (this->prc_->peer_router ()->unbind_peer (this->get_handle ()) == -1) LM_ERROR_RETURN ((LOG_ERROR, "%p", "unbind failed"), -1); LM_DEBUG ((LOG_DEBUG, "(%t) shutting down \n")); return -1; // We do not need to be deregistered by reactor @@ -94,130 +172,133 @@ Peer_Handler<ROUTER, KEY>::svc (void) *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment. hb->wr_ptr (sizeof (long)); - if (this->router_task_->reply (hb) == -1) + if (this->prc_->peer_router ()->reply (hb) == -1) { - cout << "Peer_Handler.svc : router_task->reply failed" << endl ; + cout << "Peer_Handler.svc : peer_router->reply failed" << endl ; return -1; } } } return 0; -#else - return -1; -#endif } +#endif -template <class ROUTER, class KEY> int -Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *) +int +Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) { - return this->peer ().send_n (mb->rd_ptr (), mb->length ()); + return this->peer ().send_n (mb->rd_ptr (), + mb->length ()); } -// Create a new handler and point its ROUTER_TASK_ data member to the -// corresponding router. Note that this router is extracted out of -// the Acceptor_Factory * that is passed in via the -// ACE_Acceptor::handle_input() method. +// Initialize a newly connected handler. -template <class ROUTER, class KEY> int -Peer_Handler<ROUTER, KEY>::open (void *a) +int +Peer_Handler::open (void *) { char buf[BUFSIZ], *p = buf; - if (this->router_task_->info (&p, sizeof buf) != -1) - ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, fd = %d, this = %d\n", - buf, this->get_handle (), a)); + if (this->prc_->peer_router ()->info (&p, sizeof buf) != -1) + ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, handle = %d\n", + buf, this->get_handle ())); else ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1); #if 0 - if (this->activate (options.t_flags ()) == -1) + if (this->activate (Options::instance ()->t_flags ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1); #endif ACE_DEBUG ((LM_DEBUG, - "Peer_Handler::open registering with Reactor for handle_input\n")); + "(%t) Peer_Handler::open registering with Reactor for handle_input\n")); + // Register with the Reactor to receive messages from our Peer. if (ACE_Service_Config::reactor ()->register_handler (this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1); - else if (this->router_task_->bind_peer (this->get_handle (), this) == -1) + + // Insert outselves into the routing map. + else if (this->prc_->bind_peer (this->get_handle (), this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1); + + // Turn off non-blocking I/O in case it was turned on. else if (this->peer ().disable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "disable non-blocking I/O"), -1); - return 0; + else + return 0; } -// Receive a message from a supplier. +// Receive a message from a Peer. -template <class ROUTER, class KEY> int -Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h) +int +Peer_Handler::handle_input (ACE_HANDLE h) { - ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h)); + ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on handle %d\n", h)); ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ); - ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY), + ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY), ACE_Message_Block::MB_PROTO, db); - int n; - if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1) + // Check for memory failures. + if (db == 0 || hb == 0) + { + delete hb; + delete db; + errno = ENOMEM; + return -1; + } + + ssize_t n = this->peer ().recv (db->rd_ptr (), db->size ()); + + if (n == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1); else if (n == 0) // Client has closed down the connection. { - if (this->router_task_->unbind_peer (this->get_handle ()) == -1) + if (this->prc_->unbind_peer (this->get_handle ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h)); + + ACE_DEBUG ((LM_DEBUG, "(%t) shutting down handle %d\n", h)); return -1; // Instruct the ACE_Reactor to deregister us by returning -1. } - else // Transform incoming buffer into a Message and pass downstream. + else { + // Transform incoming buffer into a Message. + + // First, increment the write pointer to the end of the newly + // read data block. db->wr_ptr (n); - *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); // structure assignment. - hb->wr_ptr (sizeof (ACE_HANDLE)); - return this->router_task_->reply (hb) == -1 ? -1 : 0; - } -} -template <class PH, class PK> -Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm) - : ACE_Task<ACE_MT_SYNCH> (tm) -{ -} + // Second, copy the "address" into the header block. + *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); -template <class PH, class PK> int -Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb) -{ - PEER_ITERATOR map_iter = this->peer_map_; - int bytes = 0; - int iterations = 0; - ACE_Message_Block *data_block = mb->cont (); - - for (ACE_Map_Entry<PK, PH *> *ss = 0; - map_iter.next (ss) != 0; - map_iter.advance ()) - { - if (options.debug ()) - ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via sd %d\n", ss->ext_id_)); + // Third, update the write pointer in the header block. + hb->wr_ptr (sizeof (ACE_HANDLE)); - iterations++; - bytes += ss->int_id_->put (data_block); + // Finally, pass the message through the stream. Note that we + // use <Task::put> here because this gives the method at *our* + // level in the stream a chance to do something with the message + // before it is sent up the other side. For instance, if we + // receive messages in the Supplier_Router, it will just call + // <put_next> and send them up the stream to the Consumer_Router + // (which broadcasts them to consumers). However, if we receive + // messages in the Consumer_Router, it will reply to the + // Consumer with an error since it's not correct for Consumers + // to send messages. + return this->prc_->peer_router ()->put (hb) == -1 ? -1 : 0; } - - delete mb; - return bytes == 0 ? 0 : bytes / iterations; } -template <class PH, class PK> -Peer_Router<PH, PK>::~Peer_Router (void) +Peer_Router::Peer_Router (Peer_Router_Context *prc) + : prc_ (prc) { + } -template <class PH, class PK> int -Peer_Router<PH, PK>::fini (void) +Peer_Router_Context * +Peer_Router::context (void) const { - delete this->acceptor_; - return 0; + return this->prc_; } -template <class PH, class PK> int -Peer_Router<PH, PK>::control (ACE_Message_Block *mb) +int +Peer_Router::control (ACE_Message_Block *mb) { ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command; @@ -234,46 +315,4 @@ Peer_Router<PH, PK>::control (ACE_Message_Block *mb) return 0; } -template <class PH, class PK> int -Peer_Router<PH, PK>::unbind_peer (PK key) -{ - return this->peer_map_.unbind (key); -} - -template <class PH, class PK> int -Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph) -{ - PH *peer_handler = (PH *) ph; - return this->peer_map_.bind (key, peer_handler); -} - -template <class PH, class PK> int -Peer_Router<PH, PK>::init (int argc, char *argv[]) -{ - this->acceptor_ = new ACCEPTOR (this); - - if (this->acceptor_->init (argc, argv) == -1 - || this->peer_map_.open () == -1) - return -1; - else - { - ACE_INET_Addr addr; - ACE_SOCK_Acceptor &acceptor = this->acceptor_->acceptor(); - - if (acceptor.get_local_addr (addr) != -1) - ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s, port = %d, fd = %d, this = %u\n", - this->name (), addr.get_port_number (), - acceptor.get_handle (), this)); - else - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), -1); - } - return 0; -} - -#undef PH -#undef PA -#undef PAD -#undef PK -#undef PM -#endif /* ACE_HAS_THREADS */ #endif /* _PEER_ROUTER_C */ diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/examples/ASX/Event_Server/Event_Server/Peer_Router.h index 299c534f72c..32d7e85906d 100644 --- a/examples/ASX/Event_Server/Event_Server/Peer_Router.h +++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.h @@ -1,121 +1,133 @@ /* -*- C++ -*- */ // $Id$ - #if !defined (_PEER_ROUTER_H) #define _PEER_ROUTER_H #include "ace/Acceptor.h" #include "ace/SOCK_Acceptor.h" -#include "ace/Thread_Manager.h" #include "ace/Map_Manager.h" -#if defined (ACE_HAS_THREADS) +// Type of search key for CONSUMER_MAP +typedef ACE_HANDLE ROUTING_KEY; -// Forward declaration. -template <class PEER_HANDLER, class KEY> +// Forward declarations. class Peer_Router; +class Peer_Router_Context; -template <class PEER_HANDLER, class KEY> -class Acceptor_Factory : public ACE_Acceptor<PEER_HANDLER, ACE_SOCK_ACCEPTOR> - // = TITLE - // Creates <PEER_HANDLERs>, which route events between peers. -{ -public: - Acceptor_Factory (Peer_Router<PEER_HANDLER, KEY> *pr); - Peer_Router<PEER_HANDLER, KEY> *router (void); - - int init (int argc, char *argv[]); - // Initialize the acceptor when it's linked dynamically. - -private: - Peer_Router<PEER_HANDLER, KEY> *pr_; -}; - -template <class ROUTER, class KEY> class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> // = TITLE - // Receive input from a Peer. + // Receive input from a Peer and forward to the appropriate + // <Peer_Router>. { public: - Peer_Handler (ACE_Thread_Manager * = 0); + Peer_Handler (Peer_Router_Context * = 0); + // Initialization method. virtual int open (void * = 0); - // Called by the ACE_Acceptor::handle_input() to activate this object. + // Called by the ACE_Acceptor::handle_input() to activate this + // object. virtual int handle_input (ACE_HANDLE); // Receive input from the peer. virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); - // Send output to a peer. + // Send output to a peer. protected: - typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> inherited; - - ROUTER *router_task_; - // Pointer to write task. - -private: - virtual int svc (void); - // Don't need this method here... + Peer_Router_Context *prc_; + // Pointer to router context. }; -template <class PEER_HANDLER, class PEER_KEY> -class Peer_Router : public ACE_Task<ACE_MT_SYNCH> +class Peer_Router_Context : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> // = TITLE - // This abstract base class provides mechanisms for routing - // messages to/from a ACE_Stream from/to one or more peers (which - // are typically running on remote hosts). + // Defines state and behavior shared between both Tasks in a + // Peer_Router Module. // // = DESCRIPTION - // A subclass of Peer_Router overrides the open(), close(), and - // put() methods in order to specialize the behavior of the router - // to meet application-specific requirements. + // This class also serves as an Acceptor, which creates + // Peer_Handlers when Peers connect. { public: // = Initialization and termination methods. - Peer_Router (ACE_Thread_Manager * = 0); - ~Peer_Router (void); - - typedef Peer_Handler<Peer_Router<PEER_HANDLER, PEER_KEY>, PEER_KEY> HANDLER; + Peer_Router_Context (u_short port); - virtual int unbind_peer (PEER_KEY); - // Remove a PEER_HANDLER from the PEER_MAP. + virtual int unbind_peer (ROUTING_KEY); + // Remove the <Peer_Handler *> from the <PEER_MAP> that corresponds + // to the <ROUTING_KEY>. - virtual int bind_peer (PEER_KEY, HANDLER *); - // Add a PEER_HANDLER to the PEER_MAP + virtual int bind_peer (ROUTING_KEY, Peer_Handler *); + // Add a <Peer_Handler> to the <PEER_MAP> that's associated with the + // <ROUTING_KEY>. int send_peers (ACE_Message_Block *mb); - // Send the message block to the peer(s). + // Send the <ACE_Message_Block> to the peer(s). -protected: - virtual int control (ACE_Message_Block *); - // Handle control messages arriving from adjacent Modules. + Peer_Handler *make_svc_handler (void); + // Create a new <Peer_Handler> for each connection. + + // = Set/Get Router Task. + Peer_Router *peer_router (); + void peer_router (Peer_Router *); + + void release (void); + // Decrement the reference count and delete <this> when count == 0; + +private: + Peer_Router *peer_router_; + // Pointer to the <Peer_Router> that we are accepting for. // = Useful typedefs - typedef ACE_Map_Manager <PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> PEER_MAP; - typedef ACE_Map_Iterator<PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> PEER_ITERATOR; + typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_RW_Mutex> PEER_MAP; + typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_RW_Mutex> PEER_ITERATOR; + typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *> PEER_ENTRY; PEER_MAP peer_map_; // Map used to keep track of active peers. - // = Dynamic linking initialization hooks inherited from ACE_Task - virtual int init (int argc, char *argv[]); - virtual int fini (void); + int reference_count_; + // Keep track of when we can delete ourselves. - typedef Acceptor_Factory<PEER_HANDLER, PEER_KEY> ACCEPTOR; + ~Peer_Router_Context (void); + // Private to ensure dynamic allocation. +}; - ACCEPTOR *acceptor_; - // Factory for accepting new PEER_HANDLERs. +class Peer_Router : public ACE_Task<ACE_MT_SYNCH> + // = TITLE + // This abstract base class provides mechanisms for routing + // messages to/from a ACE_Stream from/to one or more peers (which + // are typically running on remote hosts). + // + // = DESCRIPTION + // A subclass of Peer_Router overrides the open(), close(), and + // put() methods in order to specialize the behavior of the router + // to meet application-specific requirements. +{ +protected: + Peer_Router (Peer_Router_Context *prc); + // Initialization method. + + virtual int control (ACE_Message_Block *); + // Handle control messages arriving from adjacent Modules. + + Peer_Router_Context *context (void) const; + // Returns the routing context. + + typedef ACE_Task<ACE_MT_SYNCH> inherited; + // Helpful typedef. private: + Peer_Router_Context *prc_; + // Reference to the context shared by the writer and reader Tasks in + // the Consumer and Supplier Modules. + // = Prevent copies and pass-by-value. - Peer_Router (const Peer_Router<PEER_HANDLER, PEER_KEY> &) {} - void operator= (const Peer_Router<PEER_HANDLER, PEER_KEY> &) {} + Peer_Router (const Peer_Router &) {} + void operator= (const Peer_Router &) {} }; #if defined (ACE_TEMPLATES_REQUIRE_SOURCE) #include "Peer_Router.cpp" #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ -#endif /* ACE_HAS_THREADS */ + #endif /* _PEER_ROUTER_H */ diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp index f9450fd99e1..a9f4ebb8521 100644 --- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp +++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp @@ -1,33 +1,9 @@ -#include "Supplier_Router.h" // $Id$ +#include "Supplier_Router.h" #include "Options.h" -#if defined (ACE_HAS_THREADS) - -typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY; - -int -Supplier_Handler::open (void *a) -{ - SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a; - this->router_task_ = af->router (); - return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a); -} - -Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm) - : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm) -{ -} - -// Create a new router and associate it with the REACTOR parameter. - -Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm) - : SUPPLIER_ROUTER (tm) -{ -} - -// Handle incoming messages in a separate thread. +// Handle outgoing messages in a separate thread. int Supplier_Router::svc (void) @@ -35,14 +11,19 @@ Supplier_Router::svc (void) assert (this->is_writer ()); ACE_Thread_Control tc (this->thr_mgr ()); - ACE_Message_Block *mb = 0; ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n")); - while (this->getq (mb) >= 0) + for (ACE_Message_Block *mb = 0; + this->getq (mb) >= 0; + ) { - ACE_DEBUG ((LM_DEBUG, "Supplier_Router is routing via send_peers\n")); - if (this->send_peers (mb) == -1) + ACE_DEBUG ((LM_DEBUG, + "(%t) warning: Supplier_Router is forwarding a message via send_peers\n")); + + // Broadcast the message to the Suppliers. + + if (this->context ()->send_peers (mb) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) send_peers failed in Supplier_Router\n"), -1); @@ -54,25 +35,26 @@ Supplier_Router::svc (void) // destructor. } -// Initialize the Router. +Supplier_Router::Supplier_Router (Peer_Router_Context *prc) + : Peer_Router (prc) +{ +} + +// Initialize the Supplier Router. int Supplier_Router::open (void *) { - assert (this->is_writer ()); - - char *argv[4]; - - argv[0] = (char *) this->name (); - argv[1] = "-p"; - argv[2] = options.supplier_port (); - argv[3] = 0; - - if (this->init (2, &argv[1]) == -1) - return -1; - - // Make this an active object. - return this->activate (options.t_flags ()); + if (this->is_reader ()) + // Set the Peer_Router_Context to point back to us so that all the + // Peer_Handler's <put> their incoming <Message_Blocks> to our + // reader Task. + this->context ()->peer_router (this); + + else // if (this->is_writer () + // Make this an active object to handle the error cases in a + // separate thread. + return this->activate (Options::instance ()->t_flags ()); } // Close down the router. @@ -80,31 +62,53 @@ Supplier_Router::open (void *) int Supplier_Router::close (u_long) { - assert (this->is_writer ()); - ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router\n")); - this->peer_map_.close (); + ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router %s\n", + this->is_reader () ? "reader" : "writer")); - // Inform the thread to shut down. - this->msg_queue ()->deactivate (); + if (this->is_writer ()) + // Inform the thread to shut down. + this->msg_queue ()->deactivate (); + + // Both writer and reader call release(), so the context knows when + // to clean itself up. + this->context ()->release (); return 0; } -// Send a MESSAGE_BLOCK to the supplier(s). +// Send an <ACE_Message_Block> to the supplier(s). int -Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) +Supplier_Router::put (ACE_Message_Block *mb, + ACE_Time_Value *) { - assert (this->is_writer ()); + // Perform the necessary control operations before passing + // the message up the stream. if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) { this->control (mb); return this->put_next (mb); } - else - // Queue up the message, which will be processed by - // Supplier_Router::svc(). - return this->putq (mb); + + // If we're the reader then we are responsible for pass messages up + // to the next Module's writer Task. + + else if (this->is_reader ()) + return this->put_next (mb); + else // if (this->is_writer ()) + { + // Someone is trying to write to the Supplier. In this + // implementation this is considered an error. However, we'll + // just go ahead and forward the message to the Supplier (who + // hopefully is prepared to receive it). + ACE_DEBUG ((LM_WARNING, "(%t) warning: sending to a Supplier\n")); + + // Queue up the message to processed by Supplier_Router::svc(). + // Since we don't expect to be getting many of these messages, + // we queue them up and run them in a separate thread to avoid + // taxing the main thread. + return this->putq (mb); + } } // Return information about the Supplier_Router ACE_Module. @@ -112,17 +116,16 @@ Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) int Supplier_Router::info (char **strp, size_t length) const { - char buf[BUFSIZ]; - ACE_INET_Addr addr; + char buf[BUFSIZ]; + ACE_INET_Addr addr; const char *mod_name = this->name (); - ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor (); - if (sa.get_local_addr (addr) == -1) + if (this->context ()->acceptor ().get_local_addr (addr) == -1) return -1; - ACE_OS::sprintf (buf, "%s\t %d/%s %s", - mod_name, addr.get_port_number (), "tcp", - "# supplier router\n"); + ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n", + mod_name, addr.get_port_number (), "tcp", + "# supplier router", this->is_reader () ? "reader" : "writer"); if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) return -1; @@ -130,5 +133,3 @@ Supplier_Router::info (char **strp, size_t length) const ACE_OS::strncpy (*strp, mod_name, length); return ACE_OS::strlen (mod_name); } - -#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h index 23da0812781..42d067a6454 100644 --- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h +++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h @@ -1,8 +1,6 @@ /* -*- C++ -*- */ // $Id$ -/* The interface between a supplier and an Event Service ACE_Stream */ - #if !defined (_SUPPLIER_ROUTER_H) #define _SUPPLIER_ROUTER_H @@ -12,40 +10,49 @@ #include "ace/Svc_Handler.h" #include "Peer_Router.h" -#if defined (ACE_HAS_THREADS) - -/* Forward declaration */ -class Supplier_Handler; - -/* Type of search key for SUPPLIER_MAP */ -typedef ACE_HANDLE SUPPLIER_KEY; - -/* Instantiated type for routing messages to suppliers */ - -typedef Peer_Router<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_ROUTER; - -class Supplier_Handler : public Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> +class Supplier_Router : public Peer_Router + // = TITLE + // Provides the interface between one or more Suppliers and the + // Event Server ACE_Stream. + // + // = DESCRIPTION + // When used on the "reader" side of a Stream, this Router Task + // simply forwards all messages up the stream. When used on the + // "writer" side, this Router Task queues up outgoing messages + // to suppliers and sends them in a separate thread. The reason + // for this is that it's really an "error" for a + // <Supplier_Router> to send messages to Suppliers, so we don't + // expect this to happen very much. when it does we use a + // separate thread to avoid taxing the main thread. { public: - Supplier_Handler (ACE_Thread_Manager *tm = 0); - virtual int open (void *); -}; - -class Supplier_Router : public SUPPLIER_ROUTER -{ -public: - Supplier_Router (ACE_Thread_Manager *); + Supplier_Router (Peer_Router_Context *prc); + // Initialization method. protected: - /* ACE_Task hooks. */ + // = ACE_Task hooks. + + // All of these methods are called via base class pointers by the + // ACE Stream apparatus. Therefore, we can put them in the + // protected section. + virtual int open (void *a = 0); + // Called by the Stream to initialize the router. + virtual int close (u_long flags = 0); + // Called by the Stream to shutdown the router. + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + // Called by the <SUPPLIER_HANDLER> to pass a message to the Router. + // The Router queues up this message, which is then processed in the + // <svc> method in a separate thread. + virtual int svc (void); + // Runs in a separate thread to dequeue messages and pass them up + // the stream. - /* Dynamic linking hooks inherited from Peer_Router */ virtual int info (char **info_string, size_t length) const; + // Dynamic linking hook. }; -#endif /* ACE_HAS_THREADS */ #endif /* _SUPPLIER_ROUTER_H */ diff --git a/examples/ASX/Event_Server/Event_Server/event_server.cpp b/examples/ASX/Event_Server/Event_Server/event_server.cpp index 1e111a69fb9..d40f82b987c 100644 --- a/examples/ASX/Event_Server/Event_Server/event_server.cpp +++ b/examples/ASX/Event_Server/Event_Server/event_server.cpp @@ -2,7 +2,6 @@ // Test the event server. - #include "ace/Stream.h" #include "ace/Service_Config.h" #include "Options.h" @@ -10,14 +9,12 @@ #include "Event_Analyzer.h" #include "Supplier_Router.h" -#if defined (ACE_HAS_THREADS) - typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream; typedef ACE_Module<ACE_MT_SYNCH> MT_Module; -// Handle SIGINT and terminate the entire application. - class Quit_Handler : public ACE_Sig_Adapter + // = TITLE + // Handle SIGINT and terminate the entire application. { public: Quit_Handler (void); @@ -41,9 +38,9 @@ Quit_Handler::Quit_Handler (void) int Quit_Handler::handle_input (ACE_HANDLE) { - options.stop_timer (); + Options::instance ()->stop_timer (); ACE_DEBUG ((LM_INFO, "(%t) closing down the test\n")); - options.print_results (); + Options::instance ()->print_results (); ACE_Service_Config::end_reactor_event_loop (); return 0; @@ -54,8 +51,7 @@ main (int argc, char *argv[]) { ACE_Service_Config daemon; - options.parse_args (argc, argv); - + Options::instance ()->parse_args (argc, argv); { // Primary ACE_Stream for EVENT_SERVER application. MT_Stream event_server; @@ -63,49 +59,72 @@ main (int argc, char *argv[]) // Enable graceful shutdowns... Quit_Handler quit_handler; - // Create the Supplier Router module. + Peer_Router_Context *src; + // Create the Supplier_Router's routing context, which contains + // context shared by both the write-side and read-side of the + // Supplier_Router Module. + ACE_NEW_RETURN (src, + Peer_Router_Context (Options::instance ()->supplier_port ()), + -1); - MT_Module *sr = new MT_Module ("Supplier_Router", - new Supplier_Router (ACE_Service_Config::thr_mgr ())); + MT_Module *srm = 0; + // Create the Supplier Router module. + ACE_NEW_RETURN (srm, MT_Module + ("Supplier_Router", + new Supplier_Router (src), + new Supplier_Router (src)), + -1); + MT_Module *eam = 0; // Create the Event Analyzer module. - - MT_Module *ea = new MT_Module ("Event_Analyzer", - new Event_Analyzer, - new Event_Analyzer); - + ACE_NEW_RETURN (eam, MT_Module + ("Event_Analyzer", + new Event_Analyzer, + new Event_Analyzer), + -1); + + Peer_Router_Context *crc; + // Create the Consumer_Router's routing context, which contains + // context shared by both the write-side and read-side of the + // Consumer_Router Module. + ACE_NEW_RETURN (crc, + Peer_Router_Context (Options::instance ()->consumer_port ()), + -1); + + MT_Module *crm = 0; // Create the Consumer Router module. - - MT_Module *cr = new MT_Module ("Consumer_Router", - 0, // 0 triggers the creation of a ACE_Thru_Task... - new Consumer_Router (ACE_Service_Config::thr_mgr ())); + ACE_NEW_RETURN (crm, MT_Module + ("Consumer_Router", + new Consumer_Router (crc), + new Consumer_Router (crc)), + -1); // Push the Modules onto the event_server stream. - if (event_server.push (sr) == -1) + if (event_server.push (srm) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1); - if (event_server.push (ea) == -1) + if (event_server.push (eam) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1); - if (event_server.push (cr) == -1) + if (event_server.push (crm) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1); // Set the high and low water marks appropriately. - int wm = options.low_water_mark (); + int wm = Options::instance ()->low_water_mark (); if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1) ACE_ERROR_RETURN ((LM_ERROR, "push (setting low watermark)"), -1); - wm = options.high_water_mark (); + wm = Options::instance ()->high_water_mark (); if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1) ACE_ERROR_RETURN ((LM_ERROR, "push (setting high watermark)"), -1); - options.start_timer (); + Options::instance ()->start_timer (); - // Perform the main event loop waiting for the user to type ^C or to - // enter a line on the ACE_STDIN. + // Perform the main event loop waiting for the user to type ^C or + // to enter a line on the ACE_STDIN. daemon.run_reactor_event_loop (); // The destructor of event_server will close down the stream and @@ -117,10 +136,3 @@ main (int argc, char *argv[]) ACE_DEBUG ((LM_DEBUG, "exiting main\n")); return 0; } -#else -int -main (void) -{ - ACE_ERROR_RETURN ((LM_ERROR, "test not defined for this platform\n"), -1); -} -#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/README b/examples/ASX/Event_Server/README index f97e767cdd8..8e1342fd7bc 100644 --- a/examples/ASX/Event_Server/README +++ b/examples/ASX/Event_Server/README @@ -1,13 +1,18 @@ The subdirectory illustrates a number of the ACE ASX framework -features using an ACE_Stream application called the Event Server. The -Event Server works as follows: +features using an ACE_Stream application called the Event Server. For +more information on the design and use of the ACE ASX framework please +see http://www.cs.wustl.edu/~schmidt/C++-USENIX-94.ps.gz and +http://www.cs.wustl.edu/~schmidt/DSEJ-94.ps.gz + +The Event Server example works as follows: 1. When the ./Event_Server/event_server executable is run it creates two SOCK_Acceptors, which listen for and accept - incoming connections from consumers and suppliers. + incoming connections from Consumers and Suppliers. -2. The ./Event_Server/Transceiver/transceiver application may be - started multiple times. Each call should be either: +2. The ./Event_Server/Transceiver/transceiver application plays + the role of Consumer and Supplier. It can be started multiple + times. Each call should be either: % transceiver -p XYZ -h hostname @@ -15,24 +20,25 @@ Event Server works as follows: % transceiver -p ABC -h hostname - where XYZ and ABC are the consumer port and supplier port, - respectively, on the event server and "hostname" is the name of the - machine the event_server is running. I typically open up multiple - windows. + where XYZ and ABC are the Consumer listening port and the Supplier + listening port, respectively, on the event server and "hostname" is + the name of the machine the event_server is running. I typically + run the Consumer(s) and Supplier(s) in different windows. -3. Once the consumer(s) and supplier(s) are connected, you can type - data from any supplier windows. This data will be routed +3. Once the Consumer(s) and Supplier(s) are connected, you can type + data from any Supplier window. This data will be routed through the Modules/Tasks in an event_server's Stream and - be forwarded to the consumer(s). + be forwarded to the Consumer(s). Note that you can also + send messages from the Consumer(s) to Supplier(s), but the + Event Server will warn you about this since it's not really + kosher... 4. When you want to shut down the tranceivers or event server - just type ^C (which generates a SIGINT). + just type ^C (which generates a SIGINT) or type any input + in the window running the Event Server. -What makes this example particularly interesting is that -once you've got the hang of this basic architecture, you can -"push" new filtering Modules onto the event_server Stream - and modify the application's behavior. +What makes this example particularly interesting is that once you've +got the hang of this basic architecture, you can "push" new filtering +Modules onto the event_server Stream and modify the application's +behavior transparently. -For more information on the design and use of the ACE ASX framework -please see http://www.cs.wustl.edu/~schmidt/C++-USENIX-94.ps.gz and -http://www.cs.wustl.edu/~schmidt/DSEJ-94.ps.gz diff --git a/examples/ASX/Event_Server/Transceiver/transceiver.cpp b/examples/ASX/Event_Server/Transceiver/transceiver.cpp index 37335b8209f..673ec0a0e93 100644 --- a/examples/ASX/Event_Server/Transceiver/transceiver.cpp +++ b/examples/ASX/Event_Server/Transceiver/transceiver.cpp @@ -1,16 +1,62 @@ -// Test program for the event transceiver. This program can play the // $Id$ +// Test program for the event transceiver. This program can play the // role of either Consumer or Supplier. You can terminate this // program by typing ^C.... - #include "ace/Service_Config.h" #include "ace/Connector.h" #include "ace/SOCK_Connector.h" #include "ace/Get_Opt.h" -#if defined (ACE_HAS_THREADS) +// Port number of event server. +static u_short port_number; + +// Name of event server. +static char *host_name; + +// Are we playing the Consumer ('C') or Supplier ('S') role? +static char role = 'S'; + +// Handle the command-line arguments. + +static void +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, "Ch:p:S"); + + port_number = ACE_DEFAULT_SERVER_PORT; + host_name = ACE_DEFAULT_SERVER_HOST; + + for (int c; (c = get_opt ()) != -1; ) + switch (c) + { + case 'C': + role = c; + break; + case 'h': + host_name = get_opt.optarg; + break; + case 'p': + port_number = ACE_OS::atoi (get_opt.optarg); + break; + case 'S': + role = c; + break; + default: + ACE_ERROR ((LM_ERROR, + "usage: %n [-p portnum] [-h host_name]\n%a", 1)); + /* NOTREACHED */ + break; + } + + // Increment by 1 if we're the supplier to mirror the default + // behavior of the Event_Server (which sets the Consumer port to + // ACE_DEFAULT_SERVER_PORT and the Supplier port to + // ACE_DEFAULT_SERVER_PORT + 1). + if (role == 'S' && port_number == ACE_DEFAULT_SERVER_PORT) + port_number++; +} class Event_Transceiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> // = TITLE @@ -34,6 +80,9 @@ public: virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); // Close down via SIGINT. + virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); + // Close down the event loop. + private: int receiver (void); // Reads data from socket and writes to ACE_STDOUT. @@ -42,6 +91,14 @@ private: // Writes data from ACE_STDIN to socket. }; +int +Event_Transceiver::handle_close (ACE_HANDLE, + ACE_Reactor_Mask) +{ + ACE_Service_Config::end_reactor_event_loop (); + return 0; +} + // Close down via SIGINT. int @@ -71,8 +128,7 @@ int Event_Transceiver::open (void *) { if (ACE_Service_Config::reactor ()->register_handler - (this->peer ().get_handle (), - this, + (this->peer ().get_handle (), this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1); else if (ACE::register_stdin_handler (this, @@ -95,7 +151,8 @@ Event_Transceiver::handle_input (ACE_HANDLE handle) int Event_Transceiver::forwarder (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering transceiver forwarder\n")); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering %s forwarder\n", + role == 'C' ? "Consumer" : "Supplier")); char buf[BUFSIZ]; ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf); @@ -104,14 +161,16 @@ Event_Transceiver::forwarder (void) if (n <= 0 || this->peer ().send_n (buf, n) != n) result = -1; - ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving transceiver forwarder\n")); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving %s forwarder\n", + role == 'C' ? "Consumer" : "Supplier")); return result; } int Event_Transceiver::receiver (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering transceiver receiver\n")); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering %s receiver\n", + role == 'C' ? "Consumer" : "Supplier")); char buf[BUFSIZ]; @@ -121,43 +180,11 @@ Event_Transceiver::receiver (void) if (n <= 0 || ACE_OS::write (ACE_STDOUT, buf, n) != n) result = -1; - ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving transceiver receiver\n")); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving %s receiver\n", + role == 'C' ? "Consumer" : "Supplier")); return result; } -// Port number of event server. -static u_short port_number; - -// Name of event server. -static char *host_name; - -// Handle the command-line arguments. - -static void -parse_args (int argc, char *argv[]) -{ - ACE_Get_Opt get_opt (argc, argv, "h:p:"); - - port_number = ACE_DEFAULT_SERVER_PORT; - host_name = ACE_DEFAULT_SERVER_HOST; - - for (int c; (c = get_opt ()) != -1; ) - switch (c) - { - case 'h': - host_name = get_opt.optarg; - break; - case 'p': - port_number = ACE_OS::atoi (get_opt.optarg); - break; - default: - ACE_ERROR ((LM_ERROR, - "usage: %n [-p portnum] [-h host_name]\n%a", 1)); - /* NOTREACHED */ - break; - } -} - int main (int argc, char *argv[]) { @@ -169,7 +196,8 @@ main (int argc, char *argv[]) ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector; Event_Transceiver transceiver; - if (connector.connect (&transceiver, ACE_INET_Addr (port_number, host_name)) == -1) + if (connector.connect (&transceiver, + ACE_INET_Addr (port_number, host_name)) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", host_name), 1); // Run event loop until either the event server shuts down or we get @@ -177,11 +205,3 @@ main (int argc, char *argv[]) ACE_Service_Config::run_reactor_event_loop (); return 0; } -#else -int -main (void) -{ - ACE_ERROR ((LM_ERROR, "test not defined for this platform\n")); - return 0; -} -#endif /* ACE_HAS_THREADS */ diff --git a/examples/Connection/non_blocking/CPP-connector.cpp b/examples/Connection/non_blocking/CPP-connector.cpp index b2628abfa52..130692028f2 100644 --- a/examples/Connection/non_blocking/CPP-connector.cpp +++ b/examples/Connection/non_blocking/CPP-connector.cpp @@ -1,9 +1,8 @@ -#if !defined (CPP_CONNECTOR_C) // $Id$ +#if !defined (CPP_CONNECTOR_C) #define CPP_CONNECTOR_C - #include "CPP-connector.h" #define PR_ST_1 ACE_PEER_STREAM_1 @@ -27,7 +26,8 @@ Peer_Handler<PR_ST_2>::open (void *) this->action_ = &Peer_Handler<PR_ST_2>::connected; if (this->reactor ()) - this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK); + this->reactor ()->register_handler + (this, ACE_Event_Handler::WRITE_MASK); else { while (this->connected () != -1) @@ -48,10 +48,11 @@ template <PR_ST_1> int Peer_Handler<PR_ST_2>::disconnecting (void) { char buf[BUFSIZ]; - int n; + ssize_t n = this->peer ().recv (buf, sizeof buf); - if ((n = this->peer ().recv (buf, sizeof buf)) > 0) + if (n > 0) ACE_OS::write (ACE_STDOUT, buf, n); + this->action_ = &Peer_Handler<PR_ST_2>::idle; return -1; } @@ -67,12 +68,12 @@ template <PR_ST_1> int Peer_Handler<PR_ST_2>::connected (void) { char buf[BUFSIZ]; - int n; ACE_DEBUG ((LM_DEBUG, "please enter input..: ")); - if ((n = ACE_OS::read (ACE_STDIN, buf, sizeof buf)) > 0 - && this->peer ().send_n (buf, n) != n) + ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf); + + if (n > 0 && this->peer ().send_n (buf, n) != n) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "write failed"), -1); else if (n == 0) /* Explicitly close the connection. */ { @@ -89,13 +90,12 @@ template <PR_ST_1> int Peer_Handler<PR_ST_2>::stdio (void) { char buf[BUFSIZ]; - int n; - ACE_DEBUG ((LM_DEBUG, "stdio!\n")); - - ACE_DEBUG ((LM_DEBUG, "please enter input..: ")); + ACE_DEBUG ((LM_DEBUG, "stdio!\nplease enter input..: ")); - if ((n = ACE_OS::read (ACE_STDIN, buf, sizeof buf)) > 0) + ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf); + + if (n > 0) { ACE_OS::write (ACE_STDOUT, buf, n); return 0; @@ -125,13 +125,20 @@ Peer_Handler<PR_ST_2>::handle_close (ACE_HANDLE, ACE_Reactor_Mask mask) { ACE_DEBUG ((LM_DEBUG, "closing down (%d)\n", mask)); + + // When the socket closes down, then we'll switch over to reading + // from stdin and writing to stdout. if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK)) { this->action_ = &Peer_Handler<PR_ST_2>::stdio; this->peer ().close (); ACE_OS::rewind (stdin); - return this->reactor () && this->reactor ()->register_handler - (ACE_STDIN, this, ACE_Event_Handler::READ_MASK); + + if (this->reactor ()) + return ACE::register_stdin_handler + (this, this->reactor (), ACE_Service_Config::thr_mgr ()); + else + return 0; } else if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)) delete this; @@ -167,8 +174,11 @@ IPC_Client<SH, PR_CO_2>::init (int argc, char *argv[]) this->inherited::open (ACE_Service_Config::reactor ()); char *r_addr = argc > 1 ? argv[1] : - ACE_SERVER_ADDRESS (ACE_DEFAULT_SERVER_HOST, ACE_DEFAULT_SERVER_PORT_STR); - ACE_Time_Value timeout (argc > 2 ? ACE_OS::atoi (argv[2]) : ACE_DEFAULT_TIMEOUT); + ACE_SERVER_ADDRESS (ACE_DEFAULT_SERVER_HOST, + ACE_DEFAULT_SERVER_PORT_STR); + ACE_Time_Value timeout (argc > 2 + ? ACE_OS::atoi (argv[2]) + : ACE_DEFAULT_TIMEOUT); char *l_addr = argc > 3 ? argv[3] : ACE_DEFAULT_LOCAL_PORT_STR; // Handle signals through the ACE_Reactor. diff --git a/examples/Logger/Acceptor-server/server_loggerd.cpp b/examples/Logger/Acceptor-server/server_loggerd.cpp index 20522d9168c..89ed3a859a7 100644 --- a/examples/Logger/Acceptor-server/server_loggerd.cpp +++ b/examples/Logger/Acceptor-server/server_loggerd.cpp @@ -242,7 +242,7 @@ main (int argc, char *argv[]) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); else if (REACTOR::instance ()->register_handler - (&peer_acceptor, ACE_Event_Handler::READ_MASK) == -1) + (&peer_acceptor, ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); diff --git a/examples/Logger/simple-server/Logging_Acceptor.cpp b/examples/Logger/simple-server/Logging_Acceptor.cpp index 641ed58256c..c971f3537e9 100644 --- a/examples/Logger/simple-server/Logging_Acceptor.cpp +++ b/examples/Logger/simple-server/Logging_Acceptor.cpp @@ -34,7 +34,7 @@ Logging_Acceptor::handle_close (ACE_HANDLE, ACE_Reactor_Mask) Logging_Acceptor::~Logging_Acceptor (void) { this->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::READ_MASK); + ACE_Event_Handler::ACCEPT_MASK); } // Returns underlying device descriptor. diff --git a/examples/Logger/simple-server/server_loggerd.cpp b/examples/Logger/simple-server/server_loggerd.cpp index 2d65a8b56db..dedb8366289 100644 --- a/examples/Logger/simple-server/server_loggerd.cpp +++ b/examples/Logger/simple-server/server_loggerd.cpp @@ -1,9 +1,9 @@ -// This server daemon collects, formats, and displays logging // $Id$ +// This server daemon collects, formats, and displays logging // information forwarded from client daemons running on other hosts in // the network. - +// // In addition, it also illustrates how the ACE_Reactor framework is // used. @@ -47,7 +47,7 @@ main (int argc, char *argv[]) if (peer_acceptor.open (addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); else if (REACTOR::instance ()->register_handler - (&peer_acceptor, ACE_Event_Handler::READ_MASK) == -1) + (&peer_acceptor, ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); // Run forever, performing logging service. diff --git a/examples/Reactor/Ntalker/ntalker.cpp b/examples/Reactor/Ntalker/ntalker.cpp index 3234c201048..f81a2c75376 100644 --- a/examples/Reactor/Ntalker/ntalker.cpp +++ b/examples/Reactor/Ntalker/ntalker.cpp @@ -115,10 +115,9 @@ Handle_Events::Handle_Events (u_short udp_port, if (this->mcast_.subscribe (sockmc_addr, 1, interface) == -1) ACE_OS::perror ("can't subscribe to multicast group"), ACE_OS::exit (1); - // disable loopbacks - -// if (this->mcast_.set_option (IP_MULTICAST_LOOP, 0) == -1 ) -// ACE_OS::perror (" can't disable loopbacks " ), ACE_OS::exit (1); + // Disable loopbacks. + // if (this->mcast_.set_option (IP_MULTICAST_LOOP, 0) == -1 ) + // ACE_OS::perror (" can't disable loopbacks " ), ACE_OS::exit (1); this->handle_set_.set_bit (0); this->handle_set_.set_bit (this->mcast_.get_handle ()); diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i b/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i index cbd5ffdb04b..306222534d7 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i +++ b/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i @@ -62,7 +62,7 @@ Handle_Broadcast::init (int argc, char *argv[]) if (this->open (sba) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); else if (ACE_Service_Config::reactor ()->register_handler - (this, ACE_Event_Handler::READ_MASK) == -1) + (this, ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); return 0; } @@ -71,7 +71,7 @@ ACE_INLINE int Handle_Broadcast::fini (void) { return ACE_Service_Config::reactor ()->remove_handler - (this, ACE_Event_Handler::READ_MASK); + (this, ACE_Event_Handler::ACCEPT_MASK); } ACE_INLINE int diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i index 8b631a6f252..7ebf45ba2f3 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i +++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i @@ -62,7 +62,7 @@ Handle_L_CODgram::init (int argc, char *argv[]) if (this->open (sucd) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); else if (ACE_Service_Config::reactor ()->register_handler - (this, ACE_Event_Handler::READ_MASK) == -1) + (this, ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); return 0; @@ -72,7 +72,7 @@ ACE_INLINE int Handle_L_CODgram::fini(void) { return ACE_Service_Config::reactor ()->remove_handler - (this, ACE_Event_Handler::READ_MASK); + (this, ACE_Event_Handler::ACCEPT_MASK); } ACE_INLINE ACE_HANDLE diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i index e3ecb36ffd9..8946947f4fd 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i +++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i @@ -61,7 +61,7 @@ Handle_L_Dgram::init (int argc, char *argv[]) if (this->open (sudg) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); else if (ACE_Service_Config::reactor ()->register_handler (this, - ACE_Event_Handler::READ_MASK) == -1) + ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); return 0; } @@ -69,7 +69,7 @@ Handle_L_Dgram::init (int argc, char *argv[]) ACE_INLINE int Handle_L_Dgram::fini (void) { - return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK); + return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::ACCEPT_MASK); } ACE_INLINE int diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i index f8044d312d9..8b667cb6681 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i +++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i @@ -55,7 +55,7 @@ Handle_L_FIFO::init (int argc, char *argv[]) if (this->open (rendezvous_fifo) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); else if (ACE_Service_Config::reactor ()->register_handler (this, - ACE_Event_Handler::READ_MASK) == -1) + ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); return 0; } @@ -63,7 +63,7 @@ Handle_L_FIFO::init (int argc, char *argv[]) ACE_INLINE int Handle_L_FIFO::fini (void) { - return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK); + return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::ACCEPT_MASK); } ACE_INLINE int diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i index ff59a5ffd6b..048f66cbcff 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i +++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - #include "ace/Get_Opt.h" ACE_INLINE @@ -78,7 +77,7 @@ Handle_L_Pipe::init (int argc, char *argv[]) if (this->open (sup) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); else if (ACE_Service_Config::reactor ()->register_handler - (this, ACE_Event_Handler::READ_MASK) == -1) + (this, ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); return 0; @@ -88,7 +87,7 @@ ACE_INLINE int Handle_L_Pipe::fini (void) { return ACE_Service_Config::reactor ()->remove_handler - (this, ACE_Event_Handler::READ_MASK); + (this, ACE_Event_Handler::ACCEPT_MASK); } ACE_INLINE int diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i index 9bb576deccf..e129c26521c 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i +++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i @@ -60,8 +60,8 @@ Handle_L_SPIPE::init (int argc, char *argv[]) susp.set (rendezvous); if (this->open (susp) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); - else if (ACE_Service_Config::reactor ()->register_handler (this, - ACE_Event_Handler::READ_MASK) == -1) + else if (ACE_Service_Config::reactor ()->register_handler + (this, ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); return 0; } @@ -69,7 +69,8 @@ Handle_L_SPIPE::init (int argc, char *argv[]) ACE_INLINE int Handle_L_SPIPE::fini (void) { - return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK); + return ACE_Service_Config::reactor ()->remove_handler + (this, ACE_Event_Handler::ACCEPT_MASK); } ACE_INLINE int diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i index 60dbe34fb47..3fd2c09e2a7 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i +++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i @@ -71,7 +71,7 @@ Handle_L_Stream::init (int argc, char *argv[]) if (this->open (sus) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); else if (ACE_Service_Config::reactor ()->register_handler - (this, ACE_Event_Handler::READ_MASK) == -1) + (this, ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); return 0; @@ -81,7 +81,7 @@ ACE_INLINE int Handle_L_Stream::fini (void) { return ACE_Service_Config::reactor ()->remove_handler - (this, ACE_Event_Handler::READ_MASK); + (this, ACE_Event_Handler::ACCEPT_MASK); } ACE_INLINE ACE_HANDLE diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i b/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i index c683f7bbab6..aaf11ceacb0 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i +++ b/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i @@ -57,7 +57,7 @@ Handle_R_Dgram::init (int argc, char *argv[]) if (this->open (sidg) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); else if (ACE_Service_Config::reactor ()->register_handler - (this, ACE_Event_Handler::READ_MASK) == -1) + (this, ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); return 0; } @@ -66,7 +66,7 @@ ACE_INLINE int Handle_R_Dgram::fini (void) { return ACE_Service_Config::reactor ()->remove_handler - (this, ACE_Event_Handler::READ_MASK); + (this, ACE_Event_Handler::ACCEPT_MASK); } ACE_INLINE int diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i b/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i index 8c4439f8c0f..aa17b7ca160 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i +++ b/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i @@ -58,8 +58,8 @@ Handle_R_Stream::init (int argc, char *argv[]) if (this->open (sis) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); - else if (ACE_Service_Config::reactor ()->register_handler (this, - ACE_Event_Handler::READ_MASK) == -1) + else if (ACE_Service_Config::reactor ()->register_handler + (this, ACE_Event_Handler::ACCEPT_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1); return 0; } @@ -67,7 +67,8 @@ Handle_R_Stream::init (int argc, char *argv[]) ACE_INLINE int Handle_R_Stream::fini (void) { - return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK); + return ACE_Service_Config::reactor ()->remove_handler + (this, ACE_Event_Handler::ACCEPT_MASK); } ACE_INLINE int diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp index 94b41367a5c..f7e2d98f69a 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp +++ b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp @@ -1,6 +1,6 @@ -#if !defined (ACE_HANDLE_THR_STREAM_C) // $Id$ +#if !defined (ACE_HANDLE_THR_STREAM_C) #define ACE_HANDLE_THR_STREAM_C #include "ace/Get_Opt.h" @@ -13,31 +13,20 @@ #include "Handle_Thr_Stream.i" #endif /* __ACE_INLINE__ */ -// Shorthand names. -#define SH SVC_HANDLER -#define PR_AC_1 ACE_PEER_ACCEPTOR_1 -#define PR_AC_2 ACE_PEER_ACCEPTOR_2 -#define PR_ST_1 ACE_PEER_STREAM_1 -#define PR_ST_2 ACE_PEER_STREAM_2 - template <class SH, PR_AC_1> -Handle_Thr_Stream<SH, PR_AC_2>::~Handle_Thr_Stream (void) +Handle_Thr_Acceptor<SH, PR_AC_2>::~Handle_Thr_Acceptor (void) { } template <class SH, PR_AC_1> -Handle_Thr_Stream<SH, PR_AC_2>::Handle_Thr_Stream (void) -#if defined (ACE_HAS_THREADS) +Handle_Thr_Acceptor<SH, PR_AC_2>::Handle_Thr_Acceptor (void) : thr_flags_ (THR_DETACHED | THR_NEW_LWP) -#else - : thr_flags_ (0) -#endif /* ACE_HAS_THREADS */ { } template <class SH, PR_AC_1> int -Handle_Thr_Stream<SH, PR_AC_2>::info (char **strp, - size_t length) const +Handle_Thr_Acceptor<SH, PR_AC_2>::info (char **strp, + size_t length) const { char buf[BUFSIZ]; ACE_INET_Addr sa; @@ -56,7 +45,7 @@ Handle_Thr_Stream<SH, PR_AC_2>::info (char **strp, } template <class SH, PR_AC_1> int -Handle_Thr_Stream<SH, PR_AC_2>::init (int argc, char *argv[]) +Handle_Thr_Acceptor<SH, PR_AC_2>::init (int argc, char *argv[]) { ACE_INET_Addr local_addr (inherited::DEFAULT_PORT_); int n_threads = ACE_DEFAULT_THREADS; @@ -64,19 +53,17 @@ Handle_Thr_Stream<SH, PR_AC_2>::init (int argc, char *argv[]) ACE_Get_Opt get_opt (argc, argv, "p:t:", 0); for (int c; (c = get_opt ()) != -1; ) - { - switch (c) - { - case 'p': - local_addr.set (ACE_OS::atoi (get_opt.optarg)); - break; - case 't': - n_threads = ACE_OS::atoi (get_opt.optarg); - break; - default: - break; - } - } + switch (c) + { + case 'p': + local_addr.set (ACE_OS::atoi (get_opt.optarg)); + break; + case 't': + n_threads = ACE_OS::atoi (get_opt.optarg); + break; + default: + break; + } // Initialize the threading strategy. if (this->thr_strategy_.open (&this->thr_mgr_, @@ -94,10 +81,10 @@ Handle_Thr_Stream<SH, PR_AC_2>::init (int argc, char *argv[]) } template <class SH, PR_AC_1> int -Handle_Thr_Stream<SH, PR_AC_2>::fini (void) +Handle_Thr_Acceptor<SH, PR_AC_2>::fini (void) { return ACE_Service_Config::reactor ()->remove_handler - (this, ACE_Event_Handler::READ_MASK); + (this, ACE_Event_Handler::ACCEPT_MASK); } template <PR_ST_1> @@ -112,7 +99,7 @@ CLI_Stream<PR_ST_2>::close (u_long) ACE_DEBUG ((LM_DEBUG, "(%t) client stream object closing down\n")); this->peer ().close (); - /* Must be allocated dynamically! */ + // Must be allocated dynamically! delete this; return 0; } @@ -159,12 +146,6 @@ CLI_Stream<PR_ST_2>::svc (void) return 0; } -#undef SH -#undef PR_AC_1 -#undef PR_AC_2 -#undef PR_ST_1 -#undef PR_ST_2 - //---------------------------------------- #if defined (ACE_HAS_TLI) @@ -181,19 +162,19 @@ CLI_Stream<PR_ST_2>::svc (void) #include "ace/INET_Addr.h" typedef CLI_Stream <THR_STREAM> CLI_STREAM; -typedef Handle_Thr_Stream<CLI_STREAM, THR_ACCEPTOR> HANDLE_THR_STREAM; +typedef Handle_Thr_Acceptor<CLI_STREAM, THR_ACCEPTOR> HANDLE_THR_ACCEPTOR; -/* Static class variables */ +// Static class variables. -u_short HANDLE_THR_STREAM::DEFAULT_PORT_ = ACE_DEFAULT_THR_PORT; +u_short HANDLE_THR_ACCEPTOR::DEFAULT_PORT_ = ACE_DEFAULT_THR_PORT; -/* Service object */ -HANDLE_THR_STREAM remote_thr_stream; +// Service object. +HANDLE_THR_ACCEPTOR remote_thr_stream; ACE_Service_Object_Type rts (&remote_thr_stream, "Remote_Thr_Stream"); #if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) template class CLI_Stream<THR_STREAM>; -template class Handle_Thr_Stream<CLI_Stream<THR_STREAM>, THR_ACCEPTOR>; +template class Handle_Thr_Acceptor<CLI_Stream<THR_STREAM>, THR_ACCEPTOR>; #endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ #endif /* ACE_HAS_THREADS */ diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h index 9c12d872f47..be17d2d0338 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h +++ b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h @@ -1,7 +1,7 @@ /* -*- C++ -*- */ // $Id$ -/* Handle connections from remote INET connections. */ +// Handle connections from remote INET connections. #if !defined (_HANDLE_THR_STREAM_H) #define _HANDLE_THR_STREAM_H @@ -12,12 +12,12 @@ #if defined (ACE_HAS_THREADS) template <class SVC_HANDLER, ACE_PEER_ACCEPTOR_1> -class Handle_Thr_Stream : public ACE_Strategy_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2> +class Handle_Thr_Acceptor : public ACE_Strategy_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2> { public: // = Initialization and termination. - Handle_Thr_Stream (void); - ~Handle_Thr_Stream (void); + Handle_Thr_Acceptor (void); + ~Handle_Thr_Acceptor (void); // = Dynamic linking hooks. virtual int init (int argc, char *argv[]); @@ -25,7 +25,7 @@ public: virtual int fini (void); private: - typedef Handle_Thr_Stream<SVC_HANDLER, ACE_PEER_ACCEPTOR_2> inherited; + typedef Handle_Thr_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2> inherited; static u_short DEFAULT_PORT_; |