summaryrefslogtreecommitdiff
path: root/ACE/ace
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-02-22 13:45:54 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-02-22 13:45:54 +0000
commite268cb0284efafc7790eac26c86ad9677c90107e (patch)
tree2128a629e7a519bc526326d980ddd72f90aed92d /ACE/ace
parent570eb5e6e320c5dcf0c3ead9f02c44edc1a760b9 (diff)
downloadATCD-e268cb0284efafc7790eac26c86ad9677c90107e.tar.gz
Thu Feb 22 13:03:01 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
Diffstat (limited to 'ACE/ace')
-rw-r--r--ACE/ace/Dev_Poll_Reactor.cpp216
-rw-r--r--ACE/ace/Dev_Poll_Reactor.h33
-rw-r--r--ACE/ace/Intrusive_List.cpp8
-rw-r--r--ACE/ace/Intrusive_List.h9
-rw-r--r--ACE/ace/Intrusive_List.inl9
-rw-r--r--ACE/ace/Notification_Queue.cpp210
-rw-r--r--ACE/ace/Notification_Queue.h156
-rw-r--r--ACE/ace/Notification_Queue.inl47
-rw-r--r--ACE/ace/Select_Reactor_Base.cpp226
-rw-r--r--ACE/ace/Select_Reactor_Base.h32
-rw-r--r--ACE/ace/ace.mpc1
-rw-r--r--ACE/ace/ace_for_tao.mpc1
12 files changed, 533 insertions, 415 deletions
diff --git a/ACE/ace/Dev_Poll_Reactor.cpp b/ACE/ace/Dev_Poll_Reactor.cpp
index 8bf63534735..abf3a432c31 100644
--- a/ACE/ace/Dev_Poll_Reactor.cpp
+++ b/ACE/ace/Dev_Poll_Reactor.cpp
@@ -50,10 +50,7 @@ ACE_Dev_Poll_Reactor_Notify::ACE_Dev_Poll_Reactor_Notify (void)
, notification_pipe_ ()
, max_notify_iterations_ (-1)
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- , alloc_queue_ ()
- , notify_queue_ ()
- , free_queue_ ()
- , notify_queue_lock_ ()
+ , notification_queue_ ()
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
{
}
@@ -85,18 +82,10 @@ ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r,
#endif /* F_SETFD */
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_Notification_Buffer *temp;
-
- ACE_NEW_RETURN (temp,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp) == -1)
- return -1;
-
- for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
- if (free_queue_.enqueue_head (temp + i) == -1)
- return -1;
+ if (notification_queue_.open() == -1)
+ {
+ return -1;
+ }
if (ACE::set_flags (this->notification_pipe_.write_handle (),
ACE_NONBLOCK) == -1)
@@ -120,20 +109,7 @@ ACE_Dev_Poll_Reactor_Notify::close (void)
ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close");
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Free up the dynamically allocated resources.
- ACE_Notification_Buffer **b;
-
- for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
- alloc_iter.next (b) != 0;
- alloc_iter.advance ())
- {
- delete [] *b;
- *b = 0;
- }
-
- this->alloc_queue_.reset ();
- this->notify_queue_.reset ();
- this->free_queue_.reset ();
+ notification_queue_.reset();
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return this->notification_pipe_.close ();
@@ -154,42 +130,22 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh,
ACE_Notification_Buffer buffer (eh, mask);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_Dev_Poll_Handler_Guard eh_guard (eh);
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- // Locate a free buffer in the queue. Enlarge the queue if needed.
- ACE_Notification_Buffer *temp = 0;
-
- if (free_queue_.dequeue_head (temp) == -1)
- {
- // Grow the queue of available buffers.
- ACE_Notification_Buffer *temp1;
-
- ACE_NEW_RETURN (temp1,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp1) == -1)
- return -1;
-
- // Start at 1 and enqueue only
- // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
- // the first one will be used right now.
- for (size_t i = 1;
- i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
- ++i)
- this->free_queue_.enqueue_head (temp1 + i);
-
- temp = temp1;
- }
+ int notification_required =
+ notification_queue_.push_new_notification(buffer);
- ACE_ASSERT (temp != 0);
- *temp = buffer;
+ if (notification_required == -1)
+ {
+ return -1;
+ }
- ACE_Dev_Poll_Handler_Guard eh_guard (eh);
+ if (notification_required == 0)
+ {
+ eh_guard.release ();
- if (notify_queue_.enqueue_tail (temp) == -1)
- return -1;
+ return 0;
+ }
// Now pop the pipe to force the callback for dispatching when ready. If
// the send fails due to a full pipe, don't fail - assume the already-sent
@@ -239,26 +195,7 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notifications (
// also serves to slow down event dispatching particularly with this
// ACE_Dev_Poll_Reactor.
-#if 0
- ACE_HANDLE read_handle =
- this->notification_pipe_.read_handle ();
-
- // Note that we do not check if the handle has received any events.
- // Instead a non-blocking "speculative" read is performed. If the
- // read returns with errno == EWOULDBLOCK then no notifications are
- // dispatched. See ACE_Dev_Poll_Reactor_Notify::read_notify_pipe()
- // for details.
- if (read_handle != ACE_INVALID_HANDLE)
- {
- --number_of_active_handles;
-
- return this->handle_input (read_handle);
- }
- else
- return 0;
-#else
ACE_NOTSUP_RETURN (-1);
-#endif /* 0 */
}
int
@@ -286,26 +223,31 @@ ACE_Dev_Poll_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
char b;
read_p = &b;
to_read = 1;
- ACE_Notification_Buffer *temp;
- // New scope to release the guard before trying the recv().
- {
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+ // Before reading the byte, pop a message from the queue and queue a
+ // new message unless the queue is now empty. The protocol is to
+ // keep a byte in the pipe as long as the queue is not empty.
+ bool more_messages_queued = false;
+ ACE_Notification_Buffer next;
- if (notify_queue_.is_empty ())
+ int result = notification_queue_.pop_next_notification(
+ buffer, more_messages_queued, next);
+
+ if (result == 0)
+ {
return 0;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("read_notify_pipe: dequeue_head")),
- -1);
- buffer = *temp;
- have_one = true;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("read_notify_pipe: enqueue_head")));
- }
+ }
+
+ if (result == -1)
+ {
+ return -1;
+ }
+
+ if(more_messages_queued)
+ {
+ (void) ACE::send(this->notification_pipe_.write_handle(),
+ (char *)&next, 1 /* one byte is enough */);
+ }
#else
to_read = sizeof buffer;
@@ -475,83 +417,7 @@ ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications (
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- if (this->notify_queue_.is_empty ())
- return 0;
-
- ACE_Notification_Buffer *temp;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
-
- size_t queue_size = this->notify_queue_.size ();
- int number_purged = 0;
- size_t i;
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == this->notify_queue_.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- // If this is not a Reactor notify (it is for a particular
- // handler), and it matches the specified handler (or purging
- // all), and applying the mask would totally eliminate the
- // notification, then release it and count the number purged.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_) &&
- ACE_BIT_DISABLED (temp->mask_, ~mask)) // The existing
- // notification mask
- // is left with
- // nothing when
- // applying the mask.
- {
- if (this->free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- ++number_purged;
- }
- else
- {
- // To preserve it, move it to the local_queue.
- // But first, if this is not a Reactor notify (it is for a
- // particular handler), and it matches the specified handler
- // (or purging all), then apply the mask.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_))
- ACE_CLR_BITS(temp->mask_, mask);
- if (-1 == local_queue.enqueue_head (temp))
- return -1;
- }
- }
-
- if (this->notify_queue_.size ())
- {
- // Should be empty!
- ACE_ASSERT (0);
- return -1;
- }
-
- // Now put it back in the notify queue.
- queue_size = local_queue.size ();
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == local_queue.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- if (-1 == this->notify_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- return number_purged;
+ return notification_queue_.purge_pending_notifications(eh, mask);
#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
ACE_UNUSED_ARG (eh);
diff --git a/ACE/ace/Dev_Poll_Reactor.h b/ACE/ace/Dev_Poll_Reactor.h
index f8c5b58f1d2..f82283a70b7 100644
--- a/ACE/ace/Dev_Poll_Reactor.h
+++ b/ACE/ace/Dev_Poll_Reactor.h
@@ -53,7 +53,7 @@
#include "ace/Token.h"
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
-# include "ace/Unbounded_Queue.h"
+# include "ace/Notification_Queue.h"
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
#if defined (ACE_HAS_DEV_POLL)
@@ -283,30 +283,17 @@ protected:
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
/**
- * @name Reactor Notification Attributes
+ * @brief A user-space queue to store the notifications.
*
- * This configuration queues up notifications in separate buffers
- * that are in user-space, rather than stored in a pipe in the OS
- * kernel. The kernel-level notifications are used only to trigger
- * the Reactor to check its notification queue. This enables many
- * more notifications to be stored than would otherwise be the
- * case.
+ * The notification pipe has OS-specific size restrictions. That
+ * is, no more than a certain number of bytes may be stored in the
+ * pipe without blocking. This limit may be too small for certain
+ * applications. In this case, ACE can be configured to store all
+ * the events in user-space. The pipe is still needed to wake up
+ * the reactor thread, but only one event is sent through the pipe
+ * at a time.
*/
- //@{
-
- /// ACE_Notification_Buffers are allocated in chunks. Each time a chunk is
- /// allocated, the chunk is added to alloc_queue_ so it can be freed later.
- /// Each individual ACE_Notification_Buffer is added to the free_queue_
- /// when it's free. Those in use for queued notifications are placed on the
- /// notify_queue_.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_;
-
- /// Synchronization for handling of queues.
- ACE_SYNCH_MUTEX notify_queue_lock_;
-
- //@}
+ ACE_Notification_Queue notification_queue_;
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
};
diff --git a/ACE/ace/Intrusive_List.cpp b/ACE/ace/Intrusive_List.cpp
index 4a374c9b045..c0006792ce4 100644
--- a/ACE/ace/Intrusive_List.cpp
+++ b/ACE/ace/Intrusive_List.cpp
@@ -69,7 +69,7 @@ ACE_Intrusive_List<T>::pop_front (void)
T *node = this->head_;
if (node == 0)
return 0;
- this->remove_i (node);
+ this->unsafe_remove (node);
return node;
}
@@ -79,7 +79,7 @@ ACE_Intrusive_List<T>::pop_back (void)
T *node = this->tail_;
if (node == 0)
return 0;
- this->remove_i (node);
+ this->unsafe_remove (node);
return node;
}
@@ -90,14 +90,14 @@ ACE_Intrusive_List<T>::remove (T *node)
{
if (node == i)
{
- this->remove_i (node);
+ this->unsafe_remove (node);
return;
}
}
}
template<class T> void
-ACE_Intrusive_List<T>::remove_i (T *node)
+ACE_Intrusive_List<T>::unsafe_remove (T *node)
{
if (node->prev () != 0)
node->prev ()->next (node->next ());
diff --git a/ACE/ace/Intrusive_List.h b/ACE/ace/Intrusive_List.h
index 82789110cfa..59d0c9054a8 100644
--- a/ACE/ace/Intrusive_List.h
+++ b/ACE/ace/Intrusive_List.h
@@ -99,14 +99,17 @@ public:
*/
void remove (T *node);
-private:
- /// Remove a element from the list
+ /// Swap two lists
+ void swap(ACE_Intrusive_List<T> & rhs);
+
+ /// Remove a element from the list without checking
/**
* No attempts are performed to check if T* really belongs to the
* list. The effects of removing an invalid element are unspecified
*/
- void remove_i (T *node);
+ void unsafe_remove (T *node);
+private:
/** @name Disallow copying
*
*/
diff --git a/ACE/ace/Intrusive_List.inl b/ACE/ace/Intrusive_List.inl
index cc3ffc70109..f76370917e9 100644
--- a/ACE/ace/Intrusive_List.inl
+++ b/ACE/ace/Intrusive_List.inl
@@ -2,6 +2,8 @@
//
// $Id$
+#include <algorithm>
+
ACE_BEGIN_VERSIONED_NAMESPACE_DECL
template<class T> ACE_INLINE int
@@ -28,4 +30,11 @@ ACE_Intrusive_List<T>::tail (void) const
return this->tail_;
}
+template<class T> ACE_INLINE void
+ACE_Intrusive_List<T>::swap(ACE_Intrusive_List<T> & rhs)
+{
+ std::swap(head_, rhs.head_);
+ std::swap(tail_, rhs.tail_);
+}
+
ACE_END_VERSIONED_NAMESPACE_DECL
diff --git a/ACE/ace/Notification_Queue.cpp b/ACE/ace/Notification_Queue.cpp
new file mode 100644
index 00000000000..0b2ca59e831
--- /dev/null
+++ b/ACE/ace/Notification_Queue.cpp
@@ -0,0 +1,210 @@
+// $Id$
+
+#include "ace/Notification_Queue.h"
+
+#if !defined (__ACE_INLINE__)
+#include "ace/Notification_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_Notification_Queue::
+ACE_Notification_Queue()
+ : ACE_Copy_Disabled()
+ , alloc_queue_()
+ , notify_queue_()
+ , free_queue_()
+{
+}
+
+ACE_Notification_Queue::
+~ACE_Notification_Queue()
+{
+ reset();
+}
+
+int
+ACE_Notification_Queue::
+open()
+{
+ ACE_TRACE ("ACE_Notification_Queue::open");
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ if (!this->free_queue_.is_empty ())
+ return 0;
+
+ return allocate_more_buffers();
+}
+
+void
+ACE_Notification_Queue::
+reset()
+{
+ ACE_TRACE ("ACE_Notification_Queue::reset");
+
+ // Free up the dynamically allocated resources.
+ ACE_Notification_Queue_Node **b = 0;
+
+ for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
+ alloc_iter.next (b) != 0;
+ alloc_iter.advance ())
+ {
+ delete [] *b;
+ *b = 0;
+ }
+
+ this->alloc_queue_.reset ();
+
+ // Swap with an empty list to reset the contents
+ Buffer_List().swap(notify_queue_);
+ Buffer_List().swap(free_queue_);
+}
+
+int ACE_Notification_Queue::
+allocate_more_buffers()
+{
+ ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");
+
+ ACE_Notification_Queue_Node *temp = 0;
+
+ ACE_NEW_RETURN (temp,
+ ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ -1);
+
+ if (this->alloc_queue_.enqueue_head (temp) == -1)
+ {
+ delete [] temp;
+ return -1;
+ }
+
+ for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
+ {
+ free_queue_.push_front(temp + i);
+ }
+
+ return 0;
+}
+
+int
+ACE_Notification_Queue::
+purge_pending_notifications(ACE_Event_Handler * eh,
+ ACE_Reactor_Mask mask)
+{
+ ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ if (this->notify_queue_.is_empty ())
+ return 0;
+
+ int number_purged = 0;
+ ACE_Notification_Queue_Node * node = notify_queue_.head();
+ while(node != 0)
+ {
+ if (!node->matches_for_purging(eh))
+ {
+ // Easy case, skip to the next node
+ node = node->next();
+ continue;
+ }
+
+ if (!node->mask_disables_all_notifications(mask))
+ {
+ // ... another easy case, skip this node too, but clear the
+ // mask first ...
+ node->clear_mask(mask);
+ node = node->next();
+ continue;
+ }
+
+ // ... this is the more complicated case, we want to remove the
+ // node from the notify_queue_ list. First save the next node
+ // on the list:
+ ACE_Notification_Queue_Node * next = node->next();
+
+ // ... then remove it ...
+ notify_queue_.unsafe_remove(node);
+ ++number_purged;
+
+ // ... release resources ...
+ ACE_Event_Handler *event_handler = node->get().eh_;
+ event_handler->remove_reference ();
+
+ // ... now this is a free node ...
+ free_queue_.push_front(node);
+
+ // ... go to the next node, if there is one ...
+ node = next;
+ }
+
+ return number_purged;
+}
+
+int ACE_Notification_Queue::
+push_new_notification(
+ ACE_Notification_Buffer const & buffer)
+{
+ ACE_TRACE ("ACE_Notification_Queue::push_new_notification");
+
+ bool notification_required = false;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ // No pending notifications.
+ if (this->notify_queue_.is_empty ())
+ notification_required = true;
+
+ if (free_queue_.is_empty())
+ {
+ if (allocate_more_buffers() == -1)
+ {
+ return -1;
+ }
+ }
+
+ ACE_Notification_Queue_Node * node =
+ free_queue_.pop_front();
+
+ ACE_ASSERT (node != 0);
+ node->set(buffer);
+
+ notify_queue_.push_back(node);
+
+ if (!notification_required)
+ {
+ return 0;
+ }
+
+ return 1;
+}
+
+int
+ACE_Notification_Queue::pop_next_notification(
+ ACE_Notification_Buffer & current,
+ bool & more_messages_queued,
+ ACE_Notification_Buffer & next)
+{
+ ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");
+
+ more_messages_queued = false;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ if (notify_queue_.is_empty ())
+ {
+ return 0;
+ }
+
+ ACE_Notification_Queue_Node * node =
+ notify_queue_.pop_front();
+
+ current = node->get();
+ free_queue_.push_front(node);
+
+ if(!this->notify_queue_.is_empty())
+ {
+ more_messages_queued = true;
+ next = notify_queue_.head()->get();
+ }
+
+ return 1;
+}
diff --git a/ACE/ace/Notification_Queue.h b/ACE/ace/Notification_Queue.h
new file mode 100644
index 00000000000..a0348822060
--- /dev/null
+++ b/ACE/ace/Notification_Queue.h
@@ -0,0 +1,156 @@
+#ifndef ACE_NOTIFICATION_QUEUE_H
+#define ACE_NOTIFICATION_QUEUE_H
+
+#include /**/ "ace/pre.h"
+
+/**
+ * @file Notification_Queue.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@atdesk.com>
+ */
+#include "ace/Copy_Disabled.h"
+#include "ace/Event_Handler.h"
+#include "ace/Intrusive_List.h"
+#include "ace/Intrusive_List_Node.h"
+#include "ace/Unbounded_Queue.h"
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+/**
+ * @class ACE_Notification_Queue_Node
+ *
+ * @brief Helper class
+ */
+class ACE_Export ACE_Notification_Queue_Node
+ : public ACE_Intrusive_List_Node<ACE_Notification_Queue_Node>
+{
+public:
+ /**
+ * @brief Constructor
+ */
+ ACE_Notification_Queue_Node();
+
+ /**
+ * @brief Modifier change the contained buffer
+ */
+ void set(ACE_Notification_Buffer const & rhs);
+
+ /**
+ * @brief Accessor, fetch the contained buffer
+ */
+ ACE_Notification_Buffer const & get() const;
+
+ /**
+ * @brief Checks if the event handler matches the purge condition
+ */
+ bool matches_for_purging(ACE_Event_Handler * eh) const;
+
+ /**
+ * @brief Return true if clearing the mask would leave no
+ * notifications to deliver.
+ */
+ bool mask_disables_all_notifications(ACE_Reactor_Mask mask);
+
+ /**
+ * @brief Clear the notifications specified by @c mask
+ */
+ void clear_mask(ACE_Reactor_Mask mask);
+
+private:
+ ACE_Notification_Buffer contents_;
+};
+
+/**
+ * @class ACE_Notification_Queue
+ *
+ * @brief Implements a user-space queue to send Reactor notifications.
+ *
+ * The ACE_Reactor uses a pipe to send wake up the thread running the
+ * event loop from other threads. This pipe can be limited in size
+ * under some operating systems. For some applications, this limit
+ * presents a problem. A user-space notification queue is used to
+ * overcome those limitations. The queue tries to use as few
+ * resources on the pipe as possible, while keeping all the data in
+ * user space.
+ *
+ * This code was refactored from Select_Reactor_Base.
+ */
+class ACE_Export ACE_Notification_Queue : private ACE_Copy_Disabled
+{
+public:
+ ACE_Notification_Queue();
+ ~ACE_Notification_Queue();
+
+ /**
+ * @brief Pre-allocate resources in the queue
+ */
+ int open();
+
+ /**
+ * @brief Release all resources in the queue
+ */
+ void reset();
+
+ /**
+ * @brief Remove all elements in the queue matching @c eh and @c mask
+ *
+ * I suggest reading the documentation in ACE_Reactor to find a more
+ * detailed description. This is just a helper function.
+ */
+ int purge_pending_notifications(ACE_Event_Handler * eh,
+ ACE_Reactor_Mask mask);
+
+ /**
+ * @brief Add a new notification to the queue
+ *
+ * @return -1 on failure, 1 if a new message should be sent through
+ * the pipe and 0 otherwise.
+ */
+ int push_new_notification(ACE_Notification_Buffer const & buffer);
+
+ /**
+ * @brief Extract the next notification from the queue
+ *
+ * @return -1 on failure, 1 if a message was popped, 0 otherwise
+ */
+ int pop_next_notification(
+ ACE_Notification_Buffer & current,
+ bool & more_messages_queued,
+ ACE_Notification_Buffer & next);
+
+private:
+ /**
+ * @brief Allocate more memory for the queue
+ */
+ int allocate_more_buffers();
+
+private:
+ /// Keeps track of allocated arrays of type
+ /// ACE_Notification_Buffer. The idea is to amortize allocation
+ /// costs by allocating multiple ACE_Notification_Buffer objects at
+ /// a time.
+ ACE_Unbounded_Queue <ACE_Notification_Queue_Node*> alloc_queue_;
+
+ typedef ACE_Intrusive_List<ACE_Notification_Queue_Node> Buffer_List;
+
+ /// Keeps track of all pending notifications.
+ Buffer_List notify_queue_;
+
+ /// Keeps track of all free buffers.
+ Buffer_List free_queue_;
+
+ /// Synchronization for handling of queues.
+ ACE_SYNCH_MUTEX notify_queue_lock_;
+};
+
+ACE_END_VERSIONED_NAMESPACE_DECL
+
+#if defined (__ACE_INLINE__)
+#include "ace/Notification_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
+#include /**/ "ace/post.h"
+
+#endif /* ACE_NOTIFICATION_QUEUE_H */
diff --git a/ACE/ace/Notification_Queue.inl b/ACE/ace/Notification_Queue.inl
new file mode 100644
index 00000000000..b7ef09a820f
--- /dev/null
+++ b/ACE/ace/Notification_Queue.inl
@@ -0,0 +1,47 @@
+// $Id$
+
+ACE_INLINE ACE_Notification_Queue_Node::
+ACE_Notification_Queue_Node()
+ : ACE_Intrusive_List_Node<ACE_Notification_Queue_Node>()
+ , contents_(0, 0)
+{
+}
+
+ACE_INLINE void
+ACE_Notification_Queue_Node::
+set(ACE_Notification_Buffer const & rhs)
+{
+ contents_ = rhs;
+}
+
+ACE_INLINE ACE_Notification_Buffer const &
+ACE_Notification_Queue_Node::
+get() const
+{
+ return contents_;
+}
+
+ACE_INLINE bool
+ACE_Notification_Queue_Node::
+matches_for_purging(ACE_Event_Handler * eh) const
+{
+ return (0 != get().eh_) && (0 == eh || eh == get().eh_);
+}
+
+ACE_INLINE bool
+ACE_Notification_Queue_Node::
+mask_disables_all_notifications(ACE_Reactor_Mask mask)
+{
+ // the existing notification mask is left with nothing when applying
+ // the mask
+ return ACE_BIT_DISABLED (get().mask_, ~mask);
+}
+
+ACE_INLINE void
+ACE_Notification_Queue_Node::
+clear_mask(ACE_Reactor_Mask mask)
+{
+ ACE_CLR_BITS(contents_.mask_, mask);
+}
+
+
diff --git a/ACE/ace/Select_Reactor_Base.cpp b/ACE/ace/Select_Reactor_Base.cpp
index 94fa673b84e..0306fc7007a 100644
--- a/ACE/ace/Select_Reactor_Base.cpp
+++ b/ACE/ace/Select_Reactor_Base.cpp
@@ -561,84 +561,7 @@ ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- if (this->notify_queue_.is_empty ())
- return 0;
-
- ACE_Notification_Buffer *temp = 0;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
-
- size_t queue_size = this->notify_queue_.size ();
- int number_purged = 0;
- size_t i;
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == this->notify_queue_.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- // If this is not a Reactor notify (it is for a particular handler),
- // and it matches the specified handler (or purging all),
- // and applying the mask would totally eliminate the notification, then
- // release it and count the number purged.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_) &&
- ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
- // is left with nothing when
- // applying the mask
- {
- if (-1 == this->free_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
-
- ACE_Event_Handler *event_handler = temp->eh_;
- event_handler->remove_reference ();
-
- ++number_purged;
- }
- else
- {
- // To preserve it, move it to the local_queue.
- // But first, if this is not a Reactor notify (it is for a particularhandler),
- // and it matches the specified handler (or purging all), then
- // apply the mask
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_))
- ACE_CLR_BITS(temp->mask_, mask);
- if (-1 == local_queue.enqueue_head (temp))
- return -1;
- }
- }
-
- if (this->notify_queue_.size ())
- { // should be empty!
- ACE_ASSERT (0);
- return -1;
- }
-
- // now put it back in the notify queue
- queue_size = local_queue.size ();
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == local_queue.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- if (-1 == this->notify_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- return number_purged;
+ return notification_queue_.purge_pending_notifications(eh, mask);
#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
ACE_UNUSED_ARG (eh);
@@ -686,22 +609,10 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
#endif /* F_SETFD */
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_Notification_Buffer *temp = 0;
-
- ACE_NEW_RETURN (temp,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp) == -1)
- {
- delete [] temp;
- return -1;
- }
-
- for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
- if (free_queue_.enqueue_head (temp + i) == -1)
- return -1;
-
+ if (notification_queue_.open() == -1)
+ {
+ return -1;
+ }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// There seems to be a Win32 bug with this... Set this into
@@ -728,20 +639,7 @@ ACE_Select_Reactor_Notify::close (void)
ACE_TRACE ("ACE_Select_Reactor_Notify::close");
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Free up the dynamically allocated resources.
- ACE_Notification_Buffer **b = 0;
-
- for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
- alloc_iter.next (b) != 0;
- alloc_iter.advance ())
- {
- delete [] *b;
- *b = 0;
- }
-
- this->alloc_queue_.reset ();
- this->notify_queue_.reset ();
- this->free_queue_.reset ();
+ notification_queue_.reset();
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return this->notification_pipe_.close ();
@@ -767,57 +665,20 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
ACE_Notification_Buffer buffer (event_handler, mask);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Artificial scope to limit the duration of the mutex.
- {
- bool notification_required = false;
+ int notification_required =
+ notification_queue_.push_new_notification(buffer);
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- // No pending notifications.
- if (this->notify_queue_.is_empty ())
- notification_required = true;
-
- ACE_Notification_Buffer *temp = 0;
-
- if (free_queue_.dequeue_head (temp) == -1)
- {
- // Grow the queue of available buffers.
- ACE_Notification_Buffer *temp1 = 0;
-
- ACE_NEW_RETURN (temp1,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp1) == -1)
- {
- delete [] temp1;
- return -1;
- }
-
- // Start at 1 and enqueue only
- // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
- // the first one will be used right now.
- for (size_t i = 1;
- i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
- ++i)
- this->free_queue_.enqueue_head (temp1 + i);
-
- temp = temp1;
- }
-
- ACE_ASSERT (temp != 0);
- *temp = buffer;
-
- if (notify_queue_.enqueue_tail (temp) == -1)
- return -1;
+ if (notification_required == -1)
+ {
+ return -1;
+ }
- if (!notification_required)
- {
- // No failures.
- safe_handler.release ();
+ if (notification_required == 0)
+ {
+ // No failures, the handler is now owned by the notification queue
+ safe_handler.release ();
- return 0;
- }
+ return 0;
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
@@ -897,45 +758,28 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
// Dispatch one message from the notify queue, and put another in
// the pipe if one is available. Remember, the idea is to keep
// exactly one message in the pipe at a time.
- {
- // We acquire the lock in a block to make sure we're not
- // holding the lock while delivering callbacks...
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
- ACE_Notification_Buffer *temp = 0;
+ bool more_messages_queued = false;
+ ACE_Notification_Buffer next;
+
+ result = notification_queue_.pop_next_notification(
+ buffer, more_messages_queued, next);
- if (notify_queue_.is_empty ())
+ if (result == 0)
+ {
return 0;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
- buffer = *temp;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
-
- bool write_next_buffer = false;
- ACE_Notification_Buffer ** next = 0;
-
- if(!this->notify_queue_.is_empty())
- {
- // The queue is not empty, need to queue another message.
- this->notify_queue_.get (next, 0);
- write_next_buffer = true;
- }
-
- if(write_next_buffer)
- {
- (void) ACE::send(
- this->notification_pipe_.write_handle(),
- (char *)*next, sizeof(ACE_Notification_Buffer));
- }
- }
+ }
+ if (result == -1)
+ {
+ return -1;
+ }
+
+ if(more_messages_queued)
+ {
+ (void) ACE::send(this->notification_pipe_.write_handle(),
+ (char *)&next, sizeof(ACE_Notification_Buffer));
+ }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// If eh == 0 then another thread is unblocking the
diff --git a/ACE/ace/Select_Reactor_Base.h b/ACE/ace/Select_Reactor_Base.h
index 9ec4e642210..bb12154ca8c 100644
--- a/ACE/ace/Select_Reactor_Base.h
+++ b/ACE/ace/Select_Reactor_Base.h
@@ -27,7 +27,7 @@
#include "ace/Reactor_Impl.h"
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
-# include "ace/Unbounded_Queue.h"
+# include "ace/Notification_Queue.h"
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
#ifdef ACE_WIN32
@@ -253,24 +253,18 @@ protected:
int max_notify_iterations_;
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // = This configuration queues up notifications in separate buffers that
- // are in user-space, rather than stored in a pipe in the OS
- // kernel. The kernel-level notifications are used only to trigger
- // the Reactor to check its notification queue. This enables many
- // more notifications to be stored than would otherwise be the case.
-
- /// Keeps track of allocated arrays of type
- /// ACE_Notification_Buffer.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_;
-
- /// Keeps track of all pending notifications.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_;
-
- /// Keeps track of all free buffers.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_;
-
- /// Synchronization for handling of queues.
- ACE_SYNCH_MUTEX notify_queue_lock_;
+ /**
+ * @brief A user-space queue to store the notifications.
+ *
+ * The notification pipe has OS-specific size restrictions. That
+ * is, no more than a certain number of bytes may be stored in the
+ * pipe without blocking. This limit may be too small for certain
+ * applications. In this case, ACE can be configured to store all
+ * the events in user-space. The pipe is still needed to wake up
+ * the reactor thread, but only one event is sent through the pipe
+ * at a time.
+ */
+ ACE_Notification_Queue notification_queue_;
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
};
diff --git a/ACE/ace/ace.mpc b/ACE/ace/ace.mpc
index 6f67e730e44..d91138a1ae9 100644
--- a/ACE/ace/ace.mpc
+++ b/ACE/ace/ace.mpc
@@ -125,6 +125,7 @@ project(ACE) : acedefaults, install, other, codecs, token, svcconf, uuid, fileca
Mutex.cpp
Netlink_Addr.cpp
Notification_Strategy.cpp
+ Notification_Queue.cpp
Obchunk.cpp
Object_Manager.cpp
Object_Manager_Base.cpp
diff --git a/ACE/ace/ace_for_tao.mpc b/ACE/ace/ace_for_tao.mpc
index 85798f0692f..76737dc380c 100644
--- a/ACE/ace/ace_for_tao.mpc
+++ b/ACE/ace/ace_for_tao.mpc
@@ -87,6 +87,7 @@ project(ACE_FOR_TAO) : acedefaults, install, svcconf, uuid, versioned_namespace,
MMAP_Memory_Pool.cpp
Mutex.cpp
Notification_Strategy.cpp
+ Notification_Queue.cpp
Obchunk.cpp
Object_Manager.cpp
Object_Manager_Base.cpp