// $Id$ // ============================================================================ // // = LIBRARY // tests // // = FILENAME // Thread_Pool_Reactor_Test.cpp // // = DESCRIPTION // This program is a torture test of thread pool reactors. It // starts by spawning several server threads waiting to handle // events. Several other client threads are spawned right after // to initiate connections to server threads. Each connection // adds a new Svc_Handler into the TP_Reactor and sends out // several "requests" to the server thread. After the connection // is closed, the Svc_Handler is removed from the TP_Reactor. // Each message is treated as a separate request by the server so // two consecutive requests might be serviced by two different // threads. // // Usage: Thread_Pool_Reactor_Test [-r ] // [-s ] [-c ] [-d ] // [-i ] [-n ] // // Default value: // : ACE_DEFAULT_RENDEZVOUS // : ACE_MAX_THREADS // : ACE_MAX_ITERATIONS // : ACE_MAX_ITERATIONS // : ACE_MAX_THREADS // : 50 usec // // = AUTHOR // Irfan Pyarali and // Nanbor Wang // // ============================================================================ #include "tests/test_config.h" #include "ace/Get_Opt.h" #include "ace/SOCK_Connector.h" #include "ace/SOCK_Acceptor.h" #include "ace/Acceptor.h" #include "ace/Thread_Manager.h" #include "ace/TP_Reactor.h" ACE_RCSID(tests, Atomic_Op_Test, "$Id$") #if defined (ACE_HAS_THREADS) #include "tests/Thread_Pool_Reactor_Test.h" typedef ACE_Strategy_Acceptor ACCEPTOR; // Accepting end point. This is actually "localhost:10010", but some // platform couldn't resolve the name so we use the IP address // directly here. static const ACE_TCHAR *rendezvous = ACE_TEXT ("127.0.0.1:10010"); // Total number of server threads. static size_t svr_thrno = ACE_MAX_THREADS; #if defined (CHORUS) // Add platforms that can't handle too many // connection simultaneously here. #define ACE_LOAD_FACTOR /2 #else #define ACE_LOAD_FACTOR #endif // Total number of client threads. static size_t cli_thrno = ACE_MAX_THREADS ACE_LOAD_FACTOR; // Total connection attemps of a client thread. static size_t cli_conn_no = ACE_MAX_ITERATIONS ACE_LOAD_FACTOR; // Total requests a client thread sends. static size_t cli_req_no = ACE_MAX_THREADS ACE_LOAD_FACTOR; // Delay before a thread sending the next request (in msec.) static int req_delay = 50; static void parse_arg (int argc, ACE_TCHAR *argv[]) { ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("r:s:c:d:i:n:")); int c; while ((c = getopt ()) != -1) { switch (c) { case 'r': // hostname:port rendezvous = getopt.optarg; break; case 's': svr_thrno = ACE_OS::atoi (getopt.optarg); break; case 'c': cli_thrno = ACE_OS::atoi (getopt.optarg); break; case 'd': req_delay = ACE_OS::atoi (getopt.optarg); break; case 'i': cli_conn_no = ACE_OS::atoi (getopt.optarg); break; case 'n': cli_req_no = ACE_OS::atoi (getopt.optarg); break; default: ACE_ERROR ((LM_ERROR, "Usage: Thread_Pool_Reactor_Test [-r ]" "\t[-s ] [-c ] [-d ]" "\t[-i ]" "[-n ]\n")); break; } } } Request_Handler::Request_Handler (ACE_Thread_Manager *thr_mgr) : ACE_Svc_Handler (thr_mgr), nr_msgs_rcvd_(0) { // Make sure we use TP_Reactor with this class (that's the whole // point, right?) this->reactor (ACE_Reactor::instance ()); } int Request_Handler::handle_input (ACE_HANDLE fd) { ACE_TCHAR buffer[BUFSIZ]; ACE_TCHAR len = 0; ssize_t result = this->peer ().recv (&len, sizeof (ACE_TCHAR)); if (result > 0 && this->peer ().recv_n (buffer, len * sizeof (ACE_TCHAR)) == ACE_static_cast (ssize_t, len * sizeof (ACE_TCHAR))) { ++this->nr_msgs_rcvd_; ACE_DEBUG ((LM_DEBUG, "(%t) svr input; fd: 0x%x; input: %s\n", fd, buffer)); if (ACE_OS::strcmp (buffer, ACE_TEXT ("shutdown")) == 0) ACE_Reactor::end_event_loop (); return 0; } else ACE_DEBUG ((LM_DEBUG, "(%t) Request_Handler: 0x%x peer closed (0x%x)\n", this, fd)); return -1; } int Request_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask) { ACE_DEBUG ((LM_DEBUG, "(%t) svr close; fd: 0x%x, rcvd %d msgs\n", fd, this->nr_msgs_rcvd_)); if (this->nr_msgs_rcvd_ != cli_req_no) ACE_ERROR((LM_ERROR, "(%t) Handler 0x%x: Expected %d messages; got %d\n", this, cli_req_no, this->nr_msgs_rcvd_)); this->destroy (); return 0; } static int reactor_event_hook (void *) { ACE_DEBUG ((LM_DEBUG, "(%t) handling events ....\n")); return 0; } static void * svr_worker (void *) { // Server thread function. int result = ACE_Reactor::instance ()->run_reactor_event_loop (&reactor_event_hook); if (result == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "Error handling events"), 0); ACE_DEBUG ((LM_DEBUG, "(%t) I am done handling events. Bye, bye\n")); return 0; } static void * cli_worker (void *arg) { // Client thread function. ACE_INET_Addr addr (rendezvous); ACE_SOCK_Stream stream; ACE_SOCK_Connector connect; ACE_Time_Value delay (0, req_delay); size_t len = * ACE_reinterpret_cast (ACE_TCHAR *, arg); for (size_t i = 0 ; i < cli_conn_no; i++) { if (connect.connect (stream, addr) < 0) { ACE_ERROR ((LM_ERROR, "(%t) %p\n", "connect")); continue; } for (size_t j = 0; j < cli_req_no; j++) { ACE_DEBUG ((LM_DEBUG, "(%t) conn_worker handle 0x%x, req %d\n", stream.get_handle (), j+1)); if (stream.send_n (arg, (len + 1) * sizeof (ACE_TCHAR)) == -1) { ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send_n")); continue; } ACE_OS::sleep (delay); } stream.close (); } return 0; } static void * worker (void *) { ACE_OS::sleep (3); const ACE_TCHAR *msg = ACE_TEXT ("Message from Connection worker"); ACE_TCHAR buf [BUFSIZ]; buf[0] = ACE_OS::strlen (msg) + 1; ACE_OS::strcpy (&buf[1], msg); ACE_INET_Addr addr (rendezvous); ACE_DEBUG((LM_DEBUG, "(%t) Spawning %d client threads...\n", cli_thrno)); int grp = ACE_Thread_Manager::instance ()->spawn_n (cli_thrno, &cli_worker, buf); ACE_ASSERT (grp != -1); ACE_Thread_Manager::instance ()->wait_grp (grp); ACE_DEBUG ((LM_DEBUG, "(%t) Client threads done; shutting down...\n")); ACE_SOCK_Stream stream; ACE_SOCK_Connector connect; if (connect.connect (stream, addr) == -1) ACE_ERROR ((LM_ERROR, "(%t) %p Error while connecting\n", "connect")); const ACE_TCHAR *sbuf = ACE_TEXT ("\011shutdown"); ACE_DEBUG ((LM_DEBUG, "shutdown stream handle = %x\n", stream.get_handle ())); if (stream.send_n (sbuf, (ACE_OS::strlen (sbuf) + 1) * sizeof (ACE_TCHAR)) == -1) ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send_n")); stream.close (); return 0; } int main (int argc, ACE_TCHAR *argv[]) { ACE_START_TEST (ACE_TEXT ("Thread_Pool_Reactor_Test")); parse_arg (argc, argv); // Changed the default ACE_TP_Reactor sr; ACE_Reactor new_reactor (&sr); ACE_Reactor::instance (&new_reactor); ACCEPTOR acceptor; ACE_INET_Addr accept_addr (rendezvous); if (acceptor.open (accept_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), 1); ACE_DEBUG((LM_DEBUG, ACE_TEXT ("(%t) Spawning %d server threads...\n"), svr_thrno)); ACE_Thread_Manager::instance ()->spawn_n (svr_thrno, svr_worker); ACE_Thread_Manager::instance ()->spawn (worker); ACE_Thread_Manager::instance ()->wait (); ACE_END_TEST; return 0; } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Accept_Strategy; template class ACE_Concurrency_Strategy; template class ACE_Creation_Strategy; template class ACE_Scheduling_Strategy; template class ACE_Acceptor; template class ACE_Strategy_Acceptor; template class ACE_Svc_Handler; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Accept_Strategy #pragma instantiate ACE_Concurrency_Strategy #pragma instantiate ACE_Creation_Strategy #pragma instantiate ACE_Scheduling_Strategy #pragma instantiate ACE_Acceptor #pragma instantiate ACE_Strategy_Acceptor #pragma instantiate ACE_Svc_Handler #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ #else int main (int, ACE_TCHAR *[]) { ACE_START_TEST (ACE_TEXT ("Thread_Pool_Reactor_Test")); ACE_ERROR ((LM_INFO, "threads not supported on this platform\n")); ACE_END_TEST; return 0; } #endif /* ACE_HAS_THREADS */