summaryrefslogtreecommitdiff
path: root/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp')
-rw-r--r--ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp165
1 files changed, 165 insertions, 0 deletions
diff --git a/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
new file mode 100644
index 00000000000..72c43ee6312
--- /dev/null
+++ b/ACE/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
@@ -0,0 +1,165 @@
+// $Id$
+
+#include "ace/os_include/os_assert.h"
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "Supplier_Router.h"
+#include "Options.h"
+
+ACE_RCSID(Event_Server, Supplier_Router, "$Id$")
+
+// Handle outgoing messages in a separate thread.
+
+int
+Supplier_Router::svc (void)
+{
+ assert (this->is_writer ());
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n"));
+
+ for (ACE_Message_Block *mb = 0;
+ this->getq (mb) >= 0;
+ )
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) warning: Supplier_Router is "
+ "forwarding a message via send_peers\n"));
+
+ // Broadcast the message to the Suppliers, even though this is
+ // "incorrect" (assuming a oneway flow of events from Suppliers
+ // to Consumers)!
+
+ if (this->context ()->send_peers (mb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) send_peers failed in Supplier_Router\n"),
+ -1);
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) stopping svc in Supplier_Router\n"));
+ return 0;
+}
+
+Supplier_Router::Supplier_Router (Peer_Router_Context *prc)
+ : Peer_Router (prc)
+{
+ // Increment the reference count.
+ this->context ()->duplicate ();
+}
+
+// Initialize the Supplier Router.
+
+int
+Supplier_Router::open (void *)
+{
+ if (this->is_reader ())
+ {
+ // Set the <Peer_Router_Context> to point back to us so that all
+ // the Peer_Handler's <put> their incoming <Message_Blocks> to
+ // our reader Task.
+ this->context ()->peer_router (this);
+ return 0;
+ }
+
+ else // if (this->is_writer ()
+ {
+ // Increment the reference count.
+ this->context ()->duplicate ();
+
+ // Make this an active object to handle the error cases in a
+ // separate thread.
+ return this->activate (Options::instance ()->t_flags ());
+ }
+}
+
+// Close down the router.
+
+int
+Supplier_Router::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) closing Supplier_Router %s\n",
+ this->is_reader () ? "reader" : "writer"));
+
+ if (this->is_writer ())
+ // Inform the thread to shut down.
+ this->msg_queue ()->deactivate ();
+
+ // Both writer and reader call release(), so the context knows when
+ // to clean itself up.
+ this->context ()->release ();
+ return 0;
+}
+
+// Send an <ACE_Message_Block> to the supplier(s).
+
+int
+Supplier_Router::put (ACE_Message_Block *mb,
+ ACE_Time_Value *)
+{
+ // Perform the necessary control operations before passing
+ // the message up the stream.
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ {
+ this->control (mb);
+ return this->put_next (mb);
+ }
+
+ // If we're the reader then we are responsible for pass messages up
+ // to the next Module's reader Task. Note that in a "real"
+ // application this is likely where we'd take a look a the actual
+ // information that was in the message, e.g., in order to figure out
+ // what operation it was and what it's "parameters" where, etc.
+ else if (this->is_reader ())
+ return this->put_next (mb);
+
+ else // if (this->is_writer ())
+ {
+ // Someone is trying to write to the Supplier. In this
+ // implementation this is considered an "error." However, we'll
+ // just go ahead and forward the message to the Supplier (who
+ // hopefully is prepared to receive it).
+ ACE_DEBUG ((LM_WARNING,
+ "(%t) warning: sending to a Supplier\n"));
+
+ // Queue up the message to processed by <Supplier_Router::svc>.
+ // Since we don't expect to be getting many of these messages,
+ // we queue them up and run them in a separate thread to avoid
+ // taxing the main thread.
+ return this->putq (mb);
+ }
+}
+
+// Return information about the <Supplier_Router>.
+#if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR)
+# define FMTSTR ACE_TEXT ("%s\t %d/%s %s (%s)\n")
+#else
+# define FMTSTR ACE_TEXT ("%ls\t %d/%ls %ls (%ls)\n")
+#endif /* ACE_WIN32 || !ACE_USES_WCHAR */
+
+int
+Supplier_Router::info (ACE_TCHAR **strp, size_t length) const
+{
+ ACE_TCHAR buf[BUFSIZ];
+ ACE_INET_Addr addr;
+ const ACE_TCHAR *mod_name = this->name ();
+
+ if (this->context ()->acceptor ().get_local_addr (addr) == -1)
+ return -1;
+
+ ACE_OS::sprintf (buf,
+ FMTSTR,
+ mod_name,
+ addr.get_port_number (),
+ ACE_TEXT ("tcp"),
+ ACE_TEXT ("# supplier router"),
+ this->is_reader () ?
+ ACE_TEXT ("reader") : ACE_TEXT ("writer"));
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+
+ return ACE_OS::strlen (mod_name);
+}