diff options
-rw-r--r-- | ace/Select_Reactor_Base.cpp | 629 | ||||
-rw-r--r-- | ace/Select_Reactor_T.h | 290 | ||||
-rw-r--r-- | ace/Select_Reactor_T.i | 182 |
3 files changed, 466 insertions, 635 deletions
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp index 52f3aded471..4f133a4702a 100644 --- a/ace/Select_Reactor_Base.cpp +++ b/ace/Select_Reactor_Base.cpp @@ -749,632 +749,3 @@ ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle, } return omask; } - -int -ACE_Select_Reactor_Impl::any_ready (ACE_Select_Reactor_Handle_Set &wait_set) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::fill_in_ready"); - -#if !defined (ACE_WIN32) - // Make this call signal safe. - ACE_Sig_Guard sb; -#endif /* ACE_WIN32 */ - - int number_ready = this->ready_set_.rd_mask_.num_set () - + this->ready_set_.wr_mask_.num_set () - + this->ready_set_.ex_mask_.num_set (); - - if (number_ready > 0) - { - wait_set.rd_mask_ = this->ready_set_.rd_mask_; - wait_set.wr_mask_ = this->ready_set_.wr_mask_; - wait_set.ex_mask_ = this->ready_set_.ex_mask_; - - this->ready_set_.rd_mask_.reset (); - this->ready_set_.wr_mask_.reset (); - this->ready_set_.ex_mask_.reset (); - } - - return number_ready; -} - -int -ACE_Select_Reactor_Impl::handler_i (int signum, ACE_Event_Handler **eh) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::handler_i"); - ACE_Event_Handler *handler = this->signal_handler_->handler (signum); - - if (handler == 0) - return -1; - else if (*eh != 0) - *eh = handler; - return 0; -} - -int -ACE_Select_Reactor_Impl::notify (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - ACE_Time_Value *timeout) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::notify"); - - ssize_t n = 0; - - // Pass over both the Event_Handler *and* the mask to allow the - // caller to dictate which Event_Handler method the receiver - // invokes. Note that this call can timeout. - - n = this->notify_handler_->notify (eh, mask, timeout); - return n == -1 ? -1 : 0; -} - -int -ACE_Select_Reactor_Impl::set_sig_handler (ACE_Sig_Handler *signal_handler) -{ - if (this->signal_handler_ != 0 && this->delete_signal_handler_ != 0) - delete this->signal_handler_; - this->signal_handler_ = signal_handler; - this->delete_signal_handler_ = 0; - return 0; -} - -int -ACE_Select_Reactor_Impl::set_timer_queue (ACE_Timer_Queue *timer_queue) -{ - if (this->timer_queue_ != 0 && this->delete_timer_queue_ != 0) - delete this->timer_queue_; - this->timer_queue_ = timer_queue; - this->delete_timer_queue_ = 0; - return 0; -} - -int -ACE_Select_Reactor_Impl::remove_handler_i (const ACE_Handle_Set &handles, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::remove_handler_i"); - ACE_HANDLE h; - - ACE_Handle_Set_Iterator handle_iter (handles); - - while ((h = handle_iter ()) != ACE_INVALID_HANDLE) - if (this->remove_handler_i (h, mask) == -1) - return -1; - - return 0; -} - -int -ACE_Select_Reactor_Impl::register_handler_i (const ACE_Handle_Set &handles, - ACE_Event_Handler *handler, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::register_handler_i"); - ACE_HANDLE h; - - ACE_Handle_Set_Iterator handle_iter (handles); - while ((h = handle_iter ()) != ACE_INVALID_HANDLE) - if (this->register_handler_i (h, handler, mask) == -1) - return -1; - - return 0; -} - -int -ACE_Select_Reactor_Impl::register_handler (const ACE_Sig_Set &sigset, - ACE_Event_Handler *new_sh, - ACE_Sig_Action *new_disp) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::register_handler"); - - int result = 0; - -#if (ACE_NSIG > 0) - for (int s = 1; s < ACE_NSIG; s++) - if (sigset.is_member (s) - && this->signal_handler_->register_handler (s, new_sh, - new_disp) == -1) - result = -1; -#else - ACE_UNUSED_ARG (sigset); - ACE_UNUSED_ARG (new_sh); - ACE_UNUSED_ARG (new_disp); -#endif /* ACE_NSIG */ - return result; -} - -int -ACE_Select_Reactor_Impl::remove_handler (const ACE_Sig_Set &sigset) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::remove_handler"); - int result = 0; - -#if (ACE_NSIG > 0) - for (int s = 1; s < ACE_NSIG; s++) - if (sigset.is_member (s) - && this->signal_handler_->remove_handler (s) == -1) - result = -1; -#else - ACE_UNUSED_ARG (sigset); -#endif /* ACE_NSIG */ - - return result; -} - -// Main event loop driver that blocks for <max_wait_time> before -// returning (will return earlier if I/O or signal events occur). - -int -ACE_Select_Reactor_Impl::handle_events (ACE_Time_Value &max_wait_time) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::handle_events"); - - return this->handle_events (&max_wait_time); -} - -int -ACE_Select_Reactor_Impl::handle_error (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::handle_error"); - if (errno == EINTR) - return this->restart_; -#if defined (__MVS__) - // On MVS Open Edition, there can be a number of failure codes on a bad - // socket, so check_handles on anything other than EINTR. - else - return this->check_handles (); -#else - else if (errno == EBADF) - return this->check_handles (); - else - return -1; -#endif /* __MVS__ */ -} - -void -ACE_Select_Reactor_Impl::notify_handle (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - ACE_Handle_Set &ready_mask, - ACE_Event_Handler *event_handler, - ACE_EH_PTMF ptmf) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::notify_handle"); - // Check for removed handlers. - if (event_handler == 0) - return; - - int status = (event_handler->*ptmf) (handle); - - if (status < 0) - this->remove_handler_i (handle, mask); - else if (status > 0) - ready_mask.set_bit (handle); -} - -// Must be called with locks held. - -int -ACE_Select_Reactor_Impl::handler_i (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - ACE_Event_Handler **handler) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::handler_i"); - ACE_Event_Handler *h = this->handler_rep_.find (handle); - - if (h == 0) - return -1; - else - { - if ((ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) - || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)) - && this->wait_set_.rd_mask_.is_set (handle) == 0) - return -1; - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK) - && this->wait_set_.wr_mask_.is_set (handle) == 0) - return -1; - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK) - && this->wait_set_.ex_mask_.is_set (handle) == 0) - return -1; - } - - if (handler != 0) - *handler = h; - return 0; -} - -// Must be called with locks held - -int -ACE_Select_Reactor_Impl::resume_i (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::resume"); - if (this->handler_rep_.find (handle) == 0) - return -1; - - if (this->suspend_set_.rd_mask_.is_set (handle)) - { - this->wait_set_.rd_mask_.set_bit (handle); - this->suspend_set_.rd_mask_.clr_bit (handle); - } - if (this->suspend_set_.wr_mask_.is_set (handle)) - { - this->wait_set_.wr_mask_.set_bit (handle); - this->suspend_set_.wr_mask_.clr_bit (handle); - } - if (this->suspend_set_.ex_mask_.is_set (handle)) - { - this->wait_set_.ex_mask_.set_bit (handle); - this->suspend_set_.ex_mask_.clr_bit (handle); - } - return 0; -} - -// Must be called with locks held - -int -ACE_Select_Reactor_Impl::suspend_i (ACE_HANDLE handle) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::suspend"); - if (this->handler_rep_.find (handle) == 0) - return -1; - - if (this->wait_set_.rd_mask_.is_set (handle)) - { - this->suspend_set_.rd_mask_.set_bit (handle); - this->wait_set_.rd_mask_.clr_bit (handle); - } - if (this->wait_set_.wr_mask_.is_set (handle)) - { - this->suspend_set_.wr_mask_.set_bit (handle); - this->wait_set_.wr_mask_.clr_bit (handle); - } - if (this->wait_set_.ex_mask_.is_set (handle)) - { - this->suspend_set_.ex_mask_.set_bit (handle); - this->wait_set_.ex_mask_.clr_bit (handle); - } - return 0; -} - -// Must be called with locks held - -int -ACE_Select_Reactor_Impl::register_handler_i (ACE_HANDLE handle, - ACE_Event_Handler *event_handler, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::register_handler_i"); - - // Insert the <handle, event_handle> tuple into the Handler - // Repository. - return this->handler_rep_.bind (handle, event_handler, mask); -} - -int -ACE_Select_Reactor_Impl::remove_handler_i (ACE_HANDLE handle, - ACE_Reactor_Mask mask) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::remove_handler_i"); - - // Unbind this handle. - return this->handler_rep_.unbind (handle, mask); -} - -// Must be called with lock held. - -int -ACE_Select_Reactor_Impl::wait_for_multiple_events (ACE_Select_Reactor_Handle_Set &dispatch_set, - ACE_Time_Value *max_wait_time) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::wait_for_multiple_events"); - u_long width = 0; - ACE_Time_Value timer_buf (0); - ACE_Time_Value *this_timeout = &timer_buf; - - int number_of_active_handles = this->any_ready (dispatch_set); - - // If there are any bits enabled in the <ready_set_> then we'll - // handle those first, otherwise we'll block in select(). - - if (number_of_active_handles == 0) - { - do - { - if (this->timer_queue_->calculate_timeout (max_wait_time, - this_timeout) == 0) - this_timeout = 0; - - width = (u_long) this->handler_rep_.max_handlep1 (); - - dispatch_set.rd_mask_ = this->wait_set_.rd_mask_; - dispatch_set.wr_mask_ = this->wait_set_.wr_mask_; - dispatch_set.ex_mask_ = this->wait_set_.ex_mask_; - - number_of_active_handles = ACE_OS::select (int (width), - dispatch_set.rd_mask_, - dispatch_set.wr_mask_, - dispatch_set.ex_mask_, - this_timeout); - } - while (number_of_active_handles == -1 && this->handle_error () > 0); - - // @@ Remove?! - if (number_of_active_handles > 0) - { -#if !defined (ACE_WIN32) - dispatch_set.rd_mask_.sync (this->handler_rep_.max_handlep1 ()); - dispatch_set.wr_mask_.sync (this->handler_rep_.max_handlep1 ()); - dispatch_set.ex_mask_.sync (this->handler_rep_.max_handlep1 ()); -#endif /* ACE_WIN32 */ - } - } - - // Return the number of events to dispatch. - return number_of_active_handles; -} - -int -ACE_Select_Reactor_Impl::dispatch_timer_handlers (void) -{ - int number_dispatched = this->timer_queue_->expire (); - return this->state_changed_ ? -1 : number_dispatched; -} - -int -ACE_Select_Reactor_Impl::dispatch_notification_handlers (int &number_of_active_handles, - ACE_Select_Reactor_Handle_Set &dispatch_set) -{ -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - // Check to see if the ACE_HANDLE associated with the - // Select_Reactor's notify hook is enabled. If so, it means that - // one or more other threads are trying to update the - // ACE_Select_Reactor_T's internal tables. We'll handle all these - // threads and then break out to continue the event loop. - - int number_dispatched = - this->notify_handler_->dispatch_notifications (number_of_active_handles, - dispatch_set.rd_mask_); - return this->state_changed_ ? -1 : number_dispatched; -#else - ACE_UNUSED_ARG (number_of_active_handles); - ACE_UNUSED_ARG (dispatch_set); - return 0; -#endif /* ACE_MT_SAFE */ -} - -int -ACE_Select_Reactor_Impl::dispatch_io_set (int number_of_active_handles, - int& number_dispatched, - int mask, - ACE_Handle_Set& dispatch_mask, - ACE_Handle_Set& ready_mask, - ACE_EH_PTMF callback) -{ - ACE_HANDLE handle; - - ACE_Handle_Set_Iterator handle_iter (dispatch_mask); - - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE - && number_dispatched < number_of_active_handles - && this->state_changed_ == 0) - { - // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor_T::dispatching\n"))); - number_dispatched++; - this->notify_handle (handle, - mask, - ready_mask, - this->handler_rep_.find (handle), - callback); - } - - if (number_dispatched > 0 && this->state_changed_) - { - return -1; - } - - return 0; -} - -int -ACE_Select_Reactor_Impl::dispatch_io_handlers (int &number_of_active_handles, - ACE_Select_Reactor_Handle_Set &dispatch_set) -{ - int number_dispatched = 0; - - // Handle output events (this code needs to come first to handle - // the obscure case of piggy-backed data coming along with the - // final handshake message of a nonblocking connection). - - // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor_T::dispatch - WRITE\n"))); - if (this->dispatch_io_set (number_of_active_handles, - number_dispatched, - ACE_Event_Handler::WRITE_MASK, - dispatch_set.wr_mask_, - this->ready_set_.wr_mask_, - &ACE_Event_Handler::handle_output) == -1) - { - number_of_active_handles -= number_dispatched; - return -1; - } - - - // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor_Impl::dispatch - EXCEPT\n"))); - if (this->dispatch_io_set (number_of_active_handles, - number_dispatched, - ACE_Event_Handler::EXCEPT_MASK, - dispatch_set.ex_mask_, - this->ready_set_.ex_mask_, - &ACE_Event_Handler::handle_exception) == -1) - { - number_of_active_handles -= number_dispatched; - return -1; - } - - // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Select_Reactor_Impl::dispatch - READ\n"))); - if (this->dispatch_io_set (number_of_active_handles, - number_dispatched, - ACE_Event_Handler::READ_MASK, - dispatch_set.rd_mask_, - this->ready_set_.rd_mask_, - &ACE_Event_Handler::handle_input) == -1) - { - number_of_active_handles -= number_dispatched; - return -1; - } - - number_of_active_handles -= number_dispatched; - return number_dispatched; -} - -int -ACE_Select_Reactor_Impl::dispatch (int number_of_active_handles, - ACE_Select_Reactor_Handle_Set &dispatch_set) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::dispatch"); - - // The following do/while loop keeps dispatching as long as there - // are still active handles. Note that the only way we should ever - // iterate more than once through this loop is if signals occur - // while we're dispatching other handlers. - - do - { - // Note that we keep track of changes to our state. If any of - // the dispatch_*() methods below return -1 it means that the - // <wait_set_> state has changed as the result of an - // <ACE_Event_Handler> being dispatched. This means that we - // need to bail out and rerun the select() loop since our - // existing notion of handles in <dispatch_set> may no longer be - // correct. - // - // In the beginning, our state starts out unchanged. After - // every iteration (i.e., due to signals), our state starts out - // unchanged again. - - this->state_changed_ = 0; - - // Perform the Template Method for dispatching all the handlers. - - // Handle timers first since they may have higher latency - // constraints. - - if (this->dispatch_timer_handlers () == -1) - // State has changed or timer queue has failed, exit inner - // loop. - break; - - else if (number_of_active_handles <= 0) - // Bail out since we got here since select() was interrupted. - { - if (ACE_Sig_Handler::sig_pending () != 0) - { - ACE_Sig_Handler::sig_pending (0); - - // If any HANDLES in the <ready_set_> are activated as a - // result of signals they should be dispatched since - // they may be time critical... - number_of_active_handles = this->any_ready (dispatch_set); - } - else - return number_of_active_handles; - } - - // Next dispatch the notification handlers (if there are any to - // dispatch). These are required to handle multi-threads that - // are trying to update the <Reactor>. - - else if (this->dispatch_notification_handlers (number_of_active_handles, - dispatch_set) == -1) - break; // State has changed, exit inner loop. - - // Finally, dispatch the I/O handlers. - else if (this->dispatch_io_handlers (number_of_active_handles, - dispatch_set) == -1) - // State has changed, so exit the inner loop. - break; - } - while (number_of_active_handles > 0); - - return 1; -} - -int -ACE_Select_Reactor_Impl::handle_events_i (ACE_Time_Value *max_wait_time) -{ - int result = -1; - - ACE_SEH_TRY { - ACE_Select_Reactor_Handle_Set dispatch_set; - - int number_of_active_handles = - this->wait_for_multiple_events (dispatch_set, - max_wait_time); - - result = this->dispatch (number_of_active_handles, dispatch_set); - } - ACE_SEH_EXCEPT (this->release_token ()) { - // As it stands now, we catch and then rethrow all Win32 - // structured exceptions so that we can make sure to release the - // <token_> lock correctly. - } - - this->state_changed_ = 1; - - return result; -} - -int -ACE_Select_Reactor_Impl::check_handles (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Impl::check_handles"); - -#if defined (ACE_WIN32) || defined (__MVS__) - ACE_Time_Value time_poll = ACE_Time_Value::zero; - ACE_Handle_Set rd_mask; -#endif /* ACE_WIN32 || MVS */ - - ACE_Event_Handler *eh = 0; - int result = 0; - - for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); - iter.next (eh) != 0; - iter.advance ()) - { - ACE_HANDLE handle = eh->get_handle (); - - // Skip back to the beginning of the loop if the HANDLE is - // invalid. - if (handle == ACE_INVALID_HANDLE) - continue; - -#if defined (ACE_WIN32) || defined (__MVS__) - // Win32 needs to do the check this way because fstat won't work on - // a socket handle. MVS Open Edition needs to do it this way because, - // even though the docs say to check a handle with either select or - // fstat, the fstat method always says the handle is ok. - rd_mask.set_bit (handle); - - if (ACE_OS::select (int (handle) + 1, - rd_mask, 0, 0, - &time_poll) < 0) - { - result = 1; - this->remove_handler_i (handle, - ACE_Event_Handler::ALL_EVENTS_MASK); - } - rd_mask.clr_bit (handle); -#else /* !ACE_WIN32 && !MVS */ - struct stat temp; - - if (ACE_OS::fstat (handle, &temp) == -1) - { - result = 1; - this->remove_handler_i (handle, - ACE_Event_Handler::ALL_EVENTS_MASK); - } -#endif /* ACE_WIN32 || MVS */ - } - - return result; -} - diff --git a/ace/Select_Reactor_T.h b/ace/Select_Reactor_T.h index 008d506b749..d65366a9b61 100644 --- a/ace/Select_Reactor_T.h +++ b/ace/Select_Reactor_T.h @@ -9,9 +9,6 @@ // = FILENAME // Select_Reactor_T.h // -// = DESCRIPTION -// Implementations of Select_Reactor and the Token used by Select_Reactor. -// // = AUTHOR // Doug Schmidt // @@ -137,6 +134,15 @@ public: ACE_Reactor_Notify * = 0); // Initialize <ACE_Select_Reactor> with size <size>. + virtual int current_info (ACE_HANDLE, size_t & /* size */); + // Returns -1 (not used in this implementation); + + virtual int set_sig_handler (ACE_Sig_Handler *signal_handler); + // Use a user specified signal handler instead. + + virtual int set_timer_queue (ACE_Timer_Queue *timer_queue); + // Use a user specified timer queue instead. + virtual int close (void); // Close down the select_reactor and release all of its resources. @@ -146,6 +152,34 @@ public: // = Event loop drivers. virtual int handle_events (ACE_Time_Value *max_wait_time = 0); + virtual int alertable_handle_events (ACE_Time_Value *max_wait_time = 0); + // This event loop driver that blocks for <max_wait_time> before + // returning. It will return earlier if timer events, I/O events, + // or signal events occur. Note that <max_wait_time> can be 0, in + // which case this method blocks indefinitely until events occur. + // + // <max_wait_time> is decremented to reflect how much time this call + // took. For instance, if a time value of 3 seconds is passed to + // handle_events and an event occurs after 2 seconds, + // <max_wait_time> will equal 1 second. This can be used if an + // application wishes to handle events for some fixed amount of + // time. + // + // Returns the total number of <ACE_Event_Handler>s that were + // dispatched, 0 if the <max_wait_time> elapsed without dispatching + // any handlers, or -1 if something goes wrong. + // + // Current <alertable_handle_events> is identical to + // <handle_events>. + + virtual int handle_events (ACE_Time_Value &max_wait_time); + virtual int alertable_handle_events (ACE_Time_Value &max_wait_time); + // This method is just like the one above, except the + // <max_wait_time> value is a reference and can therefore never be + // NULL. + // + // Current <alertable_handle_events> is identical to + // <handle_events>. // = Register and remove <ACE_Event_Handler>s. virtual int register_handler (ACE_Event_Handler *eh, @@ -161,11 +195,47 @@ public: // <handle> is given the Select_Reactor will *not* call // eh->get_handle() to extract the underlying I/O handle. +#if defined (ACE_WIN32) + + // Originally this interface was available for all platforms, but + // because ACE_HANDLE is an int on non-Win32 platforms, compilers + // are not able to tell the difference between + // register_handler(ACE_Event_Handler*,ACE_Reactor_Mask) and + // register_handler(ACE_Event_Handler*,ACE_HANDLE). Therefore, we + // have restricted this method to Win32 only. + + virtual int register_handler (ACE_Event_Handler *event_handler, + ACE_HANDLE event_handle = ACE_INVALID_HANDLE); + // Not implemented. + +#endif /* ACE_WIN32 */ + + virtual int register_handler (ACE_HANDLE event_handle, + ACE_HANDLE io_handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask); + // Not implemented. + virtual int register_handler (const ACE_Handle_Set &handles, ACE_Event_Handler *eh, ACE_Reactor_Mask mask); // Register <eh> with all the <handles> in the <Handle_Set>. + virtual int register_handler (int signum, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp = 0, + ACE_Event_Handler **old_sh = 0, + ACE_Sig_Action *old_disp = 0); + // Register <new_sh> to handle the signal <signum> using the + // <new_disp>. Returns the <old_sh> that was previously registered + // (if any), along with the <old_disp> of the signal handler. + + virtual int register_handler (const ACE_Sig_Set &sigset, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp = 0); + // Registers <new_sh> to handle a set of signals <sigset> using the + // <new_disp>. + virtual int remove_handler (ACE_Event_Handler *eh, ACE_Reactor_Mask mask); // Removes the <mask> binding of <eh> from the Select_Reactor. If @@ -185,6 +255,24 @@ public: // bind of <Event_Handler>. If there are no more bindings for any // of these handlers then they are removed from the Select_Reactor. + virtual int remove_handler (int signum, + ACE_Sig_Action *new_disp, + ACE_Sig_Action *old_disp = 0, + int sigkey = -1); + // Remove the ACE_Event_Handler currently associated with <signum>. + // <sigkey> is ignored in this implementation since there is only + // one instance of a signal handler. Install the new disposition + // (if given) and return the previous disposition (if desired by the + // caller). Returns 0 on success and -1 if <signum> is invalid. + + virtual int remove_handler (const ACE_Sig_Set &sigset); + // Calls <remove_handler> for every signal in <sigset>. + + // = Suspend and resume Handlers. + + virtual int suspend_handler (ACE_Event_Handler *eh); + // Temporarily suspend the <Event_Handler> associated with <eh>. + virtual int suspend_handler (ACE_HANDLE handle); // Temporarily suspend the <Event_Handler> associated with <handle>. @@ -194,6 +282,10 @@ public: virtual int suspend_handlers (void); // Suspend all the <Event_Handlers> in the Select_Reactor. + virtual int resume_handler (ACE_Event_Handler *eh); + // Resume a temporarily suspend <Event_Handler> associated with + // <eh>. + virtual int resume_handler (ACE_HANDLE handle); // Resume a temporarily suspended <Event_Handler> associated with // <handle>. @@ -204,6 +296,12 @@ public: virtual int resume_handlers (void); // Resume all the <Event_Handlers> in the Select_Reactor. + virtual int uses_event_associations (void); + // Return 1 if we any event associations were made by the reactor + // for the handles that it waits on, 0 otherwise. Since the + // Select_Reactor does not do any event associations, this function + // always return 0. + // = Timer management. virtual long schedule_timer (ACE_Event_Handler *, const void *arg, @@ -223,7 +321,56 @@ public: // timer. Returns -1 on failure (which is guaranteed never to be a // valid <timer_id>. + virtual int cancel_timer (ACE_Event_Handler *event_handler, + int dont_call_handle_close = 1); + // Cancel all <event_handlers> that match the address of + // <event_handler>. If <dont_call_handle_close> is 0 then the + // <handle_close> method of <event_handler> will be invoked. + // Returns number of handler's cancelled. + + virtual int cancel_timer (long timer_id, + const void **arg = 0, + int dont_call_handle_close = 1); + // Cancel the single <ACE_Event_Handler> that matches the <timer_id> + // value (which was returned from the <schedule> method). If arg is + // non-NULL then it will be set to point to the ``magic cookie'' + // argument passed in when the <Event_Handler> was registered. This + // makes it possible to free up the memory and avoid memory leaks. + // If <dont_call_handle_close> is 0 then the <handle_close> method + // of <event_handler> will be invoked. Returns 1 if cancellation + // succeeded and 0 if the <timer_id> wasn't found. + + // = High-level Event_Handler scheduling operations + + virtual int schedule_wakeup (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + // ADD the dispatch MASK "bit" bound with the <eh> and the <mask>. + + virtual int schedule_wakeup (ACE_HANDLE handle, + ACE_Reactor_Mask mask); + // ADD the dispatch MASK "bit" bound with the <handle> and the <mask>. + + virtual int cancel_wakeup (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + // CLR the dispatch MASK "bit" bound with the <eh> and the <mask>. + + virtual int cancel_wakeup (ACE_HANDLE handle, + ACE_Reactor_Mask mask); + // CLR the dispatch MASK "bit" bound with the <handle> and the <mask>. + // = Notification methods. + virtual int notify (ACE_Event_Handler * = 0, + ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK, + ACE_Time_Value * = 0); + // Called by a thread when it wants to unblock the Select_Reactor. + // This wakeups the <ACE_Select_Reactor> if currently blocked in + // select()/poll(). Pass over both the <Event_Handler> *and* the + // <mask> to allow the caller to dictate which <Event_Handler> + // method the <Select_Reactor> will invoke. The <ACE_Time_Value> + // indicates how long to blocking trying to notify the + // <Select_Reactor>. If <timeout> == 0, the caller will block until + // action is possible, else will wait until the relative time + // specified in *<timeout> elapses). virtual void max_notify_iterations (int); // Set the maximum number of times that the @@ -249,17 +396,33 @@ public: // Get position that the main ACE_Select_Reactor thread is requeued in the // list of waiters during a notify() callback. + // = Low-level wait_set mask manipulation methods. + virtual int mask_ops (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + int ops); + // GET/SET/ADD/CLR the dispatch mask "bit" bound with the <eh> and + // <mask>. + virtual int mask_ops (ACE_HANDLE handle, ACE_Reactor_Mask mask, int ops); // GET/SET/ADD/CLR the dispatch MASK "bit" bound with the <handle> // and <mask>. + // = Low-level ready_set mask manipulation methods. + virtual int ready_ops (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + int ops); + // GET/SET/ADD/CLR the ready "bit" bound with the <eh> and <mask>. + virtual int ready_ops (ACE_HANDLE handle, ACE_Reactor_Mask, int ops); // GET/SET/ADD/CLR the ready "bit" bound with the <handle> and <mask>. + virtual void wakeup_all_threads (void); + // Wake up all threads in waiting in the event loop + // = Only the owner thread that can perform a <handle_events>. virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id = 0); @@ -276,9 +439,19 @@ public: // bound to <mask>. Return the <eh> associated with this <handler> // if <eh> != 0. + virtual int handler (int signum, + ACE_Event_Handler ** = 0); + // Check to see if <signum> is associated with a valid Event_Handler + // bound to a signal. Return the <eh> associated with this + // <handler> if <eh> != 0. + virtual int initialized (void); // Returns true if we've been successfully initialized, else false. + virtual size_t size (void); + // Returns the current size of the Reactor's internal descriptor + // table. + virtual ACE_Lock &lock (void); // Returns a reference to the <ACE_Select_Reactor_Token> that is // used to serialize the internal Select_Reactor's processing logic. @@ -293,19 +466,124 @@ public: // Declare the dynamic allocation hooks. protected: + // = Internal methods that do the actual work. + + // All of these methods assume that the <Select_Reactor>'s token + // lock is held by the public methods that call down to them. + + virtual int register_handler_i (ACE_HANDLE handle, + ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + // Do the work of actually binding the <handle> and <eh> with the + // <mask>. + + virtual int register_handler_i (const ACE_Handle_Set &handles, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask); + // Register a set of <handles>. + + virtual int remove_handler_i (ACE_HANDLE handle, + ACE_Reactor_Mask); + // Do the work of actually unbinding the <handle> and <eh> with the + // <mask>. + + virtual int remove_handler_i (const ACE_Handle_Set &handles, + ACE_Reactor_Mask); + // Remove a set of <handles>. + + virtual int suspend_i (ACE_HANDLE handle); + // Suspend the <Event_Handler> associated with <handle> + + virtual int resume_i (ACE_HANDLE handle); + // Resume the <Event_Handler> associated with <handle> + + virtual int handler_i (ACE_HANDLE handle, + ACE_Reactor_Mask, + ACE_Event_Handler ** = 0); + // Implement the public <handler> method. + + virtual int handler_i (int signum, ACE_Event_Handler ** = 0); + // Implement the public <handler> method. + + virtual int any_ready (ACE_Select_Reactor_Handle_Set &handle_set); + // Check if there are any HANDLEs enabled in the <ready_set_>, and + // if so, update the <handle_set> and return the number ready. If + // there aren't any HANDLEs enabled return 0. + + virtual int handle_error (void); + // Take corrective action when errors occur. + + virtual int check_handles (void); + // Make sure the handles are all valid. + + virtual int wait_for_multiple_events (ACE_Select_Reactor_Handle_Set &, + ACE_Time_Value *); + // Wait for events to occur. + + // = Dispatching methods. + + virtual int dispatch (int nfound, + ACE_Select_Reactor_Handle_Set &); + // Template Method that dispatches <ACE_Event_Handler>s for time + // events, I/O events, and signal events. Returns the total number + // of <ACE_Event_Handler>s that were dispatched or -1 if something + // goes wrong. + + virtual int dispatch_timer_handlers (void); + // Dispatch any expired timer handlers. Returns -1 if the state of + // the <wait_set_> has changed, else returns number of timer + // handlers dispatched. + + virtual int dispatch_notification_handlers (int &number_of_active_handles, + ACE_Select_Reactor_Handle_Set &dispatch_set); + // Dispatch any notification handlers. Returns -1 if the state of + // the <wait_set_> has changed, else returns number of handlers + // notified. + + virtual int dispatch_io_handlers (int &number_of_active_handles, + ACE_Select_Reactor_Handle_Set &dispatch_set); + // Dispatch all the input/output/except handlers that are enabled in + // the <dispatch_set>. Returns -1 if the state of the <wait_set_> + // has changed, else returns number of handlers dispatched. + + virtual int dispatch_io_set (int number_of_active_handles, + int& number_dispatched, + int mask, + ACE_Handle_Set& dispatch_mask, + ACE_Handle_Set& ready_mask, + ACE_EH_PTMF callback); + // Factors the dispatching of an io handle set (each WRITE, EXCEPT + // or READ set of handles). + // It updates the number of handles already dispatched and + // invokes this->notify_handle for all the handles in <dispatch_set> + // using the <mask>, <ready_set> and <callback> parameters. + // Must return -1 if this->state_changed otherwise it must return 0. + + virtual void notify_handle (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Handle_Set &, + ACE_Event_Handler *eh, + ACE_EH_PTMF callback); + // Notify the appropriate <callback> in the context of the <eh> + // associated with <handle> that a particular event has occurred. + virtual void renew (void); // Enqueue ourselves into the list of waiting threads at the // appropriate point specified by <requeue_position_>. - virtual int release_token (void); - // Release the token lock when a Win32 structured exception occurs. - ACE_SELECT_REACTOR_TOKEN token_; // Synchronization token for the MT_SAFE ACE_Select_Reactor. ACE_Lock_Adapter<ACE_SELECT_REACTOR_TOKEN> lock_adapter_; // Adapter used to return internal lock to outside world. + int release_token (void); + // Release the token lock when a Win32 structured exception occurs. + + int handle_events_i (ACE_Time_Value *max_wait_time = 0); + // Stops the VC++ compiler from bitching about exceptions and destructors + + private: ACE_Select_Reactor_T (const ACE_Select_Reactor_T &); ACE_Select_Reactor_T &operator = (const ACE_Select_Reactor_T &); diff --git a/ace/Select_Reactor_T.i b/ace/Select_Reactor_T.i index 15e04a75d7a..8d2432d5b5e 100644 --- a/ace/Select_Reactor_T.i +++ b/ace/Select_Reactor_T.i @@ -4,6 +4,13 @@ #include "ace/Reactor.h" template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handler (ACE_Event_Handler *h) +{ + ACE_TRACE ("ACE_Select_Reactor_T::resume_handler"); + return this->resume_handler (h->get_handle ()); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handler (const ACE_Handle_Set &handles) { ACE_TRACE ("ACE_Select_Reactor_T::resume_handler"); @@ -20,6 +27,13 @@ ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handler (const ACE_Handle } template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_handler (ACE_Event_Handler *h) +{ + ACE_TRACE ("ACE_Select_Reactor_T::suspend_handler"); + return this->suspend_handler (h->get_handle ()); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_handler (const ACE_Handle_Set &handles) { ACE_TRACE ("ACE_Select_Reactor_T::suspend_handler"); @@ -35,6 +49,149 @@ ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_handler (const ACE_Handl return 0; } +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (int signum, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp, + ACE_Event_Handler **old_sh, + ACE_Sig_Action *old_disp) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); + return this->signal_handler_->register_handler (signum, + new_sh, new_disp, + old_sh, old_disp); +} + +#if defined (ACE_WIN32) + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (ACE_Event_Handler *event_handler, + ACE_HANDLE event_handle) +{ + // Don't have an implementation for this yet... + ACE_UNUSED_ARG (event_handler); + ACE_UNUSED_ARG (event_handle); + ACE_NOTSUP_RETURN (-1); +} + +#endif /* ACE_WIN32 */ + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (ACE_HANDLE event_handle, + ACE_HANDLE io_handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + // Don't have an implementation for this yet... + ACE_UNUSED_ARG (event_handle); + ACE_UNUSED_ARG (io_handle); + ACE_UNUSED_ARG (event_handler); + ACE_UNUSED_ARG (mask); + ACE_NOTSUP_RETURN (-1); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handler (int signum, ACE_Event_Handler **handler) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handler"); + return this->handler_i (signum, handler); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler (int signum, + ACE_Sig_Action *new_disp, + ACE_Sig_Action *old_disp, + int sigkey) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); + return this->signal_handler_->remove_handler (signum, new_disp, old_disp, sigkey); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::uses_event_associations (void) +{ + // Since the Select_Reactor does not do any event associations, this + // function always return 0. + return 0; +} + +// = The remaining methods in this file must be called with locks +// held. Note the queue handles its own locking. + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::cancel_timer (ACE_Event_Handler *handler, + int dont_call_handle_close) +{ + ACE_TRACE ("ACE_Select_Reactor_T::cancel_timer"); + return this->timer_queue_ != 0 && + this->timer_queue_->cancel (handler, dont_call_handle_close); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::cancel_timer (long timer_id, + const void **arg, + int dont_call_handle_close) +{ + ACE_TRACE ("ACE_Select_Reactor_T::cancel_timer"); + return this->timer_queue_->cancel (timer_id, + arg, + dont_call_handle_close); +} + +// Performs operations on the "ready" bits. + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::ready_ops (ACE_Event_Handler *handler, + ACE_Reactor_Mask mask, + int ops) +{ + ACE_TRACE ("ACE_Select_Reactor_T::ready_ops"); + return this->ready_ops (handler->get_handle (), mask, ops); +} + +// Performs operations on the "dispatch" masks. + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::mask_ops (ACE_Event_Handler *handler, + ACE_Reactor_Mask mask, + int ops) +{ + ACE_TRACE ("ACE_Select_Reactor_T::mask_ops"); + return this->mask_ops (handler->get_handle (), mask, ops); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::schedule_wakeup (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::schedule_wakeup"); + return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::ADD_MASK); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::cancel_wakeup (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::cancel_wakeup"); + return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::CLR_MASK); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::schedule_wakeup (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::schedule_wakeup"); + return this->mask_ops (handle, mask, ACE_Reactor::ADD_MASK); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::cancel_wakeup (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::cancel_wakeup"); + return this->mask_ops (handle, mask, ACE_Reactor::CLR_MASK); +} + template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE ACE_Lock & ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::lock (void) { @@ -42,3 +199,28 @@ ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::lock (void) return this->lock_adapter_; } +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::wakeup_all_threads (void) +{ + // Send a notification, but don't block if there's no one to receive + // it. + this->notify (0, ACE_Event_Handler::NULL_MASK, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::alertable_handle_events (ACE_Time_Value *max_wait_time) +{ + return this->handle_events (max_wait_time); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::alertable_handle_events (ACE_Time_Value &max_wait_time) +{ + return this->handle_events (max_wait_time); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_INLINE size_t +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::size (void) +{ + return this->handler_rep_.size (); +} |