diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-12-18 05:25:37 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-12-18 05:25:37 +0000 |
commit | fa0982568e34a7422d2caa516bb3bf0353f14570 (patch) | |
tree | ab15d9bde4bea90a54c08097698d64dade88f0de /examples | |
parent | e989b625e2f3783cf59760129f8cc810ac2f4c60 (diff) | |
download | ATCD-fa0982568e34a7422d2caa516bb3bf0353f14570.tar.gz |
*** empty log message ***
Diffstat (limited to 'examples')
-rw-r--r-- | examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp | 281 | ||||
-rw-r--r-- | examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp | 171 |
2 files changed, 345 insertions, 107 deletions
diff --git a/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp b/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp index d3b92532cec..3bbd6dbd8dc 100644 --- a/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp +++ b/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp @@ -5,82 +5,295 @@ #include "ace/SOCK_Connector.h" #include "ace/INET_Addr.h" +#include "ace/Thread_Manager.h" +#include "ace/Singleton.h" +#include "ace/Get_Opt.h" // ACE SOCK_SAP client. -static const char QUIT_STRING[] = "quit"; +class Options + // = TITLE + // Define the options for this test. +{ +public: + Options (void); + // Constructor. -int -main (int argc, char *argv[]) -{ - const char *host = argc > 1 ? argv[1] : ACE_DEFAULT_SERVER_HOST; - u_short r_port = argc > 2 ? ACE_OS::atoi (argv[2]) : ACE_DEFAULT_SERVER_PORT; - ACE_Time_Value timeout (argc > 3 ? ACE_OS::atoi (argv[3]) : ACE_DEFAULT_TIMEOUT); + int parse_args (int argc, char *argv[]); + // Parse the command-line arguments. + + ACE_Time_Value *timeout (void) const; + // Timeout value. + + u_short port (void) const; + // Port of the server. + + const char *host (void) const; + // Host of the server. + + int threads (void) const; + // Number of threads. + + const char *quit_string (void) const; + // String that shuts down the client/server. + + ssize_t read (char *buf, size_t len, size_t &iterations); + // Read from the appropriate location. + +private: + const char *host_; + // Host of the server. + + u_short port_; + // Port of the server. + + ACE_Time_Value timeout_; + // Timeout value. + + int threads_; + // Number of threads. + + const char *quit_string_; + // String that shuts down the client/server. + + ACE_HANDLE io_source_; + // Are we reading I/O from ACE_STDIN or from our generator? + + size_t iterations_; + // Number of iterations. +}; + +Options::Options (void) + : host_ (ACE_DEFAULT_SERVER_HOST), + port_ (ACE_DEFAULT_SERVER_PORT), + timeout_ (ACE_DEFAULT_TIMEOUT), + threads_ (1), + quit_string_ ("quit"), + io_source_ (ACE_INVALID_HANDLE), // Defaults to using the generator. + iterations_ (10000) +{ +} + +int +Options::read (char *buf, size_t len, size_t &iteration) +{ + if (io_source_ == ACE_STDIN) + return ACE_OS::read (ACE_STDIN, buf, sizeof buf); + else + { + if (iteration >= this->iterations_) + return 0; + else + { + ACE_OS::strncpy (buf, "TAO", len); + iteration++; + return ACE_OS::strlen ("TAO") + 1; + } + } +} + +int +Options::parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt getopt (argc, argv, "h:i:p:q:st:T:", 1); + + for (int c; (c = getopt ()) != -1; ) + switch (c) + { + case 'h': + this->host_ = getopt.optarg; + break; + case 'i': + this->iterations_ = ACE_OS::atoi (getopt.optarg); + break; + case 'p': + this->port_ = ACE_OS::atoi (getopt.optarg); + break; + case 'q': + this->quit_string_ = getopt.optarg; + break; + case 's': + this->io_source_ = ACE_STDIN; + break; + case 't': + this->threads_ = ACE_OS::atoi (getopt.optarg); + break; + case 'T': + this->timeout_ = ACE_OS::atoi (getopt.optarg); + break; + default: + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) usage: %s [-h <host>] [-p <port>] [-q <quit string>] [-s] [-t <threads>] [-T <timeout>]"), + 1); + } + + return 0; +} + +u_short +Options::port (void) const +{ + return this->port_; +} + +const char * +Options::host (void) const +{ + return this->host_; +} + +const char * +Options::quit_string (void) const +{ + return this->quit_string_; +} + +int +Options::threads (void) const +{ + return this->threads_; +} + +ACE_Time_Value * +Options::timeout (void) const +{ + return (ACE_Time_Value *) &this->timeout_; +} + +// Options Singleton. +typedef ACE_Singleton<Options, ACE_Recursive_Thread_Mutex> OPTIONS; + +// Entry point to the client service. + +void * +client (void *) +{ char buf[BUFSIZ]; + Options *options = OPTIONS::instance (); + ACE_SOCK_Stream cli_stream; - ACE_INET_Addr remote_addr (r_port, host); + ACE_INET_Addr remote_addr (options->port (), + options->host ()); ACE_SOCK_Connector con; // Attempt a non-blocking connect to the server, reusing the local // addr if necessary. Initiate blocking connection with server. - ACE_DEBUG ((LM_DEBUG, "starting connect\n")); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) starting connect\n")); if (con.connect (cli_stream, remote_addr) == -1) { // Initiate timed, non-blocking connection with server. - ACE_DEBUG ((LM_DEBUG, "starting non-blocking connect\n")); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) starting non-blocking connect\n")); - if (con.connect (cli_stream, remote_addr, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + if (con.connect (cli_stream, + remote_addr, + (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) { if (errno != EWOULDBLOCK) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connection failed"), 1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "connection failed"), + 0); - ACE_DEBUG ((LM_DEBUG, "starting timed connect\n")); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) starting timed connect\n")); - // Check if non-blocking connection is in progress, - // and wait up to timeout seconds for it to complete. + // Check if non-blocking connection is in progress, and wait + // up to timeout seconds for it to complete. - if (con.complete (cli_stream, &remote_addr, &timeout) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "complete failed"), 1); + if (con.complete (cli_stream, + &remote_addr, + options->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "complete failed"), + 0); else - ACE_DEBUG ((LM_DEBUG, "connected to %s\n", remote_addr.get_host_name ())); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) connected to %s\n", + remote_addr.get_host_name ())); } } else - ACE_DEBUG ((LM_DEBUG, "connected to %s\n", remote_addr.get_host_name ())); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) connected to %s\n", + remote_addr.get_host_name ())); if (cli_stream.disable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "disable"), 1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "disable"), + 0); + + // This variable is allocated off the stack to obviate the need for + // locking. + size_t iteration = 0; // Send data to server (correctly handles "incomplete writes"). for (ssize_t r_bytes; - (r_bytes = ACE_OS::read (ACE_STDIN, buf, sizeof buf)) > 0; + (r_bytes = options->read (buf, sizeof buf, iteration)) > 0; ) - if (ACE_OS::strncmp (buf, QUIT_STRING, sizeof (QUIT_STRING) - 1) == 0) - break; - else if (cli_stream.send (buf, r_bytes, 0, &timeout) == -1) - { - if (errno == ETIME) - ACE_DEBUG ((LM_DEBUG, "%p\n", "send_n")); - else - // Breakout if we didn't fail due to a timeout. - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "send_n"), -1); - } + if (ACE_OS::strncmp (buf, + options->quit_string (), + ACE_OS::strlen (options->quit_string ())) == 0) + break; + else if (cli_stream.send (buf, r_bytes, 0, options->timeout ()) == -1) + { + if (errno == ETIME) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %p\n", "send_n")); + else + // Breakout if we didn't fail due to a timeout. + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "send_n"), + 0); + } // Explicitly close the writer-side of the connection. if (cli_stream.close_writer () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "close_writer"), 1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "close_writer"), + 0); // Wait for handshake with server. if (cli_stream.recv_n (buf, 1) != 1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "recv_n"), 1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "recv_n"), + 0); // Close the connection completely. if (cli_stream.close () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "close"), 1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "close"), + 0); + return 0; +} +int +main (int argc, char *argv[]) +{ + OPTIONS::instance ()->parse_args (argc, argv); + +#if defined (ACE_HAS_THREADS) + if (ACE_Thread_Manager::instance ()->spawn_n (OPTIONS::instance ()->threads (), + client) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) %p\n", "spawn_n"), 1); + else + ACE_Thread_Manager::instance ()->wait (); +#else + client (); +#endif /* ACE_HAS_THREADS */ + return 0; } + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Singleton<Options, ACE_Recursive_Thread_Mutex>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Singleton<Options, ACE_Recursive_Thread_Mutex> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp b/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp index ffcdaf45bae..8d0b2ac4ca9 100644 --- a/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp +++ b/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp @@ -1,25 +1,104 @@ -// This example tests the non-blocking features of the -// ACE_SOCK_Acceptor and ACE_SOCK_Stream classes. // $Id$ +// This example tests the non-blocking features of the +// ACE_SOCK_Acceptor and ACE_SOCK_Stream classes. #include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Stream.h" #include "ace/INET_Addr.h" #include "ace/Handle_Set.h" +#include "ace/Thread_Manager.h" + +// Are we running verbosely? +static int verbose = 0; + +static ACE_Time_Value timeout; + +static int sleep_time; // ACE SOCK_SAP server. +void * +server (void *arg) +{ + ACE_Handle_Set handle_set; + ACE_SOCK_Stream new_stream; + ACE_HANDLE handle = ACE_HANDLE (arg); + + new_stream.set_handle (handle); + + // Enable non-blocking I/O. + if (new_stream.enable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "enable"), 0); + + handle_set.set_bit (new_stream.get_handle ()); + + // Read data from client (terminate on error). + + for (ssize_t r_bytes;;) + { + // Wait to read until there's something from the client. + if (ACE_OS::select (int (new_stream.get_handle ()) + 1, + handle_set, + 0, 0, timeout) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "select"), 0); + + // Keep reading until the client shuts down. + for (;;) + { + // Sleep for some amount of time in order to + // test client flow control. + ACE_OS::sleep (sleep_time); + + char buf[BUFSIZ]; + + r_bytes = new_stream.recv (buf, sizeof buf, 0, &timeout); + + if (r_bytes <= 0) + { + if (errno == ETIME) + ACE_ERROR ((LM_ERROR, "%p\n", "ACE::recv")); + break; + } + else if (verbose && ACE::write_n (ACE_STDOUT, buf, r_bytes) != r_bytes) + ACE_ERROR ((LM_ERROR, "%p\n", "ACE::send_n")); + } + + if (r_bytes == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reached end of input, connection closed by client\n")); + + // Send handshake back to client to unblock it. + if (new_stream.send_n ("", 1) != 1) + ACE_ERROR ((LM_ERROR, "%p\n", "send_n")); + break; + } + else if (r_bytes == -1) + { + if (errno == EWOULDBLOCK || errno == ETIME) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) no input available, going back to reading\n")); + else + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "recv"), 0); + } + } + + // Close new endpoint (listening endpoint stays open). + if (new_stream.close () == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "close")); + + return 0; +} + int main (int argc, char *argv[]) { u_short port = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_SERVER_PORT; - ACE_Time_Value timeout (argc > 2 - ? ACE_OS::atoi (argv[2]) - : ACE_DEFAULT_TIMEOUT); - int sleep_time = argc > 3 ? ACE_OS::atoi (argv[3]) : 0; + timeout = argc > 2 ? ACE_OS::atoi (argv[2]) : ACE_DEFAULT_TIMEOUT; + sleep_time = argc > 3 ? ACE_OS::atoi (argv[3]) : 0; ACE_SOCK_Acceptor peer_acceptor; @@ -35,7 +114,7 @@ main (int argc, char *argv[]) else if (peer_acceptor.get_local_addr (server_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), 1); - ACE_DEBUG ((LM_DEBUG, "starting server at port %d\n", + ACE_DEBUG ((LM_DEBUG, "(%P|%t) starting server at port %d\n", server_addr.get_port_number ())); // Keep these objects out here to prevent excessive constructor @@ -48,8 +127,6 @@ main (int argc, char *argv[]) for (;;) { - char buf[BUFSIZ]; - handle_set.reset (); handle_set.set_bit (peer_acceptor.get_handle ()); @@ -59,7 +136,7 @@ main (int argc, char *argv[]) if (result == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "select"), -1); else if (result == 0) - ACE_DEBUG ((LM_DEBUG, "select timed out\n")); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) select timed out\n")); else { // Create a new ACE_SOCK_Stream endpoint (note automatic restart @@ -67,75 +144,23 @@ main (int argc, char *argv[]) while ((result = peer_acceptor.accept (new_stream, &cli_addr)) != -1) { - ACE_DEBUG ((LM_DEBUG, "client %s connected from %d\n", + ACE_DEBUG ((LM_DEBUG, "(%P|%t) client %s connected from %d\n", cli_addr.get_host_name (), cli_addr.get_port_number ())); - - // Enable non-blocking I/O. - if (new_stream.enable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "enable"), -1); - - handle_set.reset (); - handle_set.set_bit (new_stream.get_handle ()); - - // Read data from client (terminate on error). - - for (ssize_t r_bytes;;) - { - // Wait to read until there's something from the client. - if (ACE_OS::select (int (new_stream.get_handle ()) + 1, - handle_set, - 0, 0, timeout) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "select"), -1); - - // Keep reading until the client shuts down. - for (;;) - { - // Sleep for some amount of time in order to - // test client flow control. - ACE_OS::sleep (sleep_time); - - r_bytes = new_stream.recv (buf, sizeof buf, 0, &timeout); - - if (r_bytes <= 0) - { - if (errno == ETIME) - ACE_ERROR ((LM_ERROR, "%p\n", "ACE::recv")); - break; - } - else if (ACE::write_n (ACE_STDOUT, buf, r_bytes) != r_bytes) - ACE_ERROR ((LM_ERROR, "%p\n", "ACE::send_n")); - } - - if (r_bytes == 0) - { - ACE_DEBUG ((LM_DEBUG, - "reached end of input, connection closed by client\n")); - - // Send handshake back to client to unblock it. - if (new_stream.send_n ("", 1) != 1) - ACE_ERROR ((LM_ERROR, "%p\n", "send_n")); - break; - } - else if (r_bytes == -1) - { - if (errno == EWOULDBLOCK || errno == ETIME) - ACE_DEBUG ((LM_DEBUG, - "no input available, going back to reading\n")); - else - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "recv"), -1); - } - } - - // Close new endpoint (listening endpoint stays open). - if (new_stream.close () == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "close")); - } + +#if defined (ACE_HAS_THREADS) + if (ACE_Thread_Manager::instance ()->spawn (server, + (void *) new_stream.get_handle ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) %p\n", "spawn"), 1); +#else + server ((void *) new_stream.get_handle ()); +#endif /* ACE_HAS_THREADS */ + } if (result == -1) { if (errno == EWOULDBLOCK) ACE_DEBUG ((LM_DEBUG, - "no connections available, going back to accepting\n")); + "(%P|%t) no connections available, going back to accepting\n")); else ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE::write"), -1); } |