diff options
author | nanbor <nanbor@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-08-25 10:43:40 +0000 |
---|---|---|
committer | nanbor <nanbor@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-08-25 10:43:40 +0000 |
commit | d018c54335491d79dd46e539bfebb895f9aec364 (patch) | |
tree | dee655fbd459edf7955f4097b9ed042126fc8989 | |
parent | d152db3bebb93c460493c22a6caae17c3f5cc756 (diff) | |
download | ATCD-d018c54335491d79dd46e539bfebb895f9aec364.tar.gz |
Templatized Select_Reactor
-rw-r--r-- | ace/Select_Reactor.cpp | 2015 | ||||
-rw-r--r-- | ace/Select_Reactor.h | 943 | ||||
-rw-r--r-- | ace/Select_Reactor.i | 290 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.cpp | 751 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.h | 411 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.i | 83 | ||||
-rw-r--r-- | ace/Select_Reactor_T.cpp | 1230 | ||||
-rw-r--r-- | ace/Select_Reactor_T.h | 605 | ||||
-rw-r--r-- | ace/Select_Reactor_T.i | 226 | ||||
-rw-r--r-- | ace/Synch.h | 22 | ||||
-rw-r--r-- | ace/Synch.i | 14 |
11 files changed, 3376 insertions, 3214 deletions
diff --git a/ace/Select_Reactor.cpp b/ace/Select_Reactor.cpp index 22e7745a440..863f2512e47 100644 --- a/ace/Select_Reactor.cpp +++ b/ace/Select_Reactor.cpp @@ -3,1993 +3,46 @@ #define ACE_BUILD_DLL #include "ace/Select_Reactor.h" -#include "ace/Reactor.h" -#include "ace/Thread.h" -#include "ace/Synch_T.h" -#include "ace/SOCK_Acceptor.h" -#include "ace/SOCK_Connector.h" -#include "ace/Timer_Heap.h" - -#if !defined (__ACE_INLINE__) -#include "ace/Select_Reactor.i" -#endif /* __ACE_INLINE__ */ ACE_RCSID(ace, Select_Reactor, "$Id$") -ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor) - -#if defined (ACE_WIN32) -#define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_) -#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_) -#else -#define ACE_SELECT_REACTOR_HANDLE(H) (H) -#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)]) -#endif /* ACE_WIN32 */ - -// Performs sanity checking on the ACE_HANDLE. - -int -ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle"); -#if defined (ACE_WIN32) - // It's too expensive to perform more exhaustive validity checks on - // Win32 due to the way that they implement SOCKET HANDLEs. - if (handle == ACE_INVALID_HANDLE) -#else /* !ACE_WIN32 */ - if (handle < 0 || handle >= this->max_size_) -#endif /* ACE_WIN32 */ - { - errno = EINVAL; - return 1; - } - else - return 0; -} - -// Performs sanity checking on the ACE_HANDLE. - -int -ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range"); -#if defined (ACE_WIN32) - // It's too expensive to perform more exhaustive validity checks on - // Win32 due to the way that they implement SOCKET HANDLEs. - if (handle != ACE_INVALID_HANDLE) -#else /* !ACE_WIN32 */ - if (handle >= 0 && handle < this->max_handlep1_) -#endif /* ACE_WIN32 */ - return 1; - else - { - errno = EINVAL; - return 0; - } -} - -size_t -ACE_Select_Reactor_Handler_Repository::max_handlep1 (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1"); - - return this->max_handlep1_; -} - -int -ACE_Select_Reactor_Handler_Repository::open (size_t size) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open"); - this->max_size_ = size; - this->max_handlep1_ = 0; - -#if defined (ACE_WIN32) - // Try to allocate the memory. - ACE_NEW_RETURN (this->event_handlers_, - ACE_Event_Tuple[size], - -1); - - // Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }. - for (size_t h = 0; h < size; h++) - { - ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE; - ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; - } -#else - // Try to allocate the memory. - ACE_NEW_RETURN (this->event_handlers_, - ACE_Event_Handler *[size], - -1); - - // Initialize the ACE_Event_Handler * to NULL. - for (size_t h = 0; h < size; h++) - ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; -#endif /* ACE_WIN32 */ - - // Check to see if the user is asking for too much and fail in this - // case. - if (size > FD_SETSIZE) - { - errno = ERANGE; - return -1; - } - else - { - // Try to increase the number of handles if <size> is greater than - // the current limit. We ignore the return value here because this - // is more of a "warning" not an error. - (void) ACE::set_handle_limit (size); - return 0; - } -} - -// Initialize a repository of the appropriate <size>. - -ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor &select_reactor) - : select_reactor_ (select_reactor), - max_size_ (0), - max_handlep1_ (0), - event_handlers_ (0) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository"); -} - -int -ACE_Select_Reactor_Handler_Repository::unbind_all (void) -{ - // Unbind all of the <handle, ACE_Event_Handler>s. - for (int handle = 0; - handle < this->max_handlep1_; - handle++) - this->unbind (ACE_SELECT_REACTOR_HANDLE (handle), - ACE_Event_Handler::ALL_EVENTS_MASK); - - return 0; -} - -int -ACE_Select_Reactor_Handler_Repository::close (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close"); - - if (this->event_handlers_ != 0) - { - this->unbind_all (); - - delete [] this->event_handlers_; - this->event_handlers_ = 0; - } - return 0; -} - -// Return the <ACE_Event_Handler *> associated with the <handle>. - -ACE_Event_Handler * -ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle, - size_t *index_p) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find"); - - ACE_Event_Handler *eh = 0; - ssize_t i; - - // Only bother to search for the <handle> if it's in range. - if (this->handle_in_range (handle)) - { -#if defined (ACE_WIN32) - i = 0; - - for (; i < this->max_handlep1_; i++) - if (ACE_SELECT_REACTOR_HANDLE (i) == handle) - { - eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i); - break; - } -#else - i = handle; - - eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle); -#endif /* ACE_WIN32 */ - } - else - // g++ can't figure out that i won't be used below if the handle - // is out of range, so keep it happy by defining i here . . . - i = 0; - - if (eh != 0 && index_p != 0) - *index_p = i; - else - errno = ENOENT; - - return eh; -} - -// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>. - -int -ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle, - ACE_Event_Handler *event_handler, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind"); - - if (handle == ACE_INVALID_HANDLE) - handle = event_handler->get_handle (); - - if (this->invalid_handle (handle)) - return -1; - -#if defined (ACE_WIN32) - int assigned_slot = -1; - - for (ssize_t i = 0; i < this->max_handlep1_; i++) - { - // Found it, so let's just reuse this location. - if (ACE_SELECT_REACTOR_HANDLE (i) == handle) - { - assigned_slot = i; - break; - } - // Here's the first free slot, so let's take it. - else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE - && assigned_slot == -1) - assigned_slot = i; - } - - if (assigned_slot > -1) - // We found a free spot, let's reuse it. - { - ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle; - ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler; - } - else if (this->max_handlep1_ < this->max_size_) - { - // Insert at the end of the active portion. - ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle; - ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler; - this->max_handlep1_++; - } - else - { - // No more room at the inn! - errno = ENOMEM; - return -1; - } -#else - ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler; - - if (this->max_handlep1_ < handle + 1) - this->max_handlep1_ = handle + 1; -#endif /* ACE_WIN32 */ - - // Add the <mask> for this <handle> in the Select_Reactor's wait_set. - this->select_reactor_.bit_ops (handle, - mask, - this->select_reactor_.wait_set_, - ACE_Reactor::ADD_MASK); - - // Note the fact that we've changed the state of the <wait_set_>, - // which is used by the dispatching loop to determine whether it can - // keep going or if it needs to reconsult select(). - this->select_reactor_.state_changed_ = 1; - - return 0; -} - -// Remove the binding of <ACE_HANDLE>. - -int -ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind"); - - size_t index; - ACE_Event_Handler *eh = this->find (handle, &index); - - if (eh == 0) - return -1; - - // Clear out the <mask> bits in the Select_Reactor's wait_set. - this->select_reactor_.bit_ops (handle, - mask, - this->select_reactor_.wait_set_, - ACE_Reactor::CLR_MASK); - - // Note the fact that we've changed the state of the <wait_set_>, - // which is used by the dispatching loop to determine whether it can - // keep going or if it needs to reconsult select(). - this->select_reactor_.state_changed_ = 1; - - // Close down the <Event_Handler> unless we've been instructed not - // to. - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0) - eh->handle_close (handle, mask); - - // If there are no longer any outstanding events on this <handle> - // then we can totally shut down the Event_Handler. - if (this->select_reactor_.wait_set_.rd_mask_.is_set (handle) == 0 - && this->select_reactor_.wait_set_.wr_mask_.is_set (handle) == 0 - && this->select_reactor_.wait_set_.ex_mask_.is_set (handle) == 0) -#if defined (ACE_WIN32) - { - ACE_SELECT_REACTOR_HANDLE (index) = ACE_INVALID_HANDLE; - ACE_SELECT_REACTOR_EVENT_HANDLER (this, index) = 0; - - if (this->max_handlep1_ == (int) index + 1) - { - // We've deleted the last entry (i.e., i + 1 == the current - // size of the array), so we need to figure out the last - // valid place in the array that we should consider in - // subsequent searches. - - int i; - - for (i = this->max_handlep1_ - 1; - i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE; - i--) - continue; - - this->max_handlep1_ = i + 1; - } - } -#else - { - ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0; - - if (this->max_handlep1_ == handle + 1) - { - // We've deleted the last entry, so we need to figure out - // the last valid place in the array that is worth looking - // at. - ACE_HANDLE rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set (); - ACE_HANDLE wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set (); - ACE_HANDLE ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set (); - - // Compute the maximum of three values. - this->max_handlep1_ = rd_max < wr_max ? wr_max : rd_max; - - if (this->max_handlep1_ < ex_max) - this->max_handlep1_ = ex_max; - - this->max_handlep1_++; - } - } -#endif /* ACE_WIN32 */ - - return 0; -} - -ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator - (const ACE_Select_Reactor_Handler_Repository *s) - : rep_ (s), - current_ (-1) -{ - this->advance (); -} - -// Pass back the <next_item> that hasn't been seen in the Set. -// Returns 0 when all items have been seen, else 1. - -int -ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item) -{ - int result = 1; - - if (this->current_ >= this->rep_->max_handlep1_) - result = 0; - else - next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, - this->current_); - return result; -} - -int -ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const -{ - return this->current_ >= this->rep_->max_handlep1_; -} - -// Move forward by one element in the set. - -int -ACE_Select_Reactor_Handler_Repository_Iterator::advance (void) -{ - if (this->current_ < this->rep_->max_handlep1_) - this->current_++; - - while (this->current_ < this->rep_->max_handlep1_) - if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0) - return 1; - else - this->current_++; - - return this->current_ < this->rep_->max_handlep1_; -} - -// Dump the state of an object. - -void -ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("rep_ = %u"), this->rep_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("current_ = %d"), this->current_)); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -} - -void -ACE_Select_Reactor_Handler_Repository::dump (void) const -{ - ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) max_handlep1_ = %d, max_size_ = %d\n"), - this->max_handlep1_, this->max_size_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("["))); - - ACE_Event_Handler *eh = 0; - - for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this); - iter.next (eh) != 0; - iter.advance ()) - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT (" (eh = %x, eh->handle_ = %d)"), - eh, eh->get_handle ())); - - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT (" ]"))); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -} - -ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator) - -int -ACE_Select_Reactor::any_ready (ACE_Select_Reactor_Handle_Set &wait_set) -{ - ACE_TRACE ("ACE_Select_Reactor::fill_in_ready"); - -#if !defined (ACE_WIN32) - // Make this call signal safe. - ACE_Sig_Guard sb; -#endif /* ACE_WIN32 */ - - int number_ready = this->ready_set_.rd_mask_.num_set () - + this->ready_set_.wr_mask_.num_set () - + this->ready_set_.ex_mask_.num_set (); - - if (number_ready > 0) - { - wait_set.rd_mask_ = this->ready_set_.rd_mask_; - wait_set.wr_mask_ = this->ready_set_.wr_mask_; - wait_set.ex_mask_ = this->ready_set_.ex_mask_; - - this->ready_set_.rd_mask_.reset (); - this->ready_set_.wr_mask_.reset (); - this->ready_set_.ex_mask_.reset (); - } - - return number_ready; -} - -int -ACE_Select_Reactor::handler_i (int signum, ACE_Event_Handler **eh) -{ - ACE_TRACE ("ACE_Select_Reactor::handler_i"); - ACE_Event_Handler *handler = this->signal_handler_->handler (signum); - - if (handler == 0) - return -1; - else if (*eh != 0) - *eh = handler; - return 0; -} - -int -ACE_Select_Reactor::initialized (void) -{ - ACE_TRACE ("ACE_Select_Reactor::initialized"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, 0)); - return this->initialized_; -} - -int -ACE_Select_Reactor::owner (ACE_thread_t tid, ACE_thread_t *o_id) -{ - ACE_TRACE ("ACE_Select_Reactor::owner"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - - if (o_id) - *o_id = this->owner_; - - this->owner_ = tid; - - return 0; -} - -int -ACE_Select_Reactor::owner (ACE_thread_t *t_id) -{ - ACE_TRACE ("ACE_Select_Reactor::owner"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - *t_id = this->owner_; - return 0; -} - -void -ACE_Select_Reactor::requeue_position (int rp) -{ - ACE_TRACE ("ACE_Select_Reactor::requeue_position"); - ACE_MT (ACE_GUARD (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_)); -#if defined (ACE_WIN32) - ACE_UNUSED_ARG (rp); - // Must always requeue ourselves "next" on Win32. - this->requeue_position_ = 0; -#else - this->requeue_position_ = rp; -#endif /* ACE_WIN32 */ -} - -int -ACE_Select_Reactor::requeue_position (void) -{ - ACE_TRACE ("ACE_Select_Reactor::requeue_position"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->requeue_position_; -} - -void -ACE_Select_Reactor::max_notify_iterations (int iterations) -{ - ACE_TRACE ("ACE_Select_Reactor::max_notify_iterations"); - ACE_MT (ACE_GUARD (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_)); - - this->notify_handler_->max_notify_iterations (iterations); -} - -int -ACE_Select_Reactor::max_notify_iterations (void) -{ - ACE_TRACE ("ACE_Select_Reactor::max_notify_iterations"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->notify_handler_->max_notify_iterations (); -} - -// Enqueue ourselves into the list of waiting threads. -void -ACE_Select_Reactor::renew (void) -{ - ACE_TRACE ("ACE_Select_Reactor::renew"); -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - if (this->supress_notify_renew () == 0) - this->token_.renew (this->requeue_position_); -#endif /* defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) */ -} - -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - -void -ACE_Select_Reactor_Token::dump (void) const -{ - ACE_TRACE ("ACE_Select_Reactor_Token::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\n"))); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -} - -ACE_Select_Reactor_Token::ACE_Select_Reactor_Token (ACE_Select_Reactor &r) - : select_reactor_ (&r) -#if defined (ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION) - , ACE_Local_Mutex (0) // Triggers unique name by stringifying "this"... -#endif /* ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION */ -{ - ACE_TRACE ("ACE_Select_Reactor_Token::ACE_Select_Reactor_Token"); -} - -ACE_Select_Reactor_Token::ACE_Select_Reactor_Token (void) - : select_reactor_ (0) -#if defined (ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION) - , ACE_Local_Mutex (0) // Triggers unique name by stringifying "this"... -#endif /* ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION */ -{ - ACE_TRACE ("ACE_Select_Reactor_Token::ACE_Select_Reactor_Token"); -} - -ACE_Select_Reactor_Token::~ACE_Select_Reactor_Token (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Token::~ACE_Select_Reactor_Token"); -} - -ACE_Select_Reactor & -ACE_Select_Reactor_Token::select_reactor (void) -{ - return *this->select_reactor_; -} - -void -ACE_Select_Reactor_Token::select_reactor (ACE_Select_Reactor &select_reactor) -{ - this->select_reactor_ = &select_reactor; -} - -// Used to wakeup the Select_Reactor. - -void -ACE_Select_Reactor_Token::sleep_hook (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Token::sleep_hook"); - if (this->select_reactor_->notify () == -1) - ACE_ERROR ((LM_ERROR, - ASYS_TEXT ("%p\n"), - ASYS_TEXT ("sleep_hook failed"))); -} - -#endif /* ACE_MT_SAFE */ - -ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void) - : max_notify_iterations_ (-1) -{ -} - -void -ACE_Select_Reactor_Notify::max_notify_iterations (int iterations) -{ - // Must always be > 0 or < 0 to optimize the loop exit condition. - if (iterations == 0) - iterations = 1; - - this->max_notify_iterations_ = iterations; -} - -int -ACE_Select_Reactor_Notify::max_notify_iterations (void) -{ - return this->max_notify_iterations_; -} - -void -ACE_Select_Reactor_Notify::dump (void) const -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("select_reactor_ = %x"), this->select_reactor_)); - this->notification_pipe_.dump (); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -} - -int -ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, - ACE_Timer_Queue *, - int disable_notify_pipe) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::open"); - - if (disable_notify_pipe == 0) - { - this->select_reactor_ = ACE_dynamic_cast (ACE_Select_Reactor *, r); - - if (select_reactor_ == 0) - { - errno = EINVAL; - return -1; - } - - if (this->notification_pipe_.open () == -1) - return -1; - - // There seems to be a Win32 bug with this... Set this into - // non-blocking mode. - if (ACE::set_flags (this->notification_pipe_.read_handle (), - ACE_NONBLOCK) == -1) - return -1; - else - return this->select_reactor_->register_handler - (this->notification_pipe_.read_handle (), - this, - ACE_Event_Handler::READ_MASK); - } - else - { - this->select_reactor_ = 0; - return 0; - } -} - -int -ACE_Select_Reactor_Notify::close (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::close"); - return this->notification_pipe_.close (); -} - -ssize_t -ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - ACE_Time_Value *timeout) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::notify"); - - // Just consider this method a "no-op" if there's no - // <ACE_Select_Reactor> configured. - if (this->select_reactor_ == 0) - return 0; - else - { - ACE_Notification_Buffer buffer (eh, mask); - - ssize_t n = ACE::send (this->notification_pipe_.write_handle (), - (char *) &buffer, - sizeof buffer, - timeout); - if (n == -1) - return -1; - else - return 0; - } -} - -// Handles pending threads (if any) that are waiting to unblock the -// Select_Reactor. - -int -ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles, - const ACE_Handle_Set &rd_mask) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::handle_notification"); - - ACE_HANDLE read_handle = - this->notification_pipe_.read_handle (); - - if (read_handle != ACE_INVALID_HANDLE - && rd_mask.is_set (read_handle)) - { - number_of_active_handles--; - return this->handle_input (read_handle); - } - else - return 0; -} - -// Special trick to unblock <select> when updates occur in somewhere -// other than the main <ACE_Select_Reactor> thread. All we do is -// write data to a pipe that the <ACE_Select_Reactor> is listening on. -// Thanks to Paul Stephenson for suggesting this approach. - -int -ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input"); - // Precondition: this->select_reactor_.token_.current_owner () == - // ACE_Thread::self (); - - ACE_Notification_Buffer buffer; - ssize_t n; - int number_dispatched = 0; - - while ((n = ACE::recv (handle, (char *) &buffer, sizeof buffer)) > 0) - { - // Check to see if we've got a short read. - if (n != sizeof buffer) - { - ssize_t remainder = sizeof buffer - n; - - // If so, try to recover by reading the remainder. If this - // doesn't work we're in big trouble since the input stream - // won't be aligned correctly. I'm not sure quite what to - // do at this point. It's probably best just to return -1. - if (ACE::recv (handle, - ((char *) &buffer) + n, - remainder) != remainder) - return -1; - } - - // If eh == 0 then another thread is unblocking the - // ACE_Select_Reactor to update the ACE_Select_Reactor's - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the ACE_Event_Handler pointer - // we've been passed. - if (buffer.eh_ != 0) - { - int result = 0; - - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, ASYS_TEXT ("invalid mask = %d\n"), buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } - - number_dispatched++; - - // Bail out if we've reached the <notify_threshold_>. Note that - // by default <notify_threshold_> is -1, so we'll loop until all - // the notifications in the pipe have been dispatched. - if (number_dispatched == this->max_notify_iterations_) - break; - } - - // Reassign number_dispatched to -1 if things have gone seriously - // wrong. - if (n <= 0 && errno != EWOULDBLOCK) - number_dispatched = -1; - - // Enqueue ourselves into the list of waiting threads. When we - // reacquire the token we'll be off and running again with ownership - // of the token. The postcondition of this call is that - // this->select_reactor_.token_.current_owner () == ACE_Thread::self (); - this->select_reactor_->renew (); - return number_dispatched; -} - -int -ACE_Select_Reactor::notify (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - ACE_Time_Value *timeout) -{ - ACE_TRACE ("ACE_Select_Reactor::notify"); - - ssize_t n = 0; - - // Pass over both the Event_Handler *and* the mask to allow the - // caller to dictate which Event_Handler method the receiver - // invokes. Note that this call can timeout. - - n = this->notify_handler_->notify (eh, mask, timeout); - return n == -1 ? -1 : 0; -} - -int -ACE_Select_Reactor::resume_handler (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor::resume_handler"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->resume_i (handle); -} - -int -ACE_Select_Reactor::suspend_handler (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor::suspend_handler"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->suspend_i (handle); -} - -int -ACE_Select_Reactor::suspend_handlers (void) -{ - ACE_TRACE ("ACE_Select_Reactor::suspend_handlers"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - - ACE_Event_Handler *eh = 0; - - for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); - iter.next (eh) != 0; - iter.advance ()) - this->suspend_i (eh->get_handle ()); - - return 0; -} - -int -ACE_Select_Reactor::resume_handlers (void) -{ - ACE_TRACE ("ACE_Select_Reactor::resume_handlers"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - - ACE_Event_Handler *eh = 0; - - for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); - iter.next (eh) != 0; - iter.advance ()) - this->resume_i (eh->get_handle ()); - - return 0; -} - -int -ACE_Select_Reactor::register_handler (ACE_Event_Handler *handler, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::register_handler"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->register_handler_i (handler->get_handle (), handler, mask); -} - -int -ACE_Select_Reactor::register_handler (ACE_HANDLE handle, - ACE_Event_Handler *handler, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::register_handler"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->register_handler_i (handle, handler, mask); -} - -int -ACE_Select_Reactor::register_handler (const ACE_Handle_Set &handles, - ACE_Event_Handler *handler, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::register_handler"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->register_handler_i (handles, handler, mask); -} - -int -ACE_Select_Reactor::handler (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - ACE_Event_Handler **handler) -{ - ACE_TRACE ("ACE_Select_Reactor::handler"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->handler_i (handle, mask, handler); -} - -int -ACE_Select_Reactor::remove_handler (const ACE_Handle_Set &handles, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::remove_handler"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->remove_handler_i (handles, mask); -} - -int -ACE_Select_Reactor::remove_handler (ACE_Event_Handler *handler, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::remove_handler"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->remove_handler_i (handler->get_handle (), mask); -} - -int -ACE_Select_Reactor::remove_handler (ACE_HANDLE handle, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::remove_handler"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->remove_handler_i (handle, mask); -} - -// Performs operations on the "ready" bits. - -int -ACE_Select_Reactor::ready_ops (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_Select_Reactor::ready_ops"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->bit_ops (handle, - mask, - this->ready_set_, - ops); -} - -int -ACE_Select_Reactor::open (size_t size, - int restart, - ACE_Sig_Handler *sh, - ACE_Timer_Queue *tq, - int disable_notify_pipe, - ACE_Reactor_Notify *notify) -{ - ACE_TRACE ("ACE_Select_Reactor::open"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - - // Can't initialize ourselves more than once. - if (this->initialized_ > 0) - return -1; - - this->owner_ = ACE_Thread::self (); - this->restart_ = restart; - this->signal_handler_ = sh; - this->timer_queue_ = tq; - this->notify_handler_ = notify; - - int result = 0; - - // Allows the signal handler to be overridden. - if (this->signal_handler_ == 0) - { - ACE_NEW_RETURN (this->signal_handler_, - ACE_Sig_Handler, - -1); - - if (this->signal_handler_ == 0) - result = -1; - else - this->delete_signal_handler_ = 1; - } - - // Allows the timer queue to be overridden. - if (result != -1 && this->timer_queue_ == 0) - { - ACE_NEW_RETURN (this->timer_queue_, - ACE_Timer_Heap, - -1); - - if (this->timer_queue_ == 0) - result = -1; - else - this->delete_timer_queue_ = 1; - } - - // Allows the Notify_Handler to be overridden. - if (result != -1 && this->notify_handler_ == 0) - { - ACE_NEW_RETURN (this->notify_handler_, - ACE_Select_Reactor_Notify, - -1); - - if (this->notify_handler_ == 0) - result = -1; - else - this->delete_notify_handler_ = 1; - } - - if (result != -1 && this->handler_rep_.open (size) == -1) - result = -1; - else if (this->notify_handler_->open (this, - 0, - disable_notify_pipe) == -1) - result = -1; - - if (result != -1) - // We're all set to go. - this->initialized_ = 1; - else - // This will close down all the allocated resources properly. - this->close (); - - return result; -} - -int -ACE_Select_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler) -{ - if (this->signal_handler_ != 0 && this->delete_signal_handler_ != 0) - delete this->signal_handler_; - this->signal_handler_ = signal_handler; - this->delete_signal_handler_ = 0; - return 0; -} - -int -ACE_Select_Reactor::set_timer_queue (ACE_Timer_Queue *timer_queue) -{ - if (this->timer_queue_ != 0 && this->delete_timer_queue_ != 0) - delete this->timer_queue_; - this->timer_queue_ = timer_queue; - this->delete_timer_queue_ = 0; - return 0; -} - -ACE_Select_Reactor::ACE_Select_Reactor (ACE_Sig_Handler *sh, - ACE_Timer_Queue *tq, - int disable_notify_pipe, - ACE_Reactor_Notify *notify) - : handler_rep_ (*this), - timer_queue_ (0), - delete_timer_queue_ (0), - delete_signal_handler_ (0), - delete_notify_handler_ (0), - requeue_position_ (-1), // Requeue at end of waiters by default. - initialized_ (0), - state_changed_ (0), -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - token_ (*this), -#endif /* ACE_MT_SAFE */ - lock_adapter_ (token_), - supress_renew_ (0) -{ - ACE_TRACE ("ACE_Select_Reactor::ACE_Select_Reactor"); - - if (this->open (ACE_Select_Reactor::DEFAULT_SIZE, - 0, - sh, - tq, - disable_notify_pipe, - notify) == -1) - ACE_ERROR ((LM_ERROR, - ASYS_TEXT ("%p\n"), - ASYS_TEXT ("ACE_Select_Reactor::open ") - ASYS_TEXT ("failed inside ACE_Select_Reactor::CTOR"))); -} - -// Initialize ACE_Select_Reactor. - -ACE_Select_Reactor::ACE_Select_Reactor (size_t size, - int rs, - ACE_Sig_Handler *sh, - ACE_Timer_Queue *tq, - int disable_notify_pipe, - ACE_Reactor_Notify *notify) - : handler_rep_ (*this), - timer_queue_ (0), - delete_timer_queue_ (0), - delete_signal_handler_ (0), - delete_notify_handler_ (0), - requeue_position_ (-1), // Requeue at end of waiters by default. - initialized_ (0), - state_changed_ (0), -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - token_ (*this), -#endif /* ACE_MT_SAFE */ - lock_adapter_ (token_), - supress_renew_ (0) -{ - ACE_TRACE ("ACE_Select_Reactor::ACE_Select_Reactor"); - - if (this->open (size, - rs, - sh, - tq, - disable_notify_pipe, - notify) == -1) - ACE_ERROR ((LM_ERROR, - ASYS_TEXT ("%p\n"), - ASYS_TEXT ("ACE_Select_Reactor::open ") - ASYS_TEXT ("failed inside ACE_Select_Reactor::CTOR"))); -} - -// Close down the ACE_Select_Reactor instance, detaching any remaining -// Event_Handers. This had better be called from the main event loop -// thread... - -int -ACE_Select_Reactor::close (void) -{ - ACE_TRACE ("ACE_Select_Reactor::close"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - - if (this->delete_signal_handler_) - { - delete this->signal_handler_; - this->signal_handler_ = 0; - this->delete_signal_handler_ = 0; - } - - this->handler_rep_.close (); - - if (this->delete_timer_queue_) - { - delete this->timer_queue_; - this->timer_queue_ = 0; - this->delete_timer_queue_ = 0; - } - - this->notify_handler_->close (); - - if (this->delete_notify_handler_) - { - delete this->notify_handler_; - this->notify_handler_ = 0; - this->delete_notify_handler_ = 0; - } - - this->initialized_ = 0; - - return 0; -} - -int -ACE_Select_Reactor::current_info (ACE_HANDLE, size_t &) -{ - return -1; -} - -ACE_Select_Reactor::~ACE_Select_Reactor (void) -{ - ACE_TRACE ("ACE_Select_Reactor::~ACE_Select_Reactor"); - this->close (); -} - -int -ACE_Select_Reactor::remove_handler_i (const ACE_Handle_Set &handles, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::remove_handler_i"); - ACE_HANDLE h; - - ACE_Handle_Set_Iterator handle_iter (handles); - - while ((h = handle_iter ()) != ACE_INVALID_HANDLE) - if (this->remove_handler_i (h, mask) == -1) - return -1; - - return 0; -} - -int -ACE_Select_Reactor::register_handler_i (const ACE_Handle_Set &handles, - ACE_Event_Handler *handler, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::register_handler_i"); - ACE_HANDLE h; - - ACE_Handle_Set_Iterator handle_iter (handles); - while ((h = handle_iter ()) != ACE_INVALID_HANDLE) - if (this->register_handler_i (h, handler, mask) == -1) - return -1; - - return 0; -} - -int -ACE_Select_Reactor::register_handler (const ACE_Sig_Set &sigset, - ACE_Event_Handler *new_sh, - ACE_Sig_Action *new_disp) -{ - ACE_TRACE ("ACE_Select_Reactor::register_handler"); - - int result = 0; - -#if (ACE_NSIG > 0) - for (int s = 1; s < ACE_NSIG; s++) - if (sigset.is_member (s) - && this->signal_handler_->register_handler (s, new_sh, - new_disp) == -1) - result = -1; -#else - ACE_UNUSED_ARG (sigset); - ACE_UNUSED_ARG (new_sh); - ACE_UNUSED_ARG (new_disp); -#endif /* ACE_NSIG */ - return result; -} - -int -ACE_Select_Reactor::remove_handler (const ACE_Sig_Set &sigset) -{ - ACE_TRACE ("ACE_Select_Reactor::remove_handler"); - int result = 0; - -#if (ACE_NSIG > 0) - for (int s = 1; s < ACE_NSIG; s++) - if (sigset.is_member (s) - && this->signal_handler_->remove_handler (s) == -1) - result = -1; -#else - ACE_UNUSED_ARG (sigset); -#endif /* ACE_NSIG */ - - return result; -} - -// Note the queue handles its own locking. - -long -ACE_Select_Reactor::schedule_timer (ACE_Event_Handler *handler, - const void *arg, - const ACE_Time_Value &delta_time, - const ACE_Time_Value &interval) -{ - ACE_TRACE ("ACE_Select_Reactor::schedule_timer"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - - return this->timer_queue_->schedule - (handler, arg, timer_queue_->gettimeofday () + delta_time, interval); -} - -// Main event loop driver that blocks for <max_wait_time> before -// returning (will return earlier if I/O or signal events occur). - -int -ACE_Select_Reactor::handle_events (ACE_Time_Value &max_wait_time) -{ - ACE_TRACE ("ACE_Select_Reactor::handle_events"); - - return this->handle_events (&max_wait_time); -} - -int -ACE_Select_Reactor::handle_error (void) -{ - ACE_TRACE ("ACE_Select_Reactor::handle_error"); - if (errno == EINTR) - return this->restart_; -#if defined (__MVS__) - // On MVS Open Edition, there can be a number of failure codes on a bad - // socket, so check_handles on anything other than EINTR. - else - return this->check_handles (); -#else - else if (errno == EBADF) - return this->check_handles (); - else - return -1; -#endif /* __MVS__ */ -} - -void -ACE_Select_Reactor::notify_handle (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - ACE_Handle_Set &ready_mask, - ACE_Event_Handler *event_handler, - ACE_EH_PTMF ptmf) -{ - ACE_TRACE ("ACE_Select_Reactor::notify_handle"); - // Check for removed handlers. - if (event_handler == 0) - return; - - int status = (event_handler->*ptmf) (handle); - - if (status < 0) - this->remove_handler_i (handle, mask); - else if (status > 0) - ready_mask.set_bit (handle); -} - -// Perform GET, CLR, SET, and ADD operations on the Handle_Sets. -// -// GET = 1, Retrieve current value -// SET = 2, Set value of bits to new mask (changes the entire mask) -// ADD = 3, Bitwise "or" the value into the mask (only changes -// enabled bits) -// CLR = 4 Bitwise "and" the negation of the value out of the mask -// (only changes enabled bits) -// -// Returns the original mask. Must be called with locks held. - -int -ACE_Select_Reactor::bit_ops (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - ACE_Select_Reactor_Handle_Set &handle_set, - int ops) -{ - ACE_TRACE ("ACE_Select_Reactor::bit_ops"); - if (this->handler_rep_.handle_in_range (handle) == 0) - return -1; - -#if !defined (ACE_WIN32) - ACE_Sig_Guard sb; // Block out all signals until method returns. -#endif /* ACE_WIN32 */ - - ACE_FDS_PTMF ptmf = &ACE_Handle_Set::set_bit; - u_long omask = ACE_Event_Handler::NULL_MASK; - - switch (ops) - { - case ACE_Reactor::GET_MASK: - if (handle_set.rd_mask_.is_set (handle)) - ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK); - if (handle_set.wr_mask_.is_set (handle)) - ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK); - if (handle_set.ex_mask_.is_set (handle)) - ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK); - break; - - case ACE_Reactor::CLR_MASK: - ptmf = &ACE_Handle_Set::clr_bit; - /* FALLTHRU */ - case ACE_Reactor::SET_MASK: - /* FALLTHRU */ - case ACE_Reactor::ADD_MASK: - - // The following code is rather subtle... Note that if we are - // doing a ACE_Reactor::SET_MASK then if the bit is not enabled - // in the mask we need to clear the bit from the ACE_Handle_Set. - // On the other hand, if we are doing a ACE_Reactor::CLR_MASK or - // a ACE_Reactor::ADD_MASK we just carry out the operations - // specified by the mask. - - // READ, ACCEPT, and CONNECT flag will place the handle in the - // read set. - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) - || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK) - || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)) - { - (handle_set.rd_mask_.*ptmf) (handle); - ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK); - } - else if (ops == ACE_Reactor::SET_MASK) - handle_set.rd_mask_.clr_bit (handle); - - // WRITE and CONNECT flag will place the handle in the write set - if (ACE_BIT_ENABLED (mask, - ACE_Event_Handler::WRITE_MASK) - || ACE_BIT_ENABLED (mask, - ACE_Event_Handler::CONNECT_MASK)) - { - (handle_set.wr_mask_.*ptmf) (handle); - ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK); - } - else if (ops == ACE_Reactor::SET_MASK) - handle_set.wr_mask_.clr_bit (handle); - - // EXCEPT (and CONNECT on Win32) flag will place the handle in - // the except set. - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK) -#if defined (ACE_WIN32) - || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK) -#endif /* ACE_WIN32 */ - ) - { - (handle_set.ex_mask_.*ptmf) (handle); - ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK); - } - else if (ops == ACE_Reactor::SET_MASK) - handle_set.ex_mask_.clr_bit (handle); - break; - default: - return -1; - } - return omask; -} - -// Perform GET, CLR, SET, and ADD operations on the select() -// Handle_Sets. -// -// GET = 1, Retrieve current value -// SET = 2, Set value of bits to new mask (changes the entire mask) -// ADD = 3, Bitwise "or" the value into the mask (only changes -// enabled bits) -// CLR = 4 Bitwise "and" the negation of the value out of the mask -// (only changes enabled bits) -// -// Returns the original mask. - -int -ACE_Select_Reactor::mask_ops (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_Select_Reactor::mask_ops"); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - return this->bit_ops (handle, mask, - this->wait_set_, - ops); -} - -// Must be called with locks held. - -int -ACE_Select_Reactor::handler_i (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - ACE_Event_Handler **handler) -{ - ACE_TRACE ("ACE_Select_Reactor::handler_i"); - ACE_Event_Handler *h = this->handler_rep_.find (handle); - - if (h == 0) - return -1; - else - { - if ((ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) - || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)) - && this->wait_set_.rd_mask_.is_set (handle) == 0) - return -1; - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK) - && this->wait_set_.wr_mask_.is_set (handle) == 0) - return -1; - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK) - && this->wait_set_.ex_mask_.is_set (handle) == 0) - return -1; - } - - if (handler != 0) - *handler = h; - return 0; -} - -// Must be called with locks held - -int -ACE_Select_Reactor::resume_i (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor::resume"); - if (this->handler_rep_.find (handle) == 0) - return -1; - - if (this->suspend_set_.rd_mask_.is_set (handle)) - { - this->wait_set_.rd_mask_.set_bit (handle); - this->suspend_set_.rd_mask_.clr_bit (handle); - } - if (this->suspend_set_.wr_mask_.is_set (handle)) - { - this->wait_set_.wr_mask_.set_bit (handle); - this->suspend_set_.wr_mask_.clr_bit (handle); - } - if (this->suspend_set_.ex_mask_.is_set (handle)) - { - this->wait_set_.ex_mask_.set_bit (handle); - this->suspend_set_.ex_mask_.clr_bit (handle); - } - return 0; -} - -// Must be called with locks held - -int -ACE_Select_Reactor::suspend_i (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor::suspend"); - if (this->handler_rep_.find (handle) == 0) - return -1; - - if (this->wait_set_.rd_mask_.is_set (handle)) - { - this->suspend_set_.rd_mask_.set_bit (handle); - this->wait_set_.rd_mask_.clr_bit (handle); - } - if (this->wait_set_.wr_mask_.is_set (handle)) - { - this->suspend_set_.wr_mask_.set_bit (handle); - this->wait_set_.wr_mask_.clr_bit (handle); - } - if (this->wait_set_.ex_mask_.is_set (handle)) - { - this->suspend_set_.ex_mask_.set_bit (handle); - this->wait_set_.ex_mask_.clr_bit (handle); - } - return 0; -} - -// Must be called with locks held - -int -ACE_Select_Reactor::register_handler_i (ACE_HANDLE handle, - ACE_Event_Handler *event_handler, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::register_handler_i"); - - // Insert the <handle, event_handle> tuple into the Handler - // Repository. - return this->handler_rep_.bind (handle, event_handler, mask); -} - -int -ACE_Select_Reactor::remove_handler_i (ACE_HANDLE handle, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::remove_handler_i"); - - // Unbind this handle. - return this->handler_rep_.unbind (handle, mask); -} - -// Must be called with lock held. - -int -ACE_Select_Reactor::wait_for_multiple_events (ACE_Select_Reactor_Handle_Set &dispatch_set, - ACE_Time_Value *max_wait_time) -{ - ACE_TRACE ("ACE_Select_Reactor::wait_for_multiple_events"); - u_long width = 0; - ACE_Time_Value timer_buf (0); - ACE_Time_Value *this_timeout = &timer_buf; - - int number_of_active_handles = this->any_ready (dispatch_set); - - // If there are any bits enabled in the <ready_set_> then we'll - // handle those first, otherwise we'll block in select(). - - if (number_of_active_handles == 0) - { - do - { - if (this->timer_queue_->calculate_timeout (max_wait_time, - this_timeout) == 0) - this_timeout = 0; - - width = (u_long) this->handler_rep_.max_handlep1 (); - - dispatch_set.rd_mask_ = this->wait_set_.rd_mask_; - dispatch_set.wr_mask_ = this->wait_set_.wr_mask_; - dispatch_set.ex_mask_ = this->wait_set_.ex_mask_; - - number_of_active_handles = ACE_OS::select (int (width), - dispatch_set.rd_mask_, - dispatch_set.wr_mask_, - dispatch_set.ex_mask_, - this_timeout); - } - while (number_of_active_handles == -1 && this->handle_error () > 0); - - // @@ Remove?! - if (number_of_active_handles > 0) - { -#if !defined (ACE_WIN32) - dispatch_set.rd_mask_.sync (this->handler_rep_.max_handlep1 ()); - dispatch_set.wr_mask_.sync (this->handler_rep_.max_handlep1 ()); - dispatch_set.ex_mask_.sync (this->handler_rep_.max_handlep1 ()); -#endif /* ACE_WIN32 */ - } - } - - // Return the number of events to dispatch. - return number_of_active_handles; -} - -int -ACE_Select_Reactor::dispatch_timer_handlers (void) -{ - int number_dispatched = this->timer_queue_->expire (); - return this->state_changed_ ? -1 : number_dispatched; -} - -int -ACE_Select_Reactor::dispatch_notification_handlers (int &number_of_active_handles, - ACE_Select_Reactor_Handle_Set &dispatch_set) -{ -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - // Check to see if the ACE_HANDLE associated with the - // Select_Reactor's notify hook is enabled. If so, it means that - // one or more other threads are trying to update the - // ACE_Select_Reactor's internal tables. We'll handle all these - // threads and then break out to continue the event loop. - - int number_dispatched = - this->notify_handler_->dispatch_notifications (number_of_active_handles, - dispatch_set.rd_mask_); - return this->state_changed_ ? -1 : number_dispatched; -#else - ACE_UNUSED_ARG (number_of_active_handles); - ACE_UNUSED_ARG (dispatch_set); - return 0; -#endif /* ACE_MT_SAFE */ -} - -int -ACE_Select_Reactor::dispatch_io_set (int number_of_active_handles, - int& number_dispatched, - int mask, - ACE_Handle_Set& dispatch_mask, - ACE_Handle_Set& ready_mask, - ACE_EH_PTMF callback) -{ - ACE_HANDLE handle; - - ACE_Handle_Set_Iterator handle_iter (dispatch_mask); - - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE - && number_dispatched < number_of_active_handles - && this->state_changed_ == 0) - { - // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor::dispatching\n"))); - number_dispatched++; - this->notify_handle (handle, - mask, - ready_mask, - this->handler_rep_.find (handle), - callback); - } - - if (number_dispatched > 0 && this->state_changed_) - { - return -1; - } - - return 0; -} - -int -ACE_Select_Reactor::dispatch_io_handlers (int &number_of_active_handles, - ACE_Select_Reactor_Handle_Set &dispatch_set) -{ - int number_dispatched = 0; - - // Handle output events (this code needs to come first to handle - // the obscure case of piggy-backed data coming along with the - // final handshake message of a nonblocking connection). - - // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor::dispatch - WRITE\n"))); - if (this->dispatch_io_set (number_of_active_handles, - number_dispatched, - ACE_Event_Handler::WRITE_MASK, - dispatch_set.wr_mask_, - this->ready_set_.wr_mask_, - &ACE_Event_Handler::handle_output) == -1) - { - number_of_active_handles -= number_dispatched; - return -1; - } - - - // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor::dispatch - EXCEPT\n"))); - if (this->dispatch_io_set (number_of_active_handles, - number_dispatched, - ACE_Event_Handler::EXCEPT_MASK, - dispatch_set.ex_mask_, - this->ready_set_.ex_mask_, - &ACE_Event_Handler::handle_exception) == -1) - { - number_of_active_handles -= number_dispatched; - return -1; - } - - // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor::dispatch - READ\n"))); - if (this->dispatch_io_set (number_of_active_handles, - number_dispatched, - ACE_Event_Handler::READ_MASK, - dispatch_set.rd_mask_, - this->ready_set_.rd_mask_, - &ACE_Event_Handler::handle_input) == -1) - { - number_of_active_handles -= number_dispatched; - return -1; - } - - number_of_active_handles -= number_dispatched; - return number_dispatched; -} - -int -ACE_Select_Reactor::dispatch (int number_of_active_handles, - ACE_Select_Reactor_Handle_Set &dispatch_set) -{ - ACE_TRACE ("ACE_Select_Reactor::dispatch"); - - // The following do/while loop keeps dispatching as long as there - // are still active handles. Note that the only way we should ever - // iterate more than once through this loop is if signals occur - // while we're dispatching other handlers. - - do - { - // Note that we keep track of changes to our state. If any of - // the dispatch_*() methods below return -1 it means that the - // <wait_set_> state has changed as the result of an - // <ACE_Event_Handler> being dispatched. This means that we - // need to bail out and rerun the select() loop since our - // existing notion of handles in <dispatch_set> may no longer be - // correct. - // - // In the beginning, our state starts out unchanged. After - // every iteration (i.e., due to signals), our state starts out - // unchanged again. - - this->state_changed_ = 0; - - // Perform the Template Method for dispatching all the handlers. - - // Handle timers first since they may have higher latency - // constraints. - - if (this->dispatch_timer_handlers () == -1) - // State has changed or timer queue has failed, exit inner - // loop. - break; - - else if (number_of_active_handles <= 0) - // Bail out since we got here since select() was interrupted. - { - if (ACE_Sig_Handler::sig_pending () != 0) - { - ACE_Sig_Handler::sig_pending (0); - - // If any HANDLES in the <ready_set_> are activated as a - // result of signals they should be dispatched since - // they may be time critical... - number_of_active_handles = this->any_ready (dispatch_set); - } - else - return number_of_active_handles; - } - - // Next dispatch the notification handlers (if there are any to - // dispatch). These are required to handle multi-threads that - // are trying to update the <Reactor>. - - else if (this->dispatch_notification_handlers (number_of_active_handles, - dispatch_set) == -1) - break; // State has changed, exit inner loop. - - // Finally, dispatch the I/O handlers. - else if (this->dispatch_io_handlers (number_of_active_handles, - dispatch_set) == -1) - // State has changed, so exit the inner loop. - break; - } - while (number_of_active_handles > 0); - - return 1; -} - -int -ACE_Select_Reactor::release_token (void) -{ -#if defined (ACE_WIN32) - this->token_.release (); - return (int) EXCEPTION_CONTINUE_SEARCH; -#else - return 0; -#endif /* ACE_WIN32 */ -} - -int -ACE_Select_Reactor::handle_events (ACE_Time_Value *max_wait_time) -{ - ACE_TRACE ("ACE_Select_Reactor::handle_events"); - - // Stash the current time -- the destructor of this object will - // automatically compute how much time elpased since this method was - // called. - ACE_Countdown_Time countdown (max_wait_time); - -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1); - - if (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_) == 0) - return -1; - - // Update the countdown to reflect time waiting for the mutex. - countdown.update (); -#endif /* ACE_MT_SAFE */ - - return this->handle_events_i (max_wait_time); -} - -int -ACE_Select_Reactor::handle_events_i (ACE_Time_Value *max_wait_time) -{ - int result = -1; - - ACE_SEH_TRY { - ACE_Select_Reactor_Handle_Set dispatch_set; - - int number_of_active_handles = - this->wait_for_multiple_events (dispatch_set, - max_wait_time); - - result = this->dispatch (number_of_active_handles, dispatch_set); - } - ACE_SEH_EXCEPT (this->release_token ()) { - // As it stands now, we catch and then rethrow all Win32 - // structured exceptions so that we can make sure to release the - // <token_> lock correctly. - } - - this->state_changed_ = 1; - - return result; -} - -int -ACE_Select_Reactor::check_handles (void) -{ - ACE_TRACE ("ACE_Select_Reactor::check_handles"); - -#if defined (ACE_WIN32) || defined (__MVS__) - ACE_Time_Value time_poll = ACE_Time_Value::zero; - ACE_Handle_Set rd_mask; -#endif /* ACE_WIN32 || MVS */ - - ACE_Event_Handler *eh = 0; - int result = 0; - - for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); - iter.next (eh) != 0; - iter.advance ()) - { - ACE_HANDLE handle = eh->get_handle (); - - // Skip back to the beginning of the loop if the HANDLE is - // invalid. - if (handle == ACE_INVALID_HANDLE) - continue; - -#if defined (ACE_WIN32) || defined (__MVS__) - // Win32 needs to do the check this way because fstat won't work on - // a socket handle. MVS Open Edition needs to do it this way because, - // even though the docs say to check a handle with either select or - // fstat, the fstat method always says the handle is ok. - rd_mask.set_bit (handle); - - if (ACE_OS::select (int (handle) + 1, - rd_mask, 0, 0, - &time_poll) < 0) - { - result = 1; - this->remove_handler_i (handle, - ACE_Event_Handler::ALL_EVENTS_MASK); - } - rd_mask.clr_bit (handle); -#else /* !ACE_WIN32 && !MVS */ - struct stat temp; - - if (ACE_OS::fstat (handle, &temp) == -1) - { - result = 1; - this->remove_handler_i (handle, - ACE_Event_Handler::ALL_EVENTS_MASK); - } -#endif /* ACE_WIN32 || MVS */ - } - - return result; -} - -void -ACE_Select_Reactor::dump (void) const -{ - ACE_TRACE ("ACE_Select_Reactor::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - - this->timer_queue_->dump (); - this->handler_rep_.dump (); - this->signal_handler_->dump (); - ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("delete_signal_handler_ = %d\n"), - this->delete_signal_handler_)); - - ACE_HANDLE h; - - for (ACE_Handle_Set_Iterator handle_iter_wr (this->wait_set_.wr_mask_); - (h = handle_iter_wr ()) != ACE_INVALID_HANDLE; - ++handle_iter_wr) - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("write_handle = %d\n"), h)); - - for (ACE_Handle_Set_Iterator handle_iter_rd (this->wait_set_.rd_mask_); - (h = handle_iter_rd ()) != ACE_INVALID_HANDLE; - ++handle_iter_rd) - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("read_handle = %d\n"), h)); - - for (ACE_Handle_Set_Iterator handle_iter_ex (this->wait_set_.ex_mask_); - (h = handle_iter_ex ()) != ACE_INVALID_HANDLE; - ++handle_iter_ex) - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("except_handle = %d\n"), h)); - - for (ACE_Handle_Set_Iterator handle_iter_wr_ready (this->ready_set_.wr_mask_); - (h = handle_iter_wr_ready ()) != ACE_INVALID_HANDLE; - ++handle_iter_wr_ready) - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("write_handle_ready = %d\n"), h)); - - for (ACE_Handle_Set_Iterator handle_iter_rd_ready (this->ready_set_.rd_mask_); - (h = handle_iter_rd_ready ()) != ACE_INVALID_HANDLE; - ++handle_iter_rd_ready) - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("read_handle_ready = %d\n"), h)); - - for (ACE_Handle_Set_Iterator handle_iter_ex_ready (this->ready_set_.ex_mask_); - (h = handle_iter_ex_ready ()) != ACE_INVALID_HANDLE; - ++handle_iter_ex_ready) - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("except_handle_ready = %d\n"), h)); - - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("restart_ = %d\n"), this->restart_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\nrequeue_position_ = %d\n"), this->requeue_position_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\ninitialized_ = %d\n"), this->initialized_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\nowner_ = %d\n"), this->owner_)); - -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - this->notify_handler_->dump (); - this->token_.dump (); -#endif /* ACE_MT_SAFE */ - - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -} #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -ACE_MT (template class ACE_Guard<ACE_SELECT_REACTOR_MUTEX>); +# if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) +# if defined (ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION) +template class ACE_Select_Reactor_Token_T<ACE_Local_Mutex>; +template class ACE_Select_Reactor_T< ACE_Select_Reactor_Token_T<ACE_Local_Mutex> >; +template class ACE_Lock_Adapter< ACE_Select_Reactor_Token_T<ACE_Local_Mutex> >; +template class ACE_Guard< ACE_Select_Reactor_Token_T<ACE_Local_Mutex> >; +# else +template class ACE_Select_Reactor_Token_T<ACE_Token>; +template class ACE_Select_Reactor_T< ACE_Select_Reactor_Token_T<ACE_Token> >; +template class ACE_Lock_Adapter< ACE_Select_Reactor_Token_T<ACE_Token> >; +template class ACE_Guard< ACE_Select_Reactor_Token_T<ACE_Token> >; +# endif /* ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION */ +# else +template class ACE_Select_Reactor_Token_T<ACE_Noop_Token>; +template class ACE_Select_Reactor_T< ACE_Select_Reactor_Token_T<ACE_Noop_Token> >; +template class ACE_Lock_Adapter< ACE_Select_Reactor_Token_T<ACE_Noop_Token> >; +# endif /* ACE_MT_SAFE && ACE_MT_SAFE != 0 */ template class ACE_Event_Handler_Handle_Timeout_Upcall<ACE_SYNCH_RECURSIVE_MUTEX>; -template class ACE_Lock_Adapter<ACE_Select_Reactor_Token>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) -# pragma instantiate ACE_Guard<ACE_SELECT_REACTOR_MUTEX> -#endif /* ACE_MT_SAFE */ +# if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) +# if defined (ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION) +# pragma instantiate ACE_Select_Reactor_Token_T<ACE_Local_Mutex> +# pragma instantiate ACE_Select_Reactor_T< ACE_Select_Reactor_Token_T<ACE_Local_Mutex> > +# pragma instantiate ACE_Lock_Adapter< ACE_Select_Reactor_Token_T<ACE_Local_Mutex> > +# pragma instantiate ACE_Guard< ACE_Select_Reactor_Token_T<ACE_Local_Mutex> > +# else +# pragma instantiate ACE_Select_Reactor_Token_T<ACE_Token> +# pragma instantiate ACE_Select_Reactor_T< ACE_Select_Reactor_Token_T<ACE_Token> > +# pragma instantiate ACE_Lock_Adapter< ACE_Select_Reactor_Token_T<ACE_Token> > +# pragma instantiate ACE_Guard< ACE_Select_Reactor_Token_T<ACE_Token> > +# endif /* ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION */ +# else +# pragma instantiate ACE_Select_Reactor_Token_T<ACE_Noop_Token> +# pragma instantiate ACE_Select_Reactor_T< ACE_Select_Reactor_Token_T<ACE_Noop_Token> > +# pragma instantiate ACE_Lock_Adapter< ACE_Select_Reactor_Token_T<ACE_Noop_Token> > +# endif /* ACE_MT_SAFE && ACE_MT_SAFE != 0 */ #pragma instantiate ACE_Event_Handler_Handle_Timeout_Upcall<ACE_SYNCH_RECURSIVE_MUTEX> -#pragma instantiate ACE_Lock_Adapter<ACE_Select_Reactor_Token> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/Select_Reactor.h b/ace/Select_Reactor.h index a43c64c60b5..bc78ad28b15 100644 --- a/ace/Select_Reactor.h +++ b/ace/Select_Reactor.h @@ -17,951 +17,20 @@ #if !defined (ACE_SELECT_REACTOR_H) #define ACE_SELECT_REACTOR_H -#include "ace/Signal.h" -#include "ace/Timer_Queue.h" -#include "ace/Event_Handler.h" -#include "ace/Handle_Set.h" -#include "ace/Token.h" -#include "ace/Pipe.h" -#include "ace/Reactor_Impl.h" - -// Add useful typedefs to simplify the following code. -typedef void (ACE_Handle_Set::*ACE_FDS_PTMF) (ACE_HANDLE); -typedef int (ACE_Event_Handler::*ACE_EH_PTMF) (ACE_HANDLE); - -// Forward declaration. -class ACE_Select_Reactor; - -class ACE_Export ACE_Select_Reactor_Handle_Set -{ - // = TITLE - // Track handles we are interested for various events. -public: - ACE_Handle_Set rd_mask_; - // Read events (e.g., input pending, accept pending). - - ACE_Handle_Set wr_mask_; - // Write events (e.g., flow control abated, non-blocking connection - // complete). - - ACE_Handle_Set ex_mask_; - // Exception events (e.g., SIG_URG). -}; +#include "ace/Select_Reactor_T.h" #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - #if defined (ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION) #include "ace/Local_Tokens.h" -typedef ACE_Local_Mutex ACE_SELECT_REACTOR_MUTEX; +typedef ACE_Select_Reactor_Token_T<ACE_Local_Mutex> ACE_Select_Reactor_Token; #else -typedef ACE_Token ACE_SELECT_REACTOR_MUTEX; +typedef ACE_Select_Reactor_Token_T<ACE_Token> ACE_Select_Reactor_Token; #endif /* ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION */ - -class ACE_Export ACE_Select_Reactor_Token : public ACE_SELECT_REACTOR_MUTEX -{ - // = TITLE - // Used as a synchronization mechanism to coordinate concurrent - // access to a Select_Reactor object. - // - // = DESCRIPTION - // This class is used to make the <ACE_Select_Reactor> - // thread-safe. By default, the thread that runs the - // <handle_events> loop holds the token, even when it is blocked - // in the <select> call. Whenever another thread wants to - // access the <ACE_Reactor> via its <register_handler>, - // <remove_handler>, etc. methods) it must ask the token owner - // for temporary release of the token. To accomplish this, the - // owner of a token must define a <sleep_hook> through which it - // can be notified to temporarily release the token if the - // current situation permits this. - // - // The owner of the token is responsible for deciding which - // request for the token can be granted. By using the - // <ACE_Token::renew> API, the thread that releases the token - // temporarily can specify to get the token back right after the - // other thread has completed using the token. Thus, there is a - // dedicated thread that owns the token ``by default.'' This - // thread grants other threads access to the token by ensuring - // that whenever somebody else has finished using the token the - // ``default owner'' first holds the token again, i.e., the - // owner has the chance to schedule other threads. - // - // The thread that most likely needs the token most of the time - // is the thread running the dispatch loop. Typically the token - // gets released prior to entering the <select> call and gets - // ``re-acquired'' as soon as the <select> call returns, which - // results probably in many calls to <release>/<acquire> that - // are not really needed since no other thread would need the - // token in the meantime. That's why the dispatcher thread is - // chosen to be the owner of the token. - // - // In case the token would have been released while in <select> - // there would be a good chance that the <fd_set> could have - // been modified while the <select> returns from blocking and - // trying to re-acquire the lock. Through the token mechanism - // it is ensured that while another thread is holding the token, - // the dispatcher thread is blocked in the <renew> call and not - // in <select>. Thus, it is not critical to change the - // <fd_set>. The implementation of the <sleep_hook> mechanism - // provided by the <ACE_Select_Reactor_Token> enables the - // default owner to be the thread that executes the dispatch - // loop. -public: - ACE_Select_Reactor_Token (ACE_Select_Reactor &r); - ACE_Select_Reactor_Token (void); - virtual ~ACE_Select_Reactor_Token (void); - - virtual void sleep_hook (void); - // Called just before the ACE_Event_Handler goes to sleep. - - ACE_Select_Reactor &select_reactor (void); - void select_reactor (ACE_Select_Reactor &); - // Set/Get methods - - virtual void dump (void) const; - // Dump the state of an object. - - ACE_ALLOC_HOOK_DECLARE; - // Declare the dynamic allocation hooks. - -private: - ACE_Select_Reactor *select_reactor_; -}; #else -// If we're non-MT safe then this is just a no-op... -typedef ACE_Null_Mutex ACE_Select_Reactor_Token; -#endif /* ACE_MT_SAFE */ - -class ACE_Export ACE_Event_Tuple -{ - // = TITLE - // An ACE_Event_Handler and its associated ACE_HANDLE. - // - // = DESCRIPTION - // One ACE_Event_Handler is registered for one or more - // ACE_HANDLE, in some points this information must be stored - // explicitly. This structure provides a lightweight mechanism - // to do so. -public: - ACE_Event_Tuple (void); - ACE_Event_Tuple (ACE_Event_Handler* eh, ACE_HANDLE h); - ~ACE_Event_Tuple (void); - - int operator== (const ACE_Event_Tuple &rhs) const; - // Equality operator. - - int operator!= (const ACE_Event_Tuple &rhs) const; - // Inequality operator. - - ACE_HANDLE handle_; - ACE_Event_Handler* event_handler_; -}; - -class ACE_Export ACE_Select_Reactor_Notify : public ACE_Reactor_Notify -{ - // = TITLE - // Unblock the <ACE_Select_Reactor> from its event loop. - // - // = DESCRIPTION - // This implementation is necessary for cases where the - // <ACE_Select_Reactor> is run in a multi-threaded program. In - // this case, we need to be able to unblock select() or poll() - // when updates occur other than in the main - // <ACE_Select_Reactor> thread. To do this, we signal an - // auto-reset event the <ACE_Select_Reactor> is listening on. - // If an <ACE_Event_Handler> and <ACE_Select_Reactor_Mask> is - // passed to <notify>, the appropriate <handle_*> method is - // dispatched in the context of the <ACE_Select_Reactor> thread. -public: - ACE_Select_Reactor_Notify (void); - ~ACE_Select_Reactor_Notify (void); - // Default dtor. - - // = Initialization and termination methods. - virtual int open (ACE_Reactor_Impl *, - ACE_Timer_Queue * = 0, - int disable_notify_pipe = 0); - // Initialize. - - virtual int close (void); - // Destroy. - - virtual ssize_t notify (ACE_Event_Handler * = 0, - ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK, - ACE_Time_Value * = 0); - // Called by a thread when it wants to unblock the - // <ACE_Select_Reactor>. This wakeups the <ACE_Select_Reactor> if - // currently blocked in select()/poll(). Pass over both the - // <Event_Handler> *and* the <mask> to allow the caller to dictate - // which <Event_Handler> method the <ACE_Select_Reactor> will - // invoke. The <ACE_Time_Value> indicates how long to blocking - // trying to notify the <ACE_Select_Reactor>. If <timeout> == 0, - // the caller will block until action is possible, else will wait - // until the relative time specified in *<timeout> elapses). - - virtual int dispatch_notifications (int &number_of_active_handles, - const ACE_Handle_Set &rd_mask); - // Handles pending threads (if any) that are waiting to unblock the - // <ACE_Select_Reactor>. - - virtual int handle_input (ACE_HANDLE handle); - // Called back by the <ACE_Select_Reactor> when a thread wants to - // unblock us. - - virtual void max_notify_iterations (int); - // Set the maximum number of times that the - // <ACE_Select_Reactor_Notify::handle_input> method will iterate and - // dispatch the <ACE_Event_Handlers> that are passed in via the - // notify pipe before breaking out of its <recv> loop. By default, - // this is set to -1, which means "iterate until the pipe is empty." - // Setting this to a value like "1 or 2" will increase "fairness" - // (and thus prevent starvation) at the expense of slightly higher - // dispatching overhead. - - virtual int max_notify_iterations (void); - // Get the maximum number of times that the - // <ACE_Select_Reactor_Notify::handle_input> method will iterate and - // dispatch the <ACE_Event_Handlers> that are passed in via the - // notify pipe before breaking out of its <recv> loop. - - virtual void dump (void) const; - // Dump the state of an object. - - ACE_ALLOC_HOOK_DECLARE; - // Declare the dynamic allocation hooks. - -private: - ACE_Select_Reactor *select_reactor_; - // Keep a back pointer to the <ACE_Select_Reactor>. If this value - // if NULL then the <ACE_Select_Reactor> has been initialized with - // <disable_notify_pipe>. - - ACE_Pipe notification_pipe_; - // Contains the <ACE_HANDLE> the <ACE_Select_Reactor> is listening - // on, as well as the <ACE_HANDLE> that threads wanting the - // attention of the <ACE_Select_Reactor> will write to. - - int max_notify_iterations_; - // Keeps track of the maximum number of times that the - // <ACE_Select_Reactor_Notify::handle_input> method will iterate and - // dispatch the <ACE_Event_Handlers> that are passed in via the - // notify pipe before breaking out of its <recv> loop. By default, - // this is set to -1, which means "iterate until the pipe is empty." -}; - -class ACE_Export ACE_Select_Reactor_Handler_Repository -{ - // = TITLE - // Used to map <ACE_HANDLE>s onto the appropriate - // <ACE_Event_Handler> *. - // - // = DESCRIPTION - // This class is necessary to shield differences between UNIX - // and Win32. In UNIX, <ACE_HANDLE> is an int, whereas in Win32 - // it's a void *. This class hides all these details from the - // bulk of the <ACE_Select_Reactor> code. All of these methods - // are called with the main <Select_Reactor> token lock held. -public: - friend class ACE_Select_Reactor_Handler_Repository_Iterator; - - // = Initialization and termination methods. - ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor &); - // Default "do-nothing" constructor. - - ~ACE_Select_Reactor_Handler_Repository (void); - // dtor. - - int open (size_t size); - // Initialize a repository of the appropriate <size>. - - int close (void); - // Close down the repository. - - // = Search structure operations. - - ACE_Event_Handler *find (ACE_HANDLE handle, size_t *index_p = 0); - // Return the <ACE_Event_Handler *> associated with <ACE_HANDLE>. - // If <index_p> is non-0, then return the index location of the - // <handle>, if found. - - int bind (ACE_HANDLE, - ACE_Event_Handler *, - ACE_Reactor_Mask); - // Bind the <ACE_Event_Handler *> to the <ACE_HANDLE> with the - // appropriate <ACE_Reactor_Mask> settings. - - int unbind (ACE_HANDLE, - ACE_Reactor_Mask mask); - // Remove the binding of <ACE_HANDLE> in accordance with the <mask>. - - int unbind_all (void); - // Remove all the <ACE_HANDLE, ACE_Event_Handler> tuples. - - // = Sanity checking. - - // Check the <handle> to make sure it's a valid ACE_HANDLE that - // within the range of legal handles (i.e., >= 0 && < max_size_). - int invalid_handle (ACE_HANDLE handle); - - // Check the <handle> to make sure it's a valid ACE_HANDLE that - // within the range of currently registered handles (i.e., >= 0 && < - // max_handlep1_). - int handle_in_range (ACE_HANDLE handle); - - // = Accessors. - size_t size (void); - // Returns the current table size. - - size_t max_handlep1 (void); - // Maximum ACE_HANDLE value, plus 1. - - void dump (void) const; - // Dump the state of an object. - - ACE_ALLOC_HOOK_DECLARE; - // Declare the dynamic allocation hooks. - -private: - ACE_Select_Reactor &select_reactor_; - // Reference to our <Select_Reactor>. - - ssize_t max_size_; - // Maximum number of handles. - - int max_handlep1_; - // The highest currently active handle, plus 1 (ranges between 0 and - // <max_size_>. - -#if defined (ACE_WIN32) - // = The mapping from <HANDLES> to <Event_Handlers>. - - ACE_Event_Tuple *event_handlers_; - // The NT version implements this via a dynamically allocated - // array of <ACE_Event_Tuple *>. Since NT implements ACE_HANDLE - // as a void * we can't directly index into this array. Therefore, - // we just do a linear search (for now). Next, we'll modify - // things to use hashing or something faster... -#else - ACE_Event_Handler **event_handlers_; - // The UNIX version implements this via a dynamically allocated - // array of <ACE_Event_Handler *> that is indexed directly using - // the ACE_HANDLE value. -#endif /* ACE_WIN32 */ -}; - -class ACE_Export ACE_Select_Reactor_Handler_Repository_Iterator -{ - // = TITLE - // Iterate through the <ACE_Select_Reactor_Handler_Repository>. -public: - // = Initialization method. - ACE_Select_Reactor_Handler_Repository_Iterator (const ACE_Select_Reactor_Handler_Repository *s); - - ~ACE_Select_Reactor_Handler_Repository_Iterator (void); - // dtor. - - // = Iteration methods. - - int next (ACE_Event_Handler *&next_item); - // Pass back the <next_item> that hasn't been seen in the Set. - // Returns 0 when all items have been seen, else 1. - - int done (void) const; - // Returns 1 when all items have been seen, else 0. - - int advance (void); - // Move forward by one element in the set. Returns 0 when all the - // items in the set have been seen, else 1. - - void dump (void) const; - // Dump the state of an object. - - ACE_ALLOC_HOOK_DECLARE; - // Declare the dynamic allocation hooks. - -private: - const ACE_Select_Reactor_Handler_Repository *rep_; - // Reference to the Handler_Repository we are iterating over. - - ssize_t current_; - // Pointer to the current iteration level. -}; - -class ACE_Export ACE_Select_Reactor : public ACE_Reactor_Impl -{ - // = TITLE - // An object oriented event demultiplexor and event handler - // dispatcher. - // - // = DESCRIPTION - // The ACE_Select_Reactor is an object-oriented event - // demultiplexor and event handler dispatcher. The sources of - // events that the ACE_Select_Reactor waits for and dispatches - // includes I/O events, signals, and timer events. All public - // methods acquire the main <Select_Reactor> token lock and call - // down to private or protected methods, which assume that the - // lock is held and so therefore don't (re)acquire the lock. -public: - enum - { - DEFAULT_SIZE = ACE_DEFAULT_SELECT_REACTOR_SIZE - // Default size of the Select_Reactor's handle table. - }; - - // = Initialization and termination methods. - - ACE_Select_Reactor (ACE_Sig_Handler * = 0, - ACE_Timer_Queue * = 0, - int disable_notify_pipe = 0, - ACE_Reactor_Notify *notify = 0); - // Initialize <ACE_Select_Reactor> with the default size. - - ACE_Select_Reactor (size_t size, - int restart = 0, - ACE_Sig_Handler * = 0, - ACE_Timer_Queue * = 0, - int disable_notify_pipe = 0, - ACE_Reactor_Notify *notify = 0); - // Initialize <ACE_Select_Reactor> with size <size>. - - virtual int open (size_t size = DEFAULT_SIZE, - int restart = 0, - ACE_Sig_Handler * = 0, - ACE_Timer_Queue * = 0, - int disable_notify_pipe = 0, - ACE_Reactor_Notify * = 0); - // Initialize <ACE_Select_Reactor> with size <size>. - - virtual int current_info (ACE_HANDLE, size_t & /* size */); - // Returns -1 (not used in this implementation); - - virtual int set_sig_handler (ACE_Sig_Handler *signal_handler); - // Use a user specified signal handler instead. - - virtual int set_timer_queue (ACE_Timer_Queue *timer_queue); - // Use a user specified timer queue instead. - - virtual int close (void); - // Close down the select_reactor and release all of its resources. - - virtual ~ACE_Select_Reactor (void); - // Close down the select_reactor and release all of its resources. - - // = Event loop drivers. - - virtual int handle_events (ACE_Time_Value *max_wait_time = 0); - virtual int alertable_handle_events (ACE_Time_Value *max_wait_time = 0); - // This event loop driver that blocks for <max_wait_time> before - // returning. It will return earlier if timer events, I/O events, - // or signal events occur. Note that <max_wait_time> can be 0, in - // which case this method blocks indefinitely until events occur. - // - // <max_wait_time> is decremented to reflect how much time this call - // took. For instance, if a time value of 3 seconds is passed to - // handle_events and an event occurs after 2 seconds, - // <max_wait_time> will equal 1 second. This can be used if an - // application wishes to handle events for some fixed amount of - // time. - // - // Returns the total number of <ACE_Event_Handler>s that were - // dispatched, 0 if the <max_wait_time> elapsed without dispatching - // any handlers, or -1 if something goes wrong. - // - // Current <alertable_handle_events> is identical to - // <handle_events>. - - virtual int handle_events (ACE_Time_Value &max_wait_time); - virtual int alertable_handle_events (ACE_Time_Value &max_wait_time); - // This method is just like the one above, except the - // <max_wait_time> value is a reference and can therefore never be - // NULL. - // - // Current <alertable_handle_events> is identical to - // <handle_events>. - - // = Register and remove <ACE_Event_Handler>s. - virtual int register_handler (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask); - // Register a <eh> with a particular <mask>. Note that the - // <Select_Reactor> will call eh->get_handle() to extract the - // underlying I/O handle. - - virtual int register_handler (ACE_HANDLE handle, - ACE_Event_Handler *eh, - ACE_Reactor_Mask mask); - // Register a <eh> with a particular <mask>. Note that since the - // <handle> is given the Select_Reactor will *not* call - // eh->get_handle() to extract the underlying I/O handle. - -#if defined (ACE_WIN32) - - // Originally this interface was available for all platforms, but - // because ACE_HANDLE is an int on non-Win32 platforms, compilers - // are not able to tell the difference between - // register_handler(ACE_Event_Handler*,ACE_Reactor_Mask) and - // register_handler(ACE_Event_Handler*,ACE_HANDLE). Therefore, we - // have restricted this method to Win32 only. - - virtual int register_handler (ACE_Event_Handler *event_handler, - ACE_HANDLE event_handle = ACE_INVALID_HANDLE); - // Not implemented. - -#endif /* ACE_WIN32 */ - - virtual int register_handler (ACE_HANDLE event_handle, - ACE_HANDLE io_handle, - ACE_Event_Handler *event_handler, - ACE_Reactor_Mask mask); - // Not implemented. - - virtual int register_handler (const ACE_Handle_Set &handles, - ACE_Event_Handler *eh, - ACE_Reactor_Mask mask); - // Register <eh> with all the <handles> in the <Handle_Set>. - - virtual int register_handler (int signum, - ACE_Event_Handler *new_sh, - ACE_Sig_Action *new_disp = 0, - ACE_Event_Handler **old_sh = 0, - ACE_Sig_Action *old_disp = 0); - // Register <new_sh> to handle the signal <signum> using the - // <new_disp>. Returns the <old_sh> that was previously registered - // (if any), along with the <old_disp> of the signal handler. - - virtual int register_handler (const ACE_Sig_Set &sigset, - ACE_Event_Handler *new_sh, - ACE_Sig_Action *new_disp = 0); - // Registers <new_sh> to handle a set of signals <sigset> using the - // <new_disp>. - - virtual int remove_handler (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask); - // Removes the <mask> binding of <eh> from the Select_Reactor. If - // there are no more bindings for this <eh> then it is removed from - // the Select_Reactor. Note that the Select_Reactor will call - // eh->get_handle() to extract the underlying I/O handle. - - virtual int remove_handler (ACE_HANDLE handle, - ACE_Reactor_Mask); - // Removes the <mask> bind of <Event_Handler> whose handle is - // <handle> from the Select_Reactor. If there are no more bindings - // for this <eh> then it is removed from the Select_Reactor. - - virtual int remove_handler (const ACE_Handle_Set &handle_set, - ACE_Reactor_Mask); - // Removes all the <mask> bindings for handles in the <handle_set> - // bind of <Event_Handler>. If there are no more bindings for any - // of these handlers then they are removed from the Select_Reactor. - - virtual int remove_handler (int signum, - ACE_Sig_Action *new_disp, - ACE_Sig_Action *old_disp = 0, - int sigkey = -1); - // Remove the ACE_Event_Handler currently associated with <signum>. - // <sigkey> is ignored in this implementation since there is only - // one instance of a signal handler. Install the new disposition - // (if given) and return the previous disposition (if desired by the - // caller). Returns 0 on success and -1 if <signum> is invalid. - - virtual int remove_handler (const ACE_Sig_Set &sigset); - // Calls <remove_handler> for every signal in <sigset>. - - // = Suspend and resume Handlers. - - virtual int suspend_handler (ACE_Event_Handler *eh); - // Temporarily suspend the <Event_Handler> associated with <eh>. - - virtual int suspend_handler (ACE_HANDLE handle); - // Temporarily suspend the <Event_Handler> associated with <handle>. - - virtual int suspend_handler (const ACE_Handle_Set &handles); - // Suspend all <handles> in handle set temporarily. - - virtual int suspend_handlers (void); - // Suspend all the <Event_Handlers> in the Select_Reactor. - - virtual int resume_handler (ACE_Event_Handler *eh); - // Resume a temporarily suspend <Event_Handler> associated with - // <eh>. - - virtual int resume_handler (ACE_HANDLE handle); - // Resume a temporarily suspended <Event_Handler> associated with - // <handle>. - - virtual int resume_handler (const ACE_Handle_Set &handles); - // Resume all <handles> in handle set. - - virtual int resume_handlers (void); - // Resume all the <Event_Handlers> in the Select_Reactor. - - virtual int uses_event_associations (void); - // Return 1 if we any event associations were made by the reactor - // for the handles that it waits on, 0 otherwise. Since the - // Select_Reactor does not do any event associations, this function - // always return 0. - - // = Timer management. - virtual long schedule_timer (ACE_Event_Handler *, - const void *arg, - const ACE_Time_Value &delta_time, - const ACE_Time_Value &interval = ACE_Time_Value::zero); - // Schedule an <event_handler> that will expire after <delta_time> - // amount of time. If it expires then <arg> is passed in as the - // value to the <event_handler>'s <handle_timeout> callback method. - // If <interval> is != to <ACE_Time_Value::zero> then it is used to - // reschedule the <event_handler> automatically. This method - // returns a <timer_id> that uniquely identifies the <event_handler> - // in an internal list. This <timer_id> can be used to cancel an - // <event_handler> before it expires. The cancellation ensures that - // <timer_ids> are unique up to values of greater than 2 billion - // timers. As long as timers don't stay around longer than this - // there should be no problems with accidentally deleting the wrong - // timer. Returns -1 on failure (which is guaranteed never to be a - // valid <timer_id>. - - virtual int cancel_timer (ACE_Event_Handler *event_handler, - int dont_call_handle_close = 1); - // Cancel all <event_handlers> that match the address of - // <event_handler>. If <dont_call_handle_close> is 0 then the - // <handle_close> method of <event_handler> will be invoked. - // Returns number of handler's cancelled. - - virtual int cancel_timer (long timer_id, - const void **arg = 0, - int dont_call_handle_close = 1); - // Cancel the single <ACE_Event_Handler> that matches the <timer_id> - // value (which was returned from the <schedule> method). If arg is - // non-NULL then it will be set to point to the ``magic cookie'' - // argument passed in when the <Event_Handler> was registered. This - // makes it possible to free up the memory and avoid memory leaks. - // If <dont_call_handle_close> is 0 then the <handle_close> method - // of <event_handler> will be invoked. Returns 1 if cancellation - // succeeded and 0 if the <timer_id> wasn't found. - - // = High-level Event_Handler scheduling operations - - virtual int schedule_wakeup (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask); - // ADD the dispatch MASK "bit" bound with the <eh> and the <mask>. - - virtual int schedule_wakeup (ACE_HANDLE handle, - ACE_Reactor_Mask mask); - // ADD the dispatch MASK "bit" bound with the <handle> and the <mask>. - - virtual int cancel_wakeup (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask); - // CLR the dispatch MASK "bit" bound with the <eh> and the <mask>. - - virtual int cancel_wakeup (ACE_HANDLE handle, - ACE_Reactor_Mask mask); - // CLR the dispatch MASK "bit" bound with the <handle> and the <mask>. - - // = Notification methods. - virtual int notify (ACE_Event_Handler * = 0, - ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK, - ACE_Time_Value * = 0); - // Called by a thread when it wants to unblock the Select_Reactor. - // This wakeups the <ACE_Select_Reactor> if currently blocked in - // select()/poll(). Pass over both the <Event_Handler> *and* the - // <mask> to allow the caller to dictate which <Event_Handler> - // method the <Select_Reactor> will invoke. The <ACE_Time_Value> - // indicates how long to blocking trying to notify the - // <Select_Reactor>. If <timeout> == 0, the caller will block until - // action is possible, else will wait until the relative time - // specified in *<timeout> elapses). - - virtual void max_notify_iterations (int); - // Set the maximum number of times that the - // <ACE_Select_Reactor_Notify::handle_input> method will iterate and - // dispatch the <ACE_Event_Handlers> that are passed in via the - // notify pipe before breaking out of its <recv> loop. By default, - // this is set to -1, which means "iterate until the pipe is empty." - // Setting this to a value like "1 or 2" will increase "fairness" - // (and thus prevent starvation) at the expense of slightly higher - // dispatching overhead. - - virtual int max_notify_iterations (void); - // Get the maximum number of times that the - // <ACE_Select_Reactor_Notify::handle_input> method will iterate and - // dispatch the <ACE_Event_Handlers> that are passed in via the - // notify pipe before breaking out of its <recv> loop. - - virtual void requeue_position (int); - // Set position that the main ACE_Select_Reactor thread is requeued in the - // list of waiters during a notify() callback. - - virtual int requeue_position (void); - // Get position that the main ACE_Select_Reactor thread is requeued in the - // list of waiters during a notify() callback. - - // = Low-level wait_set mask manipulation methods. - virtual int mask_ops (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - int ops); - // GET/SET/ADD/CLR the dispatch mask "bit" bound with the <eh> and - // <mask>. - - virtual int mask_ops (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - int ops); - // GET/SET/ADD/CLR the dispatch MASK "bit" bound with the <handle> - // and <mask>. - - // = Low-level ready_set mask manipulation methods. - virtual int ready_ops (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - int ops); - // GET/SET/ADD/CLR the ready "bit" bound with the <eh> and <mask>. - - virtual int ready_ops (ACE_HANDLE handle, - ACE_Reactor_Mask, - int ops); - // GET/SET/ADD/CLR the ready "bit" bound with the <handle> and <mask>. - - virtual void wakeup_all_threads (void); - // Wake up all threads in waiting in the event loop - - // = Only the owner thread that can perform a <handle_events>. - - virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id = 0); - // Set the new owner of the thread and return the old owner. - - virtual int owner (ACE_thread_t *); - // Return the current owner of the thread. - - // = Miscellaneous Handler operations. - virtual int handler (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - ACE_Event_Handler **eh = 0); - // Check to see if <handle> is associated with a valid Event_Handler - // bound to <mask>. Return the <eh> associated with this <handler> - // if <eh> != 0. - - virtual int handler (int signum, - ACE_Event_Handler ** = 0); - // Check to see if <signum> is associated with a valid Event_Handler - // bound to a signal. Return the <eh> associated with this - // <handler> if <eh> != 0. - - virtual int initialized (void); - // Returns true if we've been successfully initialized, else false. - - virtual size_t size (void); - // Returns the current size of the Reactor's internal descriptor - // table. - - virtual ACE_Lock &lock (void); - // Returns a reference to the <ACE_Select_Reactor_Token> that is - // used to serialize the internal Select_Reactor's processing logic. - // This can be useful for situations where you need to avoid - // deadlock efficiently when <ACE_Event_Handlers> are used in - // multiple threads. - - virtual void dump (void) const; - // Dump the state of an object. - - ACE_ALLOC_HOOK_DECLARE; - // Declare the dynamic allocation hooks. - -protected: - // = Internal methods that do the actual work. - - // All of these methods assume that the <Select_Reactor>'s token - // lock is held by the public methods that call down to them. - - virtual int register_handler_i (ACE_HANDLE handle, - ACE_Event_Handler *eh, - ACE_Reactor_Mask mask); - // Do the work of actually binding the <handle> and <eh> with the - // <mask>. - - virtual int register_handler_i (const ACE_Handle_Set &handles, - ACE_Event_Handler *handler, - ACE_Reactor_Mask mask); - // Register a set of <handles>. - - virtual int remove_handler_i (ACE_HANDLE handle, - ACE_Reactor_Mask); - // Do the work of actually unbinding the <handle> and <eh> with the - // <mask>. - - virtual int remove_handler_i (const ACE_Handle_Set &handles, - ACE_Reactor_Mask); - // Remove a set of <handles>. - - virtual int suspend_i (ACE_HANDLE handle); - // Suspend the <Event_Handler> associated with <handle> - - virtual int resume_i (ACE_HANDLE handle); - // Resume the <Event_Handler> associated with <handle> - - virtual int handler_i (ACE_HANDLE handle, - ACE_Reactor_Mask, - ACE_Event_Handler ** = 0); - // Implement the public <handler> method. - - virtual int handler_i (int signum, ACE_Event_Handler ** = 0); - // Implement the public <handler> method. - - virtual int any_ready (ACE_Select_Reactor_Handle_Set &handle_set); - // Check if there are any HANDLEs enabled in the <ready_set_>, and - // if so, update the <handle_set> and return the number ready. If - // there aren't any HANDLEs enabled return 0. - - virtual int handle_error (void); - // Take corrective action when errors occur. - - virtual int check_handles (void); - // Make sure the handles are all valid. - - virtual int bit_ops (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - ACE_Select_Reactor_Handle_Set &wait_Set, - int ops); - // Allow manipulation of the <wait_set_> mask and <ready_set_> mask. - - virtual int wait_for_multiple_events (ACE_Select_Reactor_Handle_Set &, - ACE_Time_Value *); - // Wait for events to occur. - - // = Dispatching methods. - - virtual int dispatch (int nfound, - ACE_Select_Reactor_Handle_Set &); - // Template Method that dispatches <ACE_Event_Handler>s for time - // events, I/O events, and signal events. Returns the total number - // of <ACE_Event_Handler>s that were dispatched or -1 if something - // goes wrong. - - virtual int dispatch_timer_handlers (void); - // Dispatch any expired timer handlers. Returns -1 if the state of - // the <wait_set_> has changed, else returns number of timer - // handlers dispatched. - - virtual int dispatch_notification_handlers (int &number_of_active_handles, - ACE_Select_Reactor_Handle_Set &dispatch_set); - // Dispatch any notification handlers. Returns -1 if the state of - // the <wait_set_> has changed, else returns number of handlers - // notified. - - virtual int dispatch_io_handlers (int &number_of_active_handles, - ACE_Select_Reactor_Handle_Set &dispatch_set); - // Dispatch all the input/output/except handlers that are enabled in - // the <dispatch_set>. Returns -1 if the state of the <wait_set_> - // has changed, else returns number of handlers dispatched. - - virtual int dispatch_io_set (int number_of_active_handles, - int& number_dispatched, - int mask, - ACE_Handle_Set& dispatch_mask, - ACE_Handle_Set& ready_mask, - ACE_EH_PTMF callback); - // Factors the dispatching of an io handle set (each WRITE, EXCEPT - // or READ set of handles). - // It updates the number of handles already dispatched and - // invokes this->notify_handle for all the handles in <dispatch_set> - // using the <mask>, <ready_set> and <callback> parameters. - // Must return -1 if this->state_changed otherwise it must return 0. - - virtual void notify_handle (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - ACE_Handle_Set &, - ACE_Event_Handler *eh, - ACE_EH_PTMF callback); - // Notify the appropriate <callback> in the context of the <eh> - // associated with <handle> that a particular event has occurred. - - ACE_Select_Reactor_Handler_Repository handler_rep_; - // Table that maps <ACE_HANDLEs> to <ACE_Event_Handler *>'s. - - ACE_Timer_Queue *timer_queue_; - // Defined as a pointer to allow overriding by derived classes... - - int delete_timer_queue_; - // Keeps track of whether we should delete the timer queue (if we - // didn't create it, then we don't delete it). - - ACE_Sig_Handler *signal_handler_; - // Handle signals without requiring global/static variables. - - int delete_signal_handler_; - // Keeps track of whether we should delete the signal handler (if we - // didn't create it, then we don't delete it). - - ACE_Reactor_Notify *notify_handler_; - // Callback object that unblocks the <ACE_Select_Reactor> if it's - // sleeping. - - int delete_notify_handler_; - // Keeps track of whether we need to delete the notify handler (if - // we didn't create it, then we don't delete it). - - ACE_Select_Reactor_Handle_Set wait_set_; - // Tracks handles that are waited for by select(). - - ACE_Select_Reactor_Handle_Set suspend_set_; - // Tracks handles that are currently suspended. - - ACE_Select_Reactor_Handle_Set ready_set_; - // Track HANDLES we are interested in for various events that must - // be dispatched *without* going through select(). - - int restart_; - // Restart automatically when interrupted - - int requeue_position_; - // Position that the main ACE_Select_Reactor thread is requeued in - // the list of waiters during a notify() callback. If this value == - // -1 we are requeued at the end of the list. Else if it's 0 then - // we are requeued at the front of the list. Else if it's > 1 then - // that indicates the number of waiters to skip over. - - int initialized_; - // True if we've been initialized yet... - - ACE_thread_t owner_; - // The original thread that created this Select_Reactor. - - int state_changed_; - // True if state has changed during dispatching of - // <ACE_Event_Handlers>, else false. This is used to determine - // whether we need to make another trip through the - // <Select_Reactor>'s <wait_for_multiple_events> loop. - - ACE_Select_Reactor_Token token_; - // Synchronization token for the MT_SAFE ACE_Select_Reactor. - - ACE_Lock_Adapter<ACE_Select_Reactor_Token> lock_adapter_; - // Adapter used to return internal lock to outside world. - - void renew (void); - // Enqueue ourselves into the list of waiting threads at the - // appropriate point specified by <requeue_position_>. - - int release_token (void); - // Release the token lock when a Win32 structured exception occurs. - - int handle_events_i (ACE_Time_Value *max_wait_time = 0); - // Stops the VC++ compiler from bitching about exceptions and destructors - - - int supress_notify_renew (void); - void supress_notify_renew (int sr); - // Controls/access whether the notify handler should renew the - // Select_Reactor's token or not. - - friend class ACE_Select_Reactor_Notify; - friend class ACE_Select_Reactor_Handler_Repository; - -private: - int supress_renew_; - // Determine whether we should renew Select_Reactor's token after handling - // the notification message. +typedef ACE_Select_Reactor_Token_T<ACE_Noop_Token> ACE_Select_Reactor_Token; +#endif /* ACE_MT_SAFE && ACE_MT_SAFE != 0 */ - ACE_Select_Reactor (const ACE_Select_Reactor &); - ACE_Select_Reactor &operator = (const ACE_Select_Reactor &); - // Deny access since member-wise won't work... -}; +typedef ACE_Select_Reactor_T<ACE_Select_Reactor_Token> ACE_Select_Reactor; #if defined (__ACE_INLINE__) #include "ace/Select_Reactor.i" diff --git a/ace/Select_Reactor.i b/ace/Select_Reactor.i index 055b961c003..6318deb79a0 100644 --- a/ace/Select_Reactor.i +++ b/ace/Select_Reactor.i @@ -1,292 +1,2 @@ /* -*- C++ -*- */ // $Id$ - -#include "ace/Reactor.h" - -ACE_INLINE -ACE_Event_Tuple::~ACE_Event_Tuple (void) -{ -} - -ACE_INLINE -ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void) -{ -} - -ACE_INLINE -ACE_Select_Reactor_Handler_Repository::~ACE_Select_Reactor_Handler_Repository (void) -{ -} - -ACE_INLINE -ACE_Select_Reactor_Handler_Repository_Iterator::~ACE_Select_Reactor_Handler_Repository_Iterator (void) -{ -} - -ACE_INLINE int -ACE_Select_Reactor::resume_handler (ACE_Event_Handler *h) -{ - ACE_TRACE ("ACE_Select_Reactor::resume_handler"); - return this->resume_handler (h->get_handle ()); -} - -ACE_INLINE int -ACE_Select_Reactor::resume_handler (const ACE_Handle_Set &handles) -{ - ACE_TRACE ("ACE_Select_Reactor::resume_handler"); - ACE_Handle_Set_Iterator handle_iter (handles); - ACE_HANDLE h; - - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - - while ((h = handle_iter ()) != ACE_INVALID_HANDLE) - if (this->resume_i (h) == -1) - return -1; - - return 0; -} - -ACE_INLINE int -ACE_Select_Reactor::suspend_handler (ACE_Event_Handler *h) -{ - ACE_TRACE ("ACE_Select_Reactor::suspend_handler"); - return this->suspend_handler (h->get_handle ()); -} - -ACE_INLINE int -ACE_Select_Reactor::suspend_handler (const ACE_Handle_Set &handles) -{ - ACE_TRACE ("ACE_Select_Reactor::suspend_handler"); - ACE_Handle_Set_Iterator handle_iter (handles); - ACE_HANDLE h; - - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); - - while ((h = handle_iter ()) != ACE_INVALID_HANDLE) - if (this->suspend_i (h) == -1) - return -1; - - return 0; -} - -ACE_INLINE int -ACE_Select_Reactor::register_handler (int signum, - ACE_Event_Handler *new_sh, - ACE_Sig_Action *new_disp, - ACE_Event_Handler **old_sh, - ACE_Sig_Action *old_disp) -{ - ACE_TRACE ("ACE_Select_Reactor::register_handler"); - return this->signal_handler_->register_handler (signum, - new_sh, new_disp, - old_sh, old_disp); -} - -#if defined (ACE_WIN32) - -ACE_INLINE int -ACE_Select_Reactor::register_handler (ACE_Event_Handler *event_handler, - ACE_HANDLE event_handle) -{ - // Don't have an implementation for this yet... - ACE_UNUSED_ARG (event_handler); - ACE_UNUSED_ARG (event_handle); - ACE_NOTSUP_RETURN (-1); -} - -#endif /* ACE_WIN32 */ - -ACE_INLINE int -ACE_Select_Reactor::register_handler (ACE_HANDLE event_handle, - ACE_HANDLE io_handle, - ACE_Event_Handler *event_handler, - ACE_Reactor_Mask mask) -{ - // Don't have an implementation for this yet... - ACE_UNUSED_ARG (event_handle); - ACE_UNUSED_ARG (io_handle); - ACE_UNUSED_ARG (event_handler); - ACE_UNUSED_ARG (mask); - ACE_NOTSUP_RETURN (-1); -} - -ACE_INLINE int -ACE_Select_Reactor::handler (int signum, ACE_Event_Handler **handler) -{ - ACE_TRACE ("ACE_Select_Reactor::handler"); - return this->handler_i (signum, handler); -} - -ACE_INLINE int -ACE_Select_Reactor::remove_handler (int signum, - ACE_Sig_Action *new_disp, - ACE_Sig_Action *old_disp, - int sigkey) -{ - ACE_TRACE ("ACE_Select_Reactor::remove_handler"); - return this->signal_handler_->remove_handler (signum, new_disp, old_disp, sigkey); -} - -ACE_INLINE int -ACE_Select_Reactor::uses_event_associations (void) -{ - // Since the Select_Reactor does not do any event associations, this - // function always return 0. - return 0; -} - -// = The remaining methods in this file must be called with locks -// held. Note the queue handles its own locking. - -ACE_INLINE int -ACE_Select_Reactor::cancel_timer (ACE_Event_Handler *handler, - int dont_call_handle_close) -{ - ACE_TRACE ("ACE_Select_Reactor::cancel_timer"); - return this->timer_queue_ != 0 && - this->timer_queue_->cancel (handler, dont_call_handle_close); -} - -ACE_INLINE int -ACE_Select_Reactor::cancel_timer (long timer_id, - const void **arg, - int dont_call_handle_close) -{ - ACE_TRACE ("ACE_Select_Reactor::cancel_timer"); - return this->timer_queue_->cancel (timer_id, - arg, - dont_call_handle_close); -} - -// Performs operations on the "ready" bits. - -ACE_INLINE int -ACE_Select_Reactor::ready_ops (ACE_Event_Handler *handler, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_Select_Reactor::ready_ops"); - return this->ready_ops (handler->get_handle (), mask, ops); -} - -// Performs operations on the "dispatch" masks. - -ACE_INLINE int -ACE_Select_Reactor::mask_ops (ACE_Event_Handler *handler, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_Select_Reactor::mask_ops"); - return this->mask_ops (handler->get_handle (), mask, ops); -} - -ACE_INLINE int -ACE_Select_Reactor::schedule_wakeup (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::schedule_wakeup"); - return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::ADD_MASK); -} - -ACE_INLINE int -ACE_Select_Reactor::cancel_wakeup (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::cancel_wakeup"); - return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::CLR_MASK); -} - -ACE_INLINE int -ACE_Select_Reactor::schedule_wakeup (ACE_HANDLE handle, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::schedule_wakeup"); - return this->mask_ops (handle, mask, ACE_Reactor::ADD_MASK); -} - -ACE_INLINE int -ACE_Select_Reactor::cancel_wakeup (ACE_HANDLE handle, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor::cancel_wakeup"); - return this->mask_ops (handle, mask, ACE_Reactor::CLR_MASK); -} - -ACE_INLINE ACE_Lock & -ACE_Select_Reactor::lock (void) -{ - ACE_TRACE ("ACE_Select_Reactor::lock"); - return this->lock_adapter_; -} - -ACE_INLINE void -ACE_Select_Reactor::wakeup_all_threads (void) -{ - // Send a notification, but don't block if there's no one to receive - // it. - this->notify (0, ACE_Event_Handler::NULL_MASK, (ACE_Time_Value *) &ACE_Time_Value::zero); -} - -ACE_INLINE int -ACE_Select_Reactor::alertable_handle_events (ACE_Time_Value *max_wait_time) -{ - return this->handle_events (max_wait_time); -} - -ACE_INLINE int -ACE_Select_Reactor::alertable_handle_events (ACE_Time_Value &max_wait_time) -{ - return this->handle_events (max_wait_time); -} - -ACE_INLINE size_t -ACE_Select_Reactor_Handler_Repository::size (void) -{ - return this->max_size_; -} - -ACE_INLINE size_t -ACE_Select_Reactor::size (void) -{ - return this->handler_rep_.size (); -} - -ACE_INLINE int -ACE_Select_Reactor::supress_notify_renew (void) -{ - return this->supress_renew_; -} - -ACE_INLINE void -ACE_Select_Reactor::supress_notify_renew (int sr) -{ - this->supress_renew_ = sr; -} - - -ACE_INLINE -ACE_Event_Tuple::ACE_Event_Tuple (void) -: handle_ (ACE_INVALID_HANDLE), - event_handler_ (0) -{ -} - -ACE_INLINE -ACE_Event_Tuple::ACE_Event_Tuple (ACE_Event_Handler* eh, - ACE_HANDLE h) -: handle_ (h), - event_handler_ (eh) -{ -} - -ACE_INLINE int -ACE_Event_Tuple::operator== (const ACE_Event_Tuple &rhs) const -{ - return this->handle_ == rhs.handle_; -} - -ACE_INLINE int -ACE_Event_Tuple::operator!= (const ACE_Event_Tuple &rhs) const -{ - return !(*this == rhs); -} diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp new file mode 100644 index 00000000000..4f133a4702a --- /dev/null +++ b/ace/Select_Reactor_Base.cpp @@ -0,0 +1,751 @@ +// $Id$ + +#define ACE_BUILD_DLL + +#include "ace/Select_Reactor_Base.h" +#include "ace/Reactor.h" +#include "ace/Thread.h" +#include "ace/Synch_T.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Connector.h" +#include "ace/Timer_Heap.h" + +#if !defined (__ACE_INLINE__) +#include "ace/Select_Reactor_Base.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(ace, Select_Reactor_Base, "$Id$") + +#if defined (ACE_WIN32) +#define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_) +#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_) +#else +#define ACE_SELECT_REACTOR_HANDLE(H) (H) +#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)]) +#endif /* ACE_WIN32 */ + +// Performs sanity checking on the ACE_HANDLE. + +int +ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle"); +#if defined (ACE_WIN32) + // It's too expensive to perform more exhaustive validity checks on + // Win32 due to the way that they implement SOCKET HANDLEs. + if (handle == ACE_INVALID_HANDLE) +#else /* !ACE_WIN32 */ + if (handle < 0 || handle >= this->max_size_) +#endif /* ACE_WIN32 */ + { + errno = EINVAL; + return 1; + } + else + return 0; +} + +// Performs sanity checking on the ACE_HANDLE. + +int +ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range"); +#if defined (ACE_WIN32) + // It's too expensive to perform more exhaustive validity checks on + // Win32 due to the way that they implement SOCKET HANDLEs. + if (handle != ACE_INVALID_HANDLE) +#else /* !ACE_WIN32 */ + if (handle >= 0 && handle < this->max_handlep1_) +#endif /* ACE_WIN32 */ + return 1; + else + { + errno = EINVAL; + return 0; + } +} + +size_t +ACE_Select_Reactor_Handler_Repository::max_handlep1 (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1"); + + return this->max_handlep1_; +} + +int +ACE_Select_Reactor_Handler_Repository::open (size_t size) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open"); + this->max_size_ = size; + this->max_handlep1_ = 0; + +#if defined (ACE_WIN32) + // Try to allocate the memory. + ACE_NEW_RETURN (this->event_handlers_, + ACE_Event_Tuple[size], + -1); + + // Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }. + for (size_t h = 0; h < size; h++) + { + ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; + } +#else + // Try to allocate the memory. + ACE_NEW_RETURN (this->event_handlers_, + ACE_Event_Handler *[size], + -1); + + // Initialize the ACE_Event_Handler * to NULL. + for (size_t h = 0; h < size; h++) + ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; +#endif /* ACE_WIN32 */ + + // Check to see if the user is asking for too much and fail in this + // case. + if (size > FD_SETSIZE) + { + errno = ERANGE; + return -1; + } + else + { + // Try to increase the number of handles if <size> is greater than + // the current limit. We ignore the return value here because this + // is more of a "warning" not an error. + (void) ACE::set_handle_limit (size); + return 0; + } +} + +// Initialize a repository of the appropriate <size>. + +ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor) + : select_reactor_ (select_reactor), + max_size_ (0), + max_handlep1_ (0), + event_handlers_ (0) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository"); +} + +int +ACE_Select_Reactor_Handler_Repository::unbind_all (void) +{ + // Unbind all of the <handle, ACE_Event_Handler>s. + for (int handle = 0; + handle < this->max_handlep1_; + handle++) + this->unbind (ACE_SELECT_REACTOR_HANDLE (handle), + ACE_Event_Handler::ALL_EVENTS_MASK); + + return 0; +} + +int +ACE_Select_Reactor_Handler_Repository::close (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close"); + + if (this->event_handlers_ != 0) + { + this->unbind_all (); + + delete [] this->event_handlers_; + this->event_handlers_ = 0; + } + return 0; +} + +// Return the <ACE_Event_Handler *> associated with the <handle>. + +ACE_Event_Handler * +ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle, + size_t *index_p) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find"); + + ACE_Event_Handler *eh = 0; + ssize_t i; + + // Only bother to search for the <handle> if it's in range. + if (this->handle_in_range (handle)) + { +#if defined (ACE_WIN32) + i = 0; + + for (; i < this->max_handlep1_; i++) + if (ACE_SELECT_REACTOR_HANDLE (i) == handle) + { + eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i); + break; + } +#else + i = handle; + + eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle); +#endif /* ACE_WIN32 */ + } + else + // g++ can't figure out that i won't be used below if the handle + // is out of range, so keep it happy by defining i here . . . + i = 0; + + if (eh != 0 && index_p != 0) + *index_p = i; + else + errno = ENOENT; + + return eh; +} + +// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>. + +int +ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind"); + + if (handle == ACE_INVALID_HANDLE) + handle = event_handler->get_handle (); + + if (this->invalid_handle (handle)) + return -1; + +#if defined (ACE_WIN32) + int assigned_slot = -1; + + for (ssize_t i = 0; i < this->max_handlep1_; i++) + { + // Found it, so let's just reuse this location. + if (ACE_SELECT_REACTOR_HANDLE (i) == handle) + { + assigned_slot = i; + break; + } + // Here's the first free slot, so let's take it. + else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE + && assigned_slot == -1) + assigned_slot = i; + } + + if (assigned_slot > -1) + // We found a free spot, let's reuse it. + { + ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler; + } + else if (this->max_handlep1_ < this->max_size_) + { + // Insert at the end of the active portion. + ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler; + this->max_handlep1_++; + } + else + { + // No more room at the inn! + errno = ENOMEM; + return -1; + } +#else + ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler; + + if (this->max_handlep1_ < handle + 1) + this->max_handlep1_ = handle + 1; +#endif /* ACE_WIN32 */ + + // Add the <mask> for this <handle> in the Select_Reactor's wait_set. + this->select_reactor_.bit_ops (handle, + mask, + this->select_reactor_.wait_set_, + ACE_Reactor::ADD_MASK); + + // Note the fact that we've changed the state of the <wait_set_>, + // which is used by the dispatching loop to determine whether it can + // keep going or if it needs to reconsult select(). + this->select_reactor_.state_changed_ = 1; + + return 0; +} + +// Remove the binding of <ACE_HANDLE>. + +int +ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind"); + + size_t index; + ACE_Event_Handler *eh = this->find (handle, &index); + + if (eh == 0) + return -1; + + // Clear out the <mask> bits in the Select_Reactor's wait_set. + this->select_reactor_.bit_ops (handle, + mask, + this->select_reactor_.wait_set_, + ACE_Reactor::CLR_MASK); + + // Note the fact that we've changed the state of the <wait_set_>, + // which is used by the dispatching loop to determine whether it can + // keep going or if it needs to reconsult select(). + this->select_reactor_.state_changed_ = 1; + + // Close down the <Event_Handler> unless we've been instructed not + // to. + if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0) + eh->handle_close (handle, mask); + + // If there are no longer any outstanding events on this <handle> + // then we can totally shut down the Event_Handler. + if (this->select_reactor_.wait_set_.rd_mask_.is_set (handle) == 0 + && this->select_reactor_.wait_set_.wr_mask_.is_set (handle) == 0 + && this->select_reactor_.wait_set_.ex_mask_.is_set (handle) == 0) +#if defined (ACE_WIN32) + { + ACE_SELECT_REACTOR_HANDLE (index) = ACE_INVALID_HANDLE; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, index) = 0; + + if (this->max_handlep1_ == (int) index + 1) + { + // We've deleted the last entry (i.e., i + 1 == the current + // size of the array), so we need to figure out the last + // valid place in the array that we should consider in + // subsequent searches. + + int i; + + for (i = this->max_handlep1_ - 1; + i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE; + i--) + continue; + + this->max_handlep1_ = i + 1; + } + } +#else + { + ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0; + + if (this->max_handlep1_ == handle + 1) + { + // We've deleted the last entry, so we need to figure out + // the last valid place in the array that is worth looking + // at. + ACE_HANDLE rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set (); + ACE_HANDLE wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set (); + ACE_HANDLE ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set (); + + // Compute the maximum of three values. + this->max_handlep1_ = rd_max < wr_max ? wr_max : rd_max; + + if (this->max_handlep1_ < ex_max) + this->max_handlep1_ = ex_max; + + this->max_handlep1_++; + } + } +#endif /* ACE_WIN32 */ + + return 0; +} + +ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator + (const ACE_Select_Reactor_Handler_Repository *s) + : rep_ (s), + current_ (-1) +{ + this->advance (); +} + +// Pass back the <next_item> that hasn't been seen in the Set. +// Returns 0 when all items have been seen, else 1. + +int +ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item) +{ + int result = 1; + + if (this->current_ >= this->rep_->max_handlep1_) + result = 0; + else + next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, + this->current_); + return result; +} + +int +ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const +{ + return this->current_ >= this->rep_->max_handlep1_; +} + +// Move forward by one element in the set. + +int +ACE_Select_Reactor_Handler_Repository_Iterator::advance (void) +{ + if (this->current_ < this->rep_->max_handlep1_) + this->current_++; + + while (this->current_ < this->rep_->max_handlep1_) + if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0) + return 1; + else + this->current_++; + + return this->current_ < this->rep_->max_handlep1_; +} + +// Dump the state of an object. + +void +ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("rep_ = %u"), this->rep_)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("current_ = %d"), this->current_)); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +void +ACE_Select_Reactor_Handler_Repository::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) max_handlep1_ = %d, max_size_ = %d\n"), + this->max_handlep1_, this->max_size_)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("["))); + + ACE_Event_Handler *eh = 0; + + for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this); + iter.next (eh) != 0; + iter.advance ()) + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT (" (eh = %x, eh->handle_ = %d)"), + eh, eh->get_handle ())); + + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT (" ]"))); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator) + +ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void) + : max_notify_iterations_ (-1) +{ +} + +void +ACE_Select_Reactor_Notify::max_notify_iterations (int iterations) +{ + // Must always be > 0 or < 0 to optimize the loop exit condition. + if (iterations == 0) + iterations = 1; + + this->max_notify_iterations_ = iterations; +} + +int +ACE_Select_Reactor_Notify::max_notify_iterations (void) +{ + return this->max_notify_iterations_; +} + +void +ACE_Select_Reactor_Notify::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("select_reactor_ = %x"), this->select_reactor_)); + this->notification_pipe_.dump (); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +int +ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, + ACE_Timer_Queue *, + int disable_notify_pipe) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::open"); + + if (disable_notify_pipe == 0) + { + this->select_reactor_ = ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r); + + if (select_reactor_ == 0) + { + errno = EINVAL; + return -1; + } + + if (this->notification_pipe_.open () == -1) + return -1; + + // There seems to be a Win32 bug with this... Set this into + // non-blocking mode. + if (ACE::set_flags (this->notification_pipe_.read_handle (), + ACE_NONBLOCK) == -1) + return -1; + else + return this->select_reactor_->register_handler + (this->notification_pipe_.read_handle (), + this, + ACE_Event_Handler::READ_MASK); + } + else + { + this->select_reactor_ = 0; + return 0; + } +} + +int +ACE_Select_Reactor_Notify::close (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::close"); + return this->notification_pipe_.close (); +} + +ssize_t +ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::notify"); + + // Just consider this method a "no-op" if there's no + // <ACE_Select_Reactor> configured. + if (this->select_reactor_ == 0) + return 0; + else + { + ACE_Notification_Buffer buffer (eh, mask); + + ssize_t n = ACE::send (this->notification_pipe_.write_handle (), + (char *) &buffer, + sizeof buffer, + timeout); + if (n == -1) + return -1; + else + return 0; + } +} + +// Handles pending threads (if any) that are waiting to unblock the +// Select_Reactor. + +int +ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles, + const ACE_Handle_Set &rd_mask) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::handle_notification"); + + ACE_HANDLE read_handle = + this->notification_pipe_.read_handle (); + + if (read_handle != ACE_INVALID_HANDLE + && rd_mask.is_set (read_handle)) + { + number_of_active_handles--; + return this->handle_input (read_handle); + } + else + return 0; +} + +// Special trick to unblock <select> when updates occur in somewhere +// other than the main <ACE_Select_Reactor> thread. All we do is +// write data to a pipe that the <ACE_Select_Reactor> is listening on. +// Thanks to Paul Stephenson for suggesting this approach. + +int +ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input"); + // Precondition: this->select_reactor_.token_.current_owner () == + // ACE_Thread::self (); + + ACE_Notification_Buffer buffer; + ssize_t n; + int number_dispatched = 0; + + while ((n = ACE::recv (handle, (char *) &buffer, sizeof buffer)) > 0) + { + // Check to see if we've got a short read. + if (n != sizeof buffer) + { + ssize_t remainder = sizeof buffer - n; + + // If so, try to recover by reading the remainder. If this + // doesn't work we're in big trouble since the input stream + // won't be aligned correctly. I'm not sure quite what to + // do at this point. It's probably best just to return -1. + if (ACE::recv (handle, + ((char *) &buffer) + n, + remainder) != remainder) + return -1; + } + + // If eh == 0 then another thread is unblocking the + // ACE_Select_Reactor to update the ACE_Select_Reactor's + // internal structures. Otherwise, we need to dispatch the + // appropriate handle_* method on the ACE_Event_Handler pointer + // we've been passed. + if (buffer.eh_ != 0) + { + int result = 0; + + switch (buffer.mask_) + { + case ACE_Event_Handler::READ_MASK: + case ACE_Event_Handler::ACCEPT_MASK: + result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::WRITE_MASK: + result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::EXCEPT_MASK: + result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); + break; + default: + // Should we bail out if we get an invalid mask? + ACE_ERROR ((LM_ERROR, ASYS_TEXT ("invalid mask = %d\n"), buffer.mask_)); + } + if (result == -1) + buffer.eh_->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + } + + number_dispatched++; + + // Bail out if we've reached the <notify_threshold_>. Note that + // by default <notify_threshold_> is -1, so we'll loop until all + // the notifications in the pipe have been dispatched. + if (number_dispatched == this->max_notify_iterations_) + break; + } + + // Reassign number_dispatched to -1 if things have gone seriously + // wrong. + if (n <= 0 && errno != EWOULDBLOCK) + number_dispatched = -1; + + // Enqueue ourselves into the list of waiting threads. When we + // reacquire the token we'll be off and running again with ownership + // of the token. The postcondition of this call is that + // this->select_reactor_.token_.current_owner () == ACE_Thread::self (); + this->select_reactor_->renew (); + return number_dispatched; +} + +// Perform GET, CLR, SET, and ADD operations on the Handle_Sets. +// +// GET = 1, Retrieve current value +// SET = 2, Set value of bits to new mask (changes the entire mask) +// ADD = 3, Bitwise "or" the value into the mask (only changes +// enabled bits) +// CLR = 4 Bitwise "and" the negation of the value out of the mask +// (only changes enabled bits) +// +// Returns the original mask. Must be called with locks held. + +int +ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Select_Reactor_Handle_Set &handle_set, + int ops) +{ + ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops"); + if (this->handler_rep_.handle_in_range (handle) == 0) + return -1; + +#if !defined (ACE_WIN32) + ACE_Sig_Guard sb; // Block out all signals until method returns. +#endif /* ACE_WIN32 */ + + ACE_FDS_PTMF ptmf = &ACE_Handle_Set::set_bit; + u_long omask = ACE_Event_Handler::NULL_MASK; + + switch (ops) + { + case ACE_Reactor::GET_MASK: + if (handle_set.rd_mask_.is_set (handle)) + ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK); + if (handle_set.wr_mask_.is_set (handle)) + ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK); + if (handle_set.ex_mask_.is_set (handle)) + ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK); + break; + + case ACE_Reactor::CLR_MASK: + ptmf = &ACE_Handle_Set::clr_bit; + /* FALLTHRU */ + case ACE_Reactor::SET_MASK: + /* FALLTHRU */ + case ACE_Reactor::ADD_MASK: + + // The following code is rather subtle... Note that if we are + // doing a ACE_Reactor::SET_MASK then if the bit is not enabled + // in the mask we need to clear the bit from the ACE_Handle_Set. + // On the other hand, if we are doing a ACE_Reactor::CLR_MASK or + // a ACE_Reactor::ADD_MASK we just carry out the operations + // specified by the mask. + + // READ, ACCEPT, and CONNECT flag will place the handle in the + // read set. + if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) + || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK) + || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)) + { + (handle_set.rd_mask_.*ptmf) (handle); + ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK); + } + else if (ops == ACE_Reactor::SET_MASK) + handle_set.rd_mask_.clr_bit (handle); + + // WRITE and CONNECT flag will place the handle in the write set + if (ACE_BIT_ENABLED (mask, + ACE_Event_Handler::WRITE_MASK) + || ACE_BIT_ENABLED (mask, + ACE_Event_Handler::CONNECT_MASK)) + { + (handle_set.wr_mask_.*ptmf) (handle); + ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK); + } + else if (ops == ACE_Reactor::SET_MASK) + handle_set.wr_mask_.clr_bit (handle); + + // EXCEPT (and CONNECT on Win32) flag will place the handle in + // the except set. + if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK) +#if defined (ACE_WIN32) + || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK) +#endif /* ACE_WIN32 */ + ) + { + (handle_set.ex_mask_.*ptmf) (handle); + ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK); + } + else if (ops == ACE_Reactor::SET_MASK) + handle_set.ex_mask_.clr_bit (handle); + break; + default: + return -1; + } + return omask; +} diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h new file mode 100644 index 00000000000..a64f8562ccf --- /dev/null +++ b/ace/Select_Reactor_Base.h @@ -0,0 +1,411 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace +// +// = FILENAME +// Select_Reactor_Base.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (ACE_SELECT_REACTOR_BASE_H) +#define ACE_SELECT_REACTOR_BASE_H + +#include "ace/Signal.h" +#include "ace/Timer_Queue.h" +#include "ace/Event_Handler.h" +#include "ace/Handle_Set.h" +#include "ace/Token.h" +#include "ace/Pipe.h" +#include "ace/Reactor_Impl.h" + +// Add useful typedefs to simplify the following code. +typedef void (ACE_Handle_Set::*ACE_FDS_PTMF) (ACE_HANDLE); +typedef int (ACE_Event_Handler::*ACE_EH_PTMF) (ACE_HANDLE); + +// Forward declaration. +class ACE_Select_Reactor_Impl; + +class ACE_Export ACE_Select_Reactor_Handle_Set +{ + // = TITLE + // Track handles we are interested for various events. +public: + ACE_Handle_Set rd_mask_; + // Read events (e.g., input pending, accept pending). + + ACE_Handle_Set wr_mask_; + // Write events (e.g., flow control abated, non-blocking connection + // complete). + + ACE_Handle_Set ex_mask_; + // Exception events (e.g., SIG_URG). +}; + +class ACE_Export ACE_Event_Tuple +{ + // = TITLE + // An ACE_Event_Handler and its associated ACE_HANDLE. + // + // = DESCRIPTION + // One ACE_Event_Handler is registered for one or more + // ACE_HANDLE, in some points this information must be stored + // explicitly. This structure provides a lightweight mechanism + // to do so. +public: + ACE_Event_Tuple (void); + ACE_Event_Tuple (ACE_Event_Handler* eh, ACE_HANDLE h); + ~ACE_Event_Tuple (void); + + int operator== (const ACE_Event_Tuple &rhs) const; + // Equality operator. + + int operator!= (const ACE_Event_Tuple &rhs) const; + // Inequality operator. + + ACE_HANDLE handle_; + ACE_Event_Handler* event_handler_; +}; + +class ACE_Export ACE_Select_Reactor_Notify : public ACE_Reactor_Notify +{ + // = TITLE + // Unblock the <ACE_Select_Reactor> from its event loop. + // + // = DESCRIPTION + // This implementation is necessary for cases where the + // <ACE_Select_Reactor> is run in a multi-threaded program. In + // this case, we need to be able to unblock select() or poll() + // when updates occur other than in the main + // <ACE_Select_Reactor> thread. To do this, we signal an + // auto-reset event the <ACE_Select_Reactor> is listening on. + // If an <ACE_Event_Handler> and <ACE_Select_Reactor_Mask> is + // passed to <notify>, the appropriate <handle_*> method is + // dispatched in the context of the <ACE_Select_Reactor> thread. +public: + ACE_Select_Reactor_Notify (void); + ~ACE_Select_Reactor_Notify (void); + // Default dtor. + + // = Initialization and termination methods. + virtual int open (ACE_Reactor_Impl *, + ACE_Timer_Queue * = 0, + int disable_notify_pipe = 0); + // Initialize. + + virtual int close (void); + // Destroy. + + virtual ssize_t notify (ACE_Event_Handler * = 0, + ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK, + ACE_Time_Value * = 0); + // Called by a thread when it wants to unblock the + // <ACE_Select_Reactor>. This wakeups the <ACE_Select_Reactor> if + // currently blocked in select()/poll(). Pass over both the + // <Event_Handler> *and* the <mask> to allow the caller to dictate + // which <Event_Handler> method the <ACE_Select_Reactor> will + // invoke. The <ACE_Time_Value> indicates how long to blocking + // trying to notify the <ACE_Select_Reactor>. If <timeout> == 0, + // the caller will block until action is possible, else will wait + // until the relative time specified in *<timeout> elapses). + + virtual int dispatch_notifications (int &number_of_active_handles, + const ACE_Handle_Set &rd_mask); + // Handles pending threads (if any) that are waiting to unblock the + // <ACE_Select_Reactor>. + + virtual int handle_input (ACE_HANDLE handle); + // Called back by the <ACE_Select_Reactor> when a thread wants to + // unblock us. + + virtual void max_notify_iterations (int); + // Set the maximum number of times that the + // <ACE_Select_Reactor_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify pipe before breaking out of its <recv> loop. By default, + // this is set to -1, which means "iterate until the pipe is empty." + // Setting this to a value like "1 or 2" will increase "fairness" + // (and thus prevent starvation) at the expense of slightly higher + // dispatching overhead. + + virtual int max_notify_iterations (void); + // Get the maximum number of times that the + // <ACE_Select_Reactor_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify pipe before breaking out of its <recv> loop. + + virtual void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + ACE_Select_Reactor_Impl *select_reactor_; + // Keep a back pointer to the <ACE_Select_Reactor>. If this value + // if NULL then the <ACE_Select_Reactor> has been initialized with + // <disable_notify_pipe>. + + ACE_Pipe notification_pipe_; + // Contains the <ACE_HANDLE> the <ACE_Select_Reactor> is listening + // on, as well as the <ACE_HANDLE> that threads wanting the + // attention of the <ACE_Select_Reactor> will write to. + + int max_notify_iterations_; + // Keeps track of the maximum number of times that the + // <ACE_Select_Reactor_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify pipe before breaking out of its <recv> loop. By default, + // this is set to -1, which means "iterate until the pipe is empty." +}; + +class ACE_Export ACE_Select_Reactor_Handler_Repository +{ + // = TITLE + // Used to map <ACE_HANDLE>s onto the appropriate + // <ACE_Event_Handler> *. + // + // = DESCRIPTION + // This class is necessary to shield differences between UNIX + // and Win32. In UNIX, <ACE_HANDLE> is an int, whereas in Win32 + // it's a void *. This class hides all these details from the + // bulk of the <ACE_Select_Reactor> code. All of these methods + // are called with the main <Select_Reactor> token lock held. +public: + friend class ACE_Select_Reactor_Handler_Repository_Iterator; + + // = Initialization and termination methods. + ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &); + // Default "do-nothing" constructor. + + ~ACE_Select_Reactor_Handler_Repository (void); + // dtor. + + int open (size_t size); + // Initialize a repository of the appropriate <size>. + + int close (void); + // Close down the repository. + + // = Search structure operations. + + ACE_Event_Handler *find (ACE_HANDLE handle, size_t *index_p = 0); + // Return the <ACE_Event_Handler *> associated with <ACE_HANDLE>. + // If <index_p> is non-0, then return the index location of the + // <handle>, if found. + + int bind (ACE_HANDLE, + ACE_Event_Handler *, + ACE_Reactor_Mask); + // Bind the <ACE_Event_Handler *> to the <ACE_HANDLE> with the + // appropriate <ACE_Reactor_Mask> settings. + + int unbind (ACE_HANDLE, + ACE_Reactor_Mask mask); + // Remove the binding of <ACE_HANDLE> in accordance with the <mask>. + + int unbind_all (void); + // Remove all the <ACE_HANDLE, ACE_Event_Handler> tuples. + + // = Sanity checking. + + // Check the <handle> to make sure it's a valid ACE_HANDLE that + // within the range of legal handles (i.e., >= 0 && < max_size_). + int invalid_handle (ACE_HANDLE handle); + + // Check the <handle> to make sure it's a valid ACE_HANDLE that + // within the range of currently registered handles (i.e., >= 0 && < + // max_handlep1_). + int handle_in_range (ACE_HANDLE handle); + + // = Accessors. + size_t size (void); + // Returns the current table size. + + size_t max_handlep1 (void); + // Maximum ACE_HANDLE value, plus 1. + + void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + ACE_Select_Reactor_Impl &select_reactor_; + // Reference to our <Select_Reactor>. + + ssize_t max_size_; + // Maximum number of handles. + + int max_handlep1_; + // The highest currently active handle, plus 1 (ranges between 0 and + // <max_size_>. + +#if defined (ACE_WIN32) + // = The mapping from <HANDLES> to <Event_Handlers>. + + ACE_Event_Tuple *event_handlers_; + // The NT version implements this via a dynamically allocated + // array of <ACE_Event_Tuple *>. Since NT implements ACE_HANDLE + // as a void * we can't directly index into this array. Therefore, + // we just do a linear search (for now). Next, we'll modify + // things to use hashing or something faster... +#else + ACE_Event_Handler **event_handlers_; + // The UNIX version implements this via a dynamically allocated + // array of <ACE_Event_Handler *> that is indexed directly using + // the ACE_HANDLE value. +#endif /* ACE_WIN32 */ +}; + +class ACE_Export ACE_Select_Reactor_Handler_Repository_Iterator +{ + // = TITLE + // Iterate through the <ACE_Select_Reactor_Handler_Repository>. +public: + // = Initialization method. + ACE_Select_Reactor_Handler_Repository_Iterator (const ACE_Select_Reactor_Handler_Repository *s); + + ~ACE_Select_Reactor_Handler_Repository_Iterator (void); + // dtor. + + // = Iteration methods. + + int next (ACE_Event_Handler *&next_item); + // Pass back the <next_item> that hasn't been seen in the Set. + // Returns 0 when all items have been seen, else 1. + + int done (void) const; + // Returns 1 when all items have been seen, else 0. + + int advance (void); + // Move forward by one element in the set. Returns 0 when all the + // items in the set have been seen, else 1. + + void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + const ACE_Select_Reactor_Handler_Repository *rep_; + // Reference to the Handler_Repository we are iterating over. + + ssize_t current_; + // Pointer to the current iteration level. +}; + +class ACE_Export ACE_Select_Reactor_Impl : public ACE_Reactor_Impl +{ + // = TITLE + // This class simply defines how Select_Reactor's basic interface + // functions should look like and provides a common base class for + // Select_Reactor using various locking mechanism. +public: + enum + { + DEFAULT_SIZE = ACE_DEFAULT_SELECT_REACTOR_SIZE + // Default size of the Select_Reactor's handle table. + }; + + ACE_Select_Reactor_Impl (); + + friend class ACE_Select_Reactor_Notify; + friend class ACE_Select_Reactor_Handler_Repository; + +protected: + virtual int bit_ops (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Select_Reactor_Handle_Set &wait_Set, + int ops); + // Allow manipulation of the <wait_set_> mask and <ready_set_> mask. + + virtual void renew (void) = 0; + // Enqueue ourselves into the list of waiting threads at the + // appropriate point specified by <requeue_position_>. + + ACE_Select_Reactor_Handler_Repository handler_rep_; + // Table that maps <ACE_HANDLEs> to <ACE_Event_Handler *>'s. + + ACE_Select_Reactor_Handle_Set wait_set_; + // Tracks handles that are waited for by select(). + + ACE_Select_Reactor_Handle_Set suspend_set_; + // Tracks handles that are currently suspended. + + ACE_Select_Reactor_Handle_Set ready_set_; + // Track HANDLES we are interested in for various events that must + // be dispatched *without* going through select(). + + ACE_Timer_Queue *timer_queue_; + // Defined as a pointer to allow overriding by derived classes... + + int delete_timer_queue_; + // Keeps track of whether we should delete the timer queue (if we + // didn't create it, then we don't delete it). + + ACE_Sig_Handler *signal_handler_; + // Handle signals without requiring global/static variables. + + int delete_signal_handler_; + // Keeps track of whether we should delete the signal handler (if we + // didn't create it, then we don't delete it). + + ACE_Reactor_Notify *notify_handler_; + // Callback object that unblocks the <ACE_Select_Reactor> if it's + // sleeping. + + int delete_notify_handler_; + // Keeps track of whether we need to delete the notify handler (if + // we didn't create it, then we don't delete it). + + int restart_; + // Restart automatically when interrupted + + int requeue_position_; + // Position that the main ACE_Select_Reactor thread is requeued in + // the list of waiters during a notify() callback. If this value == + // -1 we are requeued at the end of the list. Else if it's 0 then + // we are requeued at the front of the list. Else if it's > 1 then + // that indicates the number of waiters to skip over. + + int initialized_; + // True if we've been initialized yet... + + ACE_thread_t owner_; + // The original thread that created this Select_Reactor. + + int state_changed_; + // True if state has changed during dispatching of + // <ACE_Event_Handlers>, else false. This is used to determine + // whether we need to make another trip through the + // <Select_Reactor>'s <wait_for_multiple_events> loop. + + int supress_notify_renew (void); + void supress_notify_renew (int sr); + // Controls/access whether the notify handler should renew the + // Select_Reactor's token or not. + +private: + int supress_renew_; + // Determine whether we should renew Select_Reactor's token after handling + // the notification message. + + ACE_Select_Reactor_Impl (const ACE_Select_Reactor_Impl &); + ACE_Select_Reactor_Impl &operator = (const ACE_Select_Reactor_Impl &); + // Deny access since member-wise won't work... +}; + +#if defined (__ACE_INLINE__) +#include "ace/Select_Reactor_Base.i" +#endif /* __ACE_INLINE__ */ + +#endif /* ACE_SELECT_REACTOR_BASE_H */ diff --git a/ace/Select_Reactor_Base.i b/ace/Select_Reactor_Base.i new file mode 100644 index 00000000000..001a19fb753 --- /dev/null +++ b/ace/Select_Reactor_Base.i @@ -0,0 +1,83 @@ +/* -*- C++ -*- */ +// $Id$ + +#include "ace/Reactor.h" + +ACE_INLINE +ACE_Event_Tuple::~ACE_Event_Tuple (void) +{ +} + +ACE_INLINE +ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void) +{ +} + +ACE_INLINE +ACE_Select_Reactor_Handler_Repository::~ACE_Select_Reactor_Handler_Repository (void) +{ +} + +ACE_INLINE +ACE_Select_Reactor_Handler_Repository_Iterator::~ACE_Select_Reactor_Handler_Repository_Iterator (void) +{ +} + +ACE_INLINE size_t +ACE_Select_Reactor_Handler_Repository::size (void) +{ + return this->max_size_; +} + +ACE_INLINE +ACE_Event_Tuple::ACE_Event_Tuple (void) +: handle_ (ACE_INVALID_HANDLE), + event_handler_ (0) +{ +} + +ACE_INLINE +ACE_Event_Tuple::ACE_Event_Tuple (ACE_Event_Handler* eh, + ACE_HANDLE h) +: handle_ (h), + event_handler_ (eh) +{ +} + +ACE_INLINE int +ACE_Event_Tuple::operator== (const ACE_Event_Tuple &rhs) const +{ + return this->handle_ == rhs.handle_; +} + +ACE_INLINE int +ACE_Event_Tuple::operator!= (const ACE_Event_Tuple &rhs) const +{ + return !(*this == rhs); +} + +ACE_INLINE +ACE_Select_Reactor_Impl::ACE_Select_Reactor_Impl () + : handler_rep_ (*this), + timer_queue_ (0), + delete_timer_queue_ (0), + delete_signal_handler_ (0), + delete_notify_handler_ (0), + requeue_position_ (-1), // Requeue at end of waiters by default. + initialized_ (0), + state_changed_ (0), + supress_renew_ (0) +{ +} + +ACE_INLINE int +ACE_Select_Reactor_Impl::supress_notify_renew (void) +{ + return this->supress_renew_; +} + +ACE_INLINE void +ACE_Select_Reactor_Impl::supress_notify_renew (int sr) +{ + this->supress_renew_ = sr; +} diff --git a/ace/Select_Reactor_T.cpp b/ace/Select_Reactor_T.cpp new file mode 100644 index 00000000000..fb173264dde --- /dev/null +++ b/ace/Select_Reactor_T.cpp @@ -0,0 +1,1230 @@ +// $Id$ + +#if !defined (ACE_SELECT_REACTOR_T_C) +#define ACE_SELECT_REACTOR_T_C + +#define ACE_BUILD_DLL +#include "ace/Select_Reactor_T.h" +#include "ace/Timer_Heap.h" + +#if !defined (__ACE_INLINE__) +#include "ace/Select_Reactor_T.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(ace, Select_Reactor_T, "$Id$") + + ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_T) + +#if defined (ACE_WIN32) +#define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_) +#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_) +#else +#define ACE_SELECT_REACTOR_HANDLE(H) (H) +#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)]) +#endif /* ACE_WIN32 */ + + template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::any_ready (ACE_Select_Reactor_Handle_Set &wait_set) +{ + ACE_TRACE ("ACE_Select_Reactor_T::fill_in_ready"); + +#if !defined (ACE_WIN32) + // Make this call signal safe. + ACE_Sig_Guard sb; +#endif /* ACE_WIN32 */ + + int number_ready = this->ready_set_.rd_mask_.num_set () + + this->ready_set_.wr_mask_.num_set () + + this->ready_set_.ex_mask_.num_set (); + + if (number_ready > 0) + { + wait_set.rd_mask_ = this->ready_set_.rd_mask_; + wait_set.wr_mask_ = this->ready_set_.wr_mask_; + wait_set.ex_mask_ = this->ready_set_.ex_mask_; + + this->ready_set_.rd_mask_.reset (); + this->ready_set_.wr_mask_.reset (); + this->ready_set_.ex_mask_.reset (); + } + + return number_ready; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handler_i (int signum, ACE_Event_Handler **eh) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handler_i"); + ACE_Event_Handler *handler = this->signal_handler_->handler (signum); + + if (handler == 0) + return -1; + else if (*eh != 0) + *eh = handler; + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::initialized (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::initialized"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, 0)); + return this->initialized_; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::owner (ACE_thread_t tid, ACE_thread_t *o_id) +{ + ACE_TRACE ("ACE_Select_Reactor_T::owner"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + if (o_id) + *o_id = this->owner_; + + this->owner_ = tid; + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::owner (ACE_thread_t *t_id) +{ + ACE_TRACE ("ACE_Select_Reactor_T::owner"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + *t_id = this->owner_; + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::requeue_position (int rp) +{ + ACE_TRACE ("ACE_Select_Reactor_T::requeue_position"); + ACE_MT (ACE_GUARD (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_)); +#if defined (ACE_WIN32) + ACE_UNUSED_ARG (rp); + // Must always requeue ourselves "next" on Win32. + this->requeue_position_ = 0; +#else + this->requeue_position_ = rp; +#endif /* ACE_WIN32 */ +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::requeue_position (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::requeue_position"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->requeue_position_; +} + +template <class ACE_SELECT_REACTOR_TOKEN> void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::max_notify_iterations (int iterations) +{ + ACE_TRACE ("ACE_Select_Reactor_T::max_notify_iterations"); + ACE_MT (ACE_GUARD (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_)); + + this->notify_handler_->max_notify_iterations (iterations); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::max_notify_iterations (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::max_notify_iterations"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->notify_handler_->max_notify_iterations (); +} + +// Enqueue ourselves into the list of waiting threads. +template <class ACE_SELECT_REACTOR_TOKEN> void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::renew (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::renew"); +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + if (this->supress_notify_renew () == 0) + this->token_.renew (this->requeue_position_); +#endif /* defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) */ +} + +template <class ACE_SELECT_REACTOR_MUTEX> void +ACE_Select_Reactor_Token_T<ACE_SELECT_REACTOR_MUTEX>::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_Token::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\n"))); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +template <class ACE_SELECT_REACTOR_MUTEX> +ACE_Select_Reactor_Token_T<ACE_SELECT_REACTOR_MUTEX>::ACE_Select_Reactor_Token_T (ACE_Select_Reactor_Impl &r) + : select_reactor_ (&r) +{ + ACE_TRACE ("ACE_Select_Reactor_Token::ACE_Select_Reactor_Token"); +} + +template <class ACE_SELECT_REACTOR_MUTEX> +ACE_Select_Reactor_Token_T<ACE_SELECT_REACTOR_MUTEX>::ACE_Select_Reactor_Token_T (void) + : select_reactor_ (0) +{ + ACE_TRACE ("ACE_Select_Reactor_Token::ACE_Select_Reactor_Token"); +} + +template <class ACE_SELECT_REACTOR_MUTEX> +ACE_Select_Reactor_Token_T<ACE_SELECT_REACTOR_MUTEX>::~ACE_Select_Reactor_Token_T (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Token::~ACE_Select_Reactor_Token"); +} + +template <class ACE_SELECT_REACTOR_MUTEX> ACE_Select_Reactor_Impl & +ACE_Select_Reactor_Token_T<ACE_SELECT_REACTOR_MUTEX>::select_reactor (void) +{ + return *this->select_reactor_; +} + +template <class ACE_SELECT_REACTOR_MUTEX> void +ACE_Select_Reactor_Token_T<ACE_SELECT_REACTOR_MUTEX>::select_reactor (ACE_Select_Reactor_Impl &select_reactor) +{ + this->select_reactor_ = &select_reactor; +} + +// Used to wakeup the Select_Reactor. + +template <class ACE_SELECT_REACTOR_MUTEX> void +ACE_Select_Reactor_Token_T<ACE_SELECT_REACTOR_MUTEX>::sleep_hook (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Token::sleep_hook"); + if (this->select_reactor_->notify () == -1) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("sleep_hook failed"))); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Select_Reactor_T::notify"); + + ssize_t n = 0; + + // Pass over both the Event_Handler *and* the mask to allow the + // caller to dictate which Event_Handler method the receiver + // invokes. Note that this call can timeout. + + n = this->notify_handler_->notify (eh, mask, timeout); + return n == -1 ? -1 : 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handler (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::resume_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->resume_i (handle); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_handler (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::suspend_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->suspend_i (handle); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_handlers (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::suspend_handlers"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + ACE_Event_Handler *eh = 0; + + for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); + iter.next (eh) != 0; + iter.advance ()) + this->suspend_i (eh->get_handle ()); + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handlers (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::resume_handlers"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + ACE_Event_Handler *eh = 0; + + for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); + iter.next (eh) != 0; + iter.advance ()) + this->resume_i (eh->get_handle ()); + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->register_handler_i (handler->get_handle (), handler, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (ACE_HANDLE handle, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->register_handler_i (handle, handler, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (const ACE_Handle_Set &handles, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->register_handler_i (handles, handler, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handler (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Event_Handler **handler) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->handler_i (handle, mask, handler); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler (const ACE_Handle_Set &handles, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->remove_handler_i (handles, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler (ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->remove_handler_i (handler->get_handle (), mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->remove_handler_i (handle, mask); +} + +// Performs operations on the "ready" bits. + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::ready_ops (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + int ops) +{ + ACE_TRACE ("ACE_Select_Reactor_T::ready_ops"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->bit_ops (handle, + mask, + this->ready_set_, + ops); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::open (size_t size, + int restart, + ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq, + int disable_notify_pipe, + ACE_Reactor_Notify *notify) +{ + ACE_TRACE ("ACE_Select_Reactor_T::open"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + // Can't initialize ourselves more than once. + if (this->initialized_ > 0) + return -1; + + this->owner_ = ACE_Thread::self (); + this->restart_ = restart; + this->signal_handler_ = sh; + this->timer_queue_ = tq; + this->notify_handler_ = notify; + + int result = 0; + + // Allows the signal handler to be overridden. + if (this->signal_handler_ == 0) + { + ACE_NEW_RETURN (this->signal_handler_, + ACE_Sig_Handler, + -1); + + if (this->signal_handler_ == 0) + result = -1; + else + this->delete_signal_handler_ = 1; + } + + // Allows the timer queue to be overridden. + if (result != -1 && this->timer_queue_ == 0) + { + ACE_NEW_RETURN (this->timer_queue_, + ACE_Timer_Heap, + -1); + + if (this->timer_queue_ == 0) + result = -1; + else + this->delete_timer_queue_ = 1; + } + + // Allows the Notify_Handler to be overridden. + if (result != -1 && this->notify_handler_ == 0) + { + ACE_NEW_RETURN (this->notify_handler_, + ACE_Select_Reactor_Notify, + -1); + + if (this->notify_handler_ == 0) + result = -1; + else + this->delete_notify_handler_ = 1; + } + + if (result != -1 && this->handler_rep_.open (size) == -1) + result = -1; + else if (this->notify_handler_->open (this, + 0, + disable_notify_pipe) == -1) + result = -1; + + if (result != -1) + // We're all set to go. + this->initialized_ = 1; + else + // This will close down all the allocated resources properly. + this->close (); + + return result; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::set_sig_handler (ACE_Sig_Handler *signal_handler) +{ + if (this->signal_handler_ != 0 && this->delete_signal_handler_ != 0) + delete this->signal_handler_; + this->signal_handler_ = signal_handler; + this->delete_signal_handler_ = 0; + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::set_timer_queue (ACE_Timer_Queue *timer_queue) +{ + if (this->timer_queue_ != 0 && this->delete_timer_queue_ != 0) + delete this->timer_queue_; + this->timer_queue_ = timer_queue; + this->delete_timer_queue_ = 0; + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::ACE_Select_Reactor_T (ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq, + int disable_notify_pipe, + ACE_Reactor_Notify *notify) + : token_ (*this), + lock_adapter_ (token_) + +{ + ACE_TRACE ("ACE_Select_Reactor_T::ACE_Select_Reactor_T"); + + if (this->open (ACE_Select_Reactor_T::DEFAULT_SIZE, + 0, + sh, + tq, + disable_notify_pipe, + notify) == -1) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("ACE_Select_Reactor_T::open ") + ASYS_TEXT ("failed inside ACE_Select_Reactor_T::CTOR"))); +} + +// Initialize ACE_Select_Reactor_T. + +template <class ACE_SELECT_REACTOR_TOKEN> +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::ACE_Select_Reactor_T (size_t size, + int rs, + ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq, + int disable_notify_pipe, + ACE_Reactor_Notify *notify) + : token_ (*this), + lock_adapter_ (token_) +{ + ACE_TRACE ("ACE_Select_Reactor_T::ACE_Select_Reactor_T"); + + if (this->open (size, + rs, + sh, + tq, + disable_notify_pipe, + notify) == -1) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("ACE_Select_Reactor_T::open ") + ASYS_TEXT ("failed inside ACE_Select_Reactor_T::CTOR"))); +} + +// Close down the ACE_Select_Reactor_T instance, detaching any remaining +// Event_Handers. This had better be called from the main event loop +// thread... + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::close (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::close"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + if (this->delete_signal_handler_) + { + delete this->signal_handler_; + this->signal_handler_ = 0; + this->delete_signal_handler_ = 0; + } + + this->handler_rep_.close (); + + if (this->delete_timer_queue_) + { + delete this->timer_queue_; + this->timer_queue_ = 0; + this->delete_timer_queue_ = 0; + } + + this->notify_handler_->close (); + + if (this->delete_notify_handler_) + { + delete this->notify_handler_; + this->notify_handler_ = 0; + this->delete_notify_handler_ = 0; + } + + this->initialized_ = 0; + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::current_info (ACE_HANDLE, size_t &) +{ + return -1; +} + +template <class ACE_SELECT_REACTOR_TOKEN> +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::~ACE_Select_Reactor_T (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::~ACE_Select_Reactor_T"); + this->close (); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler_i (const ACE_Handle_Set &handles, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler_i"); + ACE_HANDLE h; + + ACE_Handle_Set_Iterator handle_iter (handles); + + while ((h = handle_iter ()) != ACE_INVALID_HANDLE) + if (this->remove_handler_i (h, mask) == -1) + return -1; + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler_i (const ACE_Handle_Set &handles, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler_i"); + ACE_HANDLE h; + + ACE_Handle_Set_Iterator handle_iter (handles); + while ((h = handle_iter ()) != ACE_INVALID_HANDLE) + if (this->register_handler_i (h, handler, mask) == -1) + return -1; + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (const ACE_Sig_Set &sigset, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); + + int result = 0; + +#if (ACE_NSIG > 0) + for (int s = 1; s < ACE_NSIG; s++) + if (sigset.is_member (s) + && this->signal_handler_->register_handler (s, new_sh, + new_disp) == -1) + result = -1; +#else + ACE_UNUSED_ARG (sigset); + ACE_UNUSED_ARG (new_sh); + ACE_UNUSED_ARG (new_disp); +#endif /* ACE_NSIG */ + return result; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler (const ACE_Sig_Set &sigset) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); + int result = 0; + +#if (ACE_NSIG > 0) + for (int s = 1; s < ACE_NSIG; s++) + if (sigset.is_member (s) + && this->signal_handler_->remove_handler (s) == -1) + result = -1; +#else + ACE_UNUSED_ARG (sigset); +#endif /* ACE_NSIG */ + + return result; +} + +// Note the queue handles its own locking. + +template <class ACE_SELECT_REACTOR_TOKEN> long +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::schedule_timer (ACE_Event_Handler *handler, + const void *arg, + const ACE_Time_Value &delta_time, + const ACE_Time_Value &interval) +{ + ACE_TRACE ("ACE_Select_Reactor_T::schedule_timer"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + return this->timer_queue_->schedule + (handler, arg, timer_queue_->gettimeofday () + delta_time, interval); +} + +// Main event loop driver that blocks for <max_wait_time> before +// returning (will return earlier if I/O or signal events occur). + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events (ACE_Time_Value &max_wait_time) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handle_events"); + + return this->handle_events (&max_wait_time); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_error (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handle_error"); + if (errno == EINTR) + return this->restart_; +#if defined (__MVS__) + // On MVS Open Edition, there can be a number of failure codes on a bad + // socket, so check_handles on anything other than EINTR. + else + return this->check_handles (); +#else + else if (errno == EBADF) + return this->check_handles (); + else + return -1; +#endif /* __MVS__ */ +} + +template <class ACE_SELECT_REACTOR_TOKEN> void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify_handle (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Handle_Set &ready_mask, + ACE_Event_Handler *event_handler, + ACE_EH_PTMF ptmf) +{ + ACE_TRACE ("ACE_Select_Reactor_T::notify_handle"); + // Check for removed handlers. + if (event_handler == 0) + return; + + int status = (event_handler->*ptmf) (handle); + + if (status < 0) + this->remove_handler_i (handle, mask); + else if (status > 0) + ready_mask.set_bit (handle); +} + +// Perform GET, CLR, SET, and ADD operations on the select() +// Handle_Sets. +// +// GET = 1, Retrieve current value +// SET = 2, Set value of bits to new mask (changes the entire mask) +// ADD = 3, Bitwise "or" the value into the mask (only changes +// enabled bits) +// CLR = 4 Bitwise "and" the negation of the value out of the mask +// (only changes enabled bits) +// +// Returns the original mask. + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::mask_ops (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + int ops) +{ + ACE_TRACE ("ACE_Select_Reactor_T::mask_ops"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->bit_ops (handle, mask, + this->wait_set_, + ops); +} + +// Must be called with locks held. + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handler_i (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Event_Handler **handler) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handler_i"); + ACE_Event_Handler *h = this->handler_rep_.find (handle); + + if (h == 0) + return -1; + else + { + if ((ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) + || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)) + && this->wait_set_.rd_mask_.is_set (handle) == 0) + return -1; + if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK) + && this->wait_set_.wr_mask_.is_set (handle) == 0) + return -1; + if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK) + && this->wait_set_.ex_mask_.is_set (handle) == 0) + return -1; + } + + if (handler != 0) + *handler = h; + return 0; +} + +// Must be called with locks held + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_i (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::resume"); + if (this->handler_rep_.find (handle) == 0) + return -1; + + if (this->suspend_set_.rd_mask_.is_set (handle)) + { + this->wait_set_.rd_mask_.set_bit (handle); + this->suspend_set_.rd_mask_.clr_bit (handle); + } + if (this->suspend_set_.wr_mask_.is_set (handle)) + { + this->wait_set_.wr_mask_.set_bit (handle); + this->suspend_set_.wr_mask_.clr_bit (handle); + } + if (this->suspend_set_.ex_mask_.is_set (handle)) + { + this->wait_set_.ex_mask_.set_bit (handle); + this->suspend_set_.ex_mask_.clr_bit (handle); + } + return 0; +} + +// Must be called with locks held + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_i (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::suspend"); + if (this->handler_rep_.find (handle) == 0) + return -1; + + if (this->wait_set_.rd_mask_.is_set (handle)) + { + this->suspend_set_.rd_mask_.set_bit (handle); + this->wait_set_.rd_mask_.clr_bit (handle); + } + if (this->wait_set_.wr_mask_.is_set (handle)) + { + this->suspend_set_.wr_mask_.set_bit (handle); + this->wait_set_.wr_mask_.clr_bit (handle); + } + if (this->wait_set_.ex_mask_.is_set (handle)) + { + this->suspend_set_.ex_mask_.set_bit (handle); + this->wait_set_.ex_mask_.clr_bit (handle); + } + return 0; +} + +// Must be called with locks held + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler_i (ACE_HANDLE handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler_i"); + + // Insert the <handle, event_handle> tuple into the Handler + // Repository. + return this->handler_rep_.bind (handle, event_handler, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler_i (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler_i"); + + // Unbind this handle. + return this->handler_rep_.unbind (handle, mask); +} + +// Must be called with lock held. + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::wait_for_multiple_events (ACE_Select_Reactor_Handle_Set &dispatch_set, + ACE_Time_Value *max_wait_time) +{ + ACE_TRACE ("ACE_Select_Reactor_T::wait_for_multiple_events"); + u_long width = 0; + ACE_Time_Value timer_buf (0); + ACE_Time_Value *this_timeout = &timer_buf; + + int number_of_active_handles = this->any_ready (dispatch_set); + + // If there are any bits enabled in the <ready_set_> then we'll + // handle those first, otherwise we'll block in select(). + + if (number_of_active_handles == 0) + { + do + { + if (this->timer_queue_->calculate_timeout (max_wait_time, + this_timeout) == 0) + this_timeout = 0; + + width = (u_long) this->handler_rep_.max_handlep1 (); + + dispatch_set.rd_mask_ = this->wait_set_.rd_mask_; + dispatch_set.wr_mask_ = this->wait_set_.wr_mask_; + dispatch_set.ex_mask_ = this->wait_set_.ex_mask_; + + number_of_active_handles = ACE_OS::select (int (width), + dispatch_set.rd_mask_, + dispatch_set.wr_mask_, + dispatch_set.ex_mask_, + this_timeout); + } + while (number_of_active_handles == -1 && this->handle_error () > 0); + + // @@ Remove?! + if (number_of_active_handles > 0) + { +#if !defined (ACE_WIN32) + dispatch_set.rd_mask_.sync (this->handler_rep_.max_handlep1 ()); + dispatch_set.wr_mask_.sync (this->handler_rep_.max_handlep1 ()); + dispatch_set.ex_mask_.sync (this->handler_rep_.max_handlep1 ()); +#endif /* ACE_WIN32 */ + } + } + + // Return the number of events to dispatch. + return number_of_active_handles; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_timer_handlers (void) +{ + int number_dispatched = this->timer_queue_->expire (); + return this->state_changed_ ? -1 : number_dispatched; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_notification_handlers (int &number_of_active_handles, + ACE_Select_Reactor_Handle_Set &dispatch_set) +{ +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + // Check to see if the ACE_HANDLE associated with the + // Select_Reactor's notify hook is enabled. If so, it means that + // one or more other threads are trying to update the + // ACE_Select_Reactor_T's internal tables. We'll handle all these + // threads and then break out to continue the event loop. + + int number_dispatched = + this->notify_handler_->dispatch_notifications (number_of_active_handles, + dispatch_set.rd_mask_); + return this->state_changed_ ? -1 : number_dispatched; +#else + ACE_UNUSED_ARG (number_of_active_handles); + ACE_UNUSED_ARG (dispatch_set); + return 0; +#endif /* ACE_MT_SAFE */ +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_set (int number_of_active_handles, + int& number_dispatched, + int mask, + ACE_Handle_Set& dispatch_mask, + ACE_Handle_Set& ready_mask, + ACE_EH_PTMF callback) +{ + ACE_HANDLE handle; + + ACE_Handle_Set_Iterator handle_iter (dispatch_mask); + + while ((handle = handle_iter ()) != ACE_INVALID_HANDLE + && number_dispatched < number_of_active_handles + && this->state_changed_ == 0) + { + // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor_T::dispatching\n"))); + number_dispatched++; + this->notify_handle (handle, + mask, + ready_mask, + this->handler_rep_.find (handle), + callback); + } + + if (number_dispatched > 0 && this->state_changed_) + { + return -1; + } + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_handlers (int &number_of_active_handles, + ACE_Select_Reactor_Handle_Set &dispatch_set) +{ + int number_dispatched = 0; + + // Handle output events (this code needs to come first to handle + // the obscure case of piggy-backed data coming along with the + // final handshake message of a nonblocking connection). + + // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor_T::dispatch - WRITE\n"))); + if (this->dispatch_io_set (number_of_active_handles, + number_dispatched, + ACE_Event_Handler::WRITE_MASK, + dispatch_set.wr_mask_, + this->ready_set_.wr_mask_, + &ACE_Event_Handler::handle_output) == -1) + { + number_of_active_handles -= number_dispatched; + return -1; + } + + + // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor_T::dispatch - EXCEPT\n"))); + if (this->dispatch_io_set (number_of_active_handles, + number_dispatched, + ACE_Event_Handler::EXCEPT_MASK, + dispatch_set.ex_mask_, + this->ready_set_.ex_mask_, + &ACE_Event_Handler::handle_exception) == -1) + { + number_of_active_handles -= number_dispatched; + return -1; + } + + // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor_T::dispatch - READ\n"))); + if (this->dispatch_io_set (number_of_active_handles, + number_dispatched, + ACE_Event_Handler::READ_MASK, + dispatch_set.rd_mask_, + this->ready_set_.rd_mask_, + &ACE_Event_Handler::handle_input) == -1) + { + number_of_active_handles -= number_dispatched; + return -1; + } + + number_of_active_handles -= number_dispatched; + return number_dispatched; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch (int number_of_active_handles, + ACE_Select_Reactor_Handle_Set &dispatch_set) +{ + ACE_TRACE ("ACE_Select_Reactor_T::dispatch"); + + // The following do/while loop keeps dispatching as long as there + // are still active handles. Note that the only way we should ever + // iterate more than once through this loop is if signals occur + // while we're dispatching other handlers. + + do + { + // Note that we keep track of changes to our state. If any of + // the dispatch_*() methods below return -1 it means that the + // <wait_set_> state has changed as the result of an + // <ACE_Event_Handler> being dispatched. This means that we + // need to bail out and rerun the select() loop since our + // existing notion of handles in <dispatch_set> may no longer be + // correct. + // + // In the beginning, our state starts out unchanged. After + // every iteration (i.e., due to signals), our state starts out + // unchanged again. + + this->state_changed_ = 0; + + // Perform the Template Method for dispatching all the handlers. + + // Handle timers first since they may have higher latency + // constraints. + + if (this->dispatch_timer_handlers () == -1) + // State has changed or timer queue has failed, exit inner + // loop. + break; + + else if (number_of_active_handles <= 0) + // Bail out since we got here since select() was interrupted. + { + if (ACE_Sig_Handler::sig_pending () != 0) + { + ACE_Sig_Handler::sig_pending (0); + + // If any HANDLES in the <ready_set_> are activated as a + // result of signals they should be dispatched since + // they may be time critical... + number_of_active_handles = this->any_ready (dispatch_set); + } + else + return number_of_active_handles; + } + + // Next dispatch the notification handlers (if there are any to + // dispatch). These are required to handle multi-threads that + // are trying to update the <Reactor>. + + else if (this->dispatch_notification_handlers (number_of_active_handles, + dispatch_set) == -1) + break; // State has changed, exit inner loop. + + // Finally, dispatch the I/O handlers. + else if (this->dispatch_io_handlers (number_of_active_handles, + dispatch_set) == -1) + // State has changed, so exit the inner loop. + break; + } + while (number_of_active_handles > 0); + + return 1; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::release_token (void) +{ +#if defined (ACE_WIN32) + this->token_.release (); + return (int) EXCEPTION_CONTINUE_SEARCH; +#else + return 0; +#endif /* ACE_WIN32 */ +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events (ACE_Time_Value *max_wait_time) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handle_events"); + + // Stash the current time -- the destructor of this object will + // automatically compute how much time elpased since this method was + // called. + ACE_Countdown_Time countdown (max_wait_time); + +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1); + + if (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_) == 0) + return -1; + + // Update the countdown to reflect time waiting for the mutex. + countdown.update (); +#endif /* ACE_MT_SAFE */ + + return this->handle_events_i (max_wait_time); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events_i (ACE_Time_Value *max_wait_time) +{ + int result = -1; + + ACE_SEH_TRY { + ACE_Select_Reactor_Handle_Set dispatch_set; + + int number_of_active_handles = + this->wait_for_multiple_events (dispatch_set, + max_wait_time); + + result = this->dispatch (number_of_active_handles, dispatch_set); + } + ACE_SEH_EXCEPT (this->release_token ()) { + // As it stands now, we catch and then rethrow all Win32 + // structured exceptions so that we can make sure to release the + // <token_> lock correctly. + } + + this->state_changed_ = 1; + + return result; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::check_handles (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::check_handles"); + +#if defined (ACE_WIN32) || defined (__MVS__) + ACE_Time_Value time_poll = ACE_Time_Value::zero; + ACE_Handle_Set rd_mask; +#endif /* ACE_WIN32 || MVS */ + + ACE_Event_Handler *eh = 0; + int result = 0; + + for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); + iter.next (eh) != 0; + iter.advance ()) + { + ACE_HANDLE handle = eh->get_handle (); + + // Skip back to the beginning of the loop if the HANDLE is + // invalid. + if (handle == ACE_INVALID_HANDLE) + continue; + +#if defined (ACE_WIN32) || defined (__MVS__) + // Win32 needs to do the check this way because fstat won't work on + // a socket handle. MVS Open Edition needs to do it this way because, + // even though the docs say to check a handle with either select or + // fstat, the fstat method always says the handle is ok. + rd_mask.set_bit (handle); + + if (ACE_OS::select (int (handle) + 1, + rd_mask, 0, 0, + &time_poll) < 0) + { + result = 1; + this->remove_handler_i (handle, + ACE_Event_Handler::ALL_EVENTS_MASK); + } + rd_mask.clr_bit (handle); +#else /* !ACE_WIN32 && !MVS */ + struct stat temp; + + if (ACE_OS::fstat (handle, &temp) == -1) + { + result = 1; + this->remove_handler_i (handle, + ACE_Event_Handler::ALL_EVENTS_MASK); + } +#endif /* ACE_WIN32 || MVS */ + } + + return result; +} + +template <class ACE_SELECT_REACTOR_TOKEN> void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dump (void) const +{ + ACE_TRACE ("ACE_Select_Reactor_T::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + this->timer_queue_->dump (); + this->handler_rep_.dump (); + this->signal_handler_->dump (); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("delete_signal_handler_ = %d\n"), + this->delete_signal_handler_)); + + ACE_HANDLE h; + + for (ACE_Handle_Set_Iterator handle_iter_wr (this->wait_set_.wr_mask_); + (h = handle_iter_wr ()) != ACE_INVALID_HANDLE; + ++handle_iter_wr) + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("write_handle = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_rd (this->wait_set_.rd_mask_); + (h = handle_iter_rd ()) != ACE_INVALID_HANDLE; + ++handle_iter_rd) + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("read_handle = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_ex (this->wait_set_.ex_mask_); + (h = handle_iter_ex ()) != ACE_INVALID_HANDLE; + ++handle_iter_ex) + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("except_handle = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_wr_ready (this->ready_set_.wr_mask_); + (h = handle_iter_wr_ready ()) != ACE_INVALID_HANDLE; + ++handle_iter_wr_ready) + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("write_handle_ready = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_rd_ready (this->ready_set_.rd_mask_); + (h = handle_iter_rd_ready ()) != ACE_INVALID_HANDLE; + ++handle_iter_rd_ready) + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("read_handle_ready = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_ex_ready (this->ready_set_.ex_mask_); + (h = handle_iter_ex_ready ()) != ACE_INVALID_HANDLE; + ++handle_iter_ex_ready) + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("except_handle_ready = %d\n"), h)); + + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("restart_ = %d\n"), this->restart_)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\nrequeue_position_ = %d\n"), this->requeue_position_)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\ninitialized_ = %d\n"), this->initialized_)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\nowner_ = %d\n"), this->owner_)); + +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + this->notify_handler_->dump (); + this->token_.dump (); +#endif /* ACE_MT_SAFE */ + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} +#endif /* ACE_SELECT_REACTOR_T_C */ diff --git a/ace/Select_Reactor_T.h b/ace/Select_Reactor_T.h new file mode 100644 index 00000000000..d65366a9b61 --- /dev/null +++ b/ace/Select_Reactor_T.h @@ -0,0 +1,605 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace +// +// = FILENAME +// Select_Reactor_T.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (ACE_SELECT_REACTOR_T_H) +#define ACE_SELECT_REACTOR_T_H + +#include "ace/Select_Reactor_Base.h" + +// Forward declaration. +//template class ACE_Select_Reactor_T<cla + +template <class ACE_SELECT_REACTOR_MUTEX> +class ACE_Select_Reactor_Token_T : public ACE_SELECT_REACTOR_MUTEX +{ + // = TITLE + // Used as a synchronization mechanism to coordinate concurrent + // access to a Select_Reactor object. + // + // = DESCRIPTION + // This class is used to make the <ACE_Select_Reactor> + // thread-safe. By default, the thread that runs the + // <handle_events> loop holds the token, even when it is blocked + // in the <select> call. Whenever another thread wants to + // access the <ACE_Reactor> via its <register_handler>, + // <remove_handler>, etc. methods) it must ask the token owner + // for temporary release of the token. To accomplish this, the + // owner of a token must define a <sleep_hook> through which it + // can be notified to temporarily release the token if the + // current situation permits this. + // + // The owner of the token is responsible for deciding which + // request for the token can be granted. By using the + // <ACE_Token::renew> API, the thread that releases the token + // temporarily can specify to get the token back right after the + // other thread has completed using the token. Thus, there is a + // dedicated thread that owns the token ``by default.'' This + // thread grants other threads access to the token by ensuring + // that whenever somebody else has finished using the token the + // ``default owner'' first holds the token again, i.e., the + // owner has the chance to schedule other threads. + // + // The thread that most likely needs the token most of the time + // is the thread running the dispatch loop. Typically the token + // gets released prior to entering the <select> call and gets + // ``re-acquired'' as soon as the <select> call returns, which + // results probably in many calls to <release>/<acquire> that + // are not really needed since no other thread would need the + // token in the meantime. That's why the dispatcher thread is + // chosen to be the owner of the token. + // + // In case the token would have been released while in <select> + // there would be a good chance that the <fd_set> could have + // been modified while the <select> returns from blocking and + // trying to re-acquire the lock. Through the token mechanism + // it is ensured that while another thread is holding the token, + // the dispatcher thread is blocked in the <renew> call and not + // in <select>. Thus, it is not critical to change the + // <fd_set>. The implementation of the <sleep_hook> mechanism + // provided by the <ACE_Select_Reactor_Token> enables the + // default owner to be the thread that executes the dispatch + // loop. +public: + ACE_Select_Reactor_Token_T (ACE_Select_Reactor_Impl &r); + ACE_Select_Reactor_Token_T (void); + virtual ~ACE_Select_Reactor_Token_T (void); + + virtual void sleep_hook (void); + // Called just before the ACE_Event_Handler goes to sleep. + + ACE_Select_Reactor_Impl &select_reactor (void); + void select_reactor (ACE_Select_Reactor_Impl &); + // Set/Get methods + + virtual void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + ACE_Select_Reactor_Impl *select_reactor_; +}; + +template <class ACE_SELECT_REACTOR_TOKEN> +class ACE_Select_Reactor_T : public ACE_Select_Reactor_Impl +{ + // = TITLE + // An object oriented event demultiplexor and event handler + // dispatcher. + // + // = DESCRIPTION + // The ACE_Select_Reactor is an object-oriented event + // demultiplexor and event handler dispatcher. The sources of + // events that the ACE_Select_Reactor waits for and dispatches + // includes I/O events, signals, and timer events. All public + // methods acquire the main <Select_Reactor> token lock and call + // down to private or protected methods, which assume that the + // lock is held and so therefore don't (re)acquire the lock. +public: + // = Initialization and termination methods. + + ACE_Select_Reactor_T (ACE_Sig_Handler * = 0, + ACE_Timer_Queue * = 0, + int disable_notify_pipe = 0, + ACE_Reactor_Notify *notify = 0); + // Initialize <ACE_Select_Reactor> with the default size. + + ACE_Select_Reactor_T (size_t size, + int restart = 0, + ACE_Sig_Handler * = 0, + ACE_Timer_Queue * = 0, + int disable_notify_pipe = 0, + ACE_Reactor_Notify *notify = 0); + // Initialize <ACE_Select_Reactor> with size <size>. + + virtual int open (size_t size = DEFAULT_SIZE, + int restart = 0, + ACE_Sig_Handler * = 0, + ACE_Timer_Queue * = 0, + int disable_notify_pipe = 0, + ACE_Reactor_Notify * = 0); + // Initialize <ACE_Select_Reactor> with size <size>. + + virtual int current_info (ACE_HANDLE, size_t & /* size */); + // Returns -1 (not used in this implementation); + + virtual int set_sig_handler (ACE_Sig_Handler *signal_handler); + // Use a user specified signal handler instead. + + virtual int set_timer_queue (ACE_Timer_Queue *timer_queue); + // Use a user specified timer queue instead. + + virtual int close (void); + // Close down the select_reactor and release all of its resources. + + virtual ~ACE_Select_Reactor_T (void); + // Close down the select_reactor and release all of its resources. + + // = Event loop drivers. + + virtual int handle_events (ACE_Time_Value *max_wait_time = 0); + virtual int alertable_handle_events (ACE_Time_Value *max_wait_time = 0); + // This event loop driver that blocks for <max_wait_time> before + // returning. It will return earlier if timer events, I/O events, + // or signal events occur. Note that <max_wait_time> can be 0, in + // which case this method blocks indefinitely until events occur. + // + // <max_wait_time> is decremented to reflect how much time this call + // took. For instance, if a time value of 3 seconds is passed to + // handle_events and an event occurs after 2 seconds, + // <max_wait_time> will equal 1 second. This can be used if an + // application wishes to handle events for some fixed amount of + // time. + // + // Returns the total number of <ACE_Event_Handler>s that were + // dispatched, 0 if the <max_wait_time> elapsed without dispatching + // any handlers, or -1 if something goes wrong. + // + // Current <alertable_handle_events> is identical to + // <handle_events>. + + virtual int handle_events (ACE_Time_Value &max_wait_time); + virtual int alertable_handle_events (ACE_Time_Value &max_wait_time); + // This method is just like the one above, except the + // <max_wait_time> value is a reference and can therefore never be + // NULL. + // + // Current <alertable_handle_events> is identical to + // <handle_events>. + + // = Register and remove <ACE_Event_Handler>s. + virtual int register_handler (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + // Register a <eh> with a particular <mask>. Note that the + // <Select_Reactor> will call eh->get_handle() to extract the + // underlying I/O handle. + + virtual int register_handler (ACE_HANDLE handle, + ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + // Register a <eh> with a particular <mask>. Note that since the + // <handle> is given the Select_Reactor will *not* call + // eh->get_handle() to extract the underlying I/O handle. + +#if defined (ACE_WIN32) + + // Originally this interface was available for all platforms, but + // because ACE_HANDLE is an int on non-Win32 platforms, compilers + // are not able to tell the difference between + // register_handler(ACE_Event_Handler*,ACE_Reactor_Mask) and + // register_handler(ACE_Event_Handler*,ACE_HANDLE). Therefore, we + // have restricted this method to Win32 only. + + virtual int register_handler (ACE_Event_Handler *event_handler, + ACE_HANDLE event_handle = ACE_INVALID_HANDLE); + // Not implemented. + +#endif /* ACE_WIN32 */ + + virtual int register_handler (ACE_HANDLE event_handle, + ACE_HANDLE io_handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask); + // Not implemented. + + virtual int register_handler (const ACE_Handle_Set &handles, + ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + // Register <eh> with all the <handles> in the <Handle_Set>. + + virtual int register_handler (int signum, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp = 0, + ACE_Event_Handler **old_sh = 0, + ACE_Sig_Action *old_disp = 0); + // Register <new_sh> to handle the signal <signum> using the + // <new_disp>. Returns the <old_sh> that was previously registered + // (if any), along with the <old_disp> of the signal handler. + + virtual int register_handler (const ACE_Sig_Set &sigset, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp = 0); + // Registers <new_sh> to handle a set of signals <sigset> using the + // <new_disp>. + + virtual int remove_handler (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + // Removes the <mask> binding of <eh> from the Select_Reactor. If + // there are no more bindings for this <eh> then it is removed from + // the Select_Reactor. Note that the Select_Reactor will call + // eh->get_handle() to extract the underlying I/O handle. + + virtual int remove_handler (ACE_HANDLE handle, + ACE_Reactor_Mask); + // Removes the <mask> bind of <Event_Handler> whose handle is + // <handle> from the Select_Reactor. If there are no more bindings + // for this <eh> then it is removed from the Select_Reactor. + + virtual int remove_handler (const ACE_Handle_Set &handle_set, + ACE_Reactor_Mask); + // Removes all the <mask> bindings for handles in the <handle_set> + // bind of <Event_Handler>. If there are no more bindings for any + // of these handlers then they are removed from the Select_Reactor. + + virtual int remove_handler (int signum, + ACE_Sig_Action *new_disp, + ACE_Sig_Action *old_disp = 0, + int sigkey = -1); + // Remove the ACE_Event_Handler currently associated with <signum>. + // <sigkey> is ignored in this implementation since there is only + // one instance of a signal handler. Install the new disposition + // (if given) and return the previous disposition (if desired by the + // caller). Returns 0 on success and -1 if <signum> is invalid. + + virtual int remove_handler (const ACE_Sig_Set &sigset); + // Calls <remove_handler> for every signal in <sigset>. + + // = Suspend and resume Handlers. + + virtual int suspend_handler (ACE_Event_Handler *eh); + // Temporarily suspend the <Event_Handler> associated with <eh>. + + virtual int suspend_handler (ACE_HANDLE handle); + // Temporarily suspend the <Event_Handler> associated with <handle>. + + virtual int suspend_handler (const ACE_Handle_Set &handles); + // Suspend all <handles> in handle set temporarily. + + virtual int suspend_handlers (void); + // Suspend all the <Event_Handlers> in the Select_Reactor. + + virtual int resume_handler (ACE_Event_Handler *eh); + // Resume a temporarily suspend <Event_Handler> associated with + // <eh>. + + virtual int resume_handler (ACE_HANDLE handle); + // Resume a temporarily suspended <Event_Handler> associated with + // <handle>. + + virtual int resume_handler (const ACE_Handle_Set &handles); + // Resume all <handles> in handle set. + + virtual int resume_handlers (void); + // Resume all the <Event_Handlers> in the Select_Reactor. + + virtual int uses_event_associations (void); + // Return 1 if we any event associations were made by the reactor + // for the handles that it waits on, 0 otherwise. Since the + // Select_Reactor does not do any event associations, this function + // always return 0. + + // = Timer management. + virtual long schedule_timer (ACE_Event_Handler *, + const void *arg, + const ACE_Time_Value &delta_time, + const ACE_Time_Value &interval = ACE_Time_Value::zero); + // Schedule an <event_handler> that will expire after <delta_time> + // amount of time. If it expires then <arg> is passed in as the + // value to the <event_handler>'s <handle_timeout> callback method. + // If <interval> is != to <ACE_Time_Value::zero> then it is used to + // reschedule the <event_handler> automatically. This method + // returns a <timer_id> that uniquely identifies the <event_handler> + // in an internal list. This <timer_id> can be used to cancel an + // <event_handler> before it expires. The cancellation ensures that + // <timer_ids> are unique up to values of greater than 2 billion + // timers. As long as timers don't stay around longer than this + // there should be no problems with accidentally deleting the wrong + // timer. Returns -1 on failure (which is guaranteed never to be a + // valid <timer_id>. + + virtual int cancel_timer (ACE_Event_Handler *event_handler, + int dont_call_handle_close = 1); + // Cancel all <event_handlers> that match the address of + // <event_handler>. If <dont_call_handle_close> is 0 then the + // <handle_close> method of <event_handler> will be invoked. + // Returns number of handler's cancelled. + + virtual int cancel_timer (long timer_id, + const void **arg = 0, + int dont_call_handle_close = 1); + // Cancel the single <ACE_Event_Handler> that matches the <timer_id> + // value (which was returned from the <schedule> method). If arg is + // non-NULL then it will be set to point to the ``magic cookie'' + // argument passed in when the <Event_Handler> was registered. This + // makes it possible to free up the memory and avoid memory leaks. + // If <dont_call_handle_close> is 0 then the <handle_close> method + // of <event_handler> will be invoked. Returns 1 if cancellation + // succeeded and 0 if the <timer_id> wasn't found. + + // = High-level Event_Handler scheduling operations + + virtual int schedule_wakeup (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + // ADD the dispatch MASK "bit" bound with the <eh> and the <mask>. + + virtual int schedule_wakeup (ACE_HANDLE handle, + ACE_Reactor_Mask mask); + // ADD the dispatch MASK "bit" bound with the <handle> and the <mask>. + + virtual int cancel_wakeup (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + // CLR the dispatch MASK "bit" bound with the <eh> and the <mask>. + + virtual int cancel_wakeup (ACE_HANDLE handle, + ACE_Reactor_Mask mask); + // CLR the dispatch MASK "bit" bound with the <handle> and the <mask>. + + // = Notification methods. + virtual int notify (ACE_Event_Handler * = 0, + ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK, + ACE_Time_Value * = 0); + // Called by a thread when it wants to unblock the Select_Reactor. + // This wakeups the <ACE_Select_Reactor> if currently blocked in + // select()/poll(). Pass over both the <Event_Handler> *and* the + // <mask> to allow the caller to dictate which <Event_Handler> + // method the <Select_Reactor> will invoke. The <ACE_Time_Value> + // indicates how long to blocking trying to notify the + // <Select_Reactor>. If <timeout> == 0, the caller will block until + // action is possible, else will wait until the relative time + // specified in *<timeout> elapses). + + virtual void max_notify_iterations (int); + // Set the maximum number of times that the + // <ACE_Select_Reactor_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify pipe before breaking out of its <recv> loop. By default, + // this is set to -1, which means "iterate until the pipe is empty." + // Setting this to a value like "1 or 2" will increase "fairness" + // (and thus prevent starvation) at the expense of slightly higher + // dispatching overhead. + + virtual int max_notify_iterations (void); + // Get the maximum number of times that the + // <ACE_Select_Reactor_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify pipe before breaking out of its <recv> loop. + + virtual void requeue_position (int); + // Set position that the main ACE_Select_Reactor thread is requeued in the + // list of waiters during a notify() callback. + + virtual int requeue_position (void); + // Get position that the main ACE_Select_Reactor thread is requeued in the + // list of waiters during a notify() callback. + + // = Low-level wait_set mask manipulation methods. + virtual int mask_ops (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + int ops); + // GET/SET/ADD/CLR the dispatch mask "bit" bound with the <eh> and + // <mask>. + + virtual int mask_ops (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + int ops); + // GET/SET/ADD/CLR the dispatch MASK "bit" bound with the <handle> + // and <mask>. + + // = Low-level ready_set mask manipulation methods. + virtual int ready_ops (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + int ops); + // GET/SET/ADD/CLR the ready "bit" bound with the <eh> and <mask>. + + virtual int ready_ops (ACE_HANDLE handle, + ACE_Reactor_Mask, + int ops); + // GET/SET/ADD/CLR the ready "bit" bound with the <handle> and <mask>. + + virtual void wakeup_all_threads (void); + // Wake up all threads in waiting in the event loop + + // = Only the owner thread that can perform a <handle_events>. + + virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id = 0); + // Set the new owner of the thread and return the old owner. + + virtual int owner (ACE_thread_t *); + // Return the current owner of the thread. + + // = Miscellaneous Handler operations. + virtual int handler (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Event_Handler **eh = 0); + // Check to see if <handle> is associated with a valid Event_Handler + // bound to <mask>. Return the <eh> associated with this <handler> + // if <eh> != 0. + + virtual int handler (int signum, + ACE_Event_Handler ** = 0); + // Check to see if <signum> is associated with a valid Event_Handler + // bound to a signal. Return the <eh> associated with this + // <handler> if <eh> != 0. + + virtual int initialized (void); + // Returns true if we've been successfully initialized, else false. + + virtual size_t size (void); + // Returns the current size of the Reactor's internal descriptor + // table. + + virtual ACE_Lock &lock (void); + // Returns a reference to the <ACE_Select_Reactor_Token> that is + // used to serialize the internal Select_Reactor's processing logic. + // This can be useful for situations where you need to avoid + // deadlock efficiently when <ACE_Event_Handlers> are used in + // multiple threads. + + virtual void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +protected: + // = Internal methods that do the actual work. + + // All of these methods assume that the <Select_Reactor>'s token + // lock is held by the public methods that call down to them. + + virtual int register_handler_i (ACE_HANDLE handle, + ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + // Do the work of actually binding the <handle> and <eh> with the + // <mask>. + + virtual int register_handler_i (const ACE_Handle_Set &handles, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask); + // Register a set of <handles>. + + virtual int remove_handler_i (ACE_HANDLE handle, + ACE_Reactor_Mask); + // Do the work of actually unbinding the <handle> and <eh> with the + // <mask>. + + virtual int remove_handler_i (const ACE_Handle_Set &handles, + ACE_Reactor_Mask); + // Remove a set of <handles>. + + virtual int suspend_i (ACE_HANDLE handle); + // Suspend the <Event_Handler> associated with <handle> + + virtual int resume_i (ACE_HANDLE handle); + // Resume the <Event_Handler> associated with <handle> + + virtual int handler_i (ACE_HANDLE handle, + ACE_Reactor_Mask, + ACE_Event_Handler ** = 0); + // Implement the public <handler> method. + + virtual int handler_i (int signum, ACE_Event_Handler ** = 0); + // Implement the public <handler> method. + + virtual int any_ready (ACE_Select_Reactor_Handle_Set &handle_set); + // Check if there are any HANDLEs enabled in the <ready_set_>, and + // if so, update the <handle_set> and return the number ready. If + // there aren't any HANDLEs enabled return 0. + + virtual int handle_error (void); + // Take corrective action when errors occur. + + virtual int check_handles (void); + // Make sure the handles are all valid. + + virtual int wait_for_multiple_events (ACE_Select_Reactor_Handle_Set &, + ACE_Time_Value *); + // Wait for events to occur. + + // = Dispatching methods. + + virtual int dispatch (int nfound, + ACE_Select_Reactor_Handle_Set &); + // Template Method that dispatches <ACE_Event_Handler>s for time + // events, I/O events, and signal events. Returns the total number + // of <ACE_Event_Handler>s that were dispatched or -1 if something + // goes wrong. + + virtual int dispatch_timer_handlers (void); + // Dispatch any expired timer handlers. Returns -1 if the state of + // the <wait_set_> has changed, else returns number of timer + // handlers dispatched. + + virtual int dispatch_notification_handlers (int &number_of_active_handles, + ACE_Select_Reactor_Handle_Set &dispatch_set); + // Dispatch any notification handlers. Returns -1 if the state of + // the <wait_set_> has changed, else returns number of handlers + // notified. + + virtual int dispatch_io_handlers (int &number_of_active_handles, + ACE_Select_Reactor_Handle_Set &dispatch_set); + // Dispatch all the input/output/except handlers that are enabled in + // the <dispatch_set>. Returns -1 if the state of the <wait_set_> + // has changed, else returns number of handlers dispatched. + + virtual int dispatch_io_set (int number_of_active_handles, + int& number_dispatched, + int mask, + ACE_Handle_Set& dispatch_mask, + ACE_Handle_Set& ready_mask, + ACE_EH_PTMF callback); + // Factors the dispatching of an io handle set (each WRITE, EXCEPT + // or READ set of handles). + // It updates the number of handles already dispatched and + // invokes this->notify_handle for all the handles in <dispatch_set> + // using the <mask>, <ready_set> and <callback> parameters. + // Must return -1 if this->state_changed otherwise it must return 0. + + virtual void notify_handle (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Handle_Set &, + ACE_Event_Handler *eh, + ACE_EH_PTMF callback); + // Notify the appropriate <callback> in the context of the <eh> + // associated with <handle> that a particular event has occurred. + + virtual void renew (void); + // Enqueue ourselves into the list of waiting threads at the + // appropriate point specified by <requeue_position_>. + + ACE_SELECT_REACTOR_TOKEN token_; + // Synchronization token for the MT_SAFE ACE_Select_Reactor. + + ACE_Lock_Adapter<ACE_SELECT_REACTOR_TOKEN> lock_adapter_; + // Adapter used to return internal lock to outside world. + + int release_token (void); + // Release the token lock when a Win32 structured exception occurs. + + int handle_events_i (ACE_Time_Value *max_wait_time = 0); + // Stops the VC++ compiler from bitching about exceptions and destructors + + +private: + ACE_Select_Reactor_T (const ACE_Select_Reactor_T &); + ACE_Select_Reactor_T &operator = (const ACE_Select_Reactor_T &); + // Deny access since member-wise won't work... +}; + +#if defined (__ACE_INLINE__) +#include "ace/Select_Reactor_T.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ace/Select_Reactor_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Select_Reactor_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_SELECT_REACTOR_T_H */ diff --git a/ace/Select_Reactor_T.i b/ace/Select_Reactor_T.i new file mode 100644 index 00000000000..8d2432d5b5e --- /dev/null +++ b/ace/Select_Reactor_T.i @@ -0,0 +1,226 @@ +/* -*- C++ -*- */ +// $Id$ + +#include "ace/Reactor.h" + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handler (ACE_Event_Handler *h) +{ + ACE_TRACE ("ACE_Select_Reactor_T::resume_handler"); + return this->resume_handler (h->get_handle ()); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handler (const ACE_Handle_Set &handles) +{ + ACE_TRACE ("ACE_Select_Reactor_T::resume_handler"); + ACE_Handle_Set_Iterator handle_iter (handles); + ACE_HANDLE h; + + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + while ((h = handle_iter ()) != ACE_INVALID_HANDLE) + if (this->resume_i (h) == -1) + return -1; + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_handler (ACE_Event_Handler *h) +{ + ACE_TRACE ("ACE_Select_Reactor_T::suspend_handler"); + return this->suspend_handler (h->get_handle ()); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_handler (const ACE_Handle_Set &handles) +{ + ACE_TRACE ("ACE_Select_Reactor_T::suspend_handler"); + ACE_Handle_Set_Iterator handle_iter (handles); + ACE_HANDLE h; + + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + while ((h = handle_iter ()) != ACE_INVALID_HANDLE) + if (this->suspend_i (h) == -1) + return -1; + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (int signum, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp, + ACE_Event_Handler **old_sh, + ACE_Sig_Action *old_disp) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); + return this->signal_handler_->register_handler (signum, + new_sh, new_disp, + old_sh, old_disp); +} + +#if defined (ACE_WIN32) + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (ACE_Event_Handler *event_handler, + ACE_HANDLE event_handle) +{ + // Don't have an implementation for this yet... + ACE_UNUSED_ARG (event_handler); + ACE_UNUSED_ARG (event_handle); + ACE_NOTSUP_RETURN (-1); +} + +#endif /* ACE_WIN32 */ + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (ACE_HANDLE event_handle, + ACE_HANDLE io_handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + // Don't have an implementation for this yet... + ACE_UNUSED_ARG (event_handle); + ACE_UNUSED_ARG (io_handle); + ACE_UNUSED_ARG (event_handler); + ACE_UNUSED_ARG (mask); + ACE_NOTSUP_RETURN (-1); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handler (int signum, ACE_Event_Handler **handler) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handler"); + return this->handler_i (signum, handler); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler (int signum, + ACE_Sig_Action *new_disp, + ACE_Sig_Action *old_disp, + int sigkey) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); + return this->signal_handler_->remove_handler (signum, new_disp, old_disp, sigkey); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::uses_event_associations (void) +{ + // Since the Select_Reactor does not do any event associations, this + // function always return 0. + return 0; +} + +// = The remaining methods in this file must be called with locks +// held. Note the queue handles its own locking. + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::cancel_timer (ACE_Event_Handler *handler, + int dont_call_handle_close) +{ + ACE_TRACE ("ACE_Select_Reactor_T::cancel_timer"); + return this->timer_queue_ != 0 && + this->timer_queue_->cancel (handler, dont_call_handle_close); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::cancel_timer (long timer_id, + const void **arg, + int dont_call_handle_close) +{ + ACE_TRACE ("ACE_Select_Reactor_T::cancel_timer"); + return this->timer_queue_->cancel (timer_id, + arg, + dont_call_handle_close); +} + +// Performs operations on the "ready" bits. + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::ready_ops (ACE_Event_Handler *handler, + ACE_Reactor_Mask mask, + int ops) +{ + ACE_TRACE ("ACE_Select_Reactor_T::ready_ops"); + return this->ready_ops (handler->get_handle (), mask, ops); +} + +// Performs operations on the "dispatch" masks. + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::mask_ops (ACE_Event_Handler *handler, + ACE_Reactor_Mask mask, + int ops) +{ + ACE_TRACE ("ACE_Select_Reactor_T::mask_ops"); + return this->mask_ops (handler->get_handle (), mask, ops); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::schedule_wakeup (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::schedule_wakeup"); + return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::ADD_MASK); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::cancel_wakeup (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::cancel_wakeup"); + return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::CLR_MASK); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::schedule_wakeup (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::schedule_wakeup"); + return this->mask_ops (handle, mask, ACE_Reactor::ADD_MASK); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::cancel_wakeup (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::cancel_wakeup"); + return this->mask_ops (handle, mask, ACE_Reactor::CLR_MASK); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE ACE_Lock & +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::lock (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::lock"); + return this->lock_adapter_; +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::wakeup_all_threads (void) +{ + // Send a notification, but don't block if there's no one to receive + // it. + this->notify (0, ACE_Event_Handler::NULL_MASK, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::alertable_handle_events (ACE_Time_Value *max_wait_time) +{ + return this->handle_events (max_wait_time); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::alertable_handle_events (ACE_Time_Value &max_wait_time) +{ + return this->handle_events (max_wait_time); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE size_t +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::size (void) +{ + return this->handler_rep_.size (); +} diff --git a/ace/Synch.h b/ace/Synch.h index 6ff48eca1ef..aa548fb2ede 100644 --- a/ace/Synch.h +++ b/ace/Synch.h @@ -692,6 +692,18 @@ public: // Declare the dynamic allocation hooks. }; +class ACE_Export ACE_Noop_Token : public ACE_Null_Mutex +{ +public: + int renew (int = 0, ACE_Time_Value * =0); + + void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. +}; + class ACE_Export ACE_Null_Condition { // = TITLE @@ -1449,7 +1461,7 @@ class ACE_Export ACE_Guard<ACE_Null_Mutex> { // = TITLE // Template specialization of <ACE_Guard> for the - // <ACE_Null_Mutex>. + // <ACE_Null_Mutex>. // // = DESCRIPTION // This specialization is useful since it helps to speedup @@ -1480,9 +1492,9 @@ class ACE_Export ACE_Write_Guard<ACE_Null_Mutex> : public ACE_Guard<ACE_Null_Mut { // = TITLE public: - ACE_Write_Guard (ACE_Null_Mutex &m) + ACE_Write_Guard (ACE_Null_Mutex &m) : ACE_Guard<ACE_Null_Mutex> (m) {} - ACE_Write_Guard (ACE_Null_Mutex &m, int blocked) + ACE_Write_Guard (ACE_Null_Mutex &m, int blocked) : ACE_Guard<ACE_Null_Mutex> (m, blocked) {} int acquire_write (void) { return 0; } @@ -1499,9 +1511,9 @@ class ACE_Export ACE_Read_Guard<ACE_Null_Mutex> : public ACE_Guard<ACE_Null_Mute { // = TITLE public: - ACE_Read_Guard (ACE_Null_Mutex &m) + ACE_Read_Guard (ACE_Null_Mutex &m) : ACE_Guard<ACE_Null_Mutex> (m) {} - ACE_Read_Guard (ACE_Null_Mutex &m, int blocked) + ACE_Read_Guard (ACE_Null_Mutex &m, int blocked) : ACE_Guard<ACE_Null_Mutex> (m, blocked) {} int acquire_write (void) { return 0; } diff --git a/ace/Synch.i b/ace/Synch.i index dc4f41df118..f5d68e67606 100644 --- a/ace/Synch.i +++ b/ace/Synch.i @@ -805,6 +805,18 @@ ACE_Null_Mutex::dump (void) const { } +ACE_INLINE int +ACE_Noop_Token::renew (int, ACE_Time_Value *) +{ + return 0; +} + +ACE_INLINE void +ACE_Noop_Token::dump (void) const +{ +} + + ACE_INLINE ACE_Null_Condition::ACE_Null_Condition (ACE_Null_Mutex &m, int, LPCTSTR, void*) @@ -912,7 +924,7 @@ ACE_Auto_Event::~ACE_Auto_Event (void) } #if defined (ACE_HAS_THREADS) -ACE_INLINE +ACE_INLINE ACE_RW_Thread_Mutex::~ACE_RW_Thread_Mutex (void) { } |