summaryrefslogtreecommitdiff
path: root/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp')
-rw-r--r--examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp133
1 files changed, 67 insertions, 66 deletions
diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
index f9450fd99e1..a9f4ebb8521 100644
--- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
@@ -1,33 +1,9 @@
-#include "Supplier_Router.h"
// $Id$
+#include "Supplier_Router.h"
#include "Options.h"
-#if defined (ACE_HAS_THREADS)
-
-typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY;
-
-int
-Supplier_Handler::open (void *a)
-{
- SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a;
- this->router_task_ = af->router ();
- return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a);
-}
-
-Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm)
- : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm)
-{
-}
-
-// Create a new router and associate it with the REACTOR parameter.
-
-Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm)
- : SUPPLIER_ROUTER (tm)
-{
-}
-
-// Handle incoming messages in a separate thread.
+// Handle outgoing messages in a separate thread.
int
Supplier_Router::svc (void)
@@ -35,14 +11,19 @@ Supplier_Router::svc (void)
assert (this->is_writer ());
ACE_Thread_Control tc (this->thr_mgr ());
- ACE_Message_Block *mb = 0;
ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n"));
- while (this->getq (mb) >= 0)
+ for (ACE_Message_Block *mb = 0;
+ this->getq (mb) >= 0;
+ )
{
- ACE_DEBUG ((LM_DEBUG, "Supplier_Router is routing via send_peers\n"));
- if (this->send_peers (mb) == -1)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) warning: Supplier_Router is forwarding a message via send_peers\n"));
+
+ // Broadcast the message to the Suppliers.
+
+ if (this->context ()->send_peers (mb) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) send_peers failed in Supplier_Router\n"),
-1);
@@ -54,25 +35,26 @@ Supplier_Router::svc (void)
// destructor.
}
-// Initialize the Router.
+Supplier_Router::Supplier_Router (Peer_Router_Context *prc)
+ : Peer_Router (prc)
+{
+}
+
+// Initialize the Supplier Router.
int
Supplier_Router::open (void *)
{
- assert (this->is_writer ());
-
- char *argv[4];
-
- argv[0] = (char *) this->name ();
- argv[1] = "-p";
- argv[2] = options.supplier_port ();
- argv[3] = 0;
-
- if (this->init (2, &argv[1]) == -1)
- return -1;
-
- // Make this an active object.
- return this->activate (options.t_flags ());
+ if (this->is_reader ())
+ // Set the Peer_Router_Context to point back to us so that all the
+ // Peer_Handler's <put> their incoming <Message_Blocks> to our
+ // reader Task.
+ this->context ()->peer_router (this);
+
+ else // if (this->is_writer ()
+ // Make this an active object to handle the error cases in a
+ // separate thread.
+ return this->activate (Options::instance ()->t_flags ());
}
// Close down the router.
@@ -80,31 +62,53 @@ Supplier_Router::open (void *)
int
Supplier_Router::close (u_long)
{
- assert (this->is_writer ());
- ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router\n"));
- this->peer_map_.close ();
+ ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router %s\n",
+ this->is_reader () ? "reader" : "writer"));
- // Inform the thread to shut down.
- this->msg_queue ()->deactivate ();
+ if (this->is_writer ())
+ // Inform the thread to shut down.
+ this->msg_queue ()->deactivate ();
+
+ // Both writer and reader call release(), so the context knows when
+ // to clean itself up.
+ this->context ()->release ();
return 0;
}
-// Send a MESSAGE_BLOCK to the supplier(s).
+// Send an <ACE_Message_Block> to the supplier(s).
int
-Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+Supplier_Router::put (ACE_Message_Block *mb,
+ ACE_Time_Value *)
{
- assert (this->is_writer ());
+ // Perform the necessary control operations before passing
+ // the message up the stream.
if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
{
this->control (mb);
return this->put_next (mb);
}
- else
- // Queue up the message, which will be processed by
- // Supplier_Router::svc().
- return this->putq (mb);
+
+ // If we're the reader then we are responsible for pass messages up
+ // to the next Module's writer Task.
+
+ else if (this->is_reader ())
+ return this->put_next (mb);
+ else // if (this->is_writer ())
+ {
+ // Someone is trying to write to the Supplier. In this
+ // implementation this is considered an error. However, we'll
+ // just go ahead and forward the message to the Supplier (who
+ // hopefully is prepared to receive it).
+ ACE_DEBUG ((LM_WARNING, "(%t) warning: sending to a Supplier\n"));
+
+ // Queue up the message to processed by Supplier_Router::svc().
+ // Since we don't expect to be getting many of these messages,
+ // we queue them up and run them in a separate thread to avoid
+ // taxing the main thread.
+ return this->putq (mb);
+ }
}
// Return information about the Supplier_Router ACE_Module.
@@ -112,17 +116,16 @@ Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
int
Supplier_Router::info (char **strp, size_t length) const
{
- char buf[BUFSIZ];
- ACE_INET_Addr addr;
+ char buf[BUFSIZ];
+ ACE_INET_Addr addr;
const char *mod_name = this->name ();
- ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor ();
- if (sa.get_local_addr (addr) == -1)
+ if (this->context ()->acceptor ().get_local_addr (addr) == -1)
return -1;
- ACE_OS::sprintf (buf, "%s\t %d/%s %s",
- mod_name, addr.get_port_number (), "tcp",
- "# supplier router\n");
+ ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n",
+ mod_name, addr.get_port_number (), "tcp",
+ "# supplier router", this->is_reader () ? "reader" : "writer");
if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
return -1;
@@ -130,5 +133,3 @@ Supplier_Router::info (char **strp, size_t length) const
ACE_OS::strncpy (*strp, mod_name, length);
return ACE_OS::strlen (mod_name);
}
-
-#endif /* ACE_HAS_THREADS */