summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-02-19 04:58:05 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-02-19 04:58:05 +0000
commitb3df323593d6110049d22bb03f6bcdae71ea4c4e (patch)
tree47caf592f75abff93580a7295e0aac96e6d498ab
parent8eb818c9a9577e14a5e9099ae1d5e1a183449155 (diff)
downloadATCD-b3df323593d6110049d22bb03f6bcdae71ea4c4e.tar.gz
Mon Feb 19 04:49:35 UTC 2007 coryan <coryan@atdesk.com>
-rw-r--r--ChangeLog35
-rw-r--r--ace/Dev_Poll_Reactor.cpp216
-rw-r--r--ace/Dev_Poll_Reactor.h33
-rw-r--r--ace/Notification_Queue.cpp261
-rw-r--r--ace/Notification_Queue.h101
-rw-r--r--ace/Select_Reactor_Base.cpp226
-rw-r--r--ace/Select_Reactor_Base.h32
-rw-r--r--ace/ace.mpc1
-rw-r--r--tests/Bug_2815_Regression_Test.cpp80
-rw-r--r--tests/Notification_Queue_Unit_Test.cpp256
-rw-r--r--tests/run_test.lst1
-rw-r--r--tests/tests.mpc7
12 files changed, 841 insertions, 408 deletions
diff --git a/ChangeLog b/ChangeLog
index c88a23cf9bd..1edd1d91b06 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,38 @@
+Mon Feb 19 04:49:35 UTC 2007 coryan <coryan@atdesk.com>
+
+ * ace/Notification_Queue.h:
+ * ace/Notification_Queue.cpp:
+ * ace/ace.mpc:
+ New class to encapsulate the implementation of a user-space
+ based notification queue. The code was duplicated in both
+ Select_Reactor.{h,cpp} and Dev_Poll_Reactor.{h,cpp}
+
+ * tests/tests.mpc:
+ * tests/run_test.lst:
+ * tests/Notification_Queue_Unit_Test.cpp:
+ New unit test for the notification queue class.
+
+ * ace/Dev_Poll_Reactor.h:
+ * ace/Dev_Poll_Reactor.cpp:
+ * ace/Select_Reactor_Base.h:
+ * ace/Select_Reactor_Base.cpp:
+ Refactored notification queue code to a new class
+ (ACE_Notification_Queue)
+
+ * tests/Bug_2815_Regression_Test.cpp:
+ Add code to help with debugging. Basically my refactoring above
+ had at least one bug, and this test uncovered it. But it was
+ hard to debug because there was no single breakpoint to detect
+ when the failure condition was triggered.
+
+ * tests:
+ Add new files to svn:ignore property. Also added some old
+ files.
+
+ * tests/SSL:
+ * include/makeinclude:
+ Add missing files to svn:ignore
+
Sun Feb 18 21:13:41 UTC 2007 coryan <coryan@atdesk.com>
* tests/tests.mpc:
diff --git a/ace/Dev_Poll_Reactor.cpp b/ace/Dev_Poll_Reactor.cpp
index 8bf63534735..abf3a432c31 100644
--- a/ace/Dev_Poll_Reactor.cpp
+++ b/ace/Dev_Poll_Reactor.cpp
@@ -50,10 +50,7 @@ ACE_Dev_Poll_Reactor_Notify::ACE_Dev_Poll_Reactor_Notify (void)
, notification_pipe_ ()
, max_notify_iterations_ (-1)
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- , alloc_queue_ ()
- , notify_queue_ ()
- , free_queue_ ()
- , notify_queue_lock_ ()
+ , notification_queue_ ()
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
{
}
@@ -85,18 +82,10 @@ ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r,
#endif /* F_SETFD */
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_Notification_Buffer *temp;
-
- ACE_NEW_RETURN (temp,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp) == -1)
- return -1;
-
- for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
- if (free_queue_.enqueue_head (temp + i) == -1)
- return -1;
+ if (notification_queue_.open() == -1)
+ {
+ return -1;
+ }
if (ACE::set_flags (this->notification_pipe_.write_handle (),
ACE_NONBLOCK) == -1)
@@ -120,20 +109,7 @@ ACE_Dev_Poll_Reactor_Notify::close (void)
ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close");
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Free up the dynamically allocated resources.
- ACE_Notification_Buffer **b;
-
- for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
- alloc_iter.next (b) != 0;
- alloc_iter.advance ())
- {
- delete [] *b;
- *b = 0;
- }
-
- this->alloc_queue_.reset ();
- this->notify_queue_.reset ();
- this->free_queue_.reset ();
+ notification_queue_.reset();
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return this->notification_pipe_.close ();
@@ -154,42 +130,22 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh,
ACE_Notification_Buffer buffer (eh, mask);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_Dev_Poll_Handler_Guard eh_guard (eh);
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- // Locate a free buffer in the queue. Enlarge the queue if needed.
- ACE_Notification_Buffer *temp = 0;
-
- if (free_queue_.dequeue_head (temp) == -1)
- {
- // Grow the queue of available buffers.
- ACE_Notification_Buffer *temp1;
-
- ACE_NEW_RETURN (temp1,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp1) == -1)
- return -1;
-
- // Start at 1 and enqueue only
- // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
- // the first one will be used right now.
- for (size_t i = 1;
- i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
- ++i)
- this->free_queue_.enqueue_head (temp1 + i);
-
- temp = temp1;
- }
+ int notification_required =
+ notification_queue_.push_new_notification(buffer);
- ACE_ASSERT (temp != 0);
- *temp = buffer;
+ if (notification_required == -1)
+ {
+ return -1;
+ }
- ACE_Dev_Poll_Handler_Guard eh_guard (eh);
+ if (notification_required == 0)
+ {
+ eh_guard.release ();
- if (notify_queue_.enqueue_tail (temp) == -1)
- return -1;
+ return 0;
+ }
// Now pop the pipe to force the callback for dispatching when ready. If
// the send fails due to a full pipe, don't fail - assume the already-sent
@@ -239,26 +195,7 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notifications (
// also serves to slow down event dispatching particularly with this
// ACE_Dev_Poll_Reactor.
-#if 0
- ACE_HANDLE read_handle =
- this->notification_pipe_.read_handle ();
-
- // Note that we do not check if the handle has received any events.
- // Instead a non-blocking "speculative" read is performed. If the
- // read returns with errno == EWOULDBLOCK then no notifications are
- // dispatched. See ACE_Dev_Poll_Reactor_Notify::read_notify_pipe()
- // for details.
- if (read_handle != ACE_INVALID_HANDLE)
- {
- --number_of_active_handles;
-
- return this->handle_input (read_handle);
- }
- else
- return 0;
-#else
ACE_NOTSUP_RETURN (-1);
-#endif /* 0 */
}
int
@@ -286,26 +223,31 @@ ACE_Dev_Poll_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
char b;
read_p = &b;
to_read = 1;
- ACE_Notification_Buffer *temp;
- // New scope to release the guard before trying the recv().
- {
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+ // Before reading the byte, pop a message from the queue and queue a
+ // new message unless the queue is now empty. The protocol is to
+ // keep a byte in the pipe as long as the queue is not empty.
+ bool more_messages_queued = false;
+ ACE_Notification_Buffer next;
- if (notify_queue_.is_empty ())
+ int result = notification_queue_.pop_next_notification(
+ buffer, more_messages_queued, next);
+
+ if (result == 0)
+ {
return 0;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("read_notify_pipe: dequeue_head")),
- -1);
- buffer = *temp;
- have_one = true;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("read_notify_pipe: enqueue_head")));
- }
+ }
+
+ if (result == -1)
+ {
+ return -1;
+ }
+
+ if(more_messages_queued)
+ {
+ (void) ACE::send(this->notification_pipe_.write_handle(),
+ (char *)&next, 1 /* one byte is enough */);
+ }
#else
to_read = sizeof buffer;
@@ -475,83 +417,7 @@ ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications (
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- if (this->notify_queue_.is_empty ())
- return 0;
-
- ACE_Notification_Buffer *temp;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
-
- size_t queue_size = this->notify_queue_.size ();
- int number_purged = 0;
- size_t i;
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == this->notify_queue_.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- // If this is not a Reactor notify (it is for a particular
- // handler), and it matches the specified handler (or purging
- // all), and applying the mask would totally eliminate the
- // notification, then release it and count the number purged.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_) &&
- ACE_BIT_DISABLED (temp->mask_, ~mask)) // The existing
- // notification mask
- // is left with
- // nothing when
- // applying the mask.
- {
- if (this->free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- ++number_purged;
- }
- else
- {
- // To preserve it, move it to the local_queue.
- // But first, if this is not a Reactor notify (it is for a
- // particular handler), and it matches the specified handler
- // (or purging all), then apply the mask.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_))
- ACE_CLR_BITS(temp->mask_, mask);
- if (-1 == local_queue.enqueue_head (temp))
- return -1;
- }
- }
-
- if (this->notify_queue_.size ())
- {
- // Should be empty!
- ACE_ASSERT (0);
- return -1;
- }
-
- // Now put it back in the notify queue.
- queue_size = local_queue.size ();
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == local_queue.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- if (-1 == this->notify_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- return number_purged;
+ return notification_queue_.purge_pending_notifications(eh, mask);
#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
ACE_UNUSED_ARG (eh);
diff --git a/ace/Dev_Poll_Reactor.h b/ace/Dev_Poll_Reactor.h
index f8c5b58f1d2..f82283a70b7 100644
--- a/ace/Dev_Poll_Reactor.h
+++ b/ace/Dev_Poll_Reactor.h
@@ -53,7 +53,7 @@
#include "ace/Token.h"
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
-# include "ace/Unbounded_Queue.h"
+# include "ace/Notification_Queue.h"
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
#if defined (ACE_HAS_DEV_POLL)
@@ -283,30 +283,17 @@ protected:
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
/**
- * @name Reactor Notification Attributes
+ * @brief A user-space queue to store the notifications.
*
- * This configuration queues up notifications in separate buffers
- * that are in user-space, rather than stored in a pipe in the OS
- * kernel. The kernel-level notifications are used only to trigger
- * the Reactor to check its notification queue. This enables many
- * more notifications to be stored than would otherwise be the
- * case.
+ * The notification pipe has OS-specific size restrictions. That
+ * is, no more than a certain number of bytes may be stored in the
+ * pipe without blocking. This limit may be too small for certain
+ * applications. In this case, ACE can be configured to store all
+ * the events in user-space. The pipe is still needed to wake up
+ * the reactor thread, but only one event is sent through the pipe
+ * at a time.
*/
- //@{
-
- /// ACE_Notification_Buffers are allocated in chunks. Each time a chunk is
- /// allocated, the chunk is added to alloc_queue_ so it can be freed later.
- /// Each individual ACE_Notification_Buffer is added to the free_queue_
- /// when it's free. Those in use for queued notifications are placed on the
- /// notify_queue_.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_;
-
- /// Synchronization for handling of queues.
- ACE_SYNCH_MUTEX notify_queue_lock_;
-
- //@}
+ ACE_Notification_Queue notification_queue_;
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
};
diff --git a/ace/Notification_Queue.cpp b/ace/Notification_Queue.cpp
new file mode 100644
index 00000000000..f08d34aef46
--- /dev/null
+++ b/ace/Notification_Queue.cpp
@@ -0,0 +1,261 @@
+// $Id$
+
+#include "ace/Notification_Queue.h"
+
+ACE_Notification_Queue::
+ACE_Notification_Queue()
+ : ACE_Copy_Disabled()
+ , alloc_queue_()
+ , notify_queue_()
+ , free_queue_()
+{
+}
+
+ACE_Notification_Queue::
+~ACE_Notification_Queue()
+{
+ reset();
+}
+
+int
+ACE_Notification_Queue::
+open()
+{
+ ACE_TRACE ("ACE_Notification_Queue::open");
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ if (!this->free_queue_.is_empty ())
+ return 0;
+
+ return allocate_more_buffers();
+}
+
+void
+ACE_Notification_Queue::
+reset()
+{
+ ACE_TRACE ("ACE_Notification_Queue::reset");
+
+ // Free up the dynamically allocated resources.
+ ACE_Notification_Buffer **b = 0;
+
+ for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
+ alloc_iter.next (b) != 0;
+ alloc_iter.advance ())
+ {
+ delete [] *b;
+ *b = 0;
+ }
+
+ this->alloc_queue_.reset ();
+ this->notify_queue_.reset ();
+ this->free_queue_.reset ();
+}
+
+int ACE_Notification_Queue::
+allocate_more_buffers()
+{
+ ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");
+
+ ACE_Notification_Buffer *temp = 0;
+
+ ACE_NEW_RETURN (temp,
+ ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ -1);
+
+ if (this->alloc_queue_.enqueue_head (temp) == -1)
+ {
+ delete [] temp;
+ return -1;
+ }
+
+ for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
+ if (free_queue_.enqueue_head (temp + i) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_Notification_Queue::
+purge_pending_notifications(ACE_Event_Handler * eh,
+ ACE_Reactor_Mask mask)
+{
+ ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ if (this->notify_queue_.is_empty ())
+ return 0;
+
+ ACE_Notification_Buffer *temp = 0;
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
+
+ size_t queue_size = this->notify_queue_.size ();
+ int number_purged = 0;
+ size_t i;
+ for (i = 0; i < queue_size; ++i)
+ {
+ if (-1 == this->notify_queue_.dequeue_head (temp))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("dequeue_head")),
+ -1);
+
+ // If this is not a Reactor notify (it is for a particular handler),
+ // and it matches the specified handler (or purging all),
+ // and applying the mask would totally eliminate the notification, then
+ // release it and count the number purged.
+ if ((0 != temp->eh_) &&
+ (0 == eh || eh == temp->eh_) &&
+ ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
+ // is left with nothing when
+ // applying the mask
+ {
+ if (-1 == this->free_queue_.enqueue_head (temp))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enqueue_head")),
+ -1);
+
+ ACE_Event_Handler *event_handler = temp->eh_;
+ event_handler->remove_reference ();
+
+ ++number_purged;
+ }
+ else
+ {
+ // To preserve it, move it to the local_queue.
+ // But first, if this is not a Reactor notify (it is for a
+ // particular handler), and it matches the specified handler
+ // (or purging all), then apply the mask
+ if ((0 != temp->eh_) &&
+ (0 == eh || eh == temp->eh_))
+ {
+ ACE_CLR_BITS(temp->mask_, mask);
+ }
+
+ if (-1 == local_queue.enqueue_head (temp))
+ return -1;
+ }
+ }
+
+ if (this->notify_queue_.size ())
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("Notification queue should be ")
+ ACE_LIB_TEXT ("empty after purging")),
+ -1);
+ }
+
+ // now put it back in the notify queue
+ queue_size = local_queue.size ();
+ for (i = 0; i < queue_size; ++i)
+ {
+ if (-1 == local_queue.dequeue_head (temp))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("dequeue_head")),
+ -1);
+
+ if (-1 == this->notify_queue_.enqueue_head (temp))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enqueue_head")),
+ -1);
+ }
+
+ return number_purged;
+}
+
+int ACE_Notification_Queue::
+push_new_notification(
+ ACE_Notification_Buffer const & buffer)
+{
+ ACE_TRACE ("ACE_Notification_Queue::push_new_notification");
+
+ bool notification_required = false;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ // No pending notifications.
+ if (this->notify_queue_.is_empty ())
+ notification_required = true;
+
+ ACE_Notification_Buffer *temp = 0;
+
+ if (free_queue_.dequeue_head (temp) == -1)
+ {
+ if (allocate_more_buffers() == -1)
+ {
+ return -1;
+ }
+
+ free_queue_.dequeue_head(temp);
+ }
+
+ ACE_ASSERT (temp != 0);
+ *temp = buffer;
+
+ if (notify_queue_.enqueue_tail (temp) == -1)
+ return -1;
+
+ if (!notification_required)
+ {
+ return 0;
+ }
+
+ return 1;
+}
+
+int
+ACE_Notification_Queue::pop_next_notification(
+ ACE_Notification_Buffer & current,
+ bool & more_messages_queued,
+ ACE_Notification_Buffer & next)
+{
+ ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");
+
+ more_messages_queued = false;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ if (notify_queue_.is_empty ())
+ {
+ return 0;
+ }
+
+ ACE_Notification_Buffer *temp = 0;
+
+ if (notify_queue_.dequeue_head (temp) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("dequeue_head")),
+ -1);
+ }
+
+ current = *temp;
+ if (free_queue_.enqueue_head (temp) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enqueue_head")),
+ -1);
+ }
+
+ ACE_Notification_Buffer ** n = 0;
+
+ if(!this->notify_queue_.is_empty())
+ {
+ // The queue is not empty, need to queue another message.
+ this->notify_queue_.get (n, 0);
+ more_messages_queued = true;
+ next = **n;
+ }
+
+ return 1;
+}
+
+
diff --git a/ace/Notification_Queue.h b/ace/Notification_Queue.h
new file mode 100644
index 00000000000..a5c3f534023
--- /dev/null
+++ b/ace/Notification_Queue.h
@@ -0,0 +1,101 @@
+#ifndef ACE_NOTIFICATION_QUEUE_H
+#define ACE_NOTIFICATION_QUEUE_H
+
+#include /**/ "ace/pre.h"
+
+#include "ace/Event_Handler.h"
+
+/**
+ * @file Notification_Queue.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@atdesk.com>
+ */
+#include "ace/Copy_Disabled.h"
+#include "ace/Unbounded_Queue.h"
+
+/**
+ * @class ACE_Notification_Queue
+ *
+ * @brief Implements a user-space queue to send Reactor notifications.
+ *
+ * The ACE_Reactor uses a pipe to send wake up the thread running the
+ * event loop from other threads. This pipe can be limited in size
+ * under some operating systems. For some applications, this limit
+ * presents a problem. A user-space notification queue is used to
+ * overcome those limitations. The queue tries to use as few
+ * resources on the pipe as possible, while keeping all the data in
+ * user space.
+ *
+ * This code was refactored from Select_Reactor_Base.
+ */
+class ACE_Notification_Queue : private ACE_Copy_Disabled
+{
+public:
+ ACE_Notification_Queue();
+ ~ACE_Notification_Queue();
+
+ /**
+ * @brief Pre-allocate resources in the queue
+ */
+ int open();
+
+ /**
+ * @brief Release all resources in the queue
+ */
+ void reset();
+
+ /**
+ * @brief Remove all elements in the queue matching @c eh and @c mask
+ *
+ * I suggest reading the documentation in ACE_Reactor to find a more
+ * detailed description. This is just a helper function.
+ */
+ int purge_pending_notifications(ACE_Event_Handler * eh,
+ ACE_Reactor_Mask mask);
+
+ /**
+ * @brief Add a new notification to the queue
+ *
+ * @return -1 on failure, 1 if a new message should be sent through
+ * the pipe and 0 otherwise.
+ */
+ int push_new_notification(ACE_Notification_Buffer const & buffer);
+
+ /**
+ * @brief Extract the next notification from the queue
+ *
+ * @return -1 on failure, 1 if a message was popped, 0 otherwise
+ */
+ int pop_next_notification(
+ ACE_Notification_Buffer & current,
+ bool & more_messages_queued,
+ ACE_Notification_Buffer & next);
+
+private:
+ /**
+ * @brief Allocate more memory for the queue
+ */
+ int allocate_more_buffers();
+
+private:
+ /// Keeps track of allocated arrays of type
+ /// ACE_Notification_Buffer. The idea is to amortize allocation
+ /// costs by allocating multiple ACE_Notification_Buffer objects at
+ /// a time.
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_;
+
+ /// Keeps track of all pending notifications.
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_;
+
+ /// Keeps track of all free buffers.
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_;
+
+ /// Synchronization for handling of queues.
+ ACE_SYNCH_MUTEX notify_queue_lock_;
+};
+
+#include /**/ "ace/post.h"
+
+#endif /* ACE_NOTIFICATION_QUEUE_H */
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp
index 94fa673b84e..0306fc7007a 100644
--- a/ace/Select_Reactor_Base.cpp
+++ b/ace/Select_Reactor_Base.cpp
@@ -561,84 +561,7 @@ ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- if (this->notify_queue_.is_empty ())
- return 0;
-
- ACE_Notification_Buffer *temp = 0;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
-
- size_t queue_size = this->notify_queue_.size ();
- int number_purged = 0;
- size_t i;
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == this->notify_queue_.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- // If this is not a Reactor notify (it is for a particular handler),
- // and it matches the specified handler (or purging all),
- // and applying the mask would totally eliminate the notification, then
- // release it and count the number purged.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_) &&
- ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
- // is left with nothing when
- // applying the mask
- {
- if (-1 == this->free_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
-
- ACE_Event_Handler *event_handler = temp->eh_;
- event_handler->remove_reference ();
-
- ++number_purged;
- }
- else
- {
- // To preserve it, move it to the local_queue.
- // But first, if this is not a Reactor notify (it is for a particularhandler),
- // and it matches the specified handler (or purging all), then
- // apply the mask
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_))
- ACE_CLR_BITS(temp->mask_, mask);
- if (-1 == local_queue.enqueue_head (temp))
- return -1;
- }
- }
-
- if (this->notify_queue_.size ())
- { // should be empty!
- ACE_ASSERT (0);
- return -1;
- }
-
- // now put it back in the notify queue
- queue_size = local_queue.size ();
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == local_queue.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- if (-1 == this->notify_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- return number_purged;
+ return notification_queue_.purge_pending_notifications(eh, mask);
#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
ACE_UNUSED_ARG (eh);
@@ -686,22 +609,10 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
#endif /* F_SETFD */
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_Notification_Buffer *temp = 0;
-
- ACE_NEW_RETURN (temp,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp) == -1)
- {
- delete [] temp;
- return -1;
- }
-
- for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
- if (free_queue_.enqueue_head (temp + i) == -1)
- return -1;
-
+ if (notification_queue_.open() == -1)
+ {
+ return -1;
+ }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// There seems to be a Win32 bug with this... Set this into
@@ -728,20 +639,7 @@ ACE_Select_Reactor_Notify::close (void)
ACE_TRACE ("ACE_Select_Reactor_Notify::close");
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Free up the dynamically allocated resources.
- ACE_Notification_Buffer **b = 0;
-
- for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
- alloc_iter.next (b) != 0;
- alloc_iter.advance ())
- {
- delete [] *b;
- *b = 0;
- }
-
- this->alloc_queue_.reset ();
- this->notify_queue_.reset ();
- this->free_queue_.reset ();
+ notification_queue_.reset();
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return this->notification_pipe_.close ();
@@ -767,57 +665,20 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
ACE_Notification_Buffer buffer (event_handler, mask);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Artificial scope to limit the duration of the mutex.
- {
- bool notification_required = false;
+ int notification_required =
+ notification_queue_.push_new_notification(buffer);
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- // No pending notifications.
- if (this->notify_queue_.is_empty ())
- notification_required = true;
-
- ACE_Notification_Buffer *temp = 0;
-
- if (free_queue_.dequeue_head (temp) == -1)
- {
- // Grow the queue of available buffers.
- ACE_Notification_Buffer *temp1 = 0;
-
- ACE_NEW_RETURN (temp1,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp1) == -1)
- {
- delete [] temp1;
- return -1;
- }
-
- // Start at 1 and enqueue only
- // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
- // the first one will be used right now.
- for (size_t i = 1;
- i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
- ++i)
- this->free_queue_.enqueue_head (temp1 + i);
-
- temp = temp1;
- }
-
- ACE_ASSERT (temp != 0);
- *temp = buffer;
-
- if (notify_queue_.enqueue_tail (temp) == -1)
- return -1;
+ if (notification_required == -1)
+ {
+ return -1;
+ }
- if (!notification_required)
- {
- // No failures.
- safe_handler.release ();
+ if (notification_required == 0)
+ {
+ // No failures, the handler is now owned by the notification queue
+ safe_handler.release ();
- return 0;
- }
+ return 0;
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
@@ -897,45 +758,28 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
// Dispatch one message from the notify queue, and put another in
// the pipe if one is available. Remember, the idea is to keep
// exactly one message in the pipe at a time.
- {
- // We acquire the lock in a block to make sure we're not
- // holding the lock while delivering callbacks...
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
- ACE_Notification_Buffer *temp = 0;
+ bool more_messages_queued = false;
+ ACE_Notification_Buffer next;
+
+ result = notification_queue_.pop_next_notification(
+ buffer, more_messages_queued, next);
- if (notify_queue_.is_empty ())
+ if (result == 0)
+ {
return 0;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
- buffer = *temp;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
-
- bool write_next_buffer = false;
- ACE_Notification_Buffer ** next = 0;
-
- if(!this->notify_queue_.is_empty())
- {
- // The queue is not empty, need to queue another message.
- this->notify_queue_.get (next, 0);
- write_next_buffer = true;
- }
-
- if(write_next_buffer)
- {
- (void) ACE::send(
- this->notification_pipe_.write_handle(),
- (char *)*next, sizeof(ACE_Notification_Buffer));
- }
- }
+ }
+ if (result == -1)
+ {
+ return -1;
+ }
+
+ if(more_messages_queued)
+ {
+ (void) ACE::send(this->notification_pipe_.write_handle(),
+ (char *)&next, sizeof(ACE_Notification_Buffer));
+ }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// If eh == 0 then another thread is unblocking the
diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h
index 9ec4e642210..bb12154ca8c 100644
--- a/ace/Select_Reactor_Base.h
+++ b/ace/Select_Reactor_Base.h
@@ -27,7 +27,7 @@
#include "ace/Reactor_Impl.h"
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
-# include "ace/Unbounded_Queue.h"
+# include "ace/Notification_Queue.h"
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
#ifdef ACE_WIN32
@@ -253,24 +253,18 @@ protected:
int max_notify_iterations_;
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // = This configuration queues up notifications in separate buffers that
- // are in user-space, rather than stored in a pipe in the OS
- // kernel. The kernel-level notifications are used only to trigger
- // the Reactor to check its notification queue. This enables many
- // more notifications to be stored than would otherwise be the case.
-
- /// Keeps track of allocated arrays of type
- /// ACE_Notification_Buffer.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_;
-
- /// Keeps track of all pending notifications.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_;
-
- /// Keeps track of all free buffers.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_;
-
- /// Synchronization for handling of queues.
- ACE_SYNCH_MUTEX notify_queue_lock_;
+ /**
+ * @brief A user-space queue to store the notifications.
+ *
+ * The notification pipe has OS-specific size restrictions. That
+ * is, no more than a certain number of bytes may be stored in the
+ * pipe without blocking. This limit may be too small for certain
+ * applications. In this case, ACE can be configured to store all
+ * the events in user-space. The pipe is still needed to wake up
+ * the reactor thread, but only one event is sent through the pipe
+ * at a time.
+ */
+ ACE_Notification_Queue notification_queue_;
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
};
diff --git a/ace/ace.mpc b/ace/ace.mpc
index 0fc52bf9afe..16416896159 100644
--- a/ace/ace.mpc
+++ b/ace/ace.mpc
@@ -125,6 +125,7 @@ project(ACE) : acedefaults, install, other, codecs, token, svcconf, uuid, fileca
Mutex.cpp
Netlink_Addr.cpp
Notification_Strategy.cpp
+ Notification_Queue.cpp
Obchunk.cpp
Object_Manager.cpp
Object_Manager_Base.cpp
diff --git a/tests/Bug_2815_Regression_Test.cpp b/tests/Bug_2815_Regression_Test.cpp
index 654846b71e3..e6c7765f532 100644
--- a/tests/Bug_2815_Regression_Test.cpp
+++ b/tests/Bug_2815_Regression_Test.cpp
@@ -105,6 +105,19 @@ private:
void notify_handlers(
int nhandlers, One_Shot_Handler ** handlers);
+ /**
+ * @brief Helpful for debugging
+ *
+ * The number of notifications received, skipped and sent are
+ * subject to simple invariants. During debugging any violation of
+ * those invariants indicates a problem in the application or the
+ * Reactor.
+ */
+ void check_notification_invariants();
+
+ /// A good place to set break points.
+ void invariant_failed();
+
private:
/**
* @brief The reactor used in this test
@@ -266,12 +279,14 @@ void
Driver::notification_received ()
{
++notifications_recv_;
+ check_notification_invariants();
}
void
Driver::notifications_skipped (int skip_count)
{
notifications_skipped_ += skip_count;
+ check_notification_invariants();
}
ACE_Reactor *
@@ -377,6 +392,71 @@ Driver::notify_handlers(
}
}
+void Driver::
+check_notification_invariants()
+{
+ if (notifications_sent_ < 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("The number of notifications sent (%d)")
+ ACE_TEXT(" should be positive\n"),
+ notifications_sent_));
+ invariant_failed();
+ }
+
+ if (notifications_recv_ < 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("The number of notifications received (%d)")
+ ACE_TEXT(" should be positive\n"),
+ notifications_recv_));
+ invariant_failed();
+ }
+
+ if (notifications_skipped_ < 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("The number of notifications skipped (%d)")
+ ACE_TEXT(" should be positive\n"),
+ notifications_skipped_));
+ invariant_failed();
+ }
+
+ if (notifications_sent_ < notifications_recv_)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("Too many notifications received (%d)")
+ ACE_TEXT(" vs sent (%d)\n"),
+ notifications_recv_, notifications_sent_));
+ invariant_failed();
+ }
+
+ if (notifications_sent_ < notifications_skipped_)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("Too many notifications skipped (%d)")
+ ACE_TEXT(" vs sent (%d)\n"),
+ notifications_skipped_, notifications_sent_));
+ invariant_failed();
+ }
+
+ if (notifications_skipped_ + notifications_recv_ > notifications_sent_)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("Too many notifications skipped (%d)")
+ ACE_TEXT(" and received (%d) vs sent (%d)\n"),
+ notifications_skipped_, notifications_recv_,
+ notifications_sent_));
+ invariant_failed();
+ }
+}
+
+void Driver::
+invariant_failed()
+{
+ // Just a good place to set a breakpoint
+}
+
// ============================================
One_Shot_Handler::One_Shot_Handler(
diff --git a/tests/Notification_Queue_Unit_Test.cpp b/tests/Notification_Queue_Unit_Test.cpp
new file mode 100644
index 00000000000..cc138e3fc03
--- /dev/null
+++ b/tests/Notification_Queue_Unit_Test.cpp
@@ -0,0 +1,256 @@
+/**
+ * @file Notification_Queue_Unit_Test.cpp
+ *
+ * $Id$
+ *
+ * A unit test for the ACE_Notification_Queue class.
+ *
+ * @author Carlos O'Ryan <coryan@atdesk.com>
+ *
+ */
+
+#include "test_config.h"
+#include "ace/Notification_Queue.h"
+
+ACE_RCSID(tests,
+ Notification_Queue_Unit_Test, "$Id$")
+
+#define TEST_LIST \
+ ACTION(null_test) \
+ ACTION(pop_returns_element_pushed) \
+ ACTION(purge_empty_queue) \
+ ACTION(purge_with_no_matches) \
+ ACTION(purge_with_single_match) \
+ ACTION(purge_with_multiple_matches) \
+
+// Declare all the tests
+#define ACTION(TEST_NAME) void TEST_NAME (char const * test_name);
+TEST_LIST
+#undef ACTION
+
+int
+run_main (int, ACE_TCHAR *[])
+{
+ ACE_START_TEST (ACE_TEXT ("Notification_Queue_Unit_Test"));
+
+ // Call all the tests
+#define ACTION(TEST_NAME) TEST_NAME (#TEST_NAME);
+TEST_LIST
+#undef ACTION
+
+ ACE_END_TEST;
+
+ return 0;
+}
+
+// There are far more elegant ways to do this. Ideally one would use
+// an existing framework (Boost.Test, TUT, CppTest). But this will
+// do for our purposes
+#define TEST_INTEGER_EQUAL(X, Y, MSG) \
+ do { \
+ if ((X) == (Y)) break; \
+ ACE_ERROR ((LM_ERROR, \
+ ACE_TEXT("%C in (%C %N:%l) %C (%d) != %C (%d)\n"), \
+ ACE_TEXT(MSG), test_name, \
+ ACE_TEXT(#X), (X), ACE_TEXT(#Y), (Y) )); \
+ } while(0)
+#define TEST_INTEGER_NOT_EQUAL(X, Y, MSG) \
+ do { \
+ if ((X) != (Y)) break; \
+ ACE_ERROR ((LM_ERROR, \
+ ACE_TEXT("%C in (%C %N:%l) %C (%d) == %C (%d)\n"), \
+ ACE_TEXT(MSG), test_name, \
+ ACE_TEXT(#X), (X), ACE_TEXT(#Y), (Y) )); \
+ } while(0)
+#define TEST_ASSERT(PREDICATE, MESSAGE) \
+ do { \
+ if ((PREDICATE)) break; \
+ ACE_ERROR ((LM_ERROR, \
+ ACE_TEXT("Assertion failure in (%C %N:%l) %C %C\n"), \
+ test_name, ACE_TEXT(#PREDICATE), MESSAGE )); \
+ } while(0)
+
+
+void null_test(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ TEST_INTEGER_EQUAL(0, 0, "Test framework failure");
+ TEST_INTEGER_NOT_EQUAL(1, 0, "Test framework failure");
+ TEST_ASSERT(true, ACE_TEXT("True is still true"));
+}
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ Event_Handler(int event_handler_id)
+ : ACE_Event_Handler()
+ , id (event_handler_id)
+ {
+ }
+
+ int id;
+};
+
+void pop_returns_element_pushed(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ Event_Handler eh1(1);
+ Event_Handler eh2(2);
+ Event_Handler eh3(2);
+
+ int result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::READ_MASK));
+ TEST_ASSERT(result == 1, "push[1] should return 1");
+
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+ TEST_ASSERT(result == 0, "push[2] should return 0");
+
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh3,
+ ACE_Event_Handler::READ_MASK
+ |ACE_Event_Handler::WRITE_MASK));
+ TEST_ASSERT(result == 0, "push[3] should return 0");
+
+ ACE_Notification_Buffer current;
+ bool more_messages_queued;
+ ACE_Notification_Buffer next;
+
+ result = queue.pop_next_notification(current, more_messages_queued, next);
+ TEST_ASSERT(result == 1, "pop[0] should return 1");
+ TEST_ASSERT(more_messages_queued, "pop[0] should have more messages");
+
+ TEST_INTEGER_EQUAL(current.eh_, &eh1, "Wrong handler extracted");
+ TEST_INTEGER_EQUAL(current.mask_, ACE_Event_Handler::READ_MASK,
+ "Wrong mask extracted");
+
+ result = queue.pop_next_notification(current, more_messages_queued, next);
+ TEST_ASSERT(result == 1, "pop[1] should return 1");
+ TEST_ASSERT(more_messages_queued, "pop[1] should have more messages");
+
+ TEST_INTEGER_EQUAL(current.eh_, &eh2, "Wrong handler extracted");
+ TEST_INTEGER_EQUAL(current.mask_, ACE_Event_Handler::WRITE_MASK,
+ "Wrong mask extracted");
+
+ result = queue.pop_next_notification(current, more_messages_queued, next);
+ TEST_ASSERT(result == 1, "pop[2] should return 1");
+ TEST_ASSERT(!more_messages_queued, "pop[2] should not have more messages");
+
+ TEST_INTEGER_EQUAL(current.eh_, &eh3, "Wrong handler extracted");
+ TEST_INTEGER_EQUAL(current.mask_, ACE_Event_Handler::READ_MASK
+ |ACE_Event_Handler::WRITE_MASK,
+ "Wrong mask extracted");
+
+ more_messages_queued = true;
+ result = queue.pop_next_notification(current, more_messages_queued, next);
+ TEST_ASSERT(result == 0, "pop[3] should return 0");
+ TEST_ASSERT(!more_messages_queued, "pop[3] should not have more messages");
+}
+
+void purge_empty_queue(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ Event_Handler eh1(1);
+
+ int result = queue.purge_pending_notifications(&eh1,
+ ACE_Event_Handler::READ_MASK);
+ TEST_ASSERT(result == 0, "purge of empty queue should return 0");
+}
+
+void purge_with_no_matches(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ Event_Handler eh1(1);
+ Event_Handler eh2(2);
+
+ int result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::READ_MASK));
+
+ result = queue.purge_pending_notifications(&eh2,
+ ACE_Event_Handler::READ_MASK);
+ TEST_ASSERT(result == 0, "purge of eh2 should return 0");
+
+ result = queue.purge_pending_notifications(&eh1,
+ ACE_Event_Handler::WRITE_MASK);
+ TEST_ASSERT(result == 0, "purge of eh1/WRITE should return 0");
+}
+
+void purge_with_single_match(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ Event_Handler eh1(1);
+ Event_Handler eh2(2);
+
+ int result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::READ_MASK
+ |ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::READ_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+
+ result = queue.purge_pending_notifications(&eh2,
+ ACE_Event_Handler::READ_MASK);
+ TEST_INTEGER_EQUAL(result, 1, "purge of eh2/READ should return 1");
+
+ result = queue.purge_pending_notifications(&eh1,
+ ACE_Event_Handler::READ_MASK);
+ TEST_INTEGER_EQUAL(result, 0, "purge of eh1/READ should return 0");
+}
+
+void purge_with_multiple_matches(char const * test_name)
+{
+ ACE_Notification_Queue queue;
+
+ Event_Handler eh1(1);
+ Event_Handler eh2(2);
+
+ int result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::READ_MASK
+ |ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh1,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::READ_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+ result = queue.push_new_notification(
+ ACE_Notification_Buffer(&eh2,
+ ACE_Event_Handler::WRITE_MASK));
+
+ result = queue.purge_pending_notifications(&eh2,
+ ACE_Event_Handler::WRITE_MASK);
+ TEST_INTEGER_EQUAL(result, 3, "purge of eh2/WRITE should return 3");
+
+ result = queue.purge_pending_notifications(&eh1,
+ ACE_Event_Handler::WRITE_MASK);
+ TEST_INTEGER_EQUAL(result, 1, "purge of eh1/WRITE should return 1");
+}
+
diff --git a/tests/run_test.lst b/tests/run_test.lst
index 4568e8ba530..68f7f78a2e0 100644
--- a/tests/run_test.lst
+++ b/tests/run_test.lst
@@ -96,6 +96,7 @@ Naming_Test: !LynxOS !Unicos !VxWorks !nsk !ACE_FOR_TAO
Network_Adapters_Test: !DISABLE_ToFix_LynxOS_PPC !DISABLE_ToFix_LynxOS_x86
New_Fail_Test: ALL !DISABLED
NonBlocking_Conn_Test
+Notification_Queue_Unit_Test
Notify_Performance_Test: !nsk !ACE_FOR_TAO
OS_Test
Object_Manager_Test
diff --git a/tests/tests.mpc b/tests/tests.mpc
index a42b0749f53..9723999aec4 100644
--- a/tests/tests.mpc
+++ b/tests/tests.mpc
@@ -631,6 +631,13 @@ project(New Fail Test) : acetest {
}
}
+project(Notification Queue Unit Test) : acetest {
+ exename = Notification_Queue_Unit_Test
+ Source_Files {
+ Notification_Queue_Unit_Test.cpp
+ }
+}
+
project(Notify Performance Test) : acetest {
avoids += ace_for_tao
exename = Notify_Performance_Test