From d45ee5cd26f6c4d19bcae17cbad89b8dd01a2a6b Mon Sep 17 00:00:00 2001 From: nobody Date: Wed, 28 Aug 2002 20:53:07 +0000 Subject: This commit was manufactured by cvs2svn to create branch 'notification_problem_stage_1'. --- ace/Select_Reactor_Handler_Repository.cpp | 459 +++++++++++++++++++++ ace/Select_Reactor_Handler_Repository.h | 230 +++++++++++ ace/Select_Reactor_Handler_Repository.inl | 41 ++ ace/Select_Reactor_Notify.cpp | 603 ++++++++++++++++++++++++++++ ace/Select_Reactor_Notify.h | 200 +++++++++ tests/ChangeLog | 54 +++ tests/Select_Reactor_Notify_Stress_Test.cpp | 352 ++++++++++++++++ tests/Select_Reactor_Notify_Stress_Test.dsp | 162 ++++++++ 8 files changed, 2101 insertions(+) create mode 100644 ace/Select_Reactor_Handler_Repository.cpp create mode 100644 ace/Select_Reactor_Handler_Repository.h create mode 100644 ace/Select_Reactor_Handler_Repository.inl create mode 100644 ace/Select_Reactor_Notify.cpp create mode 100644 ace/Select_Reactor_Notify.h create mode 100644 tests/ChangeLog create mode 100644 tests/Select_Reactor_Notify_Stress_Test.cpp create mode 100644 tests/Select_Reactor_Notify_Stress_Test.dsp diff --git a/ace/Select_Reactor_Handler_Repository.cpp b/ace/Select_Reactor_Handler_Repository.cpp new file mode 100644 index 00000000000..62eee31185e --- /dev/null +++ b/ace/Select_Reactor_Handler_Repository.cpp @@ -0,0 +1,459 @@ +#include "Select_Reactor_Handler_Repository.h" +#include "ACE.h" +#include "Select_Reactor_Base.h" +#include "Reactor.h" + +#if !defined (__ACE_INLINE__) +#include "ace/Select_Reactor_Handler_Repository.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(ace, + Select_Reactor_Handler_Repository, + "$Id$") + +#if defined (ACE_WIN32) +#define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_) +#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_) +#else +#define ACE_SELECT_REACTOR_HANDLE(H) (H) +#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)]) +#endif /* ACE_WIN32 */ + + +ACE_Select_Reactor_Handler_Repository::~ACE_Select_Reactor_Handler_Repository (void) +{ +} + +// Performs sanity checking on the ACE_HANDLE. +int +ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle"); +#if defined (ACE_WIN32) + // It's too expensive to perform more exhaustive validity checks on + // Win32 due to the way that they implement SOCKET HANDLEs. + if (handle == ACE_INVALID_HANDLE) +#else /* !ACE_WIN32 */ + if (handle < 0 || handle >= this->max_size_) +#endif /* ACE_WIN32 */ + { + errno = EINVAL; + return 1; + } + else + return 0; +} + +// Performs sanity checking on the ACE_HANDLE. + +int +ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range"); +#if defined (ACE_WIN32) + // It's too expensive to perform more exhaustive validity checks on + // Win32 due to the way that they implement SOCKET HANDLEs. + if (handle != ACE_INVALID_HANDLE) +#else /* !ACE_WIN32 */ + if (handle >= 0 && handle < this->max_handlep1_) +#endif /* ACE_WIN32 */ + return 1; + else + { + errno = EINVAL; + return 0; + } +} + +size_t +ACE_Select_Reactor_Handler_Repository::max_handlep1 (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1"); + + return this->max_handlep1_; +} + +int +ACE_Select_Reactor_Handler_Repository::open (size_t size) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open"); + this->max_size_ = size; + this->max_handlep1_ = 0; + +#if defined (ACE_WIN32) + // Try to allocate the memory. + ACE_NEW_RETURN (this->event_handlers_, + ACE_Event_Tuple[size], + -1); + + // Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }. + for (size_t h = 0; h < size; h++) + { + ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; + } +#else + // Try to allocate the memory. + ACE_NEW_RETURN (this->event_handlers_, + ACE_Event_Handler *[size], + -1); + + // Initialize the ACE_Event_Handler * to NULL. + for (size_t h = 0; h < size; h++) + ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; +#endif /* ACE_WIN32 */ + + // Try to increase the number of handles if is greater than + // the current limit. + return ACE::set_handle_limit (size); +} + +// Initialize a repository of the appropriate . + +ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository ( + ACE_Select_Reactor_Impl &select_reactor) + : select_reactor_ (select_reactor), + max_size_ (0), + max_handlep1_ (0), + event_handlers_ (0) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository"); +} + +int +ACE_Select_Reactor_Handler_Repository::unbind_all (void) +{ + // Unbind all of the s. + for (int handle = 0; + handle < this->max_handlep1_; + handle++) + this->unbind (ACE_SELECT_REACTOR_HANDLE (handle), + ACE_Event_Handler::ALL_EVENTS_MASK); + + return 0; +} + +int +ACE_Select_Reactor_Handler_Repository::close (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close"); + + if (this->event_handlers_ != 0) + { + this->unbind_all (); + + delete [] this->event_handlers_; + this->event_handlers_ = 0; + } + return 0; +} + +// Return the associated with the . + +ACE_Event_Handler * +ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle, + size_t *index_p) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find"); + + ACE_Event_Handler *eh = 0; + ssize_t i; + + // Only bother to search for the if it's in range. + if (this->handle_in_range (handle)) + { +#if defined (ACE_WIN32) + i = 0; + + for (; i < this->max_handlep1_; i++) + if (ACE_SELECT_REACTOR_HANDLE (i) == handle) + { + eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i); + break; + } +#else + i = handle; + + eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle); +#endif /* ACE_WIN32 */ + } + else + // g++ can't figure out that won't be used below if the handle + // is out of range, so keep it happy by defining here . . . + i = 0; + + if (eh != 0) + { + if (index_p != 0) + *index_p = i; + } + else + errno = ENOENT; + + return eh; +} + +// Bind the to the . + +int +ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind"); + + if (handle == ACE_INVALID_HANDLE) + handle = event_handler->get_handle (); + + if (this->invalid_handle (handle)) + return -1; + +#if defined (ACE_WIN32) + int assigned_slot = -1; + + for (ssize_t i = 0; i < this->max_handlep1_; i++) + { + // Found it, so let's just reuse this location. + if (ACE_SELECT_REACTOR_HANDLE (i) == handle) + { + assigned_slot = i; + break; + } + // Here's the first free slot, so let's take it. + else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE + && assigned_slot == -1) + assigned_slot = i; + } + + if (assigned_slot > -1) + // We found a free spot, let's reuse it. + { + ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler; + } + else if (this->max_handlep1_ < this->max_size_) + { + // Insert at the end of the active portion. + ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler; + this->max_handlep1_++; + } + else + { + // No more room at the inn! + errno = ENOMEM; + return -1; + } +#else + ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler; + + if (this->max_handlep1_ < handle + 1) + this->max_handlep1_ = handle + 1; +#endif /* ACE_WIN32 */ + + // Add the for this in the Select_Reactor's wait_set. + this->select_reactor_.bit_ops (handle, + mask, + this->select_reactor_.wait_set_, + ACE_Reactor::ADD_MASK); + + // Note the fact that we've changed the state of the , + // which is used by the dispatching loop to determine whether it can + // keep going or if it needs to reconsult select(). + this->select_reactor_.state_changed_ = 1; + + return 0; +} + +// Remove the binding of . + +int +ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind"); + + size_t slot; + ACE_Event_Handler *eh = this->find (handle, &slot); + + if (eh == 0) + return -1; + + // Clear out the bits in the Select_Reactor's wait_set. + this->select_reactor_.bit_ops (handle, + mask, + this->select_reactor_.wait_set_, + ACE_Reactor::CLR_MASK); + + // And suspend_set. + this->select_reactor_.bit_ops (handle, + mask, + this->select_reactor_.suspend_set_, + ACE_Reactor::CLR_MASK); + + // Note the fact that we've changed the state of the , + // which is used by the dispatching loop to determine whether it can + // keep going or if it needs to reconsult select(). + this->select_reactor_.state_changed_ = 1; + + // Close down the unless we've been instructed not + // to. + if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0) + eh->handle_close (handle, mask); + + // If there are no longer any outstanding events on this + // then we can totally shut down the Event_Handler. + if (this->select_reactor_.wait_set_.rd_mask_.is_set (handle) == 0 + && this->select_reactor_.wait_set_.wr_mask_.is_set (handle) == 0 + && this->select_reactor_.wait_set_.ex_mask_.is_set (handle) == 0) +#if defined (ACE_WIN32) + { + ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0; + + if (this->max_handlep1_ == (int) slot + 1) + { + // We've deleted the last entry (i.e., i + 1 == the current + // size of the array), so we need to figure out the last + // valid place in the array that we should consider in + // subsequent searches. + + int i; + + for (i = this->max_handlep1_ - 1; + i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE; + i--) + continue; + + this->max_handlep1_ = i + 1; + } + } +#else + { + ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0; + + if (this->max_handlep1_ == handle + 1) + { + // We've deleted the last entry, so we need to figure out + // the last valid place in the array that is worth looking + // at. + ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set (); + ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set (); + ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set (); + + ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set (); + ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set (); + ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set (); + + // Compute the maximum of six values. + this->max_handlep1_ = wait_rd_max; + if (this->max_handlep1_ < wait_wr_max) + this->max_handlep1_ = wait_wr_max; + if (this->max_handlep1_ < wait_ex_max) + this->max_handlep1_ = wait_ex_max; + + if (this->max_handlep1_ < suspend_rd_max) + this->max_handlep1_ = suspend_rd_max; + if (this->max_handlep1_ < suspend_wr_max) + this->max_handlep1_ = suspend_wr_max; + if (this->max_handlep1_ < suspend_ex_max) + this->max_handlep1_ = suspend_ex_max; + + this->max_handlep1_++; + } + } +#endif /* ACE_WIN32 */ + + return 0; +} + +/****************************************************************************/ + +ACE_Select_Reactor_Handler_Repository_Iterator::~ACE_Select_Reactor_Handler_Repository_Iterator (void) +{ +} + +ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator + (const ACE_Select_Reactor_Handler_Repository *s) + : rep_ (s), + current_ (-1) +{ + this->advance (); +} + +// Pass back the that hasn't been seen in the Set. +// Returns 0 when all items have been seen, else 1. + +int +ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item) +{ + int result = 1; + + if (this->current_ >= this->rep_->max_handlep1_) + result = 0; + else + next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, + this->current_); + return result; +} + +int +ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const +{ + return this->current_ >= this->rep_->max_handlep1_; +} + +// Move forward by one element in the set. + +int +ACE_Select_Reactor_Handler_Repository_Iterator::advance (void) +{ + if (this->current_ < this->rep_->max_handlep1_) + this->current_++; + + while (this->current_ < this->rep_->max_handlep1_) + if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0) + return 1; + else + this->current_++; + + return this->current_ < this->rep_->max_handlep1_; +} + +// Dump the state of an object. + +void +ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("rep_ = %u"), this->rep_)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("current_ = %d"), this->current_)); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +void +ACE_Select_Reactor_Handler_Repository::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("(%t) max_handlep1_ = %d, max_size_ = %d\n"), + this->max_handlep1_, this->max_size_)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("["))); + + ACE_Event_Handler *eh = 0; + + for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this); + iter.next (eh) != 0; + iter.advance ()) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (eh = %x, eh->handle_ = %d)"), + eh, eh->get_handle ())); + + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]"))); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator) diff --git a/ace/Select_Reactor_Handler_Repository.h b/ace/Select_Reactor_Handler_Repository.h new file mode 100644 index 00000000000..b6cc5bc7189 --- /dev/null +++ b/ace/Select_Reactor_Handler_Repository.h @@ -0,0 +1,230 @@ +/* -*- C++ -*- */ +//============================================================================= +/** + * @file Select_Reactor_Handler_Repository.h + * + * $Id$ + * + * @author Douglas C. Schmidt + */ +//============================================================================= + +#ifndef ACE_SELECT_REACTOR_HANDLER_REPOSITORY_H +#define ACE_SELECT_REACTOR_HANDLER_REPOSITORY_H + +#include "ace/pre.h" + +#include "ACE_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "Event_Handler.h" + +class ACE_Select_Reactor_Impl; +class ACE_Event_Handler; + +/** + * @class ACE_Event_Tuple + * + * @brief An ACE_Event_Handler and its associated ACE_HANDLE. + * + * One is registered for one or more + * . At various points, this information must be + * stored explicitly. This class provides a lightweight + * mechanism to do so. + */ +class ACE_Export ACE_Event_Tuple +{ +public: + /// Default constructor. + ACE_Event_Tuple (void); + + /// Constructor. + ACE_Event_Tuple (ACE_Event_Handler *eh, + ACE_HANDLE h); + + /// Destructor. + ~ACE_Event_Tuple (void); + + /// Equality operator. + int operator== (const ACE_Event_Tuple &rhs) const; + + /// Inequality operator. + int operator!= (const ACE_Event_Tuple &rhs) const; + + /// Handle. + ACE_HANDLE handle_; + + /// associated with the . + ACE_Event_Handler *event_handler_; +}; + + +//=================================================================== +/** + * @class ACE_Select_Reactor_Handler_Repository + * + * @brief Used to map s onto the appropriate + * *. + * + * This class is necessary to shield differences between UNIX + * and Win32. In UNIX, is an int, whereas in Win32 + * it's a void *. This class hides all these details from the + * bulk of the code. All of these methods + * are called with the main token lock held. + */ +class ACE_Export ACE_Select_Reactor_Handler_Repository +{ +public: + friend class ACE_Select_Reactor_Handler_Repository_Iterator; + + // = Initialization and termination methods. + /// Default "do-nothing" constructor. + ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &); + + /// Destructor. + ~ACE_Select_Reactor_Handler_Repository (void); + + /// Initialize a repository of the appropriate . + /** + * On Unix platforms, the size parameter should be as large as the + * maximum number of file descriptors allowed for a given process. + * This is necessary since a file descriptor is used to directly + * index the array of event handlers maintained by the Reactor's + * handler repository. Direct indexing is used for efficiency + * reasons. + */ + int open (size_t size); + + /// Close down the repository. + int close (void); + + // = Search structure operations. + + /** + * Return the associated with . + * If is non-0, then return the index location of the + * , if found. + */ + ACE_Event_Handler *find (ACE_HANDLE handle, size_t *index_p = 0); + + /// Bind the to the with the + /// appropriate settings. + int bind (ACE_HANDLE, + ACE_Event_Handler *, + ACE_Reactor_Mask); + + /// Remove the binding of in accordance with the . + int unbind (ACE_HANDLE, + ACE_Reactor_Mask mask); + + /// Remove all the tuples. + int unbind_all (void); + + // = Sanity checking. + + // Check the to make sure it's a valid ACE_HANDLE that + // within the range of legal handles (i.e., >= 0 && < max_size_). + int invalid_handle (ACE_HANDLE handle); + + // Check the to make sure it's a valid ACE_HANDLE that + // within the range of currently registered handles (i.e., >= 0 && < + // max_handlep1_). + int handle_in_range (ACE_HANDLE handle); + + // = Accessors. + /// Returns the current table size. + size_t size (void) const; + + /// Maximum ACE_HANDLE value, plus 1. + size_t max_handlep1 (void); + + /// Dump the state of an object. + void dump (void) const; + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + +private: + /// Reference to our . + ACE_Select_Reactor_Impl &select_reactor_; + + /// Maximum number of handles. + ssize_t max_size_; + + /// The highest currently active handle, plus 1 (ranges between 0 and + /// . + int max_handlep1_; + +#if defined (ACE_WIN32) + // = The mapping from to . + + /** + * The NT version implements this via a dynamically allocated + * array of . Since NT implements ACE_HANDLE + * as a void * we can't directly index into this array. Therefore, + * we just do a linear search (for now). Next, we'll modify + * things to use hashing or something faster... + */ + ACE_Event_Tuple *event_handlers_; +#else + /** + * The UNIX version implements this via a dynamically allocated + * array of that is indexed directly using + * the ACE_HANDLE value. + */ + ACE_Event_Handler **event_handlers_; +#endif /* ACE_WIN32 */ +}; + +//================================================================= + +/** + * @class ACE_Select_Reactor_Handler_Repository_Iterator + * + * @brief Iterate through the . + */ +class ACE_Export ACE_Select_Reactor_Handler_Repository_Iterator +{ +public: + // = Initialization method. + ACE_Select_Reactor_Handler_Repository_Iterator (const ACE_Select_Reactor_Handler_Repository *s); + + /// dtor. + ~ACE_Select_Reactor_Handler_Repository_Iterator (void); + + // = Iteration methods. + + /// Pass back the that hasn't been seen in the Set. + /// Returns 0 when all items have been seen, else 1. + int next (ACE_Event_Handler *&next_item); + + /// Returns 1 when all items have been seen, else 0. + int done (void) const; + + /// Move forward by one element in the set. Returns 0 when all the + /// items in the set have been seen, else 1. + int advance (void); + + /// Dump the state of an object. + void dump (void) const; + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + +private: + /// Reference to the Handler_Repository we are iterating over. + const ACE_Select_Reactor_Handler_Repository *rep_; + + /// Pointer to the current iteration level. + ssize_t current_; +}; + + +#if defined (__ACE_INLINE__) +#include "Select_Reactor_Handler_Repository.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /*ACE_SELECT_REACTOR_HANDLER_REPOSITORY_H*/ diff --git a/ace/Select_Reactor_Handler_Repository.inl b/ace/Select_Reactor_Handler_Repository.inl new file mode 100644 index 00000000000..9c9c234ec56 --- /dev/null +++ b/ace/Select_Reactor_Handler_Repository.inl @@ -0,0 +1,41 @@ +/* -*- C++ -*- */ +//$Id$ +ACE_INLINE +ACE_Event_Tuple::~ACE_Event_Tuple (void) +{ +} + +ACE_INLINE +ACE_Event_Tuple::ACE_Event_Tuple (void) +: handle_ (ACE_INVALID_HANDLE), + event_handler_ (0) +{ +} + +ACE_INLINE +ACE_Event_Tuple::ACE_Event_Tuple (ACE_Event_Handler* eh, + ACE_HANDLE h) +: handle_ (h), + event_handler_ (eh) +{ +} + +ACE_INLINE int +ACE_Event_Tuple::operator== (const ACE_Event_Tuple &rhs) const +{ + return this->handle_ == rhs.handle_; +} + +ACE_INLINE int +ACE_Event_Tuple::operator!= (const ACE_Event_Tuple &rhs) const +{ + return !(*this == rhs); +} + +/************************************************************/ + +ACE_INLINE size_t +ACE_Select_Reactor_Handler_Repository::size (void) const +{ + return this->max_size_; +} diff --git a/ace/Select_Reactor_Notify.cpp b/ace/Select_Reactor_Notify.cpp new file mode 100644 index 00000000000..aaf43e8a110 --- /dev/null +++ b/ace/Select_Reactor_Notify.cpp @@ -0,0 +1,603 @@ +#include "Select_Reactor_Notify.h" +#include "ACE.h" +#include "Select_Reactor_Base.h" + +ACE_RCSID(ace, + Select_Reactor_Notify, + "$Id$") + + +ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void) + : max_notify_iterations_ (-1) +{ +} + +ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void) +{ +} + +void +ACE_Select_Reactor_Notify::max_notify_iterations (int iterations) +{ + // Must always be > 0 or < 0 to optimize the loop exit condition. + if (iterations == 0) + iterations = 1; + + this->max_notify_iterations_ = iterations; +} + +int +ACE_Select_Reactor_Notify::max_notify_iterations (void) +{ + return this->max_notify_iterations_; +} + +int +ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask ) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications"); + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + if (this->notify_queue_.is_empty ()) + return 0; + + ACE_Notification_Buffer *temp; + ACE_Unbounded_Queue local_queue; + + size_t queue_size = this->notify_queue_.size (); + int number_purged = 0; + size_t i; + for (i = 0; i < queue_size; ++i) + { + if (-1 == this->notify_queue_.dequeue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("dequeue_head")), + -1); + + // If this is not a Reactor notify (it is for a particular handler), + // and it matches the specified handler (or purging all), + // and applying the mask would totally eliminate the notification, then + // release it and count the number purged. + if ((0 != temp->eh_) && + (0 == eh || eh == temp->eh_) && + ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask + // is left with nothing when + // applying the mask + { + if (-1 == this->free_queue_.enqueue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enqueue_head")), + -1); + ++number_purged; + } + else + { + // To preserve it, move it to the local_queue. + // But first, if this is not a Reactor notify (it is for a particularhandler), + // and it matches the specified handler (or purging all), then + // apply the mask + if ((0 != temp->eh_) && + (0 == eh || eh == temp->eh_)) + ACE_CLR_BITS(temp->mask_, mask); + if (-1 == local_queue.enqueue_head (temp)) + return -1; + } + } + + if (this->notify_queue_.size ()) + { // should be empty! + ACE_ASSERT (0); + return -1; + } + + // now put it back in the notify queue + queue_size = local_queue.size (); + for (i = 0; i < queue_size; ++i) + { + if (-1 == local_queue.dequeue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("dequeue_head")), + -1); + + if (-1 == this->notify_queue_.enqueue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enqueue_head")), + -1); + } + + return number_purged; + +#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ + ACE_UNUSED_ARG (eh); + ACE_UNUSED_ARG (mask); + ACE_NOTSUP_RETURN (-1); +#endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ +} + +void +ACE_Select_Reactor_Notify::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_)); + this->notification_pipe_.dump (); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +int +ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, + ACE_Timer_Queue *, + int disable_notify_pipe) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::open"); + + if (disable_notify_pipe == 0) + { + this->select_reactor_ = + ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r); + + if (this->select_reactor_ == 0) + { + errno = EINVAL; + return -1; + } + + if (this->notification_pipe_.open () == -1) + return -1; +#if defined (F_SETFD) + ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1); + ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1); +#endif /* F_SETFD */ + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_Notification_Buffer *temp; + + ACE_NEW_RETURN (temp, + ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); + + if (this->alloc_queue_.enqueue_head (temp) == -1) + { + delete [] temp; + return -1; + } + + for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++) + if (free_queue_.enqueue_head (temp + i) == -1) + return -1; + +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + + // There seems to be a Win32 bug with this... Set this into + // non-blocking mode. + if (ACE::set_flags (this->notification_pipe_.read_handle (), + ACE_NONBLOCK) == -1) + return -1; + else + return this->select_reactor_->register_handler + (this->notification_pipe_.read_handle (), + this, + ACE_Event_Handler::READ_MASK); + } + else + { + this->select_reactor_ = 0; + return 0; + } +} + +int +ACE_Select_Reactor_Notify::close (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::close"); + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Free up the dynamically allocated resources. + ACE_Notification_Buffer **b; + + for (ACE_Unbounded_Queue_Iterator alloc_iter (this->alloc_queue_); + alloc_iter.next (b) != 0; + alloc_iter.advance ()) + { + delete [] *b; + *b = 0; + } + + this->alloc_queue_.reset (); + this->notify_queue_.reset (); + this->free_queue_.reset (); +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + + return this->notification_pipe_.close (); +} + +int +ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::notify"); + + // Just consider this method a "no-op" if there's no + // configured. + if (this->select_reactor_ == 0) + return 0; + + ACE_Notification_Buffer buffer (eh, mask); + + int notification_required = 1; + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Artificial scope to limit the duration of the mutex. + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + mon, + this->notify_queue_lock_, -1); + + if (this->notify_message_to_queue (buffer, + notification_required) == -1) + return -1; + } +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + + ssize_t n = 0; + + // Send a notification message on the pipe if required. + if (notification_required) + { + n = ACE::send (this->notification_pipe_.write_handle (), + (char *) &buffer, + sizeof buffer, + timeout); + } + + if (n == -1) + return -1; + + return 0; +} + + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) +int +ACE_Select_Reactor_Notify::notify_message_to_queue (ACE_Notification_Buffer &buffer, + int ¬ification_required) +{ + // If the notification queue is not empty, set the + // notification_required flag to zero. + if (!this->notify_queue_.is_empty ()) + notification_required = 0; + + ACE_Notification_Buffer *temp = 0; + + // Take a node from the free list. + if (this->free_queue_.dequeue_head (temp) == -1) + { + // Grow the queue of available buffers. + ACE_Notification_Buffer *temp1; + + ACE_NEW_RETURN (temp1, + ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); + + if (this->alloc_queue_.enqueue_head (temp1) == -1) + { + delete [] temp1; + return -1; + } + + // Start at 1 and enqueue only + // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since + // the first one will be used right now. + for (size_t i = 1; + i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; + i++) + this->free_queue_.enqueue_head (temp1 + i); + + temp = temp1; + } + + + ACE_ASSERT (temp != 0); + *temp = buffer; + + // Enqueue the node in the + if (this->notify_queue_.enqueue_tail (temp) == -1) + return -1; + + return 0; +} + +#endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE*/ + +int +ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles, + ACE_Handle_Set &rd_mask) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications"); + + ACE_HANDLE read_handle = + this->notification_pipe_.read_handle (); + + if (read_handle != ACE_INVALID_HANDLE + && rd_mask.is_set (read_handle)) + { + number_of_active_handles--; + rd_mask.clr_bit (read_handle); + return this->handle_input (read_handle); + } + else + return 0; +} + + +ACE_HANDLE +ACE_Select_Reactor_Notify::notify_handle (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle"); + + return this->notification_pipe_.read_handle (); +} + + +// Special trick to unblock or + * when updates occur other than in the main + * thread. To do this, we signal an + * auto-reset event the is listening on. + * If an and is + * passed to , the appropriate method is + * dispatched in the context of the thread. + */ +class ACE_Export ACE_Select_Reactor_Notify : public ACE_Reactor_Notify +{ +public: + /// Constructor. + ACE_Select_Reactor_Notify (void); + + /// Destructor. + ~ACE_Select_Reactor_Notify (void); + + // = Initialization and termination methods. + /// Initialize. + virtual int open (ACE_Reactor_Impl *, + ACE_Timer_Queue * = 0, + int disable_notify_pipe = 0); + + /// Destroy. + virtual int close (void); + + /** + * Called by a thread when it wants to unblock the + * . This wakeups the if + * currently blocked in