// $Id$ #include "ace/TP_Reactor.h" #include "ace/Reactor.h" #include "ace/Thread.h" //#include "ace/Thread.h" #if !defined (__ACE_INLINE__) #include "ace/TP_Reactor.i" #endif /* __ACE_INLINE__ */ ACE_RCSID(ace, TP_Reactor, "$Id$") ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor) int ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Token_Guard::grab_token"); // The order of these events is very subtle, modify with care. // Try to grab the lock. If someone if already there, don't wake // them up, just queue up in the thread pool. int result = 0; if (max_wait_time) { ACE_Time_Value tv = ACE_OS::gettimeofday (); tv += *max_wait_time; ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook, 0, &tv)); } else { ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook)); } // Now that this thread owns the token let us make // Check for timeouts and errors. if (result == -1) { if (errno == ETIME) return 0; else return -1; } // We got the token and so let us mark ourseleves as owner this->owner_ = 1; return result; } /************************************************************************/ // Methods for ACE_TP_Reactor /************************************************************************/ ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, int mask_signals) : ACE_Select_Reactor (sh, tq, 0, 0, mask_signals) { ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor"); this->supress_notify_renew (1); } ACE_TP_Reactor::ACE_TP_Reactor (size_t size, int rs, ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, int mask_signals) : ACE_Select_Reactor (size, rs, sh, tq, 0, 0, mask_signals) { ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor"); this->supress_notify_renew (1); } void ACE_TP_Reactor::max_notify_iterations (int /*iterations*/) { ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations"); ACE_ERROR ((LM_ERROR, "(%P|%t) This has no effect in the TP_Reactor.. \n")); } int ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Reactor::handle_events"); int result = 0; // Stash the current time -- the destructor of this object will // automatically compute how much time elpased since this method was // called. ACE_Countdown_Time countdown (max_wait_time); // Instantiate the token guard which will try grabbing the token for // this thread. ACE_TP_Token_Guard guard (this->token_, max_wait_time, result); // If the guard is NOT the owner just return the retval if (!guard.is_owner ()) return result; // Update the countdown to reflect time waiting for the token. countdown.update (); // After acquiring the lock, check if we have been deactivated. If // we are deactivated, simply return without handling further // events. if (this->deactivated_) { return -1; } // We got the lock, lets handle some events. We collect the events // that we need to handle. We release the token and then handle // those events that needs handling. int event_count = this->get_event_for_dispatching (max_wait_time); // Dispatch signals if (event_count == -1) { // Looks like we dont do any upcalls in dispatch signals. If at // a later point of time, we decide to handle signals we have to // release the lock before we make any upcalls.. What is here // now is not the right thing... // @@ We need to do better.. return this->handle_signals (event_count, guard); } if (event_count > 0) { // If there are no signals and if we had received a proper // event_count then first look at dispatching timeouts. We need to // handle timers early since they may have higher latency // constraints than I/O handlers. Ideally, the order of // dispatching should be a strategy... int retval = this->handle_timer_events (event_count, guard); if (retval > 0) return retval; // Else just fall through for further handling } if (event_count > 0) { // Next dispatch the notification handlers (if there are any to // dispatch). These are required to handle multiple-threads that // are trying to update the . int retval = this->handle_notify_events (event_count, guard); if (retval > 0) return retval; // Else just fall through for further handling } if (event_count > 0) { // Handle socket events return this->handle_socket_events (event_count, guard); } return 0; } int ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) { ACE_TRACE ("ACE_TP_Reactor::handle_events"); return this->handle_events (&max_wait_time); } int ACE_TP_Reactor::resumable_handler (void) { return 1; } int ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, ACE_Reactor_Mask mask, int ops) { ACE_TRACE ("ACE_TP_Reactor::mask_ops"); ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token, ace_mon, this->token_, -1)); int result = 0; // If it looks like the handle isn't suspended, then // set the ops on the wait_set_, otherwise set the suspend_set_. if (this->suspend_set_.rd_mask_.is_set (handle) == 0 && this->suspend_set_.wr_mask_.is_set (handle) == 0 && this->suspend_set_.ex_mask_.is_set (handle) == 0) result = this->bit_ops (handle, mask, this->wait_set_, ops); else result = this->bit_ops (handle, mask, this->suspend_set_, ops); return result; } int ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh, ACE_Reactor_Mask mask, int ops) { ACE_TRACE ("ACE_TP_Reactor::mask_ops"); return this->mask_ops (eh->get_handle (), mask, ops); } int ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) { ACE_TRACE ("ACE_TP_Reactor::owner"); if (o_id) *o_id = ACE_Thread::self (); return 0; } int ACE_TP_Reactor::owner (ACE_thread_t *t_id) { ACE_TRACE ("ACE_TP_Reactor::owner"); *t_id = ACE_Thread::self (); return 0; } int ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Reactor::get_event_for_dispatching"); // If the reactor handler state has changed, clear any remembered // ready bits and re-scan from the master wait_set. if (this->state_changed_) { this->ready_set_.rd_mask_.reset (); this->ready_set_.wr_mask_.reset (); this->ready_set_.ex_mask_.reset (); this->state_changed_ = 0; } else { // This is a hack... somewhere, under certain conditions (which // I don't understand...) the mask will have all of its bits clear, // yet have a size_ > 0. This is an attempt to remedy the affect, // without knowing why it happens. this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); } return this->wait_for_multiple_events (this->ready_set_, max_wait_time); } int ACE_TP_Reactor::handle_signals (int & /*event_count*/, ACE_TP_Token_Guard & /*guard*/) { ACE_TRACE ("ACE_TP_Reactor::handle_signals"); /* * * THIS METHOD SEEMS BROKEN * * */ // First check for interrupts. // Bail out -- we got here since