From 654c72de191da54b4ed4d083b8149911ef281fa8 Mon Sep 17 00:00:00 2001 From: bala Date: Wed, 29 Aug 2001 22:47:04 +0000 Subject: *** empty log message *** --- ace/TP_Reactor.cpp | 460 +++++++++++++++++++++++++++-------------------------- ace/TP_Reactor.h | 42 ++--- ace/TP_Reactor.i | 3 +- 3 files changed, 255 insertions(+), 250 deletions(-) diff --git a/ace/TP_Reactor.cpp b/ace/TP_Reactor.cpp index 8b8fbc97c40..48fa9981881 100644 --- a/ace/TP_Reactor.cpp +++ b/ace/TP_Reactor.cpp @@ -4,7 +4,6 @@ #include "ace/TP_Reactor.h" #include "ace/Reactor.h" #include "ace/Thread.h" -#include "ace/Log_Msg.h" #if !defined (__ACE_INLINE__) #include "ace/TP_Reactor.i" @@ -56,9 +55,6 @@ ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time) return result; } -/************************************************************************/ -// Methods for ACE_TP_Reactor -/************************************************************************/ ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, @@ -80,23 +76,25 @@ ACE_TP_Reactor::ACE_TP_Reactor (size_t size, this->supress_notify_renew (1); } - - -void -ACE_TP_Reactor::max_notify_iterations (int /*iterations*/) +int +ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) { - ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations"); + ACE_TRACE ("ACE_TP_Reactor::owner"); + if (o_id) + *o_id = ACE_Thread::self (); + + return 0; - ACE_ERROR ((LM_ERROR, - "(%P|%t) This has no effect on the notify processing \n")); } int -ACE_TP_Reactor::max_notify_iterations (void) +ACE_TP_Reactor::owner (ACE_thread_t *t_id) { - ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations"); + ACE_TRACE ("ACE_TP_Reactor::owner"); + *t_id = ACE_Thread::self (); return 0; + } @@ -105,13 +103,14 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Reactor::handle_events"); - int result = 0; - // Stash the current time -- the destructor of this object will // automatically compute how much time elpased since this method was // called. ACE_Countdown_Time countdown (max_wait_time); + // The order of these events is very subtle, modify with care. + + int result = 0; // Instantiate the token guard which will try grabbing the token for // this thread. ACE_TP_Token_Guard guard (this->token_, @@ -126,21 +125,7 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) countdown.update (); - // After acquiring the lock, check if we have been deactivated. If - // we are deactivated, simply return without handling further - // events. - if (this->deactivated_) - { - return -1; - } - - // We got the lock, lets handle some events. We collect the events - // that we need to handle. We release the token and then handle - // those events that needs handling. - int event_count = - this->get_event_for_dispatching (max_wait_time); - - + int event_count = 0; // Dispatch signals if (event_count == -1) @@ -185,121 +170,17 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) // Else just fall through for further handling } - - if (event_count > 0) - { - // Handle socket events - return this->handle_socket_events (event_count, - guard); - } - - return 0; -} - -int -ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) -{ - ACE_TRACE ("ACE_TP_Reactor::handle_events"); - return this->handle_events (&max_wait_time); -} - -int -ACE_TP_Reactor::resumable_handler (void) -{ - return 1; -} - -int -ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_TP_Reactor::mask_ops"); - ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token, - ace_mon, this->token_, -1)); - - int result = 0; - - // If it looks like the handle isn't suspended, then - // set the ops on the wait_set_, otherwise set the suspend_set_. - - if (this->suspend_set_.rd_mask_.is_set (handle) == 0 - && this->suspend_set_.wr_mask_.is_set (handle) == 0 - && this->suspend_set_.ex_mask_.is_set (handle) == 0) - - result = this->bit_ops (handle, mask, - this->wait_set_, - ops); - else - - result = this->bit_ops (handle, mask, - this->suspend_set_, - ops); - - return result; -} - - -int -ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_TP_Reactor::mask_ops"); - return this->mask_ops (eh->get_handle (), mask, ops); -} - - -int -ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) -{ - ACE_TRACE ("ACE_TP_Reactor::owner"); - if (o_id) - *o_id = ACE_Thread::self (); - - return 0; - -} - -int -ACE_TP_Reactor::owner (ACE_thread_t *t_id) -{ - ACE_TRACE ("ACE_TP_Reactor::owner"); - *t_id = ACE_Thread::self (); - - return 0; -} - - -int -ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time) -{ - ACE_TRACE ("ACE_TP_Reactor::get_event_for_dispatching"); - // If the reactor handler state has changed, clear any remembered - // ready bits and re-scan from the master wait_set. - if (this->state_changed_) - { - this->ready_set_.rd_mask_.reset (); - this->ready_set_.wr_mask_.reset (); - this->ready_set_.ex_mask_.reset (); - this->state_changed_ = 0; - } - else + event_count = 1; + if (event_count > 0) { - // This is a hack... somewhere, under certain conditions (which - // I don't understand...) the mask will have all of its bits clear, - // yet have a size_ > 0. This is an attempt to remedy the affect, - // without knowing why it happens. - this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); - this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); - this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); + // Handle socket events + return this->handle_socket_events (max_wait_time, + guard); } - return this->wait_for_multiple_events (this->ready_set_, - max_wait_time); + return 0; } - int ACE_TP_Reactor::handle_signals (int & /*event_count*/, ACE_TP_Token_Guard & /*guard*/) @@ -406,43 +287,54 @@ ACE_TP_Reactor::handle_notify_events (int &event_count, } int -ACE_TP_Reactor::handle_socket_events (int &event_count, +ACE_TP_Reactor::handle_socket_events (ACE_Time_Value *max_wait_time, ACE_TP_Token_Guard &guard) { - ACE_TRACE ("ACE_TP_Reactor::handle_socket_events"); + // We got the lock, lets handle some events. Note: this method will + // *not* dispatch any I/O handlers. It will dispatch signals, + // timeouts, and notifications. ACE_EH_Dispatch_Info dispatch_info; - - // Get the socket event dispatch information - // If there is any event handler that is ready to be dispatched, the - // dispatch information is recorded in dispatch_info. int result = - this->get_socket_event_info (dispatch_info); + this->dispatch_i (max_wait_time, dispatch_info); + if (result == -1) + { + return -1; + } - if (result) + // If there is any event handler that is ready to be dispatched, the + // dispatch information is recorded in dispatch_info. + if (dispatch_info.dispatch ()) { // Suspend the handler so that other threads don't start // dispatching it. - this->suspend_i (dispatch_info.handle_); - - // Decrement the number of events that needs handling yet. - event_count--; - - // Release the token before dispatching notifies... - guard.release_token (); + // Make sure we never suspend the notify_handler_ without holding + // the lock. + // @@ Actually, we don't even need to suspend the notify_handler_ + // here. But let me get it to work first. + if (dispatch_info.event_handler_ != this->notify_handler_) + this->suspend_i (dispatch_info.handle_); + } - result = this->dispatch_socket_events (dispatch_info); + // Release the lock. Others threads can start waiting. + guard.release_token (); + // If there was an event handler ready, dispatch it. + if (dispatch_info.dispatch ()) + { + if (this->notify_handle (dispatch_info) == 0) + ++result; // Dispatched one more event int flag = 0; if (dispatch_info.event_handler_ != 0) { - flag = - dispatch_info.event_handler_->resume_handler (); + flag = + dispatch_info.event_handler_->resume_handler (); } if (dispatch_info.handle_ != ACE_INVALID_HANDLE && + dispatch_info.event_handler_ != this->notify_handler_ && flag == 0) this->resume_handler (dispatch_info.handle_); } @@ -450,67 +342,94 @@ ACE_TP_Reactor::handle_socket_events (int &event_count, return result; } - -ACE_HANDLE -ACE_TP_Reactor::get_notify_handle (void) +int +ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + int ops) { - // Call the notify handler to get a handle on which we would have a - // notify waiting - ACE_HANDLE read_handle = - this->notify_handler_->notify_handle (); + ACE_TRACE ("ACE_TP_Reactor::mask_ops"); + ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token, + ace_mon, this->token_, -1)); - // Check whether the rd_mask has been set on that handle. If so - // return the handle. - if (read_handle != ACE_INVALID_HANDLE && - this->ready_set_.rd_mask_.is_set (read_handle)) - { - return read_handle; - } + int result = 0; - return ACE_INVALID_HANDLE; -} + // If it looks like the handle isn't suspended, then + // set the ops on the wait_set_, otherwise set the suspend_set_. + + if (this->suspend_set_.rd_mask_.is_set (handle) == 0 + && this->suspend_set_.wr_mask_.is_set (handle) == 0 + && this->suspend_set_.ex_mask_.is_set (handle) == 0) + + result = this->bit_ops (handle, mask, + this->wait_set_, + ops); + else + result = this->bit_ops (handle, mask, + this->suspend_set_, + ops); + + return result; +} int -ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event) +ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, + ACE_EH_Dispatch_Info &event) { + int result = -1; + + event.reset (); // Nothing to dispatch yet + + // If the reactor handler state has changed, clear any remembered + // ready bits and re-scan from the master wait_set. + if (this->state_changed_) + { + this->ready_set_.rd_mask_.reset (); + this->ready_set_.wr_mask_.reset (); + this->ready_set_.ex_mask_.reset (); + this->state_changed_ = 0; + } + else + { + // This is a hack... somewhere, under certain conditions (which + // I don't understand...) the mask will have all of its bits clear, + // yet have a size_ > 0. This is an attempt to remedy the affect, + // without knowing why it happens. + + //# if !(defined (__SUNPRO_CC) && (__SUNPRO_CC > 0x500)) + // SunCC seems to be having problems with this piece of code + // here. I am not sure why though. This works fine with other + // compilers. As we dont seem to understand when this piece of + // code is needed and as it creates problems for SunCC we will + // not compile this. Most of the tests in TAO seem to be happy + // without this in SunCC. + this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); + this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); + this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); + //# endif /* ! __SUNPRO_CC */ + + } + + int active_handle_count = this->wait_for_multiple_events (this->ready_set_, + max_wait_time); + + int handlers_dispatched = 0; + int signal_occurred = 0; // Check for dispatch in write, except, read. Only catch one, but if // one is caught, be sure to clear the handle from each mask in case // there is more than one mask set for it. This would cause problems // if the handler is suspended for dispatching, but its set bit in // another part of ready_set_ kept it from being dispatched. + int found_io = 0; ACE_HANDLE handle; - // Look at the read masks - { - ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); - - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) - { - if (this->is_suspended_i (handle)) - continue; - - // Remember this info - event.set (handle, - this->handler_rep_.find (handle), - ACE_Event_Handler::READ_MASK, - &ACE_Event_Handler::handle_input); - this->ready_set_.rd_mask_.clr_bit (handle); - this->ready_set_.wr_mask_.clr_bit (handle); - this->ready_set_.ex_mask_.clr_bit (handle); - return 1; - } - } - - // We havent found any rd_masks for processing yet, so look for - // write masks { ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_); - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) + while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) { if (this->is_suspended_i (handle)) continue; @@ -523,45 +442,89 @@ ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event) this->ready_set_.wr_mask_.clr_bit (handle); this->ready_set_.ex_mask_.clr_bit (handle); this->ready_set_.rd_mask_.clr_bit (handle); - return 1; + found_io = 1; } } - // We havent found any rd_mask and wr_masks for processing yet, so - // look for ex_masks - { - ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_); + if (!found_io) + { + ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_); - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) - { - if (this->is_suspended_i (handle)) - continue; + while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) + { + if (this->is_suspended_i (handle)) + continue; + + // Remember this info + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::EXCEPT_MASK, + &ACE_Event_Handler::handle_exception); + this->ready_set_.ex_mask_.clr_bit (handle); + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.rd_mask_.clr_bit (handle); + found_io = 1; + } + } - // Remember this info - event.set (handle, - this->handler_rep_.find (handle), - ACE_Event_Handler::EXCEPT_MASK, - &ACE_Event_Handler::handle_exception); - this->ready_set_.ex_mask_.clr_bit (handle); - this->ready_set_.wr_mask_.clr_bit (handle); - this->ready_set_.rd_mask_.clr_bit (handle); - return 1; - } + if (!found_io) + { + ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); + + while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) + { + if (this->is_suspended_i (handle)) + continue; + + // Remember this info + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::READ_MASK, + &ACE_Event_Handler::handle_input); + this->ready_set_.rd_mask_.clr_bit (handle); + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.ex_mask_.clr_bit (handle); + found_io = 1; + } } - // We didnt find any.. - return 0; + result = signal_occurred + handlers_dispatched; + + return result; } int -ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info) +ACE_TP_Reactor::dispatch_i_protected (ACE_Time_Value *max_wait_time, + ACE_EH_Dispatch_Info &event) { - ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_events"); + int result; + + ACE_SEH_TRY + { + result = this->dispatch_i (max_wait_time, event); + } + ACE_SEH_EXCEPT (this->release_token ()) + { + // As it stands now, we catch and then rethrow all Win32 + // structured exceptions so that we can make sure to release the + // lock correctly. + } - ACE_HANDLE handle = info.handle_; - ACE_Event_Handler *event_handler = info.event_handler_; - ACE_Reactor_Mask mask = info.mask_; - ACE_EH_PTMF callback = info.callback_; + return result; + +} + + +// Dispatches a single event handler +int +ACE_TP_Reactor::notify_handle (ACE_EH_Dispatch_Info &dispatch_info) +{ + ACE_TRACE ("ACE_TP_Reactor::notify_handle"); + + ACE_HANDLE handle = dispatch_info.handle_; + ACE_Event_Handler *event_handler = dispatch_info.event_handler_; + ACE_Reactor_Mask mask = dispatch_info.mask_; + ACE_EH_PTMF callback = dispatch_info.callback_; // Check for removed handlers. if (event_handler == 0) @@ -583,17 +546,37 @@ ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info) this->remove_handler (handle, mask); // As the handler is no longer valid, invalidate the handle - info.event_handler_ = 0; - info.handle_ = ACE_INVALID_HANDLE; + dispatch_info.event_handler_ = 0; + dispatch_info.handle_ = ACE_INVALID_HANDLE; return retval; } + // assert (status >= 0); + return 0; +} + +int +ACE_TP_Reactor::resumable_handler (void) +{ return 1; } +ACE_INLINE int +ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) +{ + return ACE_Select_Reactor::handle_events (max_wait_time); +} + +ACE_INLINE int +ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + int ops) +{ + return this->mask_ops (eh->get_handle (), mask, ops); +} -void +ACE_INLINE void ACE_TP_Reactor::notify_handle (ACE_HANDLE, ACE_Reactor_Mask, ACE_Handle_Set &, @@ -603,3 +586,22 @@ ACE_TP_Reactor::notify_handle (ACE_HANDLE, ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: Wrong version of notify_handle() gets called"))); } + +ACE_HANDLE +ACE_TP_Reactor::get_notify_handle (void) +{ + // Call the notify handler to get a handle on which we would have a + // notify waiting + ACE_HANDLE read_handle = + this->notify_handler_->notify_handle (); + + // Check whether the rd_mask has been set on that handle. If so + // return the handle. + if (read_handle != ACE_INVALID_HANDLE && + this->ready_set_.rd_mask_.is_set (read_handle)) + { + return read_handle; + } + + return ACE_INVALID_HANDLE; +} diff --git a/ace/TP_Reactor.h b/ace/TP_Reactor.h index b916aa81a22..2dc0f48b69c 100644 --- a/ace/TP_Reactor.h +++ b/ace/TP_Reactor.h @@ -32,13 +32,12 @@ #include "ace/pre.h" #include "ace/Select_Reactor.h" - +#include "ace/Log_Msg.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ - /** * @class ACE_EH_Dispatch_Info * @@ -126,7 +125,6 @@ private: ACE_UNIMPLEMENTED_FUNC (ACE_TP_Token_Guard (void)) }; - /** * @class ACE_TP_Reactor * @@ -184,10 +182,6 @@ public: ACE_Timer_Queue * = 0, int mask_signals = 1); - // = Reactor calls - virtual void max_notify_iterations (int iter); - virtual int max_notify_iterations (void); - // = Event loop drivers. /** @@ -247,14 +241,24 @@ public: protected: // = Internal methods that do the actual work. - /// Get the event that needs dispatching.It could be either a - /// signal, timer, notification handlers or return possibly 1 I/O - /// handler for dispatching. In the most common use case, this would - /// return 1 I/O handler for dispatching - int get_event_for_dispatching (ACE_Time_Value *max_wait_time); + /** + * Dispatch signal, timer, notification handlers and return possibly + * 1 I/O handler for dispatching. Ideally, it would dispatch nothing, + * and return dispatch information for only one of (signal, timer, + * notification, I/O); however, the reactor mechanism is too enmeshed + * in the timer queue expiry functions and the notification class to + * do this without some significant redesign. + */ + int dispatch_i (ACE_Time_Value *max_wait_time, + ACE_EH_Dispatch_Info &event); + + int dispatch_i_protected (ACE_Time_Value *max_wait_time, + /// Only really does anything for Win32. Wraps a call to dispatch_i in an + /// ACE_SEH_TRY block. + ACE_EH_Dispatch_Info &event); int handle_signals (int &event_count, - ACE_TP_Token_Guard &g); + ACE_TP_Token_Guard &g); int handle_timer_events (int &event_count, ACE_TP_Token_Guard &g); @@ -262,7 +266,7 @@ protected: int handle_notify_events (int &event_count, ACE_TP_Token_Guard &g); - int handle_socket_events (int &event_count, + int handle_socket_events (ACE_Time_Value *max_wait_time, ACE_TP_Token_Guard &g); /// This method shouldn't get called. @@ -271,13 +275,13 @@ protected: ACE_Handle_Set &, ACE_Event_Handler *eh, ACE_EH_PTMF callback); -private: - - ACE_HANDLE get_notify_handle (void); - int get_socket_event_info (ACE_EH_Dispatch_Info &info); + /// Notify the appropriate in the context of the + /// associated with that a particular event has occurred. + virtual int notify_handle (ACE_EH_Dispatch_Info &dispatch_info); - int dispatch_socket_events (ACE_EH_Dispatch_Info &info); +private: + ACE_HANDLE get_notify_handle (void); private: /// Deny access since member-wise won't work... diff --git a/ace/TP_Reactor.i b/ace/TP_Reactor.i index f169ddca174..f0b5c9fe935 100644 --- a/ace/TP_Reactor.i +++ b/ace/TP_Reactor.i @@ -71,7 +71,7 @@ ACE_TP_Token_Guard::~ACE_TP_Token_Guard (void) ACE_INLINE void ACE_TP_Token_Guard::release_token (void) { - if (this->owner_ == 1) + if (this->owner_) { ACE_MT (this->token_.release ()); @@ -90,7 +90,6 @@ ACE_TP_Token_Guard::is_owner (void) /************************************************************************/ // Methods for ACE_TP_Reactor /************************************************************************/ - ACE_INLINE void ACE_TP_Reactor::no_op_sleep_hook (void *) { -- cgit v1.2.1