// $Id$ #include "ace/TP_Reactor.h" #include "ace/Reactor.h" #include "ace/Thread.h" #if !defined (__ACE_INLINE__) #include "ace/TP_Reactor.i" #endif /* __ACE_INLINE__ */ ACE_RCSID(ace, TP_Reactor, "$Id$") ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor) int ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Token_Guard::grab_token"); // The order of these events is very subtle, modify with care. // Try to grab the lock. If someone if already there, don't wake // them up, just queue up in the thread pool. int result = 0; if (max_wait_time) { ACE_Time_Value tv = ACE_OS::gettimeofday (); tv += *max_wait_time; ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook, 0, &tv)); } else { ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook)); } // Now that this thread owns the token let us make // Check for timeouts and errors. if (result == -1) { if (errno == ETIME) return 0; else return -1; } // We got the token and so let us mark ourselves as owner this->owner_ = 1; return result; } int ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Token_Guard::acquire_token"); // Try to grab the lock. If someone if already there, don't wake // them up, just queue up in the thread pool. int result = 0; if (max_wait_time) { ACE_Time_Value tv = ACE_OS::gettimeofday (); tv += *max_wait_time; ACE_MT (result = this->token_.acquire (0, 0, &tv)); } else { ACE_MT (result = this->token_.acquire ()); } // Now that this thread owns the token let us make // Check for timeouts and errors. if (result == -1) { if (errno == ETIME) return 0; else return -1; } // We got the token and so let us mark ourseleves as owner this->owner_ = 1; return result; } ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, int mask_signals, int s_queue) : ACE_Select_Reactor (sh, tq, 0, 0, mask_signals, s_queue) { ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor"); this->supress_notify_renew (1); } ACE_TP_Reactor::ACE_TP_Reactor (size_t size, int rs, ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, int mask_signals, int s_queue) : ACE_Select_Reactor (size, rs, sh, tq, 0, 0, mask_signals, s_queue) { ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor"); this->supress_notify_renew (1); } int ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) { ACE_TRACE ("ACE_TP_Reactor::owner"); if (o_id) *o_id = ACE_Thread::self (); return 0; } int ACE_TP_Reactor::owner (ACE_thread_t *t_id) { ACE_TRACE ("ACE_TP_Reactor::owner"); *t_id = ACE_Thread::self (); return 0; } int ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Reactor::handle_events"); // Stash the current time -- the destructor of this object will // automatically compute how much time elapsed since this method was // called. ACE_Countdown_Time countdown (max_wait_time); // The order of these events is very subtle, modify with care. // Instantiate the token guard which will try grabbing the token for // this thread. ACE_TP_Token_Guard guard (this->token_); int result = guard.grab_token (max_wait_time); // If the guard is NOT the owner just return the retval if (!guard.is_owner ()) return result; // After getting the lock just just for deactivation.. if (this->deactivated_) return -1; // Update the countdown to reflect time waiting for the token. countdown.update (); return this->dispatch_i (max_wait_time, guard); } int ACE_TP_Reactor::remove_handler (ACE_Event_Handler *eh, ACE_Reactor_Mask mask) { int result = 0; // Artificial scoping for grabbing and releasing the token { ACE_TP_Token_Guard guard (this->token_); // Acquire the token result = guard.acquire_token (); if (!guard.is_owner ()) return result; // Call the remove_handler_i () with a DONT_CALL mask. We dont // want to call the handle_close with the token held. result = this->remove_handler_i (eh->get_handle (), mask | ACE_Event_Handler::DONT_CALL); if (result == -1) return -1; } // Close down the unless we've been instructed not // to. if (result == 0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)) eh->handle_close (ACE_INVALID_HANDLE, mask); return 0; } int ACE_TP_Reactor::remove_handler (ACE_HANDLE handle, ACE_Reactor_Mask mask) { ACE_Event_Handler *eh = 0; int result = 0; // Artificial scoping for grabbing and releasing the token { ACE_TP_Token_Guard guard (this->token_); // Acquire the token result = guard.acquire_token (); if (!guard.is_owner ()) return result; size_t slot = 0; eh = this->handler_rep_.find (handle, &slot); if (eh == 0) return -1; // Call the remove_handler_i () with a DONT_CALL mask. We dont // want to call the handle_close with the token held. result = this->remove_handler_i (handle, mask | ACE_Event_Handler::DONT_CALL); if (result == -1) return -1; } // Close down the unless we've been instructed not // to. // @@ Note: The check for result ==0 may be redundant, but shouldnt // be a problem. if (result ==0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)) eh->handle_close (handle, mask); return 0; } int ACE_TP_Reactor::remove_handler (const ACE_Handle_Set &handles, ACE_Reactor_Mask m) { // Array of corresponding to ACE_Event_Handler **aeh = 0; // Allocate memory for the size of the handle set ACE_NEW_RETURN (aeh, ACE_Event_Handler *[handles.num_set ()], -1); size_t index = 0; // Artificial scoping for grabbing and releasing the token { ACE_TP_Token_Guard guard (this->token_); // Acquire the token int result = guard.acquire_token (); if (!guard.is_owner ()) return result; ACE_HANDLE h; ACE_Handle_Set_Iterator handle_iter (handles); while ((h = handle_iter ()) != ACE_INVALID_HANDLE) { size_t slot = 0; ACE_Event_Handler *eh = this->handler_rep_.find (h, &slot); if (this->remove_handler_i (h, m | ACE_Event_Handler::DONT_CALL) == -1) { delete [] aeh; return -1; } aeh [index] = eh; index ++; } } // Close down the unless we've been instructed not // to. if (ACE_BIT_ENABLED (m, ACE_Event_Handler::DONT_CALL) == 0) { for (size_t i = 0; i < index; i++) aeh[i]->handle_close (ACE_INVALID_HANDLE, m); } delete [] aeh; return 0; } int ACE_TP_Reactor::remove_handler (int /*signum*/, ACE_Sig_Action * /*new_disp*/, ACE_Sig_Action * /*old_disp*/, int /*sigkey*/) { ACE_NOTSUP_RETURN (-1); } int ACE_TP_Reactor::remove_handler (const ACE_Sig_Set & /*sigset*/) { ACE_NOTSUP_RETURN (-1); } int ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, ACE_TP_Token_Guard &guard) { int event_count = this->get_event_for_dispatching (max_wait_time); int result = 0; // Note: We are passing the around, to have record of // how many events still need processing. May be this could be // useful in future. // Dispatch signals if (event_count == -1) { // Looks like we dont do any upcalls in dispatch signals. If at // a later point of time, we decide to handle signals we have to // release the lock before we make any upcalls.. What is here // now is not the right thing... // @@ We need to do better.. return this->handle_signals (event_count, guard); } // If there are no signals and if we had received a proper // event_count then first look at dispatching timeouts. We need to // handle timers early since they may have higher latency // constraints than I/O handlers. Ideally, the order of // dispatching should be a strategy... // NOTE: The event count does not have the number of timers that // needs dispatching. But we are still passing this along. We dont // need to do that. In the future we *may* have the timers also // returned through the . Just passing that along for // that day. result = this->handle_timer_events (event_count, guard); if (result > 0) return result; // Else justgo ahead fall through for further handling if (event_count > 0) { // Next dispatch the notification handlers (if there are any to // dispatch). These are required to handle multiple-threads that // are trying to update the . result = this->handle_notify_events (event_count, guard); if (result > 0) return result; // Else just fall through for further handling } if (event_count > 0) { // Handle socket events return this->handle_socket_events (event_count, guard); } return 0; } int ACE_TP_Reactor::handle_signals (int & /*event_count*/, ACE_TP_Token_Guard & /*guard*/) { ACE_TRACE ("ACE_TP_Reactor::handle_signals"); /* * * THIS METHOD SEEMS BROKEN * * */ // First check for interrupts. // Bail out -- we got here since