From 4b6dfea597deaa86cd1a0a13f9c1231bae598724 Mon Sep 17 00:00:00 2001 From: schmidt Date: Sat, 22 Mar 1997 17:24:12 +0000 Subject: foo --- ChangeLog-97a | 8 ++ ace/config-sunos4-sun4.x.h | 3 +- examples/Reactor/Multicast/Log_Wrapper.cpp | 29 ++++---- examples/Reactor/Multicast/Log_Wrapper.h | 9 ++- examples/Reactor/Multicast/client.cpp | 58 ++++++++------- examples/Reactor/Multicast/server.cpp | 114 +++++++++++++++++++---------- 6 files changed, 137 insertions(+), 84 deletions(-) diff --git a/ChangeLog-97a b/ChangeLog-97a index 045da33e39d..89c0846b4a9 100644 --- a/ChangeLog-97a +++ b/ChangeLog-97a @@ -12,6 +12,14 @@ Sat Mar 22 07:30:49 1997 David L. Levine Fri Mar 21 15:10:59 1997 Douglas C. Schmidt + * ace/Makefile (FILES): Moved Log_Msg to the beginning of the + source list in the ace Makefile. This is nice because it forces + the compilation of OS.i and that, obviously, is where you get + the complaints for the port. It's nice to have it at the + beginning so you catch the errors very early on before you've + compiled 30 or 40 files. Thanks to Scott Halstead + for this suggestion. + * ace/Memory_Pool.cpp: Added #if !defined (ACE_LACKS_SYSV_SHMEM) to handle systems like Chorus that lack this feature. diff --git a/ace/config-sunos4-sun4.x.h b/ace/config-sunos4-sun4.x.h index 74c1f7bd554..051ad8cdade 100644 --- a/ace/config-sunos4-sun4.x.h +++ b/ace/config-sunos4-sun4.x.h @@ -8,7 +8,8 @@ #define ACE_CONFIG_H #define ACE_HAS_CHARPTR_SPRINTF -#define ACE_HAS_UNION_WAIT +#define ACE_LACKS_POSIX_PROTO +// #define ACE_HAS_UNION_WAIT // Platform supports System V IPC (most versions of UNIX, but not Win32) #define ACE_HAS_SYSV_IPC diff --git a/examples/Reactor/Multicast/Log_Wrapper.cpp b/examples/Reactor/Multicast/Log_Wrapper.cpp index 38b4cdea09f..b7306b36371 100644 --- a/examples/Reactor/Multicast/Log_Wrapper.cpp +++ b/examples/Reactor/Multicast/Log_Wrapper.cpp @@ -2,11 +2,12 @@ // client.C + #include "Log_Wrapper.h" Log_Wrapper::Log_Wrapper (void) { - this->log_msg_.sequence_number = 0; + sequence_number_ = 0; this->log_msg_.app_id = ACE_OS::getpid (); } @@ -29,17 +30,14 @@ Log_Wrapper::open (const int port, const char *mcast_addr) return -1; else ACE_OS::memcpy ((char *) &this->log_msg_.host, - (char *) host_info->h_addr, - host_info->h_length); + (char *) host_info->h_addr, + host_info->h_length); // This starts out initialized to all zeros! - ACE_INET_Addr sockdg_addr; + server_ = ACE_INET_Addr(port, mcast_addr); - if (this->logger_.open (sockdg_addr) == -1) - return -1; - - if (this->server_.set (port, mcast_addr) == -1) - return -1; + if (logger_.subscribe (server_) == -1) + perror("can't subscribe to multicast group"), exit(1); // success. return 0; @@ -52,17 +50,20 @@ Log_Wrapper::open (const int port, const char *mcast_addr) int Log_Wrapper::log_message (ACE_Log_Priority type, char *message) { - this->log_msg_.type = type; this->log_msg_.time = time (0); - this->log_msg_.msg_length = strlen(message); - this->log_msg_.sequence_number++; + sequence_number_++; + + this->log_msg_.type = type; + this->log_msg_.time = time (0); + this->log_msg_.msg_length = strlen(message)+1; + this->log_msg_.sequence_number = htonl(sequence_number_); - iovec *iovp = new iovec[2]; + iovec *iovp = new iovec[2]; iovp[0].iov_base = (char *) &log_msg_; iovp[0].iov_len = sizeof log_msg_; iovp[1].iov_base = message; iovp[1].iov_len = log_msg_.msg_length; - logger_.send (iovp, 2, server_); + logger_.send (iovp, 2); delete iovp; diff --git a/examples/Reactor/Multicast/Log_Wrapper.h b/examples/Reactor/Multicast/Log_Wrapper.h index 7fb0e78ceaa..f560bb95f5f 100644 --- a/examples/Reactor/Multicast/Log_Wrapper.h +++ b/examples/Reactor/Multicast/Log_Wrapper.h @@ -7,7 +7,7 @@ #include "ace/Profile_Timer.h" #include "ace/INET_Addr.h" -#include "ace/SOCK_Dgram.h" +#include "ace/SOCK_Dgram_Mcast.h" #if !defined (_LM_WRAPPER_H) #define _LM_WRAPPER_H @@ -37,7 +37,7 @@ public: // = Format of the logging record. struct ACE_Log_Record { - unsigned long sequence_number; + u_long sequence_number; ACE_Log_Priority type; long host; long time; @@ -49,10 +49,13 @@ private: ACE_INET_Addr server_; // Server address where records are logged. + u_long sequence_number_; + // Keep track of the sequence. + ACE_Log_Record log_msg_; // One record used for many log messages. - ACE_SOCK_Dgram logger_; + ACE_SOCK_Dgram_Mcast logger_; // A logger object. }; diff --git a/examples/Reactor/Multicast/client.cpp b/examples/Reactor/Multicast/client.cpp index 7a116083336..2147fd18d49 100644 --- a/examples/Reactor/Multicast/client.cpp +++ b/examples/Reactor/Multicast/client.cpp @@ -4,6 +4,8 @@ // Log_Wrapper. +#include "ace/Get_Opt.h" + #include "Log_Wrapper.h" const char *MCAST_ADDR = ACE_DEFAULT_MULTICAST_ADDR; @@ -19,24 +21,24 @@ static int iterations = 0; static void parse_args (int argc, char *argv[]) { - int c; - ACE_LOG_MSG->open (argv[0]); - while ((c = ACE_OS::getopt (argc, argv, "m:ui:")) != -1) + ACE_Get_Opt getopt (argc, argv, "m:ui:", 1); // Start at argv[1] + + for (int c; (c = getopt ()) != -1; ) switch (c) { case 'm': - max_message_size = ACE_OS::atoi (optarg) * BUFSIZ; + max_message_size = ACE_OS::atoi (getopt.optarg) * BUFSIZ; break; case 'i': - iterations = ACE_OS::atoi (optarg); - break; + iterations = ACE_OS::atoi (getopt.optarg); + break; case 'u': - // usage fallthrough + // usage fallthrough default: ACE_ERROR ((LM_ERROR, "%n: -m max_message_size (in k) -i iterations\n%a", 1)); - /* NOTREACHED */ + /* NOTREACHED */ } } @@ -66,8 +68,8 @@ main (int argc, char **argv) { ACE_OS::memset (buf,1,::max_message_size); while (iterations--) - if (log.log_message (Log_Wrapper::LM_DEBUG, buf) == -1) - perror("log failed."), exit(1); + if (log.log_message (Log_Wrapper::LM_DEBUG, buf) == -1) + perror("log failed."), exit(1); } // otherwise, a file has been redirected, or give prompts @@ -75,29 +77,29 @@ main (int argc, char **argv) { // If a file has been redirected, don't activate user prompts if (ACE_OS::isatty (0)) - user_prompt = 1; + user_prompt = 1; else - user_prompt = 0; + user_prompt = 0; int nbytes; // continually read messages from stdin and log them. while (1) - { - if (user_prompt) - ACE_DEBUG ((LM_DEBUG, "\nEnter message ('Q':quit):\n")); - - if ((nbytes = read (0, buf, max_message_size)) == 0) - break; // end of file - buf[nbytes] = '\0'; - - // quitting? - if (buf[0] == 'Q') - break; - - // send the message to the logger - else if (log.log_message (Log_Wrapper::LM_DEBUG, buf) == -1) - perror("log failed."), exit(1); - } // while(1) + { + if (user_prompt) + ACE_DEBUG ((LM_DEBUG, "\nEnter message ('Q':quit):\n")); + + if ((nbytes = read (0, buf, max_message_size)) == 0) + break; // end of file + buf[nbytes-1] = '\0'; + + // quitting? + if (buf[0] == 'Q') + break; + + // send the message to the logger + else if (log.log_message (Log_Wrapper::LM_DEBUG, buf) == -1) + perror("log failed."), exit(1); + } // while(1) } ACE_DEBUG ((LM_DEBUG, "Client done.\n")); diff --git a/examples/Reactor/Multicast/server.cpp b/examples/Reactor/Multicast/server.cpp index 9fd8ad034c3..d85b2109d02 100644 --- a/examples/Reactor/Multicast/server.cpp +++ b/examples/Reactor/Multicast/server.cpp @@ -1,13 +1,11 @@ -// server.C (written by Tim Harrison) // $Id$ -// +// server.C (written by Tim Harrison) // listens to multicast address. after first message received, will // listen for 5 more seconds. prints Mbits/sec received from client. #include "ace/SOCK_Dgram.h" #include "ace/INET_Addr.h" - #include "ace/SOCK_Dgram_Mcast.h" #include "ace/Reactor.h" #include "Log_Wrapper.h" @@ -16,30 +14,35 @@ class Server_Events : public ACE_Event_Handler { public: - Server_Events (u_short port, + Server_Events (u_short port, const char *mcast_addr, long time_interval = 0); ~Server_Events (void); virtual int handle_input (ACE_HANDLE fd); + virtual int handle_timeout (const ACE_Time_Value &tv, + const void *arg); + virtual ACE_HANDLE get_handle (void) const; ACE_Time_Value *wait_time (void); - + private: char *message_; Log_Wrapper::ACE_Log_Record *log_record_; - char buf_[4*BUFSIZ]; + char buf_[4 * BUFSIZ]; + int initialize_; + int count_; int interval_; // time interval to log messages - + ACE_Time_Value *how_long_; - ACE_Reactor *reactor_; + ACE_Reactor *reactor_; ACE_SOCK_Dgram_Mcast mcast_dgram_; - ACE_INET_Addr remote_addr_; - ACE_INET_Addr mcast_addr_; - + ACE_INET_Addr remote_addr_; + ACE_INET_Addr mcast_addr_; + // = statistics on messages received double total_bytes_received_; int total_messages_received_; @@ -59,20 +62,22 @@ Server_Events::wait_time (void) } Server_Events::Server_Events (u_short port, - const char *mcast_addr, - long time_interval) - : interval_ (time_interval), - mcast_addr_ (port, mcast_addr), - total_bytes_received_ (0) + const char *mcast_addr, + long time_interval) + : total_bytes_received_ (0), + count_(-1), + initialize_ (0), + interval_ (time_interval), + mcast_addr_ (port, mcast_addr) { - // use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group. - + // Use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group. + if (this->mcast_dgram_.subscribe (this->mcast_addr_) == -1) perror("can't subscribe to multicast group"), exit(1); - + // Point to NULL so that we block in the beginning. this->how_long_ = 0; - + this->log_record_ = (Log_Wrapper::ACE_Log_Record *) &buf_; this->message_ = &buf_[sizeof (Log_Wrapper::ACE_Log_Record)]; } @@ -82,13 +87,13 @@ Server_Events::Server_Events (u_short port, Server_Events::~Server_Events (void) { this->mcast_dgram_.unsubscribe (); - + ACE_DEBUG ((LM_DEBUG, "total bytes received = %d after %d second\n", this->total_bytes_received_, this->interval_)); - + ACE_DEBUG ((LM_DEBUG, "Mbits/sec = %.2f\n", (float) (total_bytes_received_ * 8 / (float) (1024*1024*interval_)))); - + ACE_DEBUG ((LM_DEBUG, "last sequence number = %d\ntotal messages received = %d\ndiff = %d\n", this->last_sequence_number_, @@ -97,25 +102,58 @@ Server_Events::~Server_Events (void) } int -Server_Events::handle_input (ACE_HANDLE) +Server_Events::handle_timeout (const ACE_Time_Value &tv, + const void *arg) { - // after the first message, point this to a timer - // that way, the next time reactor::handle_events is called, - // a nonzero time value will be passed in. - if (this->how_long_ == 0) - this->how_long_ = new ACE_Time_Value (this->interval_); + ACE_DEBUG ((LM_DEBUG, "\t%d timeout occurred for %s.\n", + ++count_, + (char *) arg)); + if (this->count_ == 5) + { + reactor()->cancel_timer (this); + this->initialize_ = 0; + + ACE_DEBUG ((LM_DEBUG, "\t%d canceled timeout occurred for %s.\n", + count_, (char *) arg)); + } + return 0; +} + +int +Server_Events::handle_input (ACE_HANDLE) +{ // receive message from multicast group - int retcode = this->mcast_dgram_.recv (this->buf_, - sizeof this->buf_, + iovec iovp[2]; + iovp[0].iov_base = buf_; + iovp[0].iov_len = sizeof log_record_; + iovp[1].iov_base = &buf_[sizeof (log_record_)]; + iovp[1].iov_len = 4 * BUFSIZ - sizeof log_record_; + + int retcode = this->mcast_dgram_.recv (iovp, + 2, this->remote_addr_); if (retcode != -1) { total_messages_received_++; total_bytes_received_ += retcode; - last_sequence_number_ = log_record_->sequence_number; + last_sequence_number_ = ntohl(log_record_->sequence_number); ACE_DEBUG ((LM_DEBUG, "sequence number = %d\n", - log_record_->sequence_number)); + last_sequence_number_)); + ACE_DEBUG ((LM_DEBUG, "message = '%s'\n", + this->message_)); + + if (!this->initialize_) + { + reactor()->schedule_timer (this, + (void *) "Foo", + ACE_Time_Value::zero, + ACE_Time_Value(5)); + initialize_ = 1; + } + + count_ = -1; + return 0; } else @@ -135,14 +173,14 @@ main(int, char *[]) // Instance of the ACE_Reactor. ACE_Reactor reactor; - + if (reactor.register_handler (&server_events, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR ((LM_ERROR, "%p\n%a", "register_handler", 1)); - + for (;;) reactor.handle_events (server_events.wait_time ()); - + /* NOTREACHED */ return 0; } @@ -151,7 +189,7 @@ int main (int, char *argv[]) { ACE_ERROR_RETURN ((LM_ERROR, - "error: %s must be run on a platform that support IP multicast\n", - argv[0]), -1); + "error: %s must be run on a platform that support IP multicast\n", + argv[0]), -1); } #endif /* ACE_HAS_IP_MULTICAST */ -- cgit v1.2.1