summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-02-22 13:45:54 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-02-22 13:45:54 +0000
commita4ce5d5c13ec3e7c2f3455304e53f6af92b34870 (patch)
tree2128a629e7a519bc526326d980ddd72f90aed92d
parent50a5c517acdb0fba8221a0d273d1206bbfcb8f64 (diff)
downloadATCD-a4ce5d5c13ec3e7c2f3455304e53f6af92b34870.tar.gz
Thu Feb 22 13:03:01 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
-rw-r--r--ACE/ChangeLog110
-rw-r--r--ACE/ace/Dev_Poll_Reactor.cpp216
-rw-r--r--ACE/ace/Dev_Poll_Reactor.h33
-rw-r--r--ACE/ace/Intrusive_List.cpp8
-rw-r--r--ACE/ace/Intrusive_List.h9
-rw-r--r--ACE/ace/Intrusive_List.inl9
-rw-r--r--ACE/ace/Notification_Queue.cpp210
-rw-r--r--ACE/ace/Notification_Queue.h156
-rw-r--r--ACE/ace/Notification_Queue.inl47
-rw-r--r--ACE/ace/Select_Reactor_Base.cpp226
-rw-r--r--ACE/ace/Select_Reactor_Base.h32
-rw-r--r--ACE/ace/ace.mpc1
-rw-r--r--ACE/ace/ace_for_tao.mpc1
-rw-r--r--ACE/tests/Bug_2815_Regression_Test.cpp499
-rw-r--r--ACE/tests/Notification_Queue_Unit_Test.cpp256
-rw-r--r--ACE/tests/run_test.lst2
-rw-r--r--ACE/tests/tests.mpc14
17 files changed, 1414 insertions, 415 deletions
diff --git a/ACE/ChangeLog b/ACE/ChangeLog
index 81556fa4559..4afaa129d2a 100644
--- a/ACE/ChangeLog
+++ b/ACE/ChangeLog
@@ -1,3 +1,113 @@
+Thu Feb 22 13:03:01 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
+
+ * Merged changes from the bug_2815 branch. From revision 77182 to
+ revision 77315.
+
+ Tue Feb 20 18:17:00 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
+
+ * ace/ace_for_tao.mpc:
+ Add Notification_Queue.cpp to this file too. Thanks to Johnny
+ Willemsen for pointing this out.
+
+ Mon Feb 19 21:53:58 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
+
+ * ace/Intrusive_List.inl:
+ I learned a minute ago that std::swap() was kosher. Use it in
+ favor of ACE_Swap<>.
+
+ Mon Feb 19 21:47:48 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
+
+ * tests/run_test.lst:
+ * tests/Bug_2815_Regression_Test.cpp:
+ Adjusted the number of iterations so the test would pass on my
+ G4-based laptop without optimizations or inlining. The previous
+ numbers were just guesses anyway and I hope the new numbers will
+ result in no failures for the scoreboard.
+
+ Mon Feb 19 21:29:32 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
+
+ * ace/Notification_Queue.cpp:
+ Modified the purging algorithm. The algorithm used a temporary
+ list to store the elements not purged, so basically all nodes
+ were either moved to the free list or to the temporary list.
+ The new algorithm only removes the nodes that needs purging.
+ In the vast majority of the cases this is more efficient because
+ most nodes are not purged.
+ This resulted in a factor of 2 improvement for
+ tests/Bug_2815_Regression_Test, keep in mind that in this
+ program 1/16th of the nodes are purged. So in practice I would
+ expect even better results.
+ I also changed the order in which the free nodes are re-used, I
+ think LIFO order has a better chance of re-using the cache, but
+ I have no evidence or experiments to prove this.
+
+ * ace/Intrusive_List.h:
+ * ace/Intrusive_List.cpp:
+ Renamed the remove_i() function to unsafe_remove() and promoted
+ its access to "public". The function is not safe to use in
+ general, thus the name, but it resulted in a factor of 2
+ performance improvement for ACE_Notification_Queue.
+
+ * tests/Bug_2815_Regression_Test.cpp:
+ Fixed memory leaks (in the test not the library)
+
+ Mon Feb 19 20:32:34 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
+
+ * ace/Notification_Queue.h:
+ * ace/Notification_Queue.inl:
+ * ace/Notification_Queue.cpp:
+ Re-factored to use ACE_Intrusive_List in the implementation.
+ This eliminates memory allocations during additions and removal
+ of elements. Also, many of the operations cannot fail, so the
+ code became smaller.
+
+ * ace/Intrusive_List.h:
+ * ace/Intrusive_List.inl:
+ Add a swap() member function.
+
+ Mon Feb 19 04:49:35 UTC 2007 coryan <coryan@atdesk.com>
+
+ * ace/Notification_Queue.h:
+ * ace/Notification_Queue.cpp:
+ * ace/ace.mpc:
+ New class to encapsulate the implementation of a user-space
+ based notification queue. The code was duplicated in both
+ Select_Reactor.{h,cpp} and Dev_Poll_Reactor.{h,cpp}
+
+ * tests/tests.mpc:
+ * tests/run_test.lst:
+ * tests/Notification_Queue_Unit_Test.cpp:
+ New unit test for the notification queue class.
+
+ * ace/Dev_Poll_Reactor.h:
+ * ace/Dev_Poll_Reactor.cpp:
+ * ace/Select_Reactor_Base.h:
+ * ace/Select_Reactor_Base.cpp:
+ Refactored notification queue code to a new class
+ (ACE_Notification_Queue)
+
+ * tests/Bug_2815_Regression_Test.cpp:
+ Add code to help with debugging. Basically my refactoring above
+ had at least one bug, and this test uncovered it. But it was
+ hard to debug because there was no single breakpoint to detect
+ when the failure condition was triggered.
+
+ * tests:
+ Add new files to svn:ignore property. Also added some old
+ files.
+
+ * tests/SSL:
+ * include/makeinclude:
+ Add missing files to svn:ignore
+
+ Sun Feb 18 21:13:41 UTC 2007 coryan <coryan@atdesk.com>
+
+ * tests/tests.mpc:
+ * tests/Bug_2815_Regression_Test.cpp:
+ Add new regression test for bug 2815. I have not added the test
+ to the automated test suite because (1) it fails, and (2) it is
+ a test to reproduce performance
+
Thu Feb 22 12:58:42 UTC 2007 Johnny Willemsen <jwillemsen@remedy.nl>
* include/makeinclude/compiler.bor:
diff --git a/ACE/ace/Dev_Poll_Reactor.cpp b/ACE/ace/Dev_Poll_Reactor.cpp
index 8bf63534735..abf3a432c31 100644
--- a/ACE/ace/Dev_Poll_Reactor.cpp
+++ b/ACE/ace/Dev_Poll_Reactor.cpp
@@ -50,10 +50,7 @@ ACE_Dev_Poll_Reactor_Notify::ACE_Dev_Poll_Reactor_Notify (void)
, notification_pipe_ ()
, max_notify_iterations_ (-1)
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- , alloc_queue_ ()
- , notify_queue_ ()
- , free_queue_ ()
- , notify_queue_lock_ ()
+ , notification_queue_ ()
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
{
}
@@ -85,18 +82,10 @@ ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r,
#endif /* F_SETFD */
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_Notification_Buffer *temp;
-
- ACE_NEW_RETURN (temp,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp) == -1)
- return -1;
-
- for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
- if (free_queue_.enqueue_head (temp + i) == -1)
- return -1;
+ if (notification_queue_.open() == -1)
+ {
+ return -1;
+ }
if (ACE::set_flags (this->notification_pipe_.write_handle (),
ACE_NONBLOCK) == -1)
@@ -120,20 +109,7 @@ ACE_Dev_Poll_Reactor_Notify::close (void)
ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close");
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Free up the dynamically allocated resources.
- ACE_Notification_Buffer **b;
-
- for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
- alloc_iter.next (b) != 0;
- alloc_iter.advance ())
- {
- delete [] *b;
- *b = 0;
- }
-
- this->alloc_queue_.reset ();
- this->notify_queue_.reset ();
- this->free_queue_.reset ();
+ notification_queue_.reset();
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return this->notification_pipe_.close ();
@@ -154,42 +130,22 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh,
ACE_Notification_Buffer buffer (eh, mask);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_Dev_Poll_Handler_Guard eh_guard (eh);
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- // Locate a free buffer in the queue. Enlarge the queue if needed.
- ACE_Notification_Buffer *temp = 0;
-
- if (free_queue_.dequeue_head (temp) == -1)
- {
- // Grow the queue of available buffers.
- ACE_Notification_Buffer *temp1;
-
- ACE_NEW_RETURN (temp1,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp1) == -1)
- return -1;
-
- // Start at 1 and enqueue only
- // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
- // the first one will be used right now.
- for (size_t i = 1;
- i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
- ++i)
- this->free_queue_.enqueue_head (temp1 + i);
-
- temp = temp1;
- }
+ int notification_required =
+ notification_queue_.push_new_notification(buffer);
- ACE_ASSERT (temp != 0);
- *temp = buffer;
+ if (notification_required == -1)
+ {
+ return -1;
+ }
- ACE_Dev_Poll_Handler_Guard eh_guard (eh);
+ if (notification_required == 0)
+ {
+ eh_guard.release ();
- if (notify_queue_.enqueue_tail (temp) == -1)
- return -1;
+ return 0;
+ }
// Now pop the pipe to force the callback for dispatching when ready. If
// the send fails due to a full pipe, don't fail - assume the already-sent
@@ -239,26 +195,7 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notifications (
// also serves to slow down event dispatching particularly with this
// ACE_Dev_Poll_Reactor.
-#if 0
- ACE_HANDLE read_handle =
- this->notification_pipe_.read_handle ();
-
- // Note that we do not check if the handle has received any events.
- // Instead a non-blocking "speculative" read is performed. If the
- // read returns with errno == EWOULDBLOCK then no notifications are
- // dispatched. See ACE_Dev_Poll_Reactor_Notify::read_notify_pipe()
- // for details.
- if (read_handle != ACE_INVALID_HANDLE)
- {
- --number_of_active_handles;
-
- return this->handle_input (read_handle);
- }
- else
- return 0;
-#else
ACE_NOTSUP_RETURN (-1);
-#endif /* 0 */
}
int
@@ -286,26 +223,31 @@ ACE_Dev_Poll_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
char b;
read_p = &b;
to_read = 1;
- ACE_Notification_Buffer *temp;
- // New scope to release the guard before trying the recv().
- {
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+ // Before reading the byte, pop a message from the queue and queue a
+ // new message unless the queue is now empty. The protocol is to
+ // keep a byte in the pipe as long as the queue is not empty.
+ bool more_messages_queued = false;
+ ACE_Notification_Buffer next;
- if (notify_queue_.is_empty ())
+ int result = notification_queue_.pop_next_notification(
+ buffer, more_messages_queued, next);
+
+ if (result == 0)
+ {
return 0;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("read_notify_pipe: dequeue_head")),
- -1);
- buffer = *temp;
- have_one = true;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("read_notify_pipe: enqueue_head")));
- }
+ }
+
+ if (result == -1)
+ {
+ return -1;
+ }
+
+ if(more_messages_queued)
+ {
+ (void) ACE::send(this->notification_pipe_.write_handle(),
+ (char *)&next, 1 /* one byte is enough */);
+ }
#else
to_read = sizeof buffer;
@@ -475,83 +417,7 @@ ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications (
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- if (this->notify_queue_.is_empty ())
- return 0;
-
- ACE_Notification_Buffer *temp;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
-
- size_t queue_size = this->notify_queue_.size ();
- int number_purged = 0;
- size_t i;
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == this->notify_queue_.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- // If this is not a Reactor notify (it is for a particular
- // handler), and it matches the specified handler (or purging
- // all), and applying the mask would totally eliminate the
- // notification, then release it and count the number purged.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_) &&
- ACE_BIT_DISABLED (temp->mask_, ~mask)) // The existing
- // notification mask
- // is left with
- // nothing when
- // applying the mask.
- {
- if (this->free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- ++number_purged;
- }
- else
- {
- // To preserve it, move it to the local_queue.
- // But first, if this is not a Reactor notify (it is for a
- // particular handler), and it matches the specified handler
- // (or purging all), then apply the mask.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_))
- ACE_CLR_BITS(temp->mask_, mask);
- if (-1 == local_queue.enqueue_head (temp))
- return -1;
- }
- }
-
- if (this->notify_queue_.size ())
- {
- // Should be empty!
- ACE_ASSERT (0);
- return -1;
- }
-
- // Now put it back in the notify queue.
- queue_size = local_queue.size ();
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == local_queue.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- if (-1 == this->notify_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- return number_purged;
+ return notification_queue_.purge_pending_notifications(eh, mask);
#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
ACE_UNUSED_ARG (eh);
diff --git a/ACE/ace/Dev_Poll_Reactor.h b/ACE/ace/Dev_Poll_Reactor.h
index f8c5b58f1d2..f82283a70b7 100644
--- a/ACE/ace/Dev_Poll_Reactor.h
+++ b/ACE/ace/Dev_Poll_Reactor.h
@@ -53,7 +53,7 @@
#include "ace/Token.h"
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
-# include "ace/Unbounded_Queue.h"
+# include "ace/Notification_Queue.h"
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
#if defined (ACE_HAS_DEV_POLL)
@@ -283,30 +283,17 @@ protected:
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
/**
- * @name Reactor Notification Attributes
+ * @brief A user-space queue to store the notifications.
*
- * This configuration queues up notifications in separate buffers
- * that are in user-space, rather than stored in a pipe in the OS
- * kernel. The kernel-level notifications are used only to trigger
- * the Reactor to check its notification queue. This enables many
- * more notifications to be stored than would otherwise be the
- * case.
+ * The notification pipe has OS-specific size restrictions. That
+ * is, no more than a certain number of bytes may be stored in the
+ * pipe without blocking. This limit may be too small for certain
+ * applications. In this case, ACE can be configured to store all
+ * the events in user-space. The pipe is still needed to wake up
+ * the reactor thread, but only one event is sent through the pipe
+ * at a time.
*/
- //@{
-
- /// ACE_Notification_Buffers are allocated in chunks. Each time a chunk is
- /// allocated, the chunk is added to alloc_queue_ so it can be freed later.
- /// Each individual ACE_Notification_Buffer is added to the free_queue_
- /// when it's free. Those in use for queued notifications are placed on the
- /// notify_queue_.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_;
-
- /// Synchronization for handling of queues.
- ACE_SYNCH_MUTEX notify_queue_lock_;
-
- //@}
+ ACE_Notification_Queue notification_queue_;
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
};
diff --git a/ACE/ace/Intrusive_List.cpp b/ACE/ace/Intrusive_List.cpp
index 4a374c9b045..c0006792ce4 100644
--- a/ACE/ace/Intrusive_List.cpp
+++ b/ACE/ace/Intrusive_List.cpp
@@ -69,7 +69,7 @@ ACE_Intrusive_List<T>::pop_front (void)
T *node = this->head_;
if (node == 0)
return 0;
- this->remove_i (node);
+ this->unsafe_remove (node);
return node;
}
@@ -79,7 +79,7 @@ ACE_Intrusive_List<T>::pop_back (void)
T *node = this->tail_;
if (node == 0)
return 0;
- this->remove_i (node);
+ this->unsafe_remove (node);
return node;
}
@@ -90,14 +90,14 @@ ACE_Intrusive_List<T>::remove (T *node)
{
if (node == i)
{
- this->remove_i (node);
+ this->unsafe_remove (node);
return;
}
}
}
template<class T> void
-ACE_Intrusive_List<T>::remove_i (T *node)
+ACE_Intrusive_List<T>::unsafe_remove (T *node)
{
if (node->prev () != 0)
node->prev ()->next (node->next ());
diff --git a/ACE/ace/Intrusive_List.h b/ACE/ace/Intrusive_List.h
index 82789110cfa..59d0c9054a8 100644
--- a/ACE/ace/Intrusive_List.h
+++ b/ACE/ace/Intrusive_List.h
@@ -99,14 +99,17 @@ public:
*/
void remove (T *node);
-private:
- /// Remove a element from the list
+ /// Swap two lists
+ void swap(ACE_Intrusive_List<T> & rhs);
+
+ /// Remove a element from the list without checking
/**
* No attempts are performed to check if T* really belongs to the
* list. The effects of removing an invalid element are unspecified
*/
- void remove_i (T *node);
+ void unsafe_remove (T *node);
+private:
/** @name Disallow copying
*
*/
diff --git a/ACE/ace/Intrusive_List.inl b/ACE/ace/Intrusive_List.inl
index cc3ffc70109..f76370917e9 100644
--- a/ACE/ace/Intrusive_List.inl
+++ b/ACE/ace/Intrusive_List.inl
@@ -2,6 +2,8 @@
//
// $Id$
+#include <algorithm>
+
ACE_BEGIN_VERSIONED_NAMESPACE_DECL
template<class T> ACE_INLINE int
@@ -28,4 +30,11 @@ ACE_Intrusive_List<T>::tail (void) const
return this->tail_;
}
+template<class T> ACE_INLINE void
+ACE_Intrusive_List<T>::swap(ACE_Intrusive_List<T> & rhs)
+{
+ std::swap(head_, rhs.head_);
+ std::swap(tail_, rhs.tail_);
+}
+
ACE_END_VERSIONED_NAMESPACE_DECL
diff --git a/ACE/ace/Notification_Queue.cpp b/ACE/ace/Notification_Queue.cpp
new file mode 100644
index 00000000000..0b2ca59e831
--- /dev/null
+++ b/ACE/ace/Notification_Queue.cpp
@@ -0,0 +1,210 @@
+// $Id$
+
+#include "ace/Notification_Queue.h"
+
+#if !defined (__ACE_INLINE__)
+#include "ace/Notification_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_Notification_Queue::
+ACE_Notification_Queue()
+ : ACE_Copy_Disabled()
+ , alloc_queue_()
+ , notify_queue_()
+ , free_queue_()
+{
+}
+
+ACE_Notification_Queue::
+~ACE_Notification_Queue()
+{
+ reset();
+}
+
+int
+ACE_Notification_Queue::
+open()
+{
+ ACE_TRACE ("ACE_Notification_Queue::open");
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ if (!this->free_queue_.is_empty ())
+ return 0;
+
+ return allocate_more_buffers();
+}
+
+void
+ACE_Notification_Queue::
+reset()
+{
+ ACE_TRACE ("ACE_Notification_Queue::reset");
+
+ // Free up the dynamically allocated resources.
+ ACE_Notification_Queue_Node **b = 0;
+
+ for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
+ alloc_iter.next (b) != 0;
+ alloc_iter.advance ())
+ {
+ delete [] *b;
+ *b = 0;
+ }
+
+ this->alloc_queue_.reset ();
+
+ // Swap with an empty list to reset the contents
+ Buffer_List().swap(notify_queue_);
+ Buffer_List().swap(free_queue_);
+}
+
+int ACE_Notification_Queue::
+allocate_more_buffers()
+{
+ ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");
+
+ ACE_Notification_Queue_Node *temp = 0;
+
+ ACE_NEW_RETURN (temp,
+ ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ -1);
+
+ if (this->alloc_queue_.enqueue_head (temp) == -1)
+ {
+ delete [] temp;
+ return -1;
+ }
+
+ for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
+ {
+ free_queue_.push_front(temp + i);
+ }
+
+ return 0;
+}
+
+int
+ACE_Notification_Queue::
+purge_pending_notifications(ACE_Event_Handler * eh,
+ ACE_Reactor_Mask mask)
+{
+ ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ if (this->notify_queue_.is_empty ())
+ return 0;
+
+ int number_purged = 0;
+ ACE_Notification_Queue_Node * node = notify_queue_.head();
+ while(node != 0)
+ {
+ if (!node->matches_for_purging(eh))
+ {
+ // Easy case, skip to the next node
+ node = node->next();
+ continue;
+ }
+
+ if (!node->mask_disables_all_notifications(mask))
+ {
+ // ... another easy case, skip this node too, but clear the
+ // mask first ...
+ node->clear_mask(mask);
+ node = node->next();
+ continue;
+ }
+
+ // ... this is the more complicated case, we want to remove the
+ // node from the notify_queue_ list. First save the next node
+ // on the list:
+ ACE_Notification_Queue_Node * next = node->next();
+
+ // ... then remove it ...
+ notify_queue_.unsafe_remove(node);
+ ++number_purged;
+
+ // ... release resources ...
+ ACE_Event_Handler *event_handler = node->get().eh_;
+ event_handler->remove_reference ();
+
+ // ... now this is a free node ...
+ free_queue_.push_front(node);
+
+ // ... go to the next node, if there is one ...
+ node = next;
+ }
+
+ return number_purged;
+}
+
+int ACE_Notification_Queue::
+push_new_notification(
+ ACE_Notification_Buffer const & buffer)
+{
+ ACE_TRACE ("ACE_Notification_Queue::push_new_notification");
+
+ bool notification_required = false;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ // No pending notifications.
+ if (this->notify_queue_.is_empty ())
+ notification_required = true;
+
+ if (free_queue_.is_empty())
+ {
+ if (allocate_more_buffers() == -1)
+ {
+ return -1;
+ }
+ }
+
+ ACE_Notification_Queue_Node * node =
+ free_queue_.pop_front();
+
+ ACE_ASSERT (node != 0);
+ node->set(buffer);
+
+ notify_queue_.push_back(node);
+
+ if (!notification_required)
+ {
+ return 0;
+ }
+
+ return 1;
+}
+
+int
+ACE_Notification_Queue::pop_next_notification(
+ ACE_Notification_Buffer & current,
+ bool & more_messages_queued,
+ ACE_Notification_Buffer & next)
+{
+ ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");
+
+ more_messages_queued = false;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ if (notify_queue_.is_empty ())
+ {
+ return 0;
+ }
+
+ ACE_Notification_Queue_Node * node =
+ notify_queue_.pop_front();
+
+ current = node->get();
+ free_queue_.push_front(node);
+
+ if(!this->notify_queue_.is_empty())
+ {
+ more_messages_queued = true;
+ next = notify_queue_.head()->get();
+ }
+
+ return 1;
+}
diff --git a/ACE/ace/Notification_Queue.h b/ACE/ace/Notification_Queue.h
new file mode 100644
index 00000000000..a0348822060
--- /dev/null
+++ b/ACE/ace/Notification_Queue.h
@@ -0,0 +1,156 @@
+#ifndef ACE_NOTIFICATION_QUEUE_H
+#define ACE_NOTIFICATION_QUEUE_H
+
+#include /**/ "ace/pre.h"
+
+/**
+ * @file Notification_Queue.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@atdesk.com>
+ */
+#include "ace/Copy_Disabled.h"
+#include "ace/Event_Handler.h"
+#include "ace/Intrusive_List.h"
+#include "ace/Intrusive_List_Node.h"
+#include "ace/Unbounded_Queue.h"
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+/**
+ * @class ACE_Notification_Queue_Node
+ *
+ * @brief Helper class
+ */
+class ACE_Export ACE_Notification_Queue_Node
+ : public ACE_Intrusive_List_Node<ACE_Notification_Queue_Node>
+{
+public:
+ /**
+ * @brief Constructor
+ */
+ ACE_Notification_Queue_Node();
+
+ /**
+ * @brief Modifier change the contained buffer
+ */
+ void set(ACE_Notification_Buffer const & rhs);
+
+ /**
+ * @brief Accessor, fetch the contained buffer
+ */
+ ACE_Notification_Buffer const & get() const;
+
+ /**
+ * @brief Checks if the event handler matches the purge condition
+ */
+ bool matches_for_purging(ACE_Event_Handler * eh) const;
+
+ /**
+ * @brief Return true if clearing the mask would leave no
+ * notifications to deliver.
+ */
+ bool mask_disables_all_notifications(ACE_Reactor_Mask mask);
+
+ /**
+ * @brief Clear the notifications specified by @c mask
+ */
+ void clear_mask(ACE_Reactor_Mask mask);
+
+private:
+ ACE_Notification_Buffer contents_;
+};
+
+/**
+ * @class ACE_Notification_Queue
+ *
+ * @brief Implements a user-space queue to send Reactor notifications.
+ *
+ * The ACE_Reactor uses a pipe to send wake up the thread running the
+ * event loop from other threads. This pipe can be limited in size
+ * under some operating systems. For some applications, this limit
+ * presents a problem. A user-space notification queue is used to
+ * overcome those limitations. The queue tries to use as few
+ * resources on the pipe as possible, while keeping all the data in
+ * user space.
+ *
+ * This code was refactored from Select_Reactor_Base.
+ */
+class ACE_Export ACE_Notification_Queue : private ACE_Copy_Disabled
+{
+public:
+ ACE_Notification_Queue();
+ ~ACE_Notification_Queue();
+
+ /**
+ * @brief Pre-allocate resources in the queue
+ */
+ int open();
+
+ /**
+ * @brief Release all resources in the queue
+ */
+ void reset();
+
+ /**
+ * @brief Remove all elements in the queue matching @c eh and @c mask
+ *
+ * I suggest reading the documentation in ACE_Reactor to find a more
+ * detailed description. This is just a helper function.
+ */
+ int purge_pending_notifications(ACE_Event_Handler * eh,
+ ACE_Reactor_Mask mask);
+
+ /**
+ * @brief Add a new notification to the queue
+ *
+ * @return -1 on failure, 1 if a new message should be sent through
+ * the pipe and 0 otherwise.
+ */
+ int push_new_notification(ACE_Notification_Buffer const & buffer);
+
+ /**
+ * @brief Extract the next notification from the queue
+ *
+ * @return -1 on failure, 1 if a message was popped, 0 otherwise
+ */
+ int pop_next_notification(
+ ACE_Notification_Buffer & current,
+ bool & more_messages_queued,
+ ACE_Notification_Buffer & next);
+
+private:
+ /**
+ * @brief Allocate more memory for the queue
+ */
+ int allocate_more_buffers();
+
+private:
+ /// Keeps track of allocated arrays of type
+ /// ACE_Notification_Buffer. The idea is to amortize allocation
+ /// costs by allocating multiple ACE_Notification_Buffer objects at
+ /// a time.
+ ACE_Unbounded_Queue <ACE_Notification_Queue_Node*> alloc_queue_;
+
+ typedef ACE_Intrusive_List<ACE_Notification_Queue_Node> Buffer_List;
+
+ /// Keeps track of all pending notifications.
+ Buffer_List notify_queue_;
+
+ /// Keeps track of all free buffers.
+ Buffer_List free_queue_;
+
+ /// Synchronization for handling of queues.
+ ACE_SYNCH_MUTEX notify_queue_lock_;
+};
+
+ACE_END_VERSIONED_NAMESPACE_DECL
+
+#if defined (__ACE_INLINE__)
+#include "ace/Notification_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
+#include /**/ "ace/post.h"
+
+#endif /* ACE_NOTIFICATION_QUEUE_H */
diff --git a/ACE/ace/Notification_Queue.inl b/ACE/ace/Notification_Queue.inl
new file mode 100644
index 00000000000..b7ef09a820f
--- /dev/null
+++ b/ACE/ace/Notification_Queue.inl
@@ -0,0 +1,47 @@
+// $Id$
+
+ACE_INLINE ACE_Notification_Queue_Node::
+ACE_Notification_Queue_Node()
+ : ACE_Intrusive_List_Node<ACE_Notification_Queue_Node>()
+ , contents_(0, 0)
+{
+}
+
+ACE_INLINE void
+ACE_Notification_Queue_Node::
+set(ACE_Notification_Buffer const & rhs)
+{
+ contents_ = rhs;
+}
+
+ACE_INLINE ACE_Notification_Buffer const &
+ACE_Notification_Queue_Node::
+get() const
+{
+ return contents_;
+}
+
+ACE_INLINE bool
+ACE_Notification_Queue_Node::
+matches_for_purging(ACE_Event_Handler * eh) const
+{
+ return (0 != get().eh_) && (0 == eh || eh == get().eh_);
+}
+
+ACE_INLINE bool
+ACE_Notification_Queue_Node::
+mask_disables_all_notifications(ACE_Reactor_Mask mask)
+{
+ // the existing notification mask is left with nothing when applying
+ // the mask
+ return ACE_BIT_DISABLED (get().mask_, ~mask);
+}
+
+ACE_INLINE void
+ACE_Notification_Queue_Node::
+clear_mask(ACE_Reactor_Mask mask)
+{
+ ACE_CLR_BITS(contents_.mask_, mask);
+}
+
+
diff --git a/ACE/ace/Select_Reactor_Base.cpp b/ACE/ace/Select_Reactor_Base.cpp
index 94fa673b84e..0306fc7007a 100644
--- a/ACE/ace/Select_Reactor_Base.cpp
+++ b/ACE/ace/Select_Reactor_Base.cpp
@@ -561,84 +561,7 @@ ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- if (this->notify_queue_.is_empty ())
- return 0;
-
- ACE_Notification_Buffer *temp = 0;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
-
- size_t queue_size = this->notify_queue_.size ();
- int number_purged = 0;
- size_t i;
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == this->notify_queue_.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- // If this is not a Reactor notify (it is for a particular handler),
- // and it matches the specified handler (or purging all),
- // and applying the mask would totally eliminate the notification, then
- // release it and count the number purged.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_) &&
- ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
- // is left with nothing when
- // applying the mask
- {
- if (-1 == this->free_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
-
- ACE_Event_Handler *event_handler = temp->eh_;
- event_handler->remove_reference ();
-
- ++number_purged;
- }
- else
- {
- // To preserve it, move it to the local_queue.
- // But first, if this is not a Reactor notify (it is for a particularhandler),
- // and it matches the specified handler (or purging all), then
- // apply the mask
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_))
- ACE_CLR_BITS(temp->mask_, mask);
- if (-1 == local_queue.enqueue_head (temp))
- return -1;
- }
- }
-
- if (this->notify_queue_.size ())
- { // should be empty!
- ACE_ASSERT (0);
- return -1;
- }
-
- // now put it back in the notify queue
- queue_size = local_queue.size ();
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == local_queue.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- if (-1 == this->notify_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- return number_purged;
+ return notification_queue_.purge_pending_notifications(eh, mask);
#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
ACE_UNUSED_ARG (eh);
@@ -686,22 +609,10 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
#endif /* F_SETFD */
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_Notification_Buffer *temp = 0;
-
- ACE_NEW_RETURN (temp,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp) == -1)
- {
- delete [] temp;
- return -1;
- }
-
- for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
- if (free_queue_.enqueue_head (temp + i) == -1)
- return -1;
-
+ if (notification_queue_.open() == -1)
+ {
+ return -1;
+ }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// There seems to be a Win32 bug with this... Set this into
@@ -728,20 +639,7 @@ ACE_Select_Reactor_Notify::close (void)
ACE_TRACE ("ACE_Select_Reactor_Notify::close");
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Free up the dynamically allocated resources.
- ACE_Notification_Buffer **b = 0;
-
- for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
- alloc_iter.next (b) != 0;
- alloc_iter.advance ())
- {
- delete [] *b;
- *b = 0;
- }
-
- this->alloc_queue_.reset ();
- this->notify_queue_.reset ();
- this->free_queue_.reset ();
+ notification_queue_.reset();
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return this->notification_pipe_.close ();
@@ -767,57 +665,20 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
ACE_Notification_Buffer buffer (event_handler, mask);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Artificial scope to limit the duration of the mutex.
- {
- bool notification_required = false;
+ int notification_required =
+ notification_queue_.push_new_notification(buffer);
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- // No pending notifications.
- if (this->notify_queue_.is_empty ())
- notification_required = true;
-
- ACE_Notification_Buffer *temp = 0;
-
- if (free_queue_.dequeue_head (temp) == -1)
- {
- // Grow the queue of available buffers.
- ACE_Notification_Buffer *temp1 = 0;
-
- ACE_NEW_RETURN (temp1,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp1) == -1)
- {
- delete [] temp1;
- return -1;
- }
-
- // Start at 1 and enqueue only
- // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
- // the first one will be used right now.
- for (size_t i = 1;
- i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
- ++i)
- this->free_queue_.enqueue_head (temp1 + i);
-
- temp = temp1;
- }
-
- ACE_ASSERT (temp != 0);
- *temp = buffer;
-
- if (notify_queue_.enqueue_tail (temp) == -1)
- return -1;
+ if (notification_required == -1)
+ {
+ return -1;
+ }
- if (!notification_required)
- {
- // No failures.
- safe_handler.release ();
+ if (notification_required == 0)
+ {
+ // No failures, the handler is now owned by the notification queue
+ safe_handler.release ();
- return 0;
- }
+ return 0;
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
@@ -897,45 +758,28 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
// Dispatch one message from the notify queue, and put another in
// the pipe if one is available. Remember, the idea is to keep
// exactly one message in the pipe at a time.
- {
- // We acquire the lock in a block to make sure we're not
- // holding the lock while delivering callbacks...
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
- ACE_Notification_Buffer *temp = 0;
+ bool more_messages_queued = false;
+ ACE_Notification_Buffer next;
+
+ result = notification_queue_.pop_next_notification(
+ buffer, more_messages_queued, next);
- if (notify_queue_.is_empty ())
+ if (result == 0)
+ {
return 0;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
- buffer = *temp;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
-
- bool write_next_buffer = false;
- ACE_Notification_Buffer ** next = 0;
-
- if(!this->notify_queue_.is_empty())
- {
- // The queue is not empty, need to queue another message.
- this->notify_queue_.get (next, 0);
- write_next_buffer = true;
- }
-
- if(write_next_buffer)
- {
- (void) ACE::send(
- this->notification_pipe_.write_handle(),
- (char *)*next, sizeof(ACE_Notification_Buffer));
- }
- }
+ }
+ if (result == -1)
+ {
+ return -1;
+ }
+
+ if(more_messages_queued)
+ {
+ (void) ACE::send(this->notification_pipe_.write_handle(),
+ (char *)&next, sizeof(ACE_Notification_Buffer));
+ }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// If eh == 0 then another thread is unblocking the
diff --git a/ACE/ace/Select_Reactor_Base.h b/ACE/ace/Select_Reactor_Base.h
index 9ec4e642210..bb12154ca8c 100644
--- a/ACE/ace/Select_Reactor_Base.h
+++ b/ACE/ace/Select_Reactor_Base.h
@@ -27,7 +27,7 @@
#include "ace/Reactor_Impl.h"
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
-# include "ace/Unbounded_Queue.h"
+# include "ace/Notification_Queue.h"
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
#ifdef ACE_WIN32
@@ -253,24 +253,18 @@ protected:
int max_notify_iterations_;
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // = This configuration queues up notifications in separate buffers that
- // are in user-space, rather than stored in a pipe in the OS
- // kernel. The kernel-level notifications are used only to trigger
- // the Reactor to check its notification queue. This enables many
- // more notifications to be stored than would otherwise be the case.
-
- /// Keeps track of allocated arrays of type
- /// ACE_Notification_Buffer.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_;
-
- /// Keeps track of all pending notifications.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_;
-
- /// Keeps track of all free buffers.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_;
-
- /// Synchronization for handling of queues.
- ACE_SYNCH_MUTEX notify_queue_lock_;
+ /**
+ * @brief A user-space queue to store the notifications.
+ *
+ * The notification pipe has OS-specific size restrictions. That
+ * is, no more than a certain number of bytes may be stored in the
+ * pipe without blocking. This limit may be too small for certain
+ * applications. In this case, ACE can be configured to store all
+ * the events in user-space. The pipe is still needed to wake up
+ * the reactor thread, but only one event is sent through the pipe
+ * at a time.
+ */
+ ACE_Notification_Queue notification_queue_;
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
};
diff --git a/ACE/ace/ace.mpc b/ACE/ace/ace.mpc
index 6f67e730e44..d91138a1ae9 100644
--- a/ACE/ace/ace.mpc
+++ b/ACE/ace/ace.mpc
@@ -125,6 +125,7 @@ project(ACE) : acedefaults, install, other, codecs, token, svcconf, uuid, fileca
Mutex.cpp
Netlink_Addr.cpp
Notification_Strategy.cpp
+ Notification_Queue.cpp
Obchunk.cpp
Object_Manager.cpp
Object_Manager_Base.cpp
diff --git a/ACE/ace/ace_for_tao.mpc b/ACE/ace/ace_for_tao.mpc
index 85798f0692f..76737dc380c 100644
--- a/ACE/ace/ace_for_tao.mpc
+++ b/ACE/ace/ace_for_tao.mpc
@@ -87,6 +87,7 @@ project(ACE_FOR_TAO) : acedefaults, install, svcconf, uuid, versioned_namespace,
MMAP_Memory_Pool.cpp
Mutex.cpp
Notification_Strategy.cpp
+ Notification_Queue.cpp
Obchunk.cpp
Object_Manager.cpp
Object_Manager_Base.cpp
diff --git a/ACE/tests/Bug_2815_Regression_Test.cpp b/ACE/tests/Bug_2815_Regression_Test.cpp
new file mode 100644
index 00000000000..4c4eb0a3630
--- /dev/null
+++ b/ACE/tests/Bug_2815_Regression_Test.cpp
@@ -0,0 +1,499 @@
+/**
+ * @file Bug_2815_Regression_Test.cpp
+ *
+ * $Id$
+ *
+ * Verify that the notification queue can be used with large numbers
+ * of event handlers.
+ *
+ * Normally the ACE_Reactor uses a pipe to implement the notify()
+ * methods. ACE can be compiled with
+ * ACE_HAS_REACTOR_NOTIFICATION_QUEUE, with this configuration flag
+ * the Reactor uses a user-space queue to contain the notifications.
+ * A single message is sent through the pipe to indicate "pipe not
+ * empty."
+ *
+ * In this configuration, if an event handler is removed
+ * from the Reactor the user-space queue has to be searched for
+ * pending notifications and the notifications must be removed.
+ *
+ * The original implementation used a naive algorithm to search and
+ * remove the handlers, which resulted in very high overhead when
+ * removing handlers while having a very long notification queue.
+ *
+ * @author Carlos O'Ryan <coryan@atdesk.com>
+ *
+ */
+
+#include "test_config.h"
+#include "ace/Reactor.h"
+#include "ace/TP_Reactor.h"
+#include "ace/Select_Reactor.h"
+
+ACE_RCSID(tests,
+ Bug_2815_Regression_Test, "$Id$")
+
+class One_Shot_Handler;
+
+/**
+ * @class Driver
+ *
+ * @brief Main driver for the test, generates notification events and
+ * verifies they are received correctly.
+ *
+ */
+class Driver
+{
+public:
+ Driver(ACE_Reactor * reactor,
+ int max_notifications,
+ char const *test_name);
+
+ /// Run the test
+ void run (void);
+
+ /// One of the sub-handlers has received a notification
+ void notification_received ();
+
+ /// One of the sub-handlers has decided to skip several notifications
+ void notifications_skipped (int skip_count);
+
+ /**
+ * @brief Return the reactor configured for this test
+ */
+ ACE_Reactor * reactor ();
+
+private:
+ /**
+ * @brief Implement a single iteration.
+ *
+ * Each iteration of the test consists of sending multiple
+ * notifications simultaneously.
+ */
+ void send_notifications (void);
+
+ /**
+ * @brief Return true if the test is finished.
+ */
+ bool done (void) const;
+
+ /**
+ * @brief Return true if there are more iterations to run.
+ */
+ bool more_iterations () const;
+
+ /**
+ * @brief Return true if the current iteration is completed.
+ */
+ bool current_iteration_done () const;
+
+ /**
+ * @brief Run one iteration of the test, each iteration doubles
+ * the number of events.
+ */
+ int run_one_iteration (void);
+
+ /**
+ * @brief Initialize a bunch of One_Shot_Handlers
+ */
+ void initialize_handlers(
+ int nhandlers, One_Shot_Handler ** handlers);
+
+ /**
+ * @brief Dispatch events to the One_Shot_Handlers
+ */
+ void notify_handlers(
+ int nhandlers, One_Shot_Handler ** handlers);
+
+ /**
+ * @brief Helpful for debugging
+ *
+ * The number of notifications received, skipped and sent are
+ * subject to simple invariants. During debugging any violation of
+ * those invariants indicates a problem in the application or the
+ * Reactor.
+ */
+ void check_notification_invariants();
+
+ /// A good place to set break points.
+ void invariant_failed();
+
+private:
+ /**
+ * @brief The reactor used in this test
+ */
+ ACE_Reactor * reactor_;
+
+ /**
+ * @brief The maximum number of notifications in any single
+ * iteration.
+ */
+ int max_notifications_;
+
+ /**
+ * @brief The name of the test
+ */
+ char const * test_name_;
+
+ /**
+ * @brief Number of notifications received
+ */
+ int notifications_sent_;
+
+ /**
+ * @brief Number of notifications sent
+ */
+ int notifications_recv_;
+
+ /**
+ * @brief Number of notifications skipped because
+ * the handler was removed
+ */
+ int notifications_skipped_;
+
+ /**
+ * @brief Number of notifications sent on each iteration
+ */
+ int notifications_curr_;
+};
+
+/**
+ * @class One_Shot_Handler
+ *
+ * @brief A handler that removes itself from the reactor
+ * after its first notification.
+ *
+ * To demonstrate the problems with the first implementation of
+ * the notification queue we generate multiple event handlers.
+ * Then we generate multiple notifications for each, but the handlers
+ * remove themselves from the reactor when the first notification
+ * is delivered. This causes a lot of activity in the notification
+ * queue.
+ *
+ */
+class One_Shot_Handler : public ACE_Event_Handler
+{
+public:
+ One_Shot_Handler(
+ Driver * master_handler,
+ char const * test_name,
+ int id);
+
+ /// Increase the number of expected notifications
+ void notification_queued();
+
+ /// Receive the notifications, but remove itself from the reactor on
+ /// on the first one.
+ virtual int handle_exception(ACE_HANDLE);
+
+private:
+ /// The driver for this test, communicate results to it
+ Driver * master_handler_;
+
+ /// The number of expected notifications
+ int expected_notifications_;
+
+ /// Identify the test and handler for debugging and better error output
+ char const * test_name_;
+ int id_;
+};
+
+int
+run_main (int, ACE_TCHAR *[])
+{
+ ACE_START_TEST (ACE_TEXT ("Bug_2815_Regression_Test"));
+
+#if !defined(ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Notification queue disabled, ")
+ ACE_TEXT ("small test version, ")
+ ACE_TEXT ("which is of no practical use\n")));
+
+ int max_notifications = 16;
+#else
+ int max_notifications = 512 * 1024;
+#endif /* ACE_HAS_THREADS */
+
+ {
+ ACE_Reactor select_reactor (
+ new ACE_Select_Reactor,
+ 1);
+
+ Driver handler(&select_reactor,
+ max_notifications,
+ "Select_Reactor");
+
+ handler.run ();
+ }
+
+ {
+ ACE_Reactor tp_reactor (new ACE_TP_Reactor,
+ 1);
+ Driver handler(&tp_reactor,
+ max_notifications,
+ "TP_Reactor");
+ handler.run();
+ }
+
+ ACE_END_TEST;
+
+ return 0;
+}
+
+Driver::Driver (
+ ACE_Reactor * reactor,
+ int max_notifications,
+ char const * test_name)
+ : reactor_(reactor)
+ , max_notifications_(max_notifications)
+ , test_name_(test_name)
+ , notifications_sent_(0)
+ , notifications_recv_(0)
+ , notifications_skipped_(0)
+ , notifications_curr_(1)
+{
+}
+
+void
+Driver::run (void)
+{
+ while(more_iterations())
+ {
+ if(run_one_iteration() == -1)
+ {
+ return;
+ }
+
+ notifications_curr_ *= 2;
+ }
+
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Test %C passed sent=%d, recv=%d, skip=%d\n"),
+ test_name_,
+ notifications_sent_,
+ notifications_recv_,
+ notifications_skipped_));
+}
+
+void
+Driver::notification_received ()
+{
+ ++notifications_recv_;
+ check_notification_invariants();
+}
+
+void
+Driver::notifications_skipped (int skip_count)
+{
+ notifications_skipped_ += skip_count;
+ check_notification_invariants();
+}
+
+ACE_Reactor *
+Driver::reactor()
+{
+ return reactor_;
+}
+
+void
+Driver::send_notifications (void)
+{
+ int const nhandlers = 16;
+ One_Shot_Handler * handlers[nhandlers];
+ initialize_handlers(nhandlers, handlers);
+
+ for (int i = 0; i != notifications_curr_; ++i)
+ {
+ notify_handlers(nhandlers, handlers);
+ }
+}
+
+bool
+Driver::done (void) const
+{
+ return !more_iterations() && current_iteration_done();
+}
+
+bool
+Driver::more_iterations() const
+{
+ return notifications_curr_ < max_notifications_;
+}
+
+bool
+Driver::current_iteration_done() const
+{
+ return notifications_sent_ == (notifications_recv_ + notifications_skipped_);
+}
+
+int
+Driver::run_one_iteration (void)
+{
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Running iteration with %d events for %C test\n"),
+ notifications_curr_,
+ test_name_));
+
+ send_notifications ();
+
+ // Run for 30 seconds or until the test is done.
+
+ ACE_Time_Value const timeout(30,0);
+
+ while (!current_iteration_done())
+ {
+ ACE_Time_Value start = ACE_OS::gettimeofday();
+ ACE_Time_Value interval(1,0);
+ reactor()->run_reactor_event_loop(interval);
+ ACE_Time_Value end = ACE_OS::gettimeofday();
+
+ if (end - start >= timeout)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Test %C failed due to timeout ")
+ ACE_TEXT (" sent=%d,recv=%d,skip=%d\n"),
+ test_name_,
+ notifications_sent_,
+ notifications_recv_,
+ notifications_skipped_));
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+void
+Driver::initialize_handlers(
+ int nhandlers, One_Shot_Handler ** handlers)
+{
+ for (int j = 0; j != nhandlers; ++j)
+ {
+ handlers[j] = new One_Shot_Handler(this, test_name_, j);
+ }
+}
+
+void
+Driver::notify_handlers(
+ int nhandlers, One_Shot_Handler ** handlers)
+{
+ for(int i = 0; i != nhandlers; ++i)
+ {
+ if(reactor()->notify (handlers[i]) == -1)
+ {
+ ACE_ERROR((LM_ERROR,
+ ACE_TEXT ("Cannot send notifications in %C test (%d/%d)\n"),
+ test_name_, i, notifications_curr_));
+ return;
+ }
+ handlers[i]->notification_queued();
+
+ ++notifications_sent_;
+ }
+}
+
+void Driver::
+check_notification_invariants()
+{
+ if (notifications_sent_ < 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("The number of notifications sent (%d)")
+ ACE_TEXT(" should be positive\n"),
+ notifications_sent_));
+ invariant_failed();
+ }
+
+ if (notifications_recv_ < 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("The number of notifications received (%d)")
+ ACE_TEXT(" should be positive\n"),
+ notifications_recv_));
+ invariant_failed();
+ }
+
+ if (notifications_skipped_ < 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("The number of notifications skipped (%d)")
+ ACE_TEXT(" should be positive\n"),
+ notifications_skipped_));
+ invariant_failed();
+ }
+
+ if (notifications_sent_ < notifications_recv_)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("Too many notifications received (%d)")
+ ACE_TEXT(" vs sent (%d)\n"),
+ notifications_recv_, notifications_sent_));
+ invariant_failed();
+ }
+
+ if (notifications_sent_ < notifications_skipped_)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("Too many notifications skipped (%d)")
+ ACE_TEXT(" vs sent (%d)\n"),
+ notifications_skipped_, notifications_sent_));
+ invariant_failed();
+ }
+
+ if (notifications_skipped_ + notifications_recv_ > notifications_sent_)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("Too many notifications skipped (%d)")
+ ACE_TEXT(" and received (%d) vs sent (%d)\n"),
+ notifications_skipped_, notifications_recv_,
+ notifications_sent_));
+ invariant_failed();
+ }
+}
+
+void Driver::
+invariant_failed()
+{
+ // Just a good place to set a breakpoint
+}
+
+// ============================================
+
+One_Shot_Handler::One_Shot_Handler(
+ Driver * master_handler,
+ char const * test_name, int id)
+ : ACE_Event_Handler(master_handler->reactor())
+ , master_handler_(master_handler)
+ , expected_notifications_(0)
+ , test_name_(test_name)
+ , id_(id)
+{
+}
+
+void One_Shot_Handler::
+notification_queued()
+{
+ ++expected_notifications_;
+}
+
+int One_Shot_Handler::
+handle_exception(ACE_HANDLE)
+{
+ --expected_notifications_;
+ master_handler_->notification_received();
+
+ int r = reactor()->purge_pending_notifications(this);
+ if (r >= 0)
+ {
+ master_handler_->notifications_skipped(r);
+ delete this;
+ return 0;
+ }
+
+ ACE_ERROR((LM_ERROR,
+ ACE_TEXT ("Cannot remove handler %d in %C test\n"),
+ id_, test_name_));
+
+ delete this;
+ return 0;
+}
diff --git a/ACE/tests/Notification_Queue_Unit_Test.cpp b/ACE/tests/Notification_Queue_Unit_Test.cpp
new file mode 100644
index 00000000000..cc138e3fc03
--- /dev/null
+++ b/ACE/tests/Notification_Queue_Unit_Test.cpp
@@ -0,0 +1,256 @@
+/**
+ * @file Notification_Queue_Unit_Test.cpp
+ *
+ * $Id$
+ *
+ * A unit test for the ACE_Notification_Queue class.
+ *
+ * @author Carlos O'Ryan <coryan@atdesk.com>
+ *
+ */
+
+#include "test_config.h"
+#include "ace/Notification_Queue.h"
+
+ACE_RCSID(tests,
+ Notification_Queue_Unit_Test, "$Id$")
+
+#define TEST_LIST \
+ ACTION(null_test) \
+ ACTION(pop_returns_element_pushed) \
+ ACTION(purge_empty_queue) \
+ ACTION(purge_with_no_matches) \
+ ACTION(purge_with_single_match) \
+ ACTION(purge_with_multiple_matches) \
+
+// Declare all the tests
+#define ACTION(TEST_NAME) void TEST_NAME (char const * test_name);
+TEST_LIST
+#undef ACTION
+
+int
+run_main (int, ACE_TCHAR *[])
+{
+ ACE_START_TEST (ACE_TEXT ("Notification_Queue_Unit_Test"));
+
+ // Call all the tests
+#define ACTION(TEST_NAME) TEST_NAME (#TEST_NAME);
+TEST_LIST
+#undef ACTION
+
+ ACE_END_TEST;
+
+ return 0;
+}
+
+// There are far more elegant ways to do this. Ideally one would use
+// an existing framework (Boost.Test, TUT, CppTest). But this will
+// do for our purposes
+#define TEST_INTEGER_EQUAL(X, Y, MSG) \
+ do { \
+ if ((X) == (Y)) break; \
+ ACE_ERROR ((LM_ERROR, \
+ ACE_TEXT("%C in (%C %N:%l) %C (%d) != %C (%d)\n"), \
+ ACE_TEXT(MSG), test_name, \
+ ACE_TEXT(#X), (X), ACE_TEXT(#Y), (Y) )); \
+ } while(0)
+#define TEST_INTEGER_NOT_EQUAL(X, Y, MSG) \
+ do { \
+ if ((X) != (Y)) break; \
+ ACE_ERROR ((LM_ERROR, \
+ ACE_TEXT("%C in (%C %N:%l) %C (%d) == %C (%d)\n"), \
+ ACE_TEXT(MSG), test_name, \
+ ACE_TEXT(#X), (X), ACE_TEXT(#Y), (Y) )); \
+ } while(0)
+#define TEST_ASSERT(PREDICATE, MESSAGE) \
+ do { \
+ if ((PREDICATE)) break; \
+ ACE_ERROR ((LM_ERROR, \
+ ACE_TEXT("Assertion failure in (%C %N:%l) %C %C\n"), \
+ test_name, ACE_TEXT(#PREDICATE), MESSAGE )); \
+ } while(0)
+
+
+void null_test(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ TEST_INTEGER_EQUAL(0, 0, "Test framework failure");
+ TEST_INTEGER_NOT_EQUAL(1, 0, "Test framework failure");
+ TEST_ASSERT(true, ACE_TEXT("True is still true"));
+}
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ Event_Handler(int event_handler_id)
+ : ACE_Event_Handler()
+ , id (event_handler_id)
+ {
+ }
+
+ int id;
+};
+
+void pop_returns_element_pushed(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ Event_Handler eh1(1);
+ Event_Handler eh2(2);
+ Event_Handler eh3(2);
+
+ int result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::READ_MASK));
+ TEST_ASSERT(result == 1, "push[1] should return 1");
+
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+ TEST_ASSERT(result == 0, "push[2] should return 0");
+
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh3,
+ ACE_Event_Handler::READ_MASK
+ |ACE_Event_Handler::WRITE_MASK));
+ TEST_ASSERT(result == 0, "push[3] should return 0");
+
+ ACE_Notification_Buffer current;
+ bool more_messages_queued;
+ ACE_Notification_Buffer next;
+
+ result = queue.pop_next_notification(current, more_messages_queued, next);
+ TEST_ASSERT(result == 1, "pop[0] should return 1");
+ TEST_ASSERT(more_messages_queued, "pop[0] should have more messages");
+
+ TEST_INTEGER_EQUAL(current.eh_, &eh1, "Wrong handler extracted");
+ TEST_INTEGER_EQUAL(current.mask_, ACE_Event_Handler::READ_MASK,
+ "Wrong mask extracted");
+
+ result = queue.pop_next_notification(current, more_messages_queued, next);
+ TEST_ASSERT(result == 1, "pop[1] should return 1");
+ TEST_ASSERT(more_messages_queued, "pop[1] should have more messages");
+
+ TEST_INTEGER_EQUAL(current.eh_, &eh2, "Wrong handler extracted");
+ TEST_INTEGER_EQUAL(current.mask_, ACE_Event_Handler::WRITE_MASK,
+ "Wrong mask extracted");
+
+ result = queue.pop_next_notification(current, more_messages_queued, next);
+ TEST_ASSERT(result == 1, "pop[2] should return 1");
+ TEST_ASSERT(!more_messages_queued, "pop[2] should not have more messages");
+
+ TEST_INTEGER_EQUAL(current.eh_, &eh3, "Wrong handler extracted");
+ TEST_INTEGER_EQUAL(current.mask_, ACE_Event_Handler::READ_MASK
+ |ACE_Event_Handler::WRITE_MASK,
+ "Wrong mask extracted");
+
+ more_messages_queued = true;
+ result = queue.pop_next_notification(current, more_messages_queued, next);
+ TEST_ASSERT(result == 0, "pop[3] should return 0");
+ TEST_ASSERT(!more_messages_queued, "pop[3] should not have more messages");
+}
+
+void purge_empty_queue(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ Event_Handler eh1(1);
+
+ int result = queue.purge_pending_notifications(&eh1,
+ ACE_Event_Handler::READ_MASK);
+ TEST_ASSERT(result == 0, "purge of empty queue should return 0");
+}
+
+void purge_with_no_matches(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ Event_Handler eh1(1);
+ Event_Handler eh2(2);
+
+ int result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::READ_MASK));
+
+ result = queue.purge_pending_notifications(&eh2,
+ ACE_Event_Handler::READ_MASK);
+ TEST_ASSERT(result == 0, "purge of eh2 should return 0");
+
+ result = queue.purge_pending_notifications(&eh1,
+ ACE_Event_Handler::WRITE_MASK);
+ TEST_ASSERT(result == 0, "purge of eh1/WRITE should return 0");
+}
+
+void purge_with_single_match(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ Event_Handler eh1(1);
+ Event_Handler eh2(2);
+
+ int result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::READ_MASK
+ |ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::READ_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+
+ result = queue.purge_pending_notifications(&eh2,
+ ACE_Event_Handler::READ_MASK);
+ TEST_INTEGER_EQUAL(result, 1, "purge of eh2/READ should return 1");
+
+ result = queue.purge_pending_notifications(&eh1,
+ ACE_Event_Handler::READ_MASK);
+ TEST_INTEGER_EQUAL(result, 0, "purge of eh1/READ should return 0");
+}
+
+void purge_with_multiple_matches(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ Event_Handler eh1(1);
+ Event_Handler eh2(2);
+
+ int result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::READ_MASK
+ |ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::READ_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+
+ result = queue.purge_pending_notifications(&eh2,
+ ACE_Event_Handler::WRITE_MASK);
+ TEST_INTEGER_EQUAL(result, 3, "purge of eh2/WRITE should return 3");
+
+ result = queue.purge_pending_notifications(&eh1,
+ ACE_Event_Handler::WRITE_MASK);
+ TEST_INTEGER_EQUAL(result, 1, "purge of eh1/WRITE should return 1");
+}
+
diff --git a/ACE/tests/run_test.lst b/ACE/tests/run_test.lst
index e2639f4c3d4..4d0b911b48b 100644
--- a/ACE/tests/run_test.lst
+++ b/ACE/tests/run_test.lst
@@ -36,6 +36,7 @@ Bug_2497_Regression_Test
Bug_2540_Regression_Test
Bug_2659_Regression_Test: !ST
Bug_2653_Regression_Test: !ST
+Bug_2815_Regression_Test
CDR_Array_Test: !ACE_FOR_TAO
CDR_File_Test: !ACE_FOR_TAO
CDR_Test
@@ -96,6 +97,7 @@ Naming_Test: !LynxOS !Unicos !VxWorks !nsk !ACE_FOR_TAO
Network_Adapters_Test: !DISABLE_ToFix_LynxOS_PPC !DISABLE_ToFix_LynxOS_x86
New_Fail_Test: ALL !DISABLED
NonBlocking_Conn_Test
+Notification_Queue_Unit_Test
Notify_Performance_Test: !nsk !ACE_FOR_TAO
OS_Test
Object_Manager_Test
diff --git a/ACE/tests/tests.mpc b/ACE/tests/tests.mpc
index c79b0d36eaf..e9b4437b94a 100644
--- a/ACE/tests/tests.mpc
+++ b/ACE/tests/tests.mpc
@@ -234,6 +234,13 @@ project(Bug_2653_Regression_Test) : acetest {
}
}
+project(Bug_2815_Regression_Test) : acetest {
+ exename = Bug_2815_Regression_Test
+ Source_Files {
+ Bug_2815_Regression_Test.cpp
+ }
+}
+
project(Bug_2820_Regression_Test) : acetest {
exename = Bug_2820_Regression_Test
Source_Files {
@@ -631,6 +638,13 @@ project(New Fail Test) : acetest {
}
}
+project(Notification Queue Unit Test) : acetest {
+ exename = Notification_Queue_Unit_Test
+ Source_Files {
+ Notification_Queue_Unit_Test.cpp
+ }
+}
+
project(Notify Performance Test) : acetest {
avoids += ace_for_tao
exename = Notify_Performance_Test