diff options
Diffstat (limited to 'ace/TP_Reactor.cpp')
-rw-r--r-- | ace/TP_Reactor.cpp | 495 |
1 files changed, 259 insertions, 236 deletions
diff --git a/ace/TP_Reactor.cpp b/ace/TP_Reactor.cpp index 8b8fbc97c40..2fffb8b6b9a 100644 --- a/ace/TP_Reactor.cpp +++ b/ace/TP_Reactor.cpp @@ -4,7 +4,6 @@ #include "ace/TP_Reactor.h" #include "ace/Reactor.h" #include "ace/Thread.h" -#include "ace/Log_Msg.h" #if !defined (__ACE_INLINE__) #include "ace/TP_Reactor.i" @@ -56,9 +55,6 @@ ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time) return result; } -/************************************************************************/ -// Methods for ACE_TP_Reactor -/************************************************************************/ ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, @@ -80,23 +76,25 @@ ACE_TP_Reactor::ACE_TP_Reactor (size_t size, this->supress_notify_renew (1); } - - -void -ACE_TP_Reactor::max_notify_iterations (int /*iterations*/) +int +ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) { - ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations"); + ACE_TRACE ("ACE_TP_Reactor::owner"); + if (o_id) + *o_id = ACE_Thread::self (); + + return 0; - ACE_ERROR ((LM_ERROR, - "(%P|%t) This has no effect on the notify processing \n")); } int -ACE_TP_Reactor::max_notify_iterations (void) +ACE_TP_Reactor::owner (ACE_thread_t *t_id) { - ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations"); + ACE_TRACE ("ACE_TP_Reactor::owner"); + *t_id = ACE_Thread::self (); return 0; + } @@ -105,42 +103,50 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Reactor::handle_events"); - int result = 0; - // Stash the current time -- the destructor of this object will // automatically compute how much time elpased since this method was // called. ACE_Countdown_Time countdown (max_wait_time); + // The order of these events is very subtle, modify with care. + + // Instantiate the token guard which will try grabbing the token for // this thread. - ACE_TP_Token_Guard guard (this->token_, - max_wait_time, - result); + ACE_TP_Token_Guard guard (this->token_); + + + int result = guard.grab_token (max_wait_time); // If the guard is NOT the owner just return the retval if (!guard.is_owner ()) return result; + // After getting the lock just just for deactivation.. + if (this->deactivated_) + return -1; + // Update the countdown to reflect time waiting for the token. countdown.update (); - // After acquiring the lock, check if we have been deactivated. If - // we are deactivated, simply return without handling further - // events. - if (this->deactivated_) - { - return -1; - } + return this->dispatch_i (max_wait_time, + guard); +} - // We got the lock, lets handle some events. We collect the events - // that we need to handle. We release the token and then handle - // those events that needs handling. + +int +ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, + ACE_TP_Token_Guard &guard) +{ int event_count = this->get_event_for_dispatching (max_wait_time); + int result = 0; + // Note: We are passing the <event_count> around, to have record of + // how many events still need processing. May be this could be + // useful in future. // Dispatch signals if (event_count == -1) @@ -161,14 +167,14 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) // handle timers early since they may have higher latency // constraints than I/O handlers. Ideally, the order of // dispatching should be a strategy... - int retval = this->handle_timer_events (event_count, - guard); + result = this->handle_timer_events (event_count, + guard); - if (retval > 0) - return retval; + if (result > 0) + return result; - // Else just fall through for further handling - } + // Else just fall through for further handling + } if (event_count > 0) @@ -176,128 +182,27 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) // Next dispatch the notification handlers (if there are any to // dispatch). These are required to handle multiple-threads that // are trying to update the <Reactor>. - int retval = this->handle_notify_events (event_count, - guard); + result = this->handle_notify_events (event_count, + guard); - if (retval > 0) - return retval; + if (result > 0) + return result; - // Else just fall through for further handling + // Else just fall through for further handling } - - if (event_count > 0) - { - // Handle socket events - return this->handle_socket_events (event_count, - guard); - } - - return 0; -} - -int -ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) -{ - ACE_TRACE ("ACE_TP_Reactor::handle_events"); - return this->handle_events (&max_wait_time); -} - -int -ACE_TP_Reactor::resumable_handler (void) -{ - return 1; -} - -int -ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_TP_Reactor::mask_ops"); - ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token, - ace_mon, this->token_, -1)); - - int result = 0; - - // If it looks like the handle isn't suspended, then - // set the ops on the wait_set_, otherwise set the suspend_set_. - - if (this->suspend_set_.rd_mask_.is_set (handle) == 0 - && this->suspend_set_.wr_mask_.is_set (handle) == 0 - && this->suspend_set_.ex_mask_.is_set (handle) == 0) - - result = this->bit_ops (handle, mask, - this->wait_set_, - ops); - else - - result = this->bit_ops (handle, mask, - this->suspend_set_, - ops); - - return result; -} - - -int -ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_TP_Reactor::mask_ops"); - return this->mask_ops (eh->get_handle (), mask, ops); -} - - -int -ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) -{ - ACE_TRACE ("ACE_TP_Reactor::owner"); - if (o_id) - *o_id = ACE_Thread::self (); + if (event_count > 0) + { + // Handle socket events + return this->handle_socket_events (event_count, + guard); + } return 0; } -int -ACE_TP_Reactor::owner (ACE_thread_t *t_id) -{ - ACE_TRACE ("ACE_TP_Reactor::owner"); - *t_id = ACE_Thread::self (); - return 0; -} - - -int -ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time) -{ - ACE_TRACE ("ACE_TP_Reactor::get_event_for_dispatching"); - // If the reactor handler state has changed, clear any remembered - // ready bits and re-scan from the master wait_set. - if (this->state_changed_) - { - this->ready_set_.rd_mask_.reset (); - this->ready_set_.wr_mask_.reset (); - this->ready_set_.ex_mask_.reset (); - this->state_changed_ = 0; - } - else - { - // This is a hack... somewhere, under certain conditions (which - // I don't understand...) the mask will have all of its bits clear, - // yet have a size_ > 0. This is an attempt to remedy the affect, - // without knowing why it happens. - this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); - this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); - this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); - } - - return this->wait_for_multiple_events (this->ready_set_, - max_wait_time); -} int @@ -378,6 +283,7 @@ ACE_TP_Reactor::handle_timer_events (int &event_count, } + int ACE_TP_Reactor::handle_notify_events (int &event_count, ACE_TP_Token_Guard &guard) @@ -388,61 +294,83 @@ ACE_TP_Reactor::handle_notify_events (int &event_count, if (notify_handle != ACE_INVALID_HANDLE) { + // Clear the handle of the read_mask of our <ready_set_> this->ready_set_.rd_mask_.clr_bit (notify_handle); - // Decrement the number of events that needs handling yet. - event_count--; + // Now just do a read on the pipe.. + ACE_Notification_Buffer buffer; - // Release the token before dispatching notifies... - guard.release_token (); + if (this->notify_handler_->read_notify_pipe (notify_handle, + buffer) >= 0) + { + event_count--; + + // Release the token before dispatching notifies... + guard.release_token (); + + this->notify_handler_->dispatch_notify (buffer); + + this->renew (); + return 1; + } - // Dipatch and return - return - this->notify_handler_->dispatch_notify (notify_handle); + return 0; } return 0; } + int ACE_TP_Reactor::handle_socket_events (int &event_count, ACE_TP_Token_Guard &guard) { - ACE_TRACE ("ACE_TP_Reactor::handle_socket_events"); + // We got the lock, lets handle some events. Note: this method will + // *not* dispatch any I/O handlers. It will dispatch signals, + // timeouts, and notifications. ACE_EH_Dispatch_Info dispatch_info; - // Get the socket event dispatch information + this->get_socket_event_info (dispatch_info); + // If there is any event handler that is ready to be dispatched, the // dispatch information is recorded in dispatch_info. - int result = - this->get_socket_event_info (dispatch_info); - - - if (result) + if (dispatch_info.dispatch ()) { // Suspend the handler so that other threads don't start // dispatching it. - this->suspend_i (dispatch_info.handle_); + // Make sure we never suspend the notify_handler_ without holding + // the lock. + // @@ Actually, we don't even need to suspend the notify_handler_ + // here. But let me get it to work first. + if (dispatch_info.event_handler_ != this->notify_handler_) + this->suspend_i (dispatch_info.handle_); + } - // Decrement the number of events that needs handling yet. - event_count--; + /// Decrement the event left + --event_count; - // Release the token before dispatching notifies... - guard.release_token (); + // Release the lock. Others threads can start waiting. + guard.release_token (); - result = this->dispatch_socket_events (dispatch_info); + int result = 0; + // If there was an event handler ready, dispatch it. + if (dispatch_info.dispatch ()) + { + if (this->dispatch_socket_event (dispatch_info) == 0) + ++result; // Dispatched one more event int flag = 0; if (dispatch_info.event_handler_ != 0) { - flag = - dispatch_info.event_handler_->resume_handler (); + flag = + dispatch_info.event_handler_->resume_handler (); } if (dispatch_info.handle_ != ACE_INVALID_HANDLE && + dispatch_info.event_handler_ != this->notify_handler_ && flag == 0) this->resume_handler (dispatch_info.handle_); } @@ -450,67 +378,93 @@ ACE_TP_Reactor::handle_socket_events (int &event_count, return result; } +int +ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + int ops) +{ + ACE_TRACE ("ACE_TP_Reactor::mask_ops"); + ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token, + ace_mon, this->token_, -1)); -ACE_HANDLE -ACE_TP_Reactor::get_notify_handle (void) + int result = 0; + + // If it looks like the handle isn't suspended, then + // set the ops on the wait_set_, otherwise set the suspend_set_. + + if (this->suspend_set_.rd_mask_.is_set (handle) == 0 + && this->suspend_set_.wr_mask_.is_set (handle) == 0 + && this->suspend_set_.ex_mask_.is_set (handle) == 0) + + result = this->bit_ops (handle, mask, + this->wait_set_, + ops); + else + + result = this->bit_ops (handle, mask, + this->suspend_set_, + ops); + + return result; +} + + + +int +ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time) { - // Call the notify handler to get a handle on which we would have a - // notify waiting - ACE_HANDLE read_handle = - this->notify_handler_->notify_handle (); - // Check whether the rd_mask has been set on that handle. If so - // return the handle. - if (read_handle != ACE_INVALID_HANDLE && - this->ready_set_.rd_mask_.is_set (read_handle)) + // If the reactor handler state has changed, clear any remembered + // ready bits and re-scan from the master wait_set. + if (this->state_changed_) { - return read_handle; + this->ready_set_.rd_mask_.reset (); + this->ready_set_.wr_mask_.reset (); + this->ready_set_.ex_mask_.reset (); + this->state_changed_ = 0; } + else + { + // This is a hack... somewhere, under certain conditions (which + // I don't understand...) the mask will have all of its bits clear, + // yet have a size_ > 0. This is an attempt to remedy the affect, + // without knowing why it happens. - return ACE_INVALID_HANDLE; -} - + //# if !(defined (__SUNPRO_CC) && (__SUNPRO_CC > 0x500)) + // SunCC seems to be having problems with this piece of code + // here. I am not sure why though. This works fine with other + // compilers. As we dont seem to understand when this piece of + // code is needed and as it creates problems for SunCC we will + // not compile this. Most of the tests in TAO seem to be happy + // without this in SunCC. + this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); + this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); + this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); + //# endif /* ! __SUNPRO_CC */ + } + return this->wait_for_multiple_events (this->ready_set_, + max_wait_time); +} int ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event) { + event.reset (); // Nothing to dispatch yet // Check for dispatch in write, except, read. Only catch one, but if // one is caught, be sure to clear the handle from each mask in case // there is more than one mask set for it. This would cause problems // if the handler is suspended for dispatching, but its set bit in // another part of ready_set_ kept it from being dispatched. + int found_io = 0; ACE_HANDLE handle; - // Look at the read masks - { - ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); - - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) - { - if (this->is_suspended_i (handle)) - continue; - - // Remember this info - event.set (handle, - this->handler_rep_.find (handle), - ACE_Event_Handler::READ_MASK, - &ACE_Event_Handler::handle_input); - this->ready_set_.rd_mask_.clr_bit (handle); - this->ready_set_.wr_mask_.clr_bit (handle); - this->ready_set_.ex_mask_.clr_bit (handle); - return 1; - } - } - - // We havent found any rd_masks for processing yet, so look for - // write masks { ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_); - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) + while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) { if (this->is_suspended_i (handle)) continue; @@ -523,45 +477,67 @@ ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event) this->ready_set_.wr_mask_.clr_bit (handle); this->ready_set_.ex_mask_.clr_bit (handle); this->ready_set_.rd_mask_.clr_bit (handle); - return 1; + found_io = 1; } } - // We havent found any rd_mask and wr_masks for processing yet, so - // look for ex_masks - { - ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_); + if (!found_io) + { + ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_); - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) - { - if (this->is_suspended_i (handle)) - continue; + while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) + { + if (this->is_suspended_i (handle)) + continue; + + // Remember this info + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::EXCEPT_MASK, + &ACE_Event_Handler::handle_exception); + this->ready_set_.ex_mask_.clr_bit (handle); + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.rd_mask_.clr_bit (handle); + found_io = 1; + } + } - // Remember this info - event.set (handle, - this->handler_rep_.find (handle), - ACE_Event_Handler::EXCEPT_MASK, - &ACE_Event_Handler::handle_exception); - this->ready_set_.ex_mask_.clr_bit (handle); - this->ready_set_.wr_mask_.clr_bit (handle); - this->ready_set_.rd_mask_.clr_bit (handle); - return 1; - } + if (!found_io) + { + ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); + + while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) + { + if (this->is_suspended_i (handle)) + continue; + + // Remember this info + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::READ_MASK, + &ACE_Event_Handler::handle_input); + this->ready_set_.rd_mask_.clr_bit (handle); + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.ex_mask_.clr_bit (handle); + found_io = 1; + } } - // We didnt find any.. - return 0; + return found_io; } + + +// Dispatches a single event handler int -ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info) +ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info) { - ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_events"); + ACE_TRACE ("ACE_TP_Reactor::notify_handle"); - ACE_HANDLE handle = info.handle_; - ACE_Event_Handler *event_handler = info.event_handler_; - ACE_Reactor_Mask mask = info.mask_; - ACE_EH_PTMF callback = info.callback_; + ACE_HANDLE handle = dispatch_info.handle_; + ACE_Event_Handler *event_handler = dispatch_info.event_handler_; + ACE_Reactor_Mask mask = dispatch_info.mask_; + ACE_EH_PTMF callback = dispatch_info.callback_; // Check for removed handlers. if (event_handler == 0) @@ -583,17 +559,37 @@ ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info) this->remove_handler (handle, mask); // As the handler is no longer valid, invalidate the handle - info.event_handler_ = 0; - info.handle_ = ACE_INVALID_HANDLE; + dispatch_info.event_handler_ = 0; + dispatch_info.handle_ = ACE_INVALID_HANDLE; return retval; } + // assert (status >= 0); + return 0; +} + +int +ACE_TP_Reactor::resumable_handler (void) +{ return 1; } +ACE_INLINE int +ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) +{ + return ACE_Select_Reactor::handle_events (max_wait_time); +} + +ACE_INLINE int +ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + int ops) +{ + return this->mask_ops (eh->get_handle (), mask, ops); +} -void +ACE_INLINE void ACE_TP_Reactor::notify_handle (ACE_HANDLE, ACE_Reactor_Mask, ACE_Handle_Set &, @@ -603,3 +599,30 @@ ACE_TP_Reactor::notify_handle (ACE_HANDLE, ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: Wrong version of notify_handle() gets called"))); } + +ACE_HANDLE +ACE_TP_Reactor::get_notify_handle (void) +{ + // Call the notify handler to get a handle on which we would have a + // notify waiting + ACE_HANDLE read_handle = + this->notify_handler_->notify_handle (); + + // Check whether the rd_mask has been set on that handle. If so + // return the handle. + // if (read_handle != ACE_INVALID_HANDLE && + //this->ready_set_.rd_mask_.is_set (read_handle)) + if (read_handle != ACE_INVALID_HANDLE) + { + ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); + ACE_HANDLE handle = ACE_INVALID_HANDLE; + + while ((handle = handle_iter ()) == read_handle) + { + return read_handle; + } + } + + // None found.. + return ACE_INVALID_HANDLE; +} |