diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2007-02-22 13:45:54 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2007-02-22 13:45:54 +0000 |
commit | e268cb0284efafc7790eac26c86ad9677c90107e (patch) | |
tree | 2128a629e7a519bc526326d980ddd72f90aed92d /ACE/ace | |
parent | 570eb5e6e320c5dcf0c3ead9f02c44edc1a760b9 (diff) | |
download | ATCD-e268cb0284efafc7790eac26c86ad9677c90107e.tar.gz |
Thu Feb 22 13:03:01 UTC 2007 Carlos O'Ryan <coryan@atdesk.com>
Diffstat (limited to 'ACE/ace')
-rw-r--r-- | ACE/ace/Dev_Poll_Reactor.cpp | 216 | ||||
-rw-r--r-- | ACE/ace/Dev_Poll_Reactor.h | 33 | ||||
-rw-r--r-- | ACE/ace/Intrusive_List.cpp | 8 | ||||
-rw-r--r-- | ACE/ace/Intrusive_List.h | 9 | ||||
-rw-r--r-- | ACE/ace/Intrusive_List.inl | 9 | ||||
-rw-r--r-- | ACE/ace/Notification_Queue.cpp | 210 | ||||
-rw-r--r-- | ACE/ace/Notification_Queue.h | 156 | ||||
-rw-r--r-- | ACE/ace/Notification_Queue.inl | 47 | ||||
-rw-r--r-- | ACE/ace/Select_Reactor_Base.cpp | 226 | ||||
-rw-r--r-- | ACE/ace/Select_Reactor_Base.h | 32 | ||||
-rw-r--r-- | ACE/ace/ace.mpc | 1 | ||||
-rw-r--r-- | ACE/ace/ace_for_tao.mpc | 1 |
12 files changed, 533 insertions, 415 deletions
diff --git a/ACE/ace/Dev_Poll_Reactor.cpp b/ACE/ace/Dev_Poll_Reactor.cpp index 8bf63534735..abf3a432c31 100644 --- a/ACE/ace/Dev_Poll_Reactor.cpp +++ b/ACE/ace/Dev_Poll_Reactor.cpp @@ -50,10 +50,7 @@ ACE_Dev_Poll_Reactor_Notify::ACE_Dev_Poll_Reactor_Notify (void) , notification_pipe_ () , max_notify_iterations_ (-1) #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - , alloc_queue_ () - , notify_queue_ () - , free_queue_ () - , notify_queue_lock_ () + , notification_queue_ () #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ { } @@ -85,18 +82,10 @@ ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r, #endif /* F_SETFD */ #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_Notification_Buffer *temp; - - ACE_NEW_RETURN (temp, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp) == -1) - return -1; - - for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i) - if (free_queue_.enqueue_head (temp + i) == -1) - return -1; + if (notification_queue_.open() == -1) + { + return -1; + } if (ACE::set_flags (this->notification_pipe_.write_handle (), ACE_NONBLOCK) == -1) @@ -120,20 +109,7 @@ ACE_Dev_Poll_Reactor_Notify::close (void) ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close"); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Free up the dynamically allocated resources. - ACE_Notification_Buffer **b; - - for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_); - alloc_iter.next (b) != 0; - alloc_iter.advance ()) - { - delete [] *b; - *b = 0; - } - - this->alloc_queue_.reset (); - this->notify_queue_.reset (); - this->free_queue_.reset (); + notification_queue_.reset(); #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ return this->notification_pipe_.close (); @@ -154,42 +130,22 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_Notification_Buffer buffer (eh, mask); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_Dev_Poll_Handler_Guard eh_guard (eh); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - // Locate a free buffer in the queue. Enlarge the queue if needed. - ACE_Notification_Buffer *temp = 0; - - if (free_queue_.dequeue_head (temp) == -1) - { - // Grow the queue of available buffers. - ACE_Notification_Buffer *temp1; - - ACE_NEW_RETURN (temp1, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp1) == -1) - return -1; - - // Start at 1 and enqueue only - // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since - // the first one will be used right now. - for (size_t i = 1; - i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; - ++i) - this->free_queue_.enqueue_head (temp1 + i); - - temp = temp1; - } + int notification_required = + notification_queue_.push_new_notification(buffer); - ACE_ASSERT (temp != 0); - *temp = buffer; + if (notification_required == -1) + { + return -1; + } - ACE_Dev_Poll_Handler_Guard eh_guard (eh); + if (notification_required == 0) + { + eh_guard.release (); - if (notify_queue_.enqueue_tail (temp) == -1) - return -1; + return 0; + } // Now pop the pipe to force the callback for dispatching when ready. If // the send fails due to a full pipe, don't fail - assume the already-sent @@ -239,26 +195,7 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notifications ( // also serves to slow down event dispatching particularly with this // ACE_Dev_Poll_Reactor. -#if 0 - ACE_HANDLE read_handle = - this->notification_pipe_.read_handle (); - - // Note that we do not check if the handle has received any events. - // Instead a non-blocking "speculative" read is performed. If the - // read returns with errno == EWOULDBLOCK then no notifications are - // dispatched. See ACE_Dev_Poll_Reactor_Notify::read_notify_pipe() - // for details. - if (read_handle != ACE_INVALID_HANDLE) - { - --number_of_active_handles; - - return this->handle_input (read_handle); - } - else - return 0; -#else ACE_NOTSUP_RETURN (-1); -#endif /* 0 */ } int @@ -286,26 +223,31 @@ ACE_Dev_Poll_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle, char b; read_p = &b; to_read = 1; - ACE_Notification_Buffer *temp; - // New scope to release the guard before trying the recv(). - { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + // Before reading the byte, pop a message from the queue and queue a + // new message unless the queue is now empty. The protocol is to + // keep a byte in the pipe as long as the queue is not empty. + bool more_messages_queued = false; + ACE_Notification_Buffer next; - if (notify_queue_.is_empty ()) + int result = notification_queue_.pop_next_notification( + buffer, more_messages_queued, next); + + if (result == 0) + { return 0; - else if (notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("read_notify_pipe: dequeue_head")), - -1); - buffer = *temp; - have_one = true; - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("read_notify_pipe: enqueue_head"))); - } + } + + if (result == -1) + { + return -1; + } + + if(more_messages_queued) + { + (void) ACE::send(this->notification_pipe_.write_handle(), + (char *)&next, 1 /* one byte is enough */); + } #else to_read = sizeof buffer; @@ -475,83 +417,7 @@ ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications ( #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - if (this->notify_queue_.is_empty ()) - return 0; - - ACE_Notification_Buffer *temp; - ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue; - - size_t queue_size = this->notify_queue_.size (); - int number_purged = 0; - size_t i; - for (i = 0; i < queue_size; ++i) - { - if (-1 == this->notify_queue_.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - // If this is not a Reactor notify (it is for a particular - // handler), and it matches the specified handler (or purging - // all), and applying the mask would totally eliminate the - // notification, then release it and count the number purged. - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_) && - ACE_BIT_DISABLED (temp->mask_, ~mask)) // The existing - // notification mask - // is left with - // nothing when - // applying the mask. - { - if (this->free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - ++number_purged; - } - else - { - // To preserve it, move it to the local_queue. - // But first, if this is not a Reactor notify (it is for a - // particular handler), and it matches the specified handler - // (or purging all), then apply the mask. - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_)) - ACE_CLR_BITS(temp->mask_, mask); - if (-1 == local_queue.enqueue_head (temp)) - return -1; - } - } - - if (this->notify_queue_.size ()) - { - // Should be empty! - ACE_ASSERT (0); - return -1; - } - - // Now put it back in the notify queue. - queue_size = local_queue.size (); - for (i = 0; i < queue_size; ++i) - { - if (-1 == local_queue.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - if (-1 == this->notify_queue_.enqueue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } - - return number_purged; + return notification_queue_.purge_pending_notifications(eh, mask); #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ ACE_UNUSED_ARG (eh); diff --git a/ACE/ace/Dev_Poll_Reactor.h b/ACE/ace/Dev_Poll_Reactor.h index f8c5b58f1d2..f82283a70b7 100644 --- a/ACE/ace/Dev_Poll_Reactor.h +++ b/ACE/ace/Dev_Poll_Reactor.h @@ -53,7 +53,7 @@ #include "ace/Token.h" #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) -# include "ace/Unbounded_Queue.h" +# include "ace/Notification_Queue.h" #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ #if defined (ACE_HAS_DEV_POLL) @@ -283,30 +283,17 @@ protected: #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) /** - * @name Reactor Notification Attributes + * @brief A user-space queue to store the notifications. * - * This configuration queues up notifications in separate buffers - * that are in user-space, rather than stored in a pipe in the OS - * kernel. The kernel-level notifications are used only to trigger - * the Reactor to check its notification queue. This enables many - * more notifications to be stored than would otherwise be the - * case. + * The notification pipe has OS-specific size restrictions. That + * is, no more than a certain number of bytes may be stored in the + * pipe without blocking. This limit may be too small for certain + * applications. In this case, ACE can be configured to store all + * the events in user-space. The pipe is still needed to wake up + * the reactor thread, but only one event is sent through the pipe + * at a time. */ - //@{ - - /// ACE_Notification_Buffers are allocated in chunks. Each time a chunk is - /// allocated, the chunk is added to alloc_queue_ so it can be freed later. - /// Each individual ACE_Notification_Buffer is added to the free_queue_ - /// when it's free. Those in use for queued notifications are placed on the - /// notify_queue_. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_; - ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_; - ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_; - - /// Synchronization for handling of queues. - ACE_SYNCH_MUTEX notify_queue_lock_; - - //@} + ACE_Notification_Queue notification_queue_; #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ }; diff --git a/ACE/ace/Intrusive_List.cpp b/ACE/ace/Intrusive_List.cpp index 4a374c9b045..c0006792ce4 100644 --- a/ACE/ace/Intrusive_List.cpp +++ b/ACE/ace/Intrusive_List.cpp @@ -69,7 +69,7 @@ ACE_Intrusive_List<T>::pop_front (void) T *node = this->head_; if (node == 0) return 0; - this->remove_i (node); + this->unsafe_remove (node); return node; } @@ -79,7 +79,7 @@ ACE_Intrusive_List<T>::pop_back (void) T *node = this->tail_; if (node == 0) return 0; - this->remove_i (node); + this->unsafe_remove (node); return node; } @@ -90,14 +90,14 @@ ACE_Intrusive_List<T>::remove (T *node) { if (node == i) { - this->remove_i (node); + this->unsafe_remove (node); return; } } } template<class T> void -ACE_Intrusive_List<T>::remove_i (T *node) +ACE_Intrusive_List<T>::unsafe_remove (T *node) { if (node->prev () != 0) node->prev ()->next (node->next ()); diff --git a/ACE/ace/Intrusive_List.h b/ACE/ace/Intrusive_List.h index 82789110cfa..59d0c9054a8 100644 --- a/ACE/ace/Intrusive_List.h +++ b/ACE/ace/Intrusive_List.h @@ -99,14 +99,17 @@ public: */ void remove (T *node); -private: - /// Remove a element from the list + /// Swap two lists + void swap(ACE_Intrusive_List<T> & rhs); + + /// Remove a element from the list without checking /** * No attempts are performed to check if T* really belongs to the * list. The effects of removing an invalid element are unspecified */ - void remove_i (T *node); + void unsafe_remove (T *node); +private: /** @name Disallow copying * */ diff --git a/ACE/ace/Intrusive_List.inl b/ACE/ace/Intrusive_List.inl index cc3ffc70109..f76370917e9 100644 --- a/ACE/ace/Intrusive_List.inl +++ b/ACE/ace/Intrusive_List.inl @@ -2,6 +2,8 @@ // // $Id$ +#include <algorithm> + ACE_BEGIN_VERSIONED_NAMESPACE_DECL template<class T> ACE_INLINE int @@ -28,4 +30,11 @@ ACE_Intrusive_List<T>::tail (void) const return this->tail_; } +template<class T> ACE_INLINE void +ACE_Intrusive_List<T>::swap(ACE_Intrusive_List<T> & rhs) +{ + std::swap(head_, rhs.head_); + std::swap(tail_, rhs.tail_); +} + ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/ace/Notification_Queue.cpp b/ACE/ace/Notification_Queue.cpp new file mode 100644 index 00000000000..0b2ca59e831 --- /dev/null +++ b/ACE/ace/Notification_Queue.cpp @@ -0,0 +1,210 @@ +// $Id$ + +#include "ace/Notification_Queue.h" + +#if !defined (__ACE_INLINE__) +#include "ace/Notification_Queue.inl" +#endif /* __ACE_INLINE__ */ + +ACE_Notification_Queue:: +ACE_Notification_Queue() + : ACE_Copy_Disabled() + , alloc_queue_() + , notify_queue_() + , free_queue_() +{ +} + +ACE_Notification_Queue:: +~ACE_Notification_Queue() +{ + reset(); +} + +int +ACE_Notification_Queue:: +open() +{ + ACE_TRACE ("ACE_Notification_Queue::open"); + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + if (!this->free_queue_.is_empty ()) + return 0; + + return allocate_more_buffers(); +} + +void +ACE_Notification_Queue:: +reset() +{ + ACE_TRACE ("ACE_Notification_Queue::reset"); + + // Free up the dynamically allocated resources. + ACE_Notification_Queue_Node **b = 0; + + for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_); + alloc_iter.next (b) != 0; + alloc_iter.advance ()) + { + delete [] *b; + *b = 0; + } + + this->alloc_queue_.reset (); + + // Swap with an empty list to reset the contents + Buffer_List().swap(notify_queue_); + Buffer_List().swap(free_queue_); +} + +int ACE_Notification_Queue:: +allocate_more_buffers() +{ + ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers"); + + ACE_Notification_Queue_Node *temp = 0; + + ACE_NEW_RETURN (temp, + ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); + + if (this->alloc_queue_.enqueue_head (temp) == -1) + { + delete [] temp; + return -1; + } + + for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i) + { + free_queue_.push_front(temp + i); + } + + return 0; +} + +int +ACE_Notification_Queue:: +purge_pending_notifications(ACE_Event_Handler * eh, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications"); + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + if (this->notify_queue_.is_empty ()) + return 0; + + int number_purged = 0; + ACE_Notification_Queue_Node * node = notify_queue_.head(); + while(node != 0) + { + if (!node->matches_for_purging(eh)) + { + // Easy case, skip to the next node + node = node->next(); + continue; + } + + if (!node->mask_disables_all_notifications(mask)) + { + // ... another easy case, skip this node too, but clear the + // mask first ... + node->clear_mask(mask); + node = node->next(); + continue; + } + + // ... this is the more complicated case, we want to remove the + // node from the notify_queue_ list. First save the next node + // on the list: + ACE_Notification_Queue_Node * next = node->next(); + + // ... then remove it ... + notify_queue_.unsafe_remove(node); + ++number_purged; + + // ... release resources ... + ACE_Event_Handler *event_handler = node->get().eh_; + event_handler->remove_reference (); + + // ... now this is a free node ... + free_queue_.push_front(node); + + // ... go to the next node, if there is one ... + node = next; + } + + return number_purged; +} + +int ACE_Notification_Queue:: +push_new_notification( + ACE_Notification_Buffer const & buffer) +{ + ACE_TRACE ("ACE_Notification_Queue::push_new_notification"); + + bool notification_required = false; + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + // No pending notifications. + if (this->notify_queue_.is_empty ()) + notification_required = true; + + if (free_queue_.is_empty()) + { + if (allocate_more_buffers() == -1) + { + return -1; + } + } + + ACE_Notification_Queue_Node * node = + free_queue_.pop_front(); + + ACE_ASSERT (node != 0); + node->set(buffer); + + notify_queue_.push_back(node); + + if (!notification_required) + { + return 0; + } + + return 1; +} + +int +ACE_Notification_Queue::pop_next_notification( + ACE_Notification_Buffer & current, + bool & more_messages_queued, + ACE_Notification_Buffer & next) +{ + ACE_TRACE ("ACE_Notification_Queue::pop_next_notification"); + + more_messages_queued = false; + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + if (notify_queue_.is_empty ()) + { + return 0; + } + + ACE_Notification_Queue_Node * node = + notify_queue_.pop_front(); + + current = node->get(); + free_queue_.push_front(node); + + if(!this->notify_queue_.is_empty()) + { + more_messages_queued = true; + next = notify_queue_.head()->get(); + } + + return 1; +} diff --git a/ACE/ace/Notification_Queue.h b/ACE/ace/Notification_Queue.h new file mode 100644 index 00000000000..a0348822060 --- /dev/null +++ b/ACE/ace/Notification_Queue.h @@ -0,0 +1,156 @@ +#ifndef ACE_NOTIFICATION_QUEUE_H +#define ACE_NOTIFICATION_QUEUE_H + +#include /**/ "ace/pre.h" + +/** + * @file Notification_Queue.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@atdesk.com> + */ +#include "ace/Copy_Disabled.h" +#include "ace/Event_Handler.h" +#include "ace/Intrusive_List.h" +#include "ace/Intrusive_List_Node.h" +#include "ace/Unbounded_Queue.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +/** + * @class ACE_Notification_Queue_Node + * + * @brief Helper class + */ +class ACE_Export ACE_Notification_Queue_Node + : public ACE_Intrusive_List_Node<ACE_Notification_Queue_Node> +{ +public: + /** + * @brief Constructor + */ + ACE_Notification_Queue_Node(); + + /** + * @brief Modifier change the contained buffer + */ + void set(ACE_Notification_Buffer const & rhs); + + /** + * @brief Accessor, fetch the contained buffer + */ + ACE_Notification_Buffer const & get() const; + + /** + * @brief Checks if the event handler matches the purge condition + */ + bool matches_for_purging(ACE_Event_Handler * eh) const; + + /** + * @brief Return true if clearing the mask would leave no + * notifications to deliver. + */ + bool mask_disables_all_notifications(ACE_Reactor_Mask mask); + + /** + * @brief Clear the notifications specified by @c mask + */ + void clear_mask(ACE_Reactor_Mask mask); + +private: + ACE_Notification_Buffer contents_; +}; + +/** + * @class ACE_Notification_Queue + * + * @brief Implements a user-space queue to send Reactor notifications. + * + * The ACE_Reactor uses a pipe to send wake up the thread running the + * event loop from other threads. This pipe can be limited in size + * under some operating systems. For some applications, this limit + * presents a problem. A user-space notification queue is used to + * overcome those limitations. The queue tries to use as few + * resources on the pipe as possible, while keeping all the data in + * user space. + * + * This code was refactored from Select_Reactor_Base. + */ +class ACE_Export ACE_Notification_Queue : private ACE_Copy_Disabled +{ +public: + ACE_Notification_Queue(); + ~ACE_Notification_Queue(); + + /** + * @brief Pre-allocate resources in the queue + */ + int open(); + + /** + * @brief Release all resources in the queue + */ + void reset(); + + /** + * @brief Remove all elements in the queue matching @c eh and @c mask + * + * I suggest reading the documentation in ACE_Reactor to find a more + * detailed description. This is just a helper function. + */ + int purge_pending_notifications(ACE_Event_Handler * eh, + ACE_Reactor_Mask mask); + + /** + * @brief Add a new notification to the queue + * + * @return -1 on failure, 1 if a new message should be sent through + * the pipe and 0 otherwise. + */ + int push_new_notification(ACE_Notification_Buffer const & buffer); + + /** + * @brief Extract the next notification from the queue + * + * @return -1 on failure, 1 if a message was popped, 0 otherwise + */ + int pop_next_notification( + ACE_Notification_Buffer & current, + bool & more_messages_queued, + ACE_Notification_Buffer & next); + +private: + /** + * @brief Allocate more memory for the queue + */ + int allocate_more_buffers(); + +private: + /// Keeps track of allocated arrays of type + /// ACE_Notification_Buffer. The idea is to amortize allocation + /// costs by allocating multiple ACE_Notification_Buffer objects at + /// a time. + ACE_Unbounded_Queue <ACE_Notification_Queue_Node*> alloc_queue_; + + typedef ACE_Intrusive_List<ACE_Notification_Queue_Node> Buffer_List; + + /// Keeps track of all pending notifications. + Buffer_List notify_queue_; + + /// Keeps track of all free buffers. + Buffer_List free_queue_; + + /// Synchronization for handling of queues. + ACE_SYNCH_MUTEX notify_queue_lock_; +}; + +ACE_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +#include "ace/Notification_Queue.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* ACE_NOTIFICATION_QUEUE_H */ diff --git a/ACE/ace/Notification_Queue.inl b/ACE/ace/Notification_Queue.inl new file mode 100644 index 00000000000..b7ef09a820f --- /dev/null +++ b/ACE/ace/Notification_Queue.inl @@ -0,0 +1,47 @@ +// $Id$ + +ACE_INLINE ACE_Notification_Queue_Node:: +ACE_Notification_Queue_Node() + : ACE_Intrusive_List_Node<ACE_Notification_Queue_Node>() + , contents_(0, 0) +{ +} + +ACE_INLINE void +ACE_Notification_Queue_Node:: +set(ACE_Notification_Buffer const & rhs) +{ + contents_ = rhs; +} + +ACE_INLINE ACE_Notification_Buffer const & +ACE_Notification_Queue_Node:: +get() const +{ + return contents_; +} + +ACE_INLINE bool +ACE_Notification_Queue_Node:: +matches_for_purging(ACE_Event_Handler * eh) const +{ + return (0 != get().eh_) && (0 == eh || eh == get().eh_); +} + +ACE_INLINE bool +ACE_Notification_Queue_Node:: +mask_disables_all_notifications(ACE_Reactor_Mask mask) +{ + // the existing notification mask is left with nothing when applying + // the mask + return ACE_BIT_DISABLED (get().mask_, ~mask); +} + +ACE_INLINE void +ACE_Notification_Queue_Node:: +clear_mask(ACE_Reactor_Mask mask) +{ + ACE_CLR_BITS(contents_.mask_, mask); +} + + diff --git a/ACE/ace/Select_Reactor_Base.cpp b/ACE/ace/Select_Reactor_Base.cpp index 94fa673b84e..0306fc7007a 100644 --- a/ACE/ace/Select_Reactor_Base.cpp +++ b/ACE/ace/Select_Reactor_Base.cpp @@ -561,84 +561,7 @@ ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh, #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - if (this->notify_queue_.is_empty ()) - return 0; - - ACE_Notification_Buffer *temp = 0; - ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue; - - size_t queue_size = this->notify_queue_.size (); - int number_purged = 0; - size_t i; - for (i = 0; i < queue_size; ++i) - { - if (-1 == this->notify_queue_.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - // If this is not a Reactor notify (it is for a particular handler), - // and it matches the specified handler (or purging all), - // and applying the mask would totally eliminate the notification, then - // release it and count the number purged. - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_) && - ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask - // is left with nothing when - // applying the mask - { - if (-1 == this->free_queue_.enqueue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - - ACE_Event_Handler *event_handler = temp->eh_; - event_handler->remove_reference (); - - ++number_purged; - } - else - { - // To preserve it, move it to the local_queue. - // But first, if this is not a Reactor notify (it is for a particularhandler), - // and it matches the specified handler (or purging all), then - // apply the mask - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_)) - ACE_CLR_BITS(temp->mask_, mask); - if (-1 == local_queue.enqueue_head (temp)) - return -1; - } - } - - if (this->notify_queue_.size ()) - { // should be empty! - ACE_ASSERT (0); - return -1; - } - - // now put it back in the notify queue - queue_size = local_queue.size (); - for (i = 0; i < queue_size; ++i) - { - if (-1 == local_queue.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - if (-1 == this->notify_queue_.enqueue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } - - return number_purged; + return notification_queue_.purge_pending_notifications(eh, mask); #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ ACE_UNUSED_ARG (eh); @@ -686,22 +609,10 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, #endif /* F_SETFD */ #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_Notification_Buffer *temp = 0; - - ACE_NEW_RETURN (temp, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp) == -1) - { - delete [] temp; - return -1; - } - - for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i) - if (free_queue_.enqueue_head (temp + i) == -1) - return -1; - + if (notification_queue_.open() == -1) + { + return -1; + } #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ // There seems to be a Win32 bug with this... Set this into @@ -728,20 +639,7 @@ ACE_Select_Reactor_Notify::close (void) ACE_TRACE ("ACE_Select_Reactor_Notify::close"); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Free up the dynamically allocated resources. - ACE_Notification_Buffer **b = 0; - - for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_); - alloc_iter.next (b) != 0; - alloc_iter.advance ()) - { - delete [] *b; - *b = 0; - } - - this->alloc_queue_.reset (); - this->notify_queue_.reset (); - this->free_queue_.reset (); + notification_queue_.reset(); #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ return this->notification_pipe_.close (); @@ -767,57 +665,20 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler, ACE_Notification_Buffer buffer (event_handler, mask); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Artificial scope to limit the duration of the mutex. - { - bool notification_required = false; + int notification_required = + notification_queue_.push_new_notification(buffer); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - // No pending notifications. - if (this->notify_queue_.is_empty ()) - notification_required = true; - - ACE_Notification_Buffer *temp = 0; - - if (free_queue_.dequeue_head (temp) == -1) - { - // Grow the queue of available buffers. - ACE_Notification_Buffer *temp1 = 0; - - ACE_NEW_RETURN (temp1, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp1) == -1) - { - delete [] temp1; - return -1; - } - - // Start at 1 and enqueue only - // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since - // the first one will be used right now. - for (size_t i = 1; - i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; - ++i) - this->free_queue_.enqueue_head (temp1 + i); - - temp = temp1; - } - - ACE_ASSERT (temp != 0); - *temp = buffer; - - if (notify_queue_.enqueue_tail (temp) == -1) - return -1; + if (notification_required == -1) + { + return -1; + } - if (!notification_required) - { - // No failures. - safe_handler.release (); + if (notification_required == 0) + { + // No failures, the handler is now owned by the notification queue + safe_handler.release (); - return 0; - } + return 0; } #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ @@ -897,45 +758,28 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) // Dispatch one message from the notify queue, and put another in // the pipe if one is available. Remember, the idea is to keep // exactly one message in the pipe at a time. - { - // We acquire the lock in a block to make sure we're not - // holding the lock while delivering callbacks... - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - ACE_Notification_Buffer *temp = 0; + bool more_messages_queued = false; + ACE_Notification_Buffer next; + + result = notification_queue_.pop_next_notification( + buffer, more_messages_queued, next); - if (notify_queue_.is_empty ()) + if (result == 0) + { return 0; - else if (notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - buffer = *temp; - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - - bool write_next_buffer = false; - ACE_Notification_Buffer ** next = 0; - - if(!this->notify_queue_.is_empty()) - { - // The queue is not empty, need to queue another message. - this->notify_queue_.get (next, 0); - write_next_buffer = true; - } - - if(write_next_buffer) - { - (void) ACE::send( - this->notification_pipe_.write_handle(), - (char *)*next, sizeof(ACE_Notification_Buffer)); - } - } + } + if (result == -1) + { + return -1; + } + + if(more_messages_queued) + { + (void) ACE::send(this->notification_pipe_.write_handle(), + (char *)&next, sizeof(ACE_Notification_Buffer)); + } #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ // If eh == 0 then another thread is unblocking the diff --git a/ACE/ace/Select_Reactor_Base.h b/ACE/ace/Select_Reactor_Base.h index 9ec4e642210..bb12154ca8c 100644 --- a/ACE/ace/Select_Reactor_Base.h +++ b/ACE/ace/Select_Reactor_Base.h @@ -27,7 +27,7 @@ #include "ace/Reactor_Impl.h" #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) -# include "ace/Unbounded_Queue.h" +# include "ace/Notification_Queue.h" #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ #ifdef ACE_WIN32 @@ -253,24 +253,18 @@ protected: int max_notify_iterations_; #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // = This configuration queues up notifications in separate buffers that - // are in user-space, rather than stored in a pipe in the OS - // kernel. The kernel-level notifications are used only to trigger - // the Reactor to check its notification queue. This enables many - // more notifications to be stored than would otherwise be the case. - - /// Keeps track of allocated arrays of type - /// ACE_Notification_Buffer. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_; - - /// Keeps track of all pending notifications. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_; - - /// Keeps track of all free buffers. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_; - - /// Synchronization for handling of queues. - ACE_SYNCH_MUTEX notify_queue_lock_; + /** + * @brief A user-space queue to store the notifications. + * + * The notification pipe has OS-specific size restrictions. That + * is, no more than a certain number of bytes may be stored in the + * pipe without blocking. This limit may be too small for certain + * applications. In this case, ACE can be configured to store all + * the events in user-space. The pipe is still needed to wake up + * the reactor thread, but only one event is sent through the pipe + * at a time. + */ + ACE_Notification_Queue notification_queue_; #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ }; diff --git a/ACE/ace/ace.mpc b/ACE/ace/ace.mpc index 6f67e730e44..d91138a1ae9 100644 --- a/ACE/ace/ace.mpc +++ b/ACE/ace/ace.mpc @@ -125,6 +125,7 @@ project(ACE) : acedefaults, install, other, codecs, token, svcconf, uuid, fileca Mutex.cpp Netlink_Addr.cpp Notification_Strategy.cpp + Notification_Queue.cpp Obchunk.cpp Object_Manager.cpp Object_Manager_Base.cpp diff --git a/ACE/ace/ace_for_tao.mpc b/ACE/ace/ace_for_tao.mpc index 85798f0692f..76737dc380c 100644 --- a/ACE/ace/ace_for_tao.mpc +++ b/ACE/ace/ace_for_tao.mpc @@ -87,6 +87,7 @@ project(ACE_FOR_TAO) : acedefaults, install, svcconf, uuid, versioned_namespace, MMAP_Memory_Pool.cpp Mutex.cpp Notification_Strategy.cpp + Notification_Queue.cpp Obchunk.cpp Object_Manager.cpp Object_Manager_Base.cpp |