summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-02-19 20:33:14 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-02-19 20:33:14 +0000
commit3cfa06b09cbcf7f4baccaa6860183686e8ec6ae6 (patch)
tree168936e14f2d5efe1d10af0188e65fdd1c8cb0df
parentb3df323593d6110049d22bb03f6bcdae71ea4c4e (diff)
downloadATCD-3cfa06b09cbcf7f4baccaa6860183686e8ec6ae6.tar.gz
Mon Feb 19 20:32:34 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
-rw-r--r--ChangeLog14
-rw-r--r--ace/Intrusive_List.h3
-rw-r--r--ace/Intrusive_List.inl9
-rw-r--r--ace/Notification_Queue.cpp159
-rw-r--r--ace/Notification_Queue.h67
-rw-r--r--ace/Notification_Queue.inl47
6 files changed, 183 insertions, 116 deletions
diff --git a/ChangeLog b/ChangeLog
index 1edd1d91b06..54afef3c33e 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,17 @@
+Mon Feb 19 20:32:34 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
+
+ * ace/Notification_Queue.h:
+ * ace/Notification_Queue.inl:
+ * ace/Notification_Queue.cpp:
+ Re-factored to use ACE_Intrusive_List in the implementation.
+ This eliminates memory allocations during additions and removal
+ of elements. Also, many of the operations cannot fail, so the
+ code became smaller.
+
+ * ace/Intrusive_List.h:
+ * ace/Intrusive_List.inl:
+ Add a swap() member function.
+
Mon Feb 19 04:49:35 UTC 2007 coryan <coryan@atdesk.com>
* ace/Notification_Queue.h:
diff --git a/ace/Intrusive_List.h b/ace/Intrusive_List.h
index 82789110cfa..350899ef04c 100644
--- a/ace/Intrusive_List.h
+++ b/ace/Intrusive_List.h
@@ -99,6 +99,9 @@ public:
*/
void remove (T *node);
+ /// Swap two lists
+ void swap(ACE_Intrusive_List<T> & rhs);
+
private:
/// Remove a element from the list
/**
diff --git a/ace/Intrusive_List.inl b/ace/Intrusive_List.inl
index cc3ffc70109..b386b18b1f7 100644
--- a/ace/Intrusive_List.inl
+++ b/ace/Intrusive_List.inl
@@ -2,6 +2,8 @@
//
// $Id$
+#include "ace/Swap.h"
+
ACE_BEGIN_VERSIONED_NAMESPACE_DECL
template<class T> ACE_INLINE int
@@ -28,4 +30,11 @@ ACE_Intrusive_List<T>::tail (void) const
return this->tail_;
}
+template<class T> ACE_INLINE void
+ACE_Intrusive_List<T>::swap(ACE_Intrusive_List<T> & rhs)
+{
+ ACE_Swap<T*>::swap(head_, rhs.head_);
+ ACE_Swap<T*>::swap(tail_, rhs.tail_);
+}
+
ACE_END_VERSIONED_NAMESPACE_DECL
diff --git a/ace/Notification_Queue.cpp b/ace/Notification_Queue.cpp
index f08d34aef46..ee700a81ff7 100644
--- a/ace/Notification_Queue.cpp
+++ b/ace/Notification_Queue.cpp
@@ -2,6 +2,10 @@
#include "ace/Notification_Queue.h"
+#if !defined (__ACE_INLINE__)
+#include "ace/Notification_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
ACE_Notification_Queue::
ACE_Notification_Queue()
: ACE_Copy_Disabled()
@@ -38,9 +42,9 @@ reset()
ACE_TRACE ("ACE_Notification_Queue::reset");
// Free up the dynamically allocated resources.
- ACE_Notification_Buffer **b = 0;
+ ACE_Notification_Queue_Node **b = 0;
- for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
+ for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
alloc_iter.next (b) != 0;
alloc_iter.advance ())
{
@@ -49,8 +53,10 @@ reset()
}
this->alloc_queue_.reset ();
- this->notify_queue_.reset ();
- this->free_queue_.reset ();
+
+ // Swap with an empty list to reset the contents
+ Buffer_List().swap(notify_queue_);
+ Buffer_List().swap(free_queue_);
}
int ACE_Notification_Queue::
@@ -58,10 +64,10 @@ allocate_more_buffers()
{
ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");
- ACE_Notification_Buffer *temp = 0;
+ ACE_Notification_Queue_Node *temp = 0;
ACE_NEW_RETURN (temp,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
-1);
if (this->alloc_queue_.enqueue_head (temp) == -1)
@@ -71,8 +77,9 @@ allocate_more_buffers()
}
for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
- if (free_queue_.enqueue_head (temp + i) == -1)
- return -1;
+ {
+ free_queue_.push_front(temp + i);
+ }
return 0;
}
@@ -89,82 +96,35 @@ purge_pending_notifications(ACE_Event_Handler * eh,
if (this->notify_queue_.is_empty ())
return 0;
- ACE_Notification_Buffer *temp = 0;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
+ Buffer_List local_queue;
- size_t queue_size = this->notify_queue_.size ();
int number_purged = 0;
- size_t i;
- for (i = 0; i < queue_size; ++i)
+ while(!notify_queue_.is_empty())
{
- if (-1 == this->notify_queue_.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- // If this is not a Reactor notify (it is for a particular handler),
- // and it matches the specified handler (or purging all),
- // and applying the mask would totally eliminate the notification, then
- // release it and count the number purged.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_) &&
- ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
- // is left with nothing when
- // applying the mask
- {
- if (-1 == this->free_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
-
- ACE_Event_Handler *event_handler = temp->eh_;
- event_handler->remove_reference ();
-
- ++number_purged;
- }
- else
- {
- // To preserve it, move it to the local_queue.
- // But first, if this is not a Reactor notify (it is for a
- // particular handler), and it matches the specified handler
- // (or purging all), then apply the mask
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_))
- {
- ACE_CLR_BITS(temp->mask_, mask);
- }
-
- if (-1 == local_queue.enqueue_head (temp))
- return -1;
- }
- }
+ ACE_Notification_Queue_Node * node = notify_queue_.pop_front();
- if (this->notify_queue_.size ())
- {
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("Notification queue should be ")
- ACE_LIB_TEXT ("empty after purging")),
- -1);
+ if (!node->matches_for_purging(eh))
+ {
+ // Easy case, save the node and continue;
+ local_queue.push_back(node);
+ continue;
+ }
+
+ if (!node->mask_disables_all_notifications(mask))
+ {
+ node->clear_mask(mask);
+ local_queue.push_back(node);
+ continue;
+ }
+
+ free_queue_.push_back(node);
+ ACE_Event_Handler *event_handler = node->get().eh_;
+ event_handler->remove_reference ();
+ ++number_purged;
}
// now put it back in the notify queue
- queue_size = local_queue.size ();
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == local_queue.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- if (-1 == this->notify_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
+ local_queue.swap(notify_queue_);
return number_purged;
}
@@ -183,23 +143,21 @@ push_new_notification(
if (this->notify_queue_.is_empty ())
notification_required = true;
- ACE_Notification_Buffer *temp = 0;
-
- if (free_queue_.dequeue_head (temp) == -1)
+ if (free_queue_.is_empty())
{
if (allocate_more_buffers() == -1)
{
return -1;
}
-
- free_queue_.dequeue_head(temp);
}
- ACE_ASSERT (temp != 0);
- *temp = buffer;
+ ACE_Notification_Queue_Node * node =
+ free_queue_.pop_front();
+
+ ACE_ASSERT (node != 0);
+ node->set(buffer);
- if (notify_queue_.enqueue_tail (temp) == -1)
- return -1;
+ notify_queue_.push_back(node);
if (!notification_required)
{
@@ -226,36 +184,17 @@ ACE_Notification_Queue::pop_next_notification(
return 0;
}
- ACE_Notification_Buffer *temp = 0;
-
- if (notify_queue_.dequeue_head (temp) == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
- }
-
- current = *temp;
- if (free_queue_.enqueue_head (temp) == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
+ ACE_Notification_Queue_Node * node =
+ notify_queue_.pop_front();
- ACE_Notification_Buffer ** n = 0;
+ current = node->get();
+ free_queue_.push_back(node);
if(!this->notify_queue_.is_empty())
{
- // The queue is not empty, need to queue another message.
- this->notify_queue_.get (n, 0);
more_messages_queued = true;
- next = **n;
+ next = notify_queue_.head()->get();
}
return 1;
}
-
-
diff --git a/ace/Notification_Queue.h b/ace/Notification_Queue.h
index a5c3f534023..a0348822060 100644
--- a/ace/Notification_Queue.h
+++ b/ace/Notification_Queue.h
@@ -3,8 +3,6 @@
#include /**/ "ace/pre.h"
-#include "ace/Event_Handler.h"
-
/**
* @file Notification_Queue.h
*
@@ -13,8 +11,57 @@
* @author Carlos O'Ryan <coryan@atdesk.com>
*/
#include "ace/Copy_Disabled.h"
+#include "ace/Event_Handler.h"
+#include "ace/Intrusive_List.h"
+#include "ace/Intrusive_List_Node.h"
#include "ace/Unbounded_Queue.h"
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+/**
+ * @class ACE_Notification_Queue_Node
+ *
+ * @brief Helper class
+ */
+class ACE_Export ACE_Notification_Queue_Node
+ : public ACE_Intrusive_List_Node<ACE_Notification_Queue_Node>
+{
+public:
+ /**
+ * @brief Constructor
+ */
+ ACE_Notification_Queue_Node();
+
+ /**
+ * @brief Modifier change the contained buffer
+ */
+ void set(ACE_Notification_Buffer const & rhs);
+
+ /**
+ * @brief Accessor, fetch the contained buffer
+ */
+ ACE_Notification_Buffer const & get() const;
+
+ /**
+ * @brief Checks if the event handler matches the purge condition
+ */
+ bool matches_for_purging(ACE_Event_Handler * eh) const;
+
+ /**
+ * @brief Return true if clearing the mask would leave no
+ * notifications to deliver.
+ */
+ bool mask_disables_all_notifications(ACE_Reactor_Mask mask);
+
+ /**
+ * @brief Clear the notifications specified by @c mask
+ */
+ void clear_mask(ACE_Reactor_Mask mask);
+
+private:
+ ACE_Notification_Buffer contents_;
+};
+
/**
* @class ACE_Notification_Queue
*
@@ -30,7 +77,7 @@
*
* This code was refactored from Select_Reactor_Base.
*/
-class ACE_Notification_Queue : private ACE_Copy_Disabled
+class ACE_Export ACE_Notification_Queue : private ACE_Copy_Disabled
{
public:
ACE_Notification_Queue();
@@ -84,18 +131,26 @@ private:
/// ACE_Notification_Buffer. The idea is to amortize allocation
/// costs by allocating multiple ACE_Notification_Buffer objects at
/// a time.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_;
+ ACE_Unbounded_Queue <ACE_Notification_Queue_Node*> alloc_queue_;
+
+ typedef ACE_Intrusive_List<ACE_Notification_Queue_Node> Buffer_List;
/// Keeps track of all pending notifications.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_;
+ Buffer_List notify_queue_;
/// Keeps track of all free buffers.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_;
+ Buffer_List free_queue_;
/// Synchronization for handling of queues.
ACE_SYNCH_MUTEX notify_queue_lock_;
};
+ACE_END_VERSIONED_NAMESPACE_DECL
+
+#if defined (__ACE_INLINE__)
+#include "ace/Notification_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
#include /**/ "ace/post.h"
#endif /* ACE_NOTIFICATION_QUEUE_H */
diff --git a/ace/Notification_Queue.inl b/ace/Notification_Queue.inl
new file mode 100644
index 00000000000..b7ef09a820f
--- /dev/null
+++ b/ace/Notification_Queue.inl
@@ -0,0 +1,47 @@
+// $Id$
+
+ACE_INLINE ACE_Notification_Queue_Node::
+ACE_Notification_Queue_Node()
+ : ACE_Intrusive_List_Node<ACE_Notification_Queue_Node>()
+ , contents_(0, 0)
+{
+}
+
+ACE_INLINE void
+ACE_Notification_Queue_Node::
+set(ACE_Notification_Buffer const & rhs)
+{
+ contents_ = rhs;
+}
+
+ACE_INLINE ACE_Notification_Buffer const &
+ACE_Notification_Queue_Node::
+get() const
+{
+ return contents_;
+}
+
+ACE_INLINE bool
+ACE_Notification_Queue_Node::
+matches_for_purging(ACE_Event_Handler * eh) const
+{
+ return (0 != get().eh_) && (0 == eh || eh == get().eh_);
+}
+
+ACE_INLINE bool
+ACE_Notification_Queue_Node::
+mask_disables_all_notifications(ACE_Reactor_Mask mask)
+{
+ // the existing notification mask is left with nothing when applying
+ // the mask
+ return ACE_BIT_DISABLED (get().mask_, ~mask);
+}
+
+ACE_INLINE void
+ACE_Notification_Queue_Node::
+clear_mask(ACE_Reactor_Mask mask)
+{
+ ACE_CLR_BITS(contents_.mask_, mask);
+}
+
+