From 99aa8c60282c7b8072eb35eb9ac815702f5bf586 Mon Sep 17 00:00:00 2001 From: "William R. Otte" Date: Tue, 4 Mar 2008 14:51:23 +0000 Subject: undoing accidental deletion --- ACE/tests/Thread_Pool_Reactor_Test.cpp | 345 +++++++++++++++++++++++++++++++++ 1 file changed, 345 insertions(+) create mode 100644 ACE/tests/Thread_Pool_Reactor_Test.cpp (limited to 'ACE/tests/Thread_Pool_Reactor_Test.cpp') diff --git a/ACE/tests/Thread_Pool_Reactor_Test.cpp b/ACE/tests/Thread_Pool_Reactor_Test.cpp new file mode 100644 index 00000000000..e60233777e7 --- /dev/null +++ b/ACE/tests/Thread_Pool_Reactor_Test.cpp @@ -0,0 +1,345 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Thread_Pool_Reactor_Test.cpp +// +// = DESCRIPTION +// This program is a torture test of thread pool reactors. It +// starts by spawning several server threads waiting to handle +// events. Several other client threads are spawned right after +// to initiate connections to server threads. Each connection +// adds a new Svc_Handler into the TP_Reactor and sends out +// several "requests" to the server thread. After the connection +// is closed, the Svc_Handler is removed from the TP_Reactor. +// Each message is treated as a separate request by the server so +// two consecutive requests might be serviced by two different +// threads. +// +// Usage: Thread_Pool_Reactor_Test [-r ] +// [-s ] [-c ] [-d ] +// [-i ] [-n ] +// +// Default value: +// : ACE_DEFAULT_RENDEZVOUS +// : ACE_MAX_THREADS +// : ACE_MAX_ITERATIONS +// : ACE_MAX_ITERATIONS +// : ACE_MAX_THREADS +// : 50 usec +// +// = AUTHOR +// Irfan Pyarali and +// Nanbor Wang +// +// ============================================================================ + +#include "tests/test_config.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" +#include "ace/Get_Opt.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Acceptor.h" +#include "ace/Thread_Manager.h" +#include "ace/TP_Reactor.h" + +ACE_RCSID(tests, Atomic_Op_Test, "$Id$") + +#if defined (ACE_HAS_THREADS) + +#include "tests/Thread_Pool_Reactor_Test.h" +typedef ACE_Strategy_Acceptor ACCEPTOR; + +// Accepting end point. This is actually "localhost:10010", but some +// platform couldn't resolve the name so we use the IP address +// directly here. +static const ACE_TCHAR *rendezvous = ACE_TEXT ("127.0.0.1:10010"); + +// Total number of server threads. +static size_t svr_thrno = ACE_MAX_THREADS; + +#if defined (ACE_VXWORKS) // default network parameters (MAX_BINDS and system buffers) are too small for full test + // Add platforms that can't handle too many + // connection simultaneously here. +#define ACE_LOAD_FACTOR /2 +#else +#define ACE_LOAD_FACTOR +#endif + +// Total number of client threads. +static size_t cli_thrno = ACE_MAX_THREADS ACE_LOAD_FACTOR; + +// Total connection attemps of a client thread. +static size_t cli_conn_no = ACE_MAX_ITERATIONS ACE_LOAD_FACTOR; + +// Total requests a client thread sends. +static size_t cli_req_no = ACE_MAX_THREADS ACE_LOAD_FACTOR; + +// Delay before a thread sending the next request (in msec.) +static int req_delay = 50; + +static void +parse_arg (int argc, ACE_TCHAR *argv[]) +{ + //FUZZ: disable check_for_lack_ACE_OS + ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("r:s:c:d:i:n:")); + + int c; + + while ((c = getopt ()) != -1) + { + //FUZZ: enable check_for_lack_ACE_OS + switch (c) + { + case 'r': // hostname:port + rendezvous = getopt.opt_arg (); + break; + case 's': + svr_thrno = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'c': + cli_thrno = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'd': + req_delay = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'i': + cli_conn_no = ACE_OS::atoi (getopt.opt_arg ()); + break; + case 'n': + cli_req_no = ACE_OS::atoi (getopt.opt_arg ()); + break; + default: + ACE_ERROR ((LM_ERROR, + "Usage: Thread_Pool_Reactor_Test [-r ]" + "\t[-s ] [-c ] [-d ]" + "\t[-i ]" + "[-n ]\n")); + break; + } + } +} + +Request_Handler::Request_Handler (ACE_Thread_Manager *thr_mgr) + : ACE_Svc_Handler (thr_mgr), + nr_msgs_rcvd_(0) +{ + // Make sure we use TP_Reactor with this class (that's the whole + // point, right?) + this->reactor (ACE_Reactor::instance ()); +} + +int +Request_Handler::handle_input (ACE_HANDLE fd) +{ + ACE_TCHAR buffer[BUFSIZ]; + ACE_TCHAR len = 0; + ssize_t result = this->peer ().recv (&len, sizeof (ACE_TCHAR)); + + if (result > 0 + && this->peer ().recv_n (buffer, len * sizeof (ACE_TCHAR)) + == static_cast (len * sizeof (ACE_TCHAR))) + { + ++this->nr_msgs_rcvd_; + + ACE_DEBUG ((LM_DEBUG, + "(%t) svr input; fd: 0x%x; input: %s\n", + fd, + buffer)); + if (ACE_OS::strcmp (buffer, ACE_TEXT ("shutdown")) == 0) + ACE_Reactor::instance()->end_reactor_event_loop (); + return 0; + } + else + ACE_DEBUG ((LM_DEBUG, + "(%t) Request_Handler: 0x%x peer closed (0x%x)\n", + this, fd)); + return -1; +} + +int +Request_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) svr close; fd: 0x%x, rcvd %d msgs\n", + fd, + this->nr_msgs_rcvd_)); + if (this->nr_msgs_rcvd_ != cli_req_no) + ACE_ERROR((LM_ERROR, + "(%t) Handler 0x%x: Expected %d messages; got %d\n", + this, + cli_req_no, + this->nr_msgs_rcvd_)); + this->destroy (); + return 0; +} + +static int +reactor_event_hook (ACE_Reactor *) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) handling events ....\n")); + + return 0; +} + +static ACE_THR_FUNC_RETURN +svr_worker (void *) +{ + // Server thread function. + int result = + ACE_Reactor::instance ()->run_reactor_event_loop (&reactor_event_hook); + + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "Error handling events"), + 0); + + ACE_DEBUG ((LM_DEBUG, + "(%t) I am done handling events. Bye, bye\n")); + + return 0; +} + +static ACE_THR_FUNC_RETURN +cli_worker (void *arg) +{ + // Client thread function. + ACE_INET_Addr addr (rendezvous); + ACE_SOCK_Stream stream; + ACE_SOCK_Connector connect; + ACE_Time_Value delay (0, req_delay); + size_t len = * reinterpret_cast (arg); + + for (size_t i = 0 ; i < cli_conn_no; i++) + { + if (connect.connect (stream, addr) < 0) + { + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "connect")); + continue; + } + + for (size_t j = 0; j < cli_req_no; j++) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) conn_worker handle 0x%x, req %d\n", + stream.get_handle (), + j+1)); + if (stream.send_n (arg, + (len + 1) * sizeof (ACE_TCHAR)) == -1) + { + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "send_n")); + continue; + } + ACE_OS::sleep (delay); + } + + stream.close (); + } + + return 0; +} + +static ACE_THR_FUNC_RETURN +worker (void *) +{ + ACE_OS::sleep (3); + const ACE_TCHAR *msg = ACE_TEXT ("Message from Connection worker"); + ACE_TCHAR buf [BUFSIZ]; + buf[0] = static_cast ((ACE_OS::strlen (msg) + 1)); + ACE_OS::strcpy (&buf[1], msg); + + ACE_INET_Addr addr (rendezvous); + + ACE_DEBUG((LM_DEBUG, + "(%t) Spawning %d client threads...\n", + cli_thrno)); + int grp = ACE_Thread_Manager::instance ()->spawn_n (cli_thrno, + &cli_worker, + buf); + ACE_ASSERT (grp != -1); + + ACE_Thread_Manager::instance ()->wait_grp (grp); + + ACE_DEBUG ((LM_DEBUG, + "(%t) Client threads done; shutting down...\n")); + ACE_SOCK_Stream stream; + ACE_SOCK_Connector connect; + + if (connect.connect (stream, addr) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p Error while connecting\n", + "connect")); + + const ACE_TCHAR *sbuf = ACE_TEXT ("\011shutdown"); + + ACE_DEBUG ((LM_DEBUG, + "shutdown stream handle = %x\n", + stream.get_handle ())); + + if (stream.send_n (sbuf, (ACE_OS::strlen (sbuf) + 1) * sizeof (ACE_TCHAR)) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "send_n")); + + stream.close (); + + return 0; +} + +int +run_main (int argc, ACE_TCHAR *argv[]) +{ + ACE_START_TEST (ACE_TEXT ("Thread_Pool_Reactor_Test")); + parse_arg (argc, argv); + + // Changed the default + ACE_TP_Reactor sr; + ACE_Reactor new_reactor (&sr); + ACE_Reactor::instance (&new_reactor); + + ACCEPTOR acceptor; + ACE_INET_Addr accept_addr (rendezvous); + + if (acceptor.open (accept_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("open")), + 1); + + ACE_DEBUG((LM_DEBUG, + ACE_TEXT ("(%t) Spawning %d server threads...\n"), + svr_thrno)); + ACE_Thread_Manager::instance ()->spawn_n (svr_thrno, + svr_worker); + ACE_Thread_Manager::instance ()->spawn (worker); + + ACE_Thread_Manager::instance ()->wait (); + + ACE_END_TEST; + return 0; +} + +#else +int +run_main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("Thread_Pool_Reactor_Test")); + + ACE_ERROR ((LM_INFO, + "threads not supported on this platform\n")); + + ACE_END_TEST; + return 0; +} +#endif /* ACE_HAS_THREADS */ -- cgit v1.2.1