summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2002-08-08 13:58:09 +0000
committerbala <balanatarajan@users.noreply.github.com>2002-08-08 13:58:09 +0000
commita6eceedfa723c56b54cac8f3a1714d7aafd1abd6 (patch)
treea3db83114ba30cd300ae76dc6c743bafe92cecc9
parent3b70657b3925dcd1369f9056cb4c488700b02db1 (diff)
downloadATCD-a6eceedfa723c56b54cac8f3a1714d7aafd1abd6.tar.gz
ChangeLogTag:Tue Aug 06 00:39:15 2002 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--ace/Select_Reactor_Base.cpp1005
-rw-r--r--ace/Select_Reactor_Base.h357
-rw-r--r--ace/Select_Reactor_Base.i51
-rw-r--r--ace/Select_Reactor_Handler_Repository.cpp459
-rw-r--r--ace/Select_Reactor_Handler_Repository.h230
-rw-r--r--ace/Select_Reactor_Handler_Repository.inl41
-rw-r--r--ace/Select_Reactor_Notify.cpp576
-rw-r--r--ace/Select_Reactor_Notify.h189
-rw-r--r--ace/Select_Reactor_T.cpp3
-rw-r--r--ace/ace_dll.dsp20
-rw-r--r--ace/ace_lib.dsp28
-rw-r--r--tests/ChangeLog25
12 files changed, 1569 insertions, 1415 deletions
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp
index f74cc42835c..a1777a443a5 100644
--- a/ace/Select_Reactor_Base.cpp
+++ b/ace/Select_Reactor_Base.cpp
@@ -15,997 +15,6 @@
ACE_RCSID(ace, Select_Reactor_Base, "$Id$")
-#if defined (ACE_WIN32)
-#define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_)
-#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_)
-#else
-#define ACE_SELECT_REACTOR_HANDLE(H) (H)
-#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)])
-#endif /* ACE_WIN32 */
-
-// Performs sanity checking on the ACE_HANDLE.
-
-int
-ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
-#if defined (ACE_WIN32)
- // It's too expensive to perform more exhaustive validity checks on
- // Win32 due to the way that they implement SOCKET HANDLEs.
- if (handle == ACE_INVALID_HANDLE)
-#else /* !ACE_WIN32 */
- if (handle < 0 || handle >= this->max_size_)
-#endif /* ACE_WIN32 */
- {
- errno = EINVAL;
- return 1;
- }
- else
- return 0;
-}
-
-// Performs sanity checking on the ACE_HANDLE.
-
-int
-ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
-#if defined (ACE_WIN32)
- // It's too expensive to perform more exhaustive validity checks on
- // Win32 due to the way that they implement SOCKET HANDLEs.
- if (handle != ACE_INVALID_HANDLE)
-#else /* !ACE_WIN32 */
- if (handle >= 0 && handle < this->max_handlep1_)
-#endif /* ACE_WIN32 */
- return 1;
- else
- {
- errno = EINVAL;
- return 0;
- }
-}
-
-size_t
-ACE_Select_Reactor_Handler_Repository::max_handlep1 (void)
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1");
-
- return this->max_handlep1_;
-}
-
-int
-ACE_Select_Reactor_Handler_Repository::open (size_t size)
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
- this->max_size_ = size;
- this->max_handlep1_ = 0;
-
-#if defined (ACE_WIN32)
- // Try to allocate the memory.
- ACE_NEW_RETURN (this->event_handlers_,
- ACE_Event_Tuple[size],
- -1);
-
- // Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }.
- for (size_t h = 0; h < size; h++)
- {
- ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE;
- ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
- }
-#else
- // Try to allocate the memory.
- ACE_NEW_RETURN (this->event_handlers_,
- ACE_Event_Handler *[size],
- -1);
-
- // Initialize the ACE_Event_Handler * to NULL.
- for (size_t h = 0; h < size; h++)
- ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
-#endif /* ACE_WIN32 */
-
- // Try to increase the number of handles if <size> is greater than
- // the current limit.
- return ACE::set_handle_limit (size);
-}
-
-// Initialize a repository of the appropriate <size>.
-
-ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
- : select_reactor_ (select_reactor),
- max_size_ (0),
- max_handlep1_ (0),
- event_handlers_ (0)
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
-}
-
-int
-ACE_Select_Reactor_Handler_Repository::unbind_all (void)
-{
- // Unbind all of the <handle, ACE_Event_Handler>s.
- for (int handle = 0;
- handle < this->max_handlep1_;
- handle++)
- this->unbind (ACE_SELECT_REACTOR_HANDLE (handle),
- ACE_Event_Handler::ALL_EVENTS_MASK);
-
- return 0;
-}
-
-int
-ACE_Select_Reactor_Handler_Repository::close (void)
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
-
- if (this->event_handlers_ != 0)
- {
- this->unbind_all ();
-
- delete [] this->event_handlers_;
- this->event_handlers_ = 0;
- }
- return 0;
-}
-
-// Return the <ACE_Event_Handler *> associated with the <handle>.
-
-ACE_Event_Handler *
-ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle,
- size_t *index_p)
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find");
-
- ACE_Event_Handler *eh = 0;
- ssize_t i;
-
- // Only bother to search for the <handle> if it's in range.
- if (this->handle_in_range (handle))
- {
-#if defined (ACE_WIN32)
- i = 0;
-
- for (; i < this->max_handlep1_; i++)
- if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
- {
- eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i);
- break;
- }
-#else
- i = handle;
-
- eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle);
-#endif /* ACE_WIN32 */
- }
- else
- // g++ can't figure out that <i> won't be used below if the handle
- // is out of range, so keep it happy by defining <i> here . . .
- i = 0;
-
- if (eh != 0)
- {
- if (index_p != 0)
- *index_p = i;
- }
- else
- errno = ENOENT;
-
- return eh;
-}
-
-// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>.
-
-int
-ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
- ACE_Event_Handler *event_handler,
- ACE_Reactor_Mask mask)
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
-
- if (handle == ACE_INVALID_HANDLE)
- handle = event_handler->get_handle ();
-
- if (this->invalid_handle (handle))
- return -1;
-
-#if defined (ACE_WIN32)
- int assigned_slot = -1;
-
- for (ssize_t i = 0; i < this->max_handlep1_; i++)
- {
- // Found it, so let's just reuse this location.
- if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
- {
- assigned_slot = i;
- break;
- }
- // Here's the first free slot, so let's take it.
- else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE
- && assigned_slot == -1)
- assigned_slot = i;
- }
-
- if (assigned_slot > -1)
- // We found a free spot, let's reuse it.
- {
- ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle;
- ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler;
- }
- else if (this->max_handlep1_ < this->max_size_)
- {
- // Insert at the end of the active portion.
- ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle;
- ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler;
- this->max_handlep1_++;
- }
- else
- {
- // No more room at the inn!
- errno = ENOMEM;
- return -1;
- }
-#else
- ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler;
-
- if (this->max_handlep1_ < handle + 1)
- this->max_handlep1_ = handle + 1;
-#endif /* ACE_WIN32 */
-
- // Add the <mask> for this <handle> in the Select_Reactor's wait_set.
- this->select_reactor_.bit_ops (handle,
- mask,
- this->select_reactor_.wait_set_,
- ACE_Reactor::ADD_MASK);
-
- // Note the fact that we've changed the state of the <wait_set_>,
- // which is used by the dispatching loop to determine whether it can
- // keep going or if it needs to reconsult select().
- this->select_reactor_.state_changed_ = 1;
-
- return 0;
-}
-
-// Remove the binding of <ACE_HANDLE>.
-
-int
-ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
- ACE_Reactor_Mask mask)
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
-
- size_t slot;
- ACE_Event_Handler *eh = this->find (handle, &slot);
-
- if (eh == 0)
- return -1;
-
- // Clear out the <mask> bits in the Select_Reactor's wait_set.
- this->select_reactor_.bit_ops (handle,
- mask,
- this->select_reactor_.wait_set_,
- ACE_Reactor::CLR_MASK);
-
- // And suspend_set.
- this->select_reactor_.bit_ops (handle,
- mask,
- this->select_reactor_.suspend_set_,
- ACE_Reactor::CLR_MASK);
-
- // Note the fact that we've changed the state of the <wait_set_>,
- // which is used by the dispatching loop to determine whether it can
- // keep going or if it needs to reconsult select().
- this->select_reactor_.state_changed_ = 1;
-
- // Close down the <Event_Handler> unless we've been instructed not
- // to.
- if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
- eh->handle_close (handle, mask);
-
- // If there are no longer any outstanding events on this <handle>
- // then we can totally shut down the Event_Handler.
- if (this->select_reactor_.wait_set_.rd_mask_.is_set (handle) == 0
- && this->select_reactor_.wait_set_.wr_mask_.is_set (handle) == 0
- && this->select_reactor_.wait_set_.ex_mask_.is_set (handle) == 0)
-#if defined (ACE_WIN32)
- {
- ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE;
- ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0;
-
- if (this->max_handlep1_ == (int) slot + 1)
- {
- // We've deleted the last entry (i.e., i + 1 == the current
- // size of the array), so we need to figure out the last
- // valid place in the array that we should consider in
- // subsequent searches.
-
- int i;
-
- for (i = this->max_handlep1_ - 1;
- i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE;
- i--)
- continue;
-
- this->max_handlep1_ = i + 1;
- }
- }
-#else
- {
- ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0;
-
- if (this->max_handlep1_ == handle + 1)
- {
- // We've deleted the last entry, so we need to figure out
- // the last valid place in the array that is worth looking
- // at.
- ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set ();
- ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set ();
- ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set ();
-
- ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set ();
- ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set ();
- ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set ();
-
- // Compute the maximum of six values.
- this->max_handlep1_ = wait_rd_max;
- if (this->max_handlep1_ < wait_wr_max)
- this->max_handlep1_ = wait_wr_max;
- if (this->max_handlep1_ < wait_ex_max)
- this->max_handlep1_ = wait_ex_max;
-
- if (this->max_handlep1_ < suspend_rd_max)
- this->max_handlep1_ = suspend_rd_max;
- if (this->max_handlep1_ < suspend_wr_max)
- this->max_handlep1_ = suspend_wr_max;
- if (this->max_handlep1_ < suspend_ex_max)
- this->max_handlep1_ = suspend_ex_max;
-
- this->max_handlep1_++;
- }
- }
-#endif /* ACE_WIN32 */
-
- return 0;
-}
-
-ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
- (const ACE_Select_Reactor_Handler_Repository *s)
- : rep_ (s),
- current_ (-1)
-{
- this->advance ();
-}
-
-// Pass back the <next_item> that hasn't been seen in the Set.
-// Returns 0 when all items have been seen, else 1.
-
-int
-ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item)
-{
- int result = 1;
-
- if (this->current_ >= this->rep_->max_handlep1_)
- result = 0;
- else
- next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_,
- this->current_);
- return result;
-}
-
-int
-ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const
-{
- return this->current_ >= this->rep_->max_handlep1_;
-}
-
-// Move forward by one element in the set.
-
-int
-ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
-{
- if (this->current_ < this->rep_->max_handlep1_)
- this->current_++;
-
- while (this->current_ < this->rep_->max_handlep1_)
- if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0)
- return 1;
- else
- this->current_++;
-
- return this->current_ < this->rep_->max_handlep1_;
-}
-
-// Dump the state of an object.
-
-void
-ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
-
- ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
- ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("rep_ = %u"), this->rep_));
- ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("current_ = %d"), this->current_));
- ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
-}
-
-void
-ACE_Select_Reactor_Handler_Repository::dump (void) const
-{
- ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
-
- ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
- ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("(%t) max_handlep1_ = %d, max_size_ = %d\n"),
- this->max_handlep1_, this->max_size_));
- ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("[")));
-
- ACE_Event_Handler *eh = 0;
-
- for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
- iter.next (eh) != 0;
- iter.advance ())
- ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (eh = %x, eh->handle_ = %d)"),
- eh, eh->get_handle ()));
-
- ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]")));
- ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
-}
-
-ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
-
-ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
- : max_notify_iterations_ (-1)
-{
-}
-
-void
-ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
-{
- // Must always be > 0 or < 0 to optimize the loop exit condition.
- if (iterations == 0)
- iterations = 1;
-
- this->max_notify_iterations_ = iterations;
-}
-
-int
-ACE_Select_Reactor_Notify::max_notify_iterations (void)
-{
- return this->max_notify_iterations_;
-}
-
-// purge_pending_notifications
-// Removes all entries from the notify_queue_ and each one that
-// matches <eh> is put on the free_queue_. The rest are saved on a
-// local queue and copied back to the notify_queue_ at the end.
-// Returns the number of entries removed. Returns -1 on error.
-// ACE_NOTSUP_RETURN if ACE_HAS_REACTOR_NOTIFICATION_QUEUE is not defined.
-int
-ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
- ACE_Reactor_Mask mask )
-{
- ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
-
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
-
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- if (this->notify_queue_.is_empty ())
- return 0;
-
- ACE_Notification_Buffer *temp;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
-
- size_t queue_size = this->notify_queue_.size ();
- int number_purged = 0;
- size_t i;
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == this->notify_queue_.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- // If this is not a Reactor notify (it is for a particular handler),
- // and it matches the specified handler (or purging all),
- // and applying the mask would totally eliminate the notification, then
- // release it and count the number purged.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_) &&
- ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
- // is left with nothing when
- // applying the mask
- {
- if (-1 == this->free_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- ++number_purged;
- }
- else
- {
- // To preserve it, move it to the local_queue.
- // But first, if this is not a Reactor notify (it is for a particularhandler),
- // and it matches the specified handler (or purging all), then
- // apply the mask
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_))
- ACE_CLR_BITS(temp->mask_, mask);
- if (-1 == local_queue.enqueue_head (temp))
- return -1;
- }
- }
-
- if (this->notify_queue_.size ())
- { // should be empty!
- ACE_ASSERT (0);
- return -1;
- }
-
- // now put it back in the notify queue
- queue_size = local_queue.size ();
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == local_queue.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- if (-1 == this->notify_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- return number_purged;
-
-#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
- ACE_UNUSED_ARG (eh);
- ACE_UNUSED_ARG (mask);
- ACE_NOTSUP_RETURN (-1);
-#endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
-}
-
-void
-ACE_Select_Reactor_Notify::dump (void) const
-{
- ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
-
- ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
- ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_));
- this->notification_pipe_.dump ();
- ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
-}
-
-int
-ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
- ACE_Timer_Queue *,
- int disable_notify_pipe)
-{
- ACE_TRACE ("ACE_Select_Reactor_Notify::open");
-
- if (disable_notify_pipe == 0)
- {
- this->select_reactor_ =
- ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r);
-
- if (select_reactor_ == 0)
- {
- errno = EINVAL;
- return -1;
- }
-
- if (this->notification_pipe_.open () == -1)
- return -1;
-#if defined (F_SETFD)
- ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
- ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
-#endif /* F_SETFD */
-
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_Notification_Buffer *temp;
-
- ACE_NEW_RETURN (temp,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp) == -1)
- {
- delete [] temp;
- return -1;
- }
-
- for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++)
- if (free_queue_.enqueue_head (temp + i) == -1)
- return -1;
-
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
-
- // There seems to be a Win32 bug with this... Set this into
- // non-blocking mode.
- if (ACE::set_flags (this->notification_pipe_.read_handle (),
- ACE_NONBLOCK) == -1)
- return -1;
- else
- return this->select_reactor_->register_handler
- (this->notification_pipe_.read_handle (),
- this,
- ACE_Event_Handler::READ_MASK);
- }
- else
- {
- this->select_reactor_ = 0;
- return 0;
- }
-}
-
-int
-ACE_Select_Reactor_Notify::close (void)
-{
- ACE_TRACE ("ACE_Select_Reactor_Notify::close");
-
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Free up the dynamically allocated resources.
- ACE_Notification_Buffer **b;
-
- for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
- alloc_iter.next (b) != 0;
- alloc_iter.advance ())
- {
- delete [] *b;
- *b = 0;
- }
-
- this->alloc_queue_.reset ();
- this->notify_queue_.reset ();
- this->free_queue_.reset ();
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
-
- return this->notification_pipe_.close ();
-}
-
-int
-ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
- ACE_Reactor_Mask mask,
- ACE_Time_Value *timeout)
-{
- ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
-
- // Just consider this method a "no-op" if there's no
- // <ACE_Select_Reactor> configured.
- if (this->select_reactor_ == 0)
- return 0;
-
- ACE_Notification_Buffer buffer (eh, mask);
-
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Artificial scope to limit the duration of the mutex.
- {
- // int notification_required = 0;
-
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- // No pending notifications.
-
- // We will send notify for every message..
- // if (this->notify_queue_.is_empty ())
- // notification_required = 1;
-
- ACE_Notification_Buffer *temp = 0;
-
- if (free_queue_.dequeue_head (temp) == -1)
- {
- // Grow the queue of available buffers.
- ACE_Notification_Buffer *temp1;
-
- ACE_NEW_RETURN (temp1,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp1) == -1)
- {
- delete [] temp1;
- return -1;
- }
-
- // Start at 1 and enqueue only
- // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
- // the first one will be used right now.
- for (size_t i = 1;
- i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
- i++)
- this->free_queue_.enqueue_head (temp1 + i);
-
- temp = temp1;
- }
-
- ACE_ASSERT (temp != 0);
- *temp = buffer;
-
- if (notify_queue_.enqueue_tail (temp) == -1)
- return -1;
- }
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
-
- ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
- (char *) &buffer,
- sizeof buffer,
- timeout);
- if (n == -1)
- return -1;
-
- return 0;
-}
-
-// Handles pending threads (if any) that are waiting to unblock the
-// Select_Reactor.
-
-int
-ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
- ACE_Handle_Set &rd_mask)
-{
- ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
-
- ACE_HANDLE read_handle =
- this->notification_pipe_.read_handle ();
-
- if (read_handle != ACE_INVALID_HANDLE
- && rd_mask.is_set (read_handle))
- {
- number_of_active_handles--;
- rd_mask.clr_bit (read_handle);
- return this->handle_input (read_handle);
- }
- else
- return 0;
-}
-
-
-ACE_HANDLE
-ACE_Select_Reactor_Notify::notify_handle (void)
-{
- ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
-
- return this->notification_pipe_.read_handle ();
-}
-
-
-// Special trick to unblock <select> when updates occur in somewhere
-// other than the main <ACE_Select_Reactor> thread. All we do is
-// write data to a pipe that the <ACE_Select_Reactor> is listening on.
-// Thanks to Paul Stephenson for suggesting this approach.
-int
-ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
-{
- // There is tonnes of code that can be abstracted...
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- {
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- ACE_Notification_Buffer *temp;
-
- ACE_UNUSED_ARG (buffer);
-
- // If the queue is empty just return 0
- if (notify_queue_.is_empty ())
- return 0;
-
- if (this->notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
- if (temp->eh_ != 0)
- {
- // If the queue had a buffer that has an event handler, put
- // the element back in the queue and return a 1
- if (this->notify_queue_.enqueue_head (temp) == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enque_head")),
- -1);
- }
-
- return 1;
- }
- // Else put the element in the free queue
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-#else
- // If eh == 0 then another thread is unblocking the
- // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
- // internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the <ACE_Event_Handler>
- // pointer we've been passed.
- if (buffer.eh_ != 0)
- return 1;
-
-#endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
-
- // has no dispatchable buffer
- return 0;
-}
-
-int
-ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
-{
- int result = 0;
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Dispatch all messages that are in the <notify_queue_>.
- {
- // We acquire the lock in a block to make sure we're not
- // holding the lock while delivering callbacks...
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- ACE_Notification_Buffer *temp;
-
- if (notify_queue_.is_empty ())
- return 0;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
- buffer = *temp;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- // If eh == 0 then another thread is unblocking the
- // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
- // internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the <ACE_Event_Handler>
- // pointer we've been passed.
- if (buffer.eh_ != 0)
- {
-
- switch (buffer.mask_)
- {
- case ACE_Event_Handler::READ_MASK:
- case ACE_Event_Handler::ACCEPT_MASK:
- result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::WRITE_MASK:
- result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
- break;
- default:
- // Should we bail out if we get an invalid mask?
- ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
- }
- if (result == -1)
- buffer.eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
-#else
- // If eh == 0 then another thread is unblocking the
- // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
- // internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the <ACE_Event_Handler>
- // pointer we've been passed.
- if (buffer.eh_ != 0)
- {
- switch (buffer.mask_)
- {
- case ACE_Event_Handler::READ_MASK:
- case ACE_Event_Handler::ACCEPT_MASK:
- result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::WRITE_MASK:
- result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::QOS_MASK:
- result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::GROUP_QOS_MASK:
- result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
- break;
- default:
- // Should we bail out if we get an invalid mask?
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("invalid mask = %d\n"),
- buffer.mask_));
- }
- if (result == -1)
- buffer.eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
-
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
-
- return 1;
-}
-
-int
-ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
- ACE_Notification_Buffer &buffer)
-{
- ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
-
- ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
-
- if (n > 0)
- {
- // Check to see if we've got a short read.
- if (n != sizeof buffer)
- {
- ssize_t remainder = sizeof buffer - n;
-
- // If so, try to recover by reading the remainder. If this
- // doesn't work we're in big trouble since the input stream
- // won't be aligned correctly. I'm not sure quite what to
- // do at this point. It's probably best just to return -1.
- if (ACE::recv (handle,
- ((char *) &buffer) + n,
- remainder) != remainder)
- return -1;
- }
-
-
- return 1;
- }
-
- // Return -1 if things have gone seriously wrong.
- if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
- return -1;
-
- return 0;
-}
-
-
-int
-ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
-{
- ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
- // Precondition: this->select_reactor_.token_.current_owner () ==
- // ACE_Thread::self ();
-
- int number_dispatched = 0;
- int result = 0;
- ACE_Notification_Buffer buffer;
-
- while ((result = this->read_notify_pipe (handle, buffer)) > 0)
- {
- // Dispatch the buffer
- // NOTE: We count only if we made any dispatches ie. upcalls.
- if (this->dispatch_notify (buffer) > 0)
- number_dispatched++;
-
- // Bail out if we've reached the <notify_threshold_>. Note that
- // by default <notify_threshold_> is -1, so we'll loop until all
- // the notifications in the pipe have been dispatched.
- if (number_dispatched == this->max_notify_iterations_)
- break;
- }
-
- // Reassign number_dispatched to -1 if things have gone seriously
- // wrong.
- if (result < 0)
- number_dispatched = -1;
-
- // Enqueue ourselves into the list of waiting threads. When we
- // reacquire the token we'll be off and running again with ownership
- // of the token. The postcondition of this call is that
- // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
- this->select_reactor_->renew ();
- return number_dispatched;
-}
-
// Perform GET, CLR, SET, and ADD operations on the Handle_Sets.
//
// GET = 1, Retrieve current value
@@ -1112,17 +121,3 @@ ACE_Select_Reactor_Impl::resumable_handler (void)
return 0;
}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
-template class ACE_Unbounded_Queue <ACE_Notification_Buffer *>;
-template class ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>;
-template class ACE_Node <ACE_Notification_Buffer *>;
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
-#pragma instantiate ACE_Unbounded_Queue <ACE_Notification_Buffer *>
-#pragma instantiate ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>
-#pragma instantiate ACE_Node <ACE_Notification_Buffer *>
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h
index 6f4c4923d54..24df01219c3 100644
--- a/ace/Select_Reactor_Base.h
+++ b/ace/Select_Reactor_Base.h
@@ -24,12 +24,8 @@
#include "ace/Event_Handler.h"
#include "ace/Handle_Set.h"
#include "ace/Token.h"
-#include "ace/Pipe.h"
#include "ace/Reactor_Impl.h"
-
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
-#include "ace/Unbounded_Queue.h"
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+#include "Select_Reactor_Handler_Repository.h"
// Add useful typedefs to simplify the following code.
typedef void (ACE_Handle_Set::*ACE_FDS_PTMF) (ACE_HANDLE);
@@ -42,7 +38,7 @@ typedef ACE_Noop_Token ACE_SELECT_TOKEN;
#endif /* ACE_MT_SAFE && ACE_MT_SAFE != 0 */
// Forward declaration.
-class ACE_Select_Reactor_Impl;
+class ACE_Select_Reactor_Notify;
/**
* @class ACE_Select_Reactor_Handle_Set
@@ -63,355 +59,8 @@ public:
ACE_Handle_Set ex_mask_;
};
-/**
- * @class ACE_Event_Tuple
- *
- * @brief An ACE_Event_Handler and its associated ACE_HANDLE.
- *
- * One <ACE_Event_Handler> is registered for one or more
- * <ACE_HANDLE>. At various points, this information must be
- * stored explicitly. This class provides a lightweight
- * mechanism to do so.
- */
-class ACE_Export ACE_Event_Tuple
-{
-public:
- /// Default constructor.
- ACE_Event_Tuple (void);
-
- /// Constructor.
- ACE_Event_Tuple (ACE_Event_Handler *eh,
- ACE_HANDLE h);
-
- /// Destructor.
- ~ACE_Event_Tuple (void);
-
- /// Equality operator.
- int operator== (const ACE_Event_Tuple &rhs) const;
-
- /// Inequality operator.
- int operator!= (const ACE_Event_Tuple &rhs) const;
-
- /// Handle.
- ACE_HANDLE handle_;
-
- /// <ACE_Event_Handler> associated with the <ACE_HANDLE>.
- ACE_Event_Handler *event_handler_;
-};
-
-/**
- * @class ACE_Select_Reactor_Notify
- *
- * @brief Unblock the <ACE_Select_Reactor> from its event loop.
- *
- * This implementation is necessary for cases where the
- * <ACE_Select_Reactor> is run in a multi-threaded program. In
- * this case, we need to be able to unblock <select> or <poll>
- * when updates occur other than in the main
- * <ACE_Select_Reactor> thread. To do this, we signal an
- * auto-reset event the <ACE_Select_Reactor> is listening on.
- * If an <ACE_Event_Handler> and <ACE_Select_Reactor_Mask> is
- * passed to <notify>, the appropriate <handle_*> method is
- * dispatched in the context of the <ACE_Select_Reactor> thread.
- */
-class ACE_Export ACE_Select_Reactor_Notify : public ACE_Reactor_Notify
-{
-public:
- /// Constructor.
- ACE_Select_Reactor_Notify (void);
-
- /// Destructor.
- ~ACE_Select_Reactor_Notify (void);
-
- // = Initialization and termination methods.
- /// Initialize.
- virtual int open (ACE_Reactor_Impl *,
- ACE_Timer_Queue * = 0,
- int disable_notify_pipe = 0);
-
- /// Destroy.
- virtual int close (void);
-
- /**
- * Called by a thread when it wants to unblock the
- * <ACE_Select_Reactor>. This wakeups the <ACE_Select_Reactor> if
- * currently blocked in <select>/<poll>. Pass over both the
- * <Event_Handler> *and* the <mask> to allow the caller to dictate
- * which <Event_Handler> method the <ACE_Select_Reactor> will
- * invoke. The <ACE_Time_Value> indicates how long to blocking
- * trying to notify the <ACE_Select_Reactor>. If <timeout> == 0,
- * the caller will block until action is possible, else will wait
- * until the relative time specified in *<timeout> elapses).
- */
- virtual int notify (ACE_Event_Handler * = 0,
- ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK,
- ACE_Time_Value * = 0);
-
- /// Handles pending threads (if any) that are waiting to unblock the
- /// <ACE_Select_Reactor>.
- virtual int dispatch_notifications (int &number_of_active_handles,
- ACE_Handle_Set &rd_mask);
-
- /// Returns the ACE_HANDLE of the notify pipe on which the reactor
- /// is listening for notifications so that other threads can unblock
- /// the Select_Reactor
- virtual ACE_HANDLE notify_handle (void);
-
- /// Handle one of the notify call on the <handle>. This could be
- /// because of a thread trying to unblock the <Reactor_Impl>
- virtual int dispatch_notify (ACE_Notification_Buffer &buffer);
-
- /// Read one of the notify call on the <handle> into the
- /// <buffer>. This could be because of a thread trying to unblock
- /// the <Reactor_Impl>
- virtual int read_notify_pipe (ACE_HANDLE handle,
- ACE_Notification_Buffer &buffer);
-
- /// Verify whether the buffer has dispatchable info or not.
- virtual int is_dispatchable (ACE_Notification_Buffer &buffer);
-
- /// Called back by the <ACE_Select_Reactor> when a thread wants to
- /// unblock us.
- virtual int handle_input (ACE_HANDLE handle);
-
- /**
- * Set the maximum number of times that the
- * <ACE_Select_Reactor_Notify::handle_input> method will iterate and
- * dispatch the <ACE_Event_Handlers> that are passed in via the
- * notify pipe before breaking out of its <recv> loop. By default,
- * this is set to -1, which means "iterate until the pipe is empty."
- * Setting this to a value like "1 or 2" will increase "fairness"
- * (and thus prevent starvation) at the expense of slightly higher
- * dispatching overhead.
- */
- virtual void max_notify_iterations (int);
-
- /**
- * Get the maximum number of times that the
- * <ACE_Select_Reactor_Notify::handle_input> method will iterate and
- * dispatch the <ACE_Event_Handlers> that are passed in via the
- * notify pipe before breaking out of its <recv> loop.
- */
- virtual int max_notify_iterations (void);
-
- /**
- * Purge any notifications pending in this reactor for the specified
- * <ACE_Event_Handler> object. If <eh> == 0, all notifications for all
- * handlers are removed (but not any notifications posted just to wake up
- * the reactor itself). Returns the number of notifications purged.
- * Returns -1 on error.
- */
- virtual int purge_pending_notifications (ACE_Event_Handler *,
- ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
-
- /// Dump the state of an object.
- virtual void dump (void) const;
-
- /// Declare the dynamic allocation hooks.
- ACE_ALLOC_HOOK_DECLARE;
-
-protected:
- /**
- * Keep a back pointer to the <ACE_Select_Reactor>. If this value
- * if NULL then the <ACE_Select_Reactor> has been initialized with
- * <disable_notify_pipe>.
- */
- ACE_Select_Reactor_Impl *select_reactor_;
-
- /**
- * Contains the <ACE_HANDLE> the <ACE_Select_Reactor> is listening
- * on, as well as the <ACE_HANDLE> that threads wanting the
- * attention of the <ACE_Select_Reactor> will write to.
- */
- ACE_Pipe notification_pipe_;
-
- /**
- * Keeps track of the maximum number of times that the
- * <ACE_Select_Reactor_Notify::handle_input> method will iterate and
- * dispatch the <ACE_Event_Handlers> that are passed in via the
- * notify pipe before breaking out of its <recv> loop. By default,
- * this is set to -1, which means "iterate until the pipe is empty."
- */
- int max_notify_iterations_;
-
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // = This configuration queues up notifications in separate buffers that
- // are in user-space, rather than stored in a pipe in the OS
- // kernel. The kernel-level notifications are used only to trigger
- // the Reactor to check its notification queue. This enables many
- // more notifications to be stored than would otherwise be the case.
-
- /// Keeps track of allocated arrays of type
- /// <ACE_Notification_Buffer>.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_;
-
- /// Keeps track of all pending notifications.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_;
-
- /// Keeps track of all free buffers.
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_;
-
- /// Synchronization for handling of queues.
- ACE_SYNCH_MUTEX notify_queue_lock_;
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
-};
-
-/**
- * @class ACE_Select_Reactor_Handler_Repository
- *
- * @brief Used to map <ACE_HANDLE>s onto the appropriate
- * <ACE_Event_Handler> *.
- *
- * This class is necessary to shield differences between UNIX
- * and Win32. In UNIX, <ACE_HANDLE> is an int, whereas in Win32
- * it's a void *. This class hides all these details from the
- * bulk of the <ACE_Select_Reactor> code. All of these methods
- * are called with the main <Select_Reactor> token lock held.
- */
-class ACE_Export ACE_Select_Reactor_Handler_Repository
-{
-public:
- friend class ACE_Select_Reactor_Handler_Repository_Iterator;
-
- // = Initialization and termination methods.
- /// Default "do-nothing" constructor.
- ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &);
-
- /// Destructor.
- ~ACE_Select_Reactor_Handler_Repository (void);
-
- /// Initialize a repository of the appropriate <size>.
- /**
- * On Unix platforms, the size parameter should be as large as the
- * maximum number of file descriptors allowed for a given process.
- * This is necessary since a file descriptor is used to directly
- * index the array of event handlers maintained by the Reactor's
- * handler repository. Direct indexing is used for efficiency
- * reasons.
- */
- int open (size_t size);
-
- /// Close down the repository.
- int close (void);
-
- // = Search structure operations.
-
- /**
- * Return the <ACE_Event_Handler *> associated with <ACE_HANDLE>.
- * If <index_p> is non-0, then return the index location of the
- * <handle>, if found.
- */
- ACE_Event_Handler *find (ACE_HANDLE handle, size_t *index_p = 0);
-
- /// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE> with the
- /// appropriate <ACE_Reactor_Mask> settings.
- int bind (ACE_HANDLE,
- ACE_Event_Handler *,
- ACE_Reactor_Mask);
-
- /// Remove the binding of <ACE_HANDLE> in accordance with the <mask>.
- int unbind (ACE_HANDLE,
- ACE_Reactor_Mask mask);
-
- /// Remove all the <ACE_HANDLE, ACE_Event_Handler> tuples.
- int unbind_all (void);
-
- // = Sanity checking.
-
- // Check the <handle> to make sure it's a valid ACE_HANDLE that
- // within the range of legal handles (i.e., >= 0 && < max_size_).
- int invalid_handle (ACE_HANDLE handle);
-
- // Check the <handle> to make sure it's a valid ACE_HANDLE that
- // within the range of currently registered handles (i.e., >= 0 && <
- // max_handlep1_).
- int handle_in_range (ACE_HANDLE handle);
-
- // = Accessors.
- /// Returns the current table size.
- size_t size (void) const;
-
- /// Maximum ACE_HANDLE value, plus 1.
- size_t max_handlep1 (void);
-
- /// Dump the state of an object.
- void dump (void) const;
- /// Declare the dynamic allocation hooks.
- ACE_ALLOC_HOOK_DECLARE;
-
-private:
- /// Reference to our <Select_Reactor>.
- ACE_Select_Reactor_Impl &select_reactor_;
-
- /// Maximum number of handles.
- ssize_t max_size_;
-
- /// The highest currently active handle, plus 1 (ranges between 0 and
- /// <max_size_>.
- int max_handlep1_;
-
-#if defined (ACE_WIN32)
- // = The mapping from <HANDLES> to <Event_Handlers>.
-
- /**
- * The NT version implements this via a dynamically allocated
- * array of <ACE_Event_Tuple *>. Since NT implements ACE_HANDLE
- * as a void * we can't directly index into this array. Therefore,
- * we just do a linear search (for now). Next, we'll modify
- * things to use hashing or something faster...
- */
- ACE_Event_Tuple *event_handlers_;
-#else
- /**
- * The UNIX version implements this via a dynamically allocated
- * array of <ACE_Event_Handler *> that is indexed directly using
- * the ACE_HANDLE value.
- */
- ACE_Event_Handler **event_handlers_;
-#endif /* ACE_WIN32 */
-};
-
-/**
- * @class ACE_Select_Reactor_Handler_Repository_Iterator
- *
- * @brief Iterate through the <ACE_Select_Reactor_Handler_Repository>.
- */
-class ACE_Export ACE_Select_Reactor_Handler_Repository_Iterator
-{
-public:
- // = Initialization method.
- ACE_Select_Reactor_Handler_Repository_Iterator (const ACE_Select_Reactor_Handler_Repository *s);
-
- /// dtor.
- ~ACE_Select_Reactor_Handler_Repository_Iterator (void);
-
- // = Iteration methods.
-
- /// Pass back the <next_item> that hasn't been seen in the Set.
- /// Returns 0 when all items have been seen, else 1.
- int next (ACE_Event_Handler *&next_item);
-
- /// Returns 1 when all items have been seen, else 0.
- int done (void) const;
-
- /// Move forward by one element in the set. Returns 0 when all the
- /// items in the set have been seen, else 1.
- int advance (void);
-
- /// Dump the state of an object.
- void dump (void) const;
-
- /// Declare the dynamic allocation hooks.
- ACE_ALLOC_HOOK_DECLARE;
-
-private:
- /// Reference to the Handler_Repository we are iterating over.
- const ACE_Select_Reactor_Handler_Repository *rep_;
-
- /// Pointer to the current iteration level.
- ssize_t current_;
-};
+//=====================================================================
/**
* @class ACE_Select_Reactor_Impl
diff --git a/ace/Select_Reactor_Base.i b/ace/Select_Reactor_Base.i
index f04446e2e86..6dbadd25370 100644
--- a/ace/Select_Reactor_Base.i
+++ b/ace/Select_Reactor_Base.i
@@ -3,58 +3,7 @@
#include "ace/Reactor.h"
-ACE_INLINE
-ACE_Event_Tuple::~ACE_Event_Tuple (void)
-{
-}
-ACE_INLINE
-ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void)
-{
-}
-
-ACE_INLINE
-ACE_Select_Reactor_Handler_Repository::~ACE_Select_Reactor_Handler_Repository (void)
-{
-}
-
-ACE_INLINE
-ACE_Select_Reactor_Handler_Repository_Iterator::~ACE_Select_Reactor_Handler_Repository_Iterator (void)
-{
-}
-
-ACE_INLINE size_t
-ACE_Select_Reactor_Handler_Repository::size (void) const
-{
- return this->max_size_;
-}
-
-ACE_INLINE
-ACE_Event_Tuple::ACE_Event_Tuple (void)
-: handle_ (ACE_INVALID_HANDLE),
- event_handler_ (0)
-{
-}
-
-ACE_INLINE
-ACE_Event_Tuple::ACE_Event_Tuple (ACE_Event_Handler* eh,
- ACE_HANDLE h)
-: handle_ (h),
- event_handler_ (eh)
-{
-}
-
-ACE_INLINE int
-ACE_Event_Tuple::operator== (const ACE_Event_Tuple &rhs) const
-{
- return this->handle_ == rhs.handle_;
-}
-
-ACE_INLINE int
-ACE_Event_Tuple::operator!= (const ACE_Event_Tuple &rhs) const
-{
- return !(*this == rhs);
-}
ACE_INLINE
ACE_Select_Reactor_Impl::ACE_Select_Reactor_Impl ()
diff --git a/ace/Select_Reactor_Handler_Repository.cpp b/ace/Select_Reactor_Handler_Repository.cpp
new file mode 100644
index 00000000000..62eee31185e
--- /dev/null
+++ b/ace/Select_Reactor_Handler_Repository.cpp
@@ -0,0 +1,459 @@
+#include "Select_Reactor_Handler_Repository.h"
+#include "ACE.h"
+#include "Select_Reactor_Base.h"
+#include "Reactor.h"
+
+#if !defined (__ACE_INLINE__)
+#include "ace/Select_Reactor_Handler_Repository.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(ace,
+ Select_Reactor_Handler_Repository,
+ "$Id$")
+
+#if defined (ACE_WIN32)
+#define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_)
+#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_)
+#else
+#define ACE_SELECT_REACTOR_HANDLE(H) (H)
+#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)])
+#endif /* ACE_WIN32 */
+
+
+ACE_Select_Reactor_Handler_Repository::~ACE_Select_Reactor_Handler_Repository (void)
+{
+}
+
+// Performs sanity checking on the ACE_HANDLE.
+int
+ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
+#if defined (ACE_WIN32)
+ // It's too expensive to perform more exhaustive validity checks on
+ // Win32 due to the way that they implement SOCKET HANDLEs.
+ if (handle == ACE_INVALID_HANDLE)
+#else /* !ACE_WIN32 */
+ if (handle < 0 || handle >= this->max_size_)
+#endif /* ACE_WIN32 */
+ {
+ errno = EINVAL;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+// Performs sanity checking on the ACE_HANDLE.
+
+int
+ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
+#if defined (ACE_WIN32)
+ // It's too expensive to perform more exhaustive validity checks on
+ // Win32 due to the way that they implement SOCKET HANDLEs.
+ if (handle != ACE_INVALID_HANDLE)
+#else /* !ACE_WIN32 */
+ if (handle >= 0 && handle < this->max_handlep1_)
+#endif /* ACE_WIN32 */
+ return 1;
+ else
+ {
+ errno = EINVAL;
+ return 0;
+ }
+}
+
+size_t
+ACE_Select_Reactor_Handler_Repository::max_handlep1 (void)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1");
+
+ return this->max_handlep1_;
+}
+
+int
+ACE_Select_Reactor_Handler_Repository::open (size_t size)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
+ this->max_size_ = size;
+ this->max_handlep1_ = 0;
+
+#if defined (ACE_WIN32)
+ // Try to allocate the memory.
+ ACE_NEW_RETURN (this->event_handlers_,
+ ACE_Event_Tuple[size],
+ -1);
+
+ // Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }.
+ for (size_t h = 0; h < size; h++)
+ {
+ ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE;
+ ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
+ }
+#else
+ // Try to allocate the memory.
+ ACE_NEW_RETURN (this->event_handlers_,
+ ACE_Event_Handler *[size],
+ -1);
+
+ // Initialize the ACE_Event_Handler * to NULL.
+ for (size_t h = 0; h < size; h++)
+ ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
+#endif /* ACE_WIN32 */
+
+ // Try to increase the number of handles if <size> is greater than
+ // the current limit.
+ return ACE::set_handle_limit (size);
+}
+
+// Initialize a repository of the appropriate <size>.
+
+ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (
+ ACE_Select_Reactor_Impl &select_reactor)
+ : select_reactor_ (select_reactor),
+ max_size_ (0),
+ max_handlep1_ (0),
+ event_handlers_ (0)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
+}
+
+int
+ACE_Select_Reactor_Handler_Repository::unbind_all (void)
+{
+ // Unbind all of the <handle, ACE_Event_Handler>s.
+ for (int handle = 0;
+ handle < this->max_handlep1_;
+ handle++)
+ this->unbind (ACE_SELECT_REACTOR_HANDLE (handle),
+ ACE_Event_Handler::ALL_EVENTS_MASK);
+
+ return 0;
+}
+
+int
+ACE_Select_Reactor_Handler_Repository::close (void)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
+
+ if (this->event_handlers_ != 0)
+ {
+ this->unbind_all ();
+
+ delete [] this->event_handlers_;
+ this->event_handlers_ = 0;
+ }
+ return 0;
+}
+
+// Return the <ACE_Event_Handler *> associated with the <handle>.
+
+ACE_Event_Handler *
+ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle,
+ size_t *index_p)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find");
+
+ ACE_Event_Handler *eh = 0;
+ ssize_t i;
+
+ // Only bother to search for the <handle> if it's in range.
+ if (this->handle_in_range (handle))
+ {
+#if defined (ACE_WIN32)
+ i = 0;
+
+ for (; i < this->max_handlep1_; i++)
+ if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
+ {
+ eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i);
+ break;
+ }
+#else
+ i = handle;
+
+ eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle);
+#endif /* ACE_WIN32 */
+ }
+ else
+ // g++ can't figure out that <i> won't be used below if the handle
+ // is out of range, so keep it happy by defining <i> here . . .
+ i = 0;
+
+ if (eh != 0)
+ {
+ if (index_p != 0)
+ *index_p = i;
+ }
+ else
+ errno = ENOENT;
+
+ return eh;
+}
+
+// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>.
+
+int
+ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
+ ACE_Event_Handler *event_handler,
+ ACE_Reactor_Mask mask)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
+
+ if (handle == ACE_INVALID_HANDLE)
+ handle = event_handler->get_handle ();
+
+ if (this->invalid_handle (handle))
+ return -1;
+
+#if defined (ACE_WIN32)
+ int assigned_slot = -1;
+
+ for (ssize_t i = 0; i < this->max_handlep1_; i++)
+ {
+ // Found it, so let's just reuse this location.
+ if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
+ {
+ assigned_slot = i;
+ break;
+ }
+ // Here's the first free slot, so let's take it.
+ else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE
+ && assigned_slot == -1)
+ assigned_slot = i;
+ }
+
+ if (assigned_slot > -1)
+ // We found a free spot, let's reuse it.
+ {
+ ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle;
+ ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler;
+ }
+ else if (this->max_handlep1_ < this->max_size_)
+ {
+ // Insert at the end of the active portion.
+ ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle;
+ ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler;
+ this->max_handlep1_++;
+ }
+ else
+ {
+ // No more room at the inn!
+ errno = ENOMEM;
+ return -1;
+ }
+#else
+ ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler;
+
+ if (this->max_handlep1_ < handle + 1)
+ this->max_handlep1_ = handle + 1;
+#endif /* ACE_WIN32 */
+
+ // Add the <mask> for this <handle> in the Select_Reactor's wait_set.
+ this->select_reactor_.bit_ops (handle,
+ mask,
+ this->select_reactor_.wait_set_,
+ ACE_Reactor::ADD_MASK);
+
+ // Note the fact that we've changed the state of the <wait_set_>,
+ // which is used by the dispatching loop to determine whether it can
+ // keep going or if it needs to reconsult select().
+ this->select_reactor_.state_changed_ = 1;
+
+ return 0;
+}
+
+// Remove the binding of <ACE_HANDLE>.
+
+int
+ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
+ ACE_Reactor_Mask mask)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
+
+ size_t slot;
+ ACE_Event_Handler *eh = this->find (handle, &slot);
+
+ if (eh == 0)
+ return -1;
+
+ // Clear out the <mask> bits in the Select_Reactor's wait_set.
+ this->select_reactor_.bit_ops (handle,
+ mask,
+ this->select_reactor_.wait_set_,
+ ACE_Reactor::CLR_MASK);
+
+ // And suspend_set.
+ this->select_reactor_.bit_ops (handle,
+ mask,
+ this->select_reactor_.suspend_set_,
+ ACE_Reactor::CLR_MASK);
+
+ // Note the fact that we've changed the state of the <wait_set_>,
+ // which is used by the dispatching loop to determine whether it can
+ // keep going or if it needs to reconsult select().
+ this->select_reactor_.state_changed_ = 1;
+
+ // Close down the <Event_Handler> unless we've been instructed not
+ // to.
+ if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
+ eh->handle_close (handle, mask);
+
+ // If there are no longer any outstanding events on this <handle>
+ // then we can totally shut down the Event_Handler.
+ if (this->select_reactor_.wait_set_.rd_mask_.is_set (handle) == 0
+ && this->select_reactor_.wait_set_.wr_mask_.is_set (handle) == 0
+ && this->select_reactor_.wait_set_.ex_mask_.is_set (handle) == 0)
+#if defined (ACE_WIN32)
+ {
+ ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE;
+ ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0;
+
+ if (this->max_handlep1_ == (int) slot + 1)
+ {
+ // We've deleted the last entry (i.e., i + 1 == the current
+ // size of the array), so we need to figure out the last
+ // valid place in the array that we should consider in
+ // subsequent searches.
+
+ int i;
+
+ for (i = this->max_handlep1_ - 1;
+ i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE;
+ i--)
+ continue;
+
+ this->max_handlep1_ = i + 1;
+ }
+ }
+#else
+ {
+ ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0;
+
+ if (this->max_handlep1_ == handle + 1)
+ {
+ // We've deleted the last entry, so we need to figure out
+ // the last valid place in the array that is worth looking
+ // at.
+ ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set ();
+ ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set ();
+ ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set ();
+
+ ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set ();
+ ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set ();
+ ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set ();
+
+ // Compute the maximum of six values.
+ this->max_handlep1_ = wait_rd_max;
+ if (this->max_handlep1_ < wait_wr_max)
+ this->max_handlep1_ = wait_wr_max;
+ if (this->max_handlep1_ < wait_ex_max)
+ this->max_handlep1_ = wait_ex_max;
+
+ if (this->max_handlep1_ < suspend_rd_max)
+ this->max_handlep1_ = suspend_rd_max;
+ if (this->max_handlep1_ < suspend_wr_max)
+ this->max_handlep1_ = suspend_wr_max;
+ if (this->max_handlep1_ < suspend_ex_max)
+ this->max_handlep1_ = suspend_ex_max;
+
+ this->max_handlep1_++;
+ }
+ }
+#endif /* ACE_WIN32 */
+
+ return 0;
+}
+
+/****************************************************************************/
+
+ACE_Select_Reactor_Handler_Repository_Iterator::~ACE_Select_Reactor_Handler_Repository_Iterator (void)
+{
+}
+
+ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
+ (const ACE_Select_Reactor_Handler_Repository *s)
+ : rep_ (s),
+ current_ (-1)
+{
+ this->advance ();
+}
+
+// Pass back the <next_item> that hasn't been seen in the Set.
+// Returns 0 when all items have been seen, else 1.
+
+int
+ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item)
+{
+ int result = 1;
+
+ if (this->current_ >= this->rep_->max_handlep1_)
+ result = 0;
+ else
+ next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_,
+ this->current_);
+ return result;
+}
+
+int
+ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const
+{
+ return this->current_ >= this->rep_->max_handlep1_;
+}
+
+// Move forward by one element in the set.
+
+int
+ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
+{
+ if (this->current_ < this->rep_->max_handlep1_)
+ this->current_++;
+
+ while (this->current_ < this->rep_->max_handlep1_)
+ if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0)
+ return 1;
+ else
+ this->current_++;
+
+ return this->current_ < this->rep_->max_handlep1_;
+}
+
+// Dump the state of an object.
+
+void
+ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
+
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+ ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("rep_ = %u"), this->rep_));
+ ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("current_ = %d"), this->current_));
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
+}
+
+void
+ACE_Select_Reactor_Handler_Repository::dump (void) const
+{
+ ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
+
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_LIB_TEXT ("(%t) max_handlep1_ = %d, max_size_ = %d\n"),
+ this->max_handlep1_, this->max_size_));
+ ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("[")));
+
+ ACE_Event_Handler *eh = 0;
+
+ for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
+ iter.next (eh) != 0;
+ iter.advance ())
+ ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (eh = %x, eh->handle_ = %d)"),
+ eh, eh->get_handle ()));
+
+ ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]")));
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
+}
+
+ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
diff --git a/ace/Select_Reactor_Handler_Repository.h b/ace/Select_Reactor_Handler_Repository.h
new file mode 100644
index 00000000000..b6cc5bc7189
--- /dev/null
+++ b/ace/Select_Reactor_Handler_Repository.h
@@ -0,0 +1,230 @@
+/* -*- C++ -*- */
+//=============================================================================
+/**
+ * @file Select_Reactor_Handler_Repository.h
+ *
+ * $Id$
+ *
+ * @author Douglas C. Schmidt <schmidt@cs.wustl.edu>
+ */
+//=============================================================================
+
+#ifndef ACE_SELECT_REACTOR_HANDLER_REPOSITORY_H
+#define ACE_SELECT_REACTOR_HANDLER_REPOSITORY_H
+
+#include "ace/pre.h"
+
+#include "ACE_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "Event_Handler.h"
+
+class ACE_Select_Reactor_Impl;
+class ACE_Event_Handler;
+
+/**
+ * @class ACE_Event_Tuple
+ *
+ * @brief An ACE_Event_Handler and its associated ACE_HANDLE.
+ *
+ * One <ACE_Event_Handler> is registered for one or more
+ * <ACE_HANDLE>. At various points, this information must be
+ * stored explicitly. This class provides a lightweight
+ * mechanism to do so.
+ */
+class ACE_Export ACE_Event_Tuple
+{
+public:
+ /// Default constructor.
+ ACE_Event_Tuple (void);
+
+ /// Constructor.
+ ACE_Event_Tuple (ACE_Event_Handler *eh,
+ ACE_HANDLE h);
+
+ /// Destructor.
+ ~ACE_Event_Tuple (void);
+
+ /// Equality operator.
+ int operator== (const ACE_Event_Tuple &rhs) const;
+
+ /// Inequality operator.
+ int operator!= (const ACE_Event_Tuple &rhs) const;
+
+ /// Handle.
+ ACE_HANDLE handle_;
+
+ /// <ACE_Event_Handler> associated with the <ACE_HANDLE>.
+ ACE_Event_Handler *event_handler_;
+};
+
+
+//===================================================================
+/**
+ * @class ACE_Select_Reactor_Handler_Repository
+ *
+ * @brief Used to map <ACE_HANDLE>s onto the appropriate
+ * <ACE_Event_Handler> *.
+ *
+ * This class is necessary to shield differences between UNIX
+ * and Win32. In UNIX, <ACE_HANDLE> is an int, whereas in Win32
+ * it's a void *. This class hides all these details from the
+ * bulk of the <ACE_Select_Reactor> code. All of these methods
+ * are called with the main <Select_Reactor> token lock held.
+ */
+class ACE_Export ACE_Select_Reactor_Handler_Repository
+{
+public:
+ friend class ACE_Select_Reactor_Handler_Repository_Iterator;
+
+ // = Initialization and termination methods.
+ /// Default "do-nothing" constructor.
+ ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &);
+
+ /// Destructor.
+ ~ACE_Select_Reactor_Handler_Repository (void);
+
+ /// Initialize a repository of the appropriate <size>.
+ /**
+ * On Unix platforms, the size parameter should be as large as the
+ * maximum number of file descriptors allowed for a given process.
+ * This is necessary since a file descriptor is used to directly
+ * index the array of event handlers maintained by the Reactor's
+ * handler repository. Direct indexing is used for efficiency
+ * reasons.
+ */
+ int open (size_t size);
+
+ /// Close down the repository.
+ int close (void);
+
+ // = Search structure operations.
+
+ /**
+ * Return the <ACE_Event_Handler *> associated with <ACE_HANDLE>.
+ * If <index_p> is non-0, then return the index location of the
+ * <handle>, if found.
+ */
+ ACE_Event_Handler *find (ACE_HANDLE handle, size_t *index_p = 0);
+
+ /// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE> with the
+ /// appropriate <ACE_Reactor_Mask> settings.
+ int bind (ACE_HANDLE,
+ ACE_Event_Handler *,
+ ACE_Reactor_Mask);
+
+ /// Remove the binding of <ACE_HANDLE> in accordance with the <mask>.
+ int unbind (ACE_HANDLE,
+ ACE_Reactor_Mask mask);
+
+ /// Remove all the <ACE_HANDLE, ACE_Event_Handler> tuples.
+ int unbind_all (void);
+
+ // = Sanity checking.
+
+ // Check the <handle> to make sure it's a valid ACE_HANDLE that
+ // within the range of legal handles (i.e., >= 0 && < max_size_).
+ int invalid_handle (ACE_HANDLE handle);
+
+ // Check the <handle> to make sure it's a valid ACE_HANDLE that
+ // within the range of currently registered handles (i.e., >= 0 && <
+ // max_handlep1_).
+ int handle_in_range (ACE_HANDLE handle);
+
+ // = Accessors.
+ /// Returns the current table size.
+ size_t size (void) const;
+
+ /// Maximum ACE_HANDLE value, plus 1.
+ size_t max_handlep1 (void);
+
+ /// Dump the state of an object.
+ void dump (void) const;
+
+ /// Declare the dynamic allocation hooks.
+ ACE_ALLOC_HOOK_DECLARE;
+
+private:
+ /// Reference to our <Select_Reactor>.
+ ACE_Select_Reactor_Impl &select_reactor_;
+
+ /// Maximum number of handles.
+ ssize_t max_size_;
+
+ /// The highest currently active handle, plus 1 (ranges between 0 and
+ /// <max_size_>.
+ int max_handlep1_;
+
+#if defined (ACE_WIN32)
+ // = The mapping from <HANDLES> to <Event_Handlers>.
+
+ /**
+ * The NT version implements this via a dynamically allocated
+ * array of <ACE_Event_Tuple *>. Since NT implements ACE_HANDLE
+ * as a void * we can't directly index into this array. Therefore,
+ * we just do a linear search (for now). Next, we'll modify
+ * things to use hashing or something faster...
+ */
+ ACE_Event_Tuple *event_handlers_;
+#else
+ /**
+ * The UNIX version implements this via a dynamically allocated
+ * array of <ACE_Event_Handler *> that is indexed directly using
+ * the ACE_HANDLE value.
+ */
+ ACE_Event_Handler **event_handlers_;
+#endif /* ACE_WIN32 */
+};
+
+//=================================================================
+
+/**
+ * @class ACE_Select_Reactor_Handler_Repository_Iterator
+ *
+ * @brief Iterate through the <ACE_Select_Reactor_Handler_Repository>.
+ */
+class ACE_Export ACE_Select_Reactor_Handler_Repository_Iterator
+{
+public:
+ // = Initialization method.
+ ACE_Select_Reactor_Handler_Repository_Iterator (const ACE_Select_Reactor_Handler_Repository *s);
+
+ /// dtor.
+ ~ACE_Select_Reactor_Handler_Repository_Iterator (void);
+
+ // = Iteration methods.
+
+ /// Pass back the <next_item> that hasn't been seen in the Set.
+ /// Returns 0 when all items have been seen, else 1.
+ int next (ACE_Event_Handler *&next_item);
+
+ /// Returns 1 when all items have been seen, else 0.
+ int done (void) const;
+
+ /// Move forward by one element in the set. Returns 0 when all the
+ /// items in the set have been seen, else 1.
+ int advance (void);
+
+ /// Dump the state of an object.
+ void dump (void) const;
+
+ /// Declare the dynamic allocation hooks.
+ ACE_ALLOC_HOOK_DECLARE;
+
+private:
+ /// Reference to the Handler_Repository we are iterating over.
+ const ACE_Select_Reactor_Handler_Repository *rep_;
+
+ /// Pointer to the current iteration level.
+ ssize_t current_;
+};
+
+
+#if defined (__ACE_INLINE__)
+#include "Select_Reactor_Handler_Repository.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /*ACE_SELECT_REACTOR_HANDLER_REPOSITORY_H*/
diff --git a/ace/Select_Reactor_Handler_Repository.inl b/ace/Select_Reactor_Handler_Repository.inl
new file mode 100644
index 00000000000..9c9c234ec56
--- /dev/null
+++ b/ace/Select_Reactor_Handler_Repository.inl
@@ -0,0 +1,41 @@
+/* -*- C++ -*- */
+//$Id$
+ACE_INLINE
+ACE_Event_Tuple::~ACE_Event_Tuple (void)
+{
+}
+
+ACE_INLINE
+ACE_Event_Tuple::ACE_Event_Tuple (void)
+: handle_ (ACE_INVALID_HANDLE),
+ event_handler_ (0)
+{
+}
+
+ACE_INLINE
+ACE_Event_Tuple::ACE_Event_Tuple (ACE_Event_Handler* eh,
+ ACE_HANDLE h)
+: handle_ (h),
+ event_handler_ (eh)
+{
+}
+
+ACE_INLINE int
+ACE_Event_Tuple::operator== (const ACE_Event_Tuple &rhs) const
+{
+ return this->handle_ == rhs.handle_;
+}
+
+ACE_INLINE int
+ACE_Event_Tuple::operator!= (const ACE_Event_Tuple &rhs) const
+{
+ return !(*this == rhs);
+}
+
+/************************************************************/
+
+ACE_INLINE size_t
+ACE_Select_Reactor_Handler_Repository::size (void) const
+{
+ return this->max_size_;
+}
diff --git a/ace/Select_Reactor_Notify.cpp b/ace/Select_Reactor_Notify.cpp
new file mode 100644
index 00000000000..18886c44a36
--- /dev/null
+++ b/ace/Select_Reactor_Notify.cpp
@@ -0,0 +1,576 @@
+#include "Select_Reactor_Notify.h"
+#include "ACE.h"
+#include "Select_Reactor_Base.h"
+
+ACE_RCSID(ace,
+ Select_Reactor_Notify,
+ "$Id$")
+
+
+ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
+ : max_notify_iterations_ (-1)
+{
+}
+
+ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void)
+{
+}
+
+void
+ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
+{
+ // Must always be > 0 or < 0 to optimize the loop exit condition.
+ if (iterations == 0)
+ iterations = 1;
+
+ this->max_notify_iterations_ = iterations;
+}
+
+int
+ACE_Select_Reactor_Notify::max_notify_iterations (void)
+{
+ return this->max_notify_iterations_;
+}
+
+int
+ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
+ ACE_Reactor_Mask mask )
+{
+ ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
+
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ if (this->notify_queue_.is_empty ())
+ return 0;
+
+ ACE_Notification_Buffer *temp;
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
+
+ size_t queue_size = this->notify_queue_.size ();
+ int number_purged = 0;
+ size_t i;
+ for (i = 0; i < queue_size; ++i)
+ {
+ if (-1 == this->notify_queue_.dequeue_head (temp))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("dequeue_head")),
+ -1);
+
+ // If this is not a Reactor notify (it is for a particular handler),
+ // and it matches the specified handler (or purging all),
+ // and applying the mask would totally eliminate the notification, then
+ // release it and count the number purged.
+ if ((0 != temp->eh_) &&
+ (0 == eh || eh == temp->eh_) &&
+ ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
+ // is left with nothing when
+ // applying the mask
+ {
+ if (-1 == this->free_queue_.enqueue_head (temp))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enqueue_head")),
+ -1);
+ ++number_purged;
+ }
+ else
+ {
+ // To preserve it, move it to the local_queue.
+ // But first, if this is not a Reactor notify (it is for a particularhandler),
+ // and it matches the specified handler (or purging all), then
+ // apply the mask
+ if ((0 != temp->eh_) &&
+ (0 == eh || eh == temp->eh_))
+ ACE_CLR_BITS(temp->mask_, mask);
+ if (-1 == local_queue.enqueue_head (temp))
+ return -1;
+ }
+ }
+
+ if (this->notify_queue_.size ())
+ { // should be empty!
+ ACE_ASSERT (0);
+ return -1;
+ }
+
+ // now put it back in the notify queue
+ queue_size = local_queue.size ();
+ for (i = 0; i < queue_size; ++i)
+ {
+ if (-1 == local_queue.dequeue_head (temp))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("dequeue_head")),
+ -1);
+
+ if (-1 == this->notify_queue_.enqueue_head (temp))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enqueue_head")),
+ -1);
+ }
+
+ return number_purged;
+
+#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
+ ACE_UNUSED_ARG (eh);
+ ACE_UNUSED_ARG (mask);
+ ACE_NOTSUP_RETURN (-1);
+#endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
+}
+
+void
+ACE_Select_Reactor_Notify::dump (void) const
+{
+ ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
+
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+ ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_));
+ this->notification_pipe_.dump ();
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
+}
+
+int
+ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
+ ACE_Timer_Queue *,
+ int disable_notify_pipe)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Notify::open");
+
+ if (disable_notify_pipe == 0)
+ {
+ this->select_reactor_ =
+ ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r);
+
+ if (select_reactor_ == 0)
+ {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (this->notification_pipe_.open () == -1)
+ return -1;
+#if defined (F_SETFD)
+ ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
+ ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
+#endif /* F_SETFD */
+
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_Notification_Buffer *temp;
+
+ ACE_NEW_RETURN (temp,
+ ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ -1);
+
+ if (this->alloc_queue_.enqueue_head (temp) == -1)
+ {
+ delete [] temp;
+ return -1;
+ }
+
+ for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++)
+ if (free_queue_.enqueue_head (temp + i) == -1)
+ return -1;
+
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
+ // There seems to be a Win32 bug with this... Set this into
+ // non-blocking mode.
+ if (ACE::set_flags (this->notification_pipe_.read_handle (),
+ ACE_NONBLOCK) == -1)
+ return -1;
+ else
+ return this->select_reactor_->register_handler
+ (this->notification_pipe_.read_handle (),
+ this,
+ ACE_Event_Handler::READ_MASK);
+ }
+ else
+ {
+ this->select_reactor_ = 0;
+ return 0;
+ }
+}
+
+int
+ACE_Select_Reactor_Notify::close (void)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Notify::close");
+
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ // Free up the dynamically allocated resources.
+ ACE_Notification_Buffer **b;
+
+ for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
+ alloc_iter.next (b) != 0;
+ alloc_iter.advance ())
+ {
+ delete [] *b;
+ *b = 0;
+ }
+
+ this->alloc_queue_.reset ();
+ this->notify_queue_.reset ();
+ this->free_queue_.reset ();
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
+ return this->notification_pipe_.close ();
+}
+
+int
+ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
+ ACE_Reactor_Mask mask,
+ ACE_Time_Value *timeout)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
+
+ // Just consider this method a "no-op" if there's no
+ // <ACE_Select_Reactor> configured.
+ if (this->select_reactor_ == 0)
+ return 0;
+
+ ACE_Notification_Buffer buffer (eh, mask);
+
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ // Artificial scope to limit the duration of the mutex.
+ {
+ // int notification_required = 0;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ // No pending notifications.
+
+ // We will send notify for every message..
+ // if (this->notify_queue_.is_empty ())
+ // notification_required = 1;
+
+ ACE_Notification_Buffer *temp = 0;
+
+ if (free_queue_.dequeue_head (temp) == -1)
+ {
+ // Grow the queue of available buffers.
+ ACE_Notification_Buffer *temp1;
+
+ ACE_NEW_RETURN (temp1,
+ ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ -1);
+
+ if (this->alloc_queue_.enqueue_head (temp1) == -1)
+ {
+ delete [] temp1;
+ return -1;
+ }
+
+ // Start at 1 and enqueue only
+ // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
+ // the first one will be used right now.
+ for (size_t i = 1;
+ i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
+ i++)
+ this->free_queue_.enqueue_head (temp1 + i);
+
+ temp = temp1;
+ }
+
+ ACE_ASSERT (temp != 0);
+ *temp = buffer;
+
+ if (notify_queue_.enqueue_tail (temp) == -1)
+ return -1;
+ }
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
+ ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
+ (char *) &buffer,
+ sizeof buffer,
+ timeout);
+ if (n == -1)
+ return -1;
+
+ return 0;
+}
+
+// Handles pending threads (if any) that are waiting to unblock the
+// Select_Reactor.
+
+int
+ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
+ ACE_Handle_Set &rd_mask)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
+
+ ACE_HANDLE read_handle =
+ this->notification_pipe_.read_handle ();
+
+ if (read_handle != ACE_INVALID_HANDLE
+ && rd_mask.is_set (read_handle))
+ {
+ number_of_active_handles--;
+ rd_mask.clr_bit (read_handle);
+ return this->handle_input (read_handle);
+ }
+ else
+ return 0;
+}
+
+
+ACE_HANDLE
+ACE_Select_Reactor_Notify::notify_handle (void)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
+
+ return this->notification_pipe_.read_handle ();
+}
+
+
+// Special trick to unblock <select> when updates occur in somewhere
+// other than the main <ACE_Select_Reactor> thread. All we do is
+// write data to a pipe that the <ACE_Select_Reactor> is listening on.
+// Thanks to Paul Stephenson for suggesting this approach.
+int
+ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
+{
+ // There is tonnes of code that can be abstracted...
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ ACE_Notification_Buffer *temp;
+
+ ACE_UNUSED_ARG (buffer);
+
+ // If the queue is empty just return 0
+ if (notify_queue_.is_empty ())
+ return 0;
+
+ if (this->notify_queue_.dequeue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("dequeue_head")),
+ -1);
+ if (temp->eh_ != 0)
+ {
+ // If the queue had a buffer that has an event handler, put
+ // the element back in the queue and return a 1
+ if (this->notify_queue_.enqueue_head (temp) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enque_head")),
+ -1);
+ }
+
+ return 1;
+ }
+ // Else put the element in the free queue
+ if (free_queue_.enqueue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enqueue_head")),
+ -1);
+ }
+#else
+ // If eh == 0 then another thread is unblocking the
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
+ // internal structures. Otherwise, we need to dispatch the
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
+ if (buffer.eh_ != 0)
+ return 1;
+
+#endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
+ // has no dispatchable buffer
+ return 0;
+}
+
+int
+ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
+{
+ int result = 0;
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ // Dispatch all messages that are in the <notify_queue_>.
+ {
+ // We acquire the lock in a block to make sure we're not
+ // holding the lock while delivering callbacks...
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ ACE_Notification_Buffer *temp;
+
+ if (notify_queue_.is_empty ())
+ return 0;
+ else if (notify_queue_.dequeue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("dequeue_head")),
+ -1);
+ buffer = *temp;
+ if (free_queue_.enqueue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enqueue_head")),
+ -1);
+ }
+
+ // If eh == 0 then another thread is unblocking the
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
+ // internal structures. Otherwise, we need to dispatch the
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
+ if (buffer.eh_ != 0)
+ {
+
+ switch (buffer.mask_)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ case ACE_Event_Handler::ACCEPT_MASK:
+ result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::EXCEPT_MASK:
+ result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
+ break;
+ default:
+ // Should we bail out if we get an invalid mask?
+ ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
+ }
+ if (result == -1)
+ buffer.eh_->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
+ }
+#else
+ // If eh == 0 then another thread is unblocking the
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
+ // internal structures. Otherwise, we need to dispatch the
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
+ if (buffer.eh_ != 0)
+ {
+ switch (buffer.mask_)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ case ACE_Event_Handler::ACCEPT_MASK:
+ result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::EXCEPT_MASK:
+ result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::QOS_MASK:
+ result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::GROUP_QOS_MASK:
+ result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
+ break;
+ default:
+ // Should we bail out if we get an invalid mask?
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("invalid mask = %d\n"),
+ buffer.mask_));
+ }
+ if (result == -1)
+ buffer.eh_->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
+ }
+
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
+ return 1;
+}
+
+int
+ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
+ ACE_Notification_Buffer &buffer)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
+
+ ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
+
+ if (n > 0)
+ {
+ // Check to see if we've got a short read.
+ if (n != sizeof buffer)
+ {
+ ssize_t remainder = sizeof buffer - n;
+
+ // If so, try to recover by reading the remainder. If this
+ // doesn't work we're in big trouble since the input stream
+ // won't be aligned correctly. I'm not sure quite what to
+ // do at this point. It's probably best just to return -1.
+ if (ACE::recv (handle,
+ ((char *) &buffer) + n,
+ remainder) != remainder)
+ return -1;
+ }
+
+
+ return 1;
+ }
+
+ // Return -1 if things have gone seriously wrong.
+ if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
+ return -1;
+
+ return 0;
+}
+
+
+int
+ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
+{
+ ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
+ // Precondition: this->select_reactor_.token_.current_owner () ==
+ // ACE_Thread::self ();
+
+ int number_dispatched = 0;
+ int result = 0;
+ ACE_Notification_Buffer buffer;
+
+ while ((result = this->read_notify_pipe (handle, buffer)) > 0)
+ {
+ // Dispatch the buffer
+ // NOTE: We count only if we made any dispatches ie. upcalls.
+ if (this->dispatch_notify (buffer) > 0)
+ number_dispatched++;
+
+ // Bail out if we've reached the <notify_threshold_>. Note that
+ // by default <notify_threshold_> is -1, so we'll loop until all
+ // the notifications in the pipe have been dispatched.
+ if (number_dispatched == this->max_notify_iterations_)
+ break;
+ }
+
+ // Reassign number_dispatched to -1 if things have gone seriously
+ // wrong.
+ if (result < 0)
+ number_dispatched = -1;
+
+ // Enqueue ourselves into the list of waiting threads. When we
+ // reacquire the token we'll be off and running again with ownership
+ // of the token. The postcondition of this call is that
+ // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
+ this->select_reactor_->renew ();
+ return number_dispatched;
+}
+
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+template class ACE_Unbounded_Queue <ACE_Notification_Buffer *>;
+template class ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>;
+template class ACE_Node <ACE_Notification_Buffer *>;
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+#pragma instantiate ACE_Unbounded_Queue <ACE_Notification_Buffer *>
+#pragma instantiate ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>
+#pragma instantiate ACE_Node <ACE_Notification_Buffer *>
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/Select_Reactor_Notify.h b/ace/Select_Reactor_Notify.h
new file mode 100644
index 00000000000..5e1b972021e
--- /dev/null
+++ b/ace/Select_Reactor_Notify.h
@@ -0,0 +1,189 @@
+/* -*- C++ -*- */
+
+//=============================================================================
+/**
+ * @file Select_Reactor_Base.h
+ *
+ * $Id$
+ *
+ * @author Douglas C. Schmidt <schmidt@cs.wustl.edu>
+ */
+//=============================================================================
+
+#ifndef ACE_SELECT_REACTOR_NOTIFY_H
+#define ACE_SELECT_REACTOR_NOTIFY_H
+#include "ace/pre.h"
+#include "Reactor_Impl.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "ace/Pipe.h"
+
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+#include "Unbounded_Queue.h"
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
+class ACE_Select_Reactor_Impl;
+
+
+/**
+ * @class ACE_Select_Reactor_Notify
+ *
+ * @brief Unblock the <ACE_Select_Reactor> from its event loop.
+ *
+ * This implementation is necessary for cases where the
+ * <ACE_Select_Reactor> is run in a multi-threaded program. In
+ * this case, we need to be able to unblock <select> or <poll>
+ * when updates occur other than in the main
+ * <ACE_Select_Reactor> thread. To do this, we signal an
+ * auto-reset event the <ACE_Select_Reactor> is listening on.
+ * If an <ACE_Event_Handler> and <ACE_Select_Reactor_Mask> is
+ * passed to <notify>, the appropriate <handle_*> method is
+ * dispatched in the context of the <ACE_Select_Reactor> thread.
+ */
+class ACE_Export ACE_Select_Reactor_Notify : public ACE_Reactor_Notify
+{
+public:
+ /// Constructor.
+ ACE_Select_Reactor_Notify (void);
+
+ /// Destructor.
+ ~ACE_Select_Reactor_Notify (void);
+
+ // = Initialization and termination methods.
+ /// Initialize.
+ virtual int open (ACE_Reactor_Impl *,
+ ACE_Timer_Queue * = 0,
+ int disable_notify_pipe = 0);
+
+ /// Destroy.
+ virtual int close (void);
+
+ /**
+ * Called by a thread when it wants to unblock the
+ * <ACE_Select_Reactor>. This wakeups the <ACE_Select_Reactor> if
+ * currently blocked in <select>/<poll>. Pass over both the
+ * <Event_Handler> *and* the <mask> to allow the caller to dictate
+ * which <Event_Handler> method the <ACE_Select_Reactor> will
+ * invoke. The <ACE_Time_Value> indicates how long to blocking
+ * trying to notify the <ACE_Select_Reactor>. If <timeout> == 0,
+ * the caller will block until action is possible, else will wait
+ * until the relative time specified in *<timeout> elapses).
+ */
+ virtual int notify (ACE_Event_Handler * = 0,
+ ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK,
+ ACE_Time_Value * = 0);
+
+ /// Handles pending threads (if any) that are waiting to unblock the
+ /// <ACE_Select_Reactor>.
+ virtual int dispatch_notifications (int &number_of_active_handles,
+ ACE_Handle_Set &rd_mask);
+
+ /// Returns the ACE_HANDLE of the notify pipe on which the reactor
+ /// is listening for notifications so that other threads can unblock
+ /// the Select_Reactor
+ virtual ACE_HANDLE notify_handle (void);
+
+ /// Handle one of the notify call on the <handle>. This could be
+ /// because of a thread trying to unblock the <Reactor_Impl>
+ virtual int dispatch_notify (ACE_Notification_Buffer &buffer);
+
+ /// Read one of the notify call on the <handle> into the
+ /// <buffer>. This could be because of a thread trying to unblock
+ /// the <Reactor_Impl>
+ virtual int read_notify_pipe (ACE_HANDLE handle,
+ ACE_Notification_Buffer &buffer);
+
+ /// Verify whether the buffer has dispatchable info or not.
+ virtual int is_dispatchable (ACE_Notification_Buffer &buffer);
+
+ /// Called back by the <ACE_Select_Reactor> when a thread wants to
+ /// unblock us.
+ virtual int handle_input (ACE_HANDLE handle);
+
+ /**
+ * Set the maximum number of times that the
+ * <ACE_Select_Reactor_Notify::handle_input> method will iterate and
+ * dispatch the <ACE_Event_Handlers> that are passed in via the
+ * notify pipe before breaking out of its <recv> loop. By default,
+ * this is set to -1, which means "iterate until the pipe is empty."
+ * Setting this to a value like "1 or 2" will increase "fairness"
+ * (and thus prevent starvation) at the expense of slightly higher
+ * dispatching overhead.
+ */
+ virtual void max_notify_iterations (int);
+
+ /**
+ * Get the maximum number of times that the
+ * <ACE_Select_Reactor_Notify::handle_input> method will iterate and
+ * dispatch the <ACE_Event_Handlers> that are passed in via the
+ * notify pipe before breaking out of its <recv> loop.
+ */
+ virtual int max_notify_iterations (void);
+
+ /**
+ * Purge any notifications pending in this reactor for the specified
+ * <ACE_Event_Handler> object. If <eh> == 0, all notifications for all
+ * handlers are removed (but not any notifications posted just to wake up
+ * the reactor itself). Returns the number of notifications purged.
+ * Returns -1 on error.
+ */
+ virtual int purge_pending_notifications (ACE_Event_Handler *,
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
+
+ /// Dump the state of an object.
+ virtual void dump (void) const;
+
+ /// Declare the dynamic allocation hooks.
+ ACE_ALLOC_HOOK_DECLARE;
+
+protected:
+ /**
+ * Keep a back pointer to the <ACE_Select_Reactor>. If this value
+ * if NULL then the <ACE_Select_Reactor> has been initialized with
+ * <disable_notify_pipe>.
+ */
+ ACE_Select_Reactor_Impl *select_reactor_;
+
+ /**
+ * Contains the <ACE_HANDLE> the <ACE_Select_Reactor> is listening
+ * on, as well as the <ACE_HANDLE> that threads wanting the
+ * attention of the <ACE_Select_Reactor> will write to.
+ */
+ ACE_Pipe notification_pipe_;
+
+ /**
+ * Keeps track of the maximum number of times that the
+ * <ACE_Select_Reactor_Notify::handle_input> method will iterate and
+ * dispatch the <ACE_Event_Handlers> that are passed in via the
+ * notify pipe before breaking out of its <recv> loop. By default,
+ * this is set to -1, which means "iterate until the pipe is empty."
+ */
+ int max_notify_iterations_;
+
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ // = This configuration queues up notifications in separate buffers that
+ // are in user-space, rather than stored in a pipe in the OS
+ // kernel. The kernel-level notifications are used only to trigger
+ // the Reactor to check its notification queue. This enables many
+ // more notifications to be stored than would otherwise be the case.
+
+ /// Keeps track of allocated arrays of type
+ /// <ACE_Notification_Buffer>.
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_queue_;
+
+ /// Keeps track of all pending notifications.
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_queue_;
+
+ /// Keeps track of all free buffers.
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_queue_;
+
+ /// Synchronization for handling of queues.
+ ACE_SYNCH_MUTEX notify_queue_lock_;
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+};
+
+
+#include "ace/post.h"
+#endif /*ACE_SELECT_REACTOR_NOTIFY_H*/
diff --git a/ace/Select_Reactor_T.cpp b/ace/Select_Reactor_T.cpp
index 9a45c4183ff..51ea42f35d6 100644
--- a/ace/Select_Reactor_T.cpp
+++ b/ace/Select_Reactor_T.cpp
@@ -13,6 +13,7 @@
#include "ace/Log_Msg.h"
#include "ace/Thread.h"
#include "ace/Timer_Heap.h"
+#include "Select_Reactor_Notify.h"
// @@ The latest version of SunCC can't grok the code if we put inline
// function here. Therefore, we temporarily disable the code here.
@@ -996,7 +997,7 @@ ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler_i
}
template <class ACE_SELECT_REACTOR_TOKEN> int
-ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::work_pending
+ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::work_pending
(const ACE_Time_Value &max_wait_time)
{
ACE_TRACE ("ACE_Select_Reactor_T::work_pending");
diff --git a/ace/ace_dll.dsp b/ace/ace_dll.dsp
index cd7cbeb09c3..ea8a5c4ff7d 100644
--- a/ace/ace_dll.dsp
+++ b/ace/ace_dll.dsp
@@ -762,6 +762,14 @@ SOURCE=.\Select_Reactor_Base.cpp
# End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\Select_Reactor_Notify.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Service_Config.cpp
# End Source File
# Begin Source File
@@ -1914,6 +1922,14 @@ SOURCE=.\Select_Reactor_Base.h
# End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Select_Reactor_Notify.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Select_Reactor_T.h
# End Source File
# Begin Source File
@@ -2890,6 +2906,10 @@ SOURCE=.\Select_Reactor_Base.i
# End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Select_Reactor_T.i
# End Source File
# Begin Source File
diff --git a/ace/ace_lib.dsp b/ace/ace_lib.dsp
index 9aff45872f0..ca98ce59af1 100644
--- a/ace/ace_lib.dsp
+++ b/ace/ace_lib.dsp
@@ -42,8 +42,8 @@ RSC=rc.exe
# PROP Output_Dir ""
# PROP Intermediate_Dir ".\LIB\Release"
# PROP Target_Dir ""
-MTL=midl.exe
LINK32=link.exe -lib
+MTL=midl.exe
# ADD BASE CPP /nologo /MD /W3 /GX /O1 /I "../" /D ACE_HAS_DLL=0 /D "ACE_NO_INLINE" /D "NDEBUG" /D "WIN32" /D "_WINDOWS" /FD /c
# SUBTRACT BASE CPP /YX
# ADD CPP /nologo /MT /W3 /GX /O1 /I "../" /I "../PACE" /D ACE_OS_HAS_DLL=0 /D ACE_HAS_DLL=0 /D "ACE_NO_INLINE" /D "NDEBUG" /D "WIN32" /D "_WINDOWS" /FD /c
@@ -69,8 +69,8 @@ LIB32=link.exe -lib
# PROP Output_Dir ""
# PROP Intermediate_Dir ".\LIB\Debug"
# PROP Target_Dir ""
-MTL=midl.exe
LINK32=link.exe -lib
+MTL=midl.exe
# ADD BASE CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /Gy /I "../" /D "_DEBUG" /D "WIN32" /D "_WINDOWS" /D ACE_HAS_DLL=0 /FD /c
# SUBTRACT BASE CPP /YX
# ADD CPP /nologo /MTd /W3 /GX /Z7 /Od /Gy /I "../" /I "../PACE" /D "_DEBUG" /D "WIN32" /D "_WINDOWS" /D ACE_HAS_DLL=0 /D ACE_OS_HAS_DLL=0 /FD /c
@@ -96,8 +96,8 @@ LIB32=link.exe -lib
# PROP Output_Dir ""
# PROP Intermediate_Dir ".\LIB\Release"
# PROP Target_Dir ""
-MTL=midl.exe
LINK32=link.exe -lib
+MTL=midl.exe
# ADD BASE CPP /nologo /G5 /MT /W3 /GX /O1 /I "../" /D "NDEBUG" /D "WIN32" /D "_WINDOWS" /D ACE_HAS_DLL=0 /D "ACE_NO_INLINE" /YX /FD /c
# ADD CPP /nologo /MD /W3 /GX /Zi /O1 /I "../" /I "../PACE" /D "_WINDOWS" /D "NDEBUG" /D "ACE_AS_STATIC_LIBS" /D "WIN32" /FD /c
# SUBTRACT CPP /YX
@@ -122,8 +122,8 @@ LIB32=link.exe -lib
# PROP Output_Dir ""
# PROP Intermediate_Dir ".\LIB\Debug"
# PROP Target_Dir ""
-MTL=midl.exe
LINK32=link.exe -lib
+MTL=midl.exe
# ADD BASE CPP /nologo /G5 /MTd /W3 /Gm /GX /Zi /Od /Gy /I "../" /D "_DEBUG" /D "WIN32" /D "_WINDOWS" /D ACE_HAS_DLL=0 /D "ACE_NO_INLINE" /YX /FD /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /Gy /I "../" /I "../PACE" /D "_WINDOWS" /D "_DEBUG" /D "ACE_AS_STATIC_LIBS" /D "WIN32" /FD /c
# SUBTRACT CPP /YX
@@ -753,6 +753,14 @@ SOURCE=.\Select_Reactor_Base.cpp
# End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\Select_Reactor_Notify.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Service_Config.cpp
# End Source File
# Begin Source File
@@ -1901,6 +1909,14 @@ SOURCE=.\Select_Reactor_Base.h
# End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Select_Reactor_Notify.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Select_Reactor_T.h
# End Source File
# Begin Source File
@@ -2877,6 +2893,10 @@ SOURCE=.\Select_Reactor_Base.i
# End Source File
# Begin Source File
+SOURCE=.\Select_Reactor_Handler_Repository.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Select_Reactor_T.i
# End Source File
# Begin Source File
diff --git a/tests/ChangeLog b/tests/ChangeLog
index 4c22a4680c1..55f05fcf975 100644
--- a/tests/ChangeLog
+++ b/tests/ChangeLog
@@ -1,3 +1,28 @@
+Tue Aug 06 00:39:15 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * ace/Select_Reactor_Notify.h:
+ * ace/Select_Reactor_Notify.cpp: Moved the declaration and
+ definition of ACE_Select_Reactor_Notify from
+ Select_Reactor_Base.* to the above files.
+
+ * ace/Select_Reactor_Handler_Repository.h:
+ * ace/Select_Reactor_Handler_Repository.cpp:
+ * ace/Select_Reactor_Handler_Repository.inl: Moved the declaration
+ and definition of ACE_Select_Reactor_Handler_Repository from
+ Select_Reactor_Base.* to the above files.
+
+ * ace/Select_Reactor_Base.h:
+ * ace/Select_Reactor_Base.cpp:
+ * ace/Select_Reactor_Base.i: Moved the classes mentioned above to
+ a new file. The above classes were cluttering the file and
+ things were getting confusing posing maintenance nightmare.
+
+ * ace/Select_Reactor_T.cpp: Needed an include to compile on
+ Win32.
+
+ * ace/ace_dll.dsp:
+ * ace/ace_lib.dsp: Added the new files to the project files.
+
Mon Aug 5 22:37:13 2002 Balachandran Natarajan <bala@cs.wustl.edu>
* tests/Makefile: Added the new test to the Makefile.