diff options
author | nanbor <nanbor@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-10-01 06:00:55 +0000 |
---|---|---|
committer | nanbor <nanbor@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-10-01 06:00:55 +0000 |
commit | 0cb5531fcbbfe96e7c759d65d76da2295ce3f0e0 (patch) | |
tree | 90b518c552c54ccfe7ad6f6c674db20701e51c48 /tests/Thread_Pool_Reactor_Test.cpp | |
parent | 8a4cba1447e0999486ee508450234dd9bca2a0e8 (diff) | |
download | ATCD-0cb5531fcbbfe96e7c759d65d76da2295ce3f0e0.tar.gz |
*** empty log message ***
Diffstat (limited to 'tests/Thread_Pool_Reactor_Test.cpp')
-rw-r--r-- | tests/Thread_Pool_Reactor_Test.cpp | 286 |
1 files changed, 286 insertions, 0 deletions
diff --git a/tests/Thread_Pool_Reactor_Test.cpp b/tests/Thread_Pool_Reactor_Test.cpp new file mode 100644 index 00000000000..c5a82e48164 --- /dev/null +++ b/tests/Thread_Pool_Reactor_Test.cpp @@ -0,0 +1,286 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Thread_Pool_Reactor_Test.cpp +// +// = DESCRIPTION +// This program is a torture test of thread poll reactors. +// It starts by spawning several server threads waiting to handle +// events. Several other client threads are spawned right after +// to initiate connections to server threads. Each connection +// adds a new Svc_Handler into the TP_Reactor and sends out +// several "requests" to the server thread. After the connection +// is closed, the Svc_Handler is removed from the TP_Reactor. +// Each message is treated as a separate request by the server so +// two consecutive requests might be serviced by two different +// threads. +// +// Usage: Thread_Pool_Reactor_Test [-r <hostname:port#>] +// [-s <server thr#>] [-c <client thr#>] [-d <delay>] +// [-i <client conn attempt#>] [-n <client request# per conn>] +// +// Default value: +// <hostname:port#>: ACE_DEFAULT_RENDEZVOUS +// <server thr#>: ACE_MAX_THREADS +// <client thr#>: ACE_MAX_ITERATIONS +// <client conn attempt#>: ACE_MAX_ITERATIONS +// <client req# per conn>: ACE_MAX_THREADS +// <delay>: 50 usec +// +// = AUTHOR +// Irfan Pyarali <irfan@cs.wustl.edu> +// Nanbor Wang <nanbor@cs.wustl.edu> +// +// ============================================================================ + +#include "tests/test_config.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Acceptor.h" +#include "ace/Thread_Manager.h" +#include "ace/TP_Reactor.h" + +ACE_RCSID(tests, Atomic_Op_Test, "$Id$") + +#if defined(__BORLANDC__) && __BORLANDC__ >= 0x0530 +USELIB("..\ace\aced.lib"); +//--------------------------------------------------------------------------- +#endif /* defined(__BORLANDC__) && __BORLANDC__ >= 0x0530 */ + +#if defined (ACE_HAS_THREADS) + +static ASYS_TCHAR *rendezvous = ASYS_TEXT ("localhost:10010"); +static size_t svr_thrno = ACE_MAX_THREADS; +static size_t cli_thrno = ACE_MAX_ITERATIONS; +static size_t cli_conn_no = ACE_MAX_ITERATIONS; +static size_t cli_req_no = ACE_MAX_THREADS; +static int req_delay = 50; +static int main_event_loop = 1; +static ACE_Reactor *main_reactor = 0; + +void +parse_arg (int argc, ASYS_TCHAR *argv[]) +{ + // @@ TODO: Support command line arguments stated above. + + ACE_UNUSED_ARG (argc); + ACE_UNUSED_ARG (argv); +} + +class Acceptor_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> +{ + // This class is the Svc_Handler used by <Acceptor>. +public: + Acceptor_Handler (ACE_Thread_Manager *tm = 0); + // The default constructor makes sure the right reactor is used. + +protected: + virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); + virtual int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask = 0); +}; + +typedef ACE_Strategy_Acceptor <Acceptor_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR; +typedef ACE_Thread_Pool_Strategy <Acceptor_Handler> CONCURRENCY_STRATEGY; +typedef ACE_Svc_Handler_Pool_Strategy <Acceptor_Handler> CREATION_STRATEGY; + +Acceptor_Handler::Acceptor_Handler (ACE_Thread_Manager *thr_mgr) + : ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> (thr_mgr) +{ + // Make sure we use TP_Reactor with this class (that's the whole + // point, right?) + this->reactor (ACE_Reactor::instance ()); +} + +int +Acceptor_Handler::handle_input (ACE_HANDLE fd) +{ + ASYS_TCHAR buffer[BUFSIZ]; + int result = -1; + if ((result = this->peer ().recv (buffer, BUFSIZ * sizeof (ASYS_TCHAR))) > 0) + { + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) Acceptor_Handler::handle_input (fd = %x)\n"), + fd)); + buffer[result] = '\0'; + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) handle_input: input is %s\n"), + buffer)); + if (ACE_OS::strcmp (buffer, ASYS_TEXT ("shutdown")) == 0) + { + main_event_loop = 0; + main_reactor->notify (); + ACE_Reactor::end_event_loop (); + } + + return 0; + } + else + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) Acceptor_Handler: end handle input (%x)\n"), + fd)); + + return -1; +} + +int +Acceptor_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) Acceptor_Handler::handle_close (fd = %x)\n"), + fd)); + this->destroy (); + return 0; +} + +void * +svr_worker (void *) +{ + // Server thread function. + + int result = 0; + + while (!ACE_Reactor::event_loop_done ()) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) handling events ....\n")); + + result = ACE_Reactor::instance ()->handle_events (); + if (result < 0) + ACE_DEBUG ((LM_DEBUG, "(%t) Error handling events\n")); + } + + ACE_DEBUG ((LM_DEBUG, + "(%t) I am done handling events. Bye, bye\n")); + + return 0; +} + +void * +cli_worker (void *) +{ + // Client thread function. + ACE_INET_Addr addr (rendezvous); + ACE_SOCK_Stream stream; + ACE_SOCK_Connector connect; + ACE_Time_Value delay (0, req_delay); + + for (size_t i = 0 ; i < cli_conn_no; i++) + { + if (connect.connect (stream, addr) < 0) + { + ACE_ERROR ((LM_ERROR, + ASYS_TEXT ("(%t) %p\n"), ASYS_TEXT ("connect"))); + continue; + } + + ASYS_TCHAR *buf = ASYS_TEXT ("Message from Connection worker\n"); + + for (size_t j = 0; j < cli_req_no; ACE_OS::sleep (delay), j++) + { + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("(%t) conn_worker stream handle = %x\n"), + stream.get_handle ())); + stream.send_n (buf, (ACE_OS::strlen (buf) + 1) * sizeof (ASYS_TCHAR)); + } + stream.close (); + } + + return 0; +} + +void * +worker (void *) +{ + ACE_OS::sleep (3); + + ACE_INET_Addr addr (rendezvous); + int grp = ACE_Thread_Manager::instance ()->spawn_n (cli_thrno, + &cli_worker, + &addr); + ACE_ASSERT (grp != -1); + + ACE_Thread_Manager::instance ()->wait_grp (grp); + + ACE_OS::sleep (1); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Shutting down...\n"))); + ACE_SOCK_Stream stream; + ACE_SOCK_Connector connect; + if (connect.connect (stream, addr) < 0) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT ("%p Error while connecting\n"), + ASYS_TEXT ("connect"))); + + char *buf = "shutdown"; + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("shutdown stream handle = %x\n"), + stream.get_handle ())); + stream.send_n (buf, ACE_OS::strlen (buf) + 1); + stream.close (); + return 0; +} + +int +main (int argc, ASYS_TCHAR *argv[]) +{ + ACE_START_TEST (ASYS_TEXT ("Thread_Pool_Reactor_Test")); + parse_arg (argc, argv); + + // Changed the default + ACE_TP_Reactor sr; + ACE_Reactor new_reactor (&sr); + ACE_Reactor::instance (&new_reactor); + + // Most platforms seem to have trouble accepting connections + // simultaneously in multiple threads. Therefore, we can't + // quite use the Acceptor with the TP_Reactor. Create a + // Select_Reactor and run the event_loop in the main thread. + ACE_Select_Reactor slr; + ACE_Reactor mreactor (&slr); + main_reactor = &mreactor; + ACCEPTOR acceptor; + CONCURRENCY_STRATEGY tp_strategy; + CREATION_STRATEGY new_strategy; + ACE_INET_Addr accept_addr (rendezvous); + acceptor.open (accept_addr, + main_reactor, + &new_strategy, + 0, + 0, //&tp_strategy, + 0, 0, 0, 0); + + ACE_Thread_Manager::instance ()->spawn_n (svr_thrno, + svr_worker); + ACE_Thread_Manager::instance ()->spawn (worker); + + int result = 0; + while (result >= 0 && main_event_loop) + { + result = slr.handle_events (); + } + + ACE_ASSERT (result != -1); + + ACE_Thread_Manager::instance ()->wait (); + + ACE_OS::sleep (1); + + ACE_END_TEST; + return 0; +} +#else +int +main (int, ASYS_TCHAR *[]) +{ + ACE_START_TEST (ASYS_TEXT ("Thread_Pool_Reactor_Test")); + + ACE_ERROR ((LM_ERROR, ASYS_TEXT ("threads not supported on this platform\n"))); + + ACE_END_TEST; + return 0; +} +#endif /* ACE_HAS_THREADS */ + + |