diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-06-24 07:05:11 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-06-24 07:05:11 +0000 |
commit | 13c7fde4a38f03871c9d433456f14c16e4921b42 (patch) | |
tree | 2aca6c6087645136202067b5299b436bef97a88c | |
parent | f4a9897a651df2556b2e4c387cb6a05021cf8182 (diff) | |
download | ATCD-13c7fde4a38f03871c9d433456f14c16e4921b42.tar.gz |
*** empty log message ***
-rw-r--r-- | ChangeLog-98b | 8 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp | 39 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/Consumer_Router.h | 38 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp | 8 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/Event_Analyzer.h | 25 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/Options.h | 25 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/Peer_Router.cpp | 301 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/Peer_Router.h | 69 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp | 54 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/Supplier_Router.h | 35 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/event_server.cpp | 301 | ||||
-rw-r--r-- | examples/ASX/Event_Server/Transceiver/transceiver.cpp | 246 |
12 files changed, 744 insertions, 405 deletions
diff --git a/ChangeLog-98b b/ChangeLog-98b index b0c67ac0185..68695e1c2af 100644 --- a/ChangeLog-98b +++ b/ChangeLog-98b @@ -1,3 +1,11 @@ +Wed Jun 24 00:00:44 1998 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * examples/ASX/Event_Server/Transceiver/transceiver.cpp: Cleaned + up the Event_Transceiver code a bit. + + * examples/ASX/Event_Server/Event_Server/event_server.cpp (main): + Cleaned up the code a bit. + Wed Jun 24 06:54:00 EET DST 1998 Wei Chiang <chiang@horizon.ntc.nokia.com> * Attached "CLASSIX_" to all the file names in ace/CLASSIX. * Modified include statments in ace/CLASSIX/* to reflect the diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp index f76a18012e0..27788af3719 100644 --- a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp +++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp @@ -16,14 +16,17 @@ Consumer_Router::open (void *) { if (this->is_writer ()) { - // Set the Peer_Router_Context to point back to us so that if + // Set the <Peer_Router_Context> to point back to us so that if // any Consumer's "accidentally" send us data we'll be able to - // handle it. + // handle it by passing it down the stream. this->context ()->peer_router (this); - // Make this an active object to handle the error cases in a - // separate thread. + // Increment the reference count. this->context ()->duplicate (); + + // Make this an active object to handle the error cases in a + // separate thread. This is mostly just for illustration, i.e., + // it's probably overkill to use a thread for this! return this->activate (Options::instance ()->t_flags ()); } else // if (this->is_reader ()) @@ -43,7 +46,7 @@ Consumer_Router::close (u_long) // Inform the thread to shut down. this->msg_queue ()->deactivate (); - // Both writer and reader call release(), so the context knows when + // Both writer and reader call <release>, so the context knows when // to clean itself up. this->context ()->release (); return 0; @@ -64,7 +67,8 @@ Consumer_Router::svc (void) ) { ACE_DEBUG ((LM_DEBUG, - "(%t) warning: Consumer_Router is forwarding a message to Supplier_Router\n")); + "(%t) warning: Consumer_Router is " + "forwarding a message to Supplier_Router\n")); // Pass this message down to the next Module's writer Task. if (this->put_next (mb) == -1) @@ -85,8 +89,8 @@ int Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) { - // Perform the necessary control operations before passing - // the message down the stream. + // Perform the necessary control operations before passing the + // message down the stream. if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) { @@ -94,7 +98,7 @@ Consumer_Router::put (ACE_Message_Block *mb, return this->put_next (mb); } - // If we're the reader side then we're responsible for broadcasting + // If we're the reader then we're responsible for broadcasting // messages to Consumers. else if (this->is_reader ()) @@ -108,13 +112,14 @@ Consumer_Router::put (ACE_Message_Block *mb, } else // if (this->is_writer ()) - // Queue up the message to processed by Consumer_Router::svc(). + // Queue up the message to processed by <Consumer_Router::svc> // Since we don't expect to be getting many of these messages, we // queue them up and run them in a separate thread to avoid taxing // the main thread. return this->putq (mb); } -// Return information about the Client_Router ACE_Module. + +// Return information about the <Consumer_Router>. int Consumer_Router::info (char **strp, size_t length) const @@ -126,13 +131,17 @@ Consumer_Router::info (char **strp, size_t length) const if (this->context ()->acceptor ().get_local_addr (addr) == -1) return -1; - ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n", - mod_name, addr.get_port_number (), "tcp", - "# consumer router", this->is_reader () ? "reader" : "writer"); - + ACE_OS::sprintf (buf, + "%s\t %d/%s %s (%s)\n", + mod_name, + addr.get_port_number (), + "tcp", + "# consumer router", + this->is_reader () ? "reader" : "writer"); if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) return -1; else ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); } diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h index cdaf7090b97..56e58c58c37 100644 --- a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h +++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h @@ -10,21 +10,38 @@ #include "Peer_Router.h" class Consumer_Router : public Peer_Router +{ // = TITLE // Provides the interface between one or more Consumers and the - // Event Server ACE_Stream. -{ + // Event Server <ACE_Stream>. + // + // = DESCRIPTION + // This class normally sits on "top" of the Stream and routes + // messages coming from "downstream" to all the Consumers + // connected to it via its "read" <Task>. Normally, the messages + // flow up the stream from <Supplier_Router>s. However, if + // Consumers transmit data to the <Consumer_Router>, we dutifully + // push it out to the Suppliers via the <Supplier_Router>. + // + // When used on the "reader" side of a Stream, the + // <Consumer_Router> simply forwards all messages up the stream. + // When used on the "writer" side, the <Consumer_Router> queues + // up outgoing messages to suppliers and sends them down to the + // <Supplier_Router> in a separate thread. The reason for this + // is that it's really an "error" for a <Consumer_Router> to + // send messages to Suppliers, so we don't expect this to happen + // very much. When it does we use a separate thread to avoid + // taxing the main thread, which processes "normal" messages. + // + // All of the methods in this class except the constructor are + // called via base class pointers by the <ACE_Stream>. + // Therefore, we can put them in the protected section. public: Consumer_Router (Peer_Router_Context *prc); // Initialization method. protected: // = ACE_Task hooks. - - // All of these methods are called via base class pointers by the - // ACE Stream apparatus. Therefore, we can put them in the - // protected section. - virtual int open (void *a = 0); // Called by the Stream to initialize the router. @@ -32,16 +49,15 @@ protected: // Called by the Stream to shutdown the router. virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); - // Called by the Consumer_Handler to pass a message to the Router. - // The Router queues up this message, which is then processed in the - // <svc> method in a separate thread. + // Called by the <Peer_Handler> to pass a message to the + // <Consumer_Router>. The <Consumer_Router> queues up this message, + // which is then processed in the <svc> method in a separate thread. virtual int svc (void); // Runs in a separate thread to dequeue messages and pass them up // the stream. // = Dynamic linking hooks. - virtual int info (char **info_string, size_t length) const; // Returns information about this service. }; diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp index 5f90e1941ad..5a796f31bd8 100644 --- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp +++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp @@ -6,12 +6,14 @@ int Event_Analyzer::open (void *) { + // No-op for now... return 0; } int Event_Analyzer::close (u_long) { + // No-op for now... return 0; } @@ -37,24 +39,28 @@ int Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *) { if (Options::instance ()->debug ()) - ACE_DEBUG ((LM_DEBUG, "(%t) passing through Event_Analyser::put() (%s)\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) passing through Event_Analyser::put() (%s)\n", this->is_reader () ? "reader" : "writer")); if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) this->control (mb); + // Just pass the message along to the next Module in the stream... return this->put_next (mb); } int Event_Analyzer::init (int, char *[]) { + // No-op for now. return 0; } int Event_Analyzer::fini (void) { + // No-op for now. return 0; } diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h index a9497730421..a5c18496789 100644 --- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h +++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h @@ -10,24 +10,31 @@ #include "ace/Synch.h" class Event_Analyzer : public ACE_Task<ACE_SYNCH> - // = TITLE - // This is a "no-op" class that just forwards all its message - // blocks onto its neighboring Module in the Stream. In a real - // application these tasks would be where the Stream processing - // would go. { + // = TITLE + // This class forwards all the <ACE_Message_Block>s it receives + // onto its neighboring Module in the Stream. + // + // = DESCRIPTION + // In a "real" event service, application-specific processing + // would be done in the <put> (or <svc>) method in this class. public: + // = Initialization hooks called by <ACE_Stream> (not used). virtual int open (void *a = 0); virtual int close (u_long flags = 0); - virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); - // Dynamic linking hooks. + virtual int put (ACE_Message_Block *msg, + ACE_Time_Value * = 0); + // Entry point into this task. + + // Dynamic linking hooks (not used). virtual int init (int argc, char *argv[]); virtual int fini (void); - virtual int info (char **info_string, size_t length) const; - + virtual int info (char **info_string, + size_t length) const; private: virtual int control (ACE_Message_Block *); + // Implements the watermark control processing. }; #endif /* _EVENT_ANALYZER_H */ diff --git a/examples/ASX/Event_Server/Event_Server/Options.h b/examples/ASX/Event_Server/Event_Server/Options.h index 639f013ac73..9e9bb0e7a14 100644 --- a/examples/ASX/Event_Server/Event_Server/Options.h +++ b/examples/ASX/Event_Server/Event_Server/Options.h @@ -8,51 +8,67 @@ #include "ace/Profile_Timer.h" class Options +{ // = TITLE // Option Singleton for Event Server. -{ - friend class ACE_Shutup_GPlusPlus; // Turn off g++ warning... public: static Options *instance (void); + // Singleton access point. void parse_args (int argc, char *argv[]); + // Parse the command-line arguments and set the options. + // = Timer management. void stop_timer (void); void start_timer (void); + // = Set/get the number of threads. void thr_count (size_t count); size_t thr_count (void); + // = Set/get the size of the queue. void initial_queue_length (size_t length); size_t initial_queue_length (void); + // = Set/get the high water mark. void high_water_mark (size_t size); size_t high_water_mark (void); + // = Set/get the high water mark. void low_water_mark (size_t size); size_t low_water_mark (void); + // = Set/get the size of a message. void message_size (size_t size); size_t message_size (void); + // = Set/get the number of iterations. void iterations (size_t n); size_t iterations (void); + // Set/get threading flags. void t_flags (long flag); long t_flags (void); + // Set/get supplier port number. void supplier_port (u_short port); u_short supplier_port (void); + // Set/get consumer port number. void consumer_port (u_short port); u_short consumer_port (void); + // Enabled if we're in debugging mode. int debug (void); + + // Enabled if we're in verbose mode. int verbose (void); + // Print the results to the STDERR. void print_results (void); private: + // = Ensure we're a Singleton. Options (void); ~Options (void); @@ -63,7 +79,7 @@ private: // Number of threads to spawn. long t_flags_; - // Flags to thr_create(). + // Flags to <thr_create>. size_t high_water_mark_; // ACE_Task high water mark. @@ -95,7 +111,8 @@ private: static Options *instance_; // Static Singleton. - + friend class ACE_Shutup_GPlusPlus; + // Turn off g++ warning... }; #include "Options.i" diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp index f1cd0afac5f..1dd8c99a159 100644 --- a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp +++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp @@ -8,6 +8,11 @@ #include "Options.h" #include "Peer_Router.h" +// Send the <ACE_Message_Block> to all the peers. Note that in a +// "real" application this logic would most likely be more selective, +// i.e., it would actually do "routing" based on addressing +// information passed in the <ACE_Message_Block>. + int Peer_Router_Context::send_peers (ACE_Message_Block *mb) { @@ -18,7 +23,12 @@ Peer_Router_Context::send_peers (ACE_Message_Block *mb) // Skip past the header and get the message to send. ACE_Message_Block *data_block = mb->cont (); - // "Multicast" the data to *all* the registered peers. + // Use an iterator to "multicast" the data to *all* the registered + // peers. Note that this doesn't really multicast, it just makes a + // "logical" copy of the <ACE_Message_Block> and enqueues it in the + // appropriate <Peer_Handler> corresponding to each peer. Note that + // a "real" application would probably "route" the data to a subset + // of connected peers here, rather than send it to all the peers. for (PEER_ENTRY *ss = 0; map_iter.next (ss) != 0; @@ -28,9 +38,11 @@ Peer_Router_Context::send_peers (ACE_Message_Block *mb) ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via handle %d\n", ss->ext_id_)); + iterations++; + // Increment reference count before sending since the - // Peer_Handler might be running in its own thread of control. + // <Peer_Handler> might be running in its own thread of control. bytes += ss->int_id_->put (data_block->duplicate ()); } @@ -38,12 +50,23 @@ Peer_Router_Context::send_peers (ACE_Message_Block *mb) return bytes == 0 ? 0 : bytes / iterations; } +// Remove the <Peer_Handler> from the peer connection map. + int Peer_Router_Context::unbind_peer (ROUTING_KEY key) { return this->peer_map_.unbind (key); } +// Add the <Peer_Handler> to the peer connection map. + +int +Peer_Router_Context::bind_peer (ROUTING_KEY key, + Peer_Handler *peer_handler) +{ + return this->peer_map_.bind (key, peer_handler); +} + void Peer_Router_Context::duplicate (void) { @@ -60,39 +83,36 @@ Peer_Router_Context::release (void) delete this; } -int -Peer_Router_Context::bind_peer (ROUTING_KEY key, - Peer_Handler *peer_handler) -{ - return this->peer_map_.bind (key, peer_handler); -} - Peer_Router_Context::Peer_Router_Context (u_short port) : reference_count_ (0) { - // Perform initializations. - + // Initialize the Acceptor's "listen-mode" socket. if (this->open (ACE_INET_Addr (port)) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "Acceptor::open")); + ACE_ERROR ((LM_ERROR, + "%p\n", + "Acceptor::open")); + // Initialize the connection map. else if (this->peer_map_.open () == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "Map_Manager::open")); - + ACE_ERROR ((LM_ERROR, + "%p\n", + "Map_Manager::open")); else { ACE_INET_Addr addr; - if (this->acceptor().get_local_addr (addr) != -1) + if (this->acceptor ().get_local_addr (addr) != -1) ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s on port = %d, handle = %d, this = %u\n", - addr.get_port_number () == Options::instance ()->supplier_port () ? - "Supplier_Handler" : "Consumer_Handler", + addr.get_port_number () == Options::instance ()->supplier_port () + ? "Supplier_Handler" : "Consumer_Handler", addr.get_port_number (), this->acceptor().get_handle (), this)); else ACE_ERROR ((LM_ERROR, - "%p\n", "get_local_addr")); + "%p\n", + "get_local_addr")); } } @@ -107,7 +127,8 @@ Peer_Router_Context::~Peer_Router_Context (void) PEER_ITERATOR map_iter = this->peer_map_; - // Make sure to take all the handles out of the map. + // Make sure to take all the handles out of the map to avoid + // "resource leaks." for (PEER_ENTRY *ss = 0; map_iter.next (ss) != 0; @@ -119,8 +140,11 @@ Peer_Router_Context::~Peer_Router_Context (void) ss->ext_id_)); if (ACE_Reactor::instance ()->remove_handler - (ss->ext_id_, ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR ((LM_ERROR, "(%t) p\n", "remove_handle")); + (ss->ext_id_, + ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) p\n", + "remove_handle")); } // Close down the map. @@ -139,72 +163,32 @@ Peer_Router_Context::peer_router (Peer_Router *pr) this->peer_router_ = pr; } +// Factory Method that creates a new <Peer_Handler> for each +// connection. + int Peer_Router_Context::make_svc_handler (Peer_Handler *&sh) { - ACE_NEW_RETURN (sh, Peer_Handler (this), -1); + ACE_NEW_RETURN (sh, + Peer_Handler (this), + -1); return 0; } Peer_Handler::Peer_Handler (Peer_Router_Context *prc) - : prc_ (prc) + : peer_router_context_ (prc) { } -#if 0 - -// Right now, Peer_Handlers are purely Reactive, i.e., they all run in -// a single thread of control. It would be easy to make them Active -// Objects by calling activate() in Peer_Handler::open(), making -// Peer_Handler::put() enqueue each message on the message queue, and -// (3) then running the following svc() routine to route each message -// to its final destination within a separate thread. Note that we'd -// want to move the svc() call up to the Consumer_Router and -// Supplier_Router level in order to get the right level of control -// for input and output. - -Peer_Handler::svc (void) -{ - ACE_Message_Block *db, *hb; - int n; - - // Do an endless loop - for (;;) - { - db = new Message_Block (BUFSIZ); - hb = new Message_Block (sizeof (ROUTING_KEY), Message_Block::MB_PROTO, db); - - if ((n = this->peer_.recv (db->rd_ptr (), db->size ())) == -1) - LM_ERROR_RETURN ((LOG_ERROR, "%p", "recv failed"), -1); - else if (n == 0) // Client has closed down the connection. - { - if (this->prc_->peer_router ()->unbind_peer (this->get_handle ()) == -1) - LM_ERROR_RETURN ((LOG_ERROR, "%p", "unbind failed"), -1); - LM_DEBUG ((LOG_DEBUG, "(%t) shutting down \n")); - return -1; // We do not need to be deregistered by reactor - // as we were not registered at all - } - else - // Transform incoming buffer into a Message and pass - // downstream. - { - db->wr_ptr (n); - *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment. - hb->wr_ptr (sizeof (long)); - - if (this->prc_->peer_router ()->reply (hb) == -1) - { - cout << "Peer_Handler.svc : peer_router->reply failed" << endl ; - return -1; - } - } - } - return 0; -} -#endif +// Send output to a peer. Note that this implementation "blocks" if +// flow control occurs. This is undesirable for "real" applications. +// The best way around this is to make the <Peer_Handler> an Active +// Object, e.g., as done in the $ACE_ROOT/apps/Gateway/Gateway +// application. int -Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +Peer_Handler::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) { #if 0 // If we're running as Active Objects just enqueue the message here. @@ -212,14 +196,13 @@ Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv) #else ACE_UNUSED_ARG (tv); - int result = 0; - - result = this->peer ().send_n (mb->rd_ptr (), - mb->length ()); + int result = this->peer ().send_n (mb->rd_ptr (), + mb->length ()); // Release the memory. mb->release (); + return result; -#endif +#endif /* 0 */ } // Initialize a newly connected handler. @@ -229,29 +212,45 @@ Peer_Handler::open (void *) { char buf[BUFSIZ], *p = buf; - if (this->prc_->peer_router ()->info (&p, sizeof buf) != -1) - ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, handle = %d\n", - buf, this->get_handle ())); + if (this->peer_router_context_->peer_router ()->info (&p, + sizeof buf) != -1) + ACE_DEBUG ((LM_DEBUG, + "(%t) creating handler for %s, handle = %d\n", + buf, + this->get_handle ())); else - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "info"), + -1); #if 0 // If we're running as an Active Object activate the Peer_Handler // here. if (this->activate (Options::instance ()->t_flags ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "activation of thread failed"), + -1); ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler::open registering with Reactor for handle_input\n")); #else + // Register with the Reactor to receive messages from our Peer. if (ACE_Reactor::instance ()->register_handler (this, ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1); -#endif + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "register_handler"), + -1); +#endif /* 0 */ // Insert outselves into the routing map. - else if (this->prc_->bind_peer (this->get_handle (), this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1); - + else if (this->peer_router_context_->bind_peer (this->get_handle (), + this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "bind_peer"), + -1); else return 0; } @@ -261,7 +260,9 @@ Peer_Handler::open (void *) int Peer_Handler::handle_input (ACE_HANDLE h) { - ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on handle %d\n", h)); + ACE_DEBUG ((LM_DEBUG, + "(%t) input arrived on handle %d\n", + h)); ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ); ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY), @@ -276,27 +277,39 @@ Peer_Handler::handle_input (ACE_HANDLE h) return -1; } - ssize_t n = this->peer ().recv (db->rd_ptr (), db->size ()); + ssize_t n = this->peer ().recv (db->rd_ptr (), + db->size ()); if (n == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "%p", + "recv failed"), + -1); else if (n == 0) // Client has closed down the connection. { - if (this->prc_->unbind_peer (this->get_handle ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1); - - ACE_DEBUG ((LM_DEBUG, "(%t) shutting down handle %d\n", h)); - return -1; // Instruct the ACE_Reactor to deregister us by returning -1. + if (this->peer_router_context_->unbind_peer (this->get_handle ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p", + "unbind failed"), + -1); + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down handle %d\n", h)); + // Instruct the <ACE_Reactor> to deregister us by returning -1. + return -1; } else { - // Transform incoming buffer into a Message. + // Transform incoming buffer into an <ACE_Message_Block>. // First, increment the write pointer to the end of the newly // read data block. db->wr_ptr (n); - // Second, copy the "address" into the header block. + // Second, copy the "address" into the header block. Note that + // for this implementation the HANDLE we receive the message on + // is considered the "address." A "real" application would want + // to do something more sophisticated. *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); // Third, update the write pointer in the header block. @@ -306,26 +319,31 @@ Peer_Handler::handle_input (ACE_HANDLE h) // use <Task::put> here because this gives the method at *our* // level in the stream a chance to do something with the message // before it is sent up the other side. For instance, if we - // receive messages in the Supplier_Router, it will just call - // <put_next> and send them up the stream to the Consumer_Router - // (which broadcasts them to consumers). However, if we receive - // messages in the Consumer_Router, it will reply to the - // Consumer with an error since it's not correct for Consumers - // to send messages. - return this->prc_->peer_router ()->put (hb) == -1 ? -1 : 0; + // receive messages in the <Supplier_Router>, it will just call + // <put_next> and send them up the stream to the + // <Consumer_Router> (which broadcasts them to consumers). + // However, if we receive messages in the <Consumer_Router>, it + // could reply to the Consumer with an error since it's not + // correct for Consumers to send messages (we don't do this in + // the current implementation, but it could be done in a "real" + // application). + + if (this->peer_router_context_->peer_router ()->put (hb) == -1) + return -1; + else + return 0; } } Peer_Router::Peer_Router (Peer_Router_Context *prc) - : prc_ (prc) + : peer_router_context_ (prc) { - } Peer_Router_Context * Peer_Router::context (void) const { - return this->prc_; + return this->peer_router_context_; } int @@ -346,6 +364,69 @@ Peer_Router::control (ACE_Message_Block *mb) return 0; } +#if 0 + +// Right now, Peer_Handlers are purely Reactive, i.e., they all run in +// a single thread of control. It would be easy to make them Active +// Objects by calling activate() in Peer_Handler::open(), making +// Peer_Handler::put() enqueue each message on the message queue, and +// (3) then running the following svc() routine to route each message +// to its final destination within a separate thread. Note that we'd +// want to move the svc() call up to the Consumer_Router and +// Supplier_Router level in order to get the right level of control +// for input and output. + +Peer_Handler::svc (void) +{ + ACE_Message_Block *db, *hb; + + // Do an endless loop + for (;;) + { + db = new Message_Block (BUFSIZ); + hb = new Message_Block (sizeof (ROUTING_KEY), + Message_Block::MB_PROTO, + db); + + ssize_t n = this->peer_.recv (db->rd_ptr (), db->size ()); + + if (n == -1) + LM_ERROR_RETURN ((LOG_ERROR, + "%p", + "recv failed"), + -1); + else if (n == 0) // Client has closed down the connection. + { + if (this->peer_router_context_->peer_router ()->unbind_peer (this->get_handle ()) == -1) + LM_ERROR_RETURN ((LOG_ERROR, + "%p", + "unbind failed"), + -1); + LM_DEBUG ((LOG_DEBUG, + "(%t) shutting down \n")); + + // We do not need to be deregistered by reactor + // as we were not registered at all. + return -1; + } + else + { + // Transform incoming buffer into a Message. + db->wr_ptr (n); + *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment. + hb->wr_ptr (sizeof (long)); + + // Pass the message to the stream. + if (this->peer_router_context_->peer_router ()->reply (hb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "Peer_Handler.svc : peer_router->reply failed"), + -1); + } + } + return 0; +} +#endif /* 0 */ #endif /* _PEER_ROUTER_C */ #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/examples/ASX/Event_Server/Event_Server/Peer_Router.h index d9b7021f726..3369fe6415f 100644 --- a/examples/ASX/Event_Server/Event_Server/Peer_Router.h +++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.h @@ -16,10 +16,10 @@ class Peer_Router; class Peer_Router_Context; class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH> +{ // = TITLE // Receive input from a Peer and forward to the appropriate - // <Peer_Router>. -{ + // <Peer_Router> (i.e., <Consumer_Router> or <Supplier_Router>). public: Peer_Handler (Peer_Router_Context * = 0); // Initialization method. @@ -29,28 +29,34 @@ public: // object. virtual int handle_input (ACE_HANDLE); - // Receive input from the peer. + // Receive input from a peer. virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); - // Send output to a peer. + // Send output to a peer. Note that this implementation "blocks" if + // flow control occurs. This is undesirable for "real" + // applications. The best way around this is to make the + // <Peer_Handler> an Active Object, e.g., as done in the + // $ACE_ROOT/apps/Gateway/Gateway application. protected: - Peer_Router_Context *prc_; - // Pointer to router context. + Peer_Router_Context *peer_router_context_; + // Pointer to router context. This maintains the state that is + // shared by both Tasks in a <Peer_Router> Module. }; class Peer_Router_Context : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> +{ // = TITLE // Defines state and behavior shared between both Tasks in a - // Peer_Router Module. + // <Peer_Router> Module. // // = DESCRIPTION - // This class also serves as an Acceptor, which creates - // Peer_Handlers when Peers connect. -{ + // This class also serves as an <ACE_Acceptor>, which creates + // <Peer_Handlers> when Peers connect. public: // = Initialization and termination methods. Peer_Router_Context (u_short port); + // Constructor. virtual int unbind_peer (ROUTING_KEY); // Remove the <Peer_Handler *> from the <PEER_MAP> that corresponds @@ -61,29 +67,37 @@ public: // <ROUTING_KEY>. int send_peers (ACE_Message_Block *mb); - // Send the <ACE_Message_Block> to the peer(s). + // Send the <ACE_Message_Block> to all the peers. Note that in a + // "real" application this logic would most likely be more + // selective, i.e., it would actually do "routing" based on + // addressing information passed in the <ACE_Message_Block>. int make_svc_handler (Peer_Handler *&sh); - // Create a new <Peer_Handler> for each connection. + // Factory Method that creates a new <Peer_Handler> for each + // connection. This method overrides the default behavior in + // <ACE_Acceptor>. // = Set/Get Router Task. - Peer_Router *peer_router (); + Peer_Router *peer_router (void); void peer_router (Peer_Router *); void release (void); // Decrement the reference count and delete <this> when count == 0; void duplicate (void); - // Increment the reference count + // Increment the reference count. private: Peer_Router *peer_router_; // Pointer to the <Peer_Router> that we are accepting for. - // = Useful typedefs - typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX> PEER_MAP; - typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX> PEER_ITERATOR; - typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *> PEER_ENTRY; + // = Useful typedefs. + typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX> + PEER_MAP; + typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX> + PEER_ITERATOR; + typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *> + PEER_ENTRY; PEER_MAP peer_map_; // Map used to keep track of active peers. @@ -95,21 +109,22 @@ private: // Private to ensure dynamic allocation. friend class Friend_Of_Peer_Router_Context; - // declare a friend class to avoid compiler warnings because the + // Declare a friend class to avoid compiler warnings because the // destructor is private. }; class Peer_Router : public ACE_Task<ACE_SYNCH> +{ // = TITLE // This abstract base class provides mechanisms for routing - // messages to/from a ACE_Stream from/to one or more peers (which + // messages to/from a <ACE_Stream> from/to one or more peers (which // are typically running on remote hosts). // // = DESCRIPTION - // A subclass of Peer_Router overrides the open(), close(), and - // put() methods in order to specialize the behavior of the router - // to meet application-specific requirements. -{ + // Subclasses of <Peer_Router> (such as <Consumer_Router> or + // <Supplier_Router>) override the <open>, <close>, and + // <put> methods to specialize the behavior of the router to + // meet application-specific requirements. protected: Peer_Router (Peer_Router_Context *prc); // Initialization method. @@ -124,9 +139,9 @@ protected: // Helpful typedef. private: - Peer_Router_Context *prc_; - // Reference to the context shared by the writer and reader Tasks in - // the Consumer and Supplier Modules. + Peer_Router_Context *peer_router_context_; + // Reference to the context shared by the writer and reader Tasks, + // e.g., in the <Consumer_Router> and <Supplier_Router> Modules. // = Prevent copies and pass-by-value. Peer_Router (const Peer_Router &); diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp index 0ab012eaa4a..e42c9371eed 100644 --- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp +++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp @@ -17,9 +17,12 @@ Supplier_Router::svc (void) ) { ACE_DEBUG ((LM_DEBUG, - "(%t) warning: Supplier_Router is forwarding a message via send_peers\n")); + "(%t) warning: Supplier_Router is " + "forwarding a message via send_peers\n")); - // Broadcast the message to the Suppliers. + // Broadcast the message to the Suppliers, even though this is + // "incorrect" (assuming a oneway flow of events from Suppliers + // to Consumers)! if (this->context ()->send_peers (mb) == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -27,35 +30,39 @@ Supplier_Router::svc (void) -1); } - ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Supplier_Router\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) stopping svc in Supplier_Router\n")); return 0; } Supplier_Router::Supplier_Router (Peer_Router_Context *prc) : Peer_Router (prc) { + // Increment the reference count. this->context ()->duplicate (); } -// Initialize the Supplier Router. +// Initialize the Supplier Router. int Supplier_Router::open (void *) { if (this->is_reader ()) { - // Set the Peer_Router_Context to point back to us so that all the - // Peer_Handler's <put> their incoming <Message_Blocks> to our - // reader Task. + // Set the <Peer_Router_Context> to point back to us so that all + // the Peer_Handler's <put> their incoming <Message_Blocks> to + // our reader Task. this->context ()->peer_router (this); return 0; } else // if (this->is_writer () { + // Increment the reference count. + this->context ()->duplicate (); + // Make this an active object to handle the error cases in a // separate thread. - this->context ()->duplicate (); return this->activate (Options::instance ()->t_flags ()); } } @@ -65,7 +72,8 @@ Supplier_Router::open (void *) int Supplier_Router::close (u_long) { - ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router %s\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) closing Supplier_Router %s\n", this->is_reader () ? "reader" : "writer")); if (this->is_writer ()) @@ -94,19 +102,23 @@ Supplier_Router::put (ACE_Message_Block *mb, } // If we're the reader then we are responsible for pass messages up - // to the next Module's writer Task. - + // to the next Module's reader Task. Note that in a "real" + // application this is likely where we'd take a look a the actual + // information that was in the message, e.g., in order to figure out + // what operation it was and what it's "parameters" where, etc. else if (this->is_reader ()) return this->put_next (mb); + else // if (this->is_writer ()) { // Someone is trying to write to the Supplier. In this - // implementation this is considered an error. However, we'll + // implementation this is considered an "error." However, we'll // just go ahead and forward the message to the Supplier (who // hopefully is prepared to receive it). - ACE_DEBUG ((LM_WARNING, "(%t) warning: sending to a Supplier\n")); + ACE_DEBUG ((LM_WARNING, + "(%t) warning: sending to a Supplier\n")); - // Queue up the message to processed by Supplier_Router::svc(). + // Queue up the message to processed by <Supplier_Router::svc>. // Since we don't expect to be getting many of these messages, // we queue them up and run them in a separate thread to avoid // taxing the main thread. @@ -114,7 +126,7 @@ Supplier_Router::put (ACE_Message_Block *mb, } } -// Return information about the Supplier_Router ACE_Module. +// Return information about the <Supplier_Router>. int Supplier_Router::info (char **strp, size_t length) const @@ -126,13 +138,17 @@ Supplier_Router::info (char **strp, size_t length) const if (this->context ()->acceptor ().get_local_addr (addr) == -1) return -1; - ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n", - mod_name, addr.get_port_number (), "tcp", - "# supplier router", this->is_reader () ? "reader" : "writer"); - + ACE_OS::sprintf (buf, + "%s\t %d/%s %s (%s)\n", + mod_name, + addr.get_port_number (), + "tcp", + "# supplier router", + this->is_reader () ? "reader" : "writer"); if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) return -1; else ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); } diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h index 42d067a6454..b83035cc957 100644 --- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h +++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h @@ -11,20 +11,33 @@ #include "Peer_Router.h" class Supplier_Router : public Peer_Router +{ // = TITLE // Provides the interface between one or more Suppliers and the // Event Server ACE_Stream. // // = DESCRIPTION - // When used on the "reader" side of a Stream, this Router Task - // simply forwards all messages up the stream. When used on the - // "writer" side, this Router Task queues up outgoing messages - // to suppliers and sends them in a separate thread. The reason - // for this is that it's really an "error" for a - // <Supplier_Router> to send messages to Suppliers, so we don't - // expect this to happen very much. when it does we use a - // separate thread to avoid taxing the main thread. -{ + // This class normally sits on "bottom" of the Stream and sends + // all messages coming from Suppliers via its "write" <Task> + // "upstream" to all the Consumers connected to the + // <Consumer_Router>. Normally, the messages flow up the + // stream to <Consumer_Router>s. However, if Consumers + // transmit data to the <Consumer_Router>, we dutifully push it + // out to the Suppliers via the <Supplier_Router>. + // + // When used on the "reader" side of a Stream, the + // <Supplier_Router> simply forwards all messages up the stream. + // When used on the "writer" side, the <Supplier_Router> queues + // up outgoing messages to suppliers and sends them in a + // separate thread. The reason for this is that it's really an + // "error" for a <Supplier_Router> to send messages to + // Suppliers, so we don't expect this to happen very much. When + // it does we use a separate thread to avoid taxing the main + // thread, which processes "normal" messages. + // + // All of these methods are called via base class pointers by + // the <ACE_Stream> apparatus. Therefore, we can put them in + // the protected section. public: Supplier_Router (Peer_Router_Context *prc); // Initialization method. @@ -32,10 +45,6 @@ public: protected: // = ACE_Task hooks. - // All of these methods are called via base class pointers by the - // ACE Stream apparatus. Therefore, we can put them in the - // protected section. - virtual int open (void *a = 0); // Called by the Stream to initialize the router. diff --git a/examples/ASX/Event_Server/Event_Server/event_server.cpp b/examples/ASX/Event_Server/Event_Server/event_server.cpp index 6dba2c38283..4fa59126d64 100644 --- a/examples/ASX/Event_Server/Event_Server/event_server.cpp +++ b/examples/ASX/Event_Server/Event_Server/event_server.cpp @@ -1,6 +1,6 @@ // $Id$ -// Test the event server. +// Main driver program for the event server example. #include "ace/Stream.h" #include "ace/Service_Config.h" @@ -9,142 +9,241 @@ #include "Event_Analyzer.h" #include "Supplier_Router.h" +// Typedef these components to handle multi-threading correctly. typedef ACE_Stream<ACE_SYNCH> MT_Stream; typedef ACE_Module<ACE_SYNCH> MT_Module; -class Quit_Handler : public ACE_Sig_Adapter - // = TITLE - // Handle SIGINT and terminate the entire application. +class Event_Server : public ACE_Sig_Adapter { + // = TITLE + // Run the logic for the <Event_Server>. + // + // = DESCRIPTION + // In addition to packaging the <Event_Server> components, this + // class also handles SIGINT and terminate the entire + // application process. There are several ways to terminate + // this application process: + // + // 1. Send a SIGINT signal (e.g., via ^C) + // 2. Type any character on the STDIN. + // + // Note that by inheriting from the <ACE_Sig_Adapter> we can + // shutdown the <ACE_Reactor> cleanly when a SIGINT is + // generated. public: - Quit_Handler (void); - virtual int handle_input (ACE_HANDLE fd); + Event_Server (void); + // Constructor. + + int svc (void); + // Run the event-loop for the event server. + +private: + virtual int handle_input (ACE_HANDLE handle); + // Hook method called back when a user types something into the + // STDIN in order to shut down the program. + + int configure_stream (void); + // Setup the plumbing in the stream. + + int set_watermarks (void); + // Set the high and low queue watermarks. + + int run_event_loop (void); + // Run the event-loop for the <Event_Server>. + + MT_Stream event_server_; + // The <ACE_Stream> that contains the <Event_Server> application + // <Modules>. }; -Quit_Handler::Quit_Handler (void) +Event_Server::Event_Server (void) : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop)) + // Shutdown the <ACE_Reactor>'s event loop when a SIGINT is + // received. { - // Register to trap input from the user. + // Register to trap STDIN from the user. if (ACE::register_stdin_handler (this, ACE_Reactor::instance (), ACE_Thread_Manager::instance ()) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler")); + ACE_ERROR ((LM_ERROR, + "%p\n", + "register_stdin_handler")); // Register to trap the SIGINT signal. else if (ACE_Reactor::instance ()->register_handler (SIGINT, this) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "register_handler")); + ACE_ERROR ((LM_ERROR, + "%p\n", + "register_handler")); } int -Quit_Handler::handle_input (ACE_HANDLE) +Event_Server::handle_input (ACE_HANDLE) { // This code here will make sure we actually wait for the user to - // type something. On platforms like Win32, handle_input() is called + // type something. On platforms like Win32, <handle_input> is called // prematurely (even when there is no data). char temp_buffer [BUFSIZ]; - ACE_OS::read (ACE_STDIN, temp_buffer, sizeof (temp_buffer)); + + ssize_t n = ACE_OS::read (ACE_STDIN, + temp_buffer, + sizeof (temp_buffer)); + // This ought to be > 0, otherwise something very strange has + // happened!! + ACE_ASSERT (n > 0); Options::instance ()->stop_timer (); - ACE_DEBUG ((LM_INFO, "(%t) closing down the test\n")); + + ACE_DEBUG ((LM_INFO, + "(%t) closing down the test\n")); Options::instance ()->print_results (); - ACE_Reactor::end_event_loop(); + ACE_Reactor::end_event_loop (); return -1; } int +Event_Server::configure_stream (void) +{ + Peer_Router_Context *src; + // Create the <Supplier_Router>'s routing context. This contains a + // context shared by both the write-side and read-side of the + // <Supplier_Router> Module. + ACE_NEW_RETURN (src, + Peer_Router_Context (Options::instance ()->supplier_port ()), + -1); + + MT_Module *srm = 0; + // Create the <Supplier_Router> module. + ACE_NEW_RETURN (srm, + MT_Module + ("Supplier_Router", + new Supplier_Router (src), + new Supplier_Router (src)), + -1); + + MT_Module *eam = 0; + // Create the <Event_Analyzer> module. + ACE_NEW_RETURN (eam, + MT_Module + ("Event_Analyzer", + new Event_Analyzer, + new Event_Analyzer), + -1); + + Peer_Router_Context *crc; + // Create the <Consumer_Router>'s routing context. This contains a + // context shared by both the write-side and read-side of the + // <Consumer_Router> Module. + ACE_NEW_RETURN (crc, + Peer_Router_Context (Options::instance ()->consumer_port ()), + -1); + + MT_Module *crm = 0; + // Create the <Consumer_Router> module. + ACE_NEW_RETURN (crm, + MT_Module + ("Consumer_Router", + new Consumer_Router (crc), + new Consumer_Router (crc)), + -1); + + // Push the Modules onto the event_server stream. + + if (this->event_server_.push (srm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "push (Supplier_Router)"), + -1); + else if (this->event_server_.push (eam) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "push (Event_Analyzer)"), + -1); + else if (this->event_server_.push (crm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "push (Consumer_Router)"), + -1); + return 0; +} + +int +Event_Server::set_watermarks (void) +{ + // Set the high and low water marks appropriately. The water marks + // control how much data can be buffered before the queues are + // considered "full." + int wm = Options::instance ()->low_water_mark (); + + if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_LWM, + &wm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "push (setting low watermark)"), + -1); + + wm = Options::instance ()->high_water_mark (); + if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_HWM, + &wm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "push (setting high watermark)"), + -1); + return 0; +} + +int +Event_Server::run_event_loop (void) +{ + // Begin the timer. + Options::instance ()->start_timer (); + + // Perform the main event loop waiting for the user to type ^C or to + // enter a line on the ACE_STDIN. + + ACE_Reactor::run_event_loop (); + + // Close down the stream and call the <close> hooks on all the + // <ACE_Task>s in the various Modules in the Stream. + this->event_server_.close (); + + // Wait for the threads in the <Consumer_Router> and + // <Supplier_Router> to exit. + return ACE_Thread_Manager::instance ()->wait (); +} + +int +Event_Server::svc (void) +{ + if (this->configure_stream () == -1) + return -1; + else if (this->set_watermarks () == -1) + return -1; + else if (this->run_event_loop () == -1) + return -1; + else + return 0; +} + +int main (int argc, char *argv[]) { #if defined (ACE_HAS_THREADS) - ACE_Service_Config daemon; - Options::instance ()->parse_args (argc, argv); - { - // Primary ACE_Stream for EVENT_SERVER application. - MT_Stream event_server; - - // Enable graceful shutdowns... - Quit_Handler quit_handler; - - Peer_Router_Context *src; - // Create the Supplier_Router's routing context, which contains - // context shared by both the write-side and read-side of the - // Supplier_Router Module. - ACE_NEW_RETURN (src, - Peer_Router_Context (Options::instance ()->supplier_port ()), - -1); - - MT_Module *srm = 0; - // Create the Supplier Router module. - ACE_NEW_RETURN (srm, MT_Module - ("Supplier_Router", - new Supplier_Router (src), - new Supplier_Router (src)), - -1); - - MT_Module *eam = 0; - // Create the Event Analyzer module. - ACE_NEW_RETURN (eam, MT_Module - ("Event_Analyzer", - new Event_Analyzer, - new Event_Analyzer), - -1); - - Peer_Router_Context *crc; - // Create the Consumer_Router's routing context, which contains - // context shared by both the write-side and read-side of the - // Consumer_Router Module. - ACE_NEW_RETURN (crc, - Peer_Router_Context (Options::instance ()->consumer_port ()), - -1); - - MT_Module *crm = 0; - // Create the Consumer Router module. - ACE_NEW_RETURN (crm, MT_Module - ("Consumer_Router", - new Consumer_Router (crc), - new Consumer_Router (crc)), - -1); - - // Push the Modules onto the event_server stream. - - if (event_server.push (srm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1); - - if (event_server.push (eam) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1); - - if (event_server.push (crm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1); - - // Set the high and low water marks appropriately. - - int wm = Options::instance ()->low_water_mark (); - - if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "push (setting low watermark)"), -1); - - wm = Options::instance ()->high_water_mark (); - if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "push (setting high watermark)"), -1); - - Options::instance ()->start_timer (); - - // Perform the main event loop waiting for the user to type ^C or - // to enter a line on the ACE_STDIN. - - ACE_Reactor::run_event_loop (); - // The destructor of event_server will close down the stream and - // call the close() hooks on all the ACE_Tasks. - } - - // Wait for the threads to exit. - ACE_Thread_Manager::instance ()->wait (); - ACE_DEBUG ((LM_DEBUG, "exiting main\n")); + + // Initialize the <Event_Server>. + Event_Server event_server; + + // Run the event server's event-loop. + int result = event_server.svc (); + + ACE_DEBUG ((LM_DEBUG, + "exiting main\n")); + + return result; #else ACE_UNUSED_ARG (argc); ACE_UNUSED_ARG (argv); - ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + ACE_ERROR_RETURN ((LM_ERROR, + "threads not supported on this platform\n"), + 1); #endif /* ACE_HAS_THREADS */ - return 0; } diff --git a/examples/ASX/Event_Server/Transceiver/transceiver.cpp b/examples/ASX/Event_Server/Transceiver/transceiver.cpp index 787fd064803..148c4ede90f 100644 --- a/examples/ASX/Event_Server/Transceiver/transceiver.cpp +++ b/examples/ASX/Event_Server/Transceiver/transceiver.cpp @@ -9,66 +9,21 @@ #include "ace/SOCK_Connector.h" #include "ace/Get_Opt.h" -// Port number of event server. -static u_short port_number; - -// Name of event server. -static char *host_name; - -// Are we playing the Consumer ('C') or Supplier ('S') role? -static char role = 'S'; - -// Handle the command-line arguments. - -static void -parse_args (int argc, char *argv[]) -{ - ACE_Get_Opt get_opt (argc, argv, "Ch:p:S"); - - port_number = ACE_DEFAULT_SERVER_PORT; - host_name = ACE_DEFAULT_SERVER_HOST; - - for (int c; (c = get_opt ()) != -1; ) - switch (c) - { - case 'C': - role = c; - break; - case 'h': - host_name = get_opt.optarg; - break; - case 'p': - port_number = ACE_OS::atoi (get_opt.optarg); - break; - case 'S': - role = c; - break; - default: - ACE_ERROR ((LM_ERROR, - "usage: %n [-p portnum] [-h host_name]\n%a", 1)); - /* NOTREACHED */ - break; - } - - // Increment by 1 if we're the supplier to mirror the default - // behavior of the Event_Server (which sets the Consumer port to - // ACE_DEFAULT_SERVER_PORT and the Supplier port to - // ACE_DEFAULT_SERVER_PORT + 1). - if (role == 'S' && port_number == ACE_DEFAULT_SERVER_PORT) - port_number++; -} - class Event_Transceiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +{ // = TITLE // Generate and receives messages from the event server. // // = DESCRIPTION // This class is both a consumer and supplier of events, i.e., - // it is a ``transceiver.'' -{ + // it's a ``transceiver.'' public: // = Initialization method. + Event_Transceiver (int argc, char *argv[]); + // Performs the actual initialization. + Event_Transceiver (void); + // No-op constructor (required by the <ACE_Connector>). // = Svc_Handler hook called by the <ACE_Connector>. virtual int open (void *); @@ -88,15 +43,71 @@ private: int receiver (void); // Reads data from socket and writes to ACE_STDOUT. - int forwarder (void); + int transmitter (void); // Writes data from ACE_STDIN to socket. + + int parse_args (int argc, char *argv[]); + // Parse the command-line arguments. + + u_short port_number_; + // Port number of event server. + + char *host_name_; + // Name of event server. + + char *role_; + // Are we playing the Consumer or Supplier role? }; +// Handle the command-line arguments. + +int +Event_Transceiver::parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, "Ch:p:S"); + + this->port_number_ = ACE_DEFAULT_SERVER_PORT; + this->host_name_ = ACE_DEFAULT_SERVER_HOST; + this->role_ = "Supplier"; + + for (int c; (c = get_opt ()) != -1; ) + switch (c) + { + case 'C': + this->role_ = "Consumer"; + break; + case 'h': + this->host_name_ = get_opt.optarg; + break; + case 'p': + this->port_number_ = ACE_OS::atoi (get_opt.optarg); + break; + case 'S': + this->role_ = "Supplier"; + break; + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %n [-CS] [-h host_name] [-p portnum] \n"), + -1); + /* NOTREACHED */ + break; + } + + // Increment by 1 if we're the supplier to mirror the default + // behavior of the Event_Server (which sets the Consumer port to + // ACE_DEFAULT_SERVER_PORT and the Supplier port to + // ACE_DEFAULT_SERVER_PORT + 1). Note that this is kind of a + // hack... + if (ACE_OS::strcmp (this->role_, "Supplier") == 0 + && this->port_number_ == ACE_DEFAULT_SERVER_PORT) + this->port_number_++; +} + int Event_Transceiver::handle_close (ACE_HANDLE, ACE_Reactor_Mask) { - ACE_Reactor::end_event_loop(); + ACE_Reactor::end_event_loop (); return 0; } @@ -107,41 +118,76 @@ Event_Transceiver::handle_signal (int signum, siginfo_t *, ucontext_t *) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) received signal %S\n", signum)); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) received signal %S\n", + signum)); - ACE_Reactor::end_event_loop(); + ACE_Reactor::end_event_loop (); return 0; } Event_Transceiver::Event_Transceiver (void) { - ACE_Sig_Set sig_set; - - sig_set.sig_add (SIGINT); - sig_set.sig_add (SIGQUIT); +} - if (ACE_Reactor::instance ()->register_handler - (sig_set, this) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "register_handler")); - - // We need to register <this> here before we're connected since - // otherwise <get_handle> will return the connection socket handle - // for the peer. - else if (ACE::register_stdin_handler (this, - ACE_Reactor::instance (), - ACE_Thread_Manager::instance ()) == -1) +Event_Transceiver::Event_Transceiver (int argc, char *argv[]) +{ + if (this->parse_args (argc, argv) == -1) ACE_ERROR ((LM_ERROR, - "%p\n", - "register_stdin_handler")); + "%p\n", + "parse_args")); + else + { + ACE_Sig_Set sig_set; + + sig_set.sig_add (SIGINT); + sig_set.sig_add (SIGQUIT); + + // Register to handle the SIGINT and SIGQUIT signals. + if (ACE_Reactor::instance ()->register_handler + (sig_set, + this) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "register_handler")); + + // We need to register <this> here before we're connected since + // otherwise <get_handle> will return the connection socket + // handle for the peer. + else if (ACE::register_stdin_handler (this, + ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "register_stdin_handler")); + + // Address of the server. + ACE_INET_Addr server_addr (this->port_number_, + this->host_name_); + + ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector; + + // We need a pointer here because connect takes a reference to a + // pointer! + Event_Transceiver *etp = this; + + // Establish the connection to the Event Server. + if (connector.connect (etp, + server_addr) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + this->host_name_)); + } } int Event_Transceiver::open (void *) { - // Register ourselves to be notified when there's data on the - // socket. + // Register ourselves to be notified when there's data to read on + // the socket. if (ACE_Reactor::instance ()->register_handler - (this, ACE_Event_Handler::READ_MASK) == -1) + (this, + ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), @@ -152,18 +198,19 @@ Event_Transceiver::open (void *) int Event_Transceiver::handle_input (ACE_HANDLE handle) { + // Determine whether we play the role of a consumer or a supplier. if (handle == ACE_STDIN) - return this->forwarder (); + return this->transmitter (); else return this->receiver (); } - int -Event_Transceiver::forwarder (void) +Event_Transceiver::transmitter (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering %s forwarder\n", - role == 'C' ? "Consumer" : "Supplier")); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) entering %s transmitter\n", + this->role_)); char buf[BUFSIZ]; ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf); @@ -172,45 +219,54 @@ Event_Transceiver::forwarder (void) if (n <= 0 || this->peer ().send_n (buf, n) != n) result = -1; - ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving %s forwarder\n", - role == 'C' ? "Consumer" : "Supplier")); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) leaving %s transmitter\n", + this->role_)); return result; } int Event_Transceiver::receiver (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering %s receiver\n", - role == 'C' ? "Consumer" : "Supplier")); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) entering %s receiver\n", + this->role_)); char buf[BUFSIZ]; ssize_t n = this->peer ().recv (buf, sizeof buf); int result = 0; - if (n <= 0 || ACE_OS::write (ACE_STDOUT, buf, n) != n) + if (n <= 0 + || ACE_OS::write (ACE_STDOUT, buf, n) != n) result = -1; - ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving %s receiver\n", - role == 'C' ? "Consumer" : "Supplier")); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) leaving %s receiver\n", + this->role_)); return result; } int main (int argc, char *argv[]) { - ACE_Service_Config daemon (argv[0]); - - parse_args (argc, argv); + if (ACE_Service_Config::open (argv[0]) == -1 + && errno != ENOENT) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "open"), + -1); - ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector; - Event_Transceiver transceiver, *tp = &transceiver; + // Create and initialize the transceiver. + Event_Transceiver transceiver (argc, argv); - ACE_INET_Addr server_addr (port_number, host_name); + // Demonstrate how we can check if a constructor failed... + if (ACE_LOG_MSG->op_status () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "Event_Transceiver constructor failed"), + -1); - // Establish the connection to the Event Server. - if (connector.connect (tp, server_addr) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", host_name), 1); // Run event loop until either the event server shuts down or we get // a SIGINT. |