summaryrefslogtreecommitdiff
path: root/ACE/examples/Reactor/Ntalker/ntalker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/Reactor/Ntalker/ntalker.cpp')
-rw-r--r--ACE/examples/Reactor/Ntalker/ntalker.cpp231
1 files changed, 231 insertions, 0 deletions
diff --git a/ACE/examples/Reactor/Ntalker/ntalker.cpp b/ACE/examples/Reactor/Ntalker/ntalker.cpp
new file mode 100644
index 00000000000..80873ead1a9
--- /dev/null
+++ b/ACE/examples/Reactor/Ntalker/ntalker.cpp
@@ -0,0 +1,231 @@
+// $Id$
+
+// Listens to multicast address. After first message received, will
+// listen for 5 more seconds. Prints Mbits/sec received from client.
+
+#include "ace/OS_main.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Dgram_Mcast.h"
+#include "ace/Reactor.h"
+#include "ace/Get_Opt.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Service_Config.h"
+
+ACE_RCSID(Ntalker, ntalker, "$Id$")
+
+#if defined (ACE_HAS_IP_MULTICAST)
+// Network interface to subscribe to. This is hardware specific. use
+// netstat(1M) to find whether your interface is le0 or ie0
+
+static const ACE_TCHAR *INTERFACE = 0;
+static const char *MCAST_ADDR = ACE_DEFAULT_MULTICAST_ADDR;
+static const u_short UDP_PORT = ACE_DEFAULT_MULTICAST_PORT;
+
+class Handler : public ACE_Event_Handler
+{
+ // = TITLE
+ // Handle both multicast and stdin events.
+public:
+ // = Initialization and termination methods.
+ Handler (u_short udp_port,
+ const char *ip_addr,
+ const ACE_TCHAR *a_interface,
+ ACE_Reactor & );
+ // Constructor.
+
+ ~Handler (void);
+ // Destructor.
+
+ // Event demuxer hooks.
+ virtual int handle_input (ACE_HANDLE);
+ virtual int handle_close (ACE_HANDLE,
+ ACE_Reactor_Mask);
+ virtual ACE_HANDLE get_handle (void) const;
+
+private:
+ ACE_SOCK_Dgram_Mcast mcast_;
+ // Multicast wrapper.
+};
+
+ACE_HANDLE
+Handler::get_handle (void) const
+{
+ return this->mcast_.get_handle ();
+}
+
+int
+Handler::handle_input (ACE_HANDLE h)
+{
+ char buf[BUFSIZ];
+
+ if (h == ACE_STDIN)
+ {
+ ssize_t result = ACE_OS::read (h, buf, BUFSIZ);
+
+ if (result > 0)
+ {
+ if (this->mcast_.send (buf, result) != result)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "send error"),
+ -1);
+ return 0;
+ }
+ else if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "can't read from STDIN"),
+ -1);
+ else // result == 0
+ {
+ ACE_Reactor::end_event_loop ();
+ return -1;
+ }
+ }
+ else
+ {
+ ACE_INET_Addr remote_addr;
+
+ // Receive message from multicast group.
+ ssize_t result = this->mcast_.recv (buf,
+ sizeof buf,
+ remote_addr);
+
+ if (result != -1)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "received datagram from host %s on port %d bytes = %d\n",
+ remote_addr.get_host_name (),
+ remote_addr.get_port_number (),
+ result));
+ ACE_OS::write (ACE_STDERR, buf, result);
+ ACE_DEBUG ((LM_DEBUG,
+ "\n"));
+ return 0;
+ }
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "something amiss"),
+ -1);
+ }
+}
+
+int
+Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask)
+{
+ if (h == ACE_STDIN)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "STDIN_Events handle removed from reactor.\n"));
+ if (ACE_Reactor::instance ()->remove_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "remove_handler"),
+ -1);
+ }
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Mcast_Events handle removed from reactor.\n"));
+ return 0;
+}
+
+Handler::~Handler (void)
+{
+ if (this->mcast_.unsubscribe () == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "unsubscribe fails"));
+}
+
+Handler::Handler (u_short udp_port,
+ const char *ip_addr,
+ const ACE_TCHAR *a_interface,
+ ACE_Reactor &reactor)
+{
+ // Create multicast address to listen on.
+
+ ACE_INET_Addr sockmc_addr (udp_port, ip_addr);
+
+ // subscribe to multicast group.
+
+ if (this->mcast_.subscribe (sockmc_addr, 1, a_interface) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "can't subscribe to multicast group"));
+ }
+ // Disable loopbacks.
+ // if (this->mcast_.set_option (IP_MULTICAST_LOOP, 0) == -1 )
+ // ACE_OS::perror (" can't disable loopbacks " ), ACE_OS::exit (1);
+
+ // Register callbacks with the ACE_Reactor.
+ else if (reactor.register_handler (this->mcast_.get_handle (),
+ this,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "can't register with Reactor\n"));
+ // Register the STDIN handler.
+ else if (ACE_Event_Handler::register_stdin_handler (this,
+ ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ()) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "register_stdin_handler"));
+}
+
+static void
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("i:u"));
+
+ int c;
+
+ while ((c = get_opt ()) != -1)
+ switch (c)
+ {
+ case 'i':
+ INTERFACE = get_opt.opt_arg ();
+ break;
+ case 'u':
+ // Usage fallthrough.
+ default:
+ ACE_DEBUG ((LM_DEBUG,
+ "%s -i interface\n",
+ argv[0]));
+ ACE_OS::exit (1);
+ }
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ parse_args (argc, argv);
+
+ Handler handler (UDP_PORT,
+ MCAST_ADDR,
+ INTERFACE,
+ *ACE_Reactor::instance ());
+
+ // Run the event loop.
+ ACE_Reactor::run_event_loop ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "talker Done.\n"));
+ return 0;
+}
+
+#else
+int
+ACE_TMAIN (int, ACE_TCHAR *argv[])
+{
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "error: %s must be run on a platform that support IP multicast\n",
+ argv[0]),
+ 0);
+}
+#endif /* ACE_HAS_IP_MULTICAST */
+