diff options
Diffstat (limited to 'ACE/tests/Reactor_Notify_Test.cpp')
-rw-r--r-- | ACE/tests/Reactor_Notify_Test.cpp | 493 |
1 files changed, 493 insertions, 0 deletions
diff --git a/ACE/tests/Reactor_Notify_Test.cpp b/ACE/tests/Reactor_Notify_Test.cpp new file mode 100644 index 00000000000..a24bf4770a8 --- /dev/null +++ b/ACE/tests/Reactor_Notify_Test.cpp @@ -0,0 +1,493 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Reactor_Notify_Test.cpp +// +// = DESCRIPTION +// This is a test that illustrates how the <ACE_Reactor>'s +// <notify> method works under various <max_notify_iterations> +// settings. It also tests that the <disable_notify_pipe> option +// works correctly. Moreover, if the $ACE_ROOT/ace/config.h file +// has the ACE_HAS_REACTOR_NOTIFICATION_QUEUE option enabled this +// test will also exercise this feature. +// +// = AUTHOR +// Douglas C. Schmidt <schmidt@cs.wustl.edu> +// +// ============================================================================ + +#include "test_config.h" +#include "ace/OS_NS_unistd.h" +#include "ace/Synch_Traits.h" +#include "ace/Task.h" +#include "ace/Pipe.h" +#include "ace/Auto_Ptr.h" +#include "ace/Reactor.h" +#include "ace/Select_Reactor.h" +#include "ace/Thread_Semaphore.h" + +ACE_RCSID(tests, Reactor_Notify_Test, "$Id$") + +#if defined (ACE_HAS_THREADS) + +static const int LONG_TIMEOUT = 10; +static const int SHORT_TIMEOUT = 2; + +class Supplier_Task : public ACE_Task<ACE_MT_SYNCH> +{ +public: + Supplier_Task (int disable_notify_pipe, + const ACE_Time_Value &tv); + // Constructor. + + ~Supplier_Task (void); + // Destructor. + + virtual int open (void * = 0); + // Make this an Active Object. + + virtual int close (u_long); + // Close down the supplier. + + virtual int svc (void); + // Generates events and sends them to the <Reactor>'s <notify> + // method. + + virtual int handle_exception (ACE_HANDLE); + // Releases the <waiter_> semaphore when called by the <Reactor>'s + // notify handler. + + virtual int handle_output (ACE_HANDLE); + // Called every time through the main <ACE_Reactor> event loop to + // illustrate the difference between "limited" and "unlimited" + // notification. + + void release (void); + // Release the <waiter_>. + +private: + int perform_notifications (int notifications); + // Perform the notifications. + + ACE_Thread_Semaphore waiter_; + // Used to hand-shake between the <Supplier_Task> and the + // <Reactor>'s notify mechanism. + + ACE_Pipe pipe_; + // We use this pipe just to get a handle that is always "active," + // i.e., the <ACE_Reactor> will always dispatch its <handle_output> + // method. + + int disable_notify_pipe_; + // Keeps track of whether the notification pipe in the <ACE_Reactor> + // has been diabled or not. + + int long_timeout_; + // Keeps track of whether we're running with a <LONG_TIMEOUT>, which + // is used for the ACE_HAS_REACTOR_NOTIFICATION_QUEUE portion of + // this test. +}; + +void +Supplier_Task::release (void) +{ + this->waiter_.release (); +} + +Supplier_Task::Supplier_Task (int disable_notify_pipe, + const ACE_Time_Value &tv) + : waiter_ ((unsigned int) 0), // Make semaphore "locked" by default. + disable_notify_pipe_ (disable_notify_pipe), + long_timeout_ (tv.sec () == LONG_TIMEOUT) +{ +} + +int +Supplier_Task::open (void *) +{ + // Create the pipe. + int result; + + result = this->pipe_.open (); + ACE_ASSERT (result != -1); + + // Register the pipe's write handle with the <Reactor> for writing. + // This should mean that it's always "active." + if (long_timeout_ == 0) + { + result = ACE_Reactor::instance ()->register_handler + (this->pipe_.write_handle (), + this, + ACE_Event_Handler::WRITE_MASK); + ACE_ASSERT (result != -1); + } + + // Make this an Active Object. + result = this->activate (THR_BOUND | THR_DETACHED); + ACE_ASSERT (result != -1); + return 0; +} + +int +Supplier_Task::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Supplier_Task::close\n"))); + + int result; + + if (long_timeout_ == 0) + { + result = ACE_Reactor::instance ()->remove_handler + (this->pipe_.write_handle (), + ACE_Event_Handler::WRITE_MASK); + ACE_ASSERT (result != -1); + } + else + { + // Wait to be told to shutdown by the main thread. + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) waiting to be shutdown by main thread\n"))); + result = this->waiter_.acquire (); + ACE_ASSERT (result != -1); + } + return 0; +} + +Supplier_Task::~Supplier_Task (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) ~Supplier_Task\n"))); + this->pipe_.close (); +} + +int +Supplier_Task::perform_notifications (int notifications) +{ + ACE_Reactor::instance ()->max_notify_iterations (notifications); + + size_t iterations = ACE_MAX_ITERATIONS; + + if (this->long_timeout_) + { + iterations *= (iterations * iterations * 2); +#if defined (ACE_VXWORKS) + // scale down otherwise the test won'y finish in time + iterations /= 4; +#endif + } + + for (size_t i = 0; i < iterations; i++) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) notifying reactor on iteration %d\n"), + i)); + + int result; + + // Notify the Reactor, which will call <handle_exception>. + result = ACE_Reactor::instance ()->notify (this); + if (result == -1) + { + if (errno == ETIME) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("notify"))); + else + ACE_ASSERT (result != -1); + } + + // Wait for our <handle_exception> method to release the + // semaphore. + if (this->long_timeout_ == 0 + && this->disable_notify_pipe_ == 0) + { + result = this->waiter_.acquire (); + ACE_ASSERT (result != -1); + } + } + + return 0; +} + +int +Supplier_Task::svc (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) **** starting unlimited notifications test\n"))); + + // Allow an unlimited number of iterations per + // <ACE_Reactor::notify>. + this->perform_notifications (-1); + + if (this->long_timeout_ == 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) **** starting limited notifications test\n"))); + + // Only allow 1 iteration per <ACE_Reactor::notify> + this->perform_notifications (1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) **** exiting thread test\n"))); + } + return 0; +} + +int +Supplier_Task::handle_exception (ACE_HANDLE handle) +{ + ACE_ASSERT (handle == ACE_INVALID_HANDLE); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) handle_exception\n"))); + + this->waiter_.release (); + return 0; +} + +int +Supplier_Task::handle_output (ACE_HANDLE handle) +{ + ACE_ASSERT (handle == this->pipe_.write_handle ()); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) handle_output\n"))); + + // This function is called by the main thread, believe it or not :-) + // That's because the pipe's write handle is always active. Thus, + // we can give the <Supplier_Task> a chance to run in its own + // thread. + ACE_OS::thr_yield (); + + return 0; +} + +static int +run_test (int disable_notify_pipe, + const ACE_Time_Value &tv) +{ + // Create special reactors with the appropriate flags enabled. + + ACE_Select_Reactor *reactor_impl = 0; + if (disable_notify_pipe) + ACE_NEW_RETURN (reactor_impl, + ACE_Select_Reactor (0, 0, 1), + -1); + else + ACE_NEW_RETURN (reactor_impl, + ACE_Select_Reactor, + -1); + + ACE_Reactor *reactor; + ACE_NEW_RETURN (reactor, + ACE_Reactor (reactor_impl, 1), // Delete implementation + -1); + + // Make sure this stuff gets cleaned up when this function exits. + auto_ptr<ACE_Reactor> r (reactor); + + // Set the Singleton Reactor. + ACE_Reactor *orig_reactor = ACE_Reactor::instance (reactor); + ACE_ASSERT (ACE_LOG_MSG->op_status () != -1); + ACE_ASSERT (ACE_Reactor::instance () == reactor); + + Supplier_Task task (disable_notify_pipe, + tv); + ACE_ASSERT (ACE_LOG_MSG->op_status () != -1); + + int result; + + result = task.open (); + ACE_ASSERT (result != -1); + + if (tv.sec () == LONG_TIMEOUT) + // Sleep for a while so that the <ACE_Reactor>'s notification + // buffers will fill up! + ACE_OS::sleep (tv); + + int shutdown = 0; + + // Run the event loop that handles the <handle_output> and + // <handle_exception> notifications. + for (int iteration = 1; + shutdown == 0; + iteration++) + { + ACE_Time_Value timeout (tv); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) starting handle_events() on iteration %d") + ACE_TEXT (" with time-out = %d seconds\n"), + iteration, + timeout.sec ())); + + // Use a timeout to inform the Reactor when to shutdown. + switch (ACE_Reactor::instance ()->handle_events (timeout)) + { + case -1: + if (! disable_notify_pipe) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("reactor"))); + shutdown = 1; + break; + /* NOTREACHED */ + case 0: + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) handle_events timed out\n"))); + shutdown = 1; + break; + /* NOTREACHED */ + default: + break; + /* NOTREACHED */ + } + } + + if (tv.sec () == LONG_TIMEOUT) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) releasing supplier task thread\n"))); + task.release (); + } + ACE_Reactor::instance (orig_reactor); + return 0; +} + +#endif /* ACE_HAS_THREADS */ + +class Purged_Notify : public ACE_Event_Handler +{ + // = TITLE + // <run_notify_purge_test> tests the reactor's + // purge_pending_notifications function. It does 2 notifications, + // and explicitly cancels one, and deletes the other's event + // handler, which should cause it to be cancelled as well. + + virtual int handle_exception (ACE_HANDLE = ACE_INVALID_HANDLE) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Got a notify that should have been purged!\n")), + 0); + } +}; + +static int +run_notify_purge_test (void) +{ + int status; + ACE_Reactor *r = ACE_Reactor::instance (); + { + Purged_Notify n1; + Purged_Notify *n2; + + ACE_NEW_RETURN (n2, Purged_Notify, -1); + auto_ptr<Purged_Notify> ap (n2); + + // First test: + // Notify EXCEPT, and purge ALL + r->notify (&n1); // the mask is EXCEPT_MASK + + status = r->purge_pending_notifications (&n1); + if (status == -1 && errno == ENOTSUP) + return 0; // Select Reactor w/o ACE_HAS_REACTOR_NOTIFICATION_QUEUE + if (status != 1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Purged %d notifies; expected 1\n"), + status)); + // Second test: + // Notify READ twice, and WRITE once, and purge READ and WRITE - should purge 3 times. + r->notify (&n1, ACE_Event_Handler::READ_MASK); + r->notify (&n1, ACE_Event_Handler::READ_MASK); + r->notify (&n1, ACE_Event_Handler::WRITE_MASK); + status = r->purge_pending_notifications + (&n1, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK); + if (status != 3) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Purged %d notifies; expected 3\n"), + status)); + // Third test: + // Notify READ on 2 handlers, and purge READ|WRITE on all handlers. Should purge 2 + r->notify (&n1, ACE_Event_Handler::READ_MASK); + r->notify (n2, ACE_Event_Handler::READ_MASK); + status = r->purge_pending_notifications + (0, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK); + if (status != 2) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Purged %d notifies; expected 2\n"), + status)); + // Forth test: + // Notify EXCEPT and WRITE, purge READ. Should not purge + r->notify (&n1); // the mask is EXCEPT_MASK + r->notify (&n1, ACE_Event_Handler::WRITE_MASK); + status = r->purge_pending_notifications + (&n1, ACE_Event_Handler::READ_MASK); + if (status != 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Purged %d notifies; expected 0\n"), + status)); + // Fifth test: + r->notify (n2); + // <ap> destructor should cause n2's notify to be cancelled. + } + + ACE_Time_Value t (1); + status = r->handle_events (t); // Should be nothing to do, and time out + return status < 0 ? 1 : 0; // Return 0 for all ok, else error +} + + +int +run_main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("Reactor_Notify_Test")); + + // To automatically delete the ACE_Reactor instance at program + // termination: + auto_ptr<ACE_Reactor> r (ACE_Reactor::instance ()); + + int test_result = 0; // Innocent until proven guilty + + test_result = run_notify_purge_test (); + if (test_result == 0) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("purge_pending_notifications test OK\n"))); + else + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("purge_pending_notifications test FAIL\n"))); + +#if defined (ACE_HAS_THREADS) + ACE_Time_Value timeout (SHORT_TIMEOUT); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) running tests with notify pipe enabled") + ACE_TEXT (" and time-out = %d seconds\n"), + timeout.sec ())); + run_test (0, timeout); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) running tests with notify pipe disabled") + ACE_TEXT (" and time-out = %d seconds\n"), + timeout.sec ())); + run_test (1, timeout); + + timeout.set (LONG_TIMEOUT, 0); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) running tests with reactor notification ") + ACE_TEXT ("pipe enabled\n") + ACE_TEXT (" and time-out = %d seconds\n"), + timeout.sec ())); + run_test (0, timeout); + +#else + ACE_ERROR ((LM_INFO, + ACE_TEXT ("threads not supported on this platform\n"))); +#endif /* ACE_HAS_THREADS */ + ACE_END_TEST; + return test_result; +} + |