diff options
author | bala <balanatarajan@users.noreply.github.com> | 2001-08-24 21:23:56 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2001-08-24 21:23:56 +0000 |
commit | a5ca76d4d27526f2905fff8b9bb0f0f297dd68ed (patch) | |
tree | ba223c951bf1eba2a7bb5a9e6043a82f3dadf2cf /ace | |
parent | 9be313215bd4606577708f16c7770bbe9913c7d4 (diff) | |
download | ATCD-a5ca76d4d27526f2905fff8b9bb0f0f297dd68ed.tar.gz |
ChangeLogTag: Fri Aug 24 16:10:20 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'ace')
-rw-r--r-- | ace/Reactor_Impl.h | 9 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.cpp | 45 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.h | 9 | ||||
-rw-r--r-- | ace/TP_Reactor.cpp | 636 | ||||
-rw-r--r-- | ace/TP_Reactor.h | 100 | ||||
-rw-r--r-- | ace/TP_Reactor.i | 93 | ||||
-rw-r--r-- | ace/Timer_Queue.cpp | 2 | ||||
-rw-r--r-- | ace/Timer_Queue.h | 2 | ||||
-rw-r--r-- | ace/Timer_Queue_T.cpp | 53 | ||||
-rw-r--r-- | ace/Timer_Queue_T.h | 43 | ||||
-rw-r--r-- | ace/Timer_Queue_T.i | 23 |
11 files changed, 696 insertions, 319 deletions
diff --git a/ace/Reactor_Impl.h b/ace/Reactor_Impl.h index 6dd4dcbe6b9..9a279e1dea8 100644 --- a/ace/Reactor_Impl.h +++ b/ace/Reactor_Impl.h @@ -69,6 +69,15 @@ public: virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask) = 0; + /// Returns the ACE_HANDLE of the notify pipe on which the reactor + /// is listening for notifications so that other threads can unblock + /// the <Reactor_Impl> + virtual ACE_HANDLE notify_handle (void) = 0; + + /// Handle one of the notify call on the <handle>. This could be + /// because of a thread trying to unblock the <Reactor_Impl> + virtual int dispatch_notify (ACE_HANDLE handle) = 0; + /** * Set the maximum number of times that the <handle_input> method * will iterate and dispatch the <ACE_Event_Handlers> that are diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp index 4eef29a9b6d..12402471eba 100644 --- a/ace/Select_Reactor_Base.cpp +++ b/ace/Select_Reactor_Base.cpp @@ -766,23 +766,30 @@ ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles return 0; } + +ACE_HANDLE +ACE_Select_Reactor_Notify::notify_handle (void) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle"); + + return this->notification_pipe_.read_handle (); +} + + // Special trick to unblock <select> when updates occur in somewhere // other than the main <ACE_Select_Reactor> thread. All we do is // write data to a pipe that the <ACE_Select_Reactor> is listening on. // Thanks to Paul Stephenson for suggesting this approach. int -ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) +ACE_Select_Reactor_Notify::dispatch_notify (ACE_HANDLE handle) { - ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input"); - // Precondition: this->select_reactor_.token_.current_owner () == - // ACE_Thread::self (); + ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notify"); ACE_Notification_Buffer buffer; - ssize_t n; - int number_dispatched = 0; + ssize_t n = 0; - while ((n = ACE::recv (handle, (char *) &buffer, sizeof buffer)) > 0) + if ((n = ACE::recv (handle, (char *) &buffer, sizeof buffer)) > 0) { // Check to see if we've got a short read. if (n != sizeof buffer) @@ -895,6 +902,28 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) } #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + return 1; + } + + // Return -1 if things have gone seriously wrong. + if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN)) + return -1; + + return 0; +} + + +int +ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input"); + // Precondition: this->select_reactor_.token_.current_owner () == + // ACE_Thread::self (); + + int number_dispatched = 0; + int result = 0; + while ((result = this->dispatch_notify (handle) > 0)) + { number_dispatched++; // Bail out if we've reached the <notify_threshold_>. Note that @@ -906,7 +935,7 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) // Reassign number_dispatched to -1 if things have gone seriously // wrong. - if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN)) + if (result < 0) number_dispatched = -1; // Enqueue ourselves into the list of waiting threads. When we diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h index dfffc0b35fc..9d60cb1f8aa 100644 --- a/ace/Select_Reactor_Base.h +++ b/ace/Select_Reactor_Base.h @@ -144,6 +144,15 @@ public: virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask); + /// Returns the ACE_HANDLE of the notify pipe on which the reactor + /// is listening for notifications so that other threads can unblock + /// the Select_Reactor + virtual ACE_HANDLE notify_handle (void); + + /// Handle one of the notify call on the <handle>. This could be + /// because of a thread trying to unblock the <Reactor_Impl> + virtual int dispatch_notify (ACE_HANDLE handle); + /// Called back by the <ACE_Select_Reactor> when a thread wants to /// unblock us. virtual int handle_input (ACE_HANDLE handle); diff --git a/ace/TP_Reactor.cpp b/ace/TP_Reactor.cpp index 9b3881ea3b4..d7b38157e07 100644 --- a/ace/TP_Reactor.cpp +++ b/ace/TP_Reactor.cpp @@ -4,6 +4,7 @@ #include "ace/TP_Reactor.h" #include "ace/Reactor.h" #include "ace/Thread.h" +//#include "ace/Thread.h" #if !defined (__ACE_INLINE__) #include "ace/TP_Reactor.i" @@ -14,6 +15,51 @@ ACE_RCSID(ace, TP_Reactor, "$Id$") ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor) +int +ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time) +{ + ACE_TRACE ("ACE_TP_Token_Guard::grab_token"); + + // The order of these events is very subtle, modify with care. + + // Try to grab the lock. If someone if already there, don't wake + // them up, just queue up in the thread pool. + int result = 0; + + if (max_wait_time) + { + ACE_Time_Value tv = ACE_OS::gettimeofday (); + tv += *max_wait_time; + + ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook, + 0, + &tv)); + } + else + { + ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook)); + } + + // Now that this thread owns the token let us make + // Check for timeouts and errors. + if (result == -1) + { + if (errno == ETIME) + return 0; + else + return -1; + } + + // We got the token and so let us mark ourseleves as owner + this->owner_ = 1; + + return result; +} + +/************************************************************************/ +// Methods for ACE_TP_Reactor +/************************************************************************/ + ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, int mask_signals) @@ -34,25 +80,15 @@ ACE_TP_Reactor::ACE_TP_Reactor (size_t size, this->supress_notify_renew (1); } -int -ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) -{ - ACE_TRACE ("ACE_TP_Reactor::owner"); - if (o_id) - *o_id = ACE_Thread::self (); - - return 0; -} -int -ACE_TP_Reactor::owner (ACE_thread_t *t_id) +void +ACE_TP_Reactor::max_notify_iterations (int /*iterations*/) { - ACE_TRACE ("ACE_TP_Reactor::owner"); - *t_id = ACE_Thread::self (); - - return 0; + ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations"); + ACE_ERROR ((LM_ERROR, + "(%P|%t) This has no effect in the TP_Reactor.. \n")); } @@ -61,100 +97,107 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Reactor::handle_events"); + int result = 0; + // Stash the current time -- the destructor of this object will // automatically compute how much time elpased since this method was // called. ACE_Countdown_Time countdown (max_wait_time); - // The order of these events is very subtle, modify with care. + // Instantiate the token guard which will try grabbing the token for + // this thread. + ACE_TP_Token_Guard guard (this->token_, + max_wait_time, + result); - // Try to grab the lock. If someone if already there, don't wake - // them up, just queue up in the thread pool. - int result = 0; - - if (max_wait_time) - { - ACE_Time_Value tv = ACE_OS::gettimeofday (); - tv += *max_wait_time; - - ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook, - 0, - &tv)); - } - else - { - ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook)); - } + // If the guard is NOT the owner just return the retval + if (!guard.is_owner ()) + return result; // Update the countdown to reflect time waiting for the token. countdown.update (); - // Check for timeouts and errors. - if (result == -1) - { - if (errno == ETIME) - return 0; - else - return -1; - } // After acquiring the lock, check if we have been deactivated. If // we are deactivated, simply return without handling further // events. if (this->deactivated_) { - ACE_MT (this->token_.release ()); return -1; } - // We got the lock, lets handle some events. Note: this method will - // *not* dispatch any I/O handlers. It will dispatch signals, - // timeouts, and notifications. - ACE_EH_Dispatch_Info dispatch_info; - result = this->dispatch_i_protected (max_wait_time, dispatch_info); - if (result == -1) + // We got the lock, lets handle some events. We collect the events + // that we need to handle. We release the token and then handle + // those events that needs handling. + int event_count = + this->get_event_for_dispatching (max_wait_time); + + + // Dispatch signals + if (event_count == -1) { - ACE_MT (this->token_.release ()); - return -1; + // Looks like we dont do any upcalls in dispatch signals. If at + // a later point of time, we decide to handle signals we have to + // release the lock before we make any upcalls.. What is here + // now is not the right thing... + // @@ We need to do better.. + return this->handle_signals (event_count, + guard); } - // If there is any event handler that is ready to be dispatched, the - // dispatch information is recorded in dispatch_info. - if (dispatch_info.dispatch ()) + if (event_count > 0) { - // Suspend the handler so that other threads don't start - // dispatching it. - // Make sure we never suspend the notify_handler_ without holding - // the lock. - // @@ Actually, we don't even need to suspend the notify_handler_ - // here. But let me get it to work first. - if (dispatch_info.event_handler_ != this->notify_handler_) - this->suspend_i (dispatch_info.handle_); - } + // If there are no signals and if we had received a proper + // event_count then first look at dispatching timeouts. We need to + // handle timers early since they may have higher latency + // constraints than I/O handlers. Ideally, the order of + // dispatching should be a strategy... + int retval = this->handle_timer_events (event_count, + guard); + + if (retval > 0) + return retval; - // Release the lock. Others threads can start waiting. - ACE_MT (this->token_.release ()); + // Else just fall through for further handling + } - // If there was an event handler ready, dispatch it. - if (dispatch_info.dispatch ()) + + if (event_count > 0) { - if (this->notify_handle (dispatch_info) == 0) - ++result; // Dispatched one more event - int flag = 0; + // Next dispatch the notification handlers (if there are any to + // dispatch). These are required to handle multiple-threads that + // are trying to update the <Reactor>. + int retval = this->handle_notify_events (event_count, + guard); - if (dispatch_info.event_handler_ != 0) - { - flag = - dispatch_info.event_handler_->resume_handler (); - } + if (retval > 0) + return retval; - if (dispatch_info.handle_ != ACE_INVALID_HANDLE && - dispatch_info.event_handler_ != this->notify_handler_ && - flag == 0) - this->resume_handler (dispatch_info.handle_); + // Else just fall through for further handling } - return result; + + if (event_count > 0) + { + // Handle socket events + return this->handle_socket_events (event_count, + guard); + } + + return 0; +} + +int +ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) +{ + ACE_TRACE ("ACE_TP_Reactor::handle_events"); + return this->handle_events (&max_wait_time); +} + +int +ACE_TP_Reactor::resumable_handler (void) +{ + return 1; } int @@ -164,7 +207,7 @@ ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, { ACE_TRACE ("ACE_TP_Reactor::mask_ops"); ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token, - ace_mon, this->token_, -1)); + ace_mon, this->token_, -1)); int result = 0; @@ -187,20 +230,42 @@ ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, return result; } -void -ACE_TP_Reactor::no_op_sleep_hook (void *) + +int +ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + int ops) { + ACE_TRACE ("ACE_TP_Reactor::mask_ops"); + return this->mask_ops (eh->get_handle (), mask, ops); } int -ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, - ACE_EH_Dispatch_Info &event) +ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) { - int result = -1; + ACE_TRACE ("ACE_TP_Reactor::owner"); + if (o_id) + *o_id = ACE_Thread::self (); - event.reset (); // Nothing to dispatch yet + return 0; + +} + +int +ACE_TP_Reactor::owner (ACE_thread_t *t_id) +{ + ACE_TRACE ("ACE_TP_Reactor::owner"); + *t_id = ACE_Thread::self (); + return 0; +} + + +int +ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time) +{ + ACE_TRACE ("ACE_TP_Reactor::get_event_for_dispatching"); // If the reactor handler state has changed, clear any remembered // ready bits and re-scan from the master wait_set. if (this->state_changed_) @@ -216,99 +281,226 @@ ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, // I don't understand...) the mask will have all of its bits clear, // yet have a size_ > 0. This is an attempt to remedy the affect, // without knowing why it happens. - - //# if !(defined (__SUNPRO_CC) && (__SUNPRO_CC > 0x500)) - // SunCC seems to be having problems with this piece of code - // here. I am not sure why though. This works fine with other - // compilers. As we dont seem to understand when this piece of - // code is needed and as it creates problems for SunCC we will - // not compile this. Most of the tests in TAO seem to be happy - // without this in SunCC. this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); - //# endif /* ! __SUNPRO_CC */ - } - int active_handle_count = this->wait_for_multiple_events (this->ready_set_, - max_wait_time); - - int handlers_dispatched = 0; - int signal_occurred = 0; + return this->wait_for_multiple_events (this->ready_set_, + max_wait_time); +} - // Note that we keep track of changes to our state. If any of - // the dispatching ends up with this->state_changed_ being set, - // <wait_set_> state has changed as the result of an - // <ACE_Event_Handler> being dispatched. This means that we - // need to bail out and rerun the select() again since our - // existing notion of handles in <dispatch_set_> may no longer be - // correct. +int +ACE_TP_Reactor::handle_signals (int & /*event_count*/, + ACE_TP_Token_Guard & /*guard*/) +{ + ACE_TRACE ("ACE_TP_Reactor::handle_signals"); + + /* + * + * THIS METHOD SEEMS BROKEN + * + * + */ // First check for interrupts. - if (active_handle_count == -1) + // Bail out -- we got here since <select> was interrupted. + if (ACE_Sig_Handler::sig_pending () != 0) { - // Bail out -- we got here since <select> was interrupted. - if (ACE_Sig_Handler::sig_pending () != 0) - { - ACE_Sig_Handler::sig_pending (0); + ACE_Sig_Handler::sig_pending (0); + // This piece of code comes from the old TP_Reactor. We did not + // handle signals at all then. If we happen to handle signals + // in the TP_Reactor, we should then start worryiung about this + // - Bala 21-Aug- 01 #if 0 - // Not sure if this should be done in the TP_Reactor - // case... leave it out for now. -Steve Huston 22-Aug-00 - - // If any HANDLES in the <ready_set_> are activated as a - // result of signals they should be dispatched since - // they may be time critical... - active_handle_count = this->any_ready (dispatch_set); -#else - active_handle_count = 0; + // Not sure if this should be done in the TP_Reactor + // case... leave it out for now. -Steve Huston 22-Aug-00 + + // If any HANDLES in the <ready_set_> are activated as a + // result of signals they should be dispatched since + // they may be time critical... + active_handle_count = this->any_ready (dispatch_set); + #else + // active_handle_count = 0; #endif - // Record the fact that the Reactor has dispatched a - // handle_signal() method. We need this to return the - // appropriate count below. - signal_occurred = 1; + // Record the fact that the Reactor has dispatched a + // handle_signal() method. We need this to return the + // appropriate count. + return 1; + } + + return -1; +} + + +int +ACE_TP_Reactor::handle_timer_events (int &event_count, + ACE_TP_Token_Guard &guard) +{ + // Get the current time + ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () + + this->timer_queue_->timer_skew ()); + + // Look for a node in the timer queue whose timer <= the present + // time. + ACE_Timer_Node_Dispatch_Info info; + + if (this->timer_queue_->dispatch_info (cur_time, + info)) + { + // Decrement the number of events that needs handling yet. + event_count--; + + // Release the token before dispatching notifies... + guard.release_token (); + + // call the functor + this->timer_queue_->upcall (info.type_, + info.act_, + cur_time); + + // We have dispatched a timer + return 1; + } + + return 0; +} + + +int +ACE_TP_Reactor::handle_notify_events (int &event_count, + ACE_TP_Token_Guard &guard) +{ + // Get the handle on which notify calls could have occured + ACE_HANDLE notify_handle = + this->get_notify_handle (); + + if (notify_handle != ACE_INVALID_HANDLE) + { + // Clear the handle of the read_mask of our <ready_set_> + this->ready_set_.rd_mask_.clr_bit (notify_handle); + + // Decrement the number of events that needs handling yet. + event_count--; + + // Release the token before dispatching notifies... + guard.release_token (); + + // Dipatch and return + return + this->notify_handler_->dispatch_notify (notify_handle); + } + + return 0; +} + +int +ACE_TP_Reactor::handle_socket_events (int &event_count, + ACE_TP_Token_Guard &guard) +{ + ACE_TRACE ("ACE_TP_Reactor::handle_socket_events"); + + ACE_EH_Dispatch_Info dispatch_info; + + // Get the socket event dispatch information + // If there is any event handler that is ready to be dispatched, the + // dispatch information is recorded in dispatch_info. + int result = + this->get_socket_event_info (dispatch_info); + + if (result) + { + // Suspend the handler so that other threads don't start + // dispatching it. + this->suspend_i (dispatch_info.handle_); + + // Decrement the number of events that needs handling yet. + event_count--; + + // Release the token before dispatching notifies... + guard.release_token (); + + result = this->dispatch_socket_events (dispatch_info); + + int flag = 0; + + if (dispatch_info.event_handler_ != 0) + { + flag = + dispatch_info.event_handler_->resume_handler (); } - else - return -1; + + if (dispatch_info.handle_ != ACE_INVALID_HANDLE && + flag == 0) + this->resume_handler (dispatch_info.handle_); } - // Handle timers early since they may have higher latency - // constraints than I/O handlers. Ideally, the order of - // dispatching should be a strategy... - this->dispatch_timer_handlers (handlers_dispatched); + return result; +} - // If either the state has changed as a result of timer - // expiry, or there are no handles ready for dispatching, - // all done for now. - if (this->state_changed_ || active_handle_count == 0) - return signal_occurred + handlers_dispatched; - // Next dispatch the notification handlers (if there are any to - // dispatch). These are required to handle multi-threads that - // are trying to update the <Reactor>. +ACE_HANDLE +ACE_TP_Reactor::get_notify_handle (void) +{ + // Call the notify handler to get a handle on which we would have a + // notify waiting + ACE_HANDLE read_handle = + this->notify_handler_->notify_handle (); + + // Check whether the rd_mask has been set on that handle. If so + // return the handle. + if (read_handle != ACE_INVALID_HANDLE && + this->ready_set_.rd_mask_.is_set (read_handle)) + { + return read_handle; + } + + return ACE_INVALID_HANDLE; +} + - this->dispatch_notification_handlers (this->ready_set_, - active_handle_count, - handlers_dispatched); - // If one of those changed the state, return. - if (this->state_changed_ || active_handle_count == 0) - return signal_occurred + handlers_dispatched; + +int +ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event) +{ // Check for dispatch in write, except, read. Only catch one, but if // one is caught, be sure to clear the handle from each mask in case // there is more than one mask set for it. This would cause problems // if the handler is suspended for dispatching, but its set bit in // another part of ready_set_ kept it from being dispatched. - int found_io = 0; ACE_HANDLE handle; + // Look at the read masks + { + ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); + + while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) + { + if (this->is_suspended_i (handle)) + continue; + + // Remember this info + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::READ_MASK, + &ACE_Event_Handler::handle_input); + this->ready_set_.rd_mask_.clr_bit (handle); + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.ex_mask_.clr_bit (handle); + return 1; + } + } + + // We havent found any rd_masks for processing yet, so look for + // write masks { ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_); - while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) + while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) { if (this->is_suspended_i (handle)) continue; @@ -321,89 +513,45 @@ ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, this->ready_set_.wr_mask_.clr_bit (handle); this->ready_set_.ex_mask_.clr_bit (handle); this->ready_set_.rd_mask_.clr_bit (handle); - found_io = 1; + return 1; } } - if (!found_io) - { - ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_); - - while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) - { - if (this->is_suspended_i (handle)) - continue; - - // Remember this info - event.set (handle, - this->handler_rep_.find (handle), - ACE_Event_Handler::EXCEPT_MASK, - &ACE_Event_Handler::handle_exception); - this->ready_set_.ex_mask_.clr_bit (handle); - this->ready_set_.wr_mask_.clr_bit (handle); - this->ready_set_.rd_mask_.clr_bit (handle); - found_io = 1; - } - } - - if (!found_io) - { - ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); - - while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) - { - if (this->is_suspended_i (handle)) - continue; - - // Remember this info - event.set (handle, - this->handler_rep_.find (handle), - ACE_Event_Handler::READ_MASK, - &ACE_Event_Handler::handle_input); - this->ready_set_.rd_mask_.clr_bit (handle); - this->ready_set_.wr_mask_.clr_bit (handle); - this->ready_set_.ex_mask_.clr_bit (handle); - found_io = 1; - } - } - - result = signal_occurred + handlers_dispatched; - - return result; -} + // We havent found any rd_mask and wr_masks for processing yet, so + // look for ex_masks + { + ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_); -int -ACE_TP_Reactor::dispatch_i_protected (ACE_Time_Value *max_wait_time, - ACE_EH_Dispatch_Info &event) -{ - int result; + while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) + { + if (this->is_suspended_i (handle)) + continue; - ACE_SEH_TRY - { - result = this->dispatch_i (max_wait_time, event); - } - ACE_SEH_EXCEPT (this->release_token ()) - { - // As it stands now, we catch and then rethrow all Win32 - // structured exceptions so that we can make sure to release the - // <token_> lock correctly. + // Remember this info + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::EXCEPT_MASK, + &ACE_Event_Handler::handle_exception); + this->ready_set_.ex_mask_.clr_bit (handle); + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.rd_mask_.clr_bit (handle); + return 1; + } } - return result; - + // We didnt find any.. + return 0; } - -// Dispatches a single event handler int -ACE_TP_Reactor::notify_handle (ACE_EH_Dispatch_Info &dispatch_info) +ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info) { - ACE_TRACE ("ACE_TP_Reactor::notify_handle"); + ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_events"); - ACE_HANDLE handle = dispatch_info.handle_; - ACE_Event_Handler *event_handler = dispatch_info.event_handler_; - ACE_Reactor_Mask mask = dispatch_info.mask_; - ACE_EH_PTMF callback = dispatch_info.callback_; + ACE_HANDLE handle = info.handle_; + ACE_Event_Handler *event_handler = info.event_handler_; + ACE_Reactor_Mask mask = info.mask_; + ACE_EH_PTMF callback = info.callback_; // Check for removed handlers. if (event_handler == 0) @@ -425,55 +573,23 @@ ACE_TP_Reactor::notify_handle (ACE_EH_Dispatch_Info &dispatch_info) this->remove_handler (handle, mask); // As the handler is no longer valid, invalidate the handle - dispatch_info.event_handler_ = 0; - dispatch_info.handle_ = ACE_INVALID_HANDLE; + info.event_handler_ = 0; + info.handle_ = ACE_INVALID_HANDLE; return retval; } - // assert (status >= 0); - return 0; -} - -int -ACE_TP_Reactor::resumable_handler (void) -{ return 1; } -ACE_EH_Dispatch_Info::ACE_EH_Dispatch_Info (void) -{ - this->reset (); -} - void -ACE_EH_Dispatch_Info::set (ACE_HANDLE handle, - ACE_Event_Handler *event_handler, - ACE_Reactor_Mask mask, - ACE_EH_PTMF callback) -{ - this->dispatch_ = 1; - - this->handle_ = handle; - this->event_handler_ = event_handler; - this->mask_ = mask; - this->callback_ = callback; -} - -void -ACE_EH_Dispatch_Info::reset (void) -{ - this->dispatch_ = 0; - - this->handle_ = ACE_INVALID_HANDLE; - this->event_handler_ = 0; - this->mask_ = ACE_Event_Handler::NULL_MASK; - this->callback_ = 0; -} - -int -ACE_EH_Dispatch_Info::dispatch (void) const +ACE_TP_Reactor::notify_handle (ACE_HANDLE, + ACE_Reactor_Mask, + ACE_Handle_Set &, + ACE_Event_Handler *, + ACE_EH_PTMF) { - return this->dispatch_; + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: Wrong version of notify_handle() gets called"))); } diff --git a/ace/TP_Reactor.h b/ace/TP_Reactor.h index 65f85383058..61043e01db2 100644 --- a/ace/TP_Reactor.h +++ b/ace/TP_Reactor.h @@ -32,12 +32,13 @@ #include "ace/pre.h" #include "ace/Select_Reactor.h" -#include "ace/Log_Msg.h" + #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ + /** * @class ACE_EH_Dispatch_Info * @@ -71,6 +72,61 @@ private: ACE_UNIMPLEMENTED_FUNC (ACE_EH_Dispatch_Info &operator= (const ACE_EH_Dispatch_Info &)) }; + +/** + * @class ACE_TP_Token_Guard + * + * @brief A helper class that helps grabbing, releasing and waiting + * on tokens for a thread that tries calling handle_events (). + * + * In short, this class will be owned by one thread by creating on the + * stack. This class gives the status of the ownership of the token + * and manages the ownership + */ + +class ACE_Export ACE_TP_Token_Guard +{ +public: + + /// Constructor that will grab the token for us + ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token, + ACE_Time_Value *max_wait_time, + int &result); + + /// Destructor. This will release the token if it hasnt been + /// released till this point + ~ACE_TP_Token_Guard (void); + + /// Release the token .. + void release_token (void); + + /// Returns whether the thread that created this object ownes the + /// token or not. + int is_owner (void); + +private: + + /// A helper method that grabs the token for us, after which the + /// thread that owns that can do some actual work. + int grab_token (ACE_Time_Value *max_wait_time); + +private: + + /// The Select Reactor token. + ACE_Select_Reactor_Token &token_; + + /// Flag that indicate whether the thread that created this object + /// owns the token or not. A value of 0 indicates that this class + /// hasnt got the token (and hence the thread) and a value of 1 + /// vice-versa. + int owner_; + +private: + + ACE_UNIMPLEMENTED_FUNC (ACE_TP_Token_Guard (void)) +}; + + /** * @class ACE_TP_Reactor * @@ -128,6 +184,9 @@ public: ACE_Timer_Queue * = 0, int mask_signals = 1); + // = Reactor calls + virtual void max_notify_iterations (int iter); + // = Event loop drivers. /** @@ -187,21 +246,23 @@ public: protected: // = Internal methods that do the actual work. - /** - * Dispatch signal, timer, notification handlers and return possibly - * 1 I/O handler for dispatching. Ideally, it would dispatch nothing, - * and return dispatch information for only one of (signal, timer, - * notification, I/O); however, the reactor mechanism is too enmeshed - * in the timer queue expiry functions and the notification class to - * do this without some significant redesign. - */ - int dispatch_i (ACE_Time_Value *max_wait_time, - ACE_EH_Dispatch_Info &event); + /// Get the event that needs dispatching.It could be either a + /// signal, timer, notification handlers or return possibly 1 I/O + /// handler for dispatching. In the most common use case, this would + /// return 1 I/O handler for dispatching + int get_event_for_dispatching (ACE_Time_Value *max_wait_time); + + int handle_signals (int &event_count, + ACE_TP_Token_Guard &g); + + int handle_timer_events (int &event_count, + ACE_TP_Token_Guard &g); - int dispatch_i_protected (ACE_Time_Value *max_wait_time, - /// Only really does anything for Win32. Wraps a call to dispatch_i in an - /// ACE_SEH_TRY block. - ACE_EH_Dispatch_Info &event); + int handle_notify_events (int &event_count, + ACE_TP_Token_Guard &g); + + int handle_socket_events (int &event_count, + ACE_TP_Token_Guard &g); /// This method shouldn't get called. virtual void notify_handle (ACE_HANDLE handle, @@ -209,10 +270,13 @@ protected: ACE_Handle_Set &, ACE_Event_Handler *eh, ACE_EH_PTMF callback); +private: + + ACE_HANDLE get_notify_handle (void); + + int get_socket_event_info (ACE_EH_Dispatch_Info &info); - /// Notify the appropriate <callback> in the context of the <eh> - /// associated with <handle> that a particular event has occurred. - virtual int notify_handle (ACE_EH_Dispatch_Info &dispatch_info); + int dispatch_socket_events (ACE_EH_Dispatch_Info &info); private: /// Deny access since member-wise won't work... diff --git a/ace/TP_Reactor.i b/ace/TP_Reactor.i index 091124adbef..3ea299aa131 100644 --- a/ace/TP_Reactor.i +++ b/ace/TP_Reactor.i @@ -1,27 +1,92 @@ /* -*- C++ -*- */ // $Id$ +/************************************************************************/ +// Methods for ACE_EH_Dispatch_Info +/************************************************************************/ + +ACE_INLINE +ACE_EH_Dispatch_Info::ACE_EH_Dispatch_Info (void) +{ + this->reset (); +} + +ACE_INLINE void +ACE_EH_Dispatch_Info::set (ACE_HANDLE handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask, + ACE_EH_PTMF callback) +{ + this->dispatch_ = 1; + + this->handle_ = handle; + this->event_handler_ = event_handler; + this->mask_ = mask; + this->callback_ = callback; +} + +ACE_INLINE void +ACE_EH_Dispatch_Info::reset (void) +{ + this->dispatch_ = 0; + + this->handle_ = ACE_INVALID_HANDLE; + this->event_handler_ = 0; + this->mask_ = ACE_Event_Handler::NULL_MASK; + this->callback_ = 0; +} + ACE_INLINE int -ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) +ACE_EH_Dispatch_Info::dispatch (void) const { - return ACE_Select_Reactor::handle_events (max_wait_time); + return this->dispatch_; +} + +/************************************************************************/ +// Methods for ACE_TP_Token_Guard +/************************************************************************/ + +ACE_INLINE +ACE_TP_Token_Guard::ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token, + ACE_Time_Value *max_wait_time, + int &result) + :token_ (token), + owner_ (0) +{ + result = this->grab_token (max_wait_time); +} + +ACE_INLINE +ACE_TP_Token_Guard::~ACE_TP_Token_Guard (void) +{ + if (this->owner_) + { + ACE_MT (this->token_.release ()); + this->owner_ = 0; + } +} + +ACE_INLINE void +ACE_TP_Token_Guard::release_token (void) +{ + ACE_MT (this->token_.release ()); + + // We are not the owner anymore.. + this->owner_ = 0; } ACE_INLINE int -ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - int ops) +ACE_TP_Token_Guard::is_owner (void) { - return this->mask_ops (eh->get_handle (), mask, ops); + return this->owner_; } + +/************************************************************************/ +// Methods for ACE_TP_Reactor +/************************************************************************/ + ACE_INLINE void -ACE_TP_Reactor::notify_handle (ACE_HANDLE, - ACE_Reactor_Mask, - ACE_Handle_Set &, - ACE_Event_Handler *, - ACE_EH_PTMF) -{ - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: Wrong version of notify_handle() gets called"))); +ACE_TP_Reactor::no_op_sleep_hook (void *) +{ } diff --git a/ace/Timer_Queue.cpp b/ace/Timer_Queue.cpp index 931da8697e0..cfcfcd13ae6 100644 --- a/ace/Timer_Queue.cpp +++ b/ace/Timer_Queue.cpp @@ -37,6 +37,7 @@ template class template class ACE_Unbounded_Set<ACE_Timer_Node_T<ACE_Event_Handler *> *>; template class ACE_Node<ACE_Timer_Node_T<ACE_Event_Handler *> *>; template class ACE_Unbounded_Set_Iterator<ACE_Timer_Node_T<ACE_Event_Handler *> *>; +template class ACE_Timer_Node_Dispatch_Info_T<ACE_Event_Handler *>; template class ACE_Timer_Node_T<ACE_Event_Handler *>; template class ACE_Timer_Queue_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_SYNCH_RECURSIVE_MUTEX>, ACE_SYNCH_RECURSIVE_MUTEX>; template class ACE_Timer_Queue_Iterator_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_SYNCH_RECURSIVE_MUTEX>, ACE_SYNCH_RECURSIVE_MUTEX>; @@ -47,6 +48,7 @@ template class ACE_Event_Handler_Handle_Timeout_Upcall<ACE_SYNCH_RECURSIVE_MUTEX #pragma instantiate ACE_Unbounded_Set<ACE_Timer_Node_T<ACE_Event_Handler *> *> #pragma instantiate ACE_Node<ACE_Timer_Node_T<ACE_Event_Handler *> *> #pragma instantiate ACE_Unbounded_Set_Iterator<ACE_Timer_Node_T<ACE_Event_Handler *> *> +#pragma instantiate ACE_Timer_Node_Dispatch_Info_T<ACE_Event_Handler *> #pragma instantiate ACE_Timer_Node_T<ACE_Event_Handler *> #pragma instantiate ACE_Timer_Queue_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_SYNCH_RECURSIVE_MUTEX>, ACE_SYNCH_RECURSIVE_MUTEX> #pragma instantiate ACE_Timer_Queue_Iterator_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_SYNCH_RECURSIVE_MUTEX>, ACE_SYNCH_RECURSIVE_MUTEX> diff --git a/ace/Timer_Queue.h b/ace/Timer_Queue.h index 45c5b5640d9..a9ea9cfa3d7 100644 --- a/ace/Timer_Queue.h +++ b/ace/Timer_Queue.h @@ -25,6 +25,8 @@ // The following typedef are here for ease of use and backward // compatibility. +typedef ACE_Timer_Node_Dispatch_Info_T<ACE_Event_Handler *> + ACE_Timer_Node_Dispatch_Info; typedef ACE_Timer_Node_T<ACE_Event_Handler *> ACE_Timer_Node; diff --git a/ace/Timer_Queue_T.cpp b/ace/Timer_Queue_T.cpp index 73c2ed5f1de..7aab05bf36a 100644 --- a/ace/Timer_Queue_T.cpp +++ b/ace/Timer_Queue_T.cpp @@ -45,6 +45,8 @@ ACE_Timer_Node_T<TYPE>::~ACE_Timer_Node_T (void) ACE_TRACE ("ACE_Timer_Node_T::~ACE_Timer_Node_T"); } + + template <class TYPE, class FUNCTOR, class ACE_LOCK> ACE_Timer_Queue_Iterator_T<TYPE, FUNCTOR, ACE_LOCK>::ACE_Timer_Queue_Iterator_T (void) { @@ -197,30 +199,57 @@ ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK>::mutex (void) return this->mutex_; } + // Run the <handle_timeout> method for all Timers whose values are <= // <cur_time>. - template <class TYPE, class FUNCTOR, class ACE_LOCK> int ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK>::expire (const ACE_Time_Value &cur_time) { ACE_TRACE ("ACE_Timer_Queue_T::expire"); ACE_MT (ACE_GUARD_RETURN (ACE_LOCK, ace_mon, this->mutex_, -1)); + // Keep looping while there are timers remaining and the earliest + // timer is <= the <cur_time> passed in to the method. + + if (this->is_empty ()) + return 0; + int number_of_timers_expired = 0; + int result = 0; - ACE_Timer_Node_T<TYPE> *expired; + ACE_Timer_Node_Dispatch_Info_T<TYPE> info; - // Keep looping while there are timers remaining and the earliest - // timer is <= the <cur_time> passed in to the method. + while ((result = this->dispatch_info_i (cur_time, + info)) != 0) + { + // call the functor + this->upcall (info.type_, info.act_, cur_time); + + number_of_timers_expired++; + + } + + return number_of_timers_expired; +} + + +template <class TYPE, class FUNCTOR, class ACE_LOCK> int +ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK>::dispatch_info_i (const ACE_Time_Value &cur_time, + ACE_Timer_Node_Dispatch_Info_T<TYPE> &info) +{ + ACE_TRACE ("ACE_Timer_Queue_T::dispatch_info_i"); if (this->is_empty ()) return 0; - while (this->earliest_time () <= cur_time) + ACE_Timer_Node_T<TYPE> *expired = 0; + + if (this->earliest_time () <= cur_time) { expired = this->remove_first (); - TYPE type = expired->get_type (); // Need a copy, not a reference! - const void *act = expired->get_act (); + + // Get the dispatch info + expired->get_dispatch_info (info); // Check if this is an interval timer. if (expired->get_interval () > ACE_Time_Value::zero) @@ -241,16 +270,10 @@ ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK>::expire (const ACE_Time_Value &cur_ti this->free_node (expired); } - // call the functor - this->upcall (type, act, cur_time); - - number_of_timers_expired++; - - if (this->is_empty ()) - break; + return 1; } - return number_of_timers_expired; + return 0; } diff --git a/ace/Timer_Queue_T.h b/ace/Timer_Queue_T.h index 574437042b8..721c4c7f4c6 100644 --- a/ace/Timer_Queue_T.h +++ b/ace/Timer_Queue_T.h @@ -25,6 +25,23 @@ #include "ace/Test_and_Set.h" /** + * @class ACE_Timer_Node_Dispatch_Info_T + * + * @brief Maintains generated dispatch information for Timer nodes. + * + */ +template <class TYPE> +class ACE_Timer_Node_Dispatch_Info_T +{ +public: + // The type of object held in the queue + TYPE type_; + + /// Asynchronous completion token associated with the timer. + const void *act_; +}; + +/** * @class ACE_Timer_Node_T * * @brief Maintains the state associated with a Timer entry. @@ -39,6 +56,9 @@ public: /// Dtor. ~ACE_Timer_Node_T (void); + /// Useful typedef .. + typedef ACE_Timer_Node_Dispatch_Info_T <TYPE> DISPATCH_INFO; + /// singly linked list void set (const TYPE &type, const void *a, @@ -100,6 +120,14 @@ public: /// set the timer_id. void set_timer_id (long timer_id); + /// Get the dispatch info. The dispatch information is got + /// through <info>. This form helps us in preventing allocation and + /// deleting data along the criticl path. + /// @@TODO: We may want to have a copying version too, so that our + /// interface will be complete.. + void get_dispatch_info (DISPATCH_INFO &info); + + /// Dump the state of an TYPE. void dump (void) const; @@ -259,6 +287,14 @@ public: virtual int expire (const ACE_Time_Value ¤t_time); /** + * Get the dispatch information for a timer whose value is <= <cur_time>. + * This does not account for <timer_skew>. Returns 1 if there is a + * node whose value <= <cur_time> else returns a 0. + */ + int dispatch_info (const ACE_Time_Value ¤t_time, + ACE_Timer_Node_Dispatch_Info_T<TYPE> &info); + + /** * Run the <functor> for all timers whose values are <= * <ACE_OS::gettimeofday>. Also accounts for <timer_skew>. Returns * the number of timers canceled. @@ -315,13 +351,14 @@ public: /// after it is returned by a method like <remove_first>. virtual void return_node (ACE_Timer_Node_T<TYPE> *); -protected: + /// This method will call the <functor> with the <type>, <act> and /// <time> /* virtual */ void upcall (TYPE &type, const void *act, const ACE_Time_Value &cur_time); +protected: /// Reschedule an "interval" <ACE_Timer_Node>. virtual void reschedule (ACE_Timer_Node_T<TYPE> *) = 0; @@ -331,6 +368,10 @@ protected: /// Factory method that frees a previously allocated node. virtual void free_node (ACE_Timer_Node_T<TYPE> *); + /// Non-locking version of dispatch_info () + int dispatch_info_i (const ACE_Time_Value ¤t_time, + ACE_Timer_Node_Dispatch_Info_T<TYPE> &info); + /// Synchronization variable for <ACE_Timer_Queue>. /// NOTE: the right name would be lock_, but HP/C++ will choke on that! ACE_LOCK mutex_; diff --git a/ace/Timer_Queue_T.i b/ace/Timer_Queue_T.i index 0ee6c74a5c9..612090f9475 100644 --- a/ace/Timer_Queue_T.i +++ b/ace/Timer_Queue_T.i @@ -120,6 +120,14 @@ ACE_Timer_Node_T<TYPE>::set_timer_id (long timer_id) this->timer_id_ = timer_id; } +template <class TYPE> ACE_INLINE void +ACE_Timer_Node_T<TYPE>::get_dispatch_info (DISPATCH_INFO &info) +{ + // Yes, do a copy + info.type_ = this->type_; + info.act_ = this->act_; +} + template <class TYPE, class FUNCTOR, class ACE_LOCK> ACE_INLINE void ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK>::timer_skew (const ACE_Time_Value &skew) { @@ -141,10 +149,20 @@ ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK>::expire (void) return 0; } +template <class TYPE, class FUNCTOR, class ACE_LOCK> int +ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK>::dispatch_info (const ACE_Time_Value &cur_time, + ACE_Timer_Node_Dispatch_Info_T<TYPE> &info) +{ + ACE_TRACE ("ACE_Timer_Queue_T::dispatch_info"); + ACE_MT (ACE_GUARD_RETURN (ACE_LOCK, ace_mon, this->mutex_, 0)); + + return this->dispatch_info_i (cur_time, info); +} + template <class TYPE, class FUNCTOR, class ACE_LOCK> ACE_INLINE void ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK>::upcall (TYPE &type, - const void *act, - const ACE_Time_Value &cur_time) + const void *act, + const ACE_Time_Value &cur_time) { this->upcall_functor ().timeout (*this, type, act, cur_time); } @@ -168,4 +186,3 @@ ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK>::upcall_functor (void) { return *this->upcall_functor_; } - |