diff options
Diffstat (limited to 'ACE/tests/Reactor_Performance_Test.cpp')
-rw-r--r-- | ACE/tests/Reactor_Performance_Test.cpp | 418 |
1 files changed, 418 insertions, 0 deletions
diff --git a/ACE/tests/Reactor_Performance_Test.cpp b/ACE/tests/Reactor_Performance_Test.cpp new file mode 100644 index 00000000000..47148b56cf6 --- /dev/null +++ b/ACE/tests/Reactor_Performance_Test.cpp @@ -0,0 +1,418 @@ + +//============================================================================= +/** + * @file Reactor_Performance_Test.cpp + * + * $Id$ + * + * This test is used to time the dispatching mechanisms of the + * <ACE_Reactor>s. Both the <ACE_WFMO_Reactor> and + * <ACE_Select_Reactor> can be tested. + * + * + * @author Irfan Pyarali <irfan@cs.wustl.edu> + */ +//============================================================================= + + +#include "test_config.h" +#include "Reactor_Performance_Test.h" +#include "ace/Profile_Timer.h" +#include "ace/Get_Opt.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Acceptor.h" +#include "ace/Connector.h" +#include "ace/Reactor.h" +#include "ace/WFMO_Reactor.h" +#include "ace/Select_Reactor.h" +#include "ace/Auto_Ptr.h" + + + +#if defined (ACE_HAS_THREADS) + +static const char ACE_ALPHABET[] = "abcdefghijklmnopqrstuvwxyz"; + +// Number of client (user) threads +static int opt_nconnections = 5; + +// Number of data exchanges +static int opt_nloops = 200; + +// Use the WFMO_Reactor +static int opt_wfmo_reactor = 0; + +// Use the Select_Reactor +static int opt_select_reactor = 0; + +// Extra debug messages +static int opt_debug = 0; + +int Read_Handler::waiting_ = 0; + +void +Read_Handler::set_countdown (int nconnections) +{ + Read_Handler::waiting_ = nconnections; +} + +// Initialize the Svc_Handler +int +Read_Handler::open (void *) +{ + if (this->peer ().enable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) Read_Handler::open, cannot set non blocking mode\n")), + -1); + + if (reactor ()->register_handler (this, READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) Read_Handler::open, cannot register handler\n")), + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) created svc_handler for handle %d\n"), + get_handle ())); + return 0; +} + +// Handle incoming data +int +Read_Handler::handle_input (ACE_HANDLE handle) +{ + ACE_UNUSED_ARG (handle); + char buf[BUFSIZ]; + + while (1) + { + ssize_t result = this->peer ().recv (buf, sizeof (buf) - 1); + + if (result > 0) + { + if (opt_debug) + { + buf[result] = 0; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Read_Handler::handle_input: %s\n"), + buf)); + } + } + else if (result < 0) + { + if (errno == EWOULDBLOCK) + return 0; + else + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("handle_input: %p (errno: %d)\n"), + ACE_TEXT ("recv"), ACE_ERRNO_GET)); + + // This will cause handle_close to get called. + return -1; + } + } + else // result == 0 + { + // This will cause handle_close to get called. + return -1; + } + } + + ACE_NOTREACHED (return 0); +} + +// Handle connection shutdown. + +int +Read_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_UNUSED_ARG (handle); + ACE_UNUSED_ARG (close_mask); + + // Reduce count. + waiting_--; + + // If no connections are open. + if (waiting_ == 0) + ACE_Reactor::instance ()->end_reactor_event_loop (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Read_Handler::handle_close closing down\n"))); + + // Shutdown + this->destroy (); + return 0; +} + +int +Write_Handler::open (void *) +{ + return 0; +} + +int +Write_Handler::send_data (void) +{ + int send_size = sizeof (ACE_ALPHABET) - 1; + + if (this->peer ().send_n (ACE_ALPHABET, + send_size) != send_size) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("send_n")), + -1); + return 0; +} + +// Connection factories +typedef ACE_Connector<Write_Handler, ACE_SOCK_CONNECTOR> CONNECTOR; +typedef ACE_Acceptor<Read_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR; + +// Execute the client tests. +void * +client (void *arg) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) running client\n"))); + + ACE_INET_Addr *connection_addr = + reinterpret_cast<ACE_INET_Addr *> (arg); + CONNECTOR connector; + + int i; + + // Automagic memory cleanup. + Write_Handler **temp_writers = 0; + ACE_NEW_RETURN (temp_writers, + Write_Handler *[opt_nconnections], + 0); + ACE_Auto_Basic_Array_Ptr <Write_Handler *> writers (temp_writers); + + ACE_TCHAR *temp_failed = 0; + ACE_NEW_RETURN (temp_failed, + ACE_TCHAR[opt_nconnections], + 0); + ACE_Auto_Basic_Array_Ptr <ACE_TCHAR> failed_svc_handlers (temp_failed); + + // Automagic memory cleanup. + ACE_INET_Addr *temp_addresses; + ACE_NEW_RETURN (temp_addresses, + ACE_INET_Addr [opt_nconnections], + 0); + ACE_Auto_Array_Ptr <ACE_INET_Addr> addresses (temp_addresses); + + // Initialize array. + for (i = 0; i < opt_nconnections; i++) + { + writers[i] = 0; + addresses[i] = *connection_addr; + } + + // Connection all <opt_nconnections> svc_handlers + int result = connector.connect_n (opt_nconnections, + writers.get (), + addresses.get (), + failed_svc_handlers.get ()); + if (result == -1) + { + // Print out the connections that failed... + for (i = 0; i < opt_nconnections; i++) + if (failed_svc_handlers.get ()[i]) + { + ACE_INET_Addr failed_addr = addresses.get()[i]; + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) connection failed to %s, %d\n"), + failed_addr.get_host_name (), + failed_addr.get_port_number ())); + } + return 0; + } + + // If no connections failed (result == 0) then there should be valid + // ACE_Svc_handler pointers in each writers[] position. Iterate to + // send data + for (int j = 0; j < opt_nloops; j++) + for (i = 0; i < opt_nconnections; i++) + if (writers[i]->send_data () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("writer::send_data")), + 0); + // Cleanup + for (i = 0; i < opt_nconnections; i++) + writers[i]->destroy (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) finishing client\n"))); + return 0; +} + +// Sets up the correct reactor (based on platform and options). + +void +create_reactor (void) +{ + ACE_Reactor_Impl *impl = 0; + + if (opt_wfmo_reactor) + { +#if defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 == 1) + ACE_NEW (impl, + ACE_WFMO_Reactor); +#endif /* ACE_HAS_WINSOCK2 == 1 */ + } + else if (opt_select_reactor) + ACE_NEW (impl, + ACE_Select_Reactor); + + ACE_Reactor *reactor = 0; + ACE_NEW (reactor, + ACE_Reactor (impl)); + ACE_Reactor::instance (reactor); +} + +// Print stats. + +void +print_results (ACE_Profile_Timer::ACE_Elapsed_Time &et) +{ + const ACE_TCHAR *reactor_type = 0; + + if (opt_wfmo_reactor) + reactor_type = ACE_TEXT ("WFMO_Reactor"); + else if (opt_select_reactor) + reactor_type = ACE_TEXT ("Select_Reactor"); + else + reactor_type = ACE_TEXT ("Platform's default Reactor"); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\n\tReactor_Performance Test statistics:\n\n"))); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\tReactor Type: %s\n"), + reactor_type)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\tConnections: %d\n"), + opt_nconnections)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\tIteration per connection: %d\n"), + opt_nloops)); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\n\tTiming results:\n"))); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\t\treal time = %f secs \n\t\tuser time = %f secs \n\t\tsystem time = %f secs\n\n"), + et.real_time, + et.user_time, + et.system_time)); +} + +int +run_main (int argc, ACE_TCHAR *argv[]) +{ + ACE_START_TEST (ACE_TEXT ("Reactor_Performance_Test")); + + //FUZZ: disable check_for_lack_ACE_OS + ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("dswc:l:"), 1); + for (int c; (c = getopt ()) != -1; ) + //FUZZ: enble check_for_lack_ACE_OS + switch (c) + { + case 's': + opt_select_reactor = 1; + break; + case 'w': + opt_wfmo_reactor = 1; + break; + case 'c': + opt_nconnections = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'l': + opt_nloops = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'd': + opt_debug = 1; + break; + } + + // Sets up the correct reactor (based on platform and options). + create_reactor (); + + // Manage memory automagically. + auto_ptr<ACE_Reactor> reactor (ACE_Reactor::instance ()); + auto_ptr<ACE_Reactor_Impl> impl; + + // If we are using other that the default implementation, we must + // clean up. + if (opt_select_reactor || opt_wfmo_reactor) + { + auto_ptr<ACE_Reactor_Impl> auto_impl (ACE_Reactor::instance ()->implementation ()); + impl = auto_impl; + } + + Read_Handler::set_countdown (opt_nconnections); + + // Acceptor + ACCEPTOR acceptor; + ACE_INET_Addr server_addr; + + // Bind acceptor to any port and then find out what the port was. + ACE_INET_Addr local_addr (ACE_sap_any_cast (const ACE_INET_Addr &)); + if (acceptor.open (local_addr) == -1 + || acceptor.acceptor ().get_local_addr (server_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("open")), + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) starting server at port %d\n"), + server_addr.get_port_number ())); + + ACE_INET_Addr connection_addr (server_addr.get_port_number (), + ACE_DEFAULT_SERVER_HOST); + + if (ACE_Thread_Manager::instance ()->spawn + (ACE_THR_FUNC (client), + (void *) &connection_addr, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("thread create failed"))); + + ACE_Time_Value run_limit (opt_nloops / 10); + + ACE_Profile_Timer timer; + timer.start (); + const int status = + ACE_Reactor::instance ()->run_reactor_event_loop (run_limit); + timer.stop (); + + ACE_Profile_Timer::ACE_Elapsed_Time et; + timer.elapsed_time (et); + + // Print results + print_results (et); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) waiting for the client thread...\n"))); + + ACE_Thread_Manager::instance ()->wait (); + + ACE_END_TEST; + return status; +} + +#else +int +run_main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("Reactor_Performance_Test")); + + ACE_ERROR ((LM_INFO, + ACE_TEXT ("threads not supported on this platform\n"))); + + ACE_END_TEST; + return 0; +} +#endif /* ACE_HAS_THREADS */ |