diff options
Diffstat (limited to 'ACE/tests/SSL/SSL_Asynch_Stream_Test.cpp')
-rw-r--r-- | ACE/tests/SSL/SSL_Asynch_Stream_Test.cpp | 478 |
1 files changed, 478 insertions, 0 deletions
diff --git a/ACE/tests/SSL/SSL_Asynch_Stream_Test.cpp b/ACE/tests/SSL/SSL_Asynch_Stream_Test.cpp new file mode 100644 index 00000000000..58556a44f0b --- /dev/null +++ b/ACE/tests/SSL/SSL_Asynch_Stream_Test.cpp @@ -0,0 +1,478 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests/SSL +// +// = FILENAME +// SSL_Asynch_Stream_Test.cpp +// +// = DESCRIPTION +// This program is a functionality test of ACE_SSL_Asynch_Stream. +// It demonstrates one proper use case of ACE_SSL_Asynch_Stream in the +// Proactor framework and validates its basic functionality. +// +// Usage: SSL_Asynch_Stream_Test [-r <hostname:port#>] +// [-t <num threads>] [-d <delay>] +// [-i <client conn attempt#>] [-n <client request# per conn>] +// +// Default value: +// <hostname:port#>: ACE_DEFAULT_SERVER_HOST:ACE_DEFAULT_PORT +// <num threads>: ACE_MAX_THREADS +// <client conn attempt#>: ACE_MAX_ITERATIONS +// <client req# per conn>: 20 +// <delay>: 0 usec +// +// = AUTHOR +// Steve Huston <shuston@riverace.com> +// +// ============================================================================ + +#include "tests/test_config.h" +#include "ace/Default_Constants.h" +#include "ace/OS_NS_string.h" +#include "ace/Event_Handler.h" +#include "ace/Get_Opt.h" +#include "ace/Proactor.h" +#include "ace/Reactor.h" +#include "ace/Thread_Manager.h" +#include "ace/INET_Addr.h" +#include "ace/SSL/SSL_Asynch_Stream.h" +#include "ace/SSL/SSL_SOCK_Connector.h" +#include "ace/SSL/SSL_SOCK_Acceptor.h" +#include "ace/SSL/SSL_SOCK_Stream.h" + +ACE_RCSID(tests, SSL_Asynch_Stream_Test, "$Id$") + +#if defined (ACE_HAS_THREADS) && ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) + // This only works on Win32 platforms and on Unix platforms + // supporting POSIX aio calls. + +class Client_Handler : public ACE_Handler +{ +public: + Client_Handler () + : msgs_sent_ (0), + stream_ (ACE_SSL_Asynch_Stream::ST_CLIENT), + block_ (1024) {} + ~Client_Handler (); + + int open (ACE_HANDLE); + +private: + virtual void handle_write_stream (const ACE_SSL_Asynch_Write_Stream_Result &result); + +private: + size_t msgs_sent_; + ACE_SSL_Asynch_Stream stream_; + ACE_Message_Block block_; +}; + +class Server_Handler : public ACE_Handler +{ +public: + Server_Handler () + : msgs_rcvd_ (0), + stream_ (ACE_SSL_Asynch_Stream::ST_SERVER), + block_ (1024) {} + ~Server_Handler (); + + int open (ACE_HANDLE); + +private: + virtual void handle_read_stream (const ACE_SSL_Asynch_Read_Stream_Result &result); + +private: + size_t msgs_rcvd_; + ACE_SSL_Asynch_Stream stream_; + ACE_Message_Block block_; +}; + +class Server_Acceptor : public ACE_Event_Handler +{ +public: + int open (const ACE_INET_Addr &listen_addr); + + // Called when a new connection is ready to accept. + virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); + + virtual int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask); + +private: + ACE_SSL_SOCK_Acceptor acceptor_; +}; + +// Accepting end point. This is actually "localhost:10010", but some +// platform couldn't resolve the name so we use the IP address +// directly here. +static const ACE_TCHAR *rendezvous = \ + ACE_DEFAULT_SERVER_HOST ACE_TEXT (":") ACE_DEFAULT_SERVER_PORT_STR; + +// Total number of proactor threads. +static size_t num_threads = ACE_MAX_THREADS; + +#if defined (CHORUS) // Add platforms that can't handle too many + // connection simultaneously here. +#define ACE_LOAD_FACTOR /2 +#else +#define ACE_LOAD_FACTOR +#endif + +// Number of client connections to attempt. +static size_t cli_conn_no = ACE_MAX_ITERATIONS ACE_LOAD_FACTOR; + +// Number of requests each client connection sends. +static size_t cli_req_no = ACE_MAX_THREADS ACE_LOAD_FACTOR; + +// Delay before a thread sending the next request (in msec.) +static int req_delay = 0; + +// This is the string sent from client to server. +static const char *test_string = "SSL_Asynch_Stream_Test!"; + +static void +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("r:t:d:i:n:")); + + int c; + + while ((c = getopt ()) != -1) + { + switch (c) + { + case 'r': // hostname:port + rendezvous = getopt.opt_arg (); + break; + case 't': + num_threads = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'd': + req_delay = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'i': + cli_conn_no = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'n': + cli_req_no = ACE_OS::atoi (getopt.opt_arg ()); + break; + default: + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Usage: %s [-r <hostname:port#>]") + ACE_TEXT ("\t[-t <nr threads>] [-d <delay>]") + ACE_TEXT ("\t[-i <client conn attempt#>]") + ACE_TEXT ("\t[-n <client request# per conn>]\n"), + argv[0])); + break; + } + } +} + +Client_Handler::~Client_Handler () +{ + if (this->stream_.handle () != ACE_INVALID_HANDLE) + { + if (this->msgs_sent_ != cli_req_no) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) Client handle %d sent %d messages; ") + ACE_TEXT ("expected %d\n"), + this->stream_.handle (), + this->msgs_sent_, + cli_req_no)); + else + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Client handle %d sent %d messages; ") + ACE_TEXT ("closing connection\n"), + this->stream_.handle (), + cli_req_no)); + } + this->stream_.close (); +} + +int +Client_Handler::open (ACE_HANDLE handle) +{ + if (this->stream_.open (*this, handle) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) Client_Handler: %p\n"), + ACE_TEXT ("open")), + -1); + this->block_.copy (test_string); + if (this->stream_.write (this->block_, this->block_.length ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) Client_Handler: %p\n"), + ACE_TEXT ("initiate write")), + -1); + return 0; +} + +void +Client_Handler::handle_write_stream + (const ACE_SSL_Asynch_Write_Stream_Result &result) +{ + if (!result.success ()) + { + errno = result.error (); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) Client handle %d: %p\n"), + this->stream_.handle (), + ACE_TEXT ("write"))); + delete this; + return; + } + ACE_Message_Block &b = result.message_block (); + bool send_again = true; + if (b.length () == 0) + { + // All block's data sent; rewind the read pointer and send it again + // until we've sent the configured number of times. + ++this->msgs_sent_; + if (this->msgs_sent_ == cli_req_no) + send_again = false; // All done + else + b.rd_ptr (b.base ()); + } + + if (send_again) + { + if (this->stream_.write (this->block_, this->block_.length ()) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) Client_Handler: %p\n"), + ACE_TEXT ("initiate write"))); + delete this; + } + } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Client handle %d done sending\n"), + this->stream_.handle ())); + delete this; + } + return; +} + +Server_Handler::~Server_Handler () +{ + if (this->stream_.handle () != ACE_INVALID_HANDLE) + { + if (this->msgs_rcvd_ != cli_req_no) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) Server handle %d received %d messages; ") + ACE_TEXT ("expected %d\n"), + this->stream_.handle (), + this->msgs_rcvd_, + cli_req_no)); + else + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Server handle %d received %d messages; ") + ACE_TEXT ("closing connection\n"), + this->stream_.handle (), + cli_req_no)); + } + this->stream_.close (); +} + +int +Server_Handler::open (ACE_HANDLE handle) +{ + if (this->stream_.open (*this, handle) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) Server_Handler: %p\n"), + ACE_TEXT ("open")), + -1); + if (this->stream_.read (this->block_, this->block_.space () - 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) Server_Handler: %p\n"), + ACE_TEXT ("read")), + -1); + return 0; +} + +void +Server_Handler::handle_read_stream + (const ACE_SSL_Asynch_Read_Stream_Result &result) +{ + if (!result.success ()) + { + errno = result.error (); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) Server handle %d: %p\n"), + this->stream_.handle (), + ACE_TEXT ("read"))); + delete this; + return; + } + if (result.bytes_transferred () == 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Server handle %d closed by peer\n"), + this->stream_.handle ())); + delete this; + return; + } + + // Scan through the received data for the expected string. There may be + // multiples and/or partials. Count up how many arrive before the connection + // is closed. + // The read operation left one byte space at the end so we can insert a + // nul terminator to ease scanning. + ACE_Message_Block &b = result.message_block (); + *(b.wr_ptr ()) = '\0'; + size_t test_string_len = ACE_OS::strlen (test_string); + while (b.length () >= test_string_len) + { + if (0 != ACE_OS::strncmp (b.rd_ptr (), test_string, test_string_len)) + ACE_ERROR_BREAK ((LM_ERROR, + ACE_TEXT ("(%t) Read string: %C; expected: %C\n"), + b.rd_ptr (), + test_string)); + b.rd_ptr (test_string_len); + } + b.crunch (); + if (this->stream_.read (b, b.space () - 1) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) Server_Handler: %p\n"), + ACE_TEXT ("read"))); + delete this; + } + return; +} + + +int +Server_Acceptor::open (const ACE_INET_Addr &listen_addr) +{ + if (this->acceptor_.open (listen_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("listen")), + -1); + return 0; +} + +int +Server_Acceptor::handle_input (ACE_HANDLE) +{ + ACE_SSL_SOCK_Stream new_stream; + if (this->acceptor_.accept (new_stream) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("accept")), + -1); + Server_Handler *new_handler = 0; + ACE_NEW_RETURN (new_handler, Server_Handler, -1); + if (new_handler->open (new_stream.get_handle ()) != 0) + delete new_handler; + + return 0; +} + +int +Server_Acceptor::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + this->acceptor_.close (); + return 0; +} + + +static ACE_THR_FUNC_RETURN +proactor_loop (void *) +{ + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Start handling events.\n"))); + + int result = + ACE_Proactor::instance ()->proactor_run_event_loop (); + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Error handling events")), + 0); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Done handling events.\n"))); + + return 0; +} + +static ACE_THR_FUNC_RETURN +start_clients (void *) +{ + // Client thread function. + ACE_INET_Addr addr (rendezvous); + ACE_SSL_SOCK_Stream stream; + ACE_SSL_SOCK_Connector connect; + + for (size_t i = 0 ; i < cli_conn_no; i++) + { + if (connect.connect (stream, addr) < 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("connect"))); + continue; + } + + Client_Handler *new_handler = 0; + ACE_NEW_RETURN (new_handler, Client_Handler, (ACE_THR_FUNC_RETURN)-1); + if (new_handler->open (stream.get_handle ()) != 0) + delete new_handler; + stream.set_handle (ACE_INVALID_HANDLE); + } + + return 0; +} + +int +run_main (int argc, ACE_TCHAR *argv[]) +{ + ACE_START_TEST (ACE_TEXT ("SSL_Asynch_Stream_Test")); + + ACE_SSL_Context *context = ACE_SSL_Context::instance (); + // Note - the next two strings are naked on purpose... the arguments to + // the ACE_SSL_Context methods are const char *, not ACE_TCHAR *. + context->certificate ("dummy.pem", SSL_FILETYPE_PEM); + context->private_key ("key.pem", SSL_FILETYPE_PEM); + + parse_args (argc, argv); + + Server_Acceptor acceptor; + ACE_INET_Addr accept_addr (rendezvous); + + if (acceptor.open (accept_addr) == -1) + return 1; + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Listening at %s\n"), rendezvous)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Spawning %d proactor threads\n"), + num_threads)); + ACE_Thread_Manager::instance ()->spawn_n (num_threads, proactor_loop); + ACE_Thread_Manager::instance ()->spawn (start_clients); + + ACE_Time_Value loop_limit (20); + ACE_Reactor::instance ()->run_reactor_event_loop (loop_limit); + ACE_Thread_Manager::instance ()->wait (); + + // Check for num connections up/down. + + ACE_END_TEST; + return 0; +} + +#else +int +run_main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("SSL_Asynch_Stream_Test")); + + ACE_ERROR ((LM_INFO, + ACE_TEXT ("This test requires threads and AIO which are not ") + ACE_TEXT ("supported on this platform\n"))); + + ACE_END_TEST; + return 0; +} +#endif /* ACE_HAS_THREADS && (WIN32 || AIO) */ |