diff options
-rw-r--r-- | ACE/ChangeLog | 110 | ||||
-rw-r--r-- | ACE/ace/Dev_Poll_Reactor.cpp | 216 | ||||
-rw-r--r-- | ACE/ace/Dev_Poll_Reactor.h | 33 | ||||
-rw-r--r-- | ACE/ace/Intrusive_List.cpp | 8 | ||||
-rw-r--r-- | ACE/ace/Intrusive_List.h | 9 | ||||
-rw-r--r-- | ACE/ace/Intrusive_List.inl | 9 | ||||
-rw-r--r-- | ACE/ace/Notification_Queue.cpp | 210 | ||||
-rw-r--r-- | ACE/ace/Notification_Queue.h | 156 | ||||
-rw-r--r-- | ACE/ace/Notification_Queue.inl | 47 | ||||
-rw-r--r-- | ACE/ace/Select_Reactor_Base.cpp | 226 | ||||
-rw-r--r-- | ACE/ace/Select_Reactor_Base.h | 32 | ||||
-rw-r--r-- | ACE/ace/ace.mpc | 1 | ||||
-rw-r--r-- | ACE/ace/ace_for_tao.mpc | 1 | ||||
-rw-r--r-- | ACE/tests/Bug_2815_Regression_Test.cpp | 499 | ||||
-rw-r--r-- | ACE/tests/Notification_Queue_Unit_Test.cpp | 256 | ||||
-rw-r--r-- | ACE/tests/run_test.lst | 2 | ||||
-rw-r--r-- | ACE/tests/tests.mpc | 14 |
17 files changed, 1414 insertions, 415 deletions
diff --git a/ACE/ChangeLog b/ACE/ChangeLog index 81556fa4559..4afaa129d2a 100644 --- a/ACE/ChangeLog +++ b/ACE/ChangeLog @@ -1,3 +1,113 @@ +Thu Feb 22 13:03:01 UTC 2007 Carlos O'Ryan <coryan@atdesk.com> + + * Merged changes from the bug_2815 branch. From revision 77182 to + revision 77315. + + Tue Feb 20 18:17:00 UTC 2007 Carlos O'Ryan <coryan@atdesk.com> + + * ace/ace_for_tao.mpc: + Add Notification_Queue.cpp to this file too. Thanks to Johnny + Willemsen for pointing this out. + + Mon Feb 19 21:53:58 UTC 2007 Carlos O'Ryan <coryan@atdesk.com> + + * ace/Intrusive_List.inl: + I learned a minute ago that std::swap() was kosher. Use it in + favor of ACE_Swap<>. + + Mon Feb 19 21:47:48 UTC 2007 Carlos O'Ryan <coryan@atdesk.com> + + * tests/run_test.lst: + * tests/Bug_2815_Regression_Test.cpp: + Adjusted the number of iterations so the test would pass on my + G4-based laptop without optimizations or inlining. The previous + numbers were just guesses anyway and I hope the new numbers will + result in no failures for the scoreboard. + + Mon Feb 19 21:29:32 UTC 2007 Carlos O'Ryan <coryan@atdesk.com> + + * ace/Notification_Queue.cpp: + Modified the purging algorithm. The algorithm used a temporary + list to store the elements not purged, so basically all nodes + were either moved to the free list or to the temporary list. + The new algorithm only removes the nodes that needs purging. + In the vast majority of the cases this is more efficient because + most nodes are not purged. + This resulted in a factor of 2 improvement for + tests/Bug_2815_Regression_Test, keep in mind that in this + program 1/16th of the nodes are purged. So in practice I would + expect even better results. + I also changed the order in which the free nodes are re-used, I + think LIFO order has a better chance of re-using the cache, but + I have no evidence or experiments to prove this. + + * ace/Intrusive_List.h: + * ace/Intrusive_List.cpp: + Renamed the remove_i() function to unsafe_remove() and promoted + its access to "public". The function is not safe to use in + general, thus the name, but it resulted in a factor of 2 + performance improvement for ACE_Notification_Queue. + + * tests/Bug_2815_Regression_Test.cpp: + Fixed memory leaks (in the test not the library) + + Mon Feb 19 20:32:34 UTC 2007 Carlos O'Ryan <coryan@atdesk.com> + + * ace/Notification_Queue.h: + * ace/Notification_Queue.inl: + * ace/Notification_Queue.cpp: + Re-factored to use ACE_Intrusive_List in the implementation. + This eliminates memory allocations during additions and removal + of elements. Also, many of the operations cannot fail, so the + code became smaller. + + * ace/Intrusive_List.h: + * ace/Intrusive_List.inl: + Add a swap() member function. + + Mon Feb 19 04:49:35 UTC 2007 coryan <coryan@atdesk.com> + + * ace/Notification_Queue.h: + * ace/Notification_Queue.cpp: + * ace/ace.mpc: + New class to encapsulate the implementation of a user-space + based notification queue. The code was duplicated in both + Select_Reactor.{h,cpp} and Dev_Poll_Reactor.{h,cpp} + + * tests/tests.mpc: + * tests/run_test.lst: + * tests/Notification_Queue_Unit_Test.cpp: + New unit test for the notification queue class. + + * ace/Dev_Poll_Reactor.h: + * ace/Dev_Poll_Reactor.cpp: + * ace/Select_Reactor_Base.h: + * ace/Select_Reactor_Base.cpp: + Refactored notification queue code to a new class + (ACE_Notification_Queue) + + * tests/Bug_2815_Regression_Test.cpp: + Add code to help with debugging. Basically my refactoring above + had at least one bug, and this test uncovered it. But it was + hard to debug because there was no single breakpoint to detect + when the failure condition was triggered. + + * tests: + Add new files to svn:ignore property. Also added some old + files. + + * tests/SSL: + * include/makeinclude: + Add missing files to svn:ignore + + Sun Feb 18 21:13:41 UTC 2007 coryan <coryan@atdesk.com> + + * tests/tests.mpc: + * tests/Bug_2815_Regression_Test.cpp: + Add new regression test for bug 2815. I have not added the test + to the automated test suite because (1) it fails, and (2) it is + a test to reproduce performance + Thu Feb 22 12:58:42 UTC 2007 Johnny Willemsen <jwillemsen@remedy.nl> * include/makeinclude/compiler.bor: diff --git a/ACE/ace/Dev_Poll_Reactor.cpp b/ACE/ace/Dev_Poll_Reactor.cpp index 8bf63534735..abf3a432c31 100644 --- a/ACE/ace/Dev_Poll_Reactor.cpp +++ b/ACE/ace/Dev_Poll_Reactor.cpp @@ -50,10 +50,7 @@ ACE_Dev_Poll_Reactor_Notify::ACE_Dev_Poll_Reactor_Notify (void) , notification_pipe_ () , max_notify_iterations_ (-1) #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - , alloc_queue_ () - , notify_queue_ () - , free_queue_ () - , notify_queue_lock_ () + , notification_queue_ () #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ { } @@ -85,18 +82,10 @@ ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r, #endif /* F_SETFD */ #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_Notification_Buffer *temp; - - ACE_NEW_RETURN (temp, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp) == -1) - return -1; - - for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i) - if (free_queue_.enqueue_head (temp + i) == -1) - return -1; + if (notification_queue_.open() == -1) + { + return -1; + } if (ACE::set_flags (this->notification_pipe_.write_handle (), ACE_NONBLOCK) == -1) @@ -120,20 +109,7 @@ ACE_Dev_Poll_Reactor_Notify::close (void) ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close"); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Free up the dynamically allocated resources. - ACE_Notification_Buffer **b; - - for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_); - alloc_iter.next (b) != 0; - alloc_iter.advance ()) - { - delete [] *b; - *b = 0; - } - - this->alloc_queue_.reset (); - this->notify_queue_.reset (); - this->free_queue_.reset (); + notification_queue_.reset(); #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ return this->notification_pipe_.close (); @@ -154,42 +130,22 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_Notification_Buffer buffer (eh, mask); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_Dev_Poll_Handler_Guard eh_guard (eh); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - // Locate a free buffer in the queue. Enlarge the queue if needed. - ACE_Notification_Buffer *temp = 0; - - if (free_queue_.dequeue_head (temp) == -1) - { - // Grow the queue of available buffers. - ACE_Notification_Buffer *temp1; - - ACE_NEW_RETURN (temp1, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp1) == -1) - return -1; - - // Start at 1 and enqueue only - // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since - // the first one will be used right now. - for (size_t i = 1; - i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; - ++i) - this->free_queue_.enqueue_head (temp1 + i); - - temp = temp1; - } + int notification_required = + notification_queue_.push_new_notification(buffer); - ACE_ASSERT (temp != 0); - *temp = buffer; + if (notification_required == -1) + { + return -1; + } - ACE_Dev_Poll_Handler_Guard eh_guard (eh); + if (notification_required == 0) + { + eh_guard.release (); - if (notify_queue_.enqueue_tail (temp) == -1) - return -1; + return 0; + } // Now pop the pipe to force the callback for dispatching when ready. If // the send fails due to a full pipe, don't fail - assume the already-sent @@ -239,26 +195,7 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notifications ( // also serves to slow down event dispatching particularly with this // ACE_Dev_Poll_Reactor. -#if 0 - ACE_HANDLE read_handle = - this->notification_pipe_.read_handle (); - - // Note that we do not check if the handle has received any events. - // Instead a non-blocking "speculative" read is performed. If the - // read returns with errno == EWOULDBLOCK then no notifications are - // dispatched. See ACE_Dev_Poll_Reactor_Notify::read_notify_pipe() - // for details. - if (read_handle != ACE_INVALID_HANDLE) - { - --number_of_active_handles; - - return this->handle_input (read_handle); - } - else - return 0; -#else ACE_NOTSUP_RETURN (-1); -#endif /* 0 */ } int @@ -286,26 +223,31 @@ ACE_Dev_Poll_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle, char b; read_p = &b; to_read = 1; - ACE_Notification_Buffer *temp; - // New scope to release the guard before trying the recv(). - { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + // Before reading the byte, pop a message from the queue and queue a + // new message unless the queue is now empty. The protocol is to + // keep a byte in the pipe as long as the queue is not empty. + bool more_messages_queued = false; + ACE_Notification_Buffer next; - if (notify_queue_.is_empty ()) + int result = notification_queue_.pop_next_notification( + buffer, more_messages_queued, next); + + if (result == 0) + { return 0; - else if (notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("read_notify_pipe: dequeue_head")), - -1); - buffer = *temp; - have_one = true; - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("read_notify_pipe: enqueue_head"))); - } + } + + if (result == -1) + { + return -1; + } + + if(more_messages_queued) + { + (void) ACE::send(this->notification_pipe_.write_handle(), + (char *)&next, 1 /* one byte is enough */); + } #else to_read = sizeof buffer; @@ -475,83 +417,7 @@ ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications ( #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - if (this->notify_queue_.is_empty ()) - return 0; - - ACE_Notification_Buffer *temp; - ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue; - - size_t queue_size = this->notify_queue_.size (); - int number_purged = 0; - size_t i; - for (i = 0; i < queue_size; ++i) - { - if (-1 == this->notify_queue_.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - // If this is not a Reactor notify (it is for a particular - // handler), and it matches the specified handler (or purging - // all), and applying the mask would totally eliminate the - // notification, then release it and count the number purged. - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_) && - ACE_BIT_DISABLED (temp->mask_, ~mask)) // The existing - // notification mask - // is left with - // nothing when - // applying the mask. - { - if (this->free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - ++number_purged; - } - else - { - // To preserve it, move it to the local_queue. - // But first, if this is not a Reactor notify (it is for a - // particular handler), and it matches the specified handler - // (or purging all), then apply the mask. - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_)) - ACE_CLR_BITS(temp->mask_, mask); - if (-1 == local_queue.enqueue_head (temp)) - return -1; - } - } - - if (this->notify_queue_.size ()) - { - // Should be empty! - ACE_ASSERT (0); - return -1; - } - - // Now put it back in the notify queue. - queue_size = local_queue.size (); - for (i = 0; i < queue_size; ++i) - { - if (-1 == local_queue.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - if (-1 == this->notify_queue_.enqueue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } - - return number_purged; + return notification_queue_.purge_pending_notifications(eh, mask); #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ ACE_UNUSED_ARG (eh); diff --git a/ACE/ace/Dev_Poll_Reactor.h b/ACE/ace/Dev_Poll_Reactor.h index f8c5b58f1d2..f82283a70b7 100644 --- a/ACE/ace/Dev_Poll_Reactor.h +++ b/ACE/ace/Dev_Poll_Reactor.h @@ -53,7 +53,7 @@ #include "ace/Token.h" #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) -# include "ace/Unbounded_Queue.h" +# include "ace/Notification_Queue.h" #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ #if defined (ACE_HAS_DEV_POLL) @@ -283,30 +283,17 @@ protected: #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) /** - * @name Reactor Notification Attributes + * @brief A user-space queue to store the notifications. * - * This configuration queues up notifications in separate buffers - * that are in user-space, rather than stored in a pipe in the OS - * kernel. The kernel-level notifications are used only to trigger - * the Reactor to check its notification queue. This enables many - * more notifications to be stored than would otherwise be the - * case. + * The notification pipe has OS-specific size restrictions. That + * is, no more than a certain number of bytes may be stored in the + * pipe without blocking. This limit may be too small for certain + * applications. In this case, ACE can be configured to store all + * the events in user-space. The pipe is still needed to wake up + * the reactor thread, but only one event is sent through the pipe + * at a time. */ - //@{ - - /// ACE_Notification_Buffers are allocated in chunks. Each time a chunk is - /// allocated, the chunk is added to alloc_queue_ so it can be freed later. - /// Each individual ACE_Notification_Buffer is added to the free_queue_ - /// when it's free. Those in use for queued notifications are placed on the - /// notify_queue_. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_; - ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_; - ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_; - - /// Synchronization for handling of queues. - ACE_SYNCH_MUTEX notify_queue_lock_; - - //@} + ACE_Notification_Queue notification_queue_; #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ }; diff --git a/ACE/ace/Intrusive_List.cpp b/ACE/ace/Intrusive_List.cpp index 4a374c9b045..c0006792ce4 100644 --- a/ACE/ace/Intrusive_List.cpp +++ b/ACE/ace/Intrusive_List.cpp @@ -69,7 +69,7 @@ ACE_Intrusive_List<T>::pop_front (void) T *node = this->head_; if (node == 0) return 0; - this->remove_i (node); + this->unsafe_remove (node); return node; } @@ -79,7 +79,7 @@ ACE_Intrusive_List<T>::pop_back (void) T *node = this->tail_; if (node == 0) return 0; - this->remove_i (node); + this->unsafe_remove (node); return node; } @@ -90,14 +90,14 @@ ACE_Intrusive_List<T>::remove (T *node) { if (node == i) { - this->remove_i (node); + this->unsafe_remove (node); return; } } } template<class T> void -ACE_Intrusive_List<T>::remove_i (T *node) +ACE_Intrusive_List<T>::unsafe_remove (T *node) { if (node->prev () != 0) node->prev ()->next (node->next ()); diff --git a/ACE/ace/Intrusive_List.h b/ACE/ace/Intrusive_List.h index 82789110cfa..59d0c9054a8 100644 --- a/ACE/ace/Intrusive_List.h +++ b/ACE/ace/Intrusive_List.h @@ -99,14 +99,17 @@ public: */ void remove (T *node); -private: - /// Remove a element from the list + /// Swap two lists + void swap(ACE_Intrusive_List<T> & rhs); + + /// Remove a element from the list without checking /** * No attempts are performed to check if T* really belongs to the * list. The effects of removing an invalid element are unspecified */ - void remove_i (T *node); + void unsafe_remove (T *node); +private: /** @name Disallow copying * */ diff --git a/ACE/ace/Intrusive_List.inl b/ACE/ace/Intrusive_List.inl index cc3ffc70109..f76370917e9 100644 --- a/ACE/ace/Intrusive_List.inl +++ b/ACE/ace/Intrusive_List.inl @@ -2,6 +2,8 @@ // // $Id$ +#include <algorithm> + ACE_BEGIN_VERSIONED_NAMESPACE_DECL template<class T> ACE_INLINE int @@ -28,4 +30,11 @@ ACE_Intrusive_List<T>::tail (void) const return this->tail_; } +template<class T> ACE_INLINE void +ACE_Intrusive_List<T>::swap(ACE_Intrusive_List<T> & rhs) +{ + std::swap(head_, rhs.head_); + std::swap(tail_, rhs.tail_); +} + ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/ace/Notification_Queue.cpp b/ACE/ace/Notification_Queue.cpp new file mode 100644 index 00000000000..0b2ca59e831 --- /dev/null +++ b/ACE/ace/Notification_Queue.cpp @@ -0,0 +1,210 @@ +// $Id$ + +#include "ace/Notification_Queue.h" + +#if !defined (__ACE_INLINE__) +#include "ace/Notification_Queue.inl" +#endif /* __ACE_INLINE__ */ + +ACE_Notification_Queue:: +ACE_Notification_Queue() + : ACE_Copy_Disabled() + , alloc_queue_() + , notify_queue_() + , free_queue_() +{ +} + +ACE_Notification_Queue:: +~ACE_Notification_Queue() +{ + reset(); +} + +int +ACE_Notification_Queue:: +open() +{ + ACE_TRACE ("ACE_Notification_Queue::open"); + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + if (!this->free_queue_.is_empty ()) + return 0; + + return allocate_more_buffers(); +} + +void +ACE_Notification_Queue:: +reset() +{ + ACE_TRACE ("ACE_Notification_Queue::reset"); + + // Free up the dynamically allocated resources. + ACE_Notification_Queue_Node **b = 0; + + for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_); + alloc_iter.next (b) != 0; + alloc_iter.advance ()) + { + delete [] *b; + *b = 0; + } + + this->alloc_queue_.reset (); + + // Swap with an empty list to reset the contents + Buffer_List().swap(notify_queue_); + Buffer_List().swap(free_queue_); +} + +int ACE_Notification_Queue:: +allocate_more_buffers() +{ + ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers"); + + ACE_Notification_Queue_Node *temp = 0; + + ACE_NEW_RETURN (temp, + ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); + + if (this->alloc_queue_.enqueue_head (temp) == -1) + { + delete [] temp; + return -1; + } + + for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i) + { + free_queue_.push_front(temp + i); + } + + return 0; +} + +int +ACE_Notification_Queue:: +purge_pending_notifications(ACE_Event_Handler * eh, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications"); + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + if (this->notify_queue_.is_empty ()) + return 0; + + int number_purged = 0; + ACE_Notification_Queue_Node * node = notify_queue_.head(); + while(node != 0) + { + if (!node->matches_for_purging(eh)) + { + // Easy case, skip to the next node + node = node->next(); + continue; + } + + if (!node->mask_disables_all_notifications(mask)) + { + // ... another easy case, skip this node too, but clear the + // mask first ... + node->clear_mask(mask); + node = node->next(); + continue; + } + + // ... this is the more complicated case, we want to remove the + // node from the notify_queue_ list. First save the next node + // on the list: + ACE_Notification_Queue_Node * next = node->next(); + + // ... then remove it ... + notify_queue_.unsafe_remove(node); + ++number_purged; + + // ... release resources ... + ACE_Event_Handler *event_handler = node->get().eh_; + event_handler->remove_reference (); + + // ... now this is a free node ... + free_queue_.push_front(node); + + // ... go to the next node, if there is one ... + node = next; + } + + return number_purged; +} + +int ACE_Notification_Queue:: +push_new_notification( + ACE_Notification_Buffer const & buffer) +{ + ACE_TRACE ("ACE_Notification_Queue::push_new_notification"); + + bool notification_required = false; + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + // No pending notifications. + if (this->notify_queue_.is_empty ()) + notification_required = true; + + if (free_queue_.is_empty()) + { + if (allocate_more_buffers() == -1) + { + return -1; + } + } + + ACE_Notification_Queue_Node * node = + free_queue_.pop_front(); + + ACE_ASSERT (node != 0); + node->set(buffer); + + notify_queue_.push_back(node); + + if (!notification_required) + { + return 0; + } + + return 1; +} + +int +ACE_Notification_Queue::pop_next_notification( + ACE_Notification_Buffer & current, + bool & more_messages_queued, + ACE_Notification_Buffer & next) +{ + ACE_TRACE ("ACE_Notification_Queue::pop_next_notification"); + + more_messages_queued = false; + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + if (notify_queue_.is_empty ()) + { + return 0; + } + + ACE_Notification_Queue_Node * node = + notify_queue_.pop_front(); + + current = node->get(); + free_queue_.push_front(node); + + if(!this->notify_queue_.is_empty()) + { + more_messages_queued = true; + next = notify_queue_.head()->get(); + } + + return 1; +} diff --git a/ACE/ace/Notification_Queue.h b/ACE/ace/Notification_Queue.h new file mode 100644 index 00000000000..a0348822060 --- /dev/null +++ b/ACE/ace/Notification_Queue.h @@ -0,0 +1,156 @@ +#ifndef ACE_NOTIFICATION_QUEUE_H +#define ACE_NOTIFICATION_QUEUE_H + +#include /**/ "ace/pre.h" + +/** + * @file Notification_Queue.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@atdesk.com> + */ +#include "ace/Copy_Disabled.h" +#include "ace/Event_Handler.h" +#include "ace/Intrusive_List.h" +#include "ace/Intrusive_List_Node.h" +#include "ace/Unbounded_Queue.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +/** + * @class ACE_Notification_Queue_Node + * + * @brief Helper class + */ +class ACE_Export ACE_Notification_Queue_Node + : public ACE_Intrusive_List_Node<ACE_Notification_Queue_Node> +{ +public: + /** + * @brief Constructor + */ + ACE_Notification_Queue_Node(); + + /** + * @brief Modifier change the contained buffer + */ + void set(ACE_Notification_Buffer const & rhs); + + /** + * @brief Accessor, fetch the contained buffer + */ + ACE_Notification_Buffer const & get() const; + + /** + * @brief Checks if the event handler matches the purge condition + */ + bool matches_for_purging(ACE_Event_Handler * eh) const; + + /** + * @brief Return true if clearing the mask would leave no + * notifications to deliver. + */ + bool mask_disables_all_notifications(ACE_Reactor_Mask mask); + + /** + * @brief Clear the notifications specified by @c mask + */ + void clear_mask(ACE_Reactor_Mask mask); + +private: + ACE_Notification_Buffer contents_; +}; + +/** + * @class ACE_Notification_Queue + * + * @brief Implements a user-space queue to send Reactor notifications. + * + * The ACE_Reactor uses a pipe to send wake up the thread running the + * event loop from other threads. This pipe can be limited in size + * under some operating systems. For some applications, this limit + * presents a problem. A user-space notification queue is used to + * overcome those limitations. The queue tries to use as few + * resources on the pipe as possible, while keeping all the data in + * user space. + * + * This code was refactored from Select_Reactor_Base. + */ +class ACE_Export ACE_Notification_Queue : private ACE_Copy_Disabled +{ +public: + ACE_Notification_Queue(); + ~ACE_Notification_Queue(); + + /** + * @brief Pre-allocate resources in the queue + */ + int open(); + + /** + * @brief Release all resources in the queue + */ + void reset(); + + /** + * @brief Remove all elements in the queue matching @c eh and @c mask + * + * I suggest reading the documentation in ACE_Reactor to find a more + * detailed description. This is just a helper function. + */ + int purge_pending_notifications(ACE_Event_Handler * eh, + ACE_Reactor_Mask mask); + + /** + * @brief Add a new notification to the queue + * + * @return -1 on failure, 1 if a new message should be sent through + * the pipe and 0 otherwise. + */ + int push_new_notification(ACE_Notification_Buffer const & buffer); + + /** + * @brief Extract the next notification from the queue + * + * @return -1 on failure, 1 if a message was popped, 0 otherwise + */ + int pop_next_notification( + ACE_Notification_Buffer & current, + bool & more_messages_queued, + ACE_Notification_Buffer & next); + +private: + /** + * @brief Allocate more memory for the queue + */ + int allocate_more_buffers(); + +private: + /// Keeps track of allocated arrays of type + /// ACE_Notification_Buffer. The idea is to amortize allocation + /// costs by allocating multiple ACE_Notification_Buffer objects at + /// a time. + ACE_Unbounded_Queue <ACE_Notification_Queue_Node*> alloc_queue_; + + typedef ACE_Intrusive_List<ACE_Notification_Queue_Node> Buffer_List; + + /// Keeps track of all pending notifications. + Buffer_List notify_queue_; + + /// Keeps track of all free buffers. + Buffer_List free_queue_; + + /// Synchronization for handling of queues. + ACE_SYNCH_MUTEX notify_queue_lock_; +}; + +ACE_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +#include "ace/Notification_Queue.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* ACE_NOTIFICATION_QUEUE_H */ diff --git a/ACE/ace/Notification_Queue.inl b/ACE/ace/Notification_Queue.inl new file mode 100644 index 00000000000..b7ef09a820f --- /dev/null +++ b/ACE/ace/Notification_Queue.inl @@ -0,0 +1,47 @@ +// $Id$ + +ACE_INLINE ACE_Notification_Queue_Node:: +ACE_Notification_Queue_Node() + : ACE_Intrusive_List_Node<ACE_Notification_Queue_Node>() + , contents_(0, 0) +{ +} + +ACE_INLINE void +ACE_Notification_Queue_Node:: +set(ACE_Notification_Buffer const & rhs) +{ + contents_ = rhs; +} + +ACE_INLINE ACE_Notification_Buffer const & +ACE_Notification_Queue_Node:: +get() const +{ + return contents_; +} + +ACE_INLINE bool +ACE_Notification_Queue_Node:: +matches_for_purging(ACE_Event_Handler * eh) const +{ + return (0 != get().eh_) && (0 == eh || eh == get().eh_); +} + +ACE_INLINE bool +ACE_Notification_Queue_Node:: +mask_disables_all_notifications(ACE_Reactor_Mask mask) +{ + // the existing notification mask is left with nothing when applying + // the mask + return ACE_BIT_DISABLED (get().mask_, ~mask); +} + +ACE_INLINE void +ACE_Notification_Queue_Node:: +clear_mask(ACE_Reactor_Mask mask) +{ + ACE_CLR_BITS(contents_.mask_, mask); +} + + diff --git a/ACE/ace/Select_Reactor_Base.cpp b/ACE/ace/Select_Reactor_Base.cpp index 94fa673b84e..0306fc7007a 100644 --- a/ACE/ace/Select_Reactor_Base.cpp +++ b/ACE/ace/Select_Reactor_Base.cpp @@ -561,84 +561,7 @@ ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh, #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - if (this->notify_queue_.is_empty ()) - return 0; - - ACE_Notification_Buffer *temp = 0; - ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue; - - size_t queue_size = this->notify_queue_.size (); - int number_purged = 0; - size_t i; - for (i = 0; i < queue_size; ++i) - { - if (-1 == this->notify_queue_.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - // If this is not a Reactor notify (it is for a particular handler), - // and it matches the specified handler (or purging all), - // and applying the mask would totally eliminate the notification, then - // release it and count the number purged. - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_) && - ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask - // is left with nothing when - // applying the mask - { - if (-1 == this->free_queue_.enqueue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - - ACE_Event_Handler *event_handler = temp->eh_; - event_handler->remove_reference (); - - ++number_purged; - } - else - { - // To preserve it, move it to the local_queue. - // But first, if this is not a Reactor notify (it is for a particularhandler), - // and it matches the specified handler (or purging all), then - // apply the mask - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_)) - ACE_CLR_BITS(temp->mask_, mask); - if (-1 == local_queue.enqueue_head (temp)) - return -1; - } - } - - if (this->notify_queue_.size ()) - { // should be empty! - ACE_ASSERT (0); - return -1; - } - - // now put it back in the notify queue - queue_size = local_queue.size (); - for (i = 0; i < queue_size; ++i) - { - if (-1 == local_queue.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - if (-1 == this->notify_queue_.enqueue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } - - return number_purged; + return notification_queue_.purge_pending_notifications(eh, mask); #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ ACE_UNUSED_ARG (eh); @@ -686,22 +609,10 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, #endif /* F_SETFD */ #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_Notification_Buffer *temp = 0; - - ACE_NEW_RETURN (temp, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp) == -1) - { - delete [] temp; - return -1; - } - - for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i) - if (free_queue_.enqueue_head (temp + i) == -1) - return -1; - + if (notification_queue_.open() == -1) + { + return -1; + } #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ // There seems to be a Win32 bug with this... Set this into @@ -728,20 +639,7 @@ ACE_Select_Reactor_Notify::close (void) ACE_TRACE ("ACE_Select_Reactor_Notify::close"); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Free up the dynamically allocated resources. - ACE_Notification_Buffer **b = 0; - - for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_); - alloc_iter.next (b) != 0; - alloc_iter.advance ()) - { - delete [] *b; - *b = 0; - } - - this->alloc_queue_.reset (); - this->notify_queue_.reset (); - this->free_queue_.reset (); + notification_queue_.reset(); #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ return this->notification_pipe_.close (); @@ -767,57 +665,20 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler, ACE_Notification_Buffer buffer (event_handler, mask); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Artificial scope to limit the duration of the mutex. - { - bool notification_required = false; + int notification_required = + notification_queue_.push_new_notification(buffer); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - // No pending notifications. - if (this->notify_queue_.is_empty ()) - notification_required = true; - - ACE_Notification_Buffer *temp = 0; - - if (free_queue_.dequeue_head (temp) == -1) - { - // Grow the queue of available buffers. - ACE_Notification_Buffer *temp1 = 0; - - ACE_NEW_RETURN (temp1, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp1) == -1) - { - delete [] temp1; - return -1; - } - - // Start at 1 and enqueue only - // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since - // the first one will be used right now. - for (size_t i = 1; - i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; - ++i) - this->free_queue_.enqueue_head (temp1 + i); - - temp = temp1; - } - - ACE_ASSERT (temp != 0); - *temp = buffer; - - if (notify_queue_.enqueue_tail (temp) == -1) - return -1; + if (notification_required == -1) + { + return -1; + } - if (!notification_required) - { - // No failures. - safe_handler.release (); + if (notification_required == 0) + { + // No failures, the handler is now owned by the notification queue + safe_handler.release (); - return 0; - } + return 0; } #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ @@ -897,45 +758,28 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) // Dispatch one message from the notify queue, and put another in // the pipe if one is available. Remember, the idea is to keep // exactly one message in the pipe at a time. - { - // We acquire the lock in a block to make sure we're not - // holding the lock while delivering callbacks... - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - ACE_Notification_Buffer *temp = 0; + bool more_messages_queued = false; + ACE_Notification_Buffer next; + + result = notification_queue_.pop_next_notification( + buffer, more_messages_queued, next); - if (notify_queue_.is_empty ()) + if (result == 0) + { return 0; - else if (notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - buffer = *temp; - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - - bool write_next_buffer = false; - ACE_Notification_Buffer ** next = 0; - - if(!this->notify_queue_.is_empty()) - { - // The queue is not empty, need to queue another message. - this->notify_queue_.get (next, 0); - write_next_buffer = true; - } - - if(write_next_buffer) - { - (void) ACE::send( - this->notification_pipe_.write_handle(), - (char *)*next, sizeof(ACE_Notification_Buffer)); - } - } + } + if (result == -1) + { + return -1; + } + + if(more_messages_queued) + { + (void) ACE::send(this->notification_pipe_.write_handle(), + (char *)&next, sizeof(ACE_Notification_Buffer)); + } #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ // If eh == 0 then another thread is unblocking the diff --git a/ACE/ace/Select_Reactor_Base.h b/ACE/ace/Select_Reactor_Base.h index 9ec4e642210..bb12154ca8c 100644 --- a/ACE/ace/Select_Reactor_Base.h +++ b/ACE/ace/Select_Reactor_Base.h @@ -27,7 +27,7 @@ #include "ace/Reactor_Impl.h" #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) -# include "ace/Unbounded_Queue.h" +# include "ace/Notification_Queue.h" #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ #ifdef ACE_WIN32 @@ -253,24 +253,18 @@ protected: int max_notify_iterations_; #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // = This configuration queues up notifications in separate buffers that - // are in user-space, rather than stored in a pipe in the OS - // kernel. The kernel-level notifications are used only to trigger - // the Reactor to check its notification queue. This enables many - // more notifications to be stored than would otherwise be the case. - - /// Keeps track of allocated arrays of type - /// ACE_Notification_Buffer. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_; - - /// Keeps track of all pending notifications. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_; - - /// Keeps track of all free buffers. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_; - - /// Synchronization for handling of queues. - ACE_SYNCH_MUTEX notify_queue_lock_; + /** + * @brief A user-space queue to store the notifications. + * + * The notification pipe has OS-specific size restrictions. That + * is, no more than a certain number of bytes may be stored in the + * pipe without blocking. This limit may be too small for certain + * applications. In this case, ACE can be configured to store all + * the events in user-space. The pipe is still needed to wake up + * the reactor thread, but only one event is sent through the pipe + * at a time. + */ + ACE_Notification_Queue notification_queue_; #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ }; diff --git a/ACE/ace/ace.mpc b/ACE/ace/ace.mpc index 6f67e730e44..d91138a1ae9 100644 --- a/ACE/ace/ace.mpc +++ b/ACE/ace/ace.mpc @@ -125,6 +125,7 @@ project(ACE) : acedefaults, install, other, codecs, token, svcconf, uuid, fileca Mutex.cpp Netlink_Addr.cpp Notification_Strategy.cpp + Notification_Queue.cpp Obchunk.cpp Object_Manager.cpp Object_Manager_Base.cpp diff --git a/ACE/ace/ace_for_tao.mpc b/ACE/ace/ace_for_tao.mpc index 85798f0692f..76737dc380c 100644 --- a/ACE/ace/ace_for_tao.mpc +++ b/ACE/ace/ace_for_tao.mpc @@ -87,6 +87,7 @@ project(ACE_FOR_TAO) : acedefaults, install, svcconf, uuid, versioned_namespace, MMAP_Memory_Pool.cpp Mutex.cpp Notification_Strategy.cpp + Notification_Queue.cpp Obchunk.cpp Object_Manager.cpp Object_Manager_Base.cpp diff --git a/ACE/tests/Bug_2815_Regression_Test.cpp b/ACE/tests/Bug_2815_Regression_Test.cpp new file mode 100644 index 00000000000..4c4eb0a3630 --- /dev/null +++ b/ACE/tests/Bug_2815_Regression_Test.cpp @@ -0,0 +1,499 @@ +/** + * @file Bug_2815_Regression_Test.cpp + * + * $Id$ + * + * Verify that the notification queue can be used with large numbers + * of event handlers. + * + * Normally the ACE_Reactor uses a pipe to implement the notify() + * methods. ACE can be compiled with + * ACE_HAS_REACTOR_NOTIFICATION_QUEUE, with this configuration flag + * the Reactor uses a user-space queue to contain the notifications. + * A single message is sent through the pipe to indicate "pipe not + * empty." + * + * In this configuration, if an event handler is removed + * from the Reactor the user-space queue has to be searched for + * pending notifications and the notifications must be removed. + * + * The original implementation used a naive algorithm to search and + * remove the handlers, which resulted in very high overhead when + * removing handlers while having a very long notification queue. + * + * @author Carlos O'Ryan <coryan@atdesk.com> + * + */ + +#include "test_config.h" +#include "ace/Reactor.h" +#include "ace/TP_Reactor.h" +#include "ace/Select_Reactor.h" + +ACE_RCSID(tests, + Bug_2815_Regression_Test, "$Id$") + +class One_Shot_Handler; + +/** + * @class Driver + * + * @brief Main driver for the test, generates notification events and + * verifies they are received correctly. + * + */ +class Driver +{ +public: + Driver(ACE_Reactor * reactor, + int max_notifications, + char const *test_name); + + /// Run the test + void run (void); + + /// One of the sub-handlers has received a notification + void notification_received (); + + /// One of the sub-handlers has decided to skip several notifications + void notifications_skipped (int skip_count); + + /** + * @brief Return the reactor configured for this test + */ + ACE_Reactor * reactor (); + +private: + /** + * @brief Implement a single iteration. + * + * Each iteration of the test consists of sending multiple + * notifications simultaneously. + */ + void send_notifications (void); + + /** + * @brief Return true if the test is finished. + */ + bool done (void) const; + + /** + * @brief Return true if there are more iterations to run. + */ + bool more_iterations () const; + + /** + * @brief Return true if the current iteration is completed. + */ + bool current_iteration_done () const; + + /** + * @brief Run one iteration of the test, each iteration doubles + * the number of events. + */ + int run_one_iteration (void); + + /** + * @brief Initialize a bunch of One_Shot_Handlers + */ + void initialize_handlers( + int nhandlers, One_Shot_Handler ** handlers); + + /** + * @brief Dispatch events to the One_Shot_Handlers + */ + void notify_handlers( + int nhandlers, One_Shot_Handler ** handlers); + + /** + * @brief Helpful for debugging + * + * The number of notifications received, skipped and sent are + * subject to simple invariants. During debugging any violation of + * those invariants indicates a problem in the application or the + * Reactor. + */ + void check_notification_invariants(); + + /// A good place to set break points. + void invariant_failed(); + +private: + /** + * @brief The reactor used in this test + */ + ACE_Reactor * reactor_; + + /** + * @brief The maximum number of notifications in any single + * iteration. + */ + int max_notifications_; + + /** + * @brief The name of the test + */ + char const * test_name_; + + /** + * @brief Number of notifications received + */ + int notifications_sent_; + + /** + * @brief Number of notifications sent + */ + int notifications_recv_; + + /** + * @brief Number of notifications skipped because + * the handler was removed + */ + int notifications_skipped_; + + /** + * @brief Number of notifications sent on each iteration + */ + int notifications_curr_; +}; + +/** + * @class One_Shot_Handler + * + * @brief A handler that removes itself from the reactor + * after its first notification. + * + * To demonstrate the problems with the first implementation of + * the notification queue we generate multiple event handlers. + * Then we generate multiple notifications for each, but the handlers + * remove themselves from the reactor when the first notification + * is delivered. This causes a lot of activity in the notification + * queue. + * + */ +class One_Shot_Handler : public ACE_Event_Handler +{ +public: + One_Shot_Handler( + Driver * master_handler, + char const * test_name, + int id); + + /// Increase the number of expected notifications + void notification_queued(); + + /// Receive the notifications, but remove itself from the reactor on + /// on the first one. + virtual int handle_exception(ACE_HANDLE); + +private: + /// The driver for this test, communicate results to it + Driver * master_handler_; + + /// The number of expected notifications + int expected_notifications_; + + /// Identify the test and handler for debugging and better error output + char const * test_name_; + int id_; +}; + +int +run_main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("Bug_2815_Regression_Test")); + +#if !defined(ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Notification queue disabled, ") + ACE_TEXT ("small test version, ") + ACE_TEXT ("which is of no practical use\n"))); + + int max_notifications = 16; +#else + int max_notifications = 512 * 1024; +#endif /* ACE_HAS_THREADS */ + + { + ACE_Reactor select_reactor ( + new ACE_Select_Reactor, + 1); + + Driver handler(&select_reactor, + max_notifications, + "Select_Reactor"); + + handler.run (); + } + + { + ACE_Reactor tp_reactor (new ACE_TP_Reactor, + 1); + Driver handler(&tp_reactor, + max_notifications, + "TP_Reactor"); + handler.run(); + } + + ACE_END_TEST; + + return 0; +} + +Driver::Driver ( + ACE_Reactor * reactor, + int max_notifications, + char const * test_name) + : reactor_(reactor) + , max_notifications_(max_notifications) + , test_name_(test_name) + , notifications_sent_(0) + , notifications_recv_(0) + , notifications_skipped_(0) + , notifications_curr_(1) +{ +} + +void +Driver::run (void) +{ + while(more_iterations()) + { + if(run_one_iteration() == -1) + { + return; + } + + notifications_curr_ *= 2; + } + + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Test %C passed sent=%d, recv=%d, skip=%d\n"), + test_name_, + notifications_sent_, + notifications_recv_, + notifications_skipped_)); +} + +void +Driver::notification_received () +{ + ++notifications_recv_; + check_notification_invariants(); +} + +void +Driver::notifications_skipped (int skip_count) +{ + notifications_skipped_ += skip_count; + check_notification_invariants(); +} + +ACE_Reactor * +Driver::reactor() +{ + return reactor_; +} + +void +Driver::send_notifications (void) +{ + int const nhandlers = 16; + One_Shot_Handler * handlers[nhandlers]; + initialize_handlers(nhandlers, handlers); + + for (int i = 0; i != notifications_curr_; ++i) + { + notify_handlers(nhandlers, handlers); + } +} + +bool +Driver::done (void) const +{ + return !more_iterations() && current_iteration_done(); +} + +bool +Driver::more_iterations() const +{ + return notifications_curr_ < max_notifications_; +} + +bool +Driver::current_iteration_done() const +{ + return notifications_sent_ == (notifications_recv_ + notifications_skipped_); +} + +int +Driver::run_one_iteration (void) +{ + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Running iteration with %d events for %C test\n"), + notifications_curr_, + test_name_)); + + send_notifications (); + + // Run for 30 seconds or until the test is done. + + ACE_Time_Value const timeout(30,0); + + while (!current_iteration_done()) + { + ACE_Time_Value start = ACE_OS::gettimeofday(); + ACE_Time_Value interval(1,0); + reactor()->run_reactor_event_loop(interval); + ACE_Time_Value end = ACE_OS::gettimeofday(); + + if (end - start >= timeout) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Test %C failed due to timeout ") + ACE_TEXT (" sent=%d,recv=%d,skip=%d\n"), + test_name_, + notifications_sent_, + notifications_recv_, + notifications_skipped_)); + return -1; + } + } + + return 0; +} + +void +Driver::initialize_handlers( + int nhandlers, One_Shot_Handler ** handlers) +{ + for (int j = 0; j != nhandlers; ++j) + { + handlers[j] = new One_Shot_Handler(this, test_name_, j); + } +} + +void +Driver::notify_handlers( + int nhandlers, One_Shot_Handler ** handlers) +{ + for(int i = 0; i != nhandlers; ++i) + { + if(reactor()->notify (handlers[i]) == -1) + { + ACE_ERROR((LM_ERROR, + ACE_TEXT ("Cannot send notifications in %C test (%d/%d)\n"), + test_name_, i, notifications_curr_)); + return; + } + handlers[i]->notification_queued(); + + ++notifications_sent_; + } +} + +void Driver:: +check_notification_invariants() +{ + if (notifications_sent_ < 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT("The number of notifications sent (%d)") + ACE_TEXT(" should be positive\n"), + notifications_sent_)); + invariant_failed(); + } + + if (notifications_recv_ < 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT("The number of notifications received (%d)") + ACE_TEXT(" should be positive\n"), + notifications_recv_)); + invariant_failed(); + } + + if (notifications_skipped_ < 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT("The number of notifications skipped (%d)") + ACE_TEXT(" should be positive\n"), + notifications_skipped_)); + invariant_failed(); + } + + if (notifications_sent_ < notifications_recv_) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT("Too many notifications received (%d)") + ACE_TEXT(" vs sent (%d)\n"), + notifications_recv_, notifications_sent_)); + invariant_failed(); + } + + if (notifications_sent_ < notifications_skipped_) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT("Too many notifications skipped (%d)") + ACE_TEXT(" vs sent (%d)\n"), + notifications_skipped_, notifications_sent_)); + invariant_failed(); + } + + if (notifications_skipped_ + notifications_recv_ > notifications_sent_) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT("Too many notifications skipped (%d)") + ACE_TEXT(" and received (%d) vs sent (%d)\n"), + notifications_skipped_, notifications_recv_, + notifications_sent_)); + invariant_failed(); + } +} + +void Driver:: +invariant_failed() +{ + // Just a good place to set a breakpoint +} + +// ============================================ + +One_Shot_Handler::One_Shot_Handler( + Driver * master_handler, + char const * test_name, int id) + : ACE_Event_Handler(master_handler->reactor()) + , master_handler_(master_handler) + , expected_notifications_(0) + , test_name_(test_name) + , id_(id) +{ +} + +void One_Shot_Handler:: +notification_queued() +{ + ++expected_notifications_; +} + +int One_Shot_Handler:: +handle_exception(ACE_HANDLE) +{ + --expected_notifications_; + master_handler_->notification_received(); + + int r = reactor()->purge_pending_notifications(this); + if (r >= 0) + { + master_handler_->notifications_skipped(r); + delete this; + return 0; + } + + ACE_ERROR((LM_ERROR, + ACE_TEXT ("Cannot remove handler %d in %C test\n"), + id_, test_name_)); + + delete this; + return 0; +} diff --git a/ACE/tests/Notification_Queue_Unit_Test.cpp b/ACE/tests/Notification_Queue_Unit_Test.cpp new file mode 100644 index 00000000000..cc138e3fc03 --- /dev/null +++ b/ACE/tests/Notification_Queue_Unit_Test.cpp @@ -0,0 +1,256 @@ +/** + * @file Notification_Queue_Unit_Test.cpp + * + * $Id$ + * + * A unit test for the ACE_Notification_Queue class. + * + * @author Carlos O'Ryan <coryan@atdesk.com> + * + */ + +#include "test_config.h" +#include "ace/Notification_Queue.h" + +ACE_RCSID(tests, + Notification_Queue_Unit_Test, "$Id$") + +#define TEST_LIST \ + ACTION(null_test) \ + ACTION(pop_returns_element_pushed) \ + ACTION(purge_empty_queue) \ + ACTION(purge_with_no_matches) \ + ACTION(purge_with_single_match) \ + ACTION(purge_with_multiple_matches) \ + +// Declare all the tests +#define ACTION(TEST_NAME) void TEST_NAME (char const * test_name); +TEST_LIST +#undef ACTION + +int +run_main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("Notification_Queue_Unit_Test")); + + // Call all the tests +#define ACTION(TEST_NAME) TEST_NAME (#TEST_NAME); +TEST_LIST +#undef ACTION + + ACE_END_TEST; + + return 0; +} + +// There are far more elegant ways to do this. Ideally one would use +// an existing framework (Boost.Test, TUT, CppTest). But this will +// do for our purposes +#define TEST_INTEGER_EQUAL(X, Y, MSG) \ + do { \ + if ((X) == (Y)) break; \ + ACE_ERROR ((LM_ERROR, \ + ACE_TEXT("%C in (%C %N:%l) %C (%d) != %C (%d)\n"), \ + ACE_TEXT(MSG), test_name, \ + ACE_TEXT(#X), (X), ACE_TEXT(#Y), (Y) )); \ + } while(0) +#define TEST_INTEGER_NOT_EQUAL(X, Y, MSG) \ + do { \ + if ((X) != (Y)) break; \ + ACE_ERROR ((LM_ERROR, \ + ACE_TEXT("%C in (%C %N:%l) %C (%d) == %C (%d)\n"), \ + ACE_TEXT(MSG), test_name, \ + ACE_TEXT(#X), (X), ACE_TEXT(#Y), (Y) )); \ + } while(0) +#define TEST_ASSERT(PREDICATE, MESSAGE) \ + do { \ + if ((PREDICATE)) break; \ + ACE_ERROR ((LM_ERROR, \ + ACE_TEXT("Assertion failure in (%C %N:%l) %C %C\n"), \ + test_name, ACE_TEXT(#PREDICATE), MESSAGE )); \ + } while(0) + + +void null_test(char const * test_name) +{ + ACE_Notification_Queue queue; + + TEST_INTEGER_EQUAL(0, 0, "Test framework failure"); + TEST_INTEGER_NOT_EQUAL(1, 0, "Test framework failure"); + TEST_ASSERT(true, ACE_TEXT("True is still true")); +} + +class Event_Handler : public ACE_Event_Handler +{ +public: + Event_Handler(int event_handler_id) + : ACE_Event_Handler() + , id (event_handler_id) + { + } + + int id; +}; + +void pop_returns_element_pushed(char const * test_name) +{ + ACE_Notification_Queue queue; + + Event_Handler eh1(1); + Event_Handler eh2(2); + Event_Handler eh3(2); + + int result = queue.push_new_notification( + ACE_Notification_Buffer(&eh1, + ACE_Event_Handler::READ_MASK)); + TEST_ASSERT(result == 1, "push[1] should return 1"); + + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh2, + ACE_Event_Handler::WRITE_MASK)); + TEST_ASSERT(result == 0, "push[2] should return 0"); + + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh3, + ACE_Event_Handler::READ_MASK + |ACE_Event_Handler::WRITE_MASK)); + TEST_ASSERT(result == 0, "push[3] should return 0"); + + ACE_Notification_Buffer current; + bool more_messages_queued; + ACE_Notification_Buffer next; + + result = queue.pop_next_notification(current, more_messages_queued, next); + TEST_ASSERT(result == 1, "pop[0] should return 1"); + TEST_ASSERT(more_messages_queued, "pop[0] should have more messages"); + + TEST_INTEGER_EQUAL(current.eh_, &eh1, "Wrong handler extracted"); + TEST_INTEGER_EQUAL(current.mask_, ACE_Event_Handler::READ_MASK, + "Wrong mask extracted"); + + result = queue.pop_next_notification(current, more_messages_queued, next); + TEST_ASSERT(result == 1, "pop[1] should return 1"); + TEST_ASSERT(more_messages_queued, "pop[1] should have more messages"); + + TEST_INTEGER_EQUAL(current.eh_, &eh2, "Wrong handler extracted"); + TEST_INTEGER_EQUAL(current.mask_, ACE_Event_Handler::WRITE_MASK, + "Wrong mask extracted"); + + result = queue.pop_next_notification(current, more_messages_queued, next); + TEST_ASSERT(result == 1, "pop[2] should return 1"); + TEST_ASSERT(!more_messages_queued, "pop[2] should not have more messages"); + + TEST_INTEGER_EQUAL(current.eh_, &eh3, "Wrong handler extracted"); + TEST_INTEGER_EQUAL(current.mask_, ACE_Event_Handler::READ_MASK + |ACE_Event_Handler::WRITE_MASK, + "Wrong mask extracted"); + + more_messages_queued = true; + result = queue.pop_next_notification(current, more_messages_queued, next); + TEST_ASSERT(result == 0, "pop[3] should return 0"); + TEST_ASSERT(!more_messages_queued, "pop[3] should not have more messages"); +} + +void purge_empty_queue(char const * test_name) +{ + ACE_Notification_Queue queue; + + Event_Handler eh1(1); + + int result = queue.purge_pending_notifications(&eh1, + ACE_Event_Handler::READ_MASK); + TEST_ASSERT(result == 0, "purge of empty queue should return 0"); +} + +void purge_with_no_matches(char const * test_name) +{ + ACE_Notification_Queue queue; + + Event_Handler eh1(1); + Event_Handler eh2(2); + + int result = queue.push_new_notification( + ACE_Notification_Buffer(&eh1, + ACE_Event_Handler::READ_MASK)); + + result = queue.purge_pending_notifications(&eh2, + ACE_Event_Handler::READ_MASK); + TEST_ASSERT(result == 0, "purge of eh2 should return 0"); + + result = queue.purge_pending_notifications(&eh1, + ACE_Event_Handler::WRITE_MASK); + TEST_ASSERT(result == 0, "purge of eh1/WRITE should return 0"); +} + +void purge_with_single_match(char const * test_name) +{ + ACE_Notification_Queue queue; + + Event_Handler eh1(1); + Event_Handler eh2(2); + + int result = queue.push_new_notification( + ACE_Notification_Buffer(&eh1, + ACE_Event_Handler::READ_MASK + |ACE_Event_Handler::WRITE_MASK)); + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh1, + ACE_Event_Handler::WRITE_MASK)); + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh2, + ACE_Event_Handler::READ_MASK)); + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh2, + ACE_Event_Handler::WRITE_MASK)); + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh2, + ACE_Event_Handler::WRITE_MASK)); + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh2, + ACE_Event_Handler::WRITE_MASK)); + + result = queue.purge_pending_notifications(&eh2, + ACE_Event_Handler::READ_MASK); + TEST_INTEGER_EQUAL(result, 1, "purge of eh2/READ should return 1"); + + result = queue.purge_pending_notifications(&eh1, + ACE_Event_Handler::READ_MASK); + TEST_INTEGER_EQUAL(result, 0, "purge of eh1/READ should return 0"); +} + +void purge_with_multiple_matches(char const * test_name) +{ + ACE_Notification_Queue queue; + + Event_Handler eh1(1); + Event_Handler eh2(2); + + int result = queue.push_new_notification( + ACE_Notification_Buffer(&eh1, + ACE_Event_Handler::READ_MASK + |ACE_Event_Handler::WRITE_MASK)); + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh1, + ACE_Event_Handler::WRITE_MASK)); + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh2, + ACE_Event_Handler::READ_MASK)); + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh2, + ACE_Event_Handler::WRITE_MASK)); + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh2, + ACE_Event_Handler::WRITE_MASK)); + result = queue.push_new_notification( + ACE_Notification_Buffer(&eh2, + ACE_Event_Handler::WRITE_MASK)); + + result = queue.purge_pending_notifications(&eh2, + ACE_Event_Handler::WRITE_MASK); + TEST_INTEGER_EQUAL(result, 3, "purge of eh2/WRITE should return 3"); + + result = queue.purge_pending_notifications(&eh1, + ACE_Event_Handler::WRITE_MASK); + TEST_INTEGER_EQUAL(result, 1, "purge of eh1/WRITE should return 1"); +} + diff --git a/ACE/tests/run_test.lst b/ACE/tests/run_test.lst index e2639f4c3d4..4d0b911b48b 100644 --- a/ACE/tests/run_test.lst +++ b/ACE/tests/run_test.lst @@ -36,6 +36,7 @@ Bug_2497_Regression_Test Bug_2540_Regression_Test Bug_2659_Regression_Test: !ST Bug_2653_Regression_Test: !ST +Bug_2815_Regression_Test CDR_Array_Test: !ACE_FOR_TAO CDR_File_Test: !ACE_FOR_TAO CDR_Test @@ -96,6 +97,7 @@ Naming_Test: !LynxOS !Unicos !VxWorks !nsk !ACE_FOR_TAO Network_Adapters_Test: !DISABLE_ToFix_LynxOS_PPC !DISABLE_ToFix_LynxOS_x86 New_Fail_Test: ALL !DISABLED NonBlocking_Conn_Test +Notification_Queue_Unit_Test Notify_Performance_Test: !nsk !ACE_FOR_TAO OS_Test Object_Manager_Test diff --git a/ACE/tests/tests.mpc b/ACE/tests/tests.mpc index c79b0d36eaf..e9b4437b94a 100644 --- a/ACE/tests/tests.mpc +++ b/ACE/tests/tests.mpc @@ -234,6 +234,13 @@ project(Bug_2653_Regression_Test) : acetest { } } +project(Bug_2815_Regression_Test) : acetest { + exename = Bug_2815_Regression_Test + Source_Files { + Bug_2815_Regression_Test.cpp + } +} + project(Bug_2820_Regression_Test) : acetest { exename = Bug_2820_Regression_Test Source_Files { @@ -631,6 +638,13 @@ project(New Fail Test) : acetest { } } +project(Notification Queue Unit Test) : acetest { + exename = Notification_Queue_Unit_Test + Source_Files { + Notification_Queue_Unit_Test.cpp + } +} + project(Notify Performance Test) : acetest { avoids += ace_for_tao exename = Notify_Performance_Test |