diff options
author | bala <balanatarajan@users.noreply.github.com> | 2002-08-08 13:58:09 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2002-08-08 13:58:09 +0000 |
commit | a6eceedfa723c56b54cac8f3a1714d7aafd1abd6 (patch) | |
tree | a3db83114ba30cd300ae76dc6c743bafe92cecc9 | |
parent | 3b70657b3925dcd1369f9056cb4c488700b02db1 (diff) | |
download | ATCD-a6eceedfa723c56b54cac8f3a1714d7aafd1abd6.tar.gz |
ChangeLogTag:Tue Aug 06 00:39:15 2002 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r-- | ace/Select_Reactor_Base.cpp | 1005 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.h | 357 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.i | 51 | ||||
-rw-r--r-- | ace/Select_Reactor_Handler_Repository.cpp | 459 | ||||
-rw-r--r-- | ace/Select_Reactor_Handler_Repository.h | 230 | ||||
-rw-r--r-- | ace/Select_Reactor_Handler_Repository.inl | 41 | ||||
-rw-r--r-- | ace/Select_Reactor_Notify.cpp | 576 | ||||
-rw-r--r-- | ace/Select_Reactor_Notify.h | 189 | ||||
-rw-r--r-- | ace/Select_Reactor_T.cpp | 3 | ||||
-rw-r--r-- | ace/ace_dll.dsp | 20 | ||||
-rw-r--r-- | ace/ace_lib.dsp | 28 | ||||
-rw-r--r-- | tests/ChangeLog | 25 |
12 files changed, 1569 insertions, 1415 deletions
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp index f74cc42835c..a1777a443a5 100644 --- a/ace/Select_Reactor_Base.cpp +++ b/ace/Select_Reactor_Base.cpp @@ -15,997 +15,6 @@ ACE_RCSID(ace, Select_Reactor_Base, "$Id$") -#if defined (ACE_WIN32) -#define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_) -#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_) -#else -#define ACE_SELECT_REACTOR_HANDLE(H) (H) -#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)]) -#endif /* ACE_WIN32 */ - -// Performs sanity checking on the ACE_HANDLE. - -int -ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle"); -#if defined (ACE_WIN32) - // It's too expensive to perform more exhaustive validity checks on - // Win32 due to the way that they implement SOCKET HANDLEs. - if (handle == ACE_INVALID_HANDLE) -#else /* !ACE_WIN32 */ - if (handle < 0 || handle >= this->max_size_) -#endif /* ACE_WIN32 */ - { - errno = EINVAL; - return 1; - } - else - return 0; -} - -// Performs sanity checking on the ACE_HANDLE. - -int -ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range"); -#if defined (ACE_WIN32) - // It's too expensive to perform more exhaustive validity checks on - // Win32 due to the way that they implement SOCKET HANDLEs. - if (handle != ACE_INVALID_HANDLE) -#else /* !ACE_WIN32 */ - if (handle >= 0 && handle < this->max_handlep1_) -#endif /* ACE_WIN32 */ - return 1; - else - { - errno = EINVAL; - return 0; - } -} - -size_t -ACE_Select_Reactor_Handler_Repository::max_handlep1 (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1"); - - return this->max_handlep1_; -} - -int -ACE_Select_Reactor_Handler_Repository::open (size_t size) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open"); - this->max_size_ = size; - this->max_handlep1_ = 0; - -#if defined (ACE_WIN32) - // Try to allocate the memory. - ACE_NEW_RETURN (this->event_handlers_, - ACE_Event_Tuple[size], - -1); - - // Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }. - for (size_t h = 0; h < size; h++) - { - ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE; - ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; - } -#else - // Try to allocate the memory. - ACE_NEW_RETURN (this->event_handlers_, - ACE_Event_Handler *[size], - -1); - - // Initialize the ACE_Event_Handler * to NULL. - for (size_t h = 0; h < size; h++) - ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; -#endif /* ACE_WIN32 */ - - // Try to increase the number of handles if <size> is greater than - // the current limit. - return ACE::set_handle_limit (size); -} - -// Initialize a repository of the appropriate <size>. - -ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor) - : select_reactor_ (select_reactor), - max_size_ (0), - max_handlep1_ (0), - event_handlers_ (0) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository"); -} - -int -ACE_Select_Reactor_Handler_Repository::unbind_all (void) -{ - // Unbind all of the <handle, ACE_Event_Handler>s. - for (int handle = 0; - handle < this->max_handlep1_; - handle++) - this->unbind (ACE_SELECT_REACTOR_HANDLE (handle), - ACE_Event_Handler::ALL_EVENTS_MASK); - - return 0; -} - -int -ACE_Select_Reactor_Handler_Repository::close (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close"); - - if (this->event_handlers_ != 0) - { - this->unbind_all (); - - delete [] this->event_handlers_; - this->event_handlers_ = 0; - } - return 0; -} - -// Return the <ACE_Event_Handler *> associated with the <handle>. - -ACE_Event_Handler * -ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle, - size_t *index_p) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find"); - - ACE_Event_Handler *eh = 0; - ssize_t i; - - // Only bother to search for the <handle> if it's in range. - if (this->handle_in_range (handle)) - { -#if defined (ACE_WIN32) - i = 0; - - for (; i < this->max_handlep1_; i++) - if (ACE_SELECT_REACTOR_HANDLE (i) == handle) - { - eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i); - break; - } -#else - i = handle; - - eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle); -#endif /* ACE_WIN32 */ - } - else - // g++ can't figure out that <i> won't be used below if the handle - // is out of range, so keep it happy by defining <i> here . . . - i = 0; - - if (eh != 0) - { - if (index_p != 0) - *index_p = i; - } - else - errno = ENOENT; - - return eh; -} - -// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>. - -int -ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle, - ACE_Event_Handler *event_handler, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind"); - - if (handle == ACE_INVALID_HANDLE) - handle = event_handler->get_handle (); - - if (this->invalid_handle (handle)) - return -1; - -#if defined (ACE_WIN32) - int assigned_slot = -1; - - for (ssize_t i = 0; i < this->max_handlep1_; i++) - { - // Found it, so let's just reuse this location. - if (ACE_SELECT_REACTOR_HANDLE (i) == handle) - { - assigned_slot = i; - break; - } - // Here's the first free slot, so let's take it. - else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE - && assigned_slot == -1) - assigned_slot = i; - } - - if (assigned_slot > -1) - // We found a free spot, let's reuse it. - { - ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle; - ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler; - } - else if (this->max_handlep1_ < this->max_size_) - { - // Insert at the end of the active portion. - ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle; - ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler; - this->max_handlep1_++; - } - else - { - // No more room at the inn! - errno = ENOMEM; - return -1; - } -#else - ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler; - - if (this->max_handlep1_ < handle + 1) - this->max_handlep1_ = handle + 1; -#endif /* ACE_WIN32 */ - - // Add the <mask> for this <handle> in the Select_Reactor's wait_set. - this->select_reactor_.bit_ops (handle, - mask, - this->select_reactor_.wait_set_, - ACE_Reactor::ADD_MASK); - - // Note the fact that we've changed the state of the <wait_set_>, - // which is used by the dispatching loop to determine whether it can - // keep going or if it needs to reconsult select(). - this->select_reactor_.state_changed_ = 1; - - return 0; -} - -// Remove the binding of <ACE_HANDLE>. - -int -ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind"); - - size_t slot; - ACE_Event_Handler *eh = this->find (handle, &slot); - - if (eh == 0) - return -1; - - // Clear out the <mask> bits in the Select_Reactor's wait_set. - this->select_reactor_.bit_ops (handle, - mask, - this->select_reactor_.wait_set_, - ACE_Reactor::CLR_MASK); - - // And suspend_set. - this->select_reactor_.bit_ops (handle, - mask, - this->select_reactor_.suspend_set_, - ACE_Reactor::CLR_MASK); - - // Note the fact that we've changed the state of the <wait_set_>, - // which is used by the dispatching loop to determine whether it can - // keep going or if it needs to reconsult select(). - this->select_reactor_.state_changed_ = 1; - - // Close down the <Event_Handler> unless we've been instructed not - // to. - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0) - eh->handle_close (handle, mask); - - // If there are no longer any outstanding events on this <handle> - // then we can totally shut down the Event_Handler. - if (this->select_reactor_.wait_set_.rd_mask_.is_set (handle) == 0 - && this->select_reactor_.wait_set_.wr_mask_.is_set (handle) == 0 - && this->select_reactor_.wait_set_.ex_mask_.is_set (handle) == 0) -#if defined (ACE_WIN32) - { - ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE; - ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0; - - if (this->max_handlep1_ == (int) slot + 1) - { - // We've deleted the last entry (i.e., i + 1 == the current - // size of the array), so we need to figure out the last - // valid place in the array that we should consider in - // subsequent searches. - - int i; - - for (i = this->max_handlep1_ - 1; - i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE; - i--) - continue; - - this->max_handlep1_ = i + 1; - } - } -#else - { - ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0; - - if (this->max_handlep1_ == handle + 1) - { - // We've deleted the last entry, so we need to figure out - // the last valid place in the array that is worth looking - // at. - ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set (); - ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set (); - ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set (); - - ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set (); - ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set (); - ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set (); - - // Compute the maximum of six values. - this->max_handlep1_ = wait_rd_max; - if (this->max_handlep1_ < wait_wr_max) - this->max_handlep1_ = wait_wr_max; - if (this->max_handlep1_ < wait_ex_max) - this->max_handlep1_ = wait_ex_max; - - if (this->max_handlep1_ < suspend_rd_max) - this->max_handlep1_ = suspend_rd_max; - if (this->max_handlep1_ < suspend_wr_max) - this->max_handlep1_ = suspend_wr_max; - if (this->max_handlep1_ < suspend_ex_max) - this->max_handlep1_ = suspend_ex_max; - - this->max_handlep1_++; - } - } -#endif /* ACE_WIN32 */ - - return 0; -} - -ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator - (const ACE_Select_Reactor_Handler_Repository *s) - : rep_ (s), - current_ (-1) -{ - this->advance (); -} - -// Pass back the <next_item> that hasn't been seen in the Set. -// Returns 0 when all items have been seen, else 1. - -int -ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item) -{ - int result = 1; - - if (this->current_ >= this->rep_->max_handlep1_) - result = 0; - else - next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, - this->current_); - return result; -} - -int -ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const -{ - return this->current_ >= this->rep_->max_handlep1_; -} - -// Move forward by one element in the set. - -int -ACE_Select_Reactor_Handler_Repository_Iterator::advance (void) -{ - if (this->current_ < this->rep_->max_handlep1_) - this->current_++; - - while (this->current_ < this->rep_->max_handlep1_) - if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0) - return 1; - else - this->current_++; - - return this->current_ < this->rep_->max_handlep1_; -} - -// Dump the state of an object. - -void -ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("rep_ = %u"), this->rep_)); - ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("current_ = %d"), this->current_)); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -} - -void -ACE_Select_Reactor_Handler_Repository::dump (void) const -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("(%t) max_handlep1_ = %d, max_size_ = %d\n"), - this->max_handlep1_, this->max_size_)); - ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("["))); - - ACE_Event_Handler *eh = 0; - - for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this); - iter.next (eh) != 0; - iter.advance ()) - ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (eh = %x, eh->handle_ = %d)"), - eh, eh->get_handle ())); - - ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]"))); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -} - -ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator) - -ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void) - : max_notify_iterations_ (-1) -{ -} - -void -ACE_Select_Reactor_Notify::max_notify_iterations (int iterations) -{ - // Must always be > 0 or < 0 to optimize the loop exit condition. - if (iterations == 0) - iterations = 1; - - this->max_notify_iterations_ = iterations; -} - -int -ACE_Select_Reactor_Notify::max_notify_iterations (void) -{ - return this->max_notify_iterations_; -} - -// purge_pending_notifications -// Removes all entries from the notify_queue_ and each one that -// matches <eh> is put on the free_queue_. The rest are saved on a -// local queue and copied back to the notify_queue_ at the end. -// Returns the number of entries removed. Returns -1 on error. -// ACE_NOTSUP_RETURN if ACE_HAS_REACTOR_NOTIFICATION_QUEUE is not defined. -int -ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask ) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications"); - -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - if (this->notify_queue_.is_empty ()) - return 0; - - ACE_Notification_Buffer *temp; - ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue; - - size_t queue_size = this->notify_queue_.size (); - int number_purged = 0; - size_t i; - for (i = 0; i < queue_size; ++i) - { - if (-1 == this->notify_queue_.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - // If this is not a Reactor notify (it is for a particular handler), - // and it matches the specified handler (or purging all), - // and applying the mask would totally eliminate the notification, then - // release it and count the number purged. - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_) && - ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask - // is left with nothing when - // applying the mask - { - if (-1 == this->free_queue_.enqueue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - ++number_purged; - } - else - { - // To preserve it, move it to the local_queue. - // But first, if this is not a Reactor notify (it is for a particularhandler), - // and it matches the specified handler (or purging all), then - // apply the mask - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_)) - ACE_CLR_BITS(temp->mask_, mask); - if (-1 == local_queue.enqueue_head (temp)) - return -1; - } - } - - if (this->notify_queue_.size ()) - { // should be empty! - ACE_ASSERT (0); - return -1; - } - - // now put it back in the notify queue - queue_size = local_queue.size (); - for (i = 0; i < queue_size; ++i) - { - if (-1 == local_queue.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - if (-1 == this->notify_queue_.enqueue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } - - return number_purged; - -#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ - ACE_UNUSED_ARG (eh); - ACE_UNUSED_ARG (mask); - ACE_NOTSUP_RETURN (-1); -#endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ -} - -void -ACE_Select_Reactor_Notify::dump (void) const -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_)); - this->notification_pipe_.dump (); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -} - -int -ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, - ACE_Timer_Queue *, - int disable_notify_pipe) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::open"); - - if (disable_notify_pipe == 0) - { - this->select_reactor_ = - ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r); - - if (select_reactor_ == 0) - { - errno = EINVAL; - return -1; - } - - if (this->notification_pipe_.open () == -1) - return -1; -#if defined (F_SETFD) - ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1); - ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1); -#endif /* F_SETFD */ - -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_Notification_Buffer *temp; - - ACE_NEW_RETURN (temp, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp) == -1) - { - delete [] temp; - return -1; - } - - for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++) - if (free_queue_.enqueue_head (temp + i) == -1) - return -1; - -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ - - // There seems to be a Win32 bug with this... Set this into - // non-blocking mode. - if (ACE::set_flags (this->notification_pipe_.read_handle (), - ACE_NONBLOCK) == -1) - return -1; - else - return this->select_reactor_->register_handler - (this->notification_pipe_.read_handle (), - this, - ACE_Event_Handler::READ_MASK); - } - else - { - this->select_reactor_ = 0; - return 0; - } -} - -int -ACE_Select_Reactor_Notify::close (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::close"); - -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Free up the dynamically allocated resources. - ACE_Notification_Buffer **b; - - for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_); - alloc_iter.next (b) != 0; - alloc_iter.advance ()) - { - delete [] *b; - *b = 0; - } - - this->alloc_queue_.reset (); - this->notify_queue_.reset (); - this->free_queue_.reset (); -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ - - return this->notification_pipe_.close (); -} - -int -ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - ACE_Time_Value *timeout) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::notify"); - - // Just consider this method a "no-op" if there's no - // <ACE_Select_Reactor> configured. - if (this->select_reactor_ == 0) - return 0; - - ACE_Notification_Buffer buffer (eh, mask); - -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Artificial scope to limit the duration of the mutex. - { - // int notification_required = 0; - - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - // No pending notifications. - - // We will send notify for every message.. - // if (this->notify_queue_.is_empty ()) - // notification_required = 1; - - ACE_Notification_Buffer *temp = 0; - - if (free_queue_.dequeue_head (temp) == -1) - { - // Grow the queue of available buffers. - ACE_Notification_Buffer *temp1; - - ACE_NEW_RETURN (temp1, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp1) == -1) - { - delete [] temp1; - return -1; - } - - // Start at 1 and enqueue only - // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since - // the first one will be used right now. - for (size_t i = 1; - i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; - i++) - this->free_queue_.enqueue_head (temp1 + i); - - temp = temp1; - } - - ACE_ASSERT (temp != 0); - *temp = buffer; - - if (notify_queue_.enqueue_tail (temp) == -1) - return -1; - } -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ - - ssize_t n = ACE::send (this->notification_pipe_.write_handle (), - (char *) &buffer, - sizeof buffer, - timeout); - if (n == -1) - return -1; - - return 0; -} - -// Handles pending threads (if any) that are waiting to unblock the -// Select_Reactor. - -int -ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles, - ACE_Handle_Set &rd_mask) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications"); - - ACE_HANDLE read_handle = - this->notification_pipe_.read_handle (); - - if (read_handle != ACE_INVALID_HANDLE - && rd_mask.is_set (read_handle)) - { - number_of_active_handles--; - rd_mask.clr_bit (read_handle); - return this->handle_input (read_handle); - } - else - return 0; -} - - -ACE_HANDLE -ACE_Select_Reactor_Notify::notify_handle (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle"); - - return this->notification_pipe_.read_handle (); -} - - -// Special trick to unblock <select> when updates occur in somewhere -// other than the main <ACE_Select_Reactor> thread. All we do is -// write data to a pipe that the <ACE_Select_Reactor> is listening on. -// Thanks to Paul Stephenson for suggesting this approach. -int -ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer) -{ - // There is tonnes of code that can be abstracted... -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - ACE_Notification_Buffer *temp; - - ACE_UNUSED_ARG (buffer); - - // If the queue is empty just return 0 - if (notify_queue_.is_empty ()) - return 0; - - if (this->notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - if (temp->eh_ != 0) - { - // If the queue had a buffer that has an event handler, put - // the element back in the queue and return a 1 - if (this->notify_queue_.enqueue_head (temp) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enque_head")), - -1); - } - - return 1; - } - // Else put the element in the free queue - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } -#else - // If eh == 0 then another thread is unblocking the - // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the <ACE_Event_Handler> - // pointer we've been passed. - if (buffer.eh_ != 0) - return 1; - -#endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ - - // has no dispatchable buffer - return 0; -} - -int -ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) -{ - int result = 0; -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Dispatch all messages that are in the <notify_queue_>. - { - // We acquire the lock in a block to make sure we're not - // holding the lock while delivering callbacks... - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - ACE_Notification_Buffer *temp; - - if (notify_queue_.is_empty ()) - return 0; - else if (notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - buffer = *temp; - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } - - // If eh == 0 then another thread is unblocking the - // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the <ACE_Event_Handler> - // pointer we've been passed. - if (buffer.eh_ != 0) - { - - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } -#else - // If eh == 0 then another thread is unblocking the - // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the <ACE_Event_Handler> - // pointer we've been passed. - if (buffer.eh_ != 0) - { - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::QOS_MASK: - result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::GROUP_QOS_MASK: - result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("invalid mask = %d\n"), - buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } - -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ - - return 1; -} - -int -ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle, - ACE_Notification_Buffer &buffer) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe"); - - ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer); - - if (n > 0) - { - // Check to see if we've got a short read. - if (n != sizeof buffer) - { - ssize_t remainder = sizeof buffer - n; - - // If so, try to recover by reading the remainder. If this - // doesn't work we're in big trouble since the input stream - // won't be aligned correctly. I'm not sure quite what to - // do at this point. It's probably best just to return -1. - if (ACE::recv (handle, - ((char *) &buffer) + n, - remainder) != remainder) - return -1; - } - - - return 1; - } - - // Return -1 if things have gone seriously wrong. - if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN)) - return -1; - - return 0; -} - - -int -ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input"); - // Precondition: this->select_reactor_.token_.current_owner () == - // ACE_Thread::self (); - - int number_dispatched = 0; - int result = 0; - ACE_Notification_Buffer buffer; - - while ((result = this->read_notify_pipe (handle, buffer)) > 0) - { - // Dispatch the buffer - // NOTE: We count only if we made any dispatches ie. upcalls. - if (this->dispatch_notify (buffer) > 0) - number_dispatched++; - - // Bail out if we've reached the <notify_threshold_>. Note that - // by default <notify_threshold_> is -1, so we'll loop until all - // the notifications in the pipe have been dispatched. - if (number_dispatched == this->max_notify_iterations_) - break; - } - - // Reassign number_dispatched to -1 if things have gone seriously - // wrong. - if (result < 0) - number_dispatched = -1; - - // Enqueue ourselves into the list of waiting threads. When we - // reacquire the token we'll be off and running again with ownership - // of the token. The postcondition of this call is that - // <select_reactor_.token_.current_owner> == <ACE_Thread::self>. - this->select_reactor_->renew (); - return number_dispatched; -} - // Perform GET, CLR, SET, and ADD operations on the Handle_Sets. // // GET = 1, Retrieve current value @@ -1112,17 +121,3 @@ ACE_Select_Reactor_Impl::resumable_handler (void) return 0; } - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) -template class ACE_Unbounded_Queue <ACE_Notification_Buffer *>; -template class ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>; -template class ACE_Node <ACE_Notification_Buffer *>; -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) -#pragma instantiate ACE_Unbounded_Queue <ACE_Notification_Buffer *> -#pragma instantiate ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *> -#pragma instantiate ACE_Node <ACE_Notification_Buffer *> -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h index 6f4c4923d54..24df01219c3 100644 --- a/ace/Select_Reactor_Base.h +++ b/ace/Select_Reactor_Base.h @@ -24,12 +24,8 @@ #include "ace/Event_Handler.h" #include "ace/Handle_Set.h" #include "ace/Token.h" -#include "ace/Pipe.h" #include "ace/Reactor_Impl.h" - -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) -#include "ace/Unbounded_Queue.h" -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ +#include "Select_Reactor_Handler_Repository.h" // Add useful typedefs to simplify the following code. typedef void (ACE_Handle_Set::*ACE_FDS_PTMF) (ACE_HANDLE); @@ -42,7 +38,7 @@ typedef ACE_Noop_Token ACE_SELECT_TOKEN; #endif /* ACE_MT_SAFE && ACE_MT_SAFE != 0 */ // Forward declaration. -class ACE_Select_Reactor_Impl; +class ACE_Select_Reactor_Notify; /** * @class ACE_Select_Reactor_Handle_Set @@ -63,355 +59,8 @@ public: ACE_Handle_Set ex_mask_; }; -/** - * @class ACE_Event_Tuple - * - * @brief An ACE_Event_Handler and its associated ACE_HANDLE. - * - * One <ACE_Event_Handler> is registered for one or more - * <ACE_HANDLE>. At various points, this information must be - * stored explicitly. This class provides a lightweight - * mechanism to do so. - */ -class ACE_Export ACE_Event_Tuple -{ -public: - /// Default constructor. - ACE_Event_Tuple (void); - - /// Constructor. - ACE_Event_Tuple (ACE_Event_Handler *eh, - ACE_HANDLE h); - - /// Destructor. - ~ACE_Event_Tuple (void); - - /// Equality operator. - int operator== (const ACE_Event_Tuple &rhs) const; - - /// Inequality operator. - int operator!= (const ACE_Event_Tuple &rhs) const; - - /// Handle. - ACE_HANDLE handle_; - - /// <ACE_Event_Handler> associated with the <ACE_HANDLE>. - ACE_Event_Handler *event_handler_; -}; - -/** - * @class ACE_Select_Reactor_Notify - * - * @brief Unblock the <ACE_Select_Reactor> from its event loop. - * - * This implementation is necessary for cases where the - * <ACE_Select_Reactor> is run in a multi-threaded program. In - * this case, we need to be able to unblock <select> or <poll> - * when updates occur other than in the main - * <ACE_Select_Reactor> thread. To do this, we signal an - * auto-reset event the <ACE_Select_Reactor> is listening on. - * If an <ACE_Event_Handler> and <ACE_Select_Reactor_Mask> is - * passed to <notify>, the appropriate <handle_*> method is - * dispatched in the context of the <ACE_Select_Reactor> thread. - */ -class ACE_Export ACE_Select_Reactor_Notify : public ACE_Reactor_Notify -{ -public: - /// Constructor. - ACE_Select_Reactor_Notify (void); - - /// Destructor. - ~ACE_Select_Reactor_Notify (void); - - // = Initialization and termination methods. - /// Initialize. - virtual int open (ACE_Reactor_Impl *, - ACE_Timer_Queue * = 0, - int disable_notify_pipe = 0); - - /// Destroy. - virtual int close (void); - - /** - * Called by a thread when it wants to unblock the - * <ACE_Select_Reactor>. This wakeups the <ACE_Select_Reactor> if - * currently blocked in <select>/<poll>. Pass over both the - * <Event_Handler> *and* the <mask> to allow the caller to dictate - * which <Event_Handler> method the <ACE_Select_Reactor> will - * invoke. The <ACE_Time_Value> indicates how long to blocking - * trying to notify the <ACE_Select_Reactor>. If <timeout> == 0, - * the caller will block until action is possible, else will wait - * until the relative time specified in *<timeout> elapses). - */ - virtual int notify (ACE_Event_Handler * = 0, - ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK, - ACE_Time_Value * = 0); - - /// Handles pending threads (if any) that are waiting to unblock the - /// <ACE_Select_Reactor>. - virtual int dispatch_notifications (int &number_of_active_handles, - ACE_Handle_Set &rd_mask); - - /// Returns the ACE_HANDLE of the notify pipe on which the reactor - /// is listening for notifications so that other threads can unblock - /// the Select_Reactor - virtual ACE_HANDLE notify_handle (void); - - /// Handle one of the notify call on the <handle>. This could be - /// because of a thread trying to unblock the <Reactor_Impl> - virtual int dispatch_notify (ACE_Notification_Buffer &buffer); - - /// Read one of the notify call on the <handle> into the - /// <buffer>. This could be because of a thread trying to unblock - /// the <Reactor_Impl> - virtual int read_notify_pipe (ACE_HANDLE handle, - ACE_Notification_Buffer &buffer); - - /// Verify whether the buffer has dispatchable info or not. - virtual int is_dispatchable (ACE_Notification_Buffer &buffer); - - /// Called back by the <ACE_Select_Reactor> when a thread wants to - /// unblock us. - virtual int handle_input (ACE_HANDLE handle); - - /** - * Set the maximum number of times that the - * <ACE_Select_Reactor_Notify::handle_input> method will iterate and - * dispatch the <ACE_Event_Handlers> that are passed in via the - * notify pipe before breaking out of its <recv> loop. By default, - * this is set to -1, which means "iterate until the pipe is empty." - * Setting this to a value like "1 or 2" will increase "fairness" - * (and thus prevent starvation) at the expense of slightly higher - * dispatching overhead. - */ - virtual void max_notify_iterations (int); - - /** - * Get the maximum number of times that the - * <ACE_Select_Reactor_Notify::handle_input> method will iterate and - * dispatch the <ACE_Event_Handlers> that are passed in via the - * notify pipe before breaking out of its <recv> loop. - */ - virtual int max_notify_iterations (void); - - /** - * Purge any notifications pending in this reactor for the specified - * <ACE_Event_Handler> object. If <eh> == 0, all notifications for all - * handlers are removed (but not any notifications posted just to wake up - * the reactor itself). Returns the number of notifications purged. - * Returns -1 on error. - */ - virtual int purge_pending_notifications (ACE_Event_Handler *, - ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); - - /// Dump the state of an object. - virtual void dump (void) const; - - /// Declare the dynamic allocation hooks. - ACE_ALLOC_HOOK_DECLARE; - -protected: - /** - * Keep a back pointer to the <ACE_Select_Reactor>. If this value - * if NULL then the <ACE_Select_Reactor> has been initialized with - * <disable_notify_pipe>. - */ - ACE_Select_Reactor_Impl *select_reactor_; - - /** - * Contains the <ACE_HANDLE> the <ACE_Select_Reactor> is listening - * on, as well as the <ACE_HANDLE> that threads wanting the - * attention of the <ACE_Select_Reactor> will write to. - */ - ACE_Pipe notification_pipe_; - - /** - * Keeps track of the maximum number of times that the - * <ACE_Select_Reactor_Notify::handle_input> method will iterate and - * dispatch the <ACE_Event_Handlers> that are passed in via the - * notify pipe before breaking out of its <recv> loop. By default, - * this is set to -1, which means "iterate until the pipe is empty." - */ - int max_notify_iterations_; - -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // = This configuration queues up notifications in separate buffers that - // are in user-space, rather than stored in a pipe in the OS - // kernel. The kernel-level notifications are used only to trigger - // the Reactor to check its notification queue. This enables many - // more notifications to be stored than would otherwise be the case. - - /// Keeps track of allocated arrays of type - /// <ACE_Notification_Buffer>. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_; - - /// Keeps track of all pending notifications. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_; - - /// Keeps track of all free buffers. - ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_; - - /// Synchronization for handling of queues. - ACE_SYNCH_MUTEX notify_queue_lock_; -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ -}; - -/** - * @class ACE_Select_Reactor_Handler_Repository - * - * @brief Used to map <ACE_HANDLE>s onto the appropriate - * <ACE_Event_Handler> *. - * - * This class is necessary to shield differences between UNIX - * and Win32. In UNIX, <ACE_HANDLE> is an int, whereas in Win32 - * it's a void *. This class hides all these details from the - * bulk of the <ACE_Select_Reactor> code. All of these methods - * are called with the main <Select_Reactor> token lock held. - */ -class ACE_Export ACE_Select_Reactor_Handler_Repository -{ -public: - friend class ACE_Select_Reactor_Handler_Repository_Iterator; - - // = Initialization and termination methods. - /// Default "do-nothing" constructor. - ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &); - - /// Destructor. - ~ACE_Select_Reactor_Handler_Repository (void); - - /// Initialize a repository of the appropriate <size>. - /** - * On Unix platforms, the size parameter should be as large as the - * maximum number of file descriptors allowed for a given process. - * This is necessary since a file descriptor is used to directly - * index the array of event handlers maintained by the Reactor's - * handler repository. Direct indexing is used for efficiency - * reasons. - */ - int open (size_t size); - - /// Close down the repository. - int close (void); - - // = Search structure operations. - - /** - * Return the <ACE_Event_Handler *> associated with <ACE_HANDLE>. - * If <index_p> is non-0, then return the index location of the - * <handle>, if found. - */ - ACE_Event_Handler *find (ACE_HANDLE handle, size_t *index_p = 0); - - /// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE> with the - /// appropriate <ACE_Reactor_Mask> settings. - int bind (ACE_HANDLE, - ACE_Event_Handler *, - ACE_Reactor_Mask); - - /// Remove the binding of <ACE_HANDLE> in accordance with the <mask>. - int unbind (ACE_HANDLE, - ACE_Reactor_Mask mask); - - /// Remove all the <ACE_HANDLE, ACE_Event_Handler> tuples. - int unbind_all (void); - - // = Sanity checking. - - // Check the <handle> to make sure it's a valid ACE_HANDLE that - // within the range of legal handles (i.e., >= 0 && < max_size_). - int invalid_handle (ACE_HANDLE handle); - - // Check the <handle> to make sure it's a valid ACE_HANDLE that - // within the range of currently registered handles (i.e., >= 0 && < - // max_handlep1_). - int handle_in_range (ACE_HANDLE handle); - - // = Accessors. - /// Returns the current table size. - size_t size (void) const; - - /// Maximum ACE_HANDLE value, plus 1. - size_t max_handlep1 (void); - - /// Dump the state of an object. - void dump (void) const; - /// Declare the dynamic allocation hooks. - ACE_ALLOC_HOOK_DECLARE; - -private: - /// Reference to our <Select_Reactor>. - ACE_Select_Reactor_Impl &select_reactor_; - - /// Maximum number of handles. - ssize_t max_size_; - - /// The highest currently active handle, plus 1 (ranges between 0 and - /// <max_size_>. - int max_handlep1_; - -#if defined (ACE_WIN32) - // = The mapping from <HANDLES> to <Event_Handlers>. - - /** - * The NT version implements this via a dynamically allocated - * array of <ACE_Event_Tuple *>. Since NT implements ACE_HANDLE - * as a void * we can't directly index into this array. Therefore, - * we just do a linear search (for now). Next, we'll modify - * things to use hashing or something faster... - */ - ACE_Event_Tuple *event_handlers_; -#else - /** - * The UNIX version implements this via a dynamically allocated - * array of <ACE_Event_Handler *> that is indexed directly using - * the ACE_HANDLE value. - */ - ACE_Event_Handler **event_handlers_; -#endif /* ACE_WIN32 */ -}; - -/** - * @class ACE_Select_Reactor_Handler_Repository_Iterator - * - * @brief Iterate through the <ACE_Select_Reactor_Handler_Repository>. - */ -class ACE_Export ACE_Select_Reactor_Handler_Repository_Iterator -{ -public: - // = Initialization method. - ACE_Select_Reactor_Handler_Repository_Iterator (const ACE_Select_Reactor_Handler_Repository *s); - - /// dtor. - ~ACE_Select_Reactor_Handler_Repository_Iterator (void); - - // = Iteration methods. - - /// Pass back the <next_item> that hasn't been seen in the Set. - /// Returns 0 when all items have been seen, else 1. - int next (ACE_Event_Handler *&next_item); - - /// Returns 1 when all items have been seen, else 0. - int done (void) const; - - /// Move forward by one element in the set. Returns 0 when all the - /// items in the set have been seen, else 1. - int advance (void); - - /// Dump the state of an object. - void dump (void) const; - - /// Declare the dynamic allocation hooks. - ACE_ALLOC_HOOK_DECLARE; - -private: - /// Reference to the Handler_Repository we are iterating over. - const ACE_Select_Reactor_Handler_Repository *rep_; - - /// Pointer to the current iteration level. - ssize_t current_; -}; +//===================================================================== /** * @class ACE_Select_Reactor_Impl diff --git a/ace/Select_Reactor_Base.i b/ace/Select_Reactor_Base.i index f04446e2e86..6dbadd25370 100644 --- a/ace/Select_Reactor_Base.i +++ b/ace/Select_Reactor_Base.i @@ -3,58 +3,7 @@ #include "ace/Reactor.h" -ACE_INLINE -ACE_Event_Tuple::~ACE_Event_Tuple (void) -{ -} -ACE_INLINE -ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void) -{ -} - -ACE_INLINE -ACE_Select_Reactor_Handler_Repository::~ACE_Select_Reactor_Handler_Repository (void) -{ -} - -ACE_INLINE -ACE_Select_Reactor_Handler_Repository_Iterator::~ACE_Select_Reactor_Handler_Repository_Iterator (void) -{ -} - -ACE_INLINE size_t -ACE_Select_Reactor_Handler_Repository::size (void) const -{ - return this->max_size_; -} - -ACE_INLINE -ACE_Event_Tuple::ACE_Event_Tuple (void) -: handle_ (ACE_INVALID_HANDLE), - event_handler_ (0) -{ -} - -ACE_INLINE -ACE_Event_Tuple::ACE_Event_Tuple (ACE_Event_Handler* eh, - ACE_HANDLE h) -: handle_ (h), - event_handler_ (eh) -{ -} - -ACE_INLINE int -ACE_Event_Tuple::operator== (const ACE_Event_Tuple &rhs) const -{ - return this->handle_ == rhs.handle_; -} - -ACE_INLINE int -ACE_Event_Tuple::operator!= (const ACE_Event_Tuple &rhs) const -{ - return !(*this == rhs); -} ACE_INLINE ACE_Select_Reactor_Impl::ACE_Select_Reactor_Impl () diff --git a/ace/Select_Reactor_Handler_Repository.cpp b/ace/Select_Reactor_Handler_Repository.cpp new file mode 100644 index 00000000000..62eee31185e --- /dev/null +++ b/ace/Select_Reactor_Handler_Repository.cpp @@ -0,0 +1,459 @@ +#include "Select_Reactor_Handler_Repository.h" +#include "ACE.h" +#include "Select_Reactor_Base.h" +#include "Reactor.h" + +#if !defined (__ACE_INLINE__) +#include "ace/Select_Reactor_Handler_Repository.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(ace, + Select_Reactor_Handler_Repository, + "$Id$") + +#if defined (ACE_WIN32) +#define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_) +#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_) +#else +#define ACE_SELECT_REACTOR_HANDLE(H) (H) +#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)]) +#endif /* ACE_WIN32 */ + + +ACE_Select_Reactor_Handler_Repository::~ACE_Select_Reactor_Handler_Repository (void) +{ +} + +// Performs sanity checking on the ACE_HANDLE. +int +ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle"); +#if defined (ACE_WIN32) + // It's too expensive to perform more exhaustive validity checks on + // Win32 due to the way that they implement SOCKET HANDLEs. + if (handle == ACE_INVALID_HANDLE) +#else /* !ACE_WIN32 */ + if (handle < 0 || handle >= this->max_size_) +#endif /* ACE_WIN32 */ + { + errno = EINVAL; + return 1; + } + else + return 0; +} + +// Performs sanity checking on the ACE_HANDLE. + +int +ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range"); +#if defined (ACE_WIN32) + // It's too expensive to perform more exhaustive validity checks on + // Win32 due to the way that they implement SOCKET HANDLEs. + if (handle != ACE_INVALID_HANDLE) +#else /* !ACE_WIN32 */ + if (handle >= 0 && handle < this->max_handlep1_) +#endif /* ACE_WIN32 */ + return 1; + else + { + errno = EINVAL; + return 0; + } +} + +size_t +ACE_Select_Reactor_Handler_Repository::max_handlep1 (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1"); + + return this->max_handlep1_; +} + +int +ACE_Select_Reactor_Handler_Repository::open (size_t size) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open"); + this->max_size_ = size; + this->max_handlep1_ = 0; + +#if defined (ACE_WIN32) + // Try to allocate the memory. + ACE_NEW_RETURN (this->event_handlers_, + ACE_Event_Tuple[size], + -1); + + // Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }. + for (size_t h = 0; h < size; h++) + { + ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; + } +#else + // Try to allocate the memory. + ACE_NEW_RETURN (this->event_handlers_, + ACE_Event_Handler *[size], + -1); + + // Initialize the ACE_Event_Handler * to NULL. + for (size_t h = 0; h < size; h++) + ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; +#endif /* ACE_WIN32 */ + + // Try to increase the number of handles if <size> is greater than + // the current limit. + return ACE::set_handle_limit (size); +} + +// Initialize a repository of the appropriate <size>. + +ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository ( + ACE_Select_Reactor_Impl &select_reactor) + : select_reactor_ (select_reactor), + max_size_ (0), + max_handlep1_ (0), + event_handlers_ (0) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository"); +} + +int +ACE_Select_Reactor_Handler_Repository::unbind_all (void) +{ + // Unbind all of the <handle, ACE_Event_Handler>s. + for (int handle = 0; + handle < this->max_handlep1_; + handle++) + this->unbind (ACE_SELECT_REACTOR_HANDLE (handle), + ACE_Event_Handler::ALL_EVENTS_MASK); + + return 0; +} + +int +ACE_Select_Reactor_Handler_Repository::close (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close"); + + if (this->event_handlers_ != 0) + { + this->unbind_all (); + + delete [] this->event_handlers_; + this->event_handlers_ = 0; + } + return 0; +} + +// Return the <ACE_Event_Handler *> associated with the <handle>. + +ACE_Event_Handler * +ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle, + size_t *index_p) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find"); + + ACE_Event_Handler *eh = 0; + ssize_t i; + + // Only bother to search for the <handle> if it's in range. + if (this->handle_in_range (handle)) + { +#if defined (ACE_WIN32) + i = 0; + + for (; i < this->max_handlep1_; i++) + if (ACE_SELECT_REACTOR_HANDLE (i) == handle) + { + eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i); + break; + } +#else + i = handle; + + eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle); +#endif /* ACE_WIN32 */ + } + else + // g++ can't figure out that <i> won't be used below if the handle + // is out of range, so keep it happy by defining <i> here . . . + i = 0; + + if (eh != 0) + { + if (index_p != 0) + *index_p = i; + } + else + errno = ENOENT; + + return eh; +} + +// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>. + +int +ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind"); + + if (handle == ACE_INVALID_HANDLE) + handle = event_handler->get_handle (); + + if (this->invalid_handle (handle)) + return -1; + +#if defined (ACE_WIN32) + int assigned_slot = -1; + + for (ssize_t i = 0; i < this->max_handlep1_; i++) + { + // Found it, so let's just reuse this location. + if (ACE_SELECT_REACTOR_HANDLE (i) == handle) + { + assigned_slot = i; + break; + } + // Here's the first free slot, so let's take it. + else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE + && assigned_slot == -1) + assigned_slot = i; + } + + if (assigned_slot > -1) + // We found a free spot, let's reuse it. + { + ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler; + } + else if (this->max_handlep1_ < this->max_size_) + { + // Insert at the end of the active portion. + ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler; + this->max_handlep1_++; + } + else + { + // No more room at the inn! + errno = ENOMEM; + return -1; + } +#else + ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler; + + if (this->max_handlep1_ < handle + 1) + this->max_handlep1_ = handle + 1; +#endif /* ACE_WIN32 */ + + // Add the <mask> for this <handle> in the Select_Reactor's wait_set. + this->select_reactor_.bit_ops (handle, + mask, + this->select_reactor_.wait_set_, + ACE_Reactor::ADD_MASK); + + // Note the fact that we've changed the state of the <wait_set_>, + // which is used by the dispatching loop to determine whether it can + // keep going or if it needs to reconsult select(). + this->select_reactor_.state_changed_ = 1; + + return 0; +} + +// Remove the binding of <ACE_HANDLE>. + +int +ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind"); + + size_t slot; + ACE_Event_Handler *eh = this->find (handle, &slot); + + if (eh == 0) + return -1; + + // Clear out the <mask> bits in the Select_Reactor's wait_set. + this->select_reactor_.bit_ops (handle, + mask, + this->select_reactor_.wait_set_, + ACE_Reactor::CLR_MASK); + + // And suspend_set. + this->select_reactor_.bit_ops (handle, + mask, + this->select_reactor_.suspend_set_, + ACE_Reactor::CLR_MASK); + + // Note the fact that we've changed the state of the <wait_set_>, + // which is used by the dispatching loop to determine whether it can + // keep going or if it needs to reconsult select(). + this->select_reactor_.state_changed_ = 1; + + // Close down the <Event_Handler> unless we've been instructed not + // to. + if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0) + eh->handle_close (handle, mask); + + // If there are no longer any outstanding events on this <handle> + // then we can totally shut down the Event_Handler. + if (this->select_reactor_.wait_set_.rd_mask_.is_set (handle) == 0 + && this->select_reactor_.wait_set_.wr_mask_.is_set (handle) == 0 + && this->select_reactor_.wait_set_.ex_mask_.is_set (handle) == 0) +#if defined (ACE_WIN32) + { + ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0; + + if (this->max_handlep1_ == (int) slot + 1) + { + // We've deleted the last entry (i.e., i + 1 == the current + // size of the array), so we need to figure out the last + // valid place in the array that we should consider in + // subsequent searches. + + int i; + + for (i = this->max_handlep1_ - 1; + i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE; + i--) + continue; + + this->max_handlep1_ = i + 1; + } + } +#else + { + ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0; + + if (this->max_handlep1_ == handle + 1) + { + // We've deleted the last entry, so we need to figure out + // the last valid place in the array that is worth looking + // at. + ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set (); + ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set (); + ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set (); + + ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set (); + ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set (); + ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set (); + + // Compute the maximum of six values. + this->max_handlep1_ = wait_rd_max; + if (this->max_handlep1_ < wait_wr_max) + this->max_handlep1_ = wait_wr_max; + if (this->max_handlep1_ < wait_ex_max) + this->max_handlep1_ = wait_ex_max; + + if (this->max_handlep1_ < suspend_rd_max) + this->max_handlep1_ = suspend_rd_max; + if (this->max_handlep1_ < suspend_wr_max) + this->max_handlep1_ = suspend_wr_max; + if (this->max_handlep1_ < suspend_ex_max) + this->max_handlep1_ = suspend_ex_max; + + this->max_handlep1_++; + } + } +#endif /* ACE_WIN32 */ + + return 0; +} + +/****************************************************************************/ + +ACE_Select_Reactor_Handler_Repository_Iterator::~ACE_Select_Reactor_Handler_Repository_Iterator (void) +{ +} + +ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator + (const ACE_Select_Reactor_Handler_Repository *s) + : rep_ (s), + current_ (-1) +{ + this->advance (); +} + +// Pass back the <next_item> that hasn't been seen in the Set. +// Returns 0 when all items have been seen, else 1. + +int +ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item) +{ + int result = 1; + + if (this->current_ >= this->rep_->max_handlep1_) + result = 0; + else + next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, + this->current_); + return result; +} + +int +ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const +{ + return this->current_ >= this->rep_->max_handlep1_; +} + +// Move forward by one element in the set. + +int +ACE_Select_Reactor_Handler_Repository_Iterator::advance (void) +{ + if (this->current_ < this->rep_->max_handlep1_) + this->current_++; + + while (this->current_ < this->rep_->max_handlep1_) + if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0) + return 1; + else + this->current_++; + + return this->current_ < this->rep_->max_handlep1_; +} + +// Dump the state of an object. + +void +ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("rep_ = %u"), this->rep_)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("current_ = %d"), this->current_)); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +void +ACE_Select_Reactor_Handler_Repository::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("(%t) max_handlep1_ = %d, max_size_ = %d\n"), + this->max_handlep1_, this->max_size_)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("["))); + + ACE_Event_Handler *eh = 0; + + for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this); + iter.next (eh) != 0; + iter.advance ()) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (eh = %x, eh->handle_ = %d)"), + eh, eh->get_handle ())); + + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]"))); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator) diff --git a/ace/Select_Reactor_Handler_Repository.h b/ace/Select_Reactor_Handler_Repository.h new file mode 100644 index 00000000000..b6cc5bc7189 --- /dev/null +++ b/ace/Select_Reactor_Handler_Repository.h @@ -0,0 +1,230 @@ +/* -*- C++ -*- */ +//============================================================================= +/** + * @file Select_Reactor_Handler_Repository.h + * + * $Id$ + * + * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> + */ +//============================================================================= + +#ifndef ACE_SELECT_REACTOR_HANDLER_REPOSITORY_H +#define ACE_SELECT_REACTOR_HANDLER_REPOSITORY_H + +#include "ace/pre.h" + +#include "ACE_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "Event_Handler.h" + +class ACE_Select_Reactor_Impl; +class ACE_Event_Handler; + +/** + * @class ACE_Event_Tuple + * + * @brief An ACE_Event_Handler and its associated ACE_HANDLE. + * + * One <ACE_Event_Handler> is registered for one or more + * <ACE_HANDLE>. At various points, this information must be + * stored explicitly. This class provides a lightweight + * mechanism to do so. + */ +class ACE_Export ACE_Event_Tuple +{ +public: + /// Default constructor. + ACE_Event_Tuple (void); + + /// Constructor. + ACE_Event_Tuple (ACE_Event_Handler *eh, + ACE_HANDLE h); + + /// Destructor. + ~ACE_Event_Tuple (void); + + /// Equality operator. + int operator== (const ACE_Event_Tuple &rhs) const; + + /// Inequality operator. + int operator!= (const ACE_Event_Tuple &rhs) const; + + /// Handle. + ACE_HANDLE handle_; + + /// <ACE_Event_Handler> associated with the <ACE_HANDLE>. + ACE_Event_Handler *event_handler_; +}; + + +//=================================================================== +/** + * @class ACE_Select_Reactor_Handler_Repository + * + * @brief Used to map <ACE_HANDLE>s onto the appropriate + * <ACE_Event_Handler> *. + * + * This class is necessary to shield differences between UNIX + * and Win32. In UNIX, <ACE_HANDLE> is an int, whereas in Win32 + * it's a void *. This class hides all these details from the + * bulk of the <ACE_Select_Reactor> code. All of these methods + * are called with the main <Select_Reactor> token lock held. + */ +class ACE_Export ACE_Select_Reactor_Handler_Repository +{ +public: + friend class ACE_Select_Reactor_Handler_Repository_Iterator; + + // = Initialization and termination methods. + /// Default "do-nothing" constructor. + ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &); + + /// Destructor. + ~ACE_Select_Reactor_Handler_Repository (void); + + /// Initialize a repository of the appropriate <size>. + /** + * On Unix platforms, the size parameter should be as large as the + * maximum number of file descriptors allowed for a given process. + * This is necessary since a file descriptor is used to directly + * index the array of event handlers maintained by the Reactor's + * handler repository. Direct indexing is used for efficiency + * reasons. + */ + int open (size_t size); + + /// Close down the repository. + int close (void); + + // = Search structure operations. + + /** + * Return the <ACE_Event_Handler *> associated with <ACE_HANDLE>. + * If <index_p> is non-0, then return the index location of the + * <handle>, if found. + */ + ACE_Event_Handler *find (ACE_HANDLE handle, size_t *index_p = 0); + + /// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE> with the + /// appropriate <ACE_Reactor_Mask> settings. + int bind (ACE_HANDLE, + ACE_Event_Handler *, + ACE_Reactor_Mask); + + /// Remove the binding of <ACE_HANDLE> in accordance with the <mask>. + int unbind (ACE_HANDLE, + ACE_Reactor_Mask mask); + + /// Remove all the <ACE_HANDLE, ACE_Event_Handler> tuples. + int unbind_all (void); + + // = Sanity checking. + + // Check the <handle> to make sure it's a valid ACE_HANDLE that + // within the range of legal handles (i.e., >= 0 && < max_size_). + int invalid_handle (ACE_HANDLE handle); + + // Check the <handle> to make sure it's a valid ACE_HANDLE that + // within the range of currently registered handles (i.e., >= 0 && < + // max_handlep1_). + int handle_in_range (ACE_HANDLE handle); + + // = Accessors. + /// Returns the current table size. + size_t size (void) const; + + /// Maximum ACE_HANDLE value, plus 1. + size_t max_handlep1 (void); + + /// Dump the state of an object. + void dump (void) const; + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + +private: + /// Reference to our <Select_Reactor>. + ACE_Select_Reactor_Impl &select_reactor_; + + /// Maximum number of handles. + ssize_t max_size_; + + /// The highest currently active handle, plus 1 (ranges between 0 and + /// <max_size_>. + int max_handlep1_; + +#if defined (ACE_WIN32) + // = The mapping from <HANDLES> to <Event_Handlers>. + + /** + * The NT version implements this via a dynamically allocated + * array of <ACE_Event_Tuple *>. Since NT implements ACE_HANDLE + * as a void * we can't directly index into this array. Therefore, + * we just do a linear search (for now). Next, we'll modify + * things to use hashing or something faster... + */ + ACE_Event_Tuple *event_handlers_; +#else + /** + * The UNIX version implements this via a dynamically allocated + * array of <ACE_Event_Handler *> that is indexed directly using + * the ACE_HANDLE value. + */ + ACE_Event_Handler **event_handlers_; +#endif /* ACE_WIN32 */ +}; + +//================================================================= + +/** + * @class ACE_Select_Reactor_Handler_Repository_Iterator + * + * @brief Iterate through the <ACE_Select_Reactor_Handler_Repository>. + */ +class ACE_Export ACE_Select_Reactor_Handler_Repository_Iterator +{ +public: + // = Initialization method. + ACE_Select_Reactor_Handler_Repository_Iterator (const ACE_Select_Reactor_Handler_Repository *s); + + /// dtor. + ~ACE_Select_Reactor_Handler_Repository_Iterator (void); + + // = Iteration methods. + + /// Pass back the <next_item> that hasn't been seen in the Set. + /// Returns 0 when all items have been seen, else 1. + int next (ACE_Event_Handler *&next_item); + + /// Returns 1 when all items have been seen, else 0. + int done (void) const; + + /// Move forward by one element in the set. Returns 0 when all the + /// items in the set have been seen, else 1. + int advance (void); + + /// Dump the state of an object. + void dump (void) const; + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + +private: + /// Reference to the Handler_Repository we are iterating over. + const ACE_Select_Reactor_Handler_Repository *rep_; + + /// Pointer to the current iteration level. + ssize_t current_; +}; + + +#if defined (__ACE_INLINE__) +#include "Select_Reactor_Handler_Repository.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /*ACE_SELECT_REACTOR_HANDLER_REPOSITORY_H*/ diff --git a/ace/Select_Reactor_Handler_Repository.inl b/ace/Select_Reactor_Handler_Repository.inl new file mode 100644 index 00000000000..9c9c234ec56 --- /dev/null +++ b/ace/Select_Reactor_Handler_Repository.inl @@ -0,0 +1,41 @@ +/* -*- C++ -*- */ +//$Id$ +ACE_INLINE +ACE_Event_Tuple::~ACE_Event_Tuple (void) +{ +} + +ACE_INLINE +ACE_Event_Tuple::ACE_Event_Tuple (void) +: handle_ (ACE_INVALID_HANDLE), + event_handler_ (0) +{ +} + +ACE_INLINE +ACE_Event_Tuple::ACE_Event_Tuple (ACE_Event_Handler* eh, + ACE_HANDLE h) +: handle_ (h), + event_handler_ (eh) +{ +} + +ACE_INLINE int +ACE_Event_Tuple::operator== (const ACE_Event_Tuple &rhs) const +{ + return this->handle_ == rhs.handle_; +} + +ACE_INLINE int +ACE_Event_Tuple::operator!= (const ACE_Event_Tuple &rhs) const +{ + return !(*this == rhs); +} + +/************************************************************/ + +ACE_INLINE size_t +ACE_Select_Reactor_Handler_Repository::size (void) const +{ + return this->max_size_; +} diff --git a/ace/Select_Reactor_Notify.cpp b/ace/Select_Reactor_Notify.cpp new file mode 100644 index 00000000000..18886c44a36 --- /dev/null +++ b/ace/Select_Reactor_Notify.cpp @@ -0,0 +1,576 @@ +#include "Select_Reactor_Notify.h" +#include "ACE.h" +#include "Select_Reactor_Base.h" + +ACE_RCSID(ace, + Select_Reactor_Notify, + "$Id$") + + +ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void) + : max_notify_iterations_ (-1) +{ +} + +ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void) +{ +} + +void +ACE_Select_Reactor_Notify::max_notify_iterations (int iterations) +{ + // Must always be > 0 or < 0 to optimize the loop exit condition. + if (iterations == 0) + iterations = 1; + + this->max_notify_iterations_ = iterations; +} + +int +ACE_Select_Reactor_Notify::max_notify_iterations (void) +{ + return this->max_notify_iterations_; +} + +int +ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask ) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications"); + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + if (this->notify_queue_.is_empty ()) + return 0; + + ACE_Notification_Buffer *temp; + ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue; + + size_t queue_size = this->notify_queue_.size (); + int number_purged = 0; + size_t i; + for (i = 0; i < queue_size; ++i) + { + if (-1 == this->notify_queue_.dequeue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("dequeue_head")), + -1); + + // If this is not a Reactor notify (it is for a particular handler), + // and it matches the specified handler (or purging all), + // and applying the mask would totally eliminate the notification, then + // release it and count the number purged. + if ((0 != temp->eh_) && + (0 == eh || eh == temp->eh_) && + ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask + // is left with nothing when + // applying the mask + { + if (-1 == this->free_queue_.enqueue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enqueue_head")), + -1); + ++number_purged; + } + else + { + // To preserve it, move it to the local_queue. + // But first, if this is not a Reactor notify (it is for a particularhandler), + // and it matches the specified handler (or purging all), then + // apply the mask + if ((0 != temp->eh_) && + (0 == eh || eh == temp->eh_)) + ACE_CLR_BITS(temp->mask_, mask); + if (-1 == local_queue.enqueue_head (temp)) + return -1; + } + } + + if (this->notify_queue_.size ()) + { // should be empty! + ACE_ASSERT (0); + return -1; + } + + // now put it back in the notify queue + queue_size = local_queue.size (); + for (i = 0; i < queue_size; ++i) + { + if (-1 == local_queue.dequeue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("dequeue_head")), + -1); + + if (-1 == this->notify_queue_.enqueue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enqueue_head")), + -1); + } + + return number_purged; + +#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ + ACE_UNUSED_ARG (eh); + ACE_UNUSED_ARG (mask); + ACE_NOTSUP_RETURN (-1); +#endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ +} + +void +ACE_Select_Reactor_Notify::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_)); + this->notification_pipe_.dump (); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +int +ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, + ACE_Timer_Queue *, + int disable_notify_pipe) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::open"); + + if (disable_notify_pipe == 0) + { + this->select_reactor_ = + ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r); + + if (select_reactor_ == 0) + { + errno = EINVAL; + return -1; + } + + if (this->notification_pipe_.open () == -1) + return -1; +#if defined (F_SETFD) + ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1); + ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1); +#endif /* F_SETFD */ + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_Notification_Buffer *temp; + + ACE_NEW_RETURN (temp, + ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); + + if (this->alloc_queue_.enqueue_head (temp) == -1) + { + delete [] temp; + return -1; + } + + for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++) + if (free_queue_.enqueue_head (temp + i) == -1) + return -1; + +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + + // There seems to be a Win32 bug with this... Set this into + // non-blocking mode. + if (ACE::set_flags (this->notification_pipe_.read_handle (), + ACE_NONBLOCK) == -1) + return -1; + else + return this->select_reactor_->register_handler + (this->notification_pipe_.read_handle (), + this, + ACE_Event_Handler::READ_MASK); + } + else + { + this->select_reactor_ = 0; + return 0; + } +} + +int +ACE_Select_Reactor_Notify::close (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::close"); + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Free up the dynamically allocated resources. + ACE_Notification_Buffer **b; + + for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_); + alloc_iter.next (b) != 0; + alloc_iter.advance ()) + { + delete [] *b; + *b = 0; + } + + this->alloc_queue_.reset (); + this->notify_queue_.reset (); + this->free_queue_.reset (); +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + + return this->notification_pipe_.close (); +} + +int +ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::notify"); + + // Just consider this method a "no-op" if there's no + // <ACE_Select_Reactor> configured. + if (this->select_reactor_ == 0) + return 0; + + ACE_Notification_Buffer buffer (eh, mask); + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Artificial scope to limit the duration of the mutex. + { + // int notification_required = 0; + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + // No pending notifications. + + // We will send notify for every message.. + // if (this->notify_queue_.is_empty ()) + // notification_required = 1; + + ACE_Notification_Buffer *temp = 0; + + if (free_queue_.dequeue_head (temp) == -1) + { + // Grow the queue of available buffers. + ACE_Notification_Buffer *temp1; + + ACE_NEW_RETURN (temp1, + ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); + + if (this->alloc_queue_.enqueue_head (temp1) == -1) + { + delete [] temp1; + return -1; + } + + // Start at 1 and enqueue only + // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since + // the first one will be used right now. + for (size_t i = 1; + i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; + i++) + this->free_queue_.enqueue_head (temp1 + i); + + temp = temp1; + } + + ACE_ASSERT (temp != 0); + *temp = buffer; + + if (notify_queue_.enqueue_tail (temp) == -1) + return -1; + } +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + + ssize_t n = ACE::send (this->notification_pipe_.write_handle (), + (char *) &buffer, + sizeof buffer, + timeout); + if (n == -1) + return -1; + + return 0; +} + +// Handles pending threads (if any) that are waiting to unblock the +// Select_Reactor. + +int +ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles, + ACE_Handle_Set &rd_mask) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications"); + + ACE_HANDLE read_handle = + this->notification_pipe_.read_handle (); + + if (read_handle != ACE_INVALID_HANDLE + && rd_mask.is_set (read_handle)) + { + number_of_active_handles--; + rd_mask.clr_bit (read_handle); + return this->handle_input (read_handle); + } + else + return 0; +} + + +ACE_HANDLE +ACE_Select_Reactor_Notify::notify_handle (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle"); + + return this->notification_pipe_.read_handle (); +} + + +// Special trick to unblock <select> when updates occur in somewhere +// other than the main <ACE_Select_Reactor> thread. All we do is +// write data to a pipe that the <ACE_Select_Reactor> is listening on. +// Thanks to Paul Stephenson for suggesting this approach. +int +ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer) +{ + // There is tonnes of code that can be abstracted... +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + ACE_Notification_Buffer *temp; + + ACE_UNUSED_ARG (buffer); + + // If the queue is empty just return 0 + if (notify_queue_.is_empty ()) + return 0; + + if (this->notify_queue_.dequeue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("dequeue_head")), + -1); + if (temp->eh_ != 0) + { + // If the queue had a buffer that has an event handler, put + // the element back in the queue and return a 1 + if (this->notify_queue_.enqueue_head (temp) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enque_head")), + -1); + } + + return 1; + } + // Else put the element in the free queue + if (free_queue_.enqueue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enqueue_head")), + -1); + } +#else + // If eh == 0 then another thread is unblocking the + // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s + // internal structures. Otherwise, we need to dispatch the + // appropriate handle_* method on the <ACE_Event_Handler> + // pointer we've been passed. + if (buffer.eh_ != 0) + return 1; + +#endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + + // has no dispatchable buffer + return 0; +} + +int +ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) +{ + int result = 0; +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Dispatch all messages that are in the <notify_queue_>. + { + // We acquire the lock in a block to make sure we're not + // holding the lock while delivering callbacks... + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + ACE_Notification_Buffer *temp; + + if (notify_queue_.is_empty ()) + return 0; + else if (notify_queue_.dequeue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("dequeue_head")), + -1); + buffer = *temp; + if (free_queue_.enqueue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enqueue_head")), + -1); + } + + // If eh == 0 then another thread is unblocking the + // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s + // internal structures. Otherwise, we need to dispatch the + // appropriate handle_* method on the <ACE_Event_Handler> + // pointer we've been passed. + if (buffer.eh_ != 0) + { + + switch (buffer.mask_) + { + case ACE_Event_Handler::READ_MASK: + case ACE_Event_Handler::ACCEPT_MASK: + result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::WRITE_MASK: + result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::EXCEPT_MASK: + result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); + break; + default: + // Should we bail out if we get an invalid mask? + ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_)); + } + if (result == -1) + buffer.eh_->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + } +#else + // If eh == 0 then another thread is unblocking the + // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s + // internal structures. Otherwise, we need to dispatch the + // appropriate handle_* method on the <ACE_Event_Handler> + // pointer we've been passed. + if (buffer.eh_ != 0) + { + switch (buffer.mask_) + { + case ACE_Event_Handler::READ_MASK: + case ACE_Event_Handler::ACCEPT_MASK: + result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::WRITE_MASK: + result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::EXCEPT_MASK: + result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::QOS_MASK: + result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::GROUP_QOS_MASK: + result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE); + break; + default: + // Should we bail out if we get an invalid mask? + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("invalid mask = %d\n"), + buffer.mask_)); + } + if (result == -1) + buffer.eh_->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + } + +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + + return 1; +} + +int +ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle, + ACE_Notification_Buffer &buffer) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe"); + + ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer); + + if (n > 0) + { + // Check to see if we've got a short read. + if (n != sizeof buffer) + { + ssize_t remainder = sizeof buffer - n; + + // If so, try to recover by reading the remainder. If this + // doesn't work we're in big trouble since the input stream + // won't be aligned correctly. I'm not sure quite what to + // do at this point. It's probably best just to return -1. + if (ACE::recv (handle, + ((char *) &buffer) + n, + remainder) != remainder) + return -1; + } + + + return 1; + } + + // Return -1 if things have gone seriously wrong. + if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN)) + return -1; + + return 0; +} + + +int +ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input"); + // Precondition: this->select_reactor_.token_.current_owner () == + // ACE_Thread::self (); + + int number_dispatched = 0; + int result = 0; + ACE_Notification_Buffer buffer; + + while ((result = this->read_notify_pipe (handle, buffer)) > 0) + { + // Dispatch the buffer + // NOTE: We count only if we made any dispatches ie. upcalls. + if (this->dispatch_notify (buffer) > 0) + number_dispatched++; + + // Bail out if we've reached the <notify_threshold_>. Note that + // by default <notify_threshold_> is -1, so we'll loop until all + // the notifications in the pipe have been dispatched. + if (number_dispatched == this->max_notify_iterations_) + break; + } + + // Reassign number_dispatched to -1 if things have gone seriously + // wrong. + if (result < 0) + number_dispatched = -1; + + // Enqueue ourselves into the list of waiting threads. When we + // reacquire the token we'll be off and running again with ownership + // of the token. The postcondition of this call is that + // <select_reactor_.token_.current_owner> == <ACE_Thread::self>. + this->select_reactor_->renew (); + return number_dispatched; +} + + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) +template class ACE_Unbounded_Queue <ACE_Notification_Buffer *>; +template class ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>; +template class ACE_Node <ACE_Notification_Buffer *>; +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) +#pragma instantiate ACE_Unbounded_Queue <ACE_Notification_Buffer *> +#pragma instantiate ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *> +#pragma instantiate ACE_Node <ACE_Notification_Buffer *> +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/Select_Reactor_Notify.h b/ace/Select_Reactor_Notify.h new file mode 100644 index 00000000000..5e1b972021e --- /dev/null +++ b/ace/Select_Reactor_Notify.h @@ -0,0 +1,189 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file Select_Reactor_Base.h + * + * $Id$ + * + * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> + */ +//============================================================================= + +#ifndef ACE_SELECT_REACTOR_NOTIFY_H +#define ACE_SELECT_REACTOR_NOTIFY_H +#include "ace/pre.h" +#include "Reactor_Impl.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "ace/Pipe.h" + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) +#include "Unbounded_Queue.h" +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + +class ACE_Select_Reactor_Impl; + + +/** + * @class ACE_Select_Reactor_Notify + * + * @brief Unblock the <ACE_Select_Reactor> from its event loop. + * + * This implementation is necessary for cases where the + * <ACE_Select_Reactor> is run in a multi-threaded program. In + * this case, we need to be able to unblock <select> or <poll> + * when updates occur other than in the main + * <ACE_Select_Reactor> thread. To do this, we signal an + * auto-reset event the <ACE_Select_Reactor> is listening on. + * If an <ACE_Event_Handler> and <ACE_Select_Reactor_Mask> is + * passed to <notify>, the appropriate <handle_*> method is + * dispatched in the context of the <ACE_Select_Reactor> thread. + */ +class ACE_Export ACE_Select_Reactor_Notify : public ACE_Reactor_Notify +{ +public: + /// Constructor. + ACE_Select_Reactor_Notify (void); + + /// Destructor. + ~ACE_Select_Reactor_Notify (void); + + // = Initialization and termination methods. + /// Initialize. + virtual int open (ACE_Reactor_Impl *, + ACE_Timer_Queue * = 0, + int disable_notify_pipe = 0); + + /// Destroy. + virtual int close (void); + + /** + * Called by a thread when it wants to unblock the + * <ACE_Select_Reactor>. This wakeups the <ACE_Select_Reactor> if + * currently blocked in <select>/<poll>. Pass over both the + * <Event_Handler> *and* the <mask> to allow the caller to dictate + * which <Event_Handler> method the <ACE_Select_Reactor> will + * invoke. The <ACE_Time_Value> indicates how long to blocking + * trying to notify the <ACE_Select_Reactor>. If <timeout> == 0, + * the caller will block until action is possible, else will wait + * until the relative time specified in *<timeout> elapses). + */ + virtual int notify (ACE_Event_Handler * = 0, + ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK, + ACE_Time_Value * = 0); + + /// Handles pending threads (if any) that are waiting to unblock the + /// <ACE_Select_Reactor>. + virtual int dispatch_notifications (int &number_of_active_handles, + ACE_Handle_Set &rd_mask); + + /// Returns the ACE_HANDLE of the notify pipe on which the reactor + /// is listening for notifications so that other threads can unblock + /// the Select_Reactor + virtual ACE_HANDLE notify_handle (void); + + /// Handle one of the notify call on the <handle>. This could be + /// because of a thread trying to unblock the <Reactor_Impl> + virtual int dispatch_notify (ACE_Notification_Buffer &buffer); + + /// Read one of the notify call on the <handle> into the + /// <buffer>. This could be because of a thread trying to unblock + /// the <Reactor_Impl> + virtual int read_notify_pipe (ACE_HANDLE handle, + ACE_Notification_Buffer &buffer); + + /// Verify whether the buffer has dispatchable info or not. + virtual int is_dispatchable (ACE_Notification_Buffer &buffer); + + /// Called back by the <ACE_Select_Reactor> when a thread wants to + /// unblock us. + virtual int handle_input (ACE_HANDLE handle); + + /** + * Set the maximum number of times that the + * <ACE_Select_Reactor_Notify::handle_input> method will iterate and + * dispatch the <ACE_Event_Handlers> that are passed in via the + * notify pipe before breaking out of its <recv> loop. By default, + * this is set to -1, which means "iterate until the pipe is empty." + * Setting this to a value like "1 or 2" will increase "fairness" + * (and thus prevent starvation) at the expense of slightly higher + * dispatching overhead. + */ + virtual void max_notify_iterations (int); + + /** + * Get the maximum number of times that the + * <ACE_Select_Reactor_Notify::handle_input> method will iterate and + * dispatch the <ACE_Event_Handlers> that are passed in via the + * notify pipe before breaking out of its <recv> loop. + */ + virtual int max_notify_iterations (void); + + /** + * Purge any notifications pending in this reactor for the specified + * <ACE_Event_Handler> object. If <eh> == 0, all notifications for all + * handlers are removed (but not any notifications posted just to wake up + * the reactor itself). Returns the number of notifications purged. + * Returns -1 on error. + */ + virtual int purge_pending_notifications (ACE_Event_Handler *, + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); + + /// Dump the state of an object. + virtual void dump (void) const; + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + +protected: + /** + * Keep a back pointer to the <ACE_Select_Reactor>. If this value + * if NULL then the <ACE_Select_Reactor> has been initialized with + * <disable_notify_pipe>. + */ + ACE_Select_Reactor_Impl *select_reactor_; + + /** + * Contains the <ACE_HANDLE> the <ACE_Select_Reactor> is listening + * on, as well as the <ACE_HANDLE> that threads wanting the + * attention of the <ACE_Select_Reactor> will write to. + */ + ACE_Pipe notification_pipe_; + + /** + * Keeps track of the maximum number of times that the + * <ACE_Select_Reactor_Notify::handle_input> method will iterate and + * dispatch the <ACE_Event_Handlers> that are passed in via the + * notify pipe before breaking out of its <recv> loop. By default, + * this is set to -1, which means "iterate until the pipe is empty." + */ + int max_notify_iterations_; + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // = This configuration queues up notifications in separate buffers that + // are in user-space, rather than stored in a pipe in the OS + // kernel. The kernel-level notifications are used only to trigger + // the Reactor to check its notification queue. This enables many + // more notifications to be stored than would otherwise be the case. + + /// Keeps track of allocated arrays of type + /// <ACE_Notification_Buffer>. + ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_; + + /// Keeps track of all pending notifications. + ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_; + + /// Keeps track of all free buffers. + ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_; + + /// Synchronization for handling of queues. + ACE_SYNCH_MUTEX notify_queue_lock_; +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ +}; + + +#include "ace/post.h" +#endif /*ACE_SELECT_REACTOR_NOTIFY_H*/ diff --git a/ace/Select_Reactor_T.cpp b/ace/Select_Reactor_T.cpp index 9a45c4183ff..51ea42f35d6 100644 --- a/ace/Select_Reactor_T.cpp +++ b/ace/Select_Reactor_T.cpp @@ -13,6 +13,7 @@ #include "ace/Log_Msg.h" #include "ace/Thread.h" #include "ace/Timer_Heap.h" +#include "Select_Reactor_Notify.h" // @@ The latest version of SunCC can't grok the code if we put inline // function here. Therefore, we temporarily disable the code here. @@ -996,7 +997,7 @@ ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler_i } template <class ACE_SELECT_REACTOR_TOKEN> int -ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::work_pending +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::work_pending (const ACE_Time_Value &max_wait_time) { ACE_TRACE ("ACE_Select_Reactor_T::work_pending"); diff --git a/ace/ace_dll.dsp b/ace/ace_dll.dsp index cd7cbeb09c3..ea8a5c4ff7d 100644 --- a/ace/ace_dll.dsp +++ b/ace/ace_dll.dsp @@ -762,6 +762,14 @@ SOURCE=.\Select_Reactor_Base.cpp # End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\Select_Reactor_Notify.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Service_Config.cpp
# End Source File
# Begin Source File
@@ -1914,6 +1922,14 @@ SOURCE=.\Select_Reactor_Base.h # End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Select_Reactor_Notify.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Select_Reactor_T.h
# End Source File
# Begin Source File
@@ -2890,6 +2906,10 @@ SOURCE=.\Select_Reactor_Base.i # End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Select_Reactor_T.i
# End Source File
# Begin Source File
diff --git a/ace/ace_lib.dsp b/ace/ace_lib.dsp index 9aff45872f0..ca98ce59af1 100644 --- a/ace/ace_lib.dsp +++ b/ace/ace_lib.dsp @@ -42,8 +42,8 @@ RSC=rc.exe # PROP Output_Dir ""
# PROP Intermediate_Dir ".\LIB\Release"
# PROP Target_Dir ""
-MTL=midl.exe
LINK32=link.exe -lib
+MTL=midl.exe
# ADD BASE CPP /nologo /MD /W3 /GX /O1 /I "../" /D ACE_HAS_DLL=0 /D "ACE_NO_INLINE" /D "NDEBUG" /D "WIN32" /D "_WINDOWS" /FD /c
# SUBTRACT BASE CPP /YX
# ADD CPP /nologo /MT /W3 /GX /O1 /I "../" /I "../PACE" /D ACE_OS_HAS_DLL=0 /D ACE_HAS_DLL=0 /D "ACE_NO_INLINE" /D "NDEBUG" /D "WIN32" /D "_WINDOWS" /FD /c
@@ -69,8 +69,8 @@ LIB32=link.exe -lib # PROP Output_Dir ""
# PROP Intermediate_Dir ".\LIB\Debug"
# PROP Target_Dir ""
-MTL=midl.exe
LINK32=link.exe -lib
+MTL=midl.exe
# ADD BASE CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /Gy /I "../" /D "_DEBUG" /D "WIN32" /D "_WINDOWS" /D ACE_HAS_DLL=0 /FD /c
# SUBTRACT BASE CPP /YX
# ADD CPP /nologo /MTd /W3 /GX /Z7 /Od /Gy /I "../" /I "../PACE" /D "_DEBUG" /D "WIN32" /D "_WINDOWS" /D ACE_HAS_DLL=0 /D ACE_OS_HAS_DLL=0 /FD /c
@@ -96,8 +96,8 @@ LIB32=link.exe -lib # PROP Output_Dir ""
# PROP Intermediate_Dir ".\LIB\Release"
# PROP Target_Dir ""
-MTL=midl.exe
LINK32=link.exe -lib
+MTL=midl.exe
# ADD BASE CPP /nologo /G5 /MT /W3 /GX /O1 /I "../" /D "NDEBUG" /D "WIN32" /D "_WINDOWS" /D ACE_HAS_DLL=0 /D "ACE_NO_INLINE" /YX /FD /c
# ADD CPP /nologo /MD /W3 /GX /Zi /O1 /I "../" /I "../PACE" /D "_WINDOWS" /D "NDEBUG" /D "ACE_AS_STATIC_LIBS" /D "WIN32" /FD /c
# SUBTRACT CPP /YX
@@ -122,8 +122,8 @@ LIB32=link.exe -lib # PROP Output_Dir ""
# PROP Intermediate_Dir ".\LIB\Debug"
# PROP Target_Dir ""
-MTL=midl.exe
LINK32=link.exe -lib
+MTL=midl.exe
# ADD BASE CPP /nologo /G5 /MTd /W3 /Gm /GX /Zi /Od /Gy /I "../" /D "_DEBUG" /D "WIN32" /D "_WINDOWS" /D ACE_HAS_DLL=0 /D "ACE_NO_INLINE" /YX /FD /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /Gy /I "../" /I "../PACE" /D "_WINDOWS" /D "_DEBUG" /D "ACE_AS_STATIC_LIBS" /D "WIN32" /FD /c
# SUBTRACT CPP /YX
@@ -753,6 +753,14 @@ SOURCE=.\Select_Reactor_Base.cpp # End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\Select_Reactor_Notify.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Service_Config.cpp
# End Source File
# Begin Source File
@@ -1901,6 +1909,14 @@ SOURCE=.\Select_Reactor_Base.h # End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Select_Reactor_Notify.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Select_Reactor_T.h
# End Source File
# Begin Source File
@@ -2877,6 +2893,10 @@ SOURCE=.\Select_Reactor_Base.i # End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Select_Reactor_T.i
# End Source File
# Begin Source File
diff --git a/tests/ChangeLog b/tests/ChangeLog index 4c22a4680c1..55f05fcf975 100644 --- a/tests/ChangeLog +++ b/tests/ChangeLog @@ -1,3 +1,28 @@ +Tue Aug 06 00:39:15 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * ace/Select_Reactor_Notify.h: + * ace/Select_Reactor_Notify.cpp: Moved the declaration and + definition of ACE_Select_Reactor_Notify from + Select_Reactor_Base.* to the above files. + + * ace/Select_Reactor_Handler_Repository.h: + * ace/Select_Reactor_Handler_Repository.cpp: + * ace/Select_Reactor_Handler_Repository.inl: Moved the declaration + and definition of ACE_Select_Reactor_Handler_Repository from + Select_Reactor_Base.* to the above files. + + * ace/Select_Reactor_Base.h: + * ace/Select_Reactor_Base.cpp: + * ace/Select_Reactor_Base.i: Moved the classes mentioned above to + a new file. The above classes were cluttering the file and + things were getting confusing posing maintenance nightmare. + + * ace/Select_Reactor_T.cpp: Needed an include to compile on + Win32. + + * ace/ace_dll.dsp: + * ace/ace_lib.dsp: Added the new files to the project files. + Mon Aug 5 22:37:13 2002 Balachandran Natarajan <bala@cs.wustl.edu> * tests/Makefile: Added the new test to the Makefile. |