diff options
Diffstat (limited to 'examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp')
-rw-r--r-- | examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp | 133 |
1 files changed, 67 insertions, 66 deletions
diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp index f9450fd99e1..a9f4ebb8521 100644 --- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp +++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp @@ -1,33 +1,9 @@ -#include "Supplier_Router.h" // $Id$ +#include "Supplier_Router.h" #include "Options.h" -#if defined (ACE_HAS_THREADS) - -typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY; - -int -Supplier_Handler::open (void *a) -{ - SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a; - this->router_task_ = af->router (); - return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a); -} - -Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm) - : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm) -{ -} - -// Create a new router and associate it with the REACTOR parameter. - -Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm) - : SUPPLIER_ROUTER (tm) -{ -} - -// Handle incoming messages in a separate thread. +// Handle outgoing messages in a separate thread. int Supplier_Router::svc (void) @@ -35,14 +11,19 @@ Supplier_Router::svc (void) assert (this->is_writer ()); ACE_Thread_Control tc (this->thr_mgr ()); - ACE_Message_Block *mb = 0; ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n")); - while (this->getq (mb) >= 0) + for (ACE_Message_Block *mb = 0; + this->getq (mb) >= 0; + ) { - ACE_DEBUG ((LM_DEBUG, "Supplier_Router is routing via send_peers\n")); - if (this->send_peers (mb) == -1) + ACE_DEBUG ((LM_DEBUG, + "(%t) warning: Supplier_Router is forwarding a message via send_peers\n")); + + // Broadcast the message to the Suppliers. + + if (this->context ()->send_peers (mb) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) send_peers failed in Supplier_Router\n"), -1); @@ -54,25 +35,26 @@ Supplier_Router::svc (void) // destructor. } -// Initialize the Router. +Supplier_Router::Supplier_Router (Peer_Router_Context *prc) + : Peer_Router (prc) +{ +} + +// Initialize the Supplier Router. int Supplier_Router::open (void *) { - assert (this->is_writer ()); - - char *argv[4]; - - argv[0] = (char *) this->name (); - argv[1] = "-p"; - argv[2] = options.supplier_port (); - argv[3] = 0; - - if (this->init (2, &argv[1]) == -1) - return -1; - - // Make this an active object. - return this->activate (options.t_flags ()); + if (this->is_reader ()) + // Set the Peer_Router_Context to point back to us so that all the + // Peer_Handler's <put> their incoming <Message_Blocks> to our + // reader Task. + this->context ()->peer_router (this); + + else // if (this->is_writer () + // Make this an active object to handle the error cases in a + // separate thread. + return this->activate (Options::instance ()->t_flags ()); } // Close down the router. @@ -80,31 +62,53 @@ Supplier_Router::open (void *) int Supplier_Router::close (u_long) { - assert (this->is_writer ()); - ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router\n")); - this->peer_map_.close (); + ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router %s\n", + this->is_reader () ? "reader" : "writer")); - // Inform the thread to shut down. - this->msg_queue ()->deactivate (); + if (this->is_writer ()) + // Inform the thread to shut down. + this->msg_queue ()->deactivate (); + + // Both writer and reader call release(), so the context knows when + // to clean itself up. + this->context ()->release (); return 0; } -// Send a MESSAGE_BLOCK to the supplier(s). +// Send an <ACE_Message_Block> to the supplier(s). int -Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) +Supplier_Router::put (ACE_Message_Block *mb, + ACE_Time_Value *) { - assert (this->is_writer ()); + // Perform the necessary control operations before passing + // the message up the stream. if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) { this->control (mb); return this->put_next (mb); } - else - // Queue up the message, which will be processed by - // Supplier_Router::svc(). - return this->putq (mb); + + // If we're the reader then we are responsible for pass messages up + // to the next Module's writer Task. + + else if (this->is_reader ()) + return this->put_next (mb); + else // if (this->is_writer ()) + { + // Someone is trying to write to the Supplier. In this + // implementation this is considered an error. However, we'll + // just go ahead and forward the message to the Supplier (who + // hopefully is prepared to receive it). + ACE_DEBUG ((LM_WARNING, "(%t) warning: sending to a Supplier\n")); + + // Queue up the message to processed by Supplier_Router::svc(). + // Since we don't expect to be getting many of these messages, + // we queue them up and run them in a separate thread to avoid + // taxing the main thread. + return this->putq (mb); + } } // Return information about the Supplier_Router ACE_Module. @@ -112,17 +116,16 @@ Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) int Supplier_Router::info (char **strp, size_t length) const { - char buf[BUFSIZ]; - ACE_INET_Addr addr; + char buf[BUFSIZ]; + ACE_INET_Addr addr; const char *mod_name = this->name (); - ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor (); - if (sa.get_local_addr (addr) == -1) + if (this->context ()->acceptor ().get_local_addr (addr) == -1) return -1; - ACE_OS::sprintf (buf, "%s\t %d/%s %s", - mod_name, addr.get_port_number (), "tcp", - "# supplier router\n"); + ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n", + mod_name, addr.get_port_number (), "tcp", + "# supplier router", this->is_reader () ? "reader" : "writer"); if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) return -1; @@ -130,5 +133,3 @@ Supplier_Router::info (char **strp, size_t length) const ACE_OS::strncpy (*strp, mod_name, length); return ACE_OS::strlen (mod_name); } - -#endif /* ACE_HAS_THREADS */ |