diff options
author | Ossama Othman <ossama-othman@users.noreply.github.com> | 2001-10-30 04:06:15 +0000 |
---|---|---|
committer | Ossama Othman <ossama-othman@users.noreply.github.com> | 2001-10-30 04:06:15 +0000 |
commit | 1b7580c22dbbb7ecc04aef5c251d02388439440b (patch) | |
tree | cefab9421ba69aa46855e597a092c5b65c8c2837 /tests | |
parent | 832ab264678608a7fbb7f265948217914cd59bd1 (diff) | |
download | ATCD-1b7580c22dbbb7ecc04aef5c251d02388439440b.tar.gz |
ChangeLogTag:Mon Oct 29 20:04:40 2001 Ossama Othman <ossama@uci.edu>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/Proactor_Test.cpp | 346 |
1 files changed, 215 insertions, 131 deletions
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp index dedfdee9ee4..d0c996af2aa 100644 --- a/tests/Proactor_Test.cpp +++ b/tests/Proactor_Test.cpp @@ -31,13 +31,13 @@ ACE_RCSID (tests, #include "ace/SOCK_Connector.h" #include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Stream.h" +#include "ace/Object_Manager.h" #include "ace/Get_Opt.h" #include "ace/streams.h" #include "ace/Proactor.h" #include "ace/Asynch_Acceptor.h" #include "ace/Task.h" -#include "ace/Object_Manager.h" #if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) @@ -58,7 +58,7 @@ static int disable_signal (int sigmin, int sigmax); static int proactor_type = 0; // POSIX : > 0 max number aio operations proactor, -static int max_aio_operations = 0; +static size_t max_aio_operations = 0; // both: 0 run client or server / depends on host // != 0 run client and server @@ -69,15 +69,15 @@ static const ACE_TCHAR *host = 0; // number of Senders instances static int senders = 1; -const int MAX_SENDERS = 100; -const int MAX_RECEIVERS = 1000; +const int MAX_SENDERS = 100; +const int MAX_RECEIVERS = 1000; // duplex mode: == 0 half-duplex // != 0 full duplex static int duplex = 0; // number threads in the Proactor thread pool -static int threads = 1; +static size_t threads = 1; // Port that we're receiving connections on. static u_short port = ACE_DEFAULT_SERVER_PORT; @@ -86,6 +86,10 @@ static u_short port = ACE_DEFAULT_SERVER_PORT; static int logflag = 0; // 0 STDERR, 1 FILE static int loglevel = 0; // 0 full , 1 only errors +static const size_t MIN_TIME = 1; // min 1 sec +static const size_t MAX_TIME = 3600; // max 1 hour +static size_t seconds = 2; // default time to run - 2 seconds + static char data[] = "GET / HTTP/1.1\r\n" "Accept: */*\r\n" @@ -93,7 +97,7 @@ static char data[] = "Accept-Encoding: gzip, deflate\r\n" "User-Agent: Proactor_Test/1.0 (non-compatible)\r\n" "Connection: Keep-Alive\r\n" - "\r\n" ; + "\r\n"; // ************************************************************* // MyTask is ACE_Task resposible for : @@ -114,102 +118,172 @@ static char data[] = class MyTask : public ACE_Task<ACE_MT_SYNCH> { public: - MyTask (void): threads_ (0), proactor_ (0) {} + MyTask (void): lock_ (), sem_ (0), proactor_(0) {} - int svc (void); - void waitready (void) { this->event_.wait (); } + virtual ~MyTask() { (void) this->stop ();} + + virtual int svc (void); + + int start (size_t num_threads, + int type_proactor, + size_t max_op ); + int stop (void); -private: - void create_proactor (void); - void delete_proactor (void); private: + int create_proactor (int type_proactor, + size_t max_op); + int delete_proactor (void); + ACE_SYNCH_RECURSIVE_MUTEX lock_; - int threads_; - ACE_Proactor *proactor_; - ACE_Manual_Event event_; + ACE_Thread_Semaphore sem_; + ACE_Proactor * proactor_; + }; -void -MyTask::create_proactor (void) +int +MyTask::create_proactor (int type_proactor, size_t max_op ) { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, - monitor, - this->lock_); + ACE_GUARD_RETURN ( ACE_SYNCH_RECURSIVE_MUTEX, + monitor, + this->lock_, + -1); + + ACE_ASSERT ( this->proactor_ == 0 ); - if (this->threads_ == 0) - { #if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) - ACE_WIN32_Proactor *proactor = 0; - ACE_NEW (proactor, - ACE_WIN32_Proactor); - ACE_DEBUG ((LM_DEBUG, "(%t) Create Proactor Type = WIN32\n")); + ACE_UNUSED_ARG ( type_proactor ); + ACE_UNUSED_ARG ( max_op ); + + ACE_WIN32_Proactor *proactor = 0; + + ACE_NEW_RETURN (proactor, + ACE_WIN32_Proactor, + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("(%t) Create Proactor Type = WIN32\n"))); #elif defined (ACE_HAS_AIO_CALLS) - ACE_POSIX_Proactor *proactor = 0; + ACE_POSIX_Proactor * proactor = 0; - switch (proactor_type) - { - case 1: - ACE_NEW (proactor, - ACE_POSIX_AIOCB_Proactor (max_aio_operations)); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n"))); - break; + switch (type_proactor) + { + case 1: + ACE_NEW_RETURN (proactor, + ACE_POSIX_AIOCB_Proactor (max_op), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n"))); + break; - case 2: - ACE_NEW (proactor, - ACE_POSIX_SIG_Proactor); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Create Proactor Type = SIG\n"))); - break; + case 2: + ACE_NEW_RETURN (proactor, + ACE_POSIX_SIG_Proactor (max_op), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Create Proactor Type = SIG\n"))); + break; # if defined (sun) - case 3: - ACE_NEW (proactor, - ACE_SUN_Proactor (max_aio_operations)); - ACE_DEBUG ((LM_DEBUG, "(%t) Create Proactor Type = SUN\n")); - break; + case 3: + ACE_NEW_RETURN (proactor, + ACE_SUN_Proactor (max_op), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("(%t) Create Proactor Type = SUN\n"))); + break; # endif /* sun */ - default: - ACE_NEW (proactor, - ACE_POSIX_SIG_Proactor); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Create Proactor Type = SIG\n"))); - break; - } - -#endif + default: + ACE_NEW_RETURN (proactor, + ACE_POSIX_SIG_Proactor (max_op), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Create Proactor Type = SIG\n"))); + break; + } - ACE_NEW (this->proactor_, - ACE_Proactor (proactor, 1)); +#endif // (ACE_WIN32) && !defined (ACE_HAS_WINCE) - ACE_Proactor::instance (this->proactor_); - this->event_.signal (); - } + ACE_NEW_RETURN ( this->proactor_, + ACE_Proactor (proactor, 1), + -1); - this->threads_++; + ACE_Proactor::instance (this->proactor_); + return 0; } -void +int MyTask::delete_proactor (void) { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, - monitor, - this->lock_); + ACE_GUARD_RETURN ( ACE_SYNCH_RECURSIVE_MUTEX, + monitor, + this->lock_, + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Delete Proactor\n"))); + + ACE_Proactor::instance ((ACE_Proactor *) 0); + delete this->proactor_; + this->proactor_ = 0; + + return 0; +} + +int +MyTask::start( size_t num_threads, + int type_proactor, + size_t max_op ) +{ + if ( this->create_proactor (type_proactor, max_op) == -1 ) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to create proactor")), + -1); + + if ( this->activate (THR_NEW_LWP, num_threads) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to activate thread pool")), + -1); + + for (; num_threads > 0; num_threads-- ) + { + sem_.acquire (); + } + + return 0; +} + - if (--this->threads_ == 0) +int +MyTask::stop () +{ + if ( this->proactor_ != 0 ) { - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Delete Proactor\n"))); - ACE_Proactor::instance ((ACE_Proactor *) 0); - delete this->proactor_; - this->proactor_ = 0; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("End Proactor event loop\n"))); + + ACE_Proactor::end_event_loop (); } + + if ( this->wait () == -1 ) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to stop thread pool"))); + + if ( this->delete_proactor () == -1 ) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to delete proactor"))); + + return 0; } int @@ -217,14 +291,14 @@ MyTask::svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask started\n"))); - this->create_proactor (); ::disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX); + // signal that we are ready + sem_.release (1); + while (ACE_Proactor::event_loop_done () == 0) ACE_Proactor::run_event_loop (); - this->delete_proactor (); - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask finished\n"))); return 0; } @@ -280,12 +354,11 @@ class Acceptor : public ACE_Asynch_Acceptor<Receiver> { friend class Receiver; public: + int get_number_sessions (void) { return this->sessions_; } Acceptor (void); virtual ~Acceptor (void); - long get_number_sessions (void) { return this->sessions_; } - void stop (void); // Virtual from ACE_Asynch_Acceptor @@ -296,14 +369,14 @@ private: void on_delete_receiver (Receiver &rcvr); ACE_SYNCH_RECURSIVE_MUTEX lock_; - long sessions_; + int sessions_; Receiver *list_receivers_[MAX_RECEIVERS]; }; Acceptor::Acceptor (void) : sessions_ (0) { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_); + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); for (int i = 0; i < MAX_RECEIVERS; ++i) this->list_receivers_[i] = 0; @@ -320,7 +393,7 @@ Acceptor::stop(void) // This method can be called only after proactor event loop is done // in all threads. - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_); + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); for (int i = 0; i < MAX_RECEIVERS; ++i) { @@ -332,7 +405,7 @@ Acceptor::stop(void) void Acceptor::on_new_receiver (Receiver & rcvr) { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_); + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->sessions_++; this->list_receivers_[rcvr.index_] = &rcvr; ACE_DEBUG ((LM_DEBUG, @@ -343,7 +416,7 @@ Acceptor::on_new_receiver (Receiver & rcvr) void Acceptor::on_delete_receiver (Receiver & rcvr) { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_); + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->sessions_--; if (rcvr.index_ >= 0 @@ -352,14 +425,14 @@ Acceptor::on_delete_receiver (Receiver & rcvr) this->list_receivers_[rcvr.index_] = 0; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Receiver::~DTOR sessions_=%d\n"), + ACE_TEXT ("Receiver::~DTOR sessions_ = %d\n"), this->sessions_)); } Receiver * Acceptor::make_handler (void) { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, 0); + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, 0); if (this->sessions_ >= MAX_RECEIVERS) return 0; @@ -397,7 +470,7 @@ Receiver::~Receiver (void) ACE_OS::closesocket (this->handle_); this->index_ = -1; - this->handle_= ACE_INVALID_HANDLE ; + this->handle_= ACE_INVALID_HANDLE; } // return true if we alive, false we commited suicide @@ -405,7 +478,7 @@ int Receiver::check_destroy (void) { { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1); + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); if (this->io_count_ > 0) return 1; @@ -437,7 +510,7 @@ Receiver::open (ACE_HANDLE handle, ACE_Message_Block &) int Receiver::initiate_read_stream (void) { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1); + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, @@ -461,7 +534,7 @@ Receiver::initiate_read_stream (void) int Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes) { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1); + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); if (nbytes <= 0) { mb.release (); @@ -550,7 +623,7 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) mb.release (); { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_); + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->io_count_--; } @@ -617,7 +690,7 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) } { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_); + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->io_count_--; } @@ -631,7 +704,7 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) class Sender : public ACE_Service_Handler { public: - static long get_number_sessions (void) { return sessions_; } + static int get_number_sessions (void) { return sessions_; } static void init (void); static int activate (int num); static void stop (void); @@ -668,17 +741,17 @@ private: static Sender *list_senders_[MAX_SENDERS]; - static long sessions_; + static int sessions_; }; -long Sender::sessions_ = 0; +int Sender::sessions_ = 0; Sender * Sender::list_senders_[MAX_SENDERS]; void Sender::init (void) { ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, - locker, + monitor, *ACE_Static_Object_Lock::instance ()); for (int i = 0; i < MAX_SENDERS; ++i) @@ -689,7 +762,7 @@ int Sender::activate (int num) { ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, - locker, + monitor, *ACE_Static_Object_Lock::instance (), -1); @@ -701,7 +774,7 @@ Sender::activate (int num) int rc = 0; - for (int i = 0; i < MAX_SENDERS && rc < num ; ++i) + for (int i = 0; i < MAX_SENDERS && rc < num; ++i) { if (list_senders_[i] != 0) continue; @@ -728,10 +801,10 @@ Sender::stop (void) // in all threads ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, - locker, + monitor, *ACE_Static_Object_Lock::instance ()); - for (int i = 0; i < MAX_SENDERS ; ++i) + for (int i = 0; i < MAX_SENDERS; ++i) { delete list_senders_[i]; list_senders_[i] = 0; @@ -744,10 +817,11 @@ Sender::Sender (int index) io_count_ (0) { ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, - locker, + monitor, *ACE_Static_Object_Lock::instance ()); + this->sessions_++; - this->list_senders_[index_] = this ; + this->list_senders_[index_] = this; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sender::CTOR sessions_ = %d\n"), @@ -759,7 +833,7 @@ Sender::Sender (int index) Sender::~Sender (void) { ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, - locker, + monitor, *ACE_Static_Object_Lock::instance ()); if (this->handle_ != ACE_INVALID_HANDLE) @@ -771,7 +845,7 @@ Sender::~Sender (void) this->sessions_--; if (this->index_ >= 0) { - this->list_senders_[this->index_] = 0 ; + this->list_senders_[this->index_] = 0; this->index_ = -1; } @@ -785,7 +859,7 @@ int Sender::check_destroy (void) { { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1); + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); if (this->io_count_ > 0) return 1; @@ -834,7 +908,7 @@ int Sender::open_sender (const ACE_TCHAR *host, u_short port) { if (duplex != 0) // Start an asynchronous read file - this->initiate_read_stream () ; + this->initiate_read_stream (); } this->check_destroy (); @@ -845,7 +919,7 @@ int Sender::open_sender (const ACE_TCHAR *host, u_short port) int Sender::initiate_write_stream (void) { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1); + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); int nbytes = ACE_OS::strlen (send_buf_); @@ -874,7 +948,7 @@ Sender::initiate_write_stream (void) int Sender::initiate_read_stream (void) { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1); + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, @@ -958,7 +1032,7 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) } { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_); + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->io_count_--; } @@ -1027,7 +1101,7 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) } { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_); + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->io_count_--; } @@ -1060,6 +1134,7 @@ print_usage (int /* argc */, ACE_TCHAR *argv[]) ACE_TEXT ("\n-v log level") ACE_TEXT ("\n 0 - log all messages") ACE_TEXT ("\n 1 - log only errors and unusual cases") + ACE_TEXT ("\n-i time to run in seconds") ACE_TEXT ("\n-u show this message") ACE_TEXT ("\n"), argv[0] @@ -1112,18 +1187,26 @@ parse_args (int argc, ACE_TCHAR *argv[]) #endif threads = 3; // size of Proactor thread pool senders = 20; // number of senders - logflag = 1 ; // log to : 0 STDERR / 1 FILE + logflag = 1; // log to : 0 STDERR / 1 FILE loglevel = 0; // log level : 0 full/ 1 only errors + seconds = 2; // time to run in seconds return 0; } - ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("t:o:n:p:d:h:s:v:ub")); + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("i:t:o:n:p:d:h:s:v:ub")); int c; while ((c = get_opt ()) != EOF) { switch (c) { + case 'i': // time to run + seconds = ACE_OS::atoi (get_opt.optarg); + if ( seconds < MIN_TIME ) + seconds = MIN_TIME; + if ( seconds > MAX_TIME ) + seconds = MAX_TIME; + break; case 'b': // both client and server both = 1; break; @@ -1174,38 +1257,39 @@ main (int argc, ACE_TCHAR *argv[]) ::disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX); ::disable_signal (SIGPIPE, SIGPIPE); - MyTask task1; + MyTask task1; + Acceptor acceptor; - if (task1.activate (THR_NEW_LWP, threads) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p.\n"), - ACE_TEXT ("main")), - -1); + Sender::init (); - // wait for creation of Proactor - task1.waitready (); + if ( task1.start ( threads, + proactor_type, + max_aio_operations) == 0 ) + { + int rc = 0; - Acceptor acceptor; - Sender::init (); + if (both != 0 || host == 0) // Acceptor + { + // Simplify, initial read with zero size + if (acceptor.open (ACE_INET_Addr (port), 0, 1) == 0) + rc = 1; + } - int rc = 0; + if (both != 0 || host != 0) + { + if (host == 0) + host = ACE_TEXT ("localhost"); + + rc += Sender::activate (senders); + } - if (both != 0 || host == 0) // Acceptor - // Simplify, initial read with zero size - if (acceptor.open (ACE_INET_Addr (port), 0, 1) == 0) - rc = 1; + if ( rc > 0 ) + ACE_OS::sleep (seconds); - if (both != 0 || host != 0) - { - if (host == 0) - host = ACE_TEXT ("localhost"); - rc += Sender::activate (senders); } - ACE_Proactor::end_event_loop (); - - task1.wait (); + task1.stop (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nNumber of Receivers objects = %d\n") |