summaryrefslogtreecommitdiff
path: root/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp')
-rw-r--r--ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp138
1 files changed, 138 insertions, 0 deletions
diff --git a/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp b/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp
new file mode 100644
index 00000000000..b9c9c0cf2bd
--- /dev/null
+++ b/ACE/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp
@@ -0,0 +1,138 @@
+// $Id$
+
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "Consumer_Router.h"
+#include "Options.h"
+
+ACE_RCSID(UPIPE_Event_Server, Consumer_Router, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY;
+
+int
+Consumer_Handler::open (void *a)
+{
+ CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a;
+ this->router_task_ = af->router ();
+ return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a);
+}
+
+Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm)
+ : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm)
+{
+}
+
+// Create a new handler that will interact with a consumer and point
+// its ROUTER_TASK_ data member to the CONSUMER_ROUTER.
+
+Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm)
+ : CONSUMER_ROUTER (tm)
+{
+}
+
+// Initialize the Router..
+
+int
+Consumer_Router::open (void *)
+{
+ ACE_ASSERT (this->is_reader ());
+ ACE_TCHAR *argv[3];
+
+ argv[0] = (ACE_TCHAR *) this->name ();
+ argv[1] = (ACE_TCHAR *) options.consumer_file ();
+ argv[2] = 0;
+
+ if (this->init (1, &argv[1]) == -1)
+ return -1;
+
+ // Make this an active object.
+ // return this->activate (options.t_flags ());
+
+ // Until that's done, return 1 to indicate that the object wasn't activated.
+ return 1;
+}
+
+int
+Consumer_Router::close (u_long)
+{
+ ACE_ASSERT (this->is_reader ());
+ this->peer_map_.close ();
+ this->msg_queue ()->deactivate();
+ return 0;
+}
+
+
+// Handle incoming messages in a separate thread..
+
+int
+Consumer_Router::svc (void)
+{
+ ACE_Message_Block *mb = 0;
+
+ ACE_ASSERT (this->is_reader ());
+
+ if (options.debug ())
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting svc in %s\n"),
+ this->name ()));
+
+ while (this->getq (mb) > 0)
+ if (this->put_next (mb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) put_next failed in %s\n"),
+ this->name ()), -1);
+
+ return 0;
+ // Note the implicit ACE_OS::thr_exit() via destructor.
+}
+
+// Send a MESSAGE_BLOCK to the supplier(s)..
+
+int
+Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ ACE_ASSERT (this->is_reader ());
+
+ if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
+ {
+ this->control (mb);
+ return this->put_next (mb);
+ }
+ else
+{
+//printf("consumer-Router is routing : send_peers\n");
+ return this->send_peers (mb);
+}
+}
+
+// Return information about the Client_Router ACE_Module..
+
+int
+Consumer_Router::info (ACE_TCHAR **strp, size_t length) const
+{
+ ACE_TCHAR buf[BUFSIZ];
+ ACE_UPIPE_Addr addr;
+ const ACE_TCHAR *mod_name = this->name ();
+ ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_;
+
+ if (sa.get_local_addr (addr) == -1)
+ return -1;
+
+#if !defined (ACE_WIN32) && defined (ACE_USES_WCHAR)
+# define FMTSTR ACE_TEXT ("%ls\t %ls/ %ls")
+#else
+# define FMTSTR ACE_TEXT ("%s\t %s/ %s")
+#endif
+
+ ACE_OS::sprintf (buf, FMTSTR,
+ mod_name, ACE_TEXT ("upipe"),
+ ACE_TEXT ("# consumer router\n"));
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, mod_name, length);
+ return ACE_OS::strlen (mod_name);
+}
+
+#endif /* ACE_HAS_THREADS */