diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1999-09-01 05:27:46 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1999-09-01 05:27:46 +0000 |
commit | 7c9df2b94260de0683498f181e8550f6382ad702 (patch) | |
tree | 3ecd37f7db1de5869eecacf0074a491ac5200705 | |
parent | 026bcd6b9b76835c8cdc1d4d32fafc25f5fd85ab (diff) | |
download | ATCD-7c9df2b94260de0683498f181e8550f6382ad702.tar.gz |
ChangeLogTag:Wed Sep 1 00:05:04 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
-rw-r--r-- | ChangeLog-99b | 64 | ||||
-rw-r--r-- | ace/Containers_T.cpp | 2 | ||||
-rw-r--r-- | ace/Containers_T.h | 3 | ||||
-rw-r--r-- | ace/Event_Handler.h | 2 | ||||
-rw-r--r-- | ace/OS.h | 4 | ||||
-rw-r--r-- | ace/Process_Manager.cpp | 1 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.cpp | 164 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.h | 15 | ||||
-rw-r--r-- | tests/Reactor_Notify_Test.cpp | 314 |
9 files changed, 434 insertions, 135 deletions
diff --git a/ChangeLog-99b b/ChangeLog-99b index 68655b36a09..d6a3906880c 100644 --- a/ChangeLog-99b +++ b/ChangeLog-99b @@ -1,19 +1,41 @@ +Wed Sep 1 00:05:04 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> -Tue Aug 31 20:10:49 1999 Yamuna Krishnamurthy <yamuna@cs.wustl.edu> + * tests/Reactor_Notify_Test.cpp: Modified this test to exercise + the new user-level notification queueing in the ACE_Reactor. + +Tue Aug 31 16:05:14 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> - * ace/OS.h: - Added the ACE_HAS_NO_THROW_SPEC check in order to suppress the - generation of throw specs to placate some compilers - Thank to Andreas Geisler <Andreas.Geisler@erls.siemens.de> for - suggesting this addition + * ace/Process_Manager.cpp (handle_close): Added an ACE_UNUSED_ARG + to keep the compiler from warning that the "handle" parameter + isn't used. + * ace/OS.h: Added a new #defined called + ACE_REACTOR_NOTIFICATION_ARRAY_SIZE that designates the size of + the array of ACE_Notification_Buffers used by the new Reactor + user-level notification queue feature. + + * ace/Select_Reactor_Base: Added support for a user-level + notification queue that can buffer very large amounts of pending + notifications, i.e., well beyond the 64 kbyte limit imposed by + the use of a socket. This feature is only enabled if + ACE_HAS_REACTOR_NOTIFICATION_QUEUE is enabled in a config.h + file. Thanks to Detlef Becker <Detlef.Becker@med.siemens.de> + for suggesting this and contributing a prototype implementation. + +Tue Aug 31 20:10:49 1999 Yamuna Krishnamurthy <yamuna@cs.wustl.edu> + + * ace/OS.h: Added the ACE_HAS_NO_THROW_SPEC check in order to + suppress the generation of throw specs to placate some compilers + and make it easier for CORBA developers to by lazy wrt C++ + exception throw specs ;-). Thank to Andreas Geisler + <Andreas.Geisler@erls.siemens.de> for suggesting this addition 1999-08-31 Vishal Kachroo <vishal@cs.wustl.edu> - Modified the test to use a single handler registered for - QOS_MASK|READ_MASK instead of two separate handlers. Had to - remove the following files and add other files. - + Modified the ACE GQoS test to use a single handler registered + for QOS_MASK|READ_MASK instead of two separate handlers. Had to + remove the following files and add other files. + * ACE_wrappers/examples/QOS/QOS_Event_Handler.cpp * ACE_wrappers/examples/QOS/Read_Handler.h * ACE_wrappers/examples/QOS/QOS_Event_Handler.h @@ -23,30 +45,30 @@ Tue Aug 31 20:10:49 1999 Yamuna Krishnamurthy <yamuna@cs.wustl.edu> The Sender event handler is only registered for QOS events. It uses the ACE_OS::sendto () as soon as it receives a QOS event. - + * ACE_wrappers/examples/QOS/Sender_QOS_Event_Handler.cpp * ACE_wrappers/examples/QOS/Sender_QOS_Event_Handler.h The Receiver event handler is registered for both QOS as well as - READ events since it has to initially wait for a PATH RSVP message - from the sender (QOS event, finally!!) and then read from the - same QOS enabled socket. - + READ events since it has to initially wait for a PATH RSVP + message from the sender (QOS event, finally!!) and then read + from the same QOS enabled socket. + * ACE_wrappers/examples/QOS/Receiver_QOS_Event_Handler.cpp * ACE_wrappers/examples/QOS/Receiver_QOS_Event_Handler.cpp The client and server have been modified to subscribe to the - same multicast session. The RSVP SP on the receiver side sends - RESV messages at the earliest indication of QOS parameters AND - existence of a matching PATH state. This state is found only if - a multicast socket has been created (by the sender) with a - matching "multicast session" address. + same multicast session. The RSVP SP on the receiver side sends + RESV messages at the earliest indication of QOS parameters AND + existence of a matching PATH state. This state is found only if + a multicast socket has been created (by the sender) with a + matching "multicast session" address. * ACE_wrappers/examples/QOS/server.cpp: * ACE_wrappers/examples/QOS/client.cpp: Modified the following to incorporate the removal and addition - of files listed above. + of files listed above. * ACE_wrappers/examples/QOS/QOS.dsw * ACE_wrappers/examples/QOS/client.dsp diff --git a/ace/Containers_T.cpp b/ace/Containers_T.cpp index 9956bdcdb52..c21860c5ff1 100644 --- a/ace/Containers_T.cpp +++ b/ace/Containers_T.cpp @@ -416,7 +416,7 @@ ACE_Unbounded_Queue<T>::dump (void) const for (ACE_Unbounded_Queue_Iterator<T> iter (*(ACE_Unbounded_Queue<T> *) this); iter.next (item) != 0; iter.advance ()) - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("count = %d\n"), count++)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("count = %d\n"), count++)); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } diff --git a/ace/Containers_T.h b/ace/Containers_T.h index 410c1d23306..222d3a4dc34 100644 --- a/ace/Containers_T.h +++ b/ace/Containers_T.h @@ -475,7 +475,8 @@ public: // = Additional utility methods. void reset (void); - // Reset the <ACE_Unbounded_Queue> to be empty. + // Reset the <ACE_Unbounded_Queue> to be empty and release all its + // dynamically allocated resources. int get (T *&item, size_t slot = 0) const; // Get the <slot>th element in the set. Returns -1 if the element diff --git a/ace/Event_Handler.h b/ace/Event_Handler.h index 3ed97652830..1f33a298f36 100644 --- a/ace/Event_Handler.h +++ b/ace/Event_Handler.h @@ -10,7 +10,7 @@ // Event_Handler.h // // = AUTHOR -// Doug Schmidt +// Douglas C. Schmidt <schmidt@cs.wustl.edu> // // ============================================================================ @@ -56,6 +56,10 @@ enum ACE_Recyclable_State #define ACE_DEFAULT_SERVICE_REPOSITORY_SIZE 1024 #endif /* ACE_DEFAULT_SERVICE_REPOSITORY_SIZE */ +#if !defined (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE) +#define ACE_REACTOR_NOTIFICATION_ARRAY_SIZE 1024 +#endif /* ACE_REACTOR_NOTIFICATION_ARRAY_SIZE */ + // Do not change these values wantonly since GPERF depends on them.. #define ACE_ASCII_SIZE 128 #define ACE_EBCDIC_SIZE 256 diff --git a/ace/Process_Manager.cpp b/ace/Process_Manager.cpp index 961d9b25345..111c2d19ef8 100644 --- a/ace/Process_Manager.cpp +++ b/ace/Process_Manager.cpp @@ -340,6 +340,7 @@ ACE_Process_Manager::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask) { ACE_TRACE ("ACE_Process_Manager::handle_close"); + ACE_UNUSED_ARG (handle); ACE_ASSERT (handle == this->dummy_handle_); diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp index 670de5d78b9..017262e78ac 100644 --- a/ace/Select_Reactor_Base.cpp +++ b/ace/Select_Reactor_Base.cpp @@ -517,6 +517,22 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, if (this->notification_pipe_.open () == -1) return -1; +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_Notification_Buffer *temp; + + ACE_NEW_RETURN (temp, + ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); + + if (this->alloc_set_.enqueue_head (temp) == -1) + return -1; + + for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++) + if (free_set_.enqueue_head (temp + i) == -1) + return -1; + +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + // There seems to be a Win32 bug with this... Set this into // non-blocking mode. if (ACE::set_flags (this->notification_pipe_.read_handle (), @@ -539,6 +555,21 @@ int ACE_Select_Reactor_Notify::close (void) { ACE_TRACE ("ACE_Select_Reactor_Notify::close"); + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Free up the dynamically allocated resources. + ACE_Notification_Buffer **b; + + for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_set_); + alloc_iter.next (b) != 0; + alloc_iter.advance ()) + delete [] *b; + + this->alloc_set_.reset (); + this->notify_set_.reset (); + this->free_set_.reset (); +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + return this->notification_pipe_.close (); } @@ -555,6 +586,58 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, return 0; else { +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_Notification_Buffer buffer (eh, mask); + int notification_required = 0; + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + // No pending notifications. + if (this->notify_set_.is_empty ()) + notification_required = 1; + + ACE_Notification_Buffer *temp = 0; + + if (free_set_.dequeue_head (temp) == -1) + { + // Grow the queue of available buffers. + ACE_Notification_Buffer *temp1; + + ACE_NEW_RETURN (temp1, + ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); + + if (this->alloc_set_.enqueue_head (temp1) == -1) + return -1; + + // Start at 1 and enqueue only + // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since + // the first one will be used right now. + for (size_t i = 1; + i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; + i++) + this->free_set_.enqueue_head (temp1 + i); + + temp = temp1; + } + + ACE_ASSERT (temp != 0); + *temp = buffer; + + if (notify_set_.enqueue_tail (temp) == -1) + return -1; + + if (notification_required) + { + ssize_t n = ACE::send (this->notification_pipe_.write_handle (), + (char *) &buffer, + sizeof buffer, + timeout); + if (n == -1) + return -1; + } + return 0; +#else ACE_Notification_Buffer buffer (eh, mask); ssize_t n = ACE::send (this->notification_pipe_.write_handle (), @@ -565,6 +648,7 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, return -1; else return 0; +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ } } @@ -624,11 +708,68 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) return -1; } +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Dispatch all messages that are in the <notify_set_>. + for (;;) + { + { + // We acquire the lock in a block to make sure we're not + // holding the lock while delivering callbacks... + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + ACE_Notification_Buffer *temp; + + if (notify_set_.is_empty ()) + break; + else if (notify_set_.dequeue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("dequeue_head")), + -1); + buffer = *temp; + if (free_set_.enqueue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("enqueue_head")), + -1); + } + + // If eh == 0 then another thread is unblocking the + // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s + // internal structures. Otherwise, we need to dispatch the + // appropriate handle_* method on the <ACE_Event_Handler> + // pointer we've been passed. + if (buffer.eh_ != 0) + { + int result = 0; + + switch (buffer.mask_) + { + case ACE_Event_Handler::READ_MASK: + case ACE_Event_Handler::ACCEPT_MASK: + result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::WRITE_MASK: + result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::EXCEPT_MASK: + result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); + break; + default: + // Should we bail out if we get an invalid mask? + ACE_ERROR ((LM_ERROR, ASYS_TEXT ("invalid mask = %d\n"), buffer.mask_)); + } + if (result == -1) + buffer.eh_->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + } + } +#else // If eh == 0 then another thread is unblocking the - // ACE_Select_Reactor to update the ACE_Select_Reactor's + // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the ACE_Event_Handler pointer - // we've been passed. + // appropriate handle_* method on the <ACE_Event_Handler> + // pointer we've been passed. if (buffer.eh_ != 0) { int result = 0; @@ -662,6 +803,7 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) ACE_Event_Handler::EXCEPT_MASK); } +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ number_dispatched++; // Bail out if we've reached the <notify_threshold_>. Note that @@ -679,7 +821,7 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) // Enqueue ourselves into the list of waiting threads. When we // reacquire the token we'll be off and running again with ownership // of the token. The postcondition of this call is that - // this->select_reactor_.token_.current_owner () == ACE_Thread::self (); + // <select_reactor_.token_.current_owner> == <ACE_Thread::self>. this->select_reactor_->renew (); return number_dispatched; } @@ -781,3 +923,17 @@ ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle, } return omask; } + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) +template class ACE_Unbounded_Queue <ACE_Notification_Buffer *>; +template class ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>; +template class ACE_Node <ACE_Notification_Buffer *>; +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) +#pragma instantiate ACE_Unbounded_Queue <ACE_Notification_Buffer *> +#pragma instantiate ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *> +#pragma instantiate ACE_Node <ACE_Notification_Buffer *> +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h index 10e333cb144..45e87f3be84 100644 --- a/ace/Select_Reactor_Base.h +++ b/ace/Select_Reactor_Base.h @@ -169,6 +169,21 @@ private: // dispatch the <ACE_Event_Handlers> that are passed in via the // notify pipe before breaking out of its <recv> loop. By default, // this is set to -1, which means "iterate until the pipe is empty." + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_set_; + // Keeps track of allocated arrays of type + // <ACE_Notification_Buffer>. + + ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_set_; + // Keeps track of all pending notifications. + + ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_set_; + // Keeps track of all free buffers. + + ACE_SYNCH_MUTEX notify_queue_lock_; + // synchronization for handling of queues +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ }; class ACE_Export ACE_Select_Reactor_Handler_Repository diff --git a/tests/Reactor_Notify_Test.cpp b/tests/Reactor_Notify_Test.cpp index 5879c6e1bef..50a4583400a 100644 --- a/tests/Reactor_Notify_Test.cpp +++ b/tests/Reactor_Notify_Test.cpp @@ -6,16 +6,18 @@ // tests // // = FILENAME -// Reactors_Test.cpp +// Reactor_Notify_Test.cpp // // = DESCRIPTION // This is a test that illustrates how the <ACE_Reactor>'s <notify> // method works under various <max_notify_iterations> settings. // It also tests that the <disable_notify_pipe> option works -// correctly. +// correctly. Moreover, if the $ACE_ROOT/ace/config.h file has +// the ACE_HAS_REACTOR_NOTIFICATION_QUEUE option enabled this +// test will also exercise this feature. // // = AUTHOR -// Douglas C. Schmidt +// Douglas C. Schmidt <schmidt@cs.wustl.edu> // // ============================================================================ @@ -23,6 +25,7 @@ #include "ace/Synch.h" #include "ace/Task.h" #include "ace/Pipe.h" +#include "ace/Auto_Ptr.h" #include "ace/Select_Reactor.h" ACE_RCSID(tests, Reactor_Notify_Test, "$Id$") @@ -34,10 +37,14 @@ USELIB("..\ace\aced.lib"); #if defined (ACE_HAS_THREADS) +static const int LONG_TIMEOUT = 10; +static const int SHORT_TIMEOUT = 2; + class Supplier_Task : public ACE_Task<ACE_MT_SYNCH> { public: - Supplier_Task (int disable_notify_pipe); + Supplier_Task (int disable_notify_pipe, + const ACE_Time_Value &tv); // Constructor. ~Supplier_Task (void); @@ -62,6 +69,9 @@ public: // illustrate the difference between "limited" and "unlimited" // notification. + void release (void); + // Release the <waiter_>. + private: int perform_notifications (int notifications); // Perform the notifications. @@ -71,17 +81,31 @@ private: // <Reactor>'s notify mechanism. ACE_Pipe pipe_; - // We use this pipe just so we can get a handle that is always - // "active." + // We use this pipe just to get a handle that is always "active," + // i.e., the <ACE_Reactor> will always dispatch its <handle_output> + // method. int disable_notify_pipe_; // Keeps track of whether the notification pipe in the <ACE_Reactor> // has been diabled or not. + + int long_timeout_; + // Keeps track of whether we're running with a <LONG_TIMEOUT>, which + // is used for the ACE_HAS_REACTOR_NOTIFICATION_QUEUE portion of + // this test. }; -Supplier_Task::Supplier_Task (int disable_notify_pipe) +void +Supplier_Task::release (void) +{ + this->waiter_.release (); +} + +Supplier_Task::Supplier_Task (int disable_notify_pipe, + const ACE_Time_Value &tv) : waiter_ (0), // Make semaphore "locked" by default. - disable_notify_pipe_ (disable_notify_pipe) + disable_notify_pipe_ (disable_notify_pipe), + long_timeout_ (tv.sec () == LONG_TIMEOUT) { } @@ -89,48 +113,59 @@ int Supplier_Task::open (void *) { // Create the pipe. - if (this->pipe_.open () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ASYS_TEXT ("(%t) %p\n"), - ASYS_TEXT ("open failed")), - -1); + int result; + + result = this->pipe_.open (); + ACE_ASSERT (result != -1); + // Register the pipe's write handle with the <Reactor> for writing. // This should mean that it's always "active." - else if (ACE_Reactor::instance ()->register_handler - (this->pipe_.write_handle (), - this, - ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ASYS_TEXT ("(%t) %p\n"), - ASYS_TEXT ("register_handler failed")), - -1); + if (long_timeout_ == 0) + { + result = ACE_Reactor::instance ()->register_handler + (this->pipe_.write_handle (), + this, + ACE_Event_Handler::WRITE_MASK); + ACE_ASSERT (result != -1); + } + // Make this an Active Object. - else if (this->activate (THR_BOUND | THR_DETACHED) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ASYS_TEXT ("(%t) %p\n"), - ASYS_TEXT ("activate failed")), - -1); - else - return 0; + result = this->activate (THR_BOUND | THR_DETACHED); + ACE_ASSERT (result != -1); + return 0; } int Supplier_Task::close (u_long) { - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("(%t) Supplier_Task::close\n"))); - - if (ACE_Reactor::instance ()->remove_handler - (this->pipe_.write_handle (), - ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR ((LM_ERROR, - ASYS_TEXT ("(%t) %p\n"), - ASYS_TEXT ("remove_handler failed"))); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) Supplier_Task::close\n"))); + + int result; + + if (long_timeout_ == 0) + { + result = ACE_Reactor::instance ()->remove_handler + (this->pipe_.write_handle (), + ACE_Event_Handler::WRITE_MASK); + ACE_ASSERT (result != -1); + } + else + { + // Wait to be told to shutdown by the main thread. + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) waiting to be shutdown by main thread\n"))); + result = this->waiter_.acquire (); + ACE_ASSERT (result != -1); + } return 0; } Supplier_Task::~Supplier_Task (void) { - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("(%t) ~Supplier_Task\n"))); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) ~Supplier_Task\n"))); this->pipe_.close (); } @@ -139,26 +174,41 @@ Supplier_Task::perform_notifications (int notifications) { ACE_Reactor::instance ()->max_notify_iterations (notifications); - for (size_t i = 0; i < ACE_MAX_ITERATIONS; i++) + int iterations = ACE_MAX_ITERATIONS; + + if (this->long_timeout_) + iterations *= (iterations * iterations * iterations); + + for (size_t i = 0; i < iterations; i++) { ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) notifying reactor\n"))); + ASYS_TEXT ("(%t) notifying reactor on iteration %d\n"), + i)); + + int result; + // Notify the Reactor, which will call <handle_exception>. - if (ACE_Reactor::instance ()->notify (this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ASYS_TEXT ("(%t) %p\n"), - ASYS_TEXT ("notify")), - -1); + result = ACE_Reactor::instance ()->notify (this); + if (result == -1) + { + if (errno == ETIME) + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) %p\n"), + ASYS_TEXT ("notify"))); + else + ACE_ASSERT (result = -1); + } // Wait for our <handle_exception> method to release the // semaphore. - else if (this->disable_notify_pipe_ == 0 - && this->waiter_.acquire () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ASYS_TEXT ("(%t) %p\n"), - ASYS_TEXT ("acquire")), - -1); + if (this->long_timeout_ == 0 + && this->disable_notify_pipe_ == 0) + { + result = this->waiter_.acquire (); + ACE_ASSERT (result != -1); + } } + return 0; } @@ -170,19 +220,19 @@ Supplier_Task::svc (void) // Allow an unlimited number of iterations per // <ACE_Reactor::notify>. - if (this->perform_notifications (-1) == -1) - return -1; - - ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) **** starting limited notifications test\n"))); - - // Only allow 1 iteration per <ACE_Reactor::notify> + this->perform_notifications (-1); - if (this->perform_notifications (1) == -1) - return -1; + if (this->long_timeout_ == 0) + { + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) **** starting limited notifications test\n"))); - ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) **** exiting thread test\n"))); + // Only allow 1 iteration per <ACE_Reactor::notify> + this->perform_notifications (1); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) **** exiting thread test\n"))); + } return 0; } @@ -205,75 +255,103 @@ Supplier_Task::handle_output (ACE_HANDLE handle) ASYS_TEXT ("(%t) handle_output\n"))); // This function is called by the main thread, believe it or not :-) - // That's because the pipe's write handle is always active. So, - // give the <Supplier_Task> a chance to run. + // That's because the pipe's write handle is always active. Thus, + // we can give the <Supplier_Task> a chance to run in its own + // thread. ACE_OS::thr_yield (); return 0; } -static void -run_test (int disable_notify_pipe) +static int +run_test (int disable_notify_pipe, + const ACE_Time_Value &tv) { // Create special reactors with the appropriate flags enabled. ACE_Select_Reactor *reactor_impl = 0; if (disable_notify_pipe) - ACE_NEW (reactor_impl, - ACE_Select_Reactor (0, 0, 1)); + ACE_NEW_RETURN (reactor_impl, + ACE_Select_Reactor (0, 0, 1), + -1); else - ACE_NEW (reactor_impl, - ACE_Select_Reactor); + ACE_NEW_RETURN (reactor_impl, + ACE_Select_Reactor, + -1); ACE_Reactor *reactor; - ACE_NEW (reactor, - ACE_Reactor (reactor_impl)); + ACE_NEW_RETURN (reactor, + ACE_Reactor (reactor_impl), + -1); + + // Make sure this stuff gets cleaned up when this function exits. + auto_ptr<ACE_Reactor> r (reactor); + auto_ptr<ACE_Select_Reactor> ri (reactor_impl); // Set the Singleton Reactor. ACE_Reactor::instance (reactor); - ACE_ASSERT (ACE_LOG_MSG->op_status () != -1); ACE_ASSERT (ACE_Reactor::instance () == reactor); - Supplier_Task task (disable_notify_pipe); + Supplier_Task task (disable_notify_pipe, + tv); ACE_ASSERT (ACE_LOG_MSG->op_status () != -1); - if (task.open () == -1) - ACE_ERROR ((LM_ERROR, - ASYS_TEXT ("(%t) open failed\n"))); - else + int result; + + result = task.open (); + ACE_ASSERT (result != -1); + + if (tv.sec () == LONG_TIMEOUT) + // Sleep for a while so that the <ACE_Reactor>'s notification + // buffers will fill up! + ACE_OS::sleep (tv); + + int shutdown = 0; + + // Run the event loop that handles the <handle_output> and + // <handle_exception> notifications. + for (int iteration = 1; + shutdown == 0; + iteration++) { - int shutdown = 0; + ACE_Time_Value timeout (tv); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) starting handle_events() on iteration %d") + ASYS_TEXT (" with timeout = %d seconds\n"), + iteration, + timeout.sec ())); - // Run the event loop that handles the <handle_output> and - // <handle_exception> notifications. - for (int iteration = 1; shutdown == 0; iteration++) + // Use a timeout to inform the Reactor when to shutdown. + switch (ACE_Reactor::instance ()->handle_events (timeout)) { - ACE_Time_Value timeout (2); - - // Use a timeout to inform the Reactor when to shutdown. - switch (ACE_Reactor::instance ()->handle_events (timeout)) - { - case -1: - ACE_ERROR ((LM_ERROR, - ASYS_TEXT ("(%t) %p\n"), - ASYS_TEXT ("reactor"))); - shutdown = 1; - break; - /* NOTREACHED */ - case 0: - shutdown = 1; - break; - /* NOTREACHED */ - default: - break; - /* NOTREACHED */ - } + case -1: + ACE_ERROR ((LM_ERROR, + ASYS_TEXT ("(%t) %p\n"), + ASYS_TEXT ("reactor"))); + shutdown = 1; + break; + /* NOTREACHED */ + case 0: + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) handle_events timed out\n"))); + shutdown = 1; + break; + /* NOTREACHED */ + default: + break; + /* NOTREACHED */ } } - delete reactor_impl; - delete reactor; + if (tv.sec () == LONG_TIMEOUT) + { + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) releasing supplier task thread\n"))); + task.release (); + } + return 0; } #endif /* ACE_HAS_THREADS */ @@ -284,12 +362,26 @@ main (int, ASYS_TCHAR *[]) ACE_START_TEST (ASYS_TEXT ("Reactor_Notify_Test")); #if defined (ACE_HAS_THREADS) + ACE_Time_Value timeout (SHORT_TIMEOUT); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) running tests with notify pipe enabled") + ASYS_TEXT (" and timeout = %d seconds\n"), + timeout.sec ())); + run_test (0, timeout); + ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) running tests with notify pipe enabled\n"))); - run_test (0); + ASYS_TEXT ("(%t) running tests with notify pipe diabled") + ASYS_TEXT (" and timeout = %d seconds\n"), + timeout.sec ())); + run_test (1, timeout); + + timeout.set (LONG_TIMEOUT, 0); ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) running tests with notify pipe disabled\n"))); - run_test (1); + ASYS_TEXT ("(%t) running tests with reactor notification pipe enabled\n") + ASYS_TEXT (" and timeout = %d seconds\n"), + timeout.sec ())); + run_test (0, timeout); + #else ACE_ERROR ((LM_INFO, ASYS_TEXT ("threads not supported on this platform\n"))); @@ -297,3 +389,11 @@ main (int, ASYS_TCHAR *[]) ACE_END_TEST; return 0; } + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class auto_ptr<ACE_Reactor>; +template class auto_ptr<ACE_Select_Reactor>; +#else +#pragma instantiate auto_ptr<ACE_Reactor> +#pragma instantiate auto_ptr<ACE_Select_Reactor> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |