// $Id$ #include "ace/TP_Reactor.h" #include "ace/Reactor.h" #include "ace/Thread.h" #if !defined (__ACE_INLINE__) #include "ace/TP_Reactor.i" #endif /* __ACE_INLINE__ */ ACE_RCSID(ace, TP_Reactor, "$Id$") ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor) ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, int mask_signals) : ACE_Select_Reactor (sh, tq, 0, 0, mask_signals) { ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor"); this->supress_notify_renew (1); } ACE_TP_Reactor::ACE_TP_Reactor (size_t size, int rs, ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, int mask_signals) : ACE_Select_Reactor (size, rs, sh, tq, 0, 0, mask_signals) { ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor"); this->supress_notify_renew (1); } int ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) { ACE_TRACE ("ACE_TP_Reactor::owner"); if (o_id) *o_id = ACE_Thread::self (); return 0; } int ACE_TP_Reactor::owner (ACE_thread_t *t_id) { ACE_TRACE ("ACE_TP_Reactor::owner"); *t_id = ACE_Thread::self (); return 0; } int ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Reactor::handle_events"); // Stash the current time -- the destructor of this object will // automatically compute how much time elpased since this method was // called. ACE_Countdown_Time countdown (max_wait_time); // The order of these events is very subtle, modify with care. // Try to grab the lock. If someone if already there, don't wake // them up, just queue up in the thread pool. int result = 0; if (max_wait_time) { ACE_Time_Value tv = ACE_OS::gettimeofday (); tv += *max_wait_time; ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook, 0, &tv)); } else { ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook)); } // Update the countdown to reflect time waiting for the token. countdown.update (); switch (result) { case 2: ACE_MT (this->token_.release ()); return 0; case -1: if (errno == ETIME) return 0; return -1; } // After acquiring the lock, check if we have been deactivated. if (this->deactivated_) { ACE_MT (this->token_.release ()); return -1; } // We got the lock, lets handle some events. Note: this method will // *not* dispatch any I/O handlers. It will dispatch signals, // timeouts, and notifications. ACE_EH_Dispatch_Info dispatch_info; result = this->dispatch_i_protected (max_wait_time, dispatch_info); if (result == -1) { ACE_MT (this->token_.release ()); return -1; } // If there is any event handler that is ready to be dispatched, the // dispatch information is recorded in dispatch_info. if (dispatch_info.dispatch ()) { // Suspend the handler so that other threads don't start // dispatching it. // Make sure we never suspend the notify_handler_ without holding // the lock. // @@ Actually, we don't even need to suspend the notify_handler_ // here. But let me get it to work first. if (dispatch_info.event_handler_ != this->notify_handler_) this->suspend_i (dispatch_info.handle_); } // Release the lock. Others threads can start waiting. ACE_MT (this->token_.release ()); // If there was an event handler ready, dispatch it. if (dispatch_info.dispatch ()) { if (this->notify_handle (dispatch_info) == 0) ++result; // Dispatched one more event if (dispatch_info.event_handler_ != this->notify_handler_) this->resume_handler (dispatch_info.handle_); } return result; } int ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, ACE_Reactor_Mask mask, int ops) { ACE_TRACE ("ACE_TP_Reactor::mask_ops"); ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token, ace_mon, this->token_, -1)); int result = 0; // If it looks like the handle isn't suspended, then // set the ops on the wait_set_, otherwise set the suspend_set_. if (this->suspend_set_.rd_mask_.is_set (handle) == 0 && this->suspend_set_.wr_mask_.is_set (handle) == 0 && this->suspend_set_.ex_mask_.is_set (handle) == 0) result = this->bit_ops (handle, mask, this->wait_set_, ops); else result = this->bit_ops (handle, mask, this->suspend_set_, ops); return result; } void ACE_TP_Reactor::no_op_sleep_hook (void *) { } int ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, ACE_EH_Dispatch_Info &event) { int result = -1; event.reset (); // Nothing to dispatch yet // If the reactor handler state has changed, clear any remembered // ready bits and re-scan from the master wait_set. if (this->state_changed_) { this->ready_set_.rd_mask_.reset (); this->ready_set_.wr_mask_.reset (); this->ready_set_.ex_mask_.reset (); this->state_changed_ = 0; } else { // This is a hack... somewhere, under certain conditions (which // I don't understand...) the mask will have all of its bits clear, // yet have a size_ > 0. This is an attempt to remedy the affect, // without knowing why it happens. //# if !(defined (__SUNPRO_CC) && (__SUNPRO_CC > 0x500)) // SunCC seems to be having problems with this piece of code // here. I am not sure why though. This works fine with other // compilers. As we dont seem to understand when this piece of // code is needed and as it creates problems for SunCC we will // not compile this. Most of the tests in TAO seem to be happy // without this in SunCC. this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); //# endif /* ! __SUNPRO_CC */ } int active_handle_count = this->wait_for_multiple_events (this->ready_set_, max_wait_time); int handlers_dispatched = 0; int signal_occurred = 0; // Note that we keep track of changes to our state. If any of // the dispatching ends up with this->state_changed_ being set, // state has changed as the result of an // being dispatched. This means that we // need to bail out and rerun the select() again since our // existing notion of handles in may no longer be // correct. // First check for interrupts. if (active_handle_count == -1) { // Bail out -- we got here since