diff options
author | Steve Huston <shuston@riverace.com> | 2010-02-04 15:56:09 +0000 |
---|---|---|
committer | Steve Huston <shuston@riverace.com> | 2010-02-04 15:56:09 +0000 |
commit | 92f81e71a40021f42b36376b24894ccd09745b98 (patch) | |
tree | 13a10f1f239ec5cf43dc2dd5976b2f1416ac20d6 /ACE/tests/Reactor_Fairness_Test.cpp | |
parent | 381a942f035821902725148480fa6cc9d86e0331 (diff) | |
download | ATCD-92f81e71a40021f42b36376b24894ccd09745b98.tar.gz |
ChangeLogTag:Thu Feb 4 15:32:24 UTC 2010 Steve Huston <shuston@riverace.com>
Diffstat (limited to 'ACE/tests/Reactor_Fairness_Test.cpp')
-rw-r--r-- | ACE/tests/Reactor_Fairness_Test.cpp | 398 |
1 files changed, 398 insertions, 0 deletions
diff --git a/ACE/tests/Reactor_Fairness_Test.cpp b/ACE/tests/Reactor_Fairness_Test.cpp new file mode 100644 index 00000000000..c286f6fd061 --- /dev/null +++ b/ACE/tests/Reactor_Fairness_Test.cpp @@ -0,0 +1,398 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Reactor_Performance_Test.cpp +// +// = DESCRIPTION +// This test is used to time the dispatching mechanisms of the +// <ACE_Reactor>s. Both the <ACE_WFMO_Reactor> and +// <ACE_Select_Reactor> can be tested. +// +// = AUTHOR +// Irfan Pyarali <irfan@cs.wustl.edu> +// +// ============================================================================ + +#include "test_config.h" +#include "Reactor_Fairness_Test.h" +#include "ace/Get_Opt.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Acceptor.h" +#include "ace/Reactor.h" +#include "ace/Dev_Poll_Reactor.h" +#include "ace/WFMO_Reactor.h" +#include "ace/Select_Reactor.h" +#include "ace/TP_Reactor.h" +#include "ace/Auto_Ptr.h" +#include "ace/Numeric_Limits.h" +#include "ace/Signal.h" +#include "ace/Atomic_Op.h" +#include "ace/Thread_Mutex.h" + +ACE_RCSID(tests, Reactor_Fairness_Test, "$Id$") + +#if defined (ACE_HAS_THREADS) + +namespace { + + const char ACE_ALPHABET[] = "abcdefghijklmnopqrstuvwxyz"; + + // Number of connections to run + int opt_nconnections = 5; + + // How many seconds to run the test on each reactor + int opt_secs = 30; + + // How many thread to run in the reactor loop + int opt_reactor_threads = 3; + + // Extra debug messages + int opt_debug = 0; + + ACE_Atomic_Op<ACE_Thread_Mutex, int> reactor_thread_nr = 0; + + // Class to collect and report on data handling for each test pass. + struct Result_Set { + int nr_conns; + typedef ACE_Array_Map<ACE_HANDLE, unsigned int> report_map; + report_map reports; + + void reset (int n_connections) // Reset for next run + { + reports.clear (); + nr_conns = n_connections; + } + + void report (ACE_HANDLE h, unsigned int chunks) + { + std::pair<ACE_HANDLE, unsigned int> newval (h, chunks); + reports.insert (newval); + } + + // Return 1 if this looks like a failure wrt fairness. + int analyze_reports (void) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Results (%d entries):\n"), + reports.size())); + unsigned int max_chunks = 0; + unsigned int min_chunks = ACE_Numeric_Limits<unsigned int>::max(); + for (report_map::iterator iter = reports.begin(); + iter != reports.end (); + ++iter) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT (" handle %d: %u\n"), + (*iter).first, (*iter).second)); + if ((*iter).second > max_chunks) + max_chunks = (*iter).second; + if ((*iter).second < min_chunks) + min_chunks = (*iter).second; + } + if ((max_chunks - min_chunks) > max_chunks / 10) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Too much unfairness (max %u, min %u)\n"), + max_chunks, + min_chunks), + 1); + return 0; + } + }; + Result_Set results; +} + +// Handle incoming data +int +Read_Handler::handle_input (ACE_HANDLE h) +{ + char buf[BUFSIZ]; + ssize_t result = this->peer ().recv (buf, ACE_OS::strlen(ACE_ALPHABET)); + if (opt_debug) + ACE_DEBUG((LM_DEBUG, + ACE_TEXT ("(%t) Read_Handler::handle_input h %d, result %b\n"), + h, result)); + if (result > 0) + { + if (opt_debug) + { + buf[result] = 0; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Read_Handler::handle_input: h %d: %C\n"), + h, + buf)); + } + ++this->chunks_in; + } + else if (result < 0) + { + if (errno == EWOULDBLOCK) + return 0; + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("handle_input: h %d: %p (errno: %d)\n"), + h, ACE_TEXT ("recv"), ACE_ERRNO_GET)); + + // This will cause handle_close to get called. + return -1; + } + } + else // result == 0 + { + if (opt_debug) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Handle %d closing\n"), h)); + // This will cause handle_close to get called. + return -1; + } + + return 0; +} + +// Handle connection shutdown. + +int +Read_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask /*close_mask*/) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Read_Handler handle %d close; %u chunks\n"), + handle, chunks_in)); + results.report (handle, this->chunks_in); + + // Shutdown + this->destroy (); + return 0; +} + +// Pump data as fast as possible to all the sockets. +ACE_THR_FUNC_RETURN +sender (void *arg) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) running sender\n"))); + + // Ensure an error, not a signal, on broken pipe. + ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN); + ACE_Sig_Action original_action; + no_sigpipe.register_action (SIGPIPE, &original_action); + + ACE_INET_Addr *connection_addr = + reinterpret_cast<ACE_INET_Addr *> (arg); + + int i; + + // Automagic memory cleanup. + ACE_SOCK_Stream *temp_socks = 0; + ACE_NEW_RETURN (temp_socks, + ACE_SOCK_Stream [opt_nconnections], + 0); + ACE_Auto_Basic_Array_Ptr <ACE_SOCK_Stream> socks (temp_socks); + + // Connection all <opt_nconnections> connections before sending data. + ACE_SOCK_Connector c; + for (i = 0; i < opt_nconnections; i++) + { + if (c.connect (socks[i], *connection_addr) == -1) + { + if (errno != ECONNREFUSED || i == 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) conn %d %p\n"), + ACE_TEXT ("connect"))); + while (--i >= 0) + socks[i].close (); + break; + } + } + socks[i].enable (ACE_NONBLOCK); + } + if (i < opt_nconnections) + return 0; + + // Keep blasting data on all possible connections until this thread + // is canceled. If we manage to overrun the receiver on all sockets, + // sleep a bit for the receivers to catch up. + ACE_thread_t me = ACE_Thread::self (); + ACE_Thread_Manager *tm = ACE_Thread_Manager::instance (); + size_t send_cnt = ACE_OS::strlen (ACE_ALPHABET); + bool fail = false; + while (!tm->testcancel (me) && !fail) + { + bool sent_something = false; + for (i = 0; i < opt_nconnections; i++) + { + ssize_t cnt = socks[i].send (ACE_ALPHABET, send_cnt); + if (opt_debug) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) h %d sent %b\n"), + socks[i].get_handle(), + cnt)); + if (cnt > 0) + { + sent_something = true; + continue; + } + if (errno == EWOULDBLOCK) + continue; + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p; giving up\n"), + ACE_TEXT ("sender"))); + fail = true; + break; + } + if (!fail && !sent_something) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Full sockets... pausing...\n"))); + ACE_OS::sleep (1); + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Resuming sending.\n"))); + } + } + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Done sending.\n"))); + for (i = 0; i < opt_nconnections; i++) + socks[i].close (); + return 0; +} + +ACE_THR_FUNC_RETURN +reactor_loop (void *p) +{ + ACE_Reactor *r = reinterpret_cast<ACE_Reactor *> (p); + int me = reactor_thread_nr++; + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Reactor loop %d starting...\n"), me)); + if (me == 0) + r->owner (ACE_Thread::self ()); + if (r->run_reactor_event_loop () == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("reactor"))); + else + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) reactor thread %d ending\n"), me)); + return 0; +} + +void +run (ACE_Reactor_Impl &ri, const ACE_TCHAR *what, bool tp = true) +{ + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Starting test with %s\n"), what)); + + ACE_Reactor r (&ri); + ACE_Thread_Manager *tm = ACE_Thread_Manager::instance (); + ACE_Acceptor<Read_Handler, ACE_SOCK_ACCEPTOR> acceptor; + + // Bind acceptor to any port and then find out what the port was. + ACE_INET_Addr server_addr; + if (acceptor.open (ACE_sap_any_cast (const ACE_INET_Addr &), &r) == -1 + || acceptor.acceptor ().get_local_addr (server_addr) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("acceptor open"))); + return; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) starting server at port %d\n"), + server_addr.get_port_number ())); + + reactor_thread_nr = 0; // Reset for new set + if (-1 == tm->spawn_n (tp ? opt_reactor_threads : 1, reactor_loop, &r)) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("reactor thread spawn"))); + acceptor.close(); + return; + } + + ACE_INET_Addr connection_addr (server_addr.get_port_number (), + ACE_DEFAULT_SERVER_HOST); + + int sender_grp = tm->spawn (sender, &connection_addr); + if (-1 == sender_grp) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("sender spawn"))); + } + else + { + ACE_OS::sleep (opt_secs); + tm->cancel_grp (sender_grp); + } + r.end_reactor_event_loop (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) waiting for the test threads...\n"))); + tm->wait (); +} + +int +run_main (int argc, ACE_TCHAR *argv[]) +{ + ACE_START_TEST (ACE_TEXT ("Reactor_Fairness_Test")); + + //FUZZ: disable check_for_lack_ACE_OS + ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("c:s:t:d"), 1); + for (int c; (c = getopt ()) != -1; ) + //FUZZ: enble check_for_lack_ACE_OS + switch (c) + { + case 'c': + opt_nconnections = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 's': + opt_secs = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 't': + opt_reactor_threads = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'd': + opt_debug = 1; + break; + } + + // Run the test once for each reactor type available. + int fails = 0; + results.reset (opt_nconnections); + { + ACE_Select_Reactor r; + run (r, ACE_TEXT ("Select Reactor"), false); // No thread pool + } + fails += results.analyze_reports (); + + results.reset (opt_nconnections); + { + ACE_TP_Reactor r; + run (r, ACE_TEXT ("TP Reactor")); + } + fails += results.analyze_reports (); + + results.reset (opt_nconnections); + { + ACE_Dev_Poll_Reactor r; + run (r, ACE_TEXT ("Dev_Poll Reactor")); + } + fails += results.analyze_reports (); + + + ACE_END_TEST; + return fails; +} + +#else +int +run_main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("Reactor_Performance_Test")); + + ACE_ERROR ((LM_INFO, + ACE_TEXT ("threads not supported on this platform\n"))); + + ACE_END_TEST; + return 0; +} +#endif /* ACE_HAS_THREADS */ |