summaryrefslogtreecommitdiff
path: root/ACE/examples/Reactor/Multicast/server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/Reactor/Multicast/server.cpp')
-rw-r--r--ACE/examples/Reactor/Multicast/server.cpp247
1 files changed, 247 insertions, 0 deletions
diff --git a/ACE/examples/Reactor/Multicast/server.cpp b/ACE/examples/Reactor/Multicast/server.cpp
new file mode 100644
index 00000000000..65e39b97d1a
--- /dev/null
+++ b/ACE/examples/Reactor/Multicast/server.cpp
@@ -0,0 +1,247 @@
+// $Id$
+
+// server.cpp (written by Tim Harrison)
+
+// Listens to multicast address for client log messages. Prints
+// Mbits/sec received from client.
+
+#include "ace/OS_main.h"
+#include "ace/SOCK_Dgram.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Dgram_Mcast.h"
+#include "ace/Reactor.h"
+#include "ace/Log_Msg.h"
+#include "Log_Wrapper.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/os_include/os_netdb.h"
+
+ACE_RCSID(Multicast, server, "$Id$")
+
+#if defined (ACE_HAS_IP_MULTICAST)
+class Server_Events : public ACE_Event_Handler
+{
+public:
+ Server_Events (u_short port,
+ const char *mcast_addr,
+ long time_interval = 0);
+ ~Server_Events (void);
+
+ virtual int handle_input (ACE_HANDLE fd);
+ virtual int handle_timeout (const ACE_Time_Value &tv,
+ const void *arg);
+
+ virtual ACE_HANDLE get_handle (void) const;
+
+ ACE_Time_Value *wait_time (void);
+
+private:
+ char *message_;
+ Log_Wrapper::Log_Record *log_record_;
+ char buf_[4 * BUFSIZ];
+ char hostname_[MAXHOSTNAMELEN];
+
+ int initialized_;
+ int count_;
+ long interval_;
+ // time interval to log messages
+
+ ACE_Time_Value *how_long_;
+ ACE_Reactor *reactor_;
+ ACE_SOCK_Dgram_Mcast mcast_dgram_;
+ ACE_INET_Addr remote_addr_;
+ ACE_INET_Addr mcast_addr_;
+
+ // = statistics on messages received
+ double total_bytes_received_;
+ int total_messages_received_;
+ int last_sequence_number_;
+};
+
+static const char MCAST_ADDR[] = ACE_DEFAULT_MULTICAST_ADDR;
+static const int UDP_PORT = ACE_DEFAULT_MULTICAST_PORT;
+static const int DURATION = 5;
+
+ACE_HANDLE
+Server_Events::get_handle (void) const
+{
+ return this->mcast_dgram_.get_handle ();
+}
+
+ACE_Time_Value *
+Server_Events::wait_time (void)
+{
+ return this->how_long_;
+}
+
+Server_Events::Server_Events (u_short port,
+ const char *mcast_addr,
+ long time_interval)
+ : initialized_ (0),
+ count_ (1),
+ interval_ (time_interval),
+ mcast_addr_ (port, mcast_addr),
+ total_bytes_received_ (0)
+{
+ // Use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group.
+
+ if (ACE_OS::hostname (this->hostname_,
+ MAXHOSTNAMELEN) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "hostname"));
+
+ else if (this->mcast_dgram_.subscribe (this->mcast_addr_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "subscribe"));
+ else
+ {
+ // Point to NULL so that we block in the beginning.
+ this->how_long_ = 0;
+
+ this->log_record_ = (Log_Wrapper::Log_Record *) &buf_;
+ this->message_ = &buf_[sizeof (Log_Wrapper::Log_Record)];
+ }
+}
+
+// A destructor that emacs refuses to color blue ;-)
+
+Server_Events::~Server_Events (void)
+{
+ this->mcast_dgram_.unsubscribe ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "total bytes received = %d after %d second\n",
+ this->total_bytes_received_,
+ this->interval_));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Mbits/sec = %.2f\n",
+ (float) (total_bytes_received_ * 8 / (float) (1024*1024*interval_))));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "last sequence number = %d\ntotal messages received = %d\ndiff = %d\n",
+ this->last_sequence_number_,
+ this->total_messages_received_,
+ this->last_sequence_number_ - total_messages_received_));
+}
+
+int
+Server_Events::handle_timeout (const ACE_Time_Value &,
+ const void *arg)
+{
+ ACE_DEBUG ((LM_DEBUG, "\t%d timeout%s occurred for %s.\n",
+ this->count_,
+ this->count_ == 1 ? "" : "s",
+ (char *) arg));
+
+ // Don't let the timeouts continue if there's no activity since
+ // otherwise we use up a lot of CPU time unnecessarily.
+ if (this->count_ == 5)
+ {
+ reactor ()->cancel_timer (this);
+ this->initialized_ = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "\tcancelled timeout for %s to avoid busy waiting.\n",
+ (char *) arg));
+ }
+
+ this->count_++;
+ return 0;
+}
+
+int
+Server_Events::handle_input (ACE_HANDLE)
+{
+ // Receive message from multicast group.
+ iovec iovp[2];
+ iovp[0].iov_base = buf_;
+ iovp[0].iov_len = sizeof (log_record_);
+ iovp[1].iov_base = &buf_[sizeof (log_record_)];
+ iovp[1].iov_len = 4 * BUFSIZ - sizeof (log_record_);
+
+ ssize_t retcode =
+ this->mcast_dgram_.recv (iovp,
+ 2,
+ this->remote_addr_);
+ if (retcode != -1)
+ {
+ total_messages_received_++;
+ total_bytes_received_ += retcode;
+ last_sequence_number_ =
+ ntohl (log_record_->sequence_number);
+
+ for (char *message_end = this->message_ + ACE_OS::strlen (this->message_) - 1;
+ ACE_OS::strchr ("\r\n \t", *message_end) != 0;
+ )
+ {
+ *message_end-- = '\0';
+ if (message_end == this->message_)
+ break;
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "sequence number = %d\n",
+ last_sequence_number_));
+ ACE_DEBUG ((LM_DEBUG,
+ "message = '%s'\n",
+ this->message_));
+
+ if (this->initialized_ == 0)
+ {
+ // Restart the timer since we've received events again.
+ if (reactor()->schedule_timer (this,
+ (void *) this->hostname_,
+ ACE_Time_Value::zero,
+ ACE_Time_Value (DURATION)) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "schedule_timer"),
+ -1);
+ this->initialized_ = 1;
+ }
+
+ this->count_ = 1;
+ return 0;
+ }
+ else
+ return -1;
+}
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ // Instantiate a server which will receive messages for DURATION
+ // seconds.
+ Server_Events server_events (UDP_PORT,
+ MCAST_ADDR,
+ DURATION);
+ // Instance of the ACE_Reactor.
+ ACE_Reactor reactor;
+
+ if (reactor.register_handler (&server_events,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n%a",
+ "register_handler",
+ 1));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "starting up server\n"));
+
+ for (;;)
+ reactor.handle_events (server_events.wait_time ());
+
+ ACE_NOTREACHED (return 0;)
+}
+#else
+int
+main (int, char *argv[])
+{
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "error: %s must be run on a platform that support IP multicast\n",
+ argv[0]), -1);
+}
+#endif /* ACE_HAS_IP_MULTICAST */