diff options
Diffstat (limited to 'ACE/examples/IPC_SAP/SOCK_SAP')
21 files changed, 2865 insertions, 0 deletions
diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/.cvsignore b/ACE/examples/IPC_SAP/SOCK_SAP/.cvsignore new file mode 100644 index 00000000000..07a07ab2d27 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/.cvsignore @@ -0,0 +1,18 @@ +C-inclient +C-inclient +C-inserver +C-inserver +CPP-inclient +CPP-inclient +CPP-inserver +CPP-inserver +CPP-memclient +CPP-memclient +CPP-memserver +CPP-memserver +CPP-unclient +CPP-unclient +CPP-unserver +CPP-unserver +FD-unclient +FD-unclient diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/C-inclient.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/C-inclient.cpp new file mode 100644 index 00000000000..e4cf8997857 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/C-inclient.cpp @@ -0,0 +1,84 @@ +// $Id$ + +#include "ace/OS_main.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_sys_socket.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_stdlib.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_netdb.h" +#include "ace/Default_Constants.h" + +ACE_RCSID(SOCK_SAP, C_inclient, "$Id$") + +/* BSD socket client */ + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + // Initialize WinSock DLL on Win32... + ACE_OS::socket_init (ACE_WSOCK_VERSION); + + struct sockaddr_in saddr; + struct hostent *hp; + const ACE_TCHAR *host = argc > 1 ? argv[1] : ACE_DEFAULT_SERVER_HOST; + u_short port_num = + htons (argc > 2 ? ACE_OS::atoi (argv[2]) : ACE_DEFAULT_SERVER_PORT); + int sockbufsize = argc > 3 ? ACE_OS::atoi (argv[3]) : 0; + char buf[BUFSIZ]; + ACE_HANDLE s_handle; + int w_bytes; + int r_bytes; + int n; + + // Create a local endpoint of communication. + if ((s_handle = ACE_OS::socket (PF_INET, SOCK_STREAM, 0)) == ACE_INVALID_HANDLE) + ACE_OS::perror (ACE_TEXT("socket")), ACE_OS::exit (1); + + // If a sockbufsize was specified, set it for both send and receive. + if (sockbufsize > 0) + { + if (ACE_OS::setsockopt (s_handle, SOL_SOCKET, SO_SNDBUF, + (const char *) &sockbufsize, + sizeof (sockbufsize)) != 0) + ACE_OS::perror (ACE_TEXT("SO_SNDBUF")), ACE_OS::exit (1); + if (ACE_OS::setsockopt (s_handle, SOL_SOCKET, SO_RCVBUF, + (const char *) &sockbufsize, + sizeof (sockbufsize)) != 0) + ACE_OS::perror (ACE_TEXT("SO_RCVBUF")), ACE_OS::exit (1); + } + + // Determine IP address of the server. + if ((hp = ACE_OS::gethostbyname (ACE_TEXT_ALWAYS_CHAR(host))) == 0) + ACE_OS::perror (ACE_TEXT("gethostbyname")), ACE_OS::exit (1); + + // Set up the address information to contact the server. + ACE_OS::memset ((void *) &saddr, 0, sizeof saddr); + saddr.sin_family = AF_INET; + saddr.sin_port = port_num; + ACE_OS::memcpy (&saddr.sin_addr, hp->h_addr, hp->h_length); + + // Establish connection with remote server. + if (ACE_OS::connect (s_handle, + reinterpret_cast<sockaddr *> (&saddr), + sizeof saddr) == -1) + ACE_OS::perror (ACE_TEXT("connect")), ACE_OS::exit (1); + + // Send data to server (correctly handles "incomplete writes" due to + // flow control). + + while ((r_bytes = ACE_OS::read (ACE_STDIN, buf, sizeof buf)) > 0) + for (w_bytes = 0; w_bytes < r_bytes; w_bytes += n) + if ((n = ACE_OS::send (s_handle, buf + w_bytes, + r_bytes - w_bytes)) < 0) + ACE_OS::perror (ACE_TEXT("write")), ACE_OS::exit (1); + + if (ACE_OS::recv (s_handle, buf, 1) == 1) + ACE_OS::write (ACE_STDOUT, buf, 1); + + // Explicitly close the connection. + if (ACE_OS::closesocket (s_handle) == -1) + ACE_OS::perror (ACE_TEXT("close")), ACE_OS::exit (1); + + return 0; +} diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/C-inserver.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/C-inserver.cpp new file mode 100644 index 00000000000..0a6915cc463 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/C-inserver.cpp @@ -0,0 +1,116 @@ +// $Id$ + +#include "ace/OS_main.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_sys_socket.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_stdlib.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_netdb.h" +#include "ace/OS_NS_errno.h" +#include "ace/Default_Constants.h" + +ACE_RCSID(SOCK_SAP, C_inserver, "$Id$") + +/* BSD socket server. */ + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + // Initialize WinSock DLL on Win32... + ACE_OS::socket_init (ACE_WSOCK_VERSION); + + u_short port_num = + htons (argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_SERVER_PORT); + int sockbufsize = argc > 2 ? ACE_OS::atoi (argv[2]) : 0; + struct sockaddr_in saddr; + ACE_HANDLE s_handle, n_handle; + + /* Create a local endpoint of communication */ + if ((s_handle = ACE_OS::socket (PF_INET, SOCK_STREAM, 0)) == ACE_INVALID_HANDLE) + ACE_OS::perror (ACE_TEXT("socket")), ACE_OS::exit (1); + + // If a sockbufsize was specified, set it for both send and receive. + if (sockbufsize > 0) + { + if (ACE_OS::setsockopt (s_handle, SOL_SOCKET, SO_SNDBUF, + (const char *) &sockbufsize, + sizeof (sockbufsize)) != 0) + ACE_OS::perror (ACE_TEXT("SO_SNDBUF")), ACE_OS::exit (1); + if (ACE_OS::setsockopt (s_handle, SOL_SOCKET, SO_RCVBUF, + (const char *) &sockbufsize, + sizeof (sockbufsize)) != 0) + ACE_OS::perror (ACE_TEXT("SO_RCVBUF")), ACE_OS::exit (1); + } + + /* Set up the address information to become a server */ + ACE_OS::memset ((void *) &saddr, 0, sizeof saddr); + saddr.sin_family = AF_INET; + saddr.sin_port = port_num; + saddr.sin_addr.s_addr = INADDR_ANY; + + /* Associate address with endpoint */ + if (ACE_OS::bind (s_handle, + reinterpret_cast<struct sockaddr *> (&saddr), + sizeof saddr) == -1) + ACE_OS::perror (ACE_TEXT("bind")), ACE_OS::exit (1); + + /* Make endpoint listen for service requests */ + if (ACE_OS::listen (s_handle, 5) == -1) + ACE_OS::perror (ACE_TEXT("listen")), ACE_OS::exit (1); + + /* Performs the iterative server activities */ + + for (;;) + { + char buf[BUFSIZ]; + int r_bytes; + struct sockaddr_in cli_addr; + int cli_addr_len = sizeof cli_addr; + struct hostent *hp; + + /* Create a new endpoint of communication */ + do + n_handle = + ACE_OS::accept (s_handle, + reinterpret_cast<struct sockaddr *> (&cli_addr), + &cli_addr_len); + while (n_handle == ACE_INVALID_HANDLE && errno == EINTR); + + if (n_handle == ACE_INVALID_HANDLE) + { + ACE_OS::perror (ACE_TEXT("accept")); + continue; + } + +#if !defined(_UNICOS) + int addr_len = sizeof cli_addr.sin_addr.s_addr; +#else /* ! _UNICOS */ + // sizeof on bitfield fails + int addr_len = sizeof cli_addr.sin_addr; // 32 bit biffield in UNICOS +#endif /* ! _UNICOS */ + hp = ACE_OS::gethostbyaddr ((char *) &cli_addr.sin_addr, + addr_len, AF_INET); + + if (hp != 0) + ACE_OS::printf ("client %s\n", hp->h_name), ACE_OS::fflush (stdout); + else + ACE_OS::perror (ACE_TEXT("gethostbyaddr")); + + /* Read data from client (terminate on error) */ + + while ((r_bytes = ACE_OS::recv (n_handle, buf, sizeof buf)) > 0) + if (ACE_OS::write (ACE_STDOUT, buf, r_bytes) != r_bytes) + ACE_OS::perror (ACE_TEXT("write")), ACE_OS::exit (1); + + if (ACE_OS::send (n_handle, "", 1) != 1) + ::perror ("write"), ACE_OS::exit (1); + + /* Close the new endpoint + (listening endpoint remains open) */ + if (ACE_OS::closesocket (n_handle) == -1) + ACE_OS::perror (ACE_TEXT("close")), ACE_OS::exit (1); + ACE_OS::exit (0); + } + + ACE_NOTREACHED (return 0;) +} diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp new file mode 100644 index 00000000000..3428b02cd12 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp @@ -0,0 +1,424 @@ +// $Id$ + +// This tests the features of the <ACE_SOCK_Connector> and +// <ACE_SOCK_Stream> classes. In addition, it can be used to test the +// oneway and twoway latency and throughput at the socket-level. This +// is useful as a baseline to compare against ORB-level performance +// for the same types of data. + +#include "CPP-inclient.h" + +#include "ace/SOCK_Connector.h" +#include "ace/INET_Addr.h" +#include "ace/Thread_Manager.h" +#include "ace/Singleton.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_main.h" + +ACE_RCSID(SOCK_SAP, CPP_inclient, "$Id$") + +Options::Options (void) + : host_ (ACE_DEFAULT_SERVER_HOST), + port_ (ACE_DEFAULT_SERVER_PORT), + sleep_time_ (0, 0), // By default, don't sleep between calls. + threads_ (10), + quit_string_ (ACE_TEXT("q")), + message_len_ (0), + message_buf_ (0), + io_source_ (ACE_INVALID_HANDLE), // Defaults to using the generator. + iterations_ (10000), + oneway_ (1) // Make oneway calls the default. +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + , barrier_ (0) +#endif /* ACE_MT_SAFE */ +{ +} + +Options::~Options (void) +{ + ACE_MT (delete this->barrier_); + delete [] this->message_buf_; +} + +// Options Singleton. +typedef ACE_Singleton<Options, ACE_SYNCH_RECURSIVE_MUTEX> OPTIONS; + +int +Options::init (void) +{ + + ACE_DEBUG((LM_DEBUG,"Options::init, len = %d\n",this->message_len_)); + + // Check for default case. + if (this->message_len_ == 0) + this->message_len_ = ACE_OS::strlen ("TAO"); + ACE_DEBUG((LM_DEBUG,"Options::init, len = %d\n",this->message_len_)); + + this->message_len_ += sizeof (ACE_UINT32); + ACE_DEBUG((LM_DEBUG,"Options::init, len = %d\n",this->message_len_)); + + ACE_NEW_RETURN (this->message_buf_, + char[this->message_len_], + -1); + + // Copy the length into the beginning of the message. + ACE_UINT32 length = ntohl (this->message_len_); + ACE_OS::memcpy ((void *) this->message_buf_, + (void *) &length, + sizeof length); + + ACE_OS::memset ((void *) (this->message_buf_ + sizeof (ACE_UINT32)), + 'a', + this->message_len_ - sizeof (ACE_UINT32)); + + // Allocate the barrier with the correct count. + ACE_MT (ACE_NEW_RETURN (this->barrier_, + ACE_Barrier (this->threads_), + -1)); + return 0; +} + +size_t +Options::message_len (void) const +{ + return this->message_len_; +} + +const void * +Options::message_buf (void) const +{ + return this->message_buf_; +} + +ssize_t +Options::read (void *buf, size_t len, size_t &iteration) +{ + ACE_UNUSED_ARG (len); + + if (this->io_source_ == ACE_STDIN) + return ACE_OS::read (ACE_STDIN, buf, len); + else if (iteration >= this->iterations_) + return 0; + else + { + ACE_OS::memcpy (buf, + this->message_buf (), + len); + iteration++; + return len; + } +} + +int +Options::parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt getopt (argc, argv, ACE_TEXT("2h:i:m:p:q:st:T:"), 1); + + for (int c; (c = getopt ()) != -1; ) + switch (c) + { + case '2': // Disable the oneway client. + this->oneway_ = 0; + break; + case 'h': + this->host_ = getopt.opt_arg (); + break; + case 'i': + this->iterations_ = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'm': + this->message_len_ = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'p': + this->port_ = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'q': + this->quit_string_ = getopt.opt_arg (); + break; + case 's': + this->io_source_ = ACE_STDIN; + break; + case 't': + this->threads_ = (size_t) ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'T': + this->sleep_time_.set (0, ACE_OS::atoi (getopt.opt_arg ())); + break; + default: + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) usage: %n [-2] [-h <host>] [-i iterations] [-m message-size] [-p <port>] [-q <quit string>] [-s] [-t <threads>] [-T <sleep_time>]\n"), + -1); + } + + return this->init (); +} + +u_short +Options::port (void) const +{ + return this->port_; +} + +const ACE_TCHAR * +Options::host (void) const +{ + return this->host_; +} + +const ACE_TCHAR * +Options::quit_string (void) const +{ + return this->quit_string_; +} + +size_t +Options::threads (void) const +{ + return this->threads_; +} + +const ACE_Time_Value & +Options::sleep_time (void) const +{ + return this->sleep_time_; +} + +char * +Options::shared_client_test (u_short port, + ACE_SOCK_Stream &cli_stream) +{ + ACE_INET_Addr remote_addr (port, this->host_); + + ACE_SOCK_Connector con; + + if (con.connect (cli_stream, + remote_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "connection failed"), + 0); + else + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) connected to %s at port %d\n", + remote_addr.get_host_name (), + remote_addr.get_port_number ())); + + ACE_INT32 len = htonl (this->message_len ()); + + // Allocate the transmit buffer. + char *buf; + ACE_DEBUG((LM_DEBUG,"(%P|%t) allocating buffer, len = %d msglen = %d\n", + len, message_len_)); + + ACE_NEW_RETURN (buf, + char[this->message_len()], + 0); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) waiting...\n")); + + // Wait for all other threads to finish initialization. + ACE_MT (this->barrier_->wait ()); + return buf; +} +// Static function entry point to the oneway client service. + +void * +Options::oneway_client_test (void *) +{ + Options *options = OPTIONS::instance (); + ACE_SOCK_Stream cli_stream; + + ACE_DEBUG((LM_DEBUG,"options = %d, len = %d\n",options,options->message_len())); + + // Add 1 to the port to trigger the oneway test! + char *request = options->shared_client_test (options->port () + 1, + cli_stream); + if (request == 0) + return 0; + + // This variable is allocated off the stack to obviate the need for + // locking. + size_t iteration = 0; + + // Keep track of return value. + int result = 0; + ACE_INT32 len = options->message_len (); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) starting oneway transmission\n")); + + // Perform oneway transmission of data to server (correctly handles + // "incomplete writes"). + + for (ssize_t r_bytes; + (r_bytes = options->read (request, len, iteration)) > 0; + // Transmit at the proper rate. + ACE_OS::sleep (options->sleep_time ())) + if (ACE_OS::memcmp (request, + options->quit_string (), + ACE_OS::strlen (options->quit_string ())) == 0) + break; + else if (cli_stream.send_n (request, r_bytes) == -1) + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "send_n")); + result = -1; + break; + } + + // Close the connection. + cli_stream.close (); + + delete [] request; + return reinterpret_cast<void *> (result); +} + +// Static function entry point to the twoway client service. + +void * +Options::twoway_client_test (void *) +{ + Options *options = OPTIONS::instance (); + + ACE_SOCK_Stream cli_stream; + + char *request = options->shared_client_test (options->port (), + cli_stream); + if (request == 0) + return 0; + + // This variable is allocated off the stack to obviate the need for + // locking. + size_t iteration = 0; + + // Keep track of return value. + int result = 0; + + // Timer business. + ACE_High_Res_Timer timer; + + ACE_INT32 len = options->message_len (); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) starting twoway transmission\n")); + + // Perform twoway transmission of data to server (correctly handles + // "incomplete writes"). + + for (ssize_t r_bytes; + (r_bytes = options->read (request, len, iteration)) > 0; + // Transmit at the proper rate. + ACE_OS::sleep (options->sleep_time ())) + if (ACE_OS::memcmp (request, + options->quit_string (), + ACE_OS::strlen (options->quit_string ())) == 0) + break; + + // Transmit <request> to the server. + else + { + // Note that we use the incremental feature of the + // <ACE_High_Res_Timer> so that we don't get "charged" for the + // <ACE_OS::sleep> used to control the rate at which requests + // are sent. + timer.start_incr (); + + if (cli_stream.send_n (request, r_bytes) == -1) + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "send_n")); + result = -1; + break; + } + // Receive the reply from the server. Normally, it just sends + // back 24 bytes, which is typical for an IIOP reply. + else if (cli_stream.recv (request, r_bytes) <= 0) + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "recv")); + result = -1; + break; + } + + timer.stop_incr (); + } + + ACE_Time_Value tv; + + timer.elapsed_time_incr (tv); + double real_time = tv.sec () * ACE_ONE_SECOND_IN_USECS + tv.usec (); + double messages_per_sec = iteration * double (ACE_ONE_SECOND_IN_USECS) / real_time; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) messages = %d\n(%t) usec-per-message = %f\n(%t) messages-per-second = %0.00f\n"), + iteration, + real_time / double (iteration), + messages_per_sec < 0 ? 0 : messages_per_sec)); + + // Close the connection. + cli_stream.close (); + + delete [] request; + return reinterpret_cast<void *> (result); +} + +ACE_THR_FUNC +Options::thr_func (void) +{ + ACE_DEBUG((LM_DEBUG,"(%P|%t) in thread func, mesg len = %d\n",this->message_len())); + if (this->oneway_ == 0) + return ACE_THR_FUNC (&Options::twoway_client_test); + else + return ACE_THR_FUNC (&Options::oneway_client_test); +} + +static int +run_client (void) +{ + // Raise the socket handle limit to the maximum. + ACE::set_handle_limit (); + +#if defined (ACE_HAS_THREADS) + ACE_DEBUG((LM_DEBUG,"(%P|%t) spawning client test thread options = %d\n", + OPTIONS::instance())); + + if (ACE_Thread_Manager::instance ()->spawn_n (OPTIONS::instance ()->threads (), + OPTIONS::instance ()->thr_func ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "spawn_n"), + 1); + else + ACE_Thread_Manager::instance ()->wait (); +#else + *(OPTIONS::instance ()->thr_func) (); +#endif /* ACE_HAS_THREADS */ + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + // Initialize the logger. + ACE_LOG_MSG->open (argv[0]); + + if (OPTIONS::instance ()->parse_args (argc, argv) == -1) + return -1; + + // Run the client + run_client (); + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION) +template ACE_Singleton<Options, ACE_SYNCH_RECURSIVE_MUTEX> * + ACE_Singleton<Options, ACE_SYNCH_RECURSIVE_MUTEX>::singleton_; +#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */ diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inclient.h b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inclient.h new file mode 100644 index 00000000000..7486c70b749 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inclient.h @@ -0,0 +1,108 @@ +// $Id$ + +// This file defines the Options class for CPP-inclient. IBM C++ compiler'd +// template auto-instantiator needs this in a separate file. + +#ifndef __CPP_INCLIENT_H +#define __CPP_INCLIENT_H + +#include "ace/SOCK_Stream.h" +#include "ace/Barrier.h" +#include "ace/Time_Value.h" + +class Options + // = TITLE + // Define the options for this test. +{ +public: + Options (void); + // Constructor. + + ~Options (void); + // Destructor. + + int parse_args (int argc, ACE_TCHAR *argv[]); + // Parse the command-line arguments. + + const ACE_Time_Value &sleep_time (void) const; + // Return the amount of time to sleep in order to implement the + // proper transmission rates. + + u_short port (void) const; + // Port of the server. + + const ACE_TCHAR *host (void) const; + // Host of the server. + + size_t threads (void) const; + // Number of threads. + + const ACE_TCHAR *quit_string (void) const; + // String that shuts down the client/server. + + ssize_t read (void *buf, size_t len, size_t &iterations); + // Read from the appropriate location. + + size_t message_len (void) const; + // Returns the length of the message to send. + + const void *message_buf (void) const; + // Returns a pointer to the message. + + ACE_THR_FUNC thr_func (void); + // Returns a pointer to the entry point into the thread that runs + // the client test function. + +private: + int init (void); + // Initialize the message we're sending to the user and set up the + // barrier. + + char *shared_client_test (u_short port, + ACE_SOCK_Stream &cli_stream); + // Performs the shared behavior of the oneway and twoway client + // tests. + + static void *twoway_client_test (void *); + // Performs the twoway test. + + static void *oneway_client_test (void *); + // Performs the oneway test. + + const ACE_TCHAR *host_; + // Host of the server. + + u_short port_; + // Port of the server. + + ACE_Time_Value sleep_time_; + // Sleep_Time value. + + size_t threads_; + // Number of threads. + + const ACE_TCHAR *quit_string_; + // String that shuts down the client/server. + + size_t message_len_; + // Size of the message we send to the server. + + char *message_buf_; + // Pointer to the message we send to the server. + + ACE_HANDLE io_source_; + // Are we reading I/O from ACE_STDIN or from our generator? + + size_t iterations_; + // Number of iterations. + + char oneway_; + // Are we running oneway or twoway? + + // Please leave the ; inside the parenthesis to avoid Green Hills + // (and probably other) compiler warning about extra ;. + ACE_MT (ACE_Barrier *barrier_;) + // Barrier used to synchronize the start of all the threads. +}; + +#endif /* __CPP_INCLIENT_H */ diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.cpp new file mode 100644 index 00000000000..43df0572f74 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.cpp @@ -0,0 +1,582 @@ + // $Id$ + +// This example tests the features of the <ACE_SOCK_Acceptor>, +// <ACE_SOCK_Stream>, and <ACE_Svc_Handler> classes. If the platform +// supports threads it uses a thread-per-connection concurrency model. +// Otherwise, it uses a single-threaded iterative server model. + +#include "ace/OS_main.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Svc_Handler.h" +#include "ace/Singleton.h" +#include "ace/Profile_Timer.h" +#include "ace/Get_Opt.h" +#include "ace/OS_NS_sys_select.h" + +#include "CPP-inserver-fancy.h" + +ACE_RCSID(SOCK_SAP, CPP_inserver_fancy, "$Id$") + +// Forward declaration. +class Handler; + +class Handler_Factory +{ + // = TITLE + // Creates the oneway or twoway handlers. +public: + Handler_Factory (void); + // Constructor. + + ~Handler_Factory (void); + // Destructor. + + int handle_events (void); + // Run the main event loop. + +private: + int init_acceptors (void); + // Initialize the acceptors. + + int create_handler (ACE_SOCK_Acceptor &acceptor, + Handler *(*handler_factory) (ACE_HANDLE), + const char *handler_type); + // Factory that creates the right kind of <Handler>. + + // = Factory functions. + static Handler *make_twoway_handler (ACE_HANDLE); + // Create a twoway handler. + + static Handler *make_oneway_handler (ACE_HANDLE); + // Create a oneway handler. + + ACE_SOCK_Acceptor twoway_acceptor_; + // Twoway acceptor factory. + + ACE_SOCK_Acceptor oneway_acceptor_; + // Oneway acceptor factory. +}; + +class Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +{ + // = TITLE + // Base class for the oneway and twoway handlers. + + friend class Handler_Factory; + // The factory has special permission. (to access svc ()). + +public: + virtual int open (void * = 0); + // Generic initialization method. + + virtual int close (u_long); + // Close down and delete this. + +protected: + Handler (ACE_HANDLE handle); + // Constructor. + + int parse_header_and_allocate_buffer (char *&buf, + ACE_INT32 *len); + // Implement the generic code that's called from any of the subclass + // <run> methods to get the header and the buffer to read the data. + // This method factors out common code. + + virtual int run (void) = 0; + // Hook method called by the <svc> template method to do the actual + // protocol. Must be overridden by the subclass. + + virtual int svc (void); + // Template method entry point into the handler task. + + virtual void print_results (void); + // Print the results. + + size_t total_bytes_; + // Total number of bytes received. + + size_t message_count_; + // Number of messages received. + + ACE_Profile_Timer timer_; + // Keeps track of how much time we're using. +}; + +class Twoway_Handler : public Handler +{ + // = TITLE + // Performs the twoway protocol. +public: + Twoway_Handler (ACE_HANDLE handle); + // Constructor. + +private: + virtual int run (void); + // Template Method hook called by <svc>. +}; + +class Oneway_Handler : public Handler +{ + // = TITLE +public: + Oneway_Handler (ACE_HANDLE handle); + // Constructor. + +private: + virtual int run (void); + // Template Method hook called by <svc>. + + virtual void print_results (void); + // Print the results. +}; + +u_short +Options::port (void) const +{ + return this->port_; +} + +int +Options::verbose (void) const +{ + return this->verbose_; +} + +int +Options::reply_message_len (void) const +{ + return this->reply_message_len_; +} + +Options::~Options (void) +{ +} + +Options::Options (void) + : verbose_ (0), + port_ (ACE_DEFAULT_SERVER_PORT), + reply_message_len_ (24) // Default to the approximate size of an + // GIOP reply message. +{ +} + +int +Options::parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt getopt (argc, argv, ACE_TEXT("p:r:v"), 1); + + for (int c; (c = getopt ()) != -1; ) + switch (c) + { + case 'p': + this->port_ = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'r': + this->reply_message_len_ = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'v': + this->verbose_ = 1; + break; + default: + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) usage: %n [-p <port>] [-v]"), + -1); + } + + return 0; +} + +// Options Singleton. +typedef ACE_Singleton<Options, ACE_SYNCH_RECURSIVE_MUTEX> OPTIONS; + +Handler::Handler (ACE_HANDLE handle) + : total_bytes_ (0), + message_count_ (0) +{ + this->peer ().set_handle (handle); +} + +int +Handler::open (void *) +{ + ACE_INET_Addr cli_addr; + + // Make sure we're not in non-blocking mode. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "disable"), + 0); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) client %s connected from %d on handle %d\n", + cli_addr.get_host_name (), + cli_addr.get_port_number (), + this->peer ().get_handle ())); + return 0; +} + +int +Handler::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) closing down %x\n", + this)); + delete this; + return 0; +} + +int +Handler::svc (void) +{ + // Timer logic. + this->timer_.start (); + + // Invoke the hook method to run the specific test. + int result = this->run (); + + this->timer_.stop (); + + this->print_results (); + + return result; +} + +int +Handler::parse_header_and_allocate_buffer (char *&request, + ACE_INT32 *len) +{ + ssize_t result = this->peer ().recv_n ((void *) len, + sizeof (ACE_INT32)); + if (result == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) connected closed\n")); + return -1; + } + else if (result == -1 || result != sizeof (ACE_INT32)) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "recv_n failed"), + -1); + else + { + *len = ntohl (*len); + ACE_NEW_RETURN (request, + char[*len], + -1); + } + + return 0; +} + +void +Handler::print_results (void) +{ +} + +Twoway_Handler::Twoway_Handler (ACE_HANDLE handle) + : Handler (handle) +{ +} + +// Function entry point into the twoway server task. + +int +Twoway_Handler::run (void) +{ + // Read data from client (terminate on error). + + char *request = 0; + + for (;;) + { + ACE_INT32 len = 0; + + if (parse_header_and_allocate_buffer (request, + &len) == -1) + return -1; + + // Subtract off the sizeof the length prefix. + ssize_t r_bytes = this->peer ().recv_n (request, + len - sizeof (ACE_UINT32)); + + if (r_bytes == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "recv")); + break; + } + else if (r_bytes == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reached end of input, connection closed by client\n")); + break; + } + else if (OPTIONS::instance ()->verbose () + && ACE::write_n (ACE_STDOUT, + request, + r_bytes) != r_bytes) + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE::write_n")); + else + { + ssize_t s_bytes = (ssize_t) OPTIONS::instance ()->reply_message_len (); + + // Don't try to send more than is in the request buffer! + if (s_bytes > r_bytes) + s_bytes = r_bytes; + + if (this->peer ().send_n (request, + s_bytes) != s_bytes) + ACE_ERROR ((LM_ERROR, + "%p\n", + "send_n")); + } + this->total_bytes_ += size_t (r_bytes); + this->message_count_++; + + delete [] request; + request = 0; + } + + delete [] request; + return 0; +} + +Oneway_Handler::Oneway_Handler (ACE_HANDLE handle) + : Handler (handle) +{ +} + +void +Oneway_Handler::print_results (void) +{ + ACE_Profile_Timer::ACE_Elapsed_Time et; + this->timer_.elapsed_time (et); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\t\treal time = %f secs \n\t\tuser time = %f secs \n\t\tsystem time = %f secs\n"), + et.real_time, + et.user_time, + et.system_time)); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\t\tmessages = %d\n\t\ttotal bytes = %d\n\t\tmbits/sec = %f\n\t\tusec-per-message = %f\n"), + this->message_count_, + this->total_bytes_, + (((double) this->total_bytes_ * 8) / et.real_time) / (double) (1024 * 1024), + ((et.user_time + et.system_time) / (double) this->message_count_) * ACE_ONE_SECOND_IN_USECS)); +} + +// Function entry point into the oneway server task. + +int +Oneway_Handler::run (void) +{ + // Read data from client (terminate on error). + + char *request = 0; + + for (;;) + { + ACE_INT32 len = 0; + + if (parse_header_and_allocate_buffer (request, + &len) == -1) + return -1; + + // Subtract off the sizeof the length prefix. + ssize_t r_bytes = this->peer ().recv_n (request, + len - sizeof (ACE_UINT32)); + + if (r_bytes == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "recv")); + break; + } + else if (r_bytes == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reached end of input, connection closed by client\n")); + break; + } + else if (OPTIONS::instance ()->verbose () + && ACE::write_n (ACE_STDOUT, + request, + r_bytes) != r_bytes) + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE::write_n")); + + this->total_bytes_ += size_t (r_bytes); + this->message_count_++; + delete [] request; + request = 0; + } + + delete [] request; + return 0; +} + +// Create a twoway handler. + +Handler * +Handler_Factory::make_twoway_handler (ACE_HANDLE handle) +{ + return new Twoway_Handler (handle); +} + +// Create a oneway handler. + +Handler * +Handler_Factory::make_oneway_handler (ACE_HANDLE handle) +{ + return new Oneway_Handler (handle); +} + +int +Handler_Factory::init_acceptors (void) +{ + // Create the oneway and twoway server addresses. + ACE_INET_Addr twoway_server_addr (OPTIONS::instance ()->port ()); + ACE_INET_Addr oneway_server_addr (OPTIONS::instance ()->port () + 1); + + // Create acceptors, reuse the address. + if (this->twoway_acceptor_.open (twoway_server_addr, 1) == -1 + || this->oneway_acceptor_.open (oneway_server_addr, 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "open"), + -1); + else if (this->twoway_acceptor_.get_local_addr (twoway_server_addr) == -1 + || this->oneway_acceptor_.get_local_addr (oneway_server_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "get_local_addr"), + -1); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) starting twoway server at port %d and oneway server at port %d\n", + twoway_server_addr.get_port_number (), + oneway_server_addr.get_port_number ())); + return 0; +} + +int +Handler_Factory::create_handler (ACE_SOCK_Acceptor &acceptor, + Handler * (*handler_factory) (ACE_HANDLE), + const char *handler_type) +{ + ACE_SOCK_Stream new_stream; + + if (acceptor.accept (new_stream) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "accept"), + -1); + + Handler *handler; + + ACE_ALLOCATOR_RETURN (handler, + (*handler_factory) (new_stream.get_handle ()), + -1); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) spawning %s handler\n", + handler_type)); + + if (handler->open () == -1) + return -1; + +#if defined (ACE_MT_SAFE) + // Spawn a new thread and run the new connection in that thread of + // control using the <server> function as the entry point. + return handler->activate (); +#else + handler->svc (); + handler->close (0); + return 0; +#endif /* ACE_HAS_THREADS */ +} + +Handler_Factory::Handler_Factory (void) +{ +} + +Handler_Factory::~Handler_Factory (void) +{ + this->twoway_acceptor_.close (); + this->oneway_acceptor_.close (); +} + +// Run the main event loop. + +int +Handler_Factory::handle_events (void) +{ + if (this->init_acceptors () == -1) + return -1; + + fd_set handles; + + FD_ZERO (&handles); + FD_SET ((ACE_SOCKET) this->twoway_acceptor_.get_handle (), + &handles); + FD_SET ((ACE_SOCKET) this->oneway_acceptor_.get_handle (), + &handles); + + // Performs the iterative server activities. + + for (;;) + { + ACE_Time_Value timeout (ACE_DEFAULT_TIMEOUT); + fd_set temp = handles; + + int result = ACE_OS::select (int (this->oneway_acceptor_.get_handle ()) + 1, + (fd_set *) &temp, + 0, + 0, + timeout); + if (result == -1) + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "select")); + else if (result == 0 && OPTIONS::instance ()->verbose ()) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) select timed out\n")); + else + { + if (FD_ISSET ((ACE_SOCKET) this->twoway_acceptor_.get_handle (), + &temp)) + this->create_handler (this->twoway_acceptor_, + &Handler_Factory::make_twoway_handler, + "twoway"); + if (FD_ISSET ((ACE_SOCKET) this->oneway_acceptor_.get_handle (), + &temp)) + this->create_handler (this->oneway_acceptor_, + &Handler_Factory::make_oneway_handler, + "oneway"); + } + } + + ACE_NOTREACHED (return 0;) +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + OPTIONS::instance ()->parse_args (argc, argv); + + Handler_Factory server; + + return server.handle_events (); +} + +#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION) +template ACE_Singleton<Options, ACE_SYNCH_RECURSIVE_MUTEX> * + ACE_Singleton<Options, ACE_SYNCH_RECURSIVE_MUTEX>::singleton_; +#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */ diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.h b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.h new file mode 100644 index 00000000000..9901bcc0b20 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.h @@ -0,0 +1,43 @@ +// $Id$ + +// This file defines the Options class for CPP-inserver-fancy. +// IBM C++ compiler'd template auto-instantiator needs this in a separate file. + +#ifndef __CPP_INSERVER_FANCY_H +#define __CPP_INSERVER_FANCY_H + +class Options + // = TITLE + // Define the options for this test. +{ +public: + Options (void); + // Constructor. + + ~Options (void); + // Destructor. + + int parse_args (int argc, ACE_TCHAR *argv[]); + // Parse the command-line arguments. + + int verbose (void) const; + // Are we running in verbose mode? + + u_short port (void) const; + // Port number that we are listening at. + + int reply_message_len (void) const; + // Size of the reply message. + +private: + int verbose_; + // Are we running in verbose mode? + + u_short port_; + // Port number we listen at. + + size_t reply_message_len_; + // Size of the reply message. +}; + +#endif /* __CPP_INSERVER_FANCY_H */ diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver-poll.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver-poll.cpp new file mode 100644 index 00000000000..a281d2e380d --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver-poll.cpp @@ -0,0 +1,207 @@ +// $Id$ + +// IPC_SAP/poll server, which illustrates how to integrate the ACE +// socket wrappers with the SVR4 <poll> system call to create a +// single-threaded concurrent server. This server program can be +// driven by the oneway test mode of CPP-inclient.cpp. + +#include "ace/OS_main.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Stream.h" +#include "ace/INET_Addr.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_poll.h" +#include "ace/OS_NS_stdio.h" + +ACE_RCSID(SOCK_SAP, CPP_inserver_poll, "$Id$") + +#if defined (ACE_HAS_POLL) + +// Should we be verbose? +static int verbose = 0; + +// Max number of open handles. +static const int MAX_HANDLES = 200; + +struct Buffer_Info +{ + void *buf_; + // Pointer to the buffer. + + size_t len_; + // Length of the buffer. +}; + +// Array of <pollfd>'s. +static struct pollfd poll_array[MAX_HANDLES]; + +// Array of <Buffer_Info>. +static Buffer_Info buffer_array[MAX_HANDLES]; + +static void +init_poll_array (void) +{ + int i; + + for (i = 0; i < MAX_HANDLES; i++) + { + poll_array[i].fd = ACE_INVALID_HANDLE; + poll_array[i].events = POLLIN; + } +} + +static int +init_buffer (size_t index) +{ + ACE_INT32 len; + + if (ACE::recv_n (poll_array[index].fd, + (void *) &len, + sizeof (ACE_INT32)) != sizeof (ACE_INT32)) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "recv_n failed"), + -1); + else + { + len = ntohl (len); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reading messages of size %d from handle %d\n", + len, + poll_array[index].fd)); + + ACE_ALLOCATOR_RETURN (buffer_array[index].buf_, + ACE_OS::malloc (len), + -1); + buffer_array[index].len_ = len; + } + return 0; +} + +static void +handle_data (size_t &n_handles) +{ + // Handle pending logging messages first (s_handle + 1 is guaranteed + // to be lowest client descriptor). + + for (size_t index = 1; index < n_handles; index++) + { + if (ACE_BIT_ENABLED (poll_array[index].revents, POLLIN)) + { + // First time in, we need to initialize the buffer. + if (buffer_array[index].buf_ == 0 + && init_buffer (index) == -1) + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "init_buffer")); + continue; + } + + // Read data from client (terminate on error). + + ssize_t n = ACE::recv (poll_array[index].fd, + buffer_array[index].buf_, + buffer_array[index].len_); + // <recv> will not block in this case! + + if (n == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "read failed")); + else if (n == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) closing oneway server at handle %d\n", + poll_array[index].fd)); + + // Handle client connection shutdown. + ACE_OS::close (poll_array[index].fd); + poll_array[index].fd = poll_array[--n_handles].fd; + + ACE_OS::free ((void *) buffer_array[index].buf_); + buffer_array[index].buf_ = 0; + buffer_array[index].len_ = 0; + } + else if (verbose) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) %*s", + n, + buffer_array[index].buf_)); + } + } +} + +static void +handle_connections (ACE_SOCK_Acceptor &peer_acceptor, + size_t &n_handles) +{ + if (ACE_BIT_ENABLED (poll_array[0].revents, POLLIN)) + { + ACE_SOCK_Stream new_stream; + + ACE_INET_Addr client; + ACE_Time_Value nonblock (0, 0); + + // Handle all pending connection requests (note use of "polling" + // feature that doesn't block). + + while (ACE_OS::poll (poll_array, 1, nonblock) > 0) + if (peer_acceptor.accept (new_stream, &client) == -1) + ACE_OS::perror ("accept"); + else + { + const char *s = client.get_host_name (); + + ACE_ASSERT (s != 0); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) client %s\n", + s)); + poll_array[n_handles++].fd = new_stream.get_handle (); + } + } +} + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + u_short port = ACE_DEFAULT_SERVER_PORT + 1; + + // Create a server end-point. + ACE_INET_Addr addr (port); + ACE_SOCK_Acceptor peer_acceptor (addr); + + ACE_HANDLE s_handle = peer_acceptor.get_handle (); + + init_poll_array (); + + poll_array[0].fd = s_handle; + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) starting oneway server at port %d\n", + port)); + + for (size_t n_handles = 1;;) + { + ACE_ENDLESS_LOOP + + // Wait for client I/O events (handle interrupts). + while (ACE_OS::poll (poll_array, n_handles) == -1 + && errno == EINTR) + continue; + + handle_data (n_handles); + handle_connections (peer_acceptor, n_handles); + } + + /* NOTREACHED */ + return 0; +} +#else +#include <stdio.h> +int ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_OS::fprintf (stderr, "This feature is not supported\n"); + return 0; +} +#endif /* ACE_HAS_POLL */ diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp new file mode 100644 index 00000000000..b978bb5f989 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp @@ -0,0 +1,391 @@ +// $Id$ + +// This example tests the features of the <ACE_SOCK_Acceptor>, +// <ACE_SOCK_Stream>, and <ACE_Svc_Handler> classes. If the platform +// supports threads it uses a thread-per-connection concurrency model. +// Otherwise, it uses a single-threaded iterative server model. + +#include "ace/SOCK_Acceptor.h" +#include "ace/Thread_Manager.h" +#include "ace/Handle_Set.h" +#include "ace/Profile_Timer.h" +#include "ace/OS_NS_sys_select.h" +#include "ace/OS_main.h" + +ACE_RCSID(SOCK_SAP, CPP_inserver, "$Id$") + +// Are we running verbosely? +static int verbose = 0; + +static void +run_server (ACE_THR_FUNC server, + ACE_HANDLE handle) +{ +#if defined (ACE_HAS_THREADS) + // Spawn a new thread and run the new connection in that thread of + // control using the <server> function as the entry point. + if (ACE_Thread_Manager::instance ()->spawn (server, + reinterpret_cast<void *> (handle), + THR_DETACHED) == -1) + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "spawn")); +#else + (*server) (reinterpret_cast<void *> (handle)); +#endif /* ACE_HAS_THREADS */ +} + +// Function entry point into the twoway server task. + +static ACE_THR_FUNC_RETURN +twoway_server (void *arg) +{ + ACE_INET_Addr cli_addr; + ACE_SOCK_Stream new_stream; + ACE_HANDLE handle = (ACE_HANDLE) (long) arg; + + new_stream.set_handle (handle); + + // Make sure we're not in non-blocking mode. + if (new_stream.disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "disable"), + 0); + else if (new_stream.get_remote_addr (cli_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "get_remote_addr"), + 0); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) client %s connected from %d\n", + cli_addr.get_host_name (), + cli_addr.get_port_number ())); + + size_t total_bytes = 0; + size_t message_count = 0; + + char *request = 0; + + // Read data from client (terminate on error). + + for (;;) + { + ACE_INT32 len; + + ssize_t r_bytes = new_stream.recv_n ((void *) &len, + sizeof (ACE_INT32)); + if (r_bytes == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "recv")); + break; + } + else if (r_bytes == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reached end of input, connection closed by client\n")); + break; + } + else if (r_bytes != sizeof (ACE_INT32)) + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "recv_n failed")); + break; + } + else + { + len = ntohl (len); + ACE_NEW_RETURN (request, + char [len], + 0); + } + + // Subtract off the sizeof the length prefix. + r_bytes = new_stream.recv_n (request, + len - sizeof (ACE_UINT32)); + if (r_bytes == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "recv")); + break; + } + else if (r_bytes == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reached end of input, connection closed by client\n")); + break; + } + else if (verbose + && ACE::write_n (ACE_STDOUT, + request, + r_bytes) != r_bytes) + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE::write_n")); + else if (new_stream.send_n (request, + r_bytes) != r_bytes) + ACE_ERROR ((LM_ERROR, + "%p\n", + "send_n")); + + total_bytes += size_t (r_bytes); + message_count++; + + delete [] request; + request = 0; + } + + // Close new endpoint (listening endpoint stays open). + new_stream.close (); + + delete [] request; + return 0; +} + +// Function entry point into the oneway server task. + +static ACE_THR_FUNC_RETURN +oneway_server (void *arg) +{ + ACE_INET_Addr cli_addr; + ACE_SOCK_Stream new_stream; + ACE_HANDLE handle = (ACE_HANDLE) (long) arg; + + new_stream.set_handle (handle); + + // Make sure we're not in non-blocking mode. + if (new_stream.disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "disable"), + 0); + else if (new_stream.get_remote_addr (cli_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "get_remote_addr"), + 0); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) client %s connected from %d\n", + cli_addr.get_host_name (), + cli_addr.get_port_number ())); + + // Timer business + ACE_Profile_Timer timer; + timer.start (); + + size_t total_bytes = 0; + size_t message_count = 0; + + char *request = 0; + + // Read data from client (terminate on error). + + for (;;) + { + ACE_INT32 len; + + ssize_t r_bytes = new_stream.recv_n ((void *) &len, + sizeof (ACE_INT32)); + if (r_bytes == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "recv")); + break; + } + else if (r_bytes == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reached end of input, connection closed by client\n")); + break; + } + else if (r_bytes != sizeof (ACE_INT32)) + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "recv_n failed")); + break; + } + else + { + len = ntohl (len); + ACE_NEW_RETURN (request, + char [len], + 0); + } + + // Subtract off the sizeof the length prefix. + r_bytes = new_stream.recv_n (request, + len - sizeof (ACE_UINT32)); + + if (r_bytes == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "recv")); + break; + } + else if (r_bytes == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reached end of input, connection closed by client\n")); + break; + } + else if (verbose + && ACE::write_n (ACE_STDOUT, request, r_bytes) != r_bytes) + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE::write_n")); + + total_bytes += size_t (r_bytes); + message_count++; + + delete [] request; + request = 0; + } + + timer.stop (); + + ACE_Profile_Timer::ACE_Elapsed_Time et; + timer.elapsed_time (et); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\t\treal time = %f secs \n\t\tuser time = %f secs \n\t\tsystem time = %f secs\n"), + et.real_time, + et.user_time, + et.system_time)); + + double messages_per_sec = double (message_count) / et.real_time; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\t\tmessages = %d\n\t\ttotal bytes = %d\n\t\tmbits/sec = %f\n\t\tusec-per-message = %f\n\t\tmessages-per-second = %0.00f\n"), + message_count, + total_bytes, + (((double) total_bytes * 8) / et.real_time) / (double) (1024 * 1024), + (et.real_time / (double) message_count) * 1000000, + messages_per_sec < 0 ? 0 : messages_per_sec)); + + // Close new endpoint (listening endpoint stays open). + new_stream.close (); + + delete [] request; + return 0; +} + +static int +run_event_loop (u_short port) +{ + // Raise the socket handle limit to the maximum. + ACE::set_handle_limit (); + + // Create the oneway and twoway acceptors. + ACE_SOCK_Acceptor twoway_acceptor; + ACE_SOCK_Acceptor oneway_acceptor; + + // Create the oneway and twoway server addresses. + ACE_INET_Addr twoway_server_addr (port); + ACE_INET_Addr oneway_server_addr (port + 1); + + // Create acceptors, reuse the address. + if (twoway_acceptor.open (twoway_server_addr, 1) == -1 + || oneway_acceptor.open (oneway_server_addr, 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "open"), + 1); + // Check to see what addresses we actually got bound to! + else if (twoway_acceptor.get_local_addr (twoway_server_addr) == -1 + || oneway_acceptor.get_local_addr (oneway_server_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "get_local_addr"), + 1); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) starting twoway server at port %d and oneway server at port %d\n", + twoway_server_addr.get_port_number (), + oneway_server_addr.get_port_number ())); + + // Keep these objects out here to prevent excessive constructor + // calls within the loop. + ACE_SOCK_Stream new_stream; + + ACE_Handle_Set handle_set; + handle_set.set_bit (twoway_acceptor.get_handle ()); + handle_set.set_bit (oneway_acceptor.get_handle ()); + + // Performs the iterative server activities. + + for (;;) + { + ACE_Time_Value timeout (ACE_DEFAULT_TIMEOUT); + ACE_Handle_Set temp = handle_set; + + int result = ACE_OS::select (int (oneway_acceptor.get_handle ()) + 1, + (fd_set *) temp, + 0, + 0, + timeout); + if (result == -1) + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "select")); + else if (result == 0 && verbose) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) select timed out\n")); + else + { + if (temp.is_set (twoway_acceptor.get_handle ())) + { + if (twoway_acceptor.accept (new_stream) == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "accept")); + continue; + } + else + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) spawning twoway server\n")); + + // Run the twoway server. + run_server (twoway_server, + new_stream.get_handle ()); + } + if (temp.is_set (oneway_acceptor.get_handle ())) + { + if (oneway_acceptor.accept (new_stream) == -1) + { + ACE_ERROR ((LM_ERROR, "%p\n", "accept")); + continue; + } + else + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) spawning oneway server\n")); + + // Run the oneway server. + run_server (oneway_server, + new_stream.get_handle ()); + } + } + } + + /* NOTREACHED */ +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + u_short port = ACE_DEFAULT_SERVER_PORT; + + if (argc > 1) + port = ACE_OS::atoi (argv[1]); + + return run_event_loop (port); +} diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/CPP-memclient.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-memclient.cpp new file mode 100644 index 00000000000..59c4beaa487 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-memclient.cpp @@ -0,0 +1,53 @@ +// $Id$ + +// This tests the features of the <ACE_MEM_Connector> and +// <ACE_MEM_Stream> classes. In addition, it can be used to test the +// oneway and twoway latency and throughput at the socket-level. This +// is useful as a baseline to compare against ORB-level performance +// for the same types of data. + +#include "ace/OS_NS_string.h" +#include "ace/MEM_Connector.h" +#include "ace/INET_Addr.h" +#include "ace/Thread_Manager.h" +#include "ace/Singleton.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" + + +ACE_RCSID(SOCK_SAP, CPP_inclient, "$Id$") + +static int +run_client (void) +{ + ACE_MEM_Connector connector; + ACE_MEM_Stream stream; + ACE_MEM_Addr server_addr (ACE_DEFAULT_SERVER_PORT); + + if (connector.connect (stream, server_addr.get_remote_addr ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("connect")), -1); + + char buf [MAXPATHLEN]; + while (fgets (buf, MAXPATHLEN, stdin) >0) + { + stream.send (buf, ACE_OS::strlen (buf)+1); + stream.recv (buf, MAXPATHLEN); + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Echo: %C\n"), buf)); + } + + return 0; +} + +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + ACE_UNUSED_ARG(argc); + // Initialize the logger. + ACE_LOG_MSG->open (argv[0]); + + // Run the client + run_client (); + + return 0; +} + diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/CPP-memserver.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-memserver.cpp new file mode 100644 index 00000000000..4ef76e51b38 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-memserver.cpp @@ -0,0 +1,70 @@ +// $Id$ + +// This example tests the features of the <ACE_MEM_Acceptor>, +// <ACE_MEM_Stream>, and <ACE_Svc_Handler> classes. If the platform +// supports threads it uses a thread-per-connection concurrency model. +// Otherwise, it uses a single-threaded iterative server model. + +#include "ace/MEM_Acceptor.h" +#include "ace/Thread_Manager.h" +#include "ace/Handle_Set.h" +#include "ace/Profile_Timer.h" + +ACE_RCSID(SOCK_SAP, CPP_inserver, "$Id$") + +static int +run_event_loop (u_short port) +{ + // Create the acceptors. + ACE_MEM_Acceptor acceptor; + + ACE_MEM_Addr server_addr (port); + + // Create acceptors, reuse the address. + if (acceptor.open (server_addr, 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "open"), + 1); + else if (acceptor.get_local_addr (server_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "get_local_addr"), + 1); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) starting server at port %d\n", + server_addr.get_port_number ())); + + // Keep these objects out here to prevent excessive constructor + // calls within the loop. + ACE_MEM_Stream new_stream; + + // blocking wait on accept. + if (acceptor.accept (new_stream) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "accept"), + -1); + + char buf[MAXPATHLEN]; + int len = 0; + while ((len = new_stream.recv (buf, MAXPATHLEN)) != -1) + { + ACE_DEBUG ((LM_DEBUG, "%s\n", buf)); + new_stream.send (buf, len); + } + + return new_stream.fini (); +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + u_short port = ACE_DEFAULT_SERVER_PORT; + + if (argc > 1) + port = ACE_OS::atoi (argv[1]); + + return run_event_loop (port); +} diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/CPP-unclient.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-unclient.cpp new file mode 100644 index 00000000000..9f3fdec22b9 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-unclient.cpp @@ -0,0 +1,71 @@ +// $Id$ + +// ACE_LSOCK Client. + +#include "ace/LSOCK_Connector.h" +#include "ace/UNIX_Addr.h" +#include "ace/Log_Msg.h" +#include "ace/OS_main.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID(SOCK_SAP, CPP_unclient, "$Id$") + +#if !defined (ACE_LACKS_UNIX_DOMAIN_SOCKETS) +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + const ACE_TCHAR *rendezvous = argc > 1 ? argv[1] : ACE_DEFAULT_RENDEZVOUS; + char buf[BUFSIZ]; + + ACE_LSOCK_Stream cli_stream; + ACE_LSOCK_Connector con; + ACE_UNIX_Addr remote_addr (rendezvous); + + // Establish the connection with server. + if (con.connect (cli_stream, remote_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("connect")), + 1); + + // Send data to server (correctly handles "incomplete writes"). + + for (int r_bytes; + (r_bytes = ACE_OS::read (ACE_STDIN, buf, sizeof buf)) > 0; + ) + if (cli_stream.send_n (buf, r_bytes) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("send_n")), + 1); + + // Explicitly close the writer-side of the connection. + if (cli_stream.close_writer () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("close_writer")), + 1); + + // Wait for handshake with server. + if (cli_stream.recv_n (buf, 1) != 1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("recv_n")), + 1); + + // Close the connection completely. + if (cli_stream.close () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("close")), + 1); + return 0; +} +#else +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_ERROR_RETURN ((LM_ERROR, + "this platform does not support UNIX-domain sockets\n"), -1); +} +#endif /* ACE_LACKS_UNIX_DOMAIN_SOCKETS */ diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/CPP-unserver.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-unserver.cpp new file mode 100644 index 00000000000..cddfe787d92 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/CPP-unserver.cpp @@ -0,0 +1,159 @@ +// $Id$ + +// This example tests the features of the ACE_LSOCK_Acceptor and +// ACE_LSOCK_Stream classes. If the platform supports threads it uses +// a thread-per-request concurrency model. + +#include "ace/LSOCK_Acceptor.h" +#include "ace/Thread_Manager.h" +#include "ace/OS_main.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID(SOCK_SAP, CPP_unserver, "$Id$") + +#if !defined (ACE_LACKS_UNIX_DOMAIN_SOCKETS) + +// Are we running verbosely? +static int verbose = 1; + +// Entry point into the server task. + +static void * +server (void *arg) +{ + ACE_UNIX_Addr cli_addr; + ACE_LSOCK_Stream new_stream; + ACE_HANDLE handle = (ACE_HANDLE) (long) arg; + + new_stream.set_handle (handle); + + // Make sure we're not in non-blocking mode. + if (new_stream.disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("disable")), + 0); + + if (new_stream.get_remote_addr (cli_addr) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("get_remote_addr"))); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) client connected from %C\n"), + cli_addr.get_path_name ())); + + // Read data from client (terminate on error). + + for (;;) + { + char buf[BUFSIZ]; + + ssize_t r_bytes = new_stream.recv (buf, sizeof buf); + + if (r_bytes == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("recv"))); + break; + + } + else if (r_bytes == 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) reached end of input, connection closed by client\n"))); + break; + } + else if (verbose && ACE::write_n (ACE_STDOUT, buf, r_bytes) != r_bytes) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("ACE::write_n"))); + else if (new_stream.send_n (buf, r_bytes) != r_bytes) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("send_n"))); + } + + // Close new endpoint (listening endpoint stays open). + if (new_stream.close () == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("close"))); + + return 0; +} + +static int +run_event_loop (const ACE_TCHAR rendezvous[]) +{ + ACE_LSOCK_Acceptor peer_acceptor; + + // Create a server address. + ACE_UNIX_Addr server_addr (rendezvous); + + ACE_OS::unlink (rendezvous); + + // Create a server. + + if (peer_acceptor.open (server_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("open")), + 1); + else if (peer_acceptor.get_local_addr (server_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("get_local_addr")), + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("starting server %C\n"), + server_addr.get_path_name ())); + + // Keep these guys out here to prevent excessive constructor + // calls... + ACE_LSOCK_Stream new_stream; + + // Performs the iterative server activities. + + for (;;) + { + ACE_Time_Value timeout (ACE_DEFAULT_TIMEOUT); + + if (peer_acceptor.accept (new_stream, 0, &timeout) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("accept"))); + continue; + } + +#if defined (ACE_HAS_THREADS) + if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) server, + reinterpret_cast<void *> (new_stream.get_handle ()), + THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n"), + ACE_TEXT ("spawn")), + 1); +#else + server (reinterpret_cast<void *> (new_stream.get_handle ())); +#endif /* ACE_HAS_THREADS */ + } + + ACE_NOTREACHED (return 0;) +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + return run_event_loop (argc > 1 ? argv[1] : ACE_DEFAULT_RENDEZVOUS); +} +#else +int ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_ERROR_RETURN ((LM_ERROR, + "this platform does not support UNIX-domain sockets\n"), -1); +} +#endif /* ACE_LACKS_UNIX_DOMAIN_SOCKETS */ diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/FD-unclient.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/FD-unclient.cpp new file mode 100644 index 00000000000..1c21aeb74c0 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/FD-unclient.cpp @@ -0,0 +1,60 @@ +// $Id$ + +#include "ace/OS_NS_fcntl.h" +#include "ace/LSOCK_Connector.h" +#include "ace/UNIX_Addr.h" +#include "ace/Log_Msg.h" +#include "ace/OS_main.h" + +ACE_RCSID(SOCK_SAP, FD_unclient, "$Id$") + +#if defined (ACE_HAS_MSG) && !defined (ACE_LACKS_UNIX_DOMAIN_SOCKETS) +// ACE_LSOCK Client. + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + const ACE_TCHAR *file_name = argc > 1 ? argv[1] : ACE_TEXT ("./local_data"); + const ACE_TCHAR *rendezvous = argc > 2 ? argv[2] : ACE_DEFAULT_RENDEZVOUS; + + ACE_LSOCK_Stream cli_stream; + ACE_UNIX_Addr addr (rendezvous); + + // Establish the connection with server. + ACE_LSOCK_Connector connector; + + if (connector.connect (cli_stream, addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p"), ACE_TEXT ("connect")), -1); + + ACE_HANDLE handle = ACE_OS::open (file_name, O_RDONLY); + + if (handle == ACE_INVALID_HANDLE) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p"), ACE_TEXT ("open")), -1); + + // Send handle to server (correctly handles incomplete writes). + if (cli_stream.send_handle (handle) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p"), ACE_TEXT ("send")), -1); + + char buf[BUFSIZ]; + ssize_t n = cli_stream.recv (buf, sizeof buf); + + if (n == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p"), ACE_TEXT ("recv")), -1); + else if (n == 0) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("server shutdown (bug in kernel?)\n"))); + else + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("server %*C shutdown\n"), n, buf)); + + // Explicitly close the connection. + if (cli_stream.close () == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p"), ACE_TEXT ("close")), -1); + + return 0; +} +#else +int ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("your platform must support sendmsg/recvmsg to run this test\n")), -1); +} +#endif /* ACE_HAS_MSG */ diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/FD-unserver.cpp b/ACE/examples/IPC_SAP/SOCK_SAP/FD-unserver.cpp new file mode 100644 index 00000000000..b629b9f0dd8 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/FD-unserver.cpp @@ -0,0 +1,100 @@ +// $Id$ + +#include "ace/LSOCK_Acceptor.h" +#include "ace/LSOCK_Stream.h" +#include "ace/UNIX_Addr.h" +#include "ace/Log_Msg.h" +#include "ace/OS_main.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_stdlib.h" + +ACE_RCSID(SOCK_SAP, FD_unserver, "$Id$") + +#if !defined (ACE_LACKS_UNIX_DOMAIN_SOCKETS) + +// ACE_LSOCK Server + +void +handle_client (ACE_LSOCK_Stream &stream) +{ + char buf[BUFSIZ]; + ACE_HANDLE handle; + + // Retrieve the socket descriptor passed from the client. + + if (stream.recv_handle (handle) == -1) + ACE_ERROR ((LM_ERROR, "%p", "recv_handle")); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) ----------------------------------------\n")); + + // Read data from client (correctly handles incomplete reads due to + // flow control). + + for (ssize_t n; + (n = ACE_OS::read (handle, buf, sizeof buf)) > 0; + ) + ACE_DEBUG ((LM_DEBUG, "%*s", n, buf)); + + ACE_OS::sprintf (buf, "%d", static_cast<int> (ACE_OS::getpid ())); + + ACE_DEBUG ((LM_DEBUG, "(%s, %d) ----------------------------------------\n", buf, ACE_OS::strlen (buf))); + + // Tell the client to shut down. + if (stream.send_n (buf, ACE_OS::strlen (buf)) == -1) + ACE_ERROR ((LM_ERROR, "%p", "send")); + + // Close new endpoint (listening endpoint stays open). + if (stream.close () == -1) + ACE_ERROR ((LM_ERROR, "%p", "close")); +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + const ACE_TCHAR *rendezvous = argc > 1 ? argv[1] : ACE_DEFAULT_RENDEZVOUS; + // Create a server. + ACE_OS::unlink (rendezvous); + ACE_UNIX_Addr addr (rendezvous); + ACE_LSOCK_Acceptor peer_acceptor (addr); + ACE_LSOCK_Stream stream; + + // Performs the concurrent server activities. + + for (;;) + { + // Create a new ACE_SOCK_Stream endpoint. + if (peer_acceptor.accept (stream) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) %p\n", "accept"), -1); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) accepted new connection\n")); + +#if defined (VXWORKS) + handle_client (stream); +#else + switch (ACE_OS::fork (argv[0])) + { + case -1: + ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) %p\n", "fork"), -1); + /* NOTREACHED */ + case 0: + ACE_LOG_MSG->sync (argv[0]); + handle_client (stream); + ACE_OS::exit (0); + /* NOTREACHED */ + default: + stream.close (); + } +#endif /* VXWORKS */ + } + + ACE_NOTREACHED (return 0;) +} +#else +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_ERROR_RETURN ((LM_ERROR, "your platform doesn't not support UNIX domain sockets\n"), -1); +} +#endif /* ACE_LACKS_UNIX_DOMAIN_SOCKETS */ diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/Makefile.am b/ACE/examples/IPC_SAP/SOCK_SAP/Makefile.am new file mode 100644 index 00000000000..50260dc7343 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/Makefile.am @@ -0,0 +1,179 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + + +## Makefile.Sock_Sap_CPP_Inclient.am +noinst_PROGRAMS = CPP-inclient + +CPP_inclient_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +CPP_inclient_SOURCES = \ + CPP-inclient.cpp \ + CPP-inclient.h + +CPP_inclient_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Makefile.Sock_Sap_CPP_Inserver.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += CPP-inserver + +CPP_inserver_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +CPP_inserver_SOURCES = \ + CPP-inserver.cpp \ + CPP-inclient.h \ + CPP-inserver-fancy.h + +CPP_inserver_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Sock_Sap_CPP_Memclient.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += CPP-memclient + +CPP_memclient_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +CPP_memclient_SOURCES = \ + CPP-memclient.cpp \ + CPP-inclient.h \ + CPP-inserver-fancy.h + +CPP_memclient_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Sock_Sap_CPP_Memserver.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += CPP-memserver + +CPP_memserver_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +CPP_memserver_SOURCES = \ + CPP-memserver.cpp \ + CPP-inclient.h \ + CPP-inserver-fancy.h + +CPP_memserver_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Sock_Sap_CPP_Unclient.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += CPP-unclient + +CPP_unclient_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +CPP_unclient_SOURCES = \ + CPP-unclient.cpp \ + CPP-inclient.h \ + CPP-inserver-fancy.h + +CPP_unclient_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Sock_Sap_CPP_Unserver.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += CPP-unserver + +CPP_unserver_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +CPP_unserver_SOURCES = \ + CPP-unserver.cpp \ + CPP-inclient.h \ + CPP-inserver-fancy.h + +CPP_unserver_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Sock_Sap_C_Inclient.am +noinst_PROGRAMS += C-inclient + +C_inclient_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +C_inclient_SOURCES = \ + C-inclient.cpp \ + CPP-inclient.h \ + CPP-inserver-fancy.h + +C_inclient_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Makefile.Sock_Sap_C_Inserver.am +noinst_PROGRAMS += C-inserver + +C_inserver_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +C_inserver_SOURCES = \ + C-inserver.cpp \ + CPP-inclient.h \ + CPP-inserver-fancy.h + +C_inserver_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Makefile.Sock_Sap_FD_Unclient.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += FD-unclient + +FD_unclient_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +FD_unclient_SOURCES = \ + FD-unclient.cpp \ + CPP-inclient.h \ + CPP-inserver-fancy.h + +FD_unclient_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/README b/ACE/examples/IPC_SAP/SOCK_SAP/README new file mode 100644 index 00000000000..bbe0f20ce5a --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/README @@ -0,0 +1,46 @@ +This directory contains groups of client and server test programs that +exercise the various C++ wrappers for sockets. In general, the test +programs do more or less the same thing -- the client establishes a +connection with the server and then transfers data to the server, +which keeps printing the data until EOF is reached (e.g., user types +^D). + +Unless noted differently, the server is implemented as an "iterative +server," i.e., it only deals with one client at a time. The following +describes each set of tests in more detail: + + . C-{inclient,inserver}.cpp -- This is basically a C code + implementation that opens a connection to the server and + sends all the data from the stdin using Internet domain + sockets (i.e., TCP). + + . CPP-{inclient,inserver}.cpp -- This test is + a more sophisticated C++ wrapper version of the preceeding + "C" test using Internet domain sockets (i.e., TCP). + It allows you to test oneway and twoway socket communication + latency and throughput between two processes on the same + machine or on different machines. + + . CPP-inserver-fancy.cpp -- This program is a more glitzy + version of CPP-inserver.cpp that illustrates additional + features of ACE, such as ACE_Svc_Handler. + + . CPP-inserver-poll.cpp -- This test illustrates how to + write single-threaded concurrent servers using UNIX SVR4 + poll(). You can run this test using the CPP-inclient.cpp + program as the oneway client. + + . CPP-{unclient,unserver}.cpp -- This test is basically + a C++ wrapper version of the preceeding "C++" test using + UNIX domain sockets. Note that this test only works + between a client and server process on the same machine. + + . FD-{unclient,inclient}.cpp -- This test illustrates + how to pass file descriptors between a client and a + concurrent server process on the same machine using the ACE + C++ wrappers for UNIX domain sockets. + +For examples of the ACE SOCK_{Dgram,CODgram} and +SOCK_Dgram_{Mcast,Bcast} wrappers, please take a look in the +./examples/Reactor/{Dgram,Multicast,Ntalker} directories. + diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/local_data b/ACE/examples/IPC_SAP/SOCK_SAP/local_data new file mode 100644 index 00000000000..c0119859a28 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/local_data @@ -0,0 +1 @@ +I am Iron man! diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/run_test b/ACE/examples/IPC_SAP/SOCK_SAP/run_test new file mode 100755 index 00000000000..7f0a4dbdaa7 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/run_test @@ -0,0 +1,36 @@ +#! /bin/sh +# $Id$ +# +# Spawns CPP-inserver-fancy and CPP-inclient executables on a single host. + +usage="usage: $0 #client_threads" + +user=`whoami` +iterations=1000 + +if [ $# -ne 1 ]; then + echo $usage; + exit 1 +fi +threads=$1; + + +######## +######## Enable signal handler. +######## +trap 'kill -1 $server_pid; ' 0 1 2 15 + + +######## +######## Start CPP-inserver-fancy and save its pid. +######## +./CPP-inserver-fancy > \ + ${tmp}server.log 2>&1 & +server_pid=$! + +sleep 2; + +######## +######## Start CPP-inclient. +######## +./CPP-inclient -2 -T 100000 -m 69 -t $threads -i 100 > ${tmp}client-${threads}.log 2>&1 diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/sock_sap.mpc b/ACE/examples/IPC_SAP/SOCK_SAP/sock_sap.mpc new file mode 100644 index 00000000000..814280a515c --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/sock_sap.mpc @@ -0,0 +1,72 @@ +// -*- MPC -*- +// $Id$ + +project(*C_inclient) : aceexe { + exename = C-inclient + Source_Files { + C-inclient.cpp + } +} + +project(*C_inserver) : aceexe { + exename = C-inserver + Source_Files { + C-inserver.cpp + } +} + +project(*CPP_inclient) : aceexe { + exename = CPP-inclient + Source_Files { + CPP-inclient.cpp + } +} + +project(*CPP_inserver) : aceexe { + avoids += ace_for_tao + exename = CPP-inserver + Source_Files { + CPP-inserver.cpp + } +} + +project(*CPP_memclient) : aceexe { + avoids += ace_for_tao + exename = CPP-memclient + Source_Files { + CPP-memclient.cpp + } +} + +project(*CPP_memserver) : aceexe { + avoids += ace_for_tao + exename = CPP-memserver + Source_Files { + CPP-memserver.cpp + } +} + +project(*FD_unclient) : aceexe { + avoids += ace_for_tao + exename = FD-unclient + Source_Files { + FD-unclient.cpp + } +} + +project(*CPP_unclient) : aceexe { + avoids += ace_for_tao + exename = CPP-unclient + Source_Files { + CPP-unclient.cpp + } +} + +project(*CPP_unserver) : aceexe { + avoids += ace_for_tao + exename = CPP-unserver + Source_Files { + CPP-unserver.cpp + } +} + diff --git a/ACE/examples/IPC_SAP/SOCK_SAP/summarize b/ACE/examples/IPC_SAP/SOCK_SAP/summarize new file mode 100755 index 00000000000..ee8ffd2df25 --- /dev/null +++ b/ACE/examples/IPC_SAP/SOCK_SAP/summarize @@ -0,0 +1,45 @@ +eval '(exit $?0)' && eval 'exec perl -w -S $0 ${1+"$@"}' + & eval 'exec perl -w -S $0 $argv:q' + if 0; + +# $Id$ +# +# Summarizes results from a series of runs of run_test, with +# different numbers of clients. Example usage: +# +# $ for i in 1 2 5 10 15 20 25 30 35 40 45 50; do ./run_test $i; done +# $ ./summarize +# +# The first three lines above let this script run without specifying the +# full path to perl, as long as it is in the user's PATH. +# Taken from perlrun man page. + +@files = glob 'client-*.log'; +@total_threads = (); + +foreach $file (@files) { + my ($i); + ($i = $file) =~ s/client-(\d+).log/$1/; + push @total_threads, $i; +} + +print "No.of threads\t\tAverage Latency\n\n"; + +foreach $total_threads (sort {$a <=> $b} @total_threads) { + undef $high_latency; + + $high_latency = 0; + open (FILE, "client-${total_threads}.log") || + die "$0: unable to open \"client-${total_threads}.log\"\n"; + while ($line = <FILE>) { + if ($line =~ /.*usec-per-message = ([\d\.]+)/) + { + $high_latency += $1 ; + $number++; + } + } + close FILE; + + printf "%3d\t\t\t%8f\n", + $total_threads, $high_latency/$number; +} |